import argparse
import json
import os
import re
import sys

import evaluate
import numpy as np
import pandas as pd
from tqdm import tqdm

from libra.eval import temporal_f1_score

# Pre-load metrics
bertscore_metric = evaluate.load("bertscore")
rouge_metric = evaluate.load("rouge")
bleu_metric = evaluate.load("bleu")
meteor_metric = evaluate.load("meteor")
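# Note: evaluate.load() fetches each metric's implementation on first use, and
# BERTScore downloads its underlying model the first time it is computed, so an
# internet connection may be required on the first run.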


def clean_text(text: str) -> str:
    """
    Perform basic cleanup of text: remove newlines, de-identification
    placeholders such as "(___, __, __)", runs of dashes/underscores,
    and unexpected special characters.
    """
    text = re.sub(r'\n+', ' ', text)
    # Remove placeholder patterns before collapsing underscores/dashes,
    # otherwise these patterns can never match.
    text = re.sub(r'\(___, __, __\)', '', text)
    text = re.sub(r'---, ---, ---', '', text)
    text = re.sub(r'\(__, __, ___\)', '', text)
    text = re.sub(r'[_-]+', ' ', text)
    text = re.sub(r'[^\w\s.,:;()\-]', '', text)
    text = re.sub(r'\s{2,}', ' ', text).strip()
    return text
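
# Illustrative example of the cleanup above:
#   clean_text("Comparison to ___.\nNo acute findings (___, __, __).")
#   returns "Comparison to . No acute findings ."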


def load_json(path: str) -> list:
    """
    Load a JSONL file and return a list of parsed objects.
    Each line should be a valid JSON object.
    """
    content = []
    with open(path, 'r', encoding='utf-8') as file:
        for line in file:
            content.append(json.loads(line))
    return content
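
# Each JSONL line is expected to carry a 'question_id' plus one of the text
# fields handled below, for example (illustrative):
#   {"question_id": 1, "text": "No acute cardiopulmonary process."}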


def extract_sections(data: list) -> list:
    """
    Extract the relevant text section from each JSON object, checking the keys
    'reference', 'findings', 'impression', and 'text' in that priority order,
    and clean each extracted item.
    """
    sections_list = []
    for item in data:
        for key in ('reference', 'findings', 'impression', 'text'):
            if key in item:
                sections_list.append(clean_text(item[key].lower()))
                break
    return sections_list


def append_results_to_csv(results: dict, model_name: str, csv_path: str) -> None:
    """
    Convert the results dictionary into a single-row DataFrame, insert
    'Model Name' as the first column, and append it to a CSV file.
    Creates a new CSV (with header) if the file doesn't exist, otherwise appends.
    """
    df = pd.DataFrame([results])
    df.insert(0, "Model Name", model_name)
    header = not os.path.isfile(csv_path)  # Write the header only for a new file
    df.to_csv(csv_path, mode='a', header=header, index=False)
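
# Together with evaluate_report() below, each appended row has the columns:
#   Model Name, BLEU1, BLEU2, BLEU3, BLEU4, METEOR, ROUGE-L, Bert_score,
#   Temporal_entity_score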


def evaluate_report(
    references: str,
    predictions: str,
) -> dict:
    """
    Evaluate the model outputs against reference texts using multiple metrics:
      - BLEU (1–4)
      - METEOR
      - ROUGE-L
      - BERTScore (F1)
      - Temporal F1
    Returns a dictionary of computed metrics.
    """
    # Load data
    references_data = load_json(references)
    predictions_data = load_json(predictions)

    # Basic validation: question_id alignment
    gt_ids = [item['question_id'] for item in references_data]
    pred_ids = [item['question_id'] for item in predictions_data]
    assert gt_ids == pred_ids, "Please make sure predictions and references are perfectly matched by question_id."

    # Extract text sections
    references_list = extract_sections(references_data)
    predictions_list = extract_sections(predictions_data)

    # Calculate metrics
    with tqdm(total=8, desc="Calculating metrics") as pbar:
        # BLEU-1
        bleu1 = bleu_metric.compute(
            predictions=predictions_list,
            references=references_list,
            max_order=1
        )['bleu']
        print(f"BLEU-1 Score: {round(bleu1 * 100, 2)}")
        pbar.update(1)

        # BLEU-2
        bleu2 = bleu_metric.compute(
            predictions=predictions_list,
            references=references_list,
            max_order=2
        )['bleu']
        print(f"BLEU-2 Score: {round(bleu2 * 100, 2)}")
        pbar.update(1)

        # BLEU-3
        bleu3 = bleu_metric.compute(
            predictions=predictions_list,
            references=references_list,
            max_order=3
        )['bleu']
        print(f"BLEU-3 Score: {round(bleu3 * 100, 2)}")
        pbar.update(1)

        # BLEU-4
        bleu4 = bleu_metric.compute(
            predictions=predictions_list,
            references=references_list,
            max_order=4
        )['bleu']
        print(f"BLEU-4 Score: {round(bleu4 * 100, 2)}")
        pbar.update(1)

        # ROUGE-L
        rougel = rouge_metric.compute(
            predictions=predictions_list,
            references=references_list
        )['rougeL']
        print(f"ROUGE-L Score: {round(rougel * 100, 2)}")
        pbar.update(1)

        # METEOR
        meteor = meteor_metric.compute(
            predictions=predictions_list,
            references=references_list
        )['meteor']
        print(f"METEOR Score: {round(meteor * 100, 2)}")
        pbar.update(1)

        # BERTScore (mean F1)
        bert_f1 = bertscore_metric.compute(
            predictions=predictions_list,
            references=references_list,
            model_type='distilbert-base-uncased'
        )['f1']
        bert_score = float(np.mean(bert_f1))
        print(f"BERTScore: {round(bert_score * 100, 2)}")
        pbar.update(1)

        # Temporal F1
        tem_f1 = temporal_f1_score(
            predictions=predictions_list,
            references=references_list
        )["f1"]
        print(f"Temporal F1 Score: {round(tem_f1 * 100, 2)}")
        pbar.update(1)

    return {
        'BLEU1': round(bleu1 * 100, 2),
        'BLEU2': round(bleu2 * 100, 2),
        'BLEU3': round(bleu3 * 100, 2),
        'BLEU4': round(bleu4 * 100, 2),
        'METEOR': round(meteor * 100, 2),
        'ROUGE-L': round(rougel * 100, 2),
        'Bert_score': round(bert_score * 100, 2),
        'Temporal_entity_score': round(tem_f1 * 100, 2)
    }


def main():
    """
    Parse arguments, compute evaluation metrics, and append the results to a CSV file.
    """
    parser = argparse.ArgumentParser(
        description='Evaluation for Libra Generated Outputs'
    )
    parser.add_argument('--references', type=str, required=True,
                        help='Path to the ground truth file (JSONL).')
    parser.add_argument('--predictions', type=str, required=True,
                        help='Path to the prediction file (JSONL).')
    parser.add_argument('--model-name', type=str, required=True,
                        help='Unique model identifier for tracking in the results CSV.')
    parser.add_argument('--save-to-csv', type=str, required=True,
                        help='Path of the CSV file where results will be saved/appended.')
    args = parser.parse_args()

    # Calculate metrics
    scores_results = evaluate_report(
        references=args.references,
        predictions=args.predictions
    )

    # Append results to CSV
    append_results_to_csv(scores_results, args.model_name, args.save_to_csv)
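
# Example invocation (illustrative; the script and file names are placeholders):
#   python evaluate_radiology_report.py \
#       --references data/test_references.jsonl \
#       --predictions outputs/model_predictions.jsonl \
#       --model-name libra-baseline \
#       --save-to-csv results/evaluation.csv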


if __name__ == "__main__":
    main()