import os
import json
import re
import string
from collections import Counter
from datetime import datetime

import numpy as np
import pandas as pd
import torch
import gradio as gr
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline
from huggingface_hub import login


def normalize_answer(s):
    """Normalize answer for evaluation (lowercase, strip punctuation and articles, collapse whitespace)."""
    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)

    def white_space_fix(text):
        return ' '.join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))


def f1_score_qa(prediction, ground_truth):
    """Calculate token-level F1 score for QA."""
    prediction_tokens = normalize_answer(prediction).split()
    ground_truth_tokens = normalize_answer(ground_truth).split()
    if len(prediction_tokens) == 0 or len(ground_truth_tokens) == 0:
        return int(prediction_tokens == ground_truth_tokens)
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1


def exact_match_score(prediction, ground_truth):
    """Calculate exact match score after normalization."""
    return normalize_answer(prediction) == normalize_answer(ground_truth)


def has_answer(answers):
    """Check if the question has any valid (non-empty) answers."""
    if not answers or not answers.get("text"):
        return False
    answer_texts = answers["text"] if isinstance(answers["text"], list) else [answers["text"]]
    return any(text.strip() for text in answer_texts)
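
# Illustrative sanity check for the helpers above (hand-computed values on made-up
# strings; a hedged sketch that is not called anywhere in the evaluation flow).
def _sanity_check_metrics():
    assert normalize_answer("The Term of this Agreement.") == "term of this agreement"
    assert exact_match_score("Term of Agreement.", "the term of agreement")
    # 3 shared tokens, 3 predicted tokens, 4 ground-truth tokens:
    # precision = 1.0, recall = 0.75, F1 = 2 * 0.75 / 1.75 = 6/7
    assert abs(f1_score_qa("term of agreement", "the term of this agreement") - 6 / 7) < 1e-9
    assert has_answer({"text": ["term of agreement"], "answer_start": [10]})
    assert not has_answer({"text": [], "answer_start": []})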

def get_top_k_predictions(qa_pipeline, question, context, k=3):
    """Get top-k predictions from the model"""
    # Get raw model outputs
    inputs = qa_pipeline.tokenizer(question, context, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        outputs = qa_pipeline.model(**inputs)
    start_logits = outputs.start_logits
    end_logits = outputs.end_logits

    # Get top-k start and end positions
    start_scores, start_indices = torch.topk(start_logits.flatten(), k)
    end_scores, end_indices = torch.topk(end_logits.flatten(), k)

    predictions = []
    # Generate all combinations of start and end positions
    for start_idx in start_indices:
        for end_idx in end_indices:
            if start_idx <= end_idx:  # Valid span
                # Convert to answer text
                input_ids = inputs["input_ids"][0]
                answer_tokens = input_ids[start_idx:end_idx + 1]
                answer_text = qa_pipeline.tokenizer.decode(answer_tokens, skip_special_tokens=True)
                # Calculate combined score
                start_score = start_logits[0][start_idx].item()
                end_score = end_logits[0][end_idx].item()
                combined_score = start_score + end_score
                predictions.append({
                    "answer": answer_text,
                    "score": combined_score,
                    "start": start_idx.item(),
                    "end": end_idx.item()
                })

    # Sort by score and return top-k unique answers
    predictions.sort(key=lambda x: x["score"], reverse=True)
    unique_answers = []
    seen_answers = set()
    for pred in predictions:
        normalized_answer = normalize_answer(pred["answer"])
        if normalized_answer not in seen_answers and len(unique_answers) < k:
            unique_answers.append(pred)
            seen_answers.add(normalized_answer)
    return unique_answers


def calculate_top_k_has_ans_f1(predictions, ground_truths, k=1):
    """Calculate Top-K Has Answer F1 score"""
    f1_scores = []
    for preds, gt in zip(predictions, ground_truths):
        if not has_answer(gt):
            continue  # Skip questions without answers
        # Get ground truth text
        gt_text = gt["text"][0] if isinstance(gt["text"], list) else gt["text"]
        # Calculate F1 for top-k predictions
        max_f1 = 0
        for i in range(min(k, len(preds))):
            pred_text = preds[i]["answer"]
            f1 = f1_score_qa(pred_text, gt_text)
            max_f1 = max(max_f1, f1)
        f1_scores.append(max_f1)
    return np.mean(f1_scores) if f1_scores else 0
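
# Hedged sketch of how the Top-K Has Answer F1 behaves: with the made-up candidates
# below, the correct span is ranked second, so Top-3 recovers it while Top-1 does not.
# Not called anywhere in the evaluation flow.
def _example_top_k_has_ans_f1():
    predictions = [[
        {"answer": "thirty (30) days", "score": 9.1},
        {"answer": "the term of this agreement", "score": 8.7},
        {"answer": "days", "score": 7.9},
    ]]
    ground_truths = [{"text": ["term of this agreement"], "answer_start": [0]}]
    top1 = calculate_top_k_has_ans_f1(predictions, ground_truths, k=1)  # 0.0
    top3 = calculate_top_k_has_ans_f1(predictions, ground_truths, k=3)  # 1.0
    return top1, top3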

# Model under evaluation (used by the loader, the results report, and the UI).
MODEL_NAME = "AvocadoMuffin/roberta-cuad-qa-v3"


def evaluate_model():
    """Authenticate with Hugging Face and load the QA model as a pipeline."""
    # Authenticate with Hugging Face using the token
    hf_token = os.getenv("EVAL_TOKEN")
    if hf_token:
        try:
            login(token=hf_token)
            print("✓ Authenticated with Hugging Face")
        except Exception as e:
            print(f"⚠ Warning: Could not authenticate with HF token: {e}")
    else:
        print("⚠ Warning: EVAL_TOKEN not found in environment variables")

    print("Loading model and tokenizer...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=hf_token)
        model = AutoModelForQuestionAnswering.from_pretrained(MODEL_NAME, token=hf_token)
        qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer)
        print("✓ Model loaded successfully")
        return qa_pipeline, hf_token
    except Exception as e:
        print(f"✗ Error loading model: {e}")
        return None, None
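
# Hedged usage sketch: the pipeline returned by evaluate_model() can also be queried
# ad hoc outside the Gradio flow (the question/context strings below are hypothetical):
#
#   qa_pipeline, _ = evaluate_model()
#   if qa_pipeline is not None:
#       print(qa_pipeline(question="What is the governing law?",
#                         context="This Agreement shall be governed by the laws of Delaware."))
#       print(get_top_k_predictions(qa_pipeline, "What is the governing law?",
#                                   "This Agreement shall be governed by the laws of Delaware.", k=3))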

def run_evaluation(num_samples, progress=gr.Progress()):
    """Run evaluation and return results for the Gradio interface."""
    # Load model
    qa_pipeline, hf_token = evaluate_model()
    if qa_pipeline is None:
        return "❌ Failed to load model", pd.DataFrame(), None

    progress(0.1, desc="Loading CUAD dataset...")
    # Load dataset (fall back to the original "cuad" dataset if the Atticus mirror fails)
    try:
        dataset = load_dataset("theatticusproject/cuad-qa", trust_remote_code=True, token=hf_token)
        test_data = dataset["test"]
        print(f"✓ Loaded CUAD-QA dataset with {len(test_data)} samples")
    except Exception as e:
        try:
            dataset = load_dataset("cuad", split="test[:1000]", trust_remote_code=True, token=hf_token)
            test_data = dataset
            print(f"✓ Loaded CUAD dataset with {len(test_data)} samples")
        except Exception as e2:
            return f"❌ Error loading dataset: {e2}", pd.DataFrame(), None

    # Limit samples
    num_samples = min(num_samples, len(test_data))
    test_subset = test_data.select(range(num_samples))

    progress(0.2, desc=f"Starting evaluation on {num_samples} samples...")

    # Storage for predictions, ground truths, and per-sample details
    all_top_k_predictions = []
    all_ground_truths = []
    all_has_answer_flags = []
    detailed_results = []

    # Run evaluation
    for i, example in enumerate(test_subset):
        progress((0.2 + 0.6 * i / num_samples), desc=f"Processing sample {i+1}/{num_samples}")
        try:
            context = example["context"]
            question = example["question"]
            answers = example["answers"]

            # Check if question has answers
            has_ans = has_answer(answers)
            all_has_answer_flags.append(has_ans)
            all_ground_truths.append(answers)

            # Get top-3 predictions
            top_k_preds = get_top_k_predictions(qa_pipeline, question, context, k=3)
            all_top_k_predictions.append(top_k_preds)

            # Get ground truth for display
            if has_ans:
                ground_truth = answers["text"][0] if isinstance(answers["text"], list) else answers["text"]
            else:
                ground_truth = "[No Answer]"

            # Calculate metrics for this sample
            if has_ans and top_k_preds:
                top1_f1 = f1_score_qa(top_k_preds[0]["answer"], ground_truth)
                top3_f1 = max([f1_score_qa(pred["answer"], ground_truth) for pred in top_k_preds[:3]])
                em = int(exact_match_score(top_k_preds[0]["answer"], ground_truth))  # 0/1 for a consistent column type
            else:
                top1_f1 = 0
                top3_f1 = 0
                em = 0

            detailed_results.append({
                "Sample_ID": i + 1,
                "Question": (question[:100] + "...") if len(question) > 100 else question,
                "Has_Answer": has_ans,
                "Top1_Prediction": top_k_preds[0]["answer"] if top_k_preds else "[No Prediction]",
                "Top3_Predictions": " | ".join([p["answer"] for p in top_k_preds[:3]]),
                "Ground_Truth": ground_truth,
                "Top1_F1": round(top1_f1, 3),
                "Top3_F1": round(top3_f1, 3),
                "Exact_Match": em,
                "Top1_Confidence": round(top_k_preds[0]["score"], 3) if top_k_preds else 0
            })
        except Exception as e:
            print(f"Error processing sample {i}: {e}")
            continue

    progress(0.8, desc="Calculating final metrics...")

    # Keep only questions that actually have answers ("Has Ans" metrics)
    has_ans_predictions = [pred for pred, has_ans in zip(all_top_k_predictions, all_has_answer_flags) if has_ans]
    has_ans_ground_truths = [gt for gt, has_ans in zip(all_ground_truths, all_has_answer_flags) if has_ans]
    if len(has_ans_predictions) == 0:
        return "❌ No samples with answers were found", pd.DataFrame(), None

    # Calculate Top-K Has Answer F1 scores
    top1_has_ans_f1 = calculate_top_k_has_ans_f1(has_ans_predictions, has_ans_ground_truths, k=1) * 100
    top3_has_ans_f1 = calculate_top_k_has_ans_f1(has_ans_predictions, has_ans_ground_truths, k=3) * 100

    # Calculate overall metrics
    total_samples = len(detailed_results)
    has_answer_samples = len(has_ans_predictions)
    avg_exact_match = np.mean([r["Exact_Match"] for r in detailed_results]) * 100
    avg_top1_f1 = np.mean([r["Top1_F1"] for r in detailed_results if r["Has_Answer"]]) * 100
    avg_top3_f1 = np.mean([r["Top3_F1"] for r in detailed_results if r["Has_Answer"]]) * 100

    # Counts for the performance breakdown
    high_conf_count = len([r for r in detailed_results if r["Top1_Confidence"] > 0.8])
    perfect_match_count = len([r for r in detailed_results if r["Exact_Match"] == 1])
    high_f1_count = len([r for r in detailed_results if r["Top1_F1"] > 0.8])

    # Create results summary
    results_summary = f"""
# 📊 CUAD Model Evaluation Results

## 🎯 Model Performance
- **Model**: {MODEL_NAME}
- **Dataset**: CUAD (Contract Understanding Atticus Dataset)
- **Total Samples**: {total_samples}
- **Samples with Answers**: {has_answer_samples}
- **Evaluation Date**: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}

## 📈 Key Metrics (Industry Standard)
- **Top 1 Has Ans F1**: {top1_has_ans_f1:.2f}%
- **Top 3 Has Ans F1**: {top3_has_ans_f1:.2f}%

## 📋 Additional Metrics
- **Exact Match Score**: {avg_exact_match:.2f}%
- **Average Top-1 F1**: {avg_top1_f1:.2f}%
- **Average Top-3 F1**: {avg_top3_f1:.2f}%

## 🔍 Performance Breakdown
- **High Confidence Predictions (>0.8)**: {high_conf_count} ({high_conf_count/total_samples*100:.1f}%)
- **Perfect Matches**: {perfect_match_count} ({perfect_match_count/total_samples*100:.1f}%)
- **High F1 Scores (>0.8)**: {high_f1_count} ({high_f1_count/has_answer_samples*100:.1f}%)

## 📊 Comparison with Benchmarks
Your model's **Top 1 Has Ans F1** of {top1_has_ans_f1:.2f}% can be compared to:
- gustavhartz/roberta-base-cuad-finetuned: 85.68%
- Rakib/roberta-base-on-cuad: 81.26%
"""

    # Create detailed results DataFrame
    df = pd.DataFrame(detailed_results)

    # Save results to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_file = f"cuad_evaluation_results_{timestamp}.json"
    complete_results = {
        "model_name": MODEL_NAME,
        "dataset": "cuad",
        "total_samples": total_samples,
        "has_answer_samples": has_answer_samples,
        "top1_has_ans_f1": top1_has_ans_f1,
        "top3_has_ans_f1": top3_has_ans_f1,
        "exact_match_score": avg_exact_match,
        "avg_top1_f1": avg_top1_f1,
        "avg_top3_f1": avg_top3_f1,
        "evaluation_date": datetime.now().isoformat(),
        "detailed_results": detailed_results
    }
    try:
        with open(results_file, "w") as f:
            json.dump(complete_results, f, indent=2)
        print(f"✓ Results saved to {results_file}")
    except Exception as e:
        print(f"⚠ Warning: Could not save results file: {e}")
        results_file = None

    progress(1.0, desc="✅ Evaluation completed!")
    return results_summary, df, results_file


def create_gradio_interface():
    """Create Gradio interface for CUAD evaluation"""
    with gr.Blocks(title="CUAD Model Evaluator", theme=gr.themes.Soft()) as demo:
        gr.HTML(f"""
        <div style="text-align: center;">
            <p>Evaluate your CUAD (Contract Understanding Atticus Dataset) Question Answering model</p>
            <p>Model: <strong>{MODEL_NAME}</strong></p>
            <p>Now with industry-standard Top-K Has Answer F1 metrics!</p>
            <p><em>Note: "Has Ans" metrics only consider questions that have valid answers.</em></p>
            <p>🤖 Powered by Hugging Face Transformers & Gradio</p>
            <p>📚 CUAD Dataset by The Atticus Project</p>
            <p>📊 Now with industry-standard Top-K Has Answer F1 metrics</p>
        </div>