import os
import json
import pickle
import math
import random
import glob
import time

import numpy as np
import torch
import cv2
from torch.utils.data import Dataset
from PIL import Image, ImageDraw
from pycocotools.coco import COCO

from minigpt4.datasets.datasets.base_dataset import BaseDataset

def pt_paint(strokes, num_steps=999):
    """Render the first `num_steps` rectangle strokes onto a black 256x256 canvas."""
    img = Image.new('RGB', (256, 256), color='black')
    draw = ImageDraw.Draw(img)
    max_steps = len(strokes)
    num_steps = min(num_steps, max_steps)
    for i in range(num_steps):
        stroke = strokes[i]
        x, y, w, h = stroke[0], stroke[1], stroke[2], stroke[3]
        theta = stroke[4] * 180
        rgb = tuple(int(val * 255) for val in stroke[5:8])  # scale RGB values to 0-255

        # Convert degrees to radians for rotation
        angle_rad = math.radians(theta)
        cos_val = math.cos(angle_rad)
        sin_val = math.sin(angle_rad)

        # Axis-aligned rectangle corners around the stroke center
        x1, y1 = x - w / 2, y - h / 2
        x2, y2 = x + w / 2, y - h / 2
        x3, y3 = x + w / 2, y + h / 2
        x4, y4 = x - w / 2, y + h / 2

        # Rotate the corners around the center (x, y)
        x1_new = cos_val * (x1 - x) - sin_val * (y1 - y) + x
        y1_new = sin_val * (x1 - x) + cos_val * (y1 - y) + y
        x2_new = cos_val * (x2 - x) - sin_val * (y2 - y) + x
        y2_new = sin_val * (x2 - x) + cos_val * (y2 - y) + y
        x3_new = cos_val * (x3 - x) - sin_val * (y3 - y) + x
        y3_new = sin_val * (x3 - x) + cos_val * (y3 - y) + y
        x4_new = cos_val * (x4 - x) - sin_val * (y4 - y) + x
        y4_new = sin_val * (x4 - x) + cos_val * (y4 - y) + y

        # Draw the rotated rectangle
        draw.polygon([(x1_new, y1_new), (x2_new, y2_new), (x3_new, y3_new), (x4_new, y4_new)], fill=rgb)
    return img
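
# Usage sketch (illustrative only; the strokes below are random dummy data, not real
# annotations). pt_paint expects an (N, 8) array of [x, y, w, h, theta, r, g, b] rows,
# with x/y/w/h in canvas pixels and theta/r/g/b in [0, 1], as implied by the scaling above.
#
#   dummy_strokes = np.random.rand(20, 8) * np.array([256, 256, 64, 64, 1, 1, 1, 1])
#   preview = pt_paint(dummy_strokes, num_steps=10)
#   preview.save('pt_preview.png')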

def pt_stroke2str(single_stroke):
    """Serialize one [x, y, w, h, theta, r, g, b] stroke as a string of integer values."""
    x, y, w, h, theta, r, g, b = single_stroke
    theta = theta * 180
    r, g, b = r * 255, g * 255, b * 255
    param = [x, y, w, h, theta, r, g, b]
    param = ','.join([str(int(i)) for i in param])
    str_stroke = '({})'.format(param)
    return str_stroke
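
# For example, pt_stroke2str([128.0, 64.0, 30.0, 10.0, 0.5, 1.0, 0.0, 0.0])
# returns '(128,64,30,10,90,255,0,0)'.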

class PaintPTCOCODataset(Dataset):
    def __init__(self, vis_processor, text_processor, img_root, stroke_root, max_step=200):
        """
        img_root (string): root directory of images (e.g. coco/images/)
        stroke_root (string): directory containing the pre-computed stroke .pkl files
        """
        self.img_root = img_root
        self.stroke_root = stroke_root
        self.image_ids = [file.split('/')[-1].split('.')[0]
                          for file in glob.glob(os.path.join(self.stroke_root, '*.pkl'))]
        self.max_step = max_step
        self.vis_processor = vis_processor
        self.text_processor = text_processor

    def __len__(self):
        return len(self.image_ids)

    def preprocess(self, index, step=-1):
        image_id = self.image_ids[index]
        with open(os.path.join(self.stroke_root, '{}.pkl'.format(image_id)), "rb") as f:
            strokes_dict = pickle.load(f)
        strokes = np.concatenate(strokes_dict['strokes'], axis=0)

        if step < 0:
            step = random.randint(0, min(len(strokes) - 1, self.max_step))
        canvas = pt_paint(strokes, num_steps=step)
        next_stroke = strokes[step]

        image_file = '{}.jpg'.format(image_id)
        image_path = os.path.join(self.img_root, image_file)
        orig_image = Image.open(image_path).convert("RGB")

        return {
            "orig_image": orig_image,
            "canvas": canvas,
            "next_stroke": pt_stroke2str(next_stroke),
            "image_id": image_id,
        }

    def __getitem__(self, index):
        data = self.preprocess(index)
        orig_image = self.vis_processor(data['orig_image'])
        canvas = self.vis_processor(data['canvas'])
        instruction = "<Image><ImageHere><Canvas><ImageHere> Next Stroke: "
        return {
            "image": torch.stack([orig_image, canvas], dim=0),
            "instruction_input": instruction,
            "answer": data['next_stroke'],
            "image_id": data['image_id'],
            "length": 2
        }
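
# Usage sketch (the paths and processors below are illustrative placeholders, not real
# defaults of this repo):
#
#   dataset = PaintPTCOCODataset(vis_processor, text_processor,
#                                img_root='coco/images/train2017',
#                                stroke_root='strokes_pt/train2017')
#   sample = dataset[0]
#   sample['image'].shape   # (2, C, H, W): target image stacked with the current canvas
#   sample['answer']        # next-stroke string, e.g. '(128,64,30,10,90,255,0,0)'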

def normal(x, width):
    """Map a coordinate in [0, 1] to an integer pixel index in [0, width - 1]."""
    return int(x * (width - 1) + 0.5)


def draw(f, canvas=None, width=128, res=100):
    """Rasterize one 13-D quadratic Bezier stroke onto a (width, width, 4) canvas.

    The canvas channels are [alpha, alpha * r, alpha * g, alpha * b].
    """
    x0, y0, x1, y1, x2, y2, z0, z2, w0, w2, b, g, r = [float(v) for v in f]
    x1 = x0 + (x2 - x0) * x1
    y1 = y0 + (y2 - y0) * y1
    x0 = normal(x0, width)
    x1 = normal(x1, width)
    x2 = normal(x2, width)
    y0 = normal(y0, width)
    y1 = normal(y1, width)
    y2 = normal(y2, width)
    z0 = int(1 + z0 * width // 4)
    z2 = int(1 + z2 * width // 4)
    if canvas is None:
        canvas = np.zeros([width, width, 4])
    tmp = 1. / res
    for i in range(res):
        t = i * tmp
        # Quadratic Bezier interpolation of position and brush radius
        x = int((1 - t) * (1 - t) * x0 + 2 * t * (1 - t) * x1 + t * t * x2)
        y = int((1 - t) * (1 - t) * y0 + 2 * t * (1 - t) * y1 + t * t * y2)
        z = int((1 - t) * z0 + t * z2)
        # w = (1 - t) * w0 + t * w2
        w = 1  # transparency is disabled; strokes are drawn fully opaque
        cv2.circle(canvas, (y, x), z, [w, r * w, g * w, b * w], -1)
    return canvas
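
# Usage sketch (random parameters, illustrative only): render a single stroke layer.
#
#   f = np.random.rand(13)       # [x0, y0, x1, y1, x2, y2, z0, z2, w0, w2, b, g, r]
#   layer = draw(f, width=128)   # (128, 128, 4): [alpha, a*r, a*g, a*b]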

def rl_decode(x, canvas, res=100):
    """Render a batch of stroke actions and alpha-composite them onto `canvas`.

    x: array of shape (n_strokes, n_steps, 13); canvas: array of shape (1, 3, W, W).
    """
    stroke = []
    color_stroke = []
    for step in range(x.shape[1]):
        # channels: alpha, alpha * r, alpha * g, alpha * b
        stroke_canvas = np.zeros([canvas.shape[-1], canvas.shape[-1], 4], dtype=np.float32)
        for idx in range(x.shape[0]):
            stroke_canvas = draw(x[idx, step], canvas=stroke_canvas, width=canvas.shape[-1], res=res)
        stroke_canvas = stroke_canvas.transpose(2, 0, 1)
        stroke.append(stroke_canvas[:1])        # alpha channel
        color_stroke.append(stroke_canvas[1:])  # premultiplied color channels
    for i in range(len(stroke)):
        canvas = canvas * (1 - stroke[i]) + color_stroke[i]
    return canvas
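
# Usage sketch (random actions, illustrative only): composite 5 strokes of one step onto
# a blank canvas via canvas * (1 - alpha) + premultiplied color.
#
#   actions = np.random.rand(5, 1, 13).astype(np.float32)   # (n_strokes, n_steps, 13)
#   canvas = rl_decode(actions, np.zeros([1, 3, 128, 128], dtype=np.float32))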

def rel2abs(strokes, n_d=4):
    """Map per-tile (relative) strokes from an n_d x n_d grid to absolute canvas coordinates."""
    abs_strokes = []
    for i, stroke in enumerate(strokes):
        yi = i % n_d
        xi = i // n_d
        stroke = np.stack([
            stroke[:, 0] / n_d + xi / n_d,
            stroke[:, 1] / n_d + yi / n_d,
            stroke[:, 2] / n_d + xi / n_d,
            stroke[:, 3] / n_d + yi / n_d,
            stroke[:, 4] / n_d + xi / n_d,
            stroke[:, 5] / n_d + yi / n_d,
            stroke[:, 6] / n_d,   # brush radii shrink with the tile size
            stroke[:, 7] / n_d,
            stroke[:, 8],         # transparency and color terms are unchanged
            stroke[:, 9],
            stroke[:, 10],
            stroke[:, 11],
            stroke[:, 12],
        ], axis=1)
        abs_strokes.append(stroke)
    abs_strokes = np.stack(abs_strokes)
    return abs_strokes
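
# For the default 4 x 4 grid, tile index 5 sits in cell (xi, yi) = (1, 1), so e.g. a
# relative coordinate of 0.5 maps to 0.5 / 4 + 1 / 4 = 0.375 on the full canvas.
#
#   fine_strokes = np.random.rand(16, 5, 13)   # illustrative random input
#   rel2abs(fine_strokes).shape                # (16, 5, 13)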

def rl_paint(strokes_dict, step, width=256, single_stroke=False):
    """Replay the stroke sequence up to `step` and return (canvas, next action)."""
    canvas = np.zeros([1, 3, width, width], dtype=np.float32)
    if_fine_strokes = [int(len(strokes.shape) > 2) for strokes in strokes_dict['strokes']]
    if single_stroke:
        n_steps = (len(if_fine_strokes) - sum(if_fine_strokes)) * 5 + 16 * 5 * sum(if_fine_strokes)
    else:
        n_steps = len(if_fine_strokes) + 4 * sum(if_fine_strokes)
    step = min(step, n_steps - 1)

    for strokes in strokes_dict['strokes']:
        strokes = strokes.astype(np.float32)
        if len(strokes.shape) < 3:  # coarse stage, shape (5, 13)
            if single_stroke:  # 1 stroke per step
                actions_list = [stroke[None, None] for stroke in strokes]
            else:  # 5 strokes per step
                actions_list = [strokes[None]]
        else:  # fine stage, shape (16, 5, 13)
            strokes = rel2abs(strokes)
            if single_stroke:  # 1 stroke per step
                strokes = strokes.transpose(1, 0, 2)
                actions_list = [stroke[None, None] for step_strokes in strokes for stroke in step_strokes]
            else:  # 16 strokes per step; each `strokes` array contains 5 such steps
                actions_list = [strokes[:, i:i + 1] for i in range(strokes.shape[1])]
        for actions in actions_list:
            if step > 0:
                canvas = rl_decode(actions, canvas, res=100)
                step = step - 1
            else:
                next_stroke = actions
                return canvas, next_stroke
    # unreachable: step was clipped to n_steps - 1 above, so the loop always returns
    raise StopIteration
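
# Usage sketch ('123456.pkl' is a hypothetical stroke file produced offline):
#
#   with open('123456.pkl', 'rb') as f:
#       strokes_dict = pickle.load(f)
#   canvas, next_action = rl_paint(strokes_dict, step=10, width=256)
#   Image.fromarray((canvas[0].transpose(1, 2, 0) * 255).astype(np.uint8)).save('step10.png')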

def rl_stroke2str(action):
    """Serialize one action tensor into a tagged string of integer stroke tuples."""
    a, b, _ = action.shape
    if a == 1 and b == 5:  # coarse step, contains 5 strokes
        action = action[0]  # 5 x 13
        tag = '[coarse]'
    elif a == 16 and b == 1:  # fine step, contains 16 strokes
        action = action[:, 0]  # 16 x 13
        tag = '[detail]'
    elif a == 1 and b == 1:  # single-stroke mode
        action = action[0]
        tag = ''
    else:
        raise ValueError
    strokes = []
    for i, stroke in enumerate(action):
        stroke = [str(int(v * 255)) for v in stroke]
        stroke = ",".join(stroke)
        stroke = "{}({})".format(i, stroke)
        strokes.append(stroke)
    strokes = ';'.join(strokes)
    strokes = tag + strokes
    return strokes
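
# The resulting string looks like '[coarse]0(12,34,...);...;4(...)' for a coarse step and
# '[detail]0(...);...;15(...)' for a fine step, with every stroke value scaled to 0-255.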

def rlo_stroke2str(action):
    """Like rl_stroke2str, but drops the unused transparency terms (w0, w2)."""
    a, b, _ = action.shape
    if a == 1 and b == 5:  # coarse step, contains 5 strokes
        action = action[0]  # 5 x 13
        tag = '[coarse]'
    elif a == 16 and b == 1:  # fine step, contains 16 strokes
        action = action[:, 0]  # 16 x 13
        tag = '[detail]'
    elif a == 1 and b == 1:  # single-stroke mode
        action = action[0]
        tag = ''
    else:
        raise ValueError
    strokes = []
    for i, stroke in enumerate(action):
        x0, y0, x1, y1, x2, y2, z0, z2, w0, w2, b, g, r = stroke
        stroke = [x0, y0, x1, y1, x2, y2, z0, z2, b, g, r]  # remove unused transparency
        stroke = [str(int(v * 255)) for v in stroke]
        stroke = ",".join(stroke)
        stroke = "{}({})".format(i, stroke)
        strokes.append(stroke)
    strokes = ';'.join(strokes)
    strokes = tag + strokes
    return strokes

class PaintRLCOCODataset(Dataset):
    def __init__(self, vis_processor, text_processor, img_root, stroke_root, single_stroke=False, max_step=50):
        """
        img_root (string): root directory of images (e.g. coco/images/)
        stroke_root (string): directory containing the pre-computed stroke .pkl files
        """
        self.img_root = img_root
        self.stroke_root = stroke_root
        self.image_ids = [file.split('/')[-1].split('.')[0]
                          for file in glob.glob(os.path.join(self.stroke_root, '*.pkl'))]
        self.max_step = max_step
        self.vis_processor = vis_processor
        self.text_processor = text_processor
        self.single_stroke = single_stroke
        self.width = 256

    def __len__(self):
        return len(self.image_ids)

    def preprocess(self, index, step=-1):
        image_id = self.image_ids[index]
        image_file = '{}.jpg'.format(image_id)
        image_path = os.path.join(self.img_root, image_file)
        orig_image = Image.open(image_path).convert("RGB")

        with open(os.path.join(self.stroke_root, '{}.pkl'.format(image_id)), "rb") as f:
            strokes_dict = pickle.load(f)
        if_fine_strokes = [int(len(strokes.shape) > 2) for strokes in strokes_dict['strokes']]
        if self.single_stroke:
            n_steps = (len(if_fine_strokes) - sum(if_fine_strokes)) * 5 + 16 * 5 * sum(if_fine_strokes)
        else:
            n_steps = len(if_fine_strokes) + 4 * sum(if_fine_strokes)

        if step < 0:
            step = random.randint(0, min(n_steps - 1, self.max_step))
        canvas, next_stroke = rl_paint(strokes_dict, step, width=self.width, single_stroke=self.single_stroke)
        canvas = Image.fromarray((canvas[0].transpose(1, 2, 0) * 255).astype(np.uint8))

        return {
            "orig_image": orig_image,
            "canvas": canvas,
            "next_stroke": rl_stroke2str(next_stroke),
            "image_id": image_id,
        }

    def __getitem__(self, index):
        data = self.preprocess(index)
        orig_image = self.vis_processor(data['orig_image'])
        canvas = self.vis_processor(data['canvas'])
        instruction = "<Image><ImageHere><Canvas><ImageHere> Action: "
        return {
            "image": torch.stack([orig_image, canvas], dim=0),
            "instruction_input": instruction,
            "answer": data['next_stroke'],
            "image_id": data['image_id'],
            "length": 2
        }
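
# Usage sketch (paths and processors are illustrative placeholders):
#
#   dataset = PaintRLCOCODataset(vis_processor, text_processor,
#                                img_root='coco/images/train2017',
#                                stroke_root='strokes_rl/train2017',
#                                single_stroke=False, max_step=50)
#   sample = dataset[0]   # 'image' stacks target and canvas; 'answer' is a tagged action string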

class PaintLanRLOpaqueCOCODataset(Dataset):
    def __init__(self, vis_processor, text_processor, img_root, stroke_root, ann_path, single_stroke=False, max_step=50):
        """
        img_root (string): root directory of images (e.g. coco/images/)
        stroke_root (string): directory containing the pre-computed stroke .pkl files
        ann_path (string): path to the COCO caption annotation file
        """
        self.img_root = img_root
        self.stroke_root = stroke_root
        self.image_ids = [file.split('/')[-1].split('.')[0]
                          for file in glob.glob(os.path.join(self.stroke_root, '*.pkl'))]
        self.max_step = max_step
        self.vis_processor = vis_processor
        self.text_processor = text_processor
        self.single_stroke = single_stroke

        # Group the captions by image id
        self.captions = {}
        with open(ann_path, 'r') as f:
            anns = json.load(f)
        for ann in anns['annotations']:
            if ann['image_id'] in self.captions:
                self.captions[ann['image_id']].append(ann['caption'])
            else:
                self.captions[ann['image_id']] = [ann['caption']]
        for idx in self.image_ids:
            assert int(idx) in self.captions

        self.width = 256
        self.instruction = "Task: {}\nCanvas: <ImageHere> Action: "

    def __len__(self):
        return len(self.image_ids)

    def preprocess(self, index, step=-1):
        image_id = self.image_ids[index]
        image_file = '{}.jpg'.format(image_id)
        image_path = os.path.join(self.img_root, image_file)
        orig_image = Image.open(image_path).convert("RGB")
        captions = self.captions[int(image_id)]

        with open(os.path.join(self.stroke_root, '{}.pkl'.format(image_id)), "rb") as f:
            strokes_dict = pickle.load(f)
        if_fine_strokes = [int(len(strokes.shape) > 2) for strokes in strokes_dict['strokes']]
        if self.single_stroke:
            n_steps = (len(if_fine_strokes) - sum(if_fine_strokes)) * 5 + 16 * 5 * sum(if_fine_strokes)
        else:
            n_steps = len(if_fine_strokes) + 4 * sum(if_fine_strokes)

        if step < 0:
            step = random.randint(0, min(n_steps - 1, self.max_step))
        canvas, next_stroke = rl_paint(strokes_dict, step, width=self.width, single_stroke=self.single_stroke)
        canvas = Image.fromarray((canvas[0].transpose(1, 2, 0) * 255).astype(np.uint8))

        return {
            "orig_image": orig_image,
            "captions": captions,
            "canvas": canvas,
            "next_stroke": rlo_stroke2str(next_stroke),
            "image_id": image_id,
        }

    def __getitem__(self, index):
        data = self.preprocess(index)
        canvas = self.vis_processor(data['canvas'])
        instruction = self.instruction.format(random.choice(data['captions']))
        return {
            "image": canvas,
            "instruction_input": instruction,
            "answer": data['next_stroke'],
            "image_id": data['image_id'],
        }

class PaintPixelCOCODataset(BaseDataset):
    def __init__(self, vis_processor, text_processor, vis_root, ann_paths, res):
        """
        vis_root (string): root directory of images (e.g. coco/images/)
        ann_paths (list of strings): paths to the annotation files
        """
        super().__init__(vis_processor, text_processor, vis_root, ann_paths)
        self.res = res
        self.img_ids = {}
        n = 0
        # Keep only annotations whose image belongs to the train split
        self.filter_annotation = []
        for ann in self.annotation:
            if "train" in ann["image"]:
                self.filter_annotation.append(ann)
        self.annotation = self.filter_annotation
        for ann in self.annotation:
            img_id = ann["image_id"]
            if img_id not in self.img_ids.keys():
                self.img_ids[img_id] = n
                n += 1

    def __getitem__(self, index):
        ann = self.annotation[index]
        img_file = ann["image"].split("/")[-1]
        image_path = os.path.join(self.vis_root, img_file)
        image = Image.open(image_path).convert("RGB")
        pixelized = np.array(image.resize([self.res, self.res]))
        image = self.vis_processor(image)

        # Sample one pixel of the downsampled image as the reconstruction target
        loc_y = random.randint(0, self.res - 1)
        loc_x = random.randint(0, self.res - 1)
        rgb = pixelized[loc_y, loc_x]
        instruction = "<Img><ImageHere></Img> [reconstruct] loc: [{},{}] rgb: ".format(loc_y, loc_x)
        answer = '[{},{},{}]'.format(rgb[0], rgb[1], rgb[2])
        return {
            "image": image,
            "answer": answer,
            "instruction_input": instruction,
        }
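
# Each sample asks the model to reproduce one pixel of the res x res downsampled image,
# e.g. (illustrative values):
#
#   instruction_input: "<Img><ImageHere></Img> [reconstruct] loc: [3,12] rgb: "
#   answer:            "[124,88,201]"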

class SegReferCOCODataset(Dataset):
    def __init__(self, vis_processor, text_processor, vis_root, ann_path, res, dataset='refcoco', splitBy='unc'):
        """
        vis_root (string): root directory of images (e.g. coco/images/)
        ann_path (string): root directory of the refer annotations
        """
        self.vis_root = vis_root
        self.ann_path = ann_path
        self.splitBy = splitBy
        self.res = res
        self.vis_processor = vis_processor
        self.text_processor = text_processor

        self.ann_dir = os.path.join(ann_path, dataset)
        ref_file = os.path.join(self.ann_dir, 'refs(' + splitBy + ').p')
        self.data = {}
        with open(ref_file, 'rb') as f:
            data_refs = pickle.load(f)
        data_refs = [ref for ref in data_refs if ref['split'] == 'train']  # only use the train split
        for ref in data_refs:
            if ref['image_id'] in self.data:
                self.data[ref['image_id']].append(ref)
            else:
                self.data[ref['image_id']] = [ref]
        self.img_id_list = list(self.data.keys())

        # Load the COCO-format instance annotations from <ann_dir>/instances.json
        instances_file = os.path.join(self.ann_dir, 'instances.json')
        self.coco = COCO(instances_file)

    def __len__(self):
        return len(self.img_id_list)

    def prepare_data(self, index):
        image_id = self.img_id_list[index]
        raw_anns = self.data[image_id]
        anns = []
        for ann in raw_anns:
            refers = [sentence['sent'] for sentence in ann['sentences']]
            ann_id = ann['ann_id']
            annotations = self.coco.loadAnns([ann_id])
            mask = Image.fromarray(self.coco.annToMask(annotations[0]))
            anns.append({'refers': refers, 'mask': mask})
        img_data = self.coco.loadImgs(image_id)[0]
        image_path = os.path.join(self.vis_root, img_data['file_name'])
        image = Image.open(image_path).convert("RGB")
        return {
            'image': image,
            'anns': anns,
        }

    def __getitem__(self, index):
        data = self.prepare_data(index)
        image = self.vis_processor(data['image'])
        # Downsample every referred-object mask to res x res with nearest-neighbor resampling
        all_masks = [np.array(ann['mask'].resize([self.res, self.res], Image.NEAREST)) for ann in data['anns']]
        ann_id = random.randint(0, len(data['anns']) - 1)
        selected_ann = data['anns'][ann_id]
        selected_refer = random.choice(selected_ann['refers'])
        pixelized_mask = all_masks[ann_id]
        all_mask = sum(all_masks)
        pixelized_mask[pixelized_mask != 0] = 1
        all_mask[all_mask != 0] = 1
        has_other_obj = bool((all_mask != pixelized_mask).sum())

        if (pixelized_mask == 0).sum() in [0, pixelized_mask.size]:  # all background or all object
            loc_y = random.randint(0, self.res - 1)
            loc_x = random.randint(0, self.res - 1)
        else:
            if random.uniform(0, 1) < 0.4:  # in 40% of cases we sample the object region
                # object region
                ys, xs = np.where(pixelized_mask != 0)
            else:
                # background
                dice = random.uniform(0, 1)
                if dice < 0.1:
                    # easy background points
                    ys, xs = np.where(pixelized_mask == 0)
                elif has_other_obj and dice < 0.6:
                    # points on other, unrelated objects
                    other_obj_mask = cv2.bitwise_xor(pixelized_mask, all_mask)
                    ys, xs = np.where(other_obj_mask != 0)
                else:
                    # contour points around the object
                    dilate_mask = cv2.dilate(pixelized_mask, np.ones([self.res // 8, self.res // 8], dtype=np.uint8),
                                             iterations=1)
                    contour_mask = cv2.bitwise_xor(pixelized_mask, dilate_mask)
                    ys, xs = np.where(contour_mask != 0)
            idx = random.randint(0, len(ys) - 1)
            loc_y, loc_x = ys[idx], xs[idx]

        mask_value = pixelized_mask[loc_y, loc_x]
        instruction = "<Img><ImageHere></Img> [segmentation] {} loc: [{},{}] mask: ".format(
            selected_refer, loc_y, loc_x)
        answer = str(mask_value)
        return {
            "image": image,
            "answer": answer,
            "instruction_input": instruction,
        }
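
# Usage sketch (paths are illustrative; the annotations are assumed to follow the standard
# refer toolkit layout of refs(unc).p plus instances.json under <ann_path>/refcoco):
#
#   dataset = SegReferCOCODataset(vis_processor, text_processor,
#                                 vis_root='coco/images/train2014',
#                                 ann_path='refer_data', res=32,
#                                 dataset='refcoco', splitBy='unc')
#   sample = dataset[0]   # 'answer' is '0' or '1': the mask value at the sampled location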