import os
import re
import xml.etree.ElementTree as ET
from typing import Dict, List

from tqdm import tqdm


def get_sentence_data(fn):
    """
    Parses a sentence file from the Flickr30K Entities dataset

    input:
      fn - full file path to the sentence file to parse

    output:
      a list of dictionaries for each sentence with the following fields:
          sentence - the original sentence
          phrases - a list of dictionaries for each phrase with the
                    following fields:
                      phrase - the text of the annotated phrase
                      first_word_index - the position of the first word of
                                         the phrase in the sentence
                      phrase_id - an identifier for this phrase
                      phrase_type - a list of the coarse categories this
                                    phrase belongs to
    """
    with open(fn, 'r', encoding='utf8') as f:
        sentences = f.read().split('\n')

    annotations = []
    for sentence in sentences:
        if not sentence:
            continue

        first_word = []
        phrases = []
        phrase_id = []
        phrase_type = []
        words = []
        current_phrase = []
        add_to_phrase = False
        for token in sentence.split():
            if add_to_phrase:
                if token[-1] == ']':
                    # Closing bracket ends the current phrase.
                    add_to_phrase = False
                    token = token[:-1]
                    current_phrase.append(token)
                    phrases.append(' '.join(current_phrase))
                    current_phrase = []
                else:
                    current_phrase.append(token)
                words.append(token)
            else:
                if token[0] == '[':
                    # Opening token looks like "[/EN#<id>/<type1>/<type2>...".
                    add_to_phrase = True
                    first_word.append(len(words))
                    parts = token.split('/')
                    phrase_id.append(parts[1][3:])
                    phrase_type.append(parts[2:])
                else:
                    words.append(token)

        sentence_data = {'sentence': ' '.join(words), 'phrases': []}
        for index, phrase, p_id, p_type in zip(first_word, phrases, phrase_id, phrase_type):
            sentence_data['phrases'].append({'first_word_index': index,
                                             'phrase': phrase,
                                             'phrase_id': p_id,
                                             'phrase_type': p_type})

        annotations.append(sentence_data)

    return annotations


def get_annotations(fn):
    """
    Parses the xml files in the Flickr30K Entities dataset

    input:
      fn - full file path to the annotations file to parse

    output:
      dictionary with the following fields:
          scene - list of identifiers which were annotated as
                  pertaining to the whole scene
          nobox - list of identifiers which were annotated as
                  not being visible in the image
          boxes - a dictionary where the fields are identifiers
                  and the values are its list of boxes in the
                  [xmin ymin xmax ymax] format
    """
    tree = ET.parse(fn)
    root = tree.getroot()
    size_container = root.findall('size')[0]
    anno_info = {'boxes': {}, 'scene': [], 'nobox': []}
    for size_element in size_container:
        anno_info[size_element.tag] = int(size_element.text)

    for object_container in root.findall('object'):
        for names in object_container.findall('name'):
            box_id = names.text
            box_container = object_container.findall('bndbox')
            if len(box_container) > 0:
                if box_id not in anno_info['boxes']:
                    anno_info['boxes'][box_id] = []
                # Convert from the 1-indexed xml coordinates to 0-indexed.
                xmin = int(box_container[0].findall('xmin')[0].text) - 1
                ymin = int(box_container[0].findall('ymin')[0].text) - 1
                xmax = int(box_container[0].findall('xmax')[0].text) - 1
                ymax = int(box_container[0].findall('ymax')[0].text) - 1
                anno_info['boxes'][box_id].append([xmin, ymin, xmax, ymax])
            else:
                nobndbox = int(object_container.findall('nobndbox')[0].text)
                if nobndbox > 0:
                    anno_info['nobox'].append(box_id)

                scene = int(object_container.findall('scene')[0].text)
                if scene > 0:
                    anno_info['scene'].append(box_id)

    return anno_info


def get_ann_path(idx, *, annotation_dir=""):
    return os.path.join(annotation_dir, f'Annotations/{idx}.xml')


def get_sen_path(idx, *, annotation_dir=""):
    return os.path.join(annotation_dir, f'Sentences/{idx}.txt')


def get_img_path(idx, *, image_dir=""):
    return os.path.join(image_dir, f'{idx}.jpg')


# Sentinel tokens wrapped around each grounded phrase in the converted
# sentence. They must be non-empty and regex-safe, since the assertions in
# flatten_annotation count their occurrences with re.findall.
PHRASE_ST_PLACEHOLDER = '<ph_st>'
PHRASE_ED_PLACEHOLDER = '<ph_ed>'

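
# Illustrative note (a made-up example, not data from the original script):
# each line in a Sentences file marks an entity phrase with a bracketed
# "[/EN#<id>/<type> ... ]" chunk, which get_sentence_data above unpacks.
# A sample line such as
#
#   [/EN#283585/people A man] rides [/EN#283586/vehicles a bike] .
#
# would parse to
#
#   {'sentence': 'A man rides a bike .',
#    'phrases': [{'first_word_index': 0, 'phrase': 'A man',
#                 'phrase_id': '283585', 'phrase_type': ['people']},
#                {'first_word_index': 3, 'phrase': 'a bike',
#                 'phrase_id': '283586', 'phrase_type': ['vehicles']}]}

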
def flatten_annotation(annotation_dir, indexes):
    data = []
    for index in tqdm(indexes):
        image_id = index
        ann_path = get_ann_path(index, annotation_dir=annotation_dir)
        sen_path = get_sen_path(index, annotation_dir=annotation_dir)
        anns = get_annotations(ann_path)
        sens = get_sentence_data(sen_path)
        for sen in sens:
            # Keep only the phrases that actually have boxes in the annotation file.
            pids = list(set(phrase['phrase_id'] for phrase in sen['phrases'] if phrase['phrase_id'] in anns['boxes']))

            # Flatten all boxes of this sentence into one list and remember,
            # per phrase id, the indices of its boxes in that list.
            boxes_mapping: Dict[str, List[int]] = {}
            boxes_filtered: List[List[int]] = []
            for pid in pids:
                v = anns['boxes'][pid]
                mapping = []
                for box in v:
                    mapping.append(len(boxes_filtered))
                    boxes_filtered.append(box)
                boxes_mapping[pid] = mapping

            # boxes_seq lists, for every grounded phrase in sentence order,
            # the indices of its boxes in boxes_filtered.
            boxes_seq: List[List[int]] = []
            for phrase in sen['phrases']:
                if phrase['phrase_id'] not in anns['boxes']:
                    continue
                pid = phrase['phrase_id']
                boxes_seq.append(boxes_mapping[pid])

            # Wrap each grounded phrase with the placeholder tokens. Iterate in
            # reverse so earlier word indices stay valid while spans collapse.
            sent = list(sen['sentence'].split())
            for phrase in sen['phrases'][::-1]:
                if phrase['phrase_id'] not in anns['boxes']:
                    continue
                span = [phrase['first_word_index'], phrase['first_word_index'] + len(phrase['phrase'].split())]
                sent[span[0]:span[1]] = [f"{PHRASE_ST_PLACEHOLDER}{' '.join(sent[span[0]:span[1]])}{PHRASE_ED_PLACEHOLDER}"]
            sent_converted = " ".join(sent)

            assert len(re.findall(PHRASE_ST_PLACEHOLDER, sent_converted)) \
                   == len(re.findall(PHRASE_ED_PLACEHOLDER, sent_converted)) \
                   == len(boxes_seq), f"error when parse: {sent_converted}, {boxes_seq}, {sen}, {anns}"
            assert sent_converted.replace(PHRASE_ST_PLACEHOLDER, "").replace(PHRASE_ED_PLACEHOLDER, "") == sen['sentence']

            item = {
                'id': len(data),
                'image_id': image_id,
                'boxes': boxes_filtered,
                'sentence': sent_converted,
                'boxes_seq': boxes_seq,
            }
            data.append(item)
    return data


if __name__ == '__main__':
    filenames = [
        r'D:\home\dataset\flickr30kentities\train.txt',
        r'D:\home\dataset\flickr30kentities\val.txt',
        r'D:\home\dataset\flickr30kentities\test.txt',
    ]
    annotation_dir = r'D:\home\dataset\flickr30kentities'
    for filename in filenames:
        with open(filename, 'r', encoding='utf8') as f:
            indexes = [line.strip() for line in f]
        flatten_annotation(annotation_dir, indexes)
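

# Optional helper (a sketch, not part of the original script):
# flatten_annotation's return value is discarded in the __main__ block above.
# A minimal way to persist it is sketched here, assuming a JSON Lines layout
# and an output path derived from the split file name; both the format and
# the naming (train.txt -> train.jsonl) are assumptions.
import json


def dump_jsonl(items, out_path):
    # Write one flattened item (a plain dict of JSON-serializable fields) per line.
    with open(out_path, 'w', encoding='utf8') as f:
        for item in items:
            f.write(json.dumps(item) + '\n')

# Example wiring inside the loop above:
#   data = flatten_annotation(annotation_dir, indexes)
#   dump_jsonl(data, filename.replace('.txt', '.jsonl'))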