import glob
import json
import math
import os
from dataclasses import dataclass
from typing import Optional

from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType
from src.submission.check_validity import is_model_on_hub


def _perplexity_to_score(perplexity):
    """Map a raw perplexity onto the 0-100 display scale (lower perplexity = higher score).

    The mapping is ``100 * (1 - ln(perplexity) / 10)`` clamped to ``[0, 100]``; a log
    scale is used because perplexity can vary widely.

    Returns:
        The clamped score, or ``None`` when ``perplexity`` is not a positive real
        number (``None``, strings, booleans, zero, negatives and NaN are rejected).
    """
    if isinstance(perplexity, bool) or not isinstance(perplexity, (int, float)):
        return None
    # `not x > 0` (rather than `x <= 0`) also rejects NaN, which would otherwise
    # slip through min()/max() and come out as a perfect score of 100.
    if not perplexity > 0:
        return None
    return max(0, min(100, 100 * (1 - math.log(perplexity) / 10)))


@dataclass
class EvalResult:
    """Represents one perplexity evaluation result."""

    eval_name: str  # org_model_precision (uid)
    full_model: str  # org/model (path on hub)
    org: Optional[str]  # None when the model name has no "org/" prefix
    model: str
    revision: str  # commit hash, "" if main
    results: dict
    precision: Precision = Precision.Unknown
    model_type: ModelType = ModelType.PT  # Default to pretrained
    weight_type: WeightType = WeightType.Original
    architecture: str = "Unknown"
    still_on_hub: bool = False

    @classmethod
    def init_from_json_file(cls, json_filepath):
        """Build an EvalResult from one model result JSON file.

        The file must contain a "config" object (read for "model_dtype",
        "model_name" or "model_args", and "model_sha") and a "results" object.

        Args:
            json_filepath: Path of the JSON result file to read.

        Returns:
            A new EvalResult. Its ``results`` dict holds a "perplexity" entry only
            when the file's "results" object has one.

        Raises:
            ValueError: If the file has no "config" section or no model name.
            KeyError: If the "results" section, or the nested perplexity value, is missing.
        """
        with open(json_filepath, encoding="utf-8") as fp:
            data = json.load(fp)

        config = data.get("config")
        if config is None:
            raise ValueError(f"Missing 'config' section in {json_filepath}")

        # Precision
        precision = Precision.from_str(config.get("model_dtype"))

        # Get model and org; fall back to "model_args" when "model_name" is absent or null
        model_name = config.get("model_name") or config.get("model_args")
        if not model_name:
            raise ValueError(f"Missing 'model_name'/'model_args' in config of {json_filepath}")

        org_and_model = model_name.split("/", 1)
        if len(org_and_model) == 1:
            org = None
            model = org_and_model[0]
            result_key = f"{model}_{precision.value.name}"
        else:
            org, model = org_and_model
            result_key = f"{org}_{model}_{precision.value.name}"
        full_model = "/".join(org_and_model)

        still_on_hub, _, model_config = is_model_on_hub(
            full_model, config.get("model_sha", "main"), trust_remote_code=True, test_tokenizer=False
        )
        architecture = "?"
        if model_config is not None:
            architectures = getattr(model_config, "architectures", None)
            if architectures:
                architecture = ";".join(architectures)

        # Extract perplexity result
        results = {}
        if "perplexity" in data["results"]:
            results["perplexity"] = data["results"]["perplexity"]["perplexity"]

        return cls(
            eval_name=result_key,
            full_model=full_model,
            org=org,
            model=model,
            results=results,
            precision=precision,
            revision=config.get("model_sha", ""),
            still_on_hub=still_on_hub,
            architecture=architecture,
        )

    def to_dict(self):
        """Converts the Eval Result to a dict compatible with our dataframe display.

        Each task's raw perplexity is converted to a 0-100 score and the scores are
        averaged (0 when there are none). Values that are not positive numbers are
        ignored instead of raising. The raw perplexity is stored under the column
        name of ``Tasks.task0``, or ``None`` when no valid value was found.
        """
        print(f"\nProcessing result for model: {self.full_model}", flush=True)
        print(f"Raw results: {self.results}", flush=True)

        # Calculate average, handling perplexity (lower is better)
        scores = []
        perplexity_score = None
        for task in Tasks:
            benchmark = task.value.benchmark
            if benchmark not in self.results:
                continue
            raw_value = self.results[benchmark]
            score = _perplexity_to_score(raw_value)
            if score is None:
                print(f"Ignoring invalid perplexity value {raw_value!r} for benchmark {benchmark}", flush=True)
                continue
            perplexity_score = raw_value  # Save the raw score
            scores.append(score)

        average = sum(scores) / len(scores) if scores else 0
        print(f"Calculated average score: {average}", flush=True)

        data_dict = {
            "eval_name": self.eval_name,  # not a column, just a save name,
            AutoEvalColumn.precision.name: self.precision.value.name,
            AutoEvalColumn.model_type.name: self.model_type.value.name,
            AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol,
            AutoEvalColumn.weight_type.name: self.weight_type.value.name,
            AutoEvalColumn.architecture.name: self.architecture,
            AutoEvalColumn.model.name: make_clickable_model(self.full_model),
            AutoEvalColumn.revision.name: self.revision,
            AutoEvalColumn.average.name: average,
            AutoEvalColumn.still_on_hub.name: self.still_on_hub,
            # Add missing columns with default values
            AutoEvalColumn.license.name: "Unknown",  # Default license
            AutoEvalColumn.params.name: 0,  # Default params
            AutoEvalColumn.likes.name: 0,  # Default likes
        }

        # Add perplexity score with the exact column name from Tasks
        if perplexity_score is not None:
            data_dict[Tasks.task0.value.col_name] = perplexity_score
            print(f"Added perplexity score {perplexity_score} under column {Tasks.task0.value.col_name}", flush=True)
        else:
            data_dict[Tasks.task0.value.col_name] = None
            print(f"No perplexity score found for column {Tasks.task0.value.col_name}", flush=True)

        print(f"Final data dict keys: {list(data_dict.keys())}", flush=True)
        return data_dict


