import glob
import json
import math
import os
import sys
import traceback
from dataclasses import dataclass

from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType
from src.submission.check_validity import is_model_on_hub
from src.evaluation.model_trace_eval import compute_model_trace_p_value
from src.evaluation.initialize_models import is_model_allowed


def _log(message: str) -> None:
    """Write *message* verbatim to stderr and flush immediately.

    The message is emitted exactly as given (no newline is appended), so
    callers include their own line terminators.
    """
    sys.stderr.write(message)
    sys.stderr.flush()


@dataclass
class EvalResult:
    """Represents one perplexity evaluation result."""

    eval_name: str  # org_model_precision (uid)
    full_model: str  # org/model (path on hub)
    org: str  # set to None by init_from_json_file when the name has no "org/" prefix
    model: str
    revision: str  # commit hash, "" if main
    results: dict
    precision: Precision = Precision.Unknown
    model_type: ModelType = ModelType.PT  # Default to pretrained
    weight_type: WeightType = WeightType.Original
    architecture: str = "Unknown"
    still_on_hub: bool = False

    @classmethod
    def init_from_json_file(cls, json_filepath):
        """Build an EvalResult from a single model result JSON file.

        Reads the ``config`` section of the file to determine the precision
        (``model_dtype``), the model name (``model_name``, falling back to
        ``model_args``) and the revision (``model_sha``), then queries
        ``is_model_on_hub`` for hub availability and the model config.

        Args:
            json_filepath: Path of the JSON result file to load.

        Returns:
            A new instance with an empty ``results`` dict (no perplexity
            values are extracted; only p-values are used downstream).

        Raises:
            ValueError: If the file has no ``config`` object or no string
                model name in it.
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(json_filepath, encoding="utf-8") as fp:
            data = json.load(fp)

        config = data.get("config") if isinstance(data, dict) else None
        if not isinstance(config, dict):
            raise ValueError(f"Missing or invalid 'config' section in {json_filepath}")

        # Precision
        precision = Precision.from_str(config.get("model_dtype"))

        # Get model and org
        model_name = config.get("model_name", config.get("model_args"))
        if not isinstance(model_name, str):
            # Fail with a clear message instead of an AttributeError on .split().
            raise ValueError(f"Missing model name ('model_name'/'model_args') in {json_filepath}")

        org_and_model = model_name.split("/", 1)
        if len(org_and_model) == 1:
            org = None
            model = org_and_model[0]
            result_key = f"{model}_{precision.value.name}"
        else:
            org, model = org_and_model
            result_key = f"{org}_{model}_{precision.value.name}"
        # Splitting on the first "/" and re-joining with "/" is the identity.
        full_model = model_name

        still_on_hub, _, model_config = is_model_on_hub(
            full_model,
            config.get("model_sha", "main"),
            trust_remote_code=True,
            test_tokenizer=False,
        )

        architecture = "?"
        if model_config is not None:
            architectures = getattr(model_config, "architectures", None)
            if architectures:
                architecture = ";".join(architectures)

        # No perplexity extraction - we only care about p-values
        results = {}

        return cls(
            eval_name=result_key,
            full_model=full_model,
            org=org,
            model=model,
            results=results,
            precision=precision,
            revision=config.get("model_sha", ""),
            still_on_hub=still_on_hub,
            architecture=architecture,
        )

    def to_dict(self):
        """Convert this result to a dict for the dataframe display (p-values only).

        Fills the core ``AutoEvalColumn`` columns from this instance, uses
        placeholder values for license/params/likes, and calls
        ``compute_model_trace_p_value`` for the model. Any exception raised by
        that call is logged to stderr and the p-value is stored as ``None``.

        Returns:
            A dict keyed by ``"eval_name"`` and ``AutoEvalColumn`` column names.
        """
        _log("\n=== PROCESSING RESULT TO_DICT (P-VALUES ONLY) ===\n")
        _log(f"Processing result for model: {self.full_model}\n")

        # Create data dictionary - NO TASK PROCESSING AT ALL
        data_dict = {
            # Core columns
            "eval_name": self.eval_name,
            AutoEvalColumn.precision.name: self.precision.value.name,
            AutoEvalColumn.model_type.name: self.model_type.value.name,
            AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol,
            AutoEvalColumn.weight_type.name: self.weight_type.value.name,
            AutoEvalColumn.architecture.name: self.architecture,
            AutoEvalColumn.model.name: make_clickable_model(self.full_model),
            AutoEvalColumn.revision.name: self.revision,
            AutoEvalColumn.still_on_hub.name: self.still_on_hub,
            # Default values for missing model info
            AutoEvalColumn.license.name: "Unknown",
            AutoEvalColumn.params.name: 0,
            AutoEvalColumn.likes.name: 0,
        }

        # Compute model trace p-value
        _log(f"🧬 COMPUTING MODEL TRACE P-VALUE FOR: {self.full_model}\n")
        try:
            model_trace_p_value = compute_model_trace_p_value(
                self.full_model,
                self.revision if self.revision else "main",
                self.precision.value.name.lower(),
            )
            if model_trace_p_value is not None:
                _log(f"✅ P-value: {model_trace_p_value}\n")
            else:
                _log("⚠️ P-value computation failed\n")
        except Exception as e:
            # Deliberate best-effort: a failed computation must not drop the row.
            _log(f"💥 Exception during p-value computation: {e}\n")
            model_trace_p_value = None

        data_dict[AutoEvalColumn.model_trace_p_value.name] = model_trace_p_value

        _log("=== END PROCESSING - ONLY P-VALUES ===\n")

        return data_dict


def get_raw_eval_results(results_path: str) -> list[EvalResult]:
    """Collect evaluation results from every JSON file under *results_path*.

    The directory tree is walked recursively. Each ``.json`` file is first
    pre-checked: files that cannot be parsed, or whose ``config.model_name``
    is rejected by ``is_model_allowed``, are skipped. Remaining files are
    loaded with ``EvalResult.init_from_json_file``; results sharing the same
    ``eval_name`` are merged into the first one seen. Finally each result is
    filtered through ``is_model_allowed`` again and validated by calling
    ``to_dict()``; results raising ``KeyError`` there are dropped.

    Progress and errors are reported on stderr.

    Args:
        results_path: Root folder containing the result files.

    Returns:
        The list of accepted ``EvalResult`` objects.
    """
    _log(f"\nSearching for result files in: {results_path}\n")

    # Process all JSON files, regardless of other files in the directory
    model_result_filepaths = [
        os.path.join(root, file)
        for root, _, files in os.walk(results_path)
        for file in files
        if file.endswith(".json")
    ]

    _log(f"Found {len(model_result_filepaths)} result files\n")

    eval_results = {}
    for model_result_filepath in model_result_filepaths:
        try:
            _log(f"\nProcessing file: {model_result_filepath}\n")

            # Quick pre-check: Try to extract model name from file before full processing
            try:
                with open(model_result_filepath, "r", encoding="utf-8") as f:
                    data = json.load(f)
                config = data.get("config", {})
                model_name = config.get("model_name", "")
                if model_name and not is_model_allowed(model_name):
                    _log(
                        f"⏭️ Skipping non-allowed model file: {model_result_filepath} (model: {model_name})\n"
                    )
                    continue
            except Exception as e:
                _log(f"⚠️ Error pre-checking file {model_result_filepath}: {e}\n")
                continue

            # Creation of result
            eval_result = EvalResult.init_from_json_file(model_result_filepath)
            _log(f"Created result object for: {eval_result.full_model}\n")

            # Store results of same eval together
            eval_name = eval_result.eval_name
            if eval_name in eval_results:
                eval_results[eval_name].results.update(
                    {k: v for k, v in eval_result.results.items() if v is not None}
                )
                _log(f"Updated existing result for {eval_name}\n")
            else:
                eval_results[eval_name] = eval_result
                _log(f"Added new result for {eval_name}\n")

        except Exception as e:
            # One bad file must not abort the whole scan; report and move on.
            _log(f"Error processing result file {model_result_filepath}: {e}\n")
            _log(f"Traceback: {traceback.format_exc()}\n")
            continue

    results = []
    _log(f"\nProcessing {len(eval_results)} evaluation results\n")

    for v in eval_results.values():
        try:
            _log(f"\nConverting result to dict for: {v.full_model}\n")

            # Filter to only allowed models
            if not is_model_allowed(v.full_model):
                _log(f"⏭️ Skipping non-allowed model: {v.full_model}\n")
                continue

            v.to_dict()  # we test if the dict version is complete
            results.append(v)
            _log("Successfully converted and added result\n")

        except KeyError as e:
            _log(f"Error converting result to dict: {e}\n")
            _log(f"Traceback: {traceback.format_exc()}\n")
            continue

    _log(f"\nReturning {len(results)} processed results\n")

    return results