# RxnIM/mllm/dataset/utils/flickr30k_entities_utils.py
import os
import re
import xml.etree.ElementTree as ET
from typing import Dict, List
from tqdm import tqdm


def get_sentence_data(fn):
"""
Parses a sentence file from the Flickr30K Entities dataset
input:
fn - full file path to the sentence file to parse
output:
a list of dictionaries for each sentence with the following fields:
sentence - the original sentence
phrases - a list of dictionaries for each phrase with the
following fields:
phrase - the text of the annotated phrase
first_word_index - the position of the first word of
the phrase in the sentence
phrase_id - an identifier for this phrase
phrase_type - a list of the coarse categories this
phrase belongs to
"""
with open(fn, 'r', encoding='utf8') as f:
sentences = f.read().split('\n')
annotations = []
for sentence in sentences:
if not sentence:
continue
first_word = []
phrases = []
phrase_id = []
phrase_type = []
words = []
current_phrase = []
add_to_phrase = False
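        # Annotated phrases appear bracketed in the raw file, e.g.
        # "[/EN#<id>/<type> some words]"; the opening token carries the
        # phrase id and its coarse type(s).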
        for token in sentence.split():
            if add_to_phrase:
                if token[-1] == ']':
                    add_to_phrase = False
                    token = token[:-1]
                    current_phrase.append(token)
                    phrases.append(' '.join(current_phrase))
                    current_phrase = []
                else:
                    current_phrase.append(token)

                # every token of a phrase also belongs to the plain sentence
                words.append(token)
            else:
                if token[0] == '[':
                    add_to_phrase = True
                    first_word.append(len(words))
                    parts = token.split('/')
                    phrase_id.append(parts[1][3:])
                    phrase_type.append(parts[2:])
                else:
                    words.append(token)
sentence_data = {'sentence': ' '.join(words), 'phrases': []}
for index, phrase, p_id, p_type in zip(first_word, phrases, phrase_id, phrase_type):
sentence_data['phrases'].append({'first_word_index': index,
'phrase': phrase,
'phrase_id': p_id,
'phrase_type': p_type})
annotations.append(sentence_data)
return annotations
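

# Illustrative example (sketch): a sentence-file line such as
#   "[/EN#283585/people Two young guys] are outside ."
# would be parsed by get_sentence_data into an entry like
#   {'sentence': 'Two young guys are outside .',
#    'phrases': [{'first_word_index': 0,
#                 'phrase': 'Two young guys',
#                 'phrase_id': '283585',
#                 'phrase_type': ['people']}]}

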
def get_annotations(fn):
"""
    Parses an annotation XML file from the Flickr30K Entities dataset
input:
fn - full file path to the annotations file to parse
output:
dictionary with the following fields:
scene - list of identifiers which were annotated as
pertaining to the whole scene
nobox - list of identifiers which were annotated as
not being visible in the image
        boxes - a dictionary whose keys are identifiers and whose
                values are lists of boxes in the
                [xmin ymin xmax ymax] format
"""
tree = ET.parse(fn)
root = tree.getroot()
size_container = root.findall('size')[0]
anno_info = {'boxes': {}, 'scene': [], 'nobox': []}
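    # Copy the image width/height/depth from the <size> element into the result.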
for size_element in size_container:
anno_info[size_element.tag] = int(size_element.text)
for object_container in root.findall('object'):
for names in object_container.findall('name'):
box_id = names.text
box_container = object_container.findall('bndbox')
if len(box_container) > 0:
if box_id not in anno_info['boxes']:
anno_info['boxes'][box_id] = []
xmin = int(box_container[0].findall('xmin')[0].text) - 1
ymin = int(box_container[0].findall('ymin')[0].text) - 1
xmax = int(box_container[0].findall('xmax')[0].text) - 1
ymax = int(box_container[0].findall('ymax')[0].text) - 1
anno_info['boxes'][box_id].append([xmin, ymin, xmax, ymax])
else:
nobndbox = int(object_container.findall('nobndbox')[0].text)
if nobndbox > 0:
anno_info['nobox'].append(box_id)
scene = int(object_container.findall('scene')[0].text)
if scene > 0:
anno_info['scene'].append(box_id)
return anno_info
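

# Illustrative example (sketch): for an annotation XML containing
#   <size><width>500</width><height>375</height><depth>3</depth></size>
#   <object><name>283585</name>
#           <bndbox><xmin>10</xmin><ymin>20</ymin><xmax>110</xmax><ymax>220</ymax></bndbox>
#   </object>
# get_annotations would return roughly
#   {'boxes': {'283585': [[9, 19, 109, 219]]}, 'scene': [], 'nobox': [],
#    'width': 500, 'height': 375, 'depth': 3}
# (box coordinates are shifted by -1 to make them zero-based).

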
def get_ann_path(idx, *, annotation_dir=""):
    return os.path.join(annotation_dir, f'Annotations/{idx}.xml')


def get_sen_path(idx, *, annotation_dir=""):
    return os.path.join(annotation_dir, f"Sentences/{idx}.txt")


def get_img_path(idx, *, image_dir=""):
    return os.path.join(image_dir, f'{idx}.jpg')
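

# The path helpers above assume the usual Flickr30K Entities layout:
#   <annotation_dir>/Annotations/<image_id>.xml
#   <annotation_dir>/Sentences/<image_id>.txt
#   <image_dir>/<image_id>.jpg

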
PHRASE_ST_PLACEHOLDER = '<ph_st>'
PHRASE_ED_PLACEHOLDER = '<ph_ed>'
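# The two markers above delimit each grounded phrase in the sentences produced
# by flatten_annotation, e.g. "<ph_st>Two young guys<ph_ed> are outside ."

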
def flatten_annotation(annotation_dir, indexes):
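    """
    Flattens the Flickr30K Entities annotations for the given image indexes
    input:
      annotation_dir - directory containing the Annotations/ and Sentences/ subfolders
      indexes - list of image identifiers to process
    output:
      a list of dictionaries, one per sentence, with the following fields:
        id - running index of the item
        image_id - identifier of the source image
        boxes - list of [xmin, ymin, xmax, ymax] boxes referenced by the sentence
        sentence - the sentence with every grounded phrase wrapped in
                   PHRASE_ST_PLACEHOLDER / PHRASE_ED_PLACEHOLDER
        boxes_seq - for each wrapped phrase, in order, the indices of its
                    boxes within `boxes`
    """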
data = []
for index in tqdm(indexes):
image_id = index
ann_path = get_ann_path(index, annotation_dir=annotation_dir)
sen_path = get_sen_path(index, annotation_dir=annotation_dir)
anns = get_annotations(ann_path)
sens = get_sentence_data(sen_path)
for sen in sens:
pids = list(set(phrase['phrase_id'] for phrase in sen['phrases'] if phrase['phrase_id'] in anns['boxes']))
boxes_mapping: Dict[str, List[int]] = {}
boxes_filtered: List[List[int]] = []
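            # Copy each grounded phrase id's boxes into the flat list and
            # remember which indices they were assigned.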
for pid in pids:
v = anns['boxes'][pid]
mapping = []
for box in v:
mapping.append(len(boxes_filtered))
boxes_filtered.append(box)
boxes_mapping[pid] = mapping
boxes_seq: List[List[int]] = []
for phrase in sen['phrases']:
                if phrase['phrase_id'] not in anns['boxes']:
continue
pid = phrase['phrase_id']
boxes_seq.append(boxes_mapping[pid])
            sent = sen['sentence'].split()
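            # Wrap each grounded phrase with the placeholder markers, iterating
            # right-to-left so that earlier word indices stay valid after splicing.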
for phrase in sen['phrases'][::-1]:
                if phrase['phrase_id'] not in anns['boxes']:
continue
span = [phrase['first_word_index'], phrase['first_word_index'] + len(phrase['phrase'].split())]
sent[span[0]:span[1]] = [f"{PHRASE_ST_PLACEHOLDER}{' '.join(sent[span[0]:span[1]])}{PHRASE_ED_PLACEHOLDER}"]
sent_converted = " ".join(sent)
assert len(re.findall(PHRASE_ST_PLACEHOLDER, sent_converted)) \
== len(re.findall(PHRASE_ED_PLACEHOLDER, sent_converted)) \
== len(boxes_seq), f"error when parse: {sent_converted}, {boxes_seq}, {sen}, {anns}"
assert sent_converted.replace(PHRASE_ST_PLACEHOLDER, "").replace(PHRASE_ED_PLACEHOLDER, "") == sen['sentence']
item = {
'id': len(data),
'image_id': image_id,
'boxes': boxes_filtered,
'sentence': sent_converted,
'boxes_seq': boxes_seq,
}
data.append(item)
return data
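

# Illustrative example (sketch) of a single flattened item:
#   {'id': 0,
#    'image_id': '36979',
#    'boxes': [[9, 19, 109, 219]],
#    'sentence': '<ph_st>Two young guys<ph_ed> are outside .',
#    'boxes_seq': [[0]]}

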
if __name__ == '__main__':
filenames = [
r'D:\home\dataset\flickr30kentities\train.txt',
r'D:\home\dataset\flickr30kentities\val.txt',
r'D:\home\dataset\flickr30kentities\test.txt',
]
    annotation_dir = r'D:\home\dataset\flickr30kentities'
    for filename in filenames:
        with open(filename, 'r', encoding='utf8') as f:
            indexes = [line.strip() for line in f if line.strip()]
        flatten_annotation(annotation_dir, indexes)