Hisab Cloud
Upload folder using huggingface_hub
45e92bd verified
# Copyright (c) OpenMMLab. All rights reserved.
# Partly adopted from https://github.com/GT-Vision-Lab/VQA
# Copyright (c) 2014, Aishwarya Agrawal
import re
from vlmeval.smp import *
from typing import Optional
from functools import partial
def _process_digit_article(inText):
outText = []
tempText = inText.lower().split()
articles = ['a', 'an', 'the']
manualMap = {
'none': '0',
'zero': '0',
'one': '1',
'two': '2',
'three': '3',
'four': '4',
'five': '5',
'six': '6',
'seven': '7',
'eight': '8',
'nine': '9',
'ten': '10',
}
contractions = {
'aint': "ain't",
'arent': "aren't",
'cant': "can't",
'couldve': "could've",
'couldnt': "couldn't",
"couldn'tve": "couldn't've",
"couldnt've": "couldn't've",
'didnt': "didn't",
'doesnt': "doesn't",
'dont': "don't",
'hadnt': "hadn't",
"hadnt've": "hadn't've",
"hadn'tve": "hadn't've",
'hasnt': "hasn't",
'havent': "haven't",
'hed': "he'd",
"hed've": "he'd've",
"he'dve": "he'd've",
'hes': "he's",
'howd': "how'd",
'howll': "how'll",
'hows': "how's",
"Id've": "I'd've",
"I'dve": "I'd've",
'Im': "I'm",
'Ive': "I've",
'isnt': "isn't",
'itd': "it'd",
"itd've": "it'd've",
"it'dve": "it'd've",
'itll': "it'll",
"let's": "let's",
'maam': "ma'am",
'mightnt': "mightn't",
"mightnt've": "mightn't've",
"mightn'tve": "mightn't've",
'mightve': "might've",
'mustnt': "mustn't",
'mustve': "must've",
'neednt': "needn't",
'notve': "not've",
'oclock': "o'clock",
'oughtnt': "oughtn't",
"ow's'at": "'ow's'at",
"'ows'at": "'ow's'at",
"'ow'sat": "'ow's'at",
'shant': "shan't",
"shed've": "she'd've",
"she'dve": "she'd've",
"she's": "she's",
'shouldve': "should've",
'shouldnt': "shouldn't",
"shouldnt've": "shouldn't've",
"shouldn'tve": "shouldn't've",
"somebody'd": 'somebodyd',
"somebodyd've": "somebody'd've",
"somebody'dve": "somebody'd've",
'somebodyll': "somebody'll",
'somebodys': "somebody's",
'someoned': "someone'd",
"someoned've": "someone'd've",
"someone'dve": "someone'd've",
'someonell': "someone'll",
'someones': "someone's",
'somethingd': "something'd",
"somethingd've": "something'd've",
"something'dve": "something'd've",
'somethingll': "something'll",
'thats': "that's",
'thered': "there'd",
"thered've": "there'd've",
"there'dve": "there'd've",
'therere': "there're",
'theres': "there's",
'theyd': "they'd",
"theyd've": "they'd've",
"they'dve": "they'd've",
'theyll': "they'll",
'theyre': "they're",
'theyve': "they've",
'twas': "'twas",
'wasnt': "wasn't",
"wed've": "we'd've",
"we'dve": "we'd've",
'weve': "we've",
'werent': "weren't",
'whatll': "what'll",
'whatre': "what're",
'whats': "what's",
'whatve': "what've",
'whens': "when's",
'whered': "where'd",
'wheres': "where's",
'whereve': "where've",
'whod': "who'd",
"whod've": "who'd've",
"who'dve": "who'd've",
'wholl': "who'll",
'whos': "who's",
'whove': "who've",
'whyll': "why'll",
'whyre': "why're",
'whys': "why's",
'wont': "won't",
'wouldve': "would've",
'wouldnt': "wouldn't",
"wouldnt've": "wouldn't've",
"wouldn'tve": "wouldn't've",
'yall': "y'all",
"yall'll": "y'all'll",
"y'allll": "y'all'll",
"yall'd've": "y'all'd've",
"y'alld've": "y'all'd've",
"y'all'dve": "y'all'd've",
'youd': "you'd",
"youd've": "you'd've",
"you'dve": "you'd've",
'youll': "you'll",
'youre': "you're",
'youve': "you've",
}
for word in tempText:
word = manualMap.setdefault(word, word)
if word not in articles:
outText.append(word)
for wordId, word in enumerate(outText):
if word in contractions:
outText[wordId] = contractions[word]
outText = ' '.join(outText)
return outText
def hit_calculate(result, dataset_name, anls_threshold=0.5):
if listinstr(['TextVQA'], dataset_name):
return [np.mean(x['match']) for x in result]
elif listinstr(['DocVQA', 'InfoVQA'], dataset_name):
# return [1 - np.min(x['match']) >= anls_threshold for x in result]
return [0.0 if 1 - np.min(x['match']) < anls_threshold else 1 - np.min(x['match']) for x in result]
elif listinstr(['ChartQA', 'OCRVQA'], dataset_name):
return [np.max(x['match']) for x in result]
else: # default using vqa_score to calculate score
return [np.mean(x['match']) for x in result]
# https://github.com/google-research/pix2struct/blob/main/pix2struct/metrics.py#L81
def relaxed_correctness(target: str,
prediction: str,
max_relative_change: float = 0.05) -> bool:
"""Calculates relaxed correctness.
The correctness tolerates certain error ratio defined by max_relative_change.
See https://arxiv.org/pdf/2203.10244.pdf, end of section 5.1:
“Following Methani et al. (2020), we use a relaxed accuracy measure for the
numeric answers to allow a minor inaccuracy that may result from the automatic
data extraction process. We consider an answer to be correct if it is within
5% of the gold answer. For non-numeric answers, we still need an exact match
to consider an answer to be correct.”
Args:
target: Target string.
prediction: Predicted string.
max_relative_change: Maximum relative change.
Returns:
Whether the prediction was correct given the specified tolerance.
"""
def _to_float(text: str) -> Optional[float]:
try:
if text.endswith('%'):
# Convert percentages to floats.
return float(text.rstrip('%')) / 100.0
else:
return float(text)
except ValueError:
return None
prediction = str(prediction)
target = str(target)
prediction_float = _to_float(prediction)
target_float = _to_float(target)
if prediction_float is not None and target_float:
relative_change = abs(prediction_float - target_float) / abs(target_float)
return relative_change <= max_relative_change
else:
return prediction.lower() == target.lower()
def levenshtein_distance(s1, s2):
if len(s1) > len(s2):
s1, s2 = s2, s1
distances = range(len(s1) + 1)
for i2, c2 in enumerate(s2):
distances_ = [i2 + 1]
for i1, c1 in enumerate(s1):
if c1 == c2:
distances_.append(distances[i1])
else:
distances_.append(1 + min((distances[i1], distances[i1 + 1], distances_[-1])))
distances = distances_
return distances[-1]
def anls_compute(groundtruth, prediction):
gt_answer = ' '.join(groundtruth.strip().lower().split())
det_answer = ' '.join(prediction.strip().lower().split())
dist = levenshtein_distance(gt_answer, det_answer)
length = max(len(groundtruth.upper()), len(prediction.upper()))
values = 0.0 if length == 0 else float(dist) / float(length)
return values
def process_answer(answer):
answer = answer.replace('\n', ' ')
answer = answer.replace('\t', ' ')
answer = answer.strip()
answer = process_punctuation(answer)
answer = _process_digit_article(answer)
return answer
def process_line(line, method='vqa_score'):
ret = {}
if istype(line['answer'], list):
answers = eval(line['answer'])
else:
answers = [line['answer']]
if method == 'vqa_score':
ret['gt'] = [process_answer(x) for x in answers]
ret['pred'] = process_answer(line['prediction'])
ret['match'] = []
for current_idx, gtAnsDatum in enumerate(ret['gt']):
otherGTAns = [
item for ret_gt_idx, item in enumerate(ret['gt'])
if ret_gt_idx != current_idx
]
matchingAns = [
item for item in otherGTAns if item == ret['pred']
]
acc = min(1, float(len(matchingAns)) / 3)
ret['match'].append(acc)
elif method == 'anls':
ret['gt'] = answers
ret['pred'] = line['prediction']
ret['match'] = [anls_compute(x, ret['pred']) for x in ret['gt']]
elif method == 'relaxed_accuracy':
ret['gt'] = answers
ret['pred'] = line['prediction'].strip()
ret['match'] = [relaxed_correctness(ret['pred'], x) for x in ret['gt']]
elif method == 'accuracy':
ret['gt'] = answers
ret['pred'] = line['prediction'].strip()
ret['match'] = [(1.0 if (x.strip().lower() == ret['pred'].strip().lower()) else 0.0) for x in ret['gt']]
else: # default using vqa_score to calculate score
ret['gt'] = [process_answer(x) for x in answers]
ret['pred'] = process_answer(line['prediction'])
ret['match'] = [x == ret['pred'] for x in ret['gt']]
return ret
def VQAEval(eval_file, dataset_name, **kwargs):
logger = get_logger('Evaluation')
data = load(eval_file)
assert 'answer' in data and 'prediction' in data
data['prediction'] = [str(x) for x in data['prediction']]
data['answer'] = [str(x) for x in data['answer']]
lt = len(data)
pool = mp.Pool(16)
lines = [data.iloc[i] for i in range(lt)]
if listinstr(['TextVQA'], dataset_name):
res = pool.map(partial(process_line, method='vqa_score'), lines)
elif listinstr(['ChartQA'], dataset_name):
res = pool.map(partial(process_line, method='relaxed_accuracy'), lines)
elif listinstr(['OCRVQA'], dataset_name):
res = pool.map(partial(process_line, method='accuracy'), lines)
elif listinstr(['DocVQA', 'InfoVQA'], dataset_name):
res = pool.map(partial(process_line, method='anls'), lines)
else: # default using vqa_score to calculate score
res = pool.map(process_line, lines)
# [np.mean(x['match']) >= full_score_weight for x in res]
hit = hit_calculate(res, dataset_name)
ret = dict()
if 'split' in data:
splits = set(data['split'])
for sp in splits:
sub = [r for l, r in zip(lines, res) if l['split'] == sp]
# [np.mean(x['match']) >= full_score_weight for x in sub]
hit = hit_calculate(sub, dataset_name)
ret[sp] = np.mean(hit) * 100
sub = [r for l, r in zip(lines, res)]
hit = hit_calculate(sub, dataset_name)
ret['Overall'] = np.mean(hit) * 100
else:
ret['Overall'] = np.mean(hit) * 100
if 'category' in data:
cates = list(set(data['category']))
cates.sort()
for c in cates:
sub = [r for l, r in zip(lines, res) if l['category'] == c]
# [np.mean(x['match']) >= full_score_weight for x in sub]
hit = hit_calculate(sub, dataset_name)
ret[c] = np.mean(hit) * 100
ret = d2df(ret)
ret.round(2)
suffix = eval_file.split('.')[-1]
result_file = eval_file.replace(f'.{suffix}', '_acc.csv')
logger.info(f'VQA Eval Finished. Saved to {result_file}. ')
logger.info(ret)
dump(ret, result_file)