"""Gradio dashboard that evaluates a question-answering model on CUAD-style data.

The script loads a Hugging Face QA model, tries several CUAD dataset
identifiers (falling back to a small synthetic contract dataset), scores
predictions with SQuAD-style Exact Match / token F1, and presents the results
in a Gradio UI together with a downloadable JSON report.
"""

import json
import os
import re
import string
from collections import Counter
from datetime import datetime

import gradio as gr
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, load_dataset
from huggingface_hub import login
from sklearn.metrics import f1_score  # noqa: F401  (kept: unused in this file)
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline

# Model evaluated by this dashboard; shown in the UI and written to the report.
MODEL_NAME = "AvocadoMuffin/roberta-cuad-qa-v3"

# Maximum number of characters of a question/answer shown in the results table.
_PREVIEW_CHARS = 100

# Number of rows generated for the synthetic fallback dataset.
_SYNTHETIC_SAMPLE_COUNT = 100

# Hub identifiers tried in order before falling back to remote-code loading.
_CUAD_REPO_IDS = ("theatticusproject/cuad", "theatticusproject/cuad-qa", "cuad")

# Hand-written contract snippets used when no real CUAD data can be loaded.
# NOTE: the "answer_start" offsets are approximate; the evaluation loop only
# reads answers["text"], so they do not influence any score.
_SYNTHETIC_CONTRACT_SAMPLES = [
    {
        "context": (
            "This Agreement shall commence on January 1, 2024 and shall "
            "continue for a period of twelve (12) months unless terminated "
            "earlier in accordance with the terms hereof. The initial term "
            "may be extended for additional periods of twelve (12) months "
            "each upon mutual written consent of both parties."
        ),
        "question": "What is the duration of the agreement?",
        "answers": {
            "text": ["twelve (12) months", "12 months"],
            "answer_start": [85, 85],
        },
    },
    {
        "context": (
            "The Company shall pay the Consultant a fee of $50,000 per month "
            "for services rendered under this Agreement. Payment shall be "
            "made within thirty (30) days of the end of each calendar month."
        ),
        "question": "What is the monthly fee?",
        "answers": {
            "text": ["$50,000 per month", "$50,000"],
            "answer_start": [45, 45],
        },
    },
    {
        "context": (
            "Either party may terminate this Agreement immediately upon "
            "written notice in the event of a material breach by the other "
            "party that remains uncured for thirty (30) days after written "
            "notice of such breach."
        ),
        "question": "What is the cure period for material breach?",
        "answers": {
            "text": ["thirty (30) days", "30 days"],
            "answer_start": [125, 132],
        },
    },
    {
        "context": (
            "The Contractor shall maintain commercial general liability "
            "insurance with coverage of not less than $1,000,000 per "
            "occurrence and $2,000,000 in the aggregate."
        ),
        "question": "What is the minimum insurance coverage per occurrence?",
        "answers": {
            "text": ["$1,000,000 per occurrence", "$1,000,000"],
            "answer_start": [85, 85],
        },
    },
    {
        "context": (
            "All intellectual property developed under this Agreement shall "
            "be owned by the Company. The Contractor hereby assigns all "
            "rights, title and interest in such intellectual property to the "
            "Company."
        ),
        "question": "Who owns the intellectual property?",
        "answers": {
            "text": ["the Company", "Company"],
            "answer_start": [70, 74],
        },
    },
]


def normalize_answer(s):
    """Normalize an answer string for comparison.

    Lower-cases the text, strips ASCII punctuation, removes the articles
    "a", "an" and "the", and collapses runs of whitespace to single spaces.

    Args:
        s: Raw answer text.

    Returns:
        The normalized string.
    """

    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)

    def white_space_fix(text):
        return ' '.join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))


def f1_score_qa(prediction, ground_truth):
    """Return the token-level F1 between a prediction and one reference.

    Both strings are normalized with ``normalize_answer`` and split on
    whitespace. Token overlap is counted as a multiset intersection.

    Args:
        prediction: Predicted answer text.
        ground_truth: Reference answer text.

    Returns:
        A value in [0, 1]. When either side normalizes to no tokens the
        result is 1 if both are empty and 0 otherwise.
    """
    prediction_tokens = normalize_answer(prediction).split()
    ground_truth_tokens = normalize_answer(ground_truth).split()

    # With an empty side precision/recall are undefined; fall back to
    # "both empty" == perfect match.
    if not prediction_tokens or not ground_truth_tokens:
        return int(prediction_tokens == ground_truth_tokens)

    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0

    precision = num_same / len(prediction_tokens)
    recall = num_same / len(ground_truth_tokens)
    return (2 * precision * recall) / (precision + recall)


