File size: 7,634 Bytes
3e1d9f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import os
import re
import xml.etree.ElementTree as ET
from typing import Dict, List

from tqdm import tqdm


def get_sentence_data(fn):
    """
    Parses a sentence file from the Flickr30K Entities dataset

    input:
      fn - full file path to the sentence file to parse
    
    output:
      a list of dictionaries for each sentence with the following fields:
          sentence - the original sentence
          phrases - a list of dictionaries for each phrase with the
                    following fields:
                      phrase - the text of the annotated phrase
                      first_word_index - the position of the first word of
                                         the phrase in the sentence
                      phrase_id - an identifier for this phrase
                      phrase_type - a list of the coarse categories this 
                                    phrase belongs to

    """
    with open(fn, 'r', encoding='utf8') as f:
        sentences = f.read().split('\n')

    annotations = []
    for sentence in sentences:
        if not sentence:
            continue

        first_word = []
        phrases = []
        phrase_id = []
        phrase_type = []
        words = []
        current_phrase = []
        add_to_phrase = False
        for token in sentence.split():
            if add_to_phrase:
                if token[-1] == ']':
                    add_to_phrase = False
                    token = token[:-1]
                    current_phrase.append(token)
                    phrases.append(' '.join(current_phrase))
                    current_phrase = []
                else:
                    current_phrase.append(token)

                words.append(token)
            else:
                if token[0] == '[':
                    add_to_phrase = True
                    first_word.append(len(words))
                    parts = token.split('/')
                    phrase_id.append(parts[1][3:])
                    phrase_type.append(parts[2:])
                else:
                    words.append(token)

        sentence_data = {'sentence': ' '.join(words), 'phrases': []}
        for index, phrase, p_id, p_type in zip(first_word, phrases, phrase_id, phrase_type):
            sentence_data['phrases'].append({'first_word_index': index,
                                             'phrase': phrase,
                                             'phrase_id': p_id,
                                             'phrase_type': p_type})

        annotations.append(sentence_data)

    return annotations


def get_annotations(fn):
    """
    Parses the xml files in the Flickr30K Entities dataset

    input:
      fn - full file path to the annotations file to parse

    output:
      dictionary with the following fields:
          scene - list of identifiers which were annotated as
                  pertaining to the whole scene
          nobox - list of identifiers which were annotated as
                  not being visible in the image
          boxes - a dictionary where the fields are identifiers
                  and the values are its list of boxes in the 
                  [xmin ymin xmax ymax] format
    """
    tree = ET.parse(fn)
    root = tree.getroot()
    size_container = root.findall('size')[0]
    anno_info = {'boxes': {}, 'scene': [], 'nobox': []}
    for size_element in size_container:
        anno_info[size_element.tag] = int(size_element.text)

    for object_container in root.findall('object'):
        for names in object_container.findall('name'):
            box_id = names.text
            box_container = object_container.findall('bndbox')
            if len(box_container) > 0:
                if box_id not in anno_info['boxes']:
                    anno_info['boxes'][box_id] = []
                xmin = int(box_container[0].findall('xmin')[0].text) - 1
                ymin = int(box_container[0].findall('ymin')[0].text) - 1
                xmax = int(box_container[0].findall('xmax')[0].text) - 1
                ymax = int(box_container[0].findall('ymax')[0].text) - 1
                anno_info['boxes'][box_id].append([xmin, ymin, xmax, ymax])
            else:
                nobndbox = int(object_container.findall('nobndbox')[0].text)
                if nobndbox > 0:
                    anno_info['nobox'].append(box_id)

                scene = int(object_container.findall('scene')[0].text)
                if scene > 0:
                    anno_info['scene'].append(box_id)

    return anno_info


def get_ann_path(idx, *, annotation_dir=""):
    return os.path.join(annotation_dir, rf'Annotations/{idx}.xml')


def get_sen_path(idx, *, annotation_dir=""):
    return os.path.join(annotation_dir, rf"Sentences/{idx}.txt")


def get_img_path(idx, *, image_dir=""):
    return os.path.join(image_dir, rf'{idx}.jpg')


PHRASE_ST_PLACEHOLDER = '<ph_st>'
PHRASE_ED_PLACEHOLDER = '<ph_ed>'


def flatten_annotation(annotation_dir, indexes):
    data = []

    for index in tqdm(indexes):
        image_id = index
        ann_path = get_ann_path(index, annotation_dir=annotation_dir)
        sen_path = get_sen_path(index, annotation_dir=annotation_dir)
        anns = get_annotations(ann_path)
        sens = get_sentence_data(sen_path)

        for sen in sens:
            pids = list(set(phrase['phrase_id'] for phrase in sen['phrases'] if phrase['phrase_id'] in anns['boxes']))
            boxes_mapping: Dict[str, List[int]] = {}
            boxes_filtered: List[List[int]] = []
            for pid in pids:
                v = anns['boxes'][pid]
                mapping = []
                for box in v:
                    mapping.append(len(boxes_filtered))
                    boxes_filtered.append(box)
                boxes_mapping[pid] = mapping

            boxes_seq: List[List[int]] = []
            for phrase in sen['phrases']:
                if not phrase['phrase_id'] in anns['boxes']:
                    continue
                pid = phrase['phrase_id']
                boxes_seq.append(boxes_mapping[pid])

            sent = list(sen['sentence'].split())
            for phrase in sen['phrases'][::-1]:
                if not phrase['phrase_id'] in anns['boxes']:
                    continue
                span = [phrase['first_word_index'], phrase['first_word_index'] + len(phrase['phrase'].split())]
                sent[span[0]:span[1]] = [f"{PHRASE_ST_PLACEHOLDER}{' '.join(sent[span[0]:span[1]])}{PHRASE_ED_PLACEHOLDER}"]
            sent_converted = " ".join(sent)

            assert len(re.findall(PHRASE_ST_PLACEHOLDER, sent_converted)) \
                   == len(re.findall(PHRASE_ED_PLACEHOLDER, sent_converted)) \
                   == len(boxes_seq), f"error when parse: {sent_converted}, {boxes_seq}, {sen}, {anns}"
            assert sent_converted.replace(PHRASE_ST_PLACEHOLDER, "").replace(PHRASE_ED_PLACEHOLDER, "") == sen['sentence']

            item = {
                'id': len(data),
                'image_id': image_id,
                'boxes': boxes_filtered,
                'sentence': sent_converted,
                'boxes_seq': boxes_seq,
            }
            data.append(item)

    return data


if __name__ == '__main__':
    filenames = [
        r'D:\home\dataset\flickr30kentities\train.txt',
        r'D:\home\dataset\flickr30kentities\val.txt',
        r'D:\home\dataset\flickr30kentities\test.txt',
    ]
    for filename in filenames:
        annotation_dir = r'D:\home\dataset\flickr30kentities'
        indexes = [line.strip() for line in open(filename, 'r', encoding='utf8')]
        flatten_annotation(annotation_dir, indexes)