import logging
import os
import random
from typing import Dict

import cv2
import numpy as np
import torch
import transformers
from pycocotools.coco import COCO
from torch.utils.data import Dataset
from torchvision import transforms

from .constants import COCO_KEYPOINT_NAME, KeypointLocationDescription, KeypointLocationQuestion

# Prompt/token constants shared with the rest of the pipeline.
# NOTE(review): the original file defined this group twice with identical
# values; the duplicate has been removed. Several tokens are empty strings
# here — presumably filled in elsewhere; confirm before relying on them.
DEFAULT_IMAGE_PATCH_TOKEN = ""
PREFIX_IMAGE = "Image: "
PREFIX_NO_IMAGE = "Image: N/A"
BEGIN_DESCRIPTION = ""
END_DESCRIPTION = ""
IGNORE_INDEX = -100
DEFAULT_EOS_TOKEN = ""
BEGIN_OPTIONS = ""
END_OPTIONS = ""
BEGIN_LOC = ""
END_LOC = ""
BEGIN_QUESTION = ""
END_QUESTION = ""


class PoseCOCODataset(Dataset):
    """COCO person-keypoint dataset for supervised fine-tuning.

    Each item is one person *instance* (not one image): the constructor
    expands every valid keypoint annotation into its own entry carrying a
    sanitized bbox, a (center, scale) pair and 17-joint coordinate /
    visibility arrays.
    """

    def __init__(self, data_path: str,
                 multimodal_cfg: dict,
                 is_train=True,
                 is_RL=False):
        """Load COCO annotations and build the per-instance index.

        Args:
            data_path: path to a COCO-format keypoint annotation JSON.
            multimodal_cfg: dict with at least ``image_size`` and
                ``image_folder`` keys.
            is_train: stored flag; note that augmentation is force-disabled
                below regardless of this value.
            is_RL: accepted for interface compatibility; unused in this chunk.
        """
        super(PoseCOCODataset, self).__init__()
        logging.warning("Loading data...")

        self.size = multimodal_cfg['image_size']
        self.aspect_ratio = 1.0
        self.pixel_std = 200   # must stay in sync with the 200.0 in get_affine_transform
        self.num_joints = 17   # COCO keypoint count

        coco = COCO(data_path)
        list_data_dict = []
        instance_id = 0
        for index in coco.getImgIds():
            im_ann = coco.loadImgs(index)[0]
            width = im_ann['width']
            height = im_ann['height']

            annIds = coco.getAnnIds(imgIds=index, iscrowd=False)
            objs = coco.loadAnns(annIds)

            # Sanitize bboxes: clamp to the image bounds and drop
            # zero-area / degenerate boxes.
            valid_objs = []
            for obj in objs:
                x, y, w, h = obj['bbox']
                x1 = np.max((0, x))
                y1 = np.max((0, y))
                x2 = np.min((width - 1, x1 + np.max((0, w - 1))))
                y2 = np.min((height - 1, y1 + np.max((0, h - 1))))
                if obj['area'] > 0 and x2 >= x1 and y2 >= y1:
                    obj['clean_bbox'] = [x1, y1, x2 - x1, y2 - y1]
                    valid_objs.append(obj)
            objs = valid_objs

            for obj in objs:
                # Keep only the "person" category.
                if obj['category_id'] != 1:
                    continue
                # Ignore instances without any annotated keypoint.
                if max(obj['keypoints']) == 0:
                    continue

                joints_3d = np.zeros((self.num_joints, 3), dtype=np.float32)
                joints_3d_vis = np.zeros((self.num_joints, 3), dtype=np.float32)
                for ipt in range(self.num_joints):
                    joints_3d[ipt, 0] = obj['keypoints'][ipt * 3 + 0]
                    joints_3d[ipt, 1] = obj['keypoints'][ipt * 3 + 1]
                    joints_3d[ipt, 2] = 0
                    # COCO visibility is 0/1/2; collapse "labeled" (1) and
                    # "visible" (2) into a single 0/1 visibility flag.
                    t_vis = obj['keypoints'][ipt * 3 + 2]
                    if t_vis > 1:
                        t_vis = 1
                    joints_3d_vis[ipt, 0] = t_vis
                    joints_3d_vis[ipt, 1] = t_vis
                    joints_3d_vis[ipt, 2] = 0

                center, scale = self._box2cs(obj['clean_bbox'][:4])
                list_data_dict.append({
                    'file_name': im_ann['file_name'],
                    'image_id': index,
                    'center': center,
                    'scale': scale,
                    'joints_3d': joints_3d,
                    'joints_3d_vis': joints_3d_vis,
                    'instance_id': instance_id,
                    'human_bbox': obj['clean_bbox'],
                })
                instance_id += 1

        logging.warning("The number of training samples is {}".format(len(list_data_dict)))
        logging.warning("Formatting inputs...Skip in lazy mode")
        self.list_data_dict = list_data_dict
        self.multimodal_cfg = multimodal_cfg
        self.data_aug = False  # augmentation disabled even when is_train is True
        self.is_train = is_train

    def __len__(self):
        """Number of person instances (not images)."""
        return len(self.list_data_dict)

    def __getitem__(self, i):
        return self._parse_data_item_val(i)

    def _parse_data_item_val(self, i):
        """Assemble the evaluation dict for person instance ``i``."""
        sources = self.list_data_dict[i]
        image, joints, joints_vis, c, s, file_name, image_size = \
            self._get_pose_item(sources)
        return {
            'image': image,
            'image_id': sources['image_id'],
            'c': c,
            's': s,
            'joints': joints,
            'joints_vis': joints_vis,
            'file_name': file_name,
            'human_bbox': sources['human_bbox'],
            'image_size': image_size,
        }

    def _get_pose_item(self, sources):
        """Load, optionally augment, and affine-crop one person instance.

        Returns ``(image, joints, joints_vis, center, scale, file_name,
        [h, w])`` where ``image`` is the RGB crop of side ``self.size`` and
        ``[h, w]`` is the original image size.
        """
        file_name = sources['file_name']
        image_folder = self.multimodal_cfg['image_folder']
        image_file = os.path.join(image_folder, file_name)
        image = cv2.imread(
            image_file, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION
        )
        if image is None:
            # Fail loudly instead of letting cvtColor raise a cryptic error.
            raise FileNotFoundError("Failed to read image: {}".format(image_file))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Channel count discarded: the original bound it to ``c``, which was
        # then clobbered by the center variable below.
        h, w, _ = image.shape

        joints = sources['joints_3d']
        joints_vis = sources['joints_3d_vis']

        c = sources['center']
        s = sources['scale']
        r = 0

        if self.data_aug:
            # Random scale/rotation jitter plus 50% horizontal flip.
            sf = 0.3
            rf = 40
            s = s * np.clip(np.random.randn() * sf + 1, 1 - sf, 1 + sf)
            r = random.uniform(-rf, rf) if random.random() <= 0.5 else 0

            # Left/right joint index pairs in the COCO keypoint order.
            flip_pairs = [[1, 2], [3, 4], [5, 6], [7, 8],
                          [9, 10], [11, 12], [13, 14], [15, 16]]
            if random.random() <= 0.5:
                image = image[:, ::-1, :]
                joints, joints_vis = fliplr_joints(
                    joints, joints_vis, image.shape[1], flip_pairs)
                c[0] = image.shape[1] - c[0] - 1

        trans = get_affine_transform(c, s, r, (int(self.size), int(self.size)))
        image = cv2.warpAffine(
            image,
            trans,
            (int(self.size), int(self.size)),
            flags=cv2.INTER_LINEAR)

        # Map visible joints into the cropped patch's coordinate frame.
        for i in range(self.num_joints):
            if joints_vis[i, 0] > 0.0:
                joints[i, 0:2] = affine_transform(joints[i, 0:2], trans)

        return image, joints, joints_vis, c, s, file_name, [h, w]

    def _box2cs(self, box):
        """Convert an ``[x, y, w, h]`` box to a (center, scale) pair."""
        x, y, w, h = box[:4]
        return self._xywh2cs(x, y, w, h)

    def _xywh2cs(self, x, y, w, h):
        """Return the box center and its pixel_std-normalized scale.

        The box is padded on its shorter side so that w/h matches
        ``self.aspect_ratio`` before the scale is computed.
        """
        center = np.zeros((2), dtype=np.float32)
        center[0] = x + w * 0.5
        center[1] = y + h * 0.5

        if w > self.aspect_ratio * h:
            h = w * 1.0 / self.aspect_ratio
        elif w < self.aspect_ratio * h:
            w = h * self.aspect_ratio
        scale = np.array(
            [w * 1.0 / self.pixel_std, h * 1.0 / self.pixel_std],
            dtype=np.float32)
        if center[0] != -1:
            # Vestigial enlargement hook (upstream pose code often uses
            # ``scale * 1.25`` here); a factor of 1.0 makes this a no-op,
            # kept for parity.
            scale = scale * 1.0

        return center, scale