def exact_match_score(prediction, ground_truth):
    """Return True when both strings are equal after normalization."""
    return normalize_answer(prediction) == normalize_answer(ground_truth)


def max_over_ground_truths(metric_fn, prediction, ground_truths):
    """Return the best ``metric_fn`` score of a prediction over all references.

    Args:
        metric_fn: Callable taking ``(prediction, ground_truth)``.
        prediction: Predicted answer text.
        ground_truths: Iterable of reference answers.

    Returns:
        The maximum score, or 0 when ``ground_truths`` is empty.
    """
    scores = [metric_fn(prediction, ground_truth) for ground_truth in ground_truths]
    return max(scores) if scores else 0


def _load_eval_split(repo_id, hf_token, **load_kwargs):
    """Load ``repo_id`` and return ``(split_data, split_name)``.

    Prefers the "test" split and falls back to "validation". Returns
    ``(None, None)`` when neither split exists. Loading errors propagate to
    the caller.
    """
    dataset = load_dataset(repo_id, token=hf_token, **load_kwargs)
    for split_name in ("test", "validation"):
        if split_name in dataset:
            return dataset[split_name], split_name
    return None, None


def _build_synthetic_dataset(num_samples=_SYNTHETIC_SAMPLE_COUNT):
    """Build a ``Dataset`` by cycling through the synthetic contract samples."""
    rows = [
        dict(
            _SYNTHETIC_CONTRACT_SAMPLES[i % len(_SYNTHETIC_CONTRACT_SAMPLES)],
            id=f"synthetic_{i}",
        )
        for i in range(num_samples)
    ]
    return Dataset.from_list(rows)


def load_cuad_dataset(hf_token=None):
    """Try several ways of loading CUAD, ending with synthetic data.

    Args:
        hf_token: Optional Hugging Face access token forwarded to
            ``load_dataset``.

    Returns:
        A ``(dataset, dataset_name)`` tuple. ``dataset_name`` is the
        identifier that succeeded, or ``"synthetic_cuad"`` for the fallback.
    """
    print("Attempting to load CUAD dataset...")

    # Methods 1-3: plain hub identifiers.
    for repo_id in _CUAD_REPO_IDS:
        # Broad catch is deliberate: any failure just moves on to the next
        # loading strategy.
        try:
            print(f"Trying {repo_id}...")
            test_data, split_name = _load_eval_split(repo_id, hf_token)
        except Exception as e:
            print(f"Failed to load {repo_id}: {e}")
            continue
        if test_data is not None:
            print(f"✓ Loaded {repo_id} with {len(test_data)} {split_name} samples")
            return test_data, repo_id
        print(f"No test or validation split found in {repo_id}")

    # Method 4: allow the dataset repository's loading script to run.
    # NOTE(security): trust_remote_code=True executes code shipped with the
    # dataset repository; only keep this for repositories you trust.
    try:
        print("Trying with trust_remote_code=True...")
        test_data, split_name = _load_eval_split(
            "theatticusproject/cuad", hf_token, trust_remote_code=True
        )
    except Exception as e:
        print(f"Failed with trust_remote_code: {e}")
    else:
        if test_data is not None:
            print(
                f"✓ Loaded with trust_remote_code, {split_name} samples: "
                f"{len(test_data)}"
            )
            return test_data, "theatticusproject/cuad (trust_remote_code)"
        print("No test or validation split found with trust_remote_code")

    # Method 5: synthetic CUAD-like data so the UI still works offline.
    print("⚠️ Creating synthetic CUAD-like test data...")
    test_data = _build_synthetic_dataset()
    print(f"✓ Created synthetic CUAD-like dataset with {len(test_data)} samples")
    return test_data, "synthetic_cuad"


def inspect_dataset_structure(dataset, dataset_name="dataset"):
    """Print the type, length and first-sample layout of ``dataset``.

    Args:
        dataset: Indexable, sized dataset object.
        dataset_name: Label used in the printed header.

    Returns:
        ``dataset`` unchanged, so the call can be chained.
    """
    print(f"\n=== {dataset_name} Dataset Structure ===")
    print(f"Dataset type: {type(dataset)}")
    print(f"Dataset length: {len(dataset)}")

    if len(dataset) > 0:
        sample = dataset[0]
        if isinstance(sample, dict):
            print(f"Sample keys: {list(sample.keys())}")
            print("Sample structure:")
            for key, value in sample.items():
                if isinstance(value, dict):
                    print(f"  {key} (dict): {list(value.keys())}")
                    for sub_key, sub_value in value.items():
                        print(
                            f"    {sub_key}: {type(sub_value)} - "
                            f"{str(sub_value)[:50]}..."
                        )
                else:
                    print(f"  {key}: {type(value)} - {str(value)[:100]}...")
        else:
            # Nothing further to walk when the sample is not a mapping.
            print("Sample keys: Not a dict")

    print("=" * 50)
    return dataset


