import os
import json
import re
import string
from collections import Counter
from datetime import datetime

import gradio as gr
import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from huggingface_hub import login
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline

# Model checkpoint evaluated throughout this app
MODEL_NAME = "AvocadoMuffin/roberta-cuad-qa-v2"

def normalize_answer(s):
    """Normalize an answer string for evaluation (SQuAD-style)."""
    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)

    def white_space_fix(text):
        return ' '.join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))

def f1_score_qa(prediction, ground_truth):
    """Calculate token-level F1 between a prediction and a ground-truth answer."""
    prediction_tokens = normalize_answer(prediction).split()
    ground_truth_tokens = normalize_answer(ground_truth).split()
    if len(prediction_tokens) == 0 or len(ground_truth_tokens) == 0:
        return int(prediction_tokens == ground_truth_tokens)
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1

def exact_match_score(prediction, ground_truth):
    """Calculate exact match after normalization."""
    return normalize_answer(prediction) == normalize_answer(ground_truth)
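
# Illustrative sanity check (not part of the evaluation flow): shows how the
# SQuAD-style normalization and metrics above behave on a toy contract-style
# example. The helper name and example strings are invented for demonstration.
def _sanity_check_metrics():
    # "The Lessor." normalizes to "lessor" (lowercased, punctuation and
    # articles stripped), so this counts as an exact match.
    assert exact_match_score("The Lessor.", "lessor")
    # Prediction tokens {"governing", "law", "is", "new", "york"} overlap the
    # ground truth {"new", "york", "law"} on 3 tokens:
    # precision = 3/5, recall = 3/3, F1 = 0.75.
    assert abs(f1_score_qa("the governing law is New York", "New York law") - 0.75) < 1e-9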

def has_answer(answers):
    """Check if the question has any valid (non-empty) answers."""
    if not answers or not answers.get("text"):
        return False
    answer_texts = answers["text"] if isinstance(answers["text"], list) else [answers["text"]]
    return any(text.strip() for text in answer_texts)

def get_top_k_predictions(qa_pipeline, question, context, k=3):
    """Get the top-k answer spans from the model for one question/context pair."""
    # Tokenize and run the underlying model directly to access the raw logits
    inputs = qa_pipeline.tokenizer(question, context, return_tensors="pt", truncation=True, max_length=512)
    inputs = {name: tensor.to(qa_pipeline.model.device) for name, tensor in inputs.items()}
    with torch.no_grad():
        outputs = qa_pipeline.model(**inputs)
        start_logits = outputs.start_logits
        end_logits = outputs.end_logits

    # Convert logits to probabilities so the reported score is an interpretable
    # confidence in [0, 1]; softmax is monotonic, so the ranking is unchanged
    start_probs = torch.softmax(start_logits, dim=-1)[0]
    end_probs = torch.softmax(end_logits, dim=-1)[0]

    # Get top-k start and end positions
    _, start_indices = torch.topk(start_logits.flatten(), k)
    _, end_indices = torch.topk(end_logits.flatten(), k)

    predictions = []
    input_ids = inputs["input_ids"][0]

    # Score all valid combinations of start and end positions
    for start_idx in start_indices:
        for end_idx in end_indices:
            if start_idx <= end_idx:  # valid span
                answer_tokens = input_ids[start_idx:end_idx + 1]
                answer_text = qa_pipeline.tokenizer.decode(answer_tokens, skip_special_tokens=True)
                combined_score = (start_probs[start_idx] * end_probs[end_idx]).item()
                predictions.append({
                    "answer": answer_text,
                    "score": combined_score,
                    "start": start_idx.item(),
                    "end": end_idx.item()
                })

    # Sort by score and return the top-k unique answers
    predictions.sort(key=lambda x: x["score"], reverse=True)
    unique_answers = []
    seen_answers = set()
    for pred in predictions:
        normalized_answer = normalize_answer(pred["answer"])
        if normalized_answer not in seen_answers and len(unique_answers) < k:
            unique_answers.append(pred)
            seen_answers.add(normalized_answer)
    return unique_answers
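
# Hypothetical usage sketch (not called anywhere in the app): illustrates the
# structure returned by get_top_k_predictions() on a toy clause. Requires a
# loaded QA pipeline, e.g. the one returned by evaluate_model(); the example
# strings below are invented.
def _demo_top_k(qa_pipeline):
    context = "This Agreement shall be governed by the laws of the State of New York."
    question = "Which state's law governs the agreement?"
    for pred in get_top_k_predictions(qa_pipeline, question, context, k=3):
        # Each entry carries the decoded span, its confidence, and token offsets.
        print(f"{pred['answer']!r}  score={pred['score']:.3f}  tokens=[{pred['start']}, {pred['end']}]")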

def calculate_top_k_has_ans_f1(predictions, ground_truths, k=1):
    """Calculate Top-K Has Answer F1: the best F1 among the top-k predictions,
    averaged over questions that actually have an answer."""
    f1_scores = []
    for preds, gt in zip(predictions, ground_truths):
        if not has_answer(gt):
            continue  # skip unanswerable questions
        # Use the first ground-truth answer text
        gt_text = gt["text"][0] if isinstance(gt["text"], list) else gt["text"]
        # Take the best F1 over the top-k predictions
        max_f1 = 0
        for i in range(min(k, len(preds))):
            max_f1 = max(max_f1, f1_score_qa(preds[i]["answer"], gt_text))
        f1_scores.append(max_f1)
    return np.mean(f1_scores) if f1_scores else 0
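
# Illustrative check of the Top-K Has Answer F1 aggregation on hand-made data
# (helper name, answers, and values are invented; not part of the app flow).
def _sanity_check_top_k_f1():
    predictions = [
        [{"answer": "the State of New York"}, {"answer": "New York"}],  # question 1
        [{"answer": "thirty days"}],                                    # question 2
    ]
    ground_truths = [
        {"text": ["New York"]},
        {"text": [""]},  # unanswerable, so it is excluded from the average
    ]
    top1 = calculate_top_k_has_ans_f1(predictions, ground_truths, k=1)
    top3 = calculate_top_k_has_ans_f1(predictions, ground_truths, k=3)
    assert abs(top1 - 2 / 3) < 1e-9  # only "the State of New York" is scored (P=1/2, R=1)
    assert abs(top3 - 1.0) < 1e-9    # "New York" among the top-3 is a perfect match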

def evaluate_model():
    """Authenticate with Hugging Face, then load the model and build a QA pipeline."""
    hf_token = os.getenv("EVAL_TOKEN")
    if hf_token:
        try:
            login(token=hf_token)
            print("✓ Authenticated with Hugging Face")
        except Exception as e:
            print(f"⚠ Warning: Could not authenticate with HF token: {e}")
    else:
        print("⚠ Warning: EVAL_TOKEN not found in environment variables")

    print("Loading model and tokenizer...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=hf_token)
        model = AutoModelForQuestionAnswering.from_pretrained(MODEL_NAME, token=hf_token)
        qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer)
        print("✓ Model loaded successfully")
        return qa_pipeline, hf_token
    except Exception as e:
        print(f"✗ Error loading model: {e}")
        return None, None

def run_evaluation(num_samples, progress=gr.Progress()):
    """Run evaluation and return results for the Gradio interface."""
    # Load model
    qa_pipeline, hf_token = evaluate_model()
    if qa_pipeline is None:
        return "✗ Failed to load model", pd.DataFrame(), None

    progress(0.1, desc="Loading CUAD dataset...")
    # Load dataset (fall back to the plain CUAD config if the QA variant fails)
    try:
        dataset = load_dataset("theatticusproject/cuad-qa", trust_remote_code=True, token=hf_token)
        test_data = dataset["test"]
        print(f"✓ Loaded CUAD-QA dataset with {len(test_data)} samples")
    except Exception:
        try:
            dataset = load_dataset("cuad", split="test[:1000]", trust_remote_code=True, token=hf_token)
            test_data = dataset
            print(f"✓ Loaded CUAD dataset with {len(test_data)} samples")
        except Exception as e2:
            return f"✗ Error loading dataset: {e2}", pd.DataFrame(), None

    # Limit samples (cast to int in case the slider passes a float)
    num_samples = int(min(num_samples, len(test_data)))
    test_subset = test_data.select(range(num_samples))

    progress(0.2, desc=f"Starting evaluation on {num_samples} samples...")
    # Storage for predictions, ground truths, and per-sample details
    all_top_k_predictions = []
    all_ground_truths = []
    all_has_answer_flags = []
    detailed_results = []

    # Run evaluation
    for i, example in enumerate(test_subset):
        progress(0.2 + 0.6 * i / num_samples, desc=f"Processing sample {i+1}/{num_samples}")
        try:
            context = example["context"]
            question = example["question"]
            answers = example["answers"]

            # Check if the question has answers
            has_ans = has_answer(answers)
            all_has_answer_flags.append(has_ans)
            all_ground_truths.append(answers)

            # Get top-3 predictions
            top_k_preds = get_top_k_predictions(qa_pipeline, question, context, k=3)
            all_top_k_predictions.append(top_k_preds)

            # Ground truth text for display
            if has_ans:
                ground_truth = answers["text"][0] if isinstance(answers["text"], list) else answers["text"]
            else:
                ground_truth = "[No Answer]"

            # Per-sample metrics
            if has_ans and top_k_preds:
                top1_f1 = f1_score_qa(top_k_preds[0]["answer"], ground_truth)
                top3_f1 = max(f1_score_qa(pred["answer"], ground_truth) for pred in top_k_preds[:3])
                em = int(exact_match_score(top_k_preds[0]["answer"], ground_truth))
            else:
                top1_f1 = 0
                top3_f1 = 0
                em = 0

            detailed_results.append({
                "Sample_ID": i + 1,
                "Question": question[:100] + "..." if len(question) > 100 else question,
                "Has_Answer": has_ans,
                "Top1_Prediction": top_k_preds[0]["answer"] if top_k_preds else "[No Prediction]",
                "Top3_Predictions": " | ".join(p["answer"] for p in top_k_preds[:3]),
                "Ground_Truth": ground_truth,
                "Top1_F1": round(top1_f1, 3),
                "Top3_F1": round(top3_f1, 3),
                "Exact_Match": em,
                "Top1_Confidence": round(top_k_preds[0]["score"], 3) if top_k_preds else 0
            })
        except Exception as e:
            print(f"Error processing sample {i}: {e}")
            continue
    progress(0.8, desc="Calculating final metrics...")

    # Keep only questions that have answers for the "Has Ans" metrics
    has_ans_predictions = [pred for pred, has_ans in zip(all_top_k_predictions, all_has_answer_flags) if has_ans]
    has_ans_ground_truths = [gt for gt, has_ans in zip(all_ground_truths, all_has_answer_flags) if has_ans]
    if len(has_ans_predictions) == 0:
        return "✗ No samples with answers were found", pd.DataFrame(), None

    # Top-K Has Answer F1 scores (as percentages)
    top1_has_ans_f1 = calculate_top_k_has_ans_f1(has_ans_predictions, has_ans_ground_truths, k=1) * 100
    top3_has_ans_f1 = calculate_top_k_has_ans_f1(has_ans_predictions, has_ans_ground_truths, k=3) * 100

    # Overall metrics
    total_samples = len(detailed_results)
    has_answer_samples = len(has_ans_predictions)
    avg_exact_match = np.mean([r["Exact_Match"] for r in detailed_results]) * 100
    avg_top1_f1 = np.mean([r["Top1_F1"] for r in detailed_results if r["Has_Answer"]]) * 100
    avg_top3_f1 = np.mean([r["Top3_F1"] for r in detailed_results if r["Has_Answer"]]) * 100
    # Results summary (Markdown)
    results_summary = f"""
# 📊 CUAD Model Evaluation Results

## 🎯 Model Performance
- **Model**: {MODEL_NAME}
- **Dataset**: CUAD (Contract Understanding Atticus Dataset)
- **Total Samples**: {total_samples}
- **Samples with Answers**: {has_answer_samples}
- **Evaluation Date**: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}

## 🏆 Key Metrics (Industry Standard)
- **Top 1 Has Ans F1**: {top1_has_ans_f1:.2f}%
- **Top 3 Has Ans F1**: {top3_has_ans_f1:.2f}%

## 📈 Additional Metrics
- **Exact Match Score**: {avg_exact_match:.2f}%
- **Average Top-1 F1**: {avg_top1_f1:.2f}%
- **Average Top-3 F1**: {avg_top3_f1:.2f}%

## 🔍 Performance Breakdown
- **High Confidence Predictions (>0.8)**: {len([r for r in detailed_results if r['Top1_Confidence'] > 0.8])} ({len([r for r in detailed_results if r['Top1_Confidence'] > 0.8])/total_samples*100:.1f}%)
- **Perfect Matches**: {len([r for r in detailed_results if r['Exact_Match'] == 1])} ({len([r for r in detailed_results if r['Exact_Match'] == 1])/total_samples*100:.1f}%)
- **High F1 Scores (>0.8)**: {len([r for r in detailed_results if r['Top1_F1'] > 0.8])} ({len([r for r in detailed_results if r['Top1_F1'] > 0.8])/has_answer_samples*100:.1f}%)

## 📊 Comparison with Benchmarks
Your model's **Top 1 Has Ans F1** of {top1_has_ans_f1:.2f}% can be compared to:
- gustavhartz/roberta-base-cuad-finetuned: 85.68%
- Rakib/roberta-base-on-cuad: 81.26%
"""
    # Detailed results DataFrame
    df = pd.DataFrame(detailed_results)

    # Save complete results to a JSON file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_file = f"cuad_evaluation_results_{timestamp}.json"
    complete_results = {
        "model_name": MODEL_NAME,
        "dataset": "cuad",
        "total_samples": total_samples,
        "has_answer_samples": has_answer_samples,
        "top1_has_ans_f1": top1_has_ans_f1,
        "top3_has_ans_f1": top3_has_ans_f1,
        "exact_match_score": avg_exact_match,
        "avg_top1_f1": avg_top1_f1,
        "avg_top3_f1": avg_top3_f1,
        "evaluation_date": datetime.now().isoformat(),
        "detailed_results": detailed_results
    }
    try:
        with open(results_file, "w") as f:
            json.dump(complete_results, f, indent=2)
        print(f"✓ Results saved to {results_file}")
    except Exception as e:
        print(f"⚠ Warning: Could not save results file: {e}")
        results_file = None

    progress(1.0, desc="✓ Evaluation completed!")
    return results_summary, df, results_file
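
# Hypothetical convenience helper (not wired into the UI): reload a results file
# written by run_evaluation() for offline analysis. The function name is invented.
def load_results_dataframe(path):
    with open(path) as f:
        saved = json.load(f)
    # The per-sample rows round-trip cleanly into a DataFrame.
    return pd.DataFrame(saved["detailed_results"])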

def create_gradio_interface():
    """Create the Gradio interface for CUAD evaluation."""
    with gr.Blocks(title="CUAD Model Evaluator", theme=gr.themes.Soft()) as demo:
        gr.HTML(f"""
        <div style="text-align: center; padding: 20px;">
            <h1>🏛️ CUAD Model Evaluation Dashboard</h1>
            <p>Evaluate your CUAD (Contract Understanding Atticus Dataset) Question Answering model</p>
            <p><strong>Model:</strong> {MODEL_NAME}</p>
            <p><em>Now with industry-standard Top-K Has Answer F1 metrics!</em></p>
        </div>
        """)
        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML("<h3>⚙️ Evaluation Settings</h3>")
                num_samples = gr.Slider(
                    minimum=10,
                    maximum=500,
                    value=100,
                    step=10,
                    label="Number of samples to evaluate",
                    info="Choose between 10-500 samples (more samples = more accurate but slower)"
                )
                evaluate_btn = gr.Button(
                    "🚀 Start Evaluation",
                    variant="primary",
                    size="lg"
                )
                gr.HTML("""
                <div style="margin-top: 20px; padding: 15px; background-color: #f0f0f0; border-radius: 8px;">
                    <h4>📊 Evaluation Metrics:</h4>
                    <ul>
                        <li><strong>Top 1 Has Ans F1</strong>: F1 score for single best answer (industry standard)</li>
                        <li><strong>Top 3 Has Ans F1</strong>: F1 score allowing up to 3 predictions</li>
                        <li><strong>Exact Match</strong>: Percentage of perfect predictions</li>
                        <li><strong>Confidence</strong>: Model's confidence in predictions</li>
                    </ul>
                    <p><em>Note: "Has Ans" metrics only consider questions that have valid answers.</em></p>
                </div>
                """)
            with gr.Column(scale=2):
                gr.HTML("<h3>📋 Results</h3>")
                results_summary = gr.Markdown(
                    value="Click '🚀 Start Evaluation' to begin...",
                    label="Evaluation Summary"
                )

        gr.HTML("<hr>")
        with gr.Row():
            gr.HTML("<h3>📝 Detailed Results</h3>")
        with gr.Row():
            detailed_results = gr.Dataframe(
                label="Sample-by-Sample Results",
                interactive=False,
                wrap=True
            )
        with gr.Row():
            download_file = gr.File(
                label="📥 Download Complete Results (JSON)",
                visible=False
            )

        # Event handlers
        def handle_evaluation(num_samples):
            summary, df, file_path = run_evaluation(num_samples)
            if file_path and os.path.exists(file_path):
                return summary, df, gr.update(visible=True, value=file_path)
            else:
                return summary, df, gr.update(visible=False)

        evaluate_btn.click(
            fn=handle_evaluation,
            inputs=[num_samples],
            outputs=[results_summary, detailed_results, download_file],
            show_progress=True
        )

        # Footer
        gr.HTML("""
        <div style="text-align: center; margin-top: 30px; padding: 20px; color: #666;">
            <p>🤗 Powered by Hugging Face Transformers & Gradio</p>
            <p>📜 CUAD Dataset by The Atticus Project</p>
            <p>📊 Now with industry-standard Top-K Has Answer F1 metrics</p>
        </div>
        """)

    return demo

if __name__ == "__main__":
    print("CUAD Model Evaluation with Top-K Has Answer F1 Metrics")
    print("=" * 60)

    # Check if CUDA is available
    if torch.cuda.is_available():
        print(f"✓ CUDA available: {torch.cuda.get_device_name(0)}")
    else:
        print("! Running on CPU")

    # Create and launch the Gradio interface
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        debug=True
    )