Spaces:
Running
on
Zero
Running
on
Zero
""" | |
Copyright (c) 2022, salesforce.com, inc. | |
All rights reserved. | |
SPDX-License-Identifier: BSD-3-Clause | |
For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause | |
""" | |
import os | |
import json | |
import random | |
from PIL import Image | |
from minigpt4.datasets.datasets.vqa_datasets import VQADataset, VQAEvalDataset | |
from collections import OrderedDict | |
class __DisplMixin: | |
def displ_item(self, index): | |
sample, ann = self.__getitem__(index), self.annotation[index] | |
return OrderedDict( | |
{ | |
"file": ann["image"], | |
"question": ann["question"], | |
"question_id": ann["question_id"], | |
"answers": "; ".join(ann["answer"]), | |
"image": sample["image"], | |
} | |
) | |
class COCOVQADataset(VQADataset, __DisplMixin): | |
def __init__(self, vis_processor, text_processor, vis_root, ann_paths): | |
super().__init__(vis_processor, text_processor, vis_root, ann_paths) | |
self.instruction_pool =[ | |
"[vqa] {}", | |
"[vqa] Based on the image, respond to this question with a short answer: {}" | |
] | |
exist_annotation = [] | |
for ann in self.annotation: | |
image_path = os.path.join(self.vis_root, ann["image"].split('/')[-1]) | |
if os.path.exists(image_path): | |
exist_annotation.append(ann) | |
self.annotation = exist_annotation | |
def get_data(self, index): | |
ann = self.annotation[index] | |
image_path = os.path.join(self.vis_root, ann["image"].split('/')[-1]) | |
image = Image.open(image_path).convert("RGB") | |
image = self.vis_processor(image) | |
question = self.text_processor(ann["question"]) | |
question_id = ann["question_id"] | |
answer_weight = {} | |
for answer in ann["answer"]: | |
if answer in answer_weight.keys(): | |
answer_weight[answer] += 1 / len(ann["answer"]) | |
else: | |
answer_weight[answer] = 1 / len(ann["answer"]) | |
answers = list(answer_weight.keys()) | |
weights = list(answer_weight.values()) | |
answer = random.choices(answers, weights=weights, k=1)[0] # random sample an answer according to weights | |
if "unk" in answer: | |
print("cocovqa", answer) | |
return { | |
"image": image, | |
"question": question, | |
"question_id": question_id, | |
"answer": answer, | |
} | |
def __getitem__(self, index): | |
data = self.get_data(index) | |
instruction = random.choice(self.instruction_pool).format(data['question']) | |
instruction = "<Img><ImageHere></Img> {} ".format(instruction) | |
return { | |
"image": data['image'], | |
"question_id": data["question_id"], | |
"instruction_input": instruction, | |
"answer": self.text_processor(data['answer']), | |
} | |
class COCOVQGDataset(COCOVQADataset): | |
def __init__(self, vis_processor, text_processor, vis_root, ann_paths): | |
super().__init__(vis_processor, text_processor, vis_root, ann_paths) | |
self.instruction_pool = [ | |
'Given the image, generate a question whose answer is: {}', | |
'Based on the image, provide a question with the answer: {}', | |
'Given the visual representation, create a question for which the answer is "{}"', | |
'From the image provided, craft a question that leads to the reply: {}', | |
'Considering the picture, come up with a question where the answer is: {}', | |
'Taking the image into account, generate an question that has the answer: {}' | |
] | |
def __getitem__(self, index): | |
data = self.get_data(index) | |
instruction = random.choice(self.instruction_pool).format(data['answer']) | |
instruction = "<Img><ImageHere></Img> {}".format(instruction) | |
return { | |
"image": data['image'], | |
"question_id": data["question_id"], | |
"instruction_input": instruction, | |
"answer": data['question'], | |
} | |
class COCOVQAEvalDataset(VQAEvalDataset, __DisplMixin): | |
def __init__(self, vis_processor, text_processor, vis_root, ann_paths): | |
""" | |
vis_root (string): Root directory of images (e.g. coco/images/) | |
ann_root (string): directory to store the annotation file | |
""" | |
self.instruction_pool = [ | |
# '{}', | |
# 'Question: {}', | |
# '{} A short answer to the question is', | |
# 'Q: {} A:', | |
'Question: {} Short answer:', | |
# 'Given the image, answer the following question with no more than three words. {}', | |
# 'Based on the image, respond to this question with a short answer: {}.', | |
# 'Use the provided image to answer the question: {} Provide your answer as short as possible.', | |
# 'What is the answer to the following question? "{}"', | |
# 'The question "{}" can be answered using the image. A short answer is' | |
] | |
# print('vis_root', vis_root) | |
self.vis_root = vis_root | |
self.annotation = json.load(open(ann_paths[0])) | |
answer_list_path = ann_paths[1] | |
if os.path.exists(answer_list_path): | |
self.answer_list = json.load(open(answer_list_path)) | |
else: | |
self.answer_list = None | |
try: | |
self.coco_fmt_qust_file = ann_paths[2] | |
self.coco_fmt_anno_file = ann_paths[3] | |
except IndexError: | |
self.coco_fmt_qust_file = None | |
self.coco_fmt_anno_file = None | |
self.vis_processor = vis_processor | |
self.text_processor = text_processor | |
self._add_instance_ids() | |
def __getitem__(self, index): | |
ann = self.annotation[index] | |
image_path = os.path.join(self.vis_root, ann["image"]) | |
image = Image.open(image_path).convert("RGB") | |
image = self.vis_processor(image) | |
question = self.text_processor(ann["question"]) | |
instruction = random.choice(self.instruction_pool).format(question) | |
instruction = "<Img><ImageHere></Img> {} ".format(instruction) | |
return { | |
"image": image, | |
'image_path': image_path, | |
"question": question, | |
"question_id": ann["question_id"], | |
"instruction_input": instruction, | |
"instance_id": ann["instance_id"], | |
} | |