def evaluate_model():
    """Authenticate with Hugging Face and build the QA pipeline.

    Reads the access token from the ``EVAL_TOKEN`` environment variable.

    Returns:
        ``(qa_pipeline, hf_token)`` on success, ``(None, None)`` when the
        model or tokenizer cannot be loaded.
    """
    hf_token = os.getenv("EVAL_TOKEN")
    if hf_token:
        try:
            login(token=hf_token)
            print("✓ Authenticated with Hugging Face")
        except Exception as e:
            # Best effort: public models still load without a login.
            print(f"⚠ Warning: Could not authenticate with HF token: {e}")
    else:
        print("⚠ Warning: EVAL_TOKEN not found in environment variables")

    print("Loading model and tokenizer...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, token=hf_token)
        model = AutoModelForQuestionAnswering.from_pretrained(
            MODEL_NAME, token=hf_token
        )
        qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer)
        print("✓ Model loaded successfully")
        return qa_pipeline, hf_token
    except Exception as e:
        print(f"✗ Error loading model: {e}")
        return None, None


def _is_authentic_dataset(dataset_name):
    """Return True when ``dataset_name`` denotes real (non-synthetic) CUAD data."""
    return "cuad" in dataset_name.lower() and "synthetic" not in dataset_name


def _first_present(example, keys):
    """Return ``example[key]`` for the first key in ``keys`` that exists, else None."""
    for key in keys:
        if key in example:
            return example[key]
    return None


def _extract_ground_truths(example):
    """Return the non-blank reference answers stored under ``example["answers"]``.

    Accepts either a dict with a ``"text"`` entry (a list of strings or a
    single string) or a plain list of strings.
    """
    answers = example.get("answers")
    if isinstance(answers, dict):
        texts = answers.get("text")
        if isinstance(texts, list):
            return [text for text in texts if text and text.strip()]
        return [texts] if texts and texts.strip() else []
    if isinstance(answers, list):
        return [text for text in answers if text and text.strip()]
    return []


def _preview(text, limit=_PREVIEW_CHARS):
    """Return ``text`` cut to ``limit`` characters, with "..." when shortened."""
    return text[:limit] + "..." if len(text) > limit else text


def _count_with_share(subset, total):
    """Format ``len(subset)`` together with its percentage of ``total``."""
    return f"{len(subset)} ({len(subset) / total * 100:.1f}%)"