def get_raw_eval_results(results_path: str) -> list[EvalResult]:
    """From the path of the results folder root, extract all perplexity results.

    Walks ``results_path`` and loads every file from directories that contain only
    ``.json`` files. Results sharing an ``eval_name`` are merged into one entry.
    Files that fail to load are reported and skipped, as are results whose
    ``to_dict()`` raises ``KeyError``.

    Args:
        results_path: Root directory of the results folder.

    Returns:
        The merged EvalResult objects that converted to a dict successfully.
    """
    print(f"\nSearching for result files in: {results_path}", flush=True)
    model_result_filepaths = []

    for root, _, files in os.walk(results_path):
        # We should only have json files in model results
        if not files or any(not f.endswith(".json") for f in files):
            continue
        model_result_filepaths.extend(os.path.join(root, file) for file in files)

    # os.walk order is arbitrary; sort so merging of duplicate entries is deterministic
    model_result_filepaths.sort()

    print(f"Found {len(model_result_filepaths)} result files", flush=True)

    eval_results = {}
    for model_result_filepath in model_result_filepaths:
        try:
            print(f"\nProcessing file: {model_result_filepath}", flush=True)
            # Creation of result
            eval_result = EvalResult.init_from_json_file(model_result_filepath)
            print(f"Created result object for: {eval_result.full_model}", flush=True)

            # Store results of same eval together
            eval_name = eval_result.eval_name
            if eval_name in eval_results:
                eval_results[eval_name].results.update(
                    {k: v for k, v in eval_result.results.items() if v is not None}
                )
                print(f"Updated existing result for {eval_name}", flush=True)
            else:
                eval_results[eval_name] = eval_result
                print(f"Added new result for {eval_name}", flush=True)
        except Exception as e:
            # Best effort: one unreadable or malformed file must not abort the whole load
            print(f"Error processing result file {model_result_filepath}: {e}", flush=True)
            continue

    results = []
    print(f"\nProcessing {len(eval_results)} evaluation results", flush=True)
    for v in eval_results.values():
        try:
            print(f"\nConverting result to dict for: {v.full_model}", flush=True)
            v.to_dict()  # we test if the dict version is complete
            results.append(v)
            print("Successfully converted and added result", flush=True)
        except KeyError as e:
            print(f"Error converting result to dict: {e}", flush=True)
            continue

    print(f"\nReturning {len(results)} processed results", flush=True)
    return results