Demo entry 5865369

pythoncode

   

Submitted by anonymous on Aug 19, 2016 at 06:22
Language: Python. Code size: 14.2 kB.

#!/usr/bin/env python
# 
# Prebuild cache for data entry tasks
#
import numpy as np
import skimage.exposure
from PIL import Image, ImageDraw

from kinect.recordings.models import Recording
from kinect.calibration.mapping import CameraMapping
from kinect.stream_processor.frame_cropping import type_map
from kinect.core.precompute import ImageCache, CacheWriter, BodyCache
from kinect.calibration.bounding_box import get_basler_bbox_from_depth_bbox, get_kinect_bbox


#######################################################################################################
#
# Data Entry Image Cache
#
#######################################################################################################

class BodyBagCache(ImageCache):
    data_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/body-bags/images-%(source)s.dat"
    index_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/body-bags/images-%(source)s.idx"

    # Guess (see comments in FaceCache)
    preallocate_size = 5 * (1024 * 1024 * 1024)

    # One body may have more than one bag
    # so we need to add bag index into the index key fields
    index_key_fields = ['kinect_ts', 'body_number', 'bag_index']
    # We also want to store bounding boxes info in the idx file
    # bboxes is a dict of body, bag and strap's bbox in list format (ltrb)
    index_data_fields = ['shape', 'offset', 'bboxes']


class BodyShoesCache(ImageCache):
    data_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/body-shoes/images-%(source)s.dat"
    index_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/body-shoes/images-%(source)s.idx"

    # Guess (see comments in FaceCache)
    preallocate_size = 5 * (1024 * 1024 * 1024)

    # One cache image, one body's all shoes
    # bboxes is a dict of 1st/2nd shoes bbox in list format (ltrb)
    index_data_fields = ['shape', 'offset', 'bboxes']


class BodySkeletonCache(ImageCache):
    data_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/body-skeletons/images-%(source)s.dat"
    index_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/body-skeletons/images-%(source)s.idx"

    # Guess (see comments in FaceCache)
    preallocate_size = 100 * (1024 * 1024 * 1024)


class PoseEstimationCache(ImageCache):
    data_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/pose-estimation/images-%(source)s.dat"
    index_path = "data_entry/%(kinect_id)s/%(kinect_id)s_%(identifier)s/pose-estimation/images-%(source)s.idx"

    # Guess (see comments in FaceCache), using 20 GB now
    preallocate_size = 20 * (1024 * 1024 * 1024)



#######################################################################################################
#
# Data Entry Image Cache Writer
#
#######################################################################################################

class BodyBagCacheWriter(CacheWriter):
    cache_type = BodyBagCache


class BodyShoesCacheWriter(CacheWriter):
    cache_type = BodyShoesCache


class BodySkeletonCacheWriter(CacheWriter):
    cache_type = BodySkeletonCache

class PoseEstimationCacheWriter(CacheWriter):
    cache_type = PoseEstimationCache



##################################################################################
#
# Functions to generate cache files
#
##################################################################################

def generate_body_bag_cache(kid, vid, image_src):
    """ To generate an oversized full body with bag bboxes image dat/idx cache file
    Steps: 
        1) Get all BagBoundingV3 results per body for recording (kid, vid), by
             min_response = 2
             min_agreement = 0.66
             min_intersection_ratio = 0.7
             max_union_ratio = 1.3
           besides, filtered bbox needs to have a "bag" bounding box for classification
        2) Get each body's body bounding box
        3) Get a union area of body bbox and bag/strap bbox
        4) Draw each bag/strap bbox on the full frame image
        4) Crop full image by the union bbox and saved data into cache
    """
    from kinect.data_entry.models import BagBoundingV3
    from kinect.data_entry.utils import get_bag_consensus_bboxes

    recording = Recording.objects.get(
        installation__kinect_camera__kinect_id = kid,
        identifier = vid
    )

    bbs = BagBoundingV3.objects.filter(
        assignment__is_test = False,
        assignment__state = 'complete',
        assignment__parameters__image_src = image_src,
        recording__installation__kinect_camera__kinect_id = kid,
        recording__identifier = vid,
    ).select_related(
        'recording',
        'process_user'
    ).prefetch_related(
        'recording__installation__kinect_camera'
    )

    if not bbs.exists():
        print "No Data Entry Results!"
        return

    # Check if cache if already existing
    if BodyBagCache.cache_files_exist(kid, vid, image_src):
        print "Cache files already exist, please check!"
        return

    image_type = type_map[image_src]
    kvf = recording.get_video_file(type=image_type)
    camera_mapping = CameraMapping(recording.installation)

    # Get a list of body which has valid bag bounding box
    body_list = {}
    for bb in bbs:
        kts = bb.kinect_timestamp
        bno = bb.body_number
        if (kts, bno) not in body_list.keys():
            # See if we can get a valid bag/strap bounding boxes
            bag_bboxes = get_bag_consensus_bboxes(kid, vid, kts, bno, image_src)
            body_list[(kts, bno)] = bag_bboxes 

    # Remove empty bag bboxes in body_list
    body_list = {k:v for k,v in body_list.iteritems() if v}

    if not body_list:
        print "No Qualified Bag Bounding Results for %s" % recording
        return

    # Create body bag cache 
    writer = BodyBagCacheWriter(kid, vid, image_src)
    writer.create_cache_dat_file()

    for (kts, bno), bag_bboxes in body_list.iteritems():
        video_skeleton = recording.video_skeletons.get(kinect_timestamp=kts)
        body_data = video_skeleton.body_data[str(bno)]
        source_img = kvf.get_frame_img_nprgb(kts)
        # Get body bounding box
        if image_type == 'l':
            # TODO: Find a more generic way to do this
            # Adjust the brightness of basler img
            source_img = skimage.exposure.adjust_gamma(source_img, gamma=0.75)
            # Get basler body bbox
            joints = body_data['joints']
            z_val = int(joints['Head']['position']['z'] * 1000)
            depth_bbox = get_kinect_bbox(body_data, 'body', 'd')
            body_bbox = get_basler_bbox_from_depth_bbox(camera_mapping, depth_bbox, z_val, source_space='d')
        else:
            body_bbox = get_kinect_bbox(body_data, 'body', image_type)

        for bag_index, bag_bbox in enumerate(bag_bboxes):
            # Get a union area for body, bag and strap bounding box
            # Draw bag/strap bounding boxes on full frame image
            img = Image.fromarray(source_img, mode='RGB')
            draw = ImageDraw.Draw(img)

            # For saving body, bag, strap bboxes into cache idx file
            # Eech bounding box is saved as a list: ltrb
            bboxes = {}
            bboxes['body'] = body_bbox.as_nparray().tolist()

            union_bbox = body_bbox.copy()
            for bbox_type in ['bag', 'strap']:
                if bag_bbox.has_key(bbox_type):
                    bbox = bag_bbox[bbox_type].as_nparray().tolist()
                    bboxes[bbox_type] = bbox
                    draw.rectangle(bbox, outline='red' if bbox_type == 'bag' else 'blue')
                    union_bbox = union_bbox.union(bag_bbox[bbox_type])

            # Crop full image using union bbox
            left, top, right, bottom = union_bbox.left, union_bbox.top, union_bbox.right, union_bbox.bottom
            cropped_img = np.array(img)[top:bottom + 1, left:right + 1]
            # Write body bag cache
            writer.write_cache_dat_file(
                cropped_img, kinect_ts=kts, body_number=bno, bag_index=bag_index, bboxes=bboxes
            )

    writer.build_cache_idx_file()


def generate_body_shoes_cache(kid, vid, image_src):
    """ To generate an union shoes cropped image dat/idx cache file
        Based on results from ShoesBoundingV1 task
    """
    from kinect.data_entry.models import ShoesBoundingV1
    from kinect.data_entry.utils import get_shoes_consensus_bboxes

    recording = Recording.objects.get(
        installation__kinect_camera__kinect_id = kid,
        identifier = vid
    )

    sbs = ShoesBoundingV1.objects.filter(
        assignment__is_test = False,
        assignment__state = 'complete',
        assignment__parameters__image_src = image_src,
        recording__installation__kinect_camera__kinect_id = kid,
        recording__identifier = vid,
    ).select_related(
        'recording',
        'process_user'
    ).prefetch_related(
        'recording__installation__kinect_camera'
    )

    if not sbs.exists():
        print "No Data Entry Results!"
        return

    # Check if cache if already existing
    if BodyShoesCache.cache_files_exist(kid, vid, image_src):
        print "Cache files already exist, please check!"
        return

    # Get a list of body which has valid bag bounding box
    body_list = {}
    for sb in sbs:
        kts = sb.kinect_timestamp
        bno = sb.body_number
        if (kts, bno) not in body_list.keys():
            # See if we can get a valid shoes bounding boxes
            shoes_bboxes = get_shoes_consensus_bboxes(kid, vid, kts, bno, image_src)
            body_list[(kts, bno)] = shoes_bboxes

    # Remove empty shoes bboxes in body_list
    body_list = {k:v for k,v in body_list.iteritems() if v}

    if not body_list:
        print "No Qualified Shoes Bounding Results for %s" % recording
        return

    # Create body shoes cache
    writer = BodyShoesCacheWriter(kid, vid, image_src)
    writer.create_cache_dat_file()

    # Get body cache
    body_cache = BodyCache(kid, vid, image_src)
    # Use a margin for shoes image, which is easier to view
    image_margin = 20

    for (kts, bno), shoes_bboxes in body_list.iteritems():
        img = Image.fromarray(body_cache.get_image(kts, bno))
        draw = ImageDraw.Draw(img)
        bboxes = {}
        for shoe_index, shoe_bbox in shoes_bboxes.iteritems():
            # Draw bounding boxes on full frame image
            bbox = shoe_bbox.as_nparray().tolist()
            bboxes[shoe_index] = bbox
            draw.rectangle(bbox, outline='red' if shoe_index == '1' else 'blue')

        # Get a union area for shoes bounding boxes
        if len(shoes_bboxes.keys()) == 1:
            union_bbox = shoes_bboxes.values()[0]
        else:
            union_bbox = shoes_bboxes['1'].union(shoes_bboxes['2'])

        # Crop full image using union bbox's, only crop top & bottom with margin
        left, top, right, bottom = union_bbox.left, union_bbox.top, union_bbox.right, union_bbox.bottom
        cropped_img = np.array(img)[max(0, top-image_margin):bottom+image_margin, ]
        # Write body bag cache
        writer.write_cache_dat_file(
            cropped_img, kinect_ts=kts, body_number=bno, bboxes=bboxes
        )

    writer.build_cache_idx_file()


def generate_body_skeleton_cache(kid, vid, image_src='colour'):
    """ To generate body colour image cache with skeleton
        for every two frames of a body instance
        Also filter last 3 frames which are usually wonky
    """
    # Use the same padding as pose estimation result images
    BODY_PADDING = 130

    recording = Recording.objects.get(
        installation__kinect_camera__kinect_id = kid,
        identifier = vid
    )

    # Create body skeleton cache
    writer = BodySkeletonCacheWriter(kid, vid, image_src)
    writer.create_cache_dat_file()

    body_instances = recording.body_instances.filter(
        type = 'complete'
    ).order_by('start_timestamp', 'body_number')

    for body_instance in body_instances:
        body_number = body_instance.body_number
        bi_images = body_instance.get_images('c', as_np_array=True)
        length = body_instance.total_frames
        for i in xrange(0, length):
            timestamp, img_data = bi_images.next()
            # Generate cache for every 2 frames
            # Skip Image which is None
            if i % 2 == 1 or img_data is None:
                continue

            img = Image.fromarray(img_data, mode='RGB')
            try:
                video_skeleton = recording.video_skeletons.get(kinect_timestamp=timestamp)
            except VideoSkeleton.DoesNotExist, e:
                # There is no relative skeleton existing
                raise Exception("No Video Skeleton! Please Check the Video!")

            # Draw body skeleton on the colour image
            img = video_skeleton.draw_stickmen(img, True, body_index=str(body_number))
            # Get body bounding box in colour image 
            body_data = video_skeleton.body_data[str(body_number)]
            bbox = get_kinect_bbox(body_data, 'body', 'c')
            if bbox:
                # Expand body bounding box with padding
                bbox = bbox.expand(BODY_PADDING)
                # Use max(0, left) to avoid negative number after expanding
                left, top, right, bottom = max(0, bbox.left), max(0, bbox.top), bbox.right, bbox.bottom
                # Crop body from the full image
                cropped_img = np.array(img)[top:bottom + 1, left:right + 1]
                if cropped_img.any():
                    # Write body bag cache if image is not empty
                    writer.write_cache_dat_file(
                        cropped_img, kinect_ts=timestamp, body_number=body_number
                    )

    writer.build_cache_idx_file()

This snippet took 0.02 seconds to highlight.

Back to the Entry List or Home.

Delete this entry (admin only).