def run_evaluation(num_samples, progress=gr.Progress()):
    """Evaluate the model and return results for the Gradio interface.

    Args:
        num_samples: Requested number of samples (capped at dataset size).
        progress: Gradio progress tracker used to report status.

    Returns:
        ``(summary_markdown, results_dataframe, results_file)``. On failure
        the summary holds an error message, the frame is empty and the file
        is ``None``. ``results_file`` is also ``None`` when the JSON report
        could not be written.
    """
    qa_pipeline, hf_token = evaluate_model()
    if qa_pipeline is None:
        return "❌ Failed to load model", pd.DataFrame(), None

    progress(0.1, desc="Loading CUAD dataset...")

    test_data, dataset_name = load_cuad_dataset(hf_token)
    if test_data is None:
        return "❌ Failed to load any dataset", pd.DataFrame(), None

    test_data = inspect_dataset_structure(test_data, dataset_name)

    # The slider may deliver a float; range() below requires an int.
    num_samples = max(0, min(int(num_samples), len(test_data)))
    test_subset = test_data.select(range(num_samples))

    progress(
        0.2,
        desc=f"Starting evaluation on {num_samples} samples from {dataset_name}...",
    )

    exact_matches = []
    f1_scores = []
    predictions = []

    for i, example in enumerate(test_subset):
        progress(
            0.2 + 0.7 * i / num_samples,
            desc=f"Processing sample {i+1}/{num_samples}",
        )

        # One bad sample must not abort the whole run.
        try:
            # Different dataset variants name their fields differently.
            context = _first_present(example, ("context", "text"))
            if context is None:
                print(f"Warning: No context found in sample {i}")
                continue

            question = _first_present(example, ("question", "title"))
            if question is None:
                print(f"Warning: No question found in sample {i}")
                continue

            ground_truths = _extract_ground_truths(example)
            if not ground_truths:
                print(f"Warning: No ground truth found for sample {i}")
                continue

            try:
                result = qa_pipeline(question=question, context=context)
                predicted_answer = result["answer"]
                confidence = result["score"]
            except Exception as e:
                print(f"Error getting prediction for sample {i}: {e}")
                continue

            em = max_over_ground_truths(
                exact_match_score, predicted_answer, ground_truths
            )
            f1 = max_over_ground_truths(f1_score_qa, predicted_answer, ground_truths)

            exact_matches.append(em)
            f1_scores.append(f1)
            predictions.append({
                "Sample_ID": i + 1,
                "Question": _preview(question),
                "Predicted_Answer": _preview(predicted_answer),
                "Ground_Truth": _preview(ground_truths[0]),
                "Num_Ground_Truths": len(ground_truths),
                "Exact_Match": em,
                "F1_Score": round(f1, 3),
                "Confidence": round(confidence, 3),
            })
        except Exception as e:
            print(f"Error processing sample {i}: {e}")
            continue

    progress(0.9, desc="Calculating final metrics...")

    if not exact_matches:
        return "❌ No samples were successfully processed", pd.DataFrame(), None

    avg_exact_match = float(np.mean(exact_matches) * 100)
    avg_f1_score = float(np.mean(f1_scores) * 100)

    # Derived statistics, computed once and shared by summary and report.
    total = len(predictions)
    high_confidence_samples = [p for p in predictions if p['Confidence'] > 0.8]
    perfect_matches = [p for p in predictions if p['Exact_Match'] == 1]
    high_f1_samples = [p for p in predictions if p['F1_Score'] > 0.8]
    avg_confidence = float(np.mean([p['Confidence'] for p in predictions]))
    median_f1 = float(np.median([p['F1_Score'] for p in predictions]))
    multi_truth_count = len([p for p in predictions if p['Num_Ground_Truths'] > 1])
    is_authentic = _is_authentic_dataset(dataset_name)
    # Single timestamp so the summary, the report and the file name agree.
    evaluated_at = datetime.now()

    dataset_status = (
        "✅ Authentic CUAD" if is_authentic else "⚠️ Fallback/Synthetic Data"
    )
    quality_note = (
        "✅ This evaluation uses the proper CUAD dataset for contract "
        "understanding tasks."
        if is_authentic
        else "⚠️ WARNING: This evaluation used fallback data. Results may not "
        "be representative of actual CUAD performance."
    )

    results_summary = "\n".join([
        "",
        "# 📊 CUAD Model Evaluation Results",
        "",
        "## ⚠️ Dataset Information",
        f"- **Dataset Used**: {dataset_name}",
        f"- **Dataset Status**: {dataset_status}",
        "",
        "## 🎯 Overall Performance",
        f"- **Model**: {MODEL_NAME}",
        f"- **Samples Evaluated**: {len(exact_matches)}",
        f"- **Evaluation Date**: {evaluated_at.strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "## 📈 Core Metrics",
        f"- **Exact Match Score**: {avg_exact_match:.2f}%",
        f"- **F1 Score**: {avg_f1_score:.2f}%",
        "",
        "## 🔍 Performance Analysis",
        "- **High Confidence Predictions (>0.8)**: "
        f"{_count_with_share(high_confidence_samples, total)}",
        f"- **Perfect Matches**: {_count_with_share(perfect_matches, total)}",
        f"- **High F1 Scores (>0.8)**: {_count_with_share(high_f1_samples, total)}",
        "",
        "## 📊 Distribution",
        f"- **Average Confidence**: {avg_confidence:.3f}",
        f"- **Median F1 Score**: {median_f1:.3f}",
        f"- **Samples with Multiple Ground Truths**: {multi_truth_count}",
        "",
        "## 🎯 Evaluation Quality",
        quality_note,
        "",
        "The evaluation accounts for multiple ground truth answers where "
        "available, using the maximum score across all valid answers for "
        "each question.",
        "",
    ])

    df = pd.DataFrame(predictions)

    results_file = (
        f"cuad_evaluation_results_{evaluated_at.strftime('%Y%m%d_%H%M%S')}.json"
    )
    detailed_results = {
        "model_name": MODEL_NAME,
        "dataset": dataset_name,
        "num_samples": len(exact_matches),
        "exact_match_score": avg_exact_match,
        "f1_score": avg_f1_score,
        "evaluation_date": evaluated_at.isoformat(),
        "evaluation_methodology": "max_over_ground_truths",
        "dataset_authentic": is_authentic,
        "predictions": predictions,
        "summary_stats": {
            "avg_confidence": avg_confidence,
            "median_f1": median_f1,
            "samples_with_multiple_ground_truths": multi_truth_count,
        },
    }

    try:
        with open(results_file, "w", encoding="utf-8") as f:
            json.dump(detailed_results, f, indent=2)
        print(f"✓ Results saved to {results_file}")
    except Exception as e:
        # The report is optional; the UI still shows summary and table.
        print(f"⚠ Warning: Could not save results file: {e}")
        results_file = None

    progress(1.0, desc="✅ Evaluation completed!")
    return results_summary, df, results_file


def create_gradio_interface():
    """Build and return the Gradio ``Blocks`` app for CUAD evaluation."""
    # NOTE(review): the HTML markup of the gr.HTML snippets below was missing
    # from the copy of the file this was restored from; minimal tags were
    # reconstructed around the surviving text. Confirm against the upstream
    # layout if exact styling matters.
    with gr.Blocks(title="CUAD Model Evaluator", theme=gr.themes.Soft()) as demo:
        gr.HTML(f"""
<div>
<h1>🏛️ CUAD Model Evaluation Dashboard</h1>
<p>Evaluate your CUAD (Contract Understanding Atticus Dataset) Question Answering model</p>
<p>Model: {MODEL_NAME}</p>
<p>This tool will attempt to load the authentic CUAD dataset, with fallbacks if needed.</p>
</div>
""")

        with gr.Row():
            with gr.Column(scale=1):
                gr.HTML("<h3>⚙️ Evaluation Settings</h3>")
                num_samples = gr.Slider(
                    minimum=10,
                    maximum=500,
                    value=100,
                    step=10,
                    label="Number of samples to evaluate",
                    info="Choose between 10-500 samples (more samples = more accurate but slower)",
                )
                evaluate_btn = gr.Button(
                    "🚀 Start Evaluation",
                    variant="primary",
                    size="lg",
                )
                gr.HTML("""
<div>
<h4>📋 What this evaluates:</h4>
<p>Note: This tool will try to load the authentic CUAD dataset. If that fails, it will use synthetic contract data for testing purposes.</p>
</div>
""")

            with gr.Column(scale=2):
                gr.HTML("<h3>📊 Results</h3>")
                results_summary = gr.Markdown(
                    value="Click '🚀 Start Evaluation' to begin...",
                    label="Evaluation Summary",
                )

        gr.HTML("<hr>")

        with gr.Row():
            gr.HTML("<h3>📋 Detailed Results</h3>")

        with gr.Row():
            detailed_results = gr.Dataframe(
                label="Sample-by-Sample Results",
                interactive=False,
                wrap=True,
            )

        with gr.Row():
            download_file = gr.File(
                label="📥 Download Complete Results (JSON)",
                visible=False,
            )

        def handle_evaluation(requested_samples, progress=gr.Progress()):
            """Run the evaluation and map its result onto the output widgets."""
            # The gr.Progress() default must sit on the function registered
            # with the event so Gradio attaches a tracker; it is then
            # forwarded to run_evaluation, which does the reporting.
            summary, df, file_path = run_evaluation(requested_samples, progress)
            if file_path and os.path.exists(file_path):
                return summary, df, gr.update(visible=True, value=file_path)
            return summary, df, gr.update(visible=False)

        evaluate_btn.click(
            fn=handle_evaluation,
            inputs=[num_samples],
            outputs=[results_summary, detailed_results, download_file],
            show_progress=True,
        )

        gr.HTML("""
<div>
<p>🤖 Powered by Hugging Face Transformers & Gradio</p>
<p>📚 CUAD Dataset by The Atticus Project</p>
<p>⚠️ If authentic CUAD data cannot be loaded, synthetic contract data will be used for testing purposes.</p>
</div>
""")

    return demo


def main():
    """Print environment information and launch the Gradio app."""
    print("CUAD Model Evaluation with Gradio Interface")
    print("=" * 50)

    if torch.cuda.is_available():
        print(f"✓ CUDA available: {torch.cuda.get_device_name(0)}")
    else:
        print("! Running on CPU")

    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=True,
        debug=True,
    )


if __name__ == "__main__":
    main()