import numpy as np
import pandas as pd
from transformers import AutoModelForCausalLM, AutoTokenizer
from sentence_transformers import CrossEncoder
from leaderboard.src.backend.util import generate_prompt


def load_evaluation_model(model_path):
    """Load a CrossEncoder evaluation model and cache a local copy."""
    model = CrossEncoder(model_path)
    model.save_pretrained(f'.checkpoints/{model_path}')
    return model


class SummaryGenerator:
    """Generates summaries for a DataFrame of source texts with a causal LM."""

    def __init__(self, model_id, revision):
        self.tokenizer = AutoTokenizer.from_pretrained(model_id, revision=revision)
        self.model = AutoModelForCausalLM.from_pretrained(model_id, revision=revision)
        self.summaries_df = pd.DataFrame()
        self.revision = revision
        self.avg_length = None
        self.answer_rate = None

    def generate_summaries(self, df):
        source, summary, dataset = [], [], []
        for index, row in df.iterrows():
            _source = row['text']
            _dataset = row['dataset']
            prompt = generate_prompt(_source)
            inputs = self.tokenizer(prompt, return_tensors='pt',
                                    max_length=1024, truncation=True)
            try:
                # Greedy decoding; temperature is irrelevant when do_sample=False.
                outputs = self.model.generate(**inputs, max_new_tokens=1024,
                                              do_sample=False)
                response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            except Exception as e:
                print(f"Error at index {index}: {e}")
                response = ""
            summary.append(response)
            source.append(_source)
            dataset.append(_dataset)

        self.summaries_df = pd.DataFrame(
            list(zip(source, summary, dataset)),
            columns=["source", "summary", "dataset"],
        )
        self._compute_avg_length()
        self._compute_answer_rate()
        return self.summaries_df

    def _compute_avg_length(self):
        # Average word count over non-empty summaries only.
        total_words = 0
        count = 0
        for summary in self.summaries_df['summary']:
            if summary != "":
                total_words += len(summary.split())
                count += 1
        self.avg_length = 0 if count == 0 else total_words / count

    def _compute_answer_rate(self):
        # Fraction of rows for which the model produced a non-empty summary.
        non_empty_count = sum(1 for summary in self.summaries_df['summary'] if summary != "")
        total_rows = len(self.summaries_df)
        self.answer_rate = 0 if total_rows == 0 else non_empty_count / total_rows


class EvaluationModel:
    """Scores (source, summary) pairs and derives accuracy / hallucination rate."""

    def __init__(self, model_path):
        self.model = load_evaluation_model(model_path)
        self.scores = []
        self.accuracy = None
        self.hallucination_rate = None

    def evaluate_hallucination(self, summaries_df):
        # CrossEncoder.predict expects a list of (premise, hypothesis) pairs,
        # so pair each source document with its generated summary.
        source_docs = np.array(summaries_df['source'])
        generated_summaries = np.array(summaries_df['summary'])
        pairs = list(zip(source_docs, generated_summaries))
        self.scores = self.model.predict(pairs)
        return self.scores

    def compute_accuracy(self):
        # self.scores may be a NumPy array, so test its length rather than its truthiness.
        if len(self.scores) == 0:
            raise ValueError("Scores not calculated. Call evaluate_hallucination() first.")

        # A score >= 0.5 counts the summary as factually consistent with its source.
        num_above_threshold = sum(score >= 0.5 for score in self.scores)
        num_total = len(self.scores)
        self.accuracy = (num_above_threshold / num_total) * 100
        self.hallucination_rate = 100 - self.accuracy
        return self.accuracy
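

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module): shows how
# SummaryGenerator and EvaluationModel are wired together. The toy DataFrame,
# the generator checkpoint "facebook/opt-125m", and the evaluator path
# "vectara/hallucination_evaluation_model" are assumptions for demonstration;
# the evaluator is assumed to emit a single consistency score in [0, 1] per
# pair. Swap in the model IDs your leaderboard run actually uses.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    eval_df = pd.DataFrame(
        {
            "text": ["The quick brown fox jumps over the lazy dog."],
            "dataset": ["toy"],
        }
    )

    generator = SummaryGenerator("facebook/opt-125m", revision="main")
    summaries_df = generator.generate_summaries(eval_df)
    print(f"answer rate: {generator.answer_rate}, avg length: {generator.avg_length}")

    evaluator = EvaluationModel("vectara/hallucination_evaluation_model")
    evaluator.evaluate_hallucination(summaries_df)
    print(f"accuracy: {evaluator.compute_accuracy():.2f}%")
    print(f"hallucination rate: {evaluator.hallucination_rate:.2f}%")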