def fliplr_joints(joints, joints_vis, width, matched_parts):
    """Horizontally flip joint coordinates and swap left/right joint pairs.

    Joints with zero visibility are zeroed out by the ``joints * joints_vis``
    product in the return value.
    """
    # Flip horizontal.
    joints[:, 0] = width - joints[:, 0] - 1

    # Change left-right parts.
    for pair in matched_parts:
        joints[pair[0], :], joints[pair[1], :] = \
            joints[pair[1], :], joints[pair[0], :].copy()
        joints_vis[pair[0], :], joints_vis[pair[1], :] = \
            joints_vis[pair[1], :], joints_vis[pair[0], :].copy()

    return joints * joints_vis, joints_vis


def transform_preds(coords, center, scale, output_size):
    """Map predicted coordinates from the crop frame back to the original image."""
    target_coords = np.zeros(coords.shape)
    trans = get_affine_transform(center, scale, 0, output_size, inv=1)
    for p in range(coords.shape[0]):
        target_coords[p, 0:2] = affine_transform(coords[p, 0:2], trans)
    return target_coords


def get_affine_transform(
        center, scale, rot, output_size,
        shift=(0, 0),
        inv=0
):
    """Build the 2x3 affine matrix mapping a (center, scale, rot) box to output_size.

    Args:
        center: (x, y) box center in source-image pixels.
        scale: box size in pixel_std (=200) units; a scalar is treated as a
            square box.
        rot: rotation in degrees.
        output_size: (width, height) of the destination crop.
        shift: fractional (x, y) shift of the source box; accepts any
            array-like (the previous mutable ``np.array`` default was only
            ever read, so this is behavior-identical).
        inv: when truthy, return the inverse (crop -> image) transform.
    """
    # Accept tuples/arrays for shift; previously a mutable default argument.
    shift = np.asarray(shift, dtype=np.float32)
    if not isinstance(scale, np.ndarray) and not isinstance(scale, list):
        # Scalar scale: treat as a square box. (Leftover debug print removed.)
        scale = np.array([scale, scale])

    scale_tmp = scale * 200.0  # must stay in sync with PoseCOCODataset.pixel_std
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir

    # Third point completes the triangle cv2.getAffineTransform requires.
    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans


def affine_transform(pt, t):
    """Apply a 2x3 affine matrix ``t`` to the 2D point ``pt``."""
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]


def get_3rd_point(a, b):
    """Return the third point forming a right angle with segment ``a``-``b``."""
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)


def get_dir(src_point, rot_rad):
    """Rotate a 2D point by ``rot_rad`` radians about the origin."""
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)
    src_result = [0, 0]
    src_result[0] = src_point[0] * cs - src_point[1] * sn
    src_result[1] = src_point[0] * sn + src_point[1] * cs
    return src_result