import json
import os
from typing import Any, Dict

import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, metadata_load

from .dataset_handler import VIDORE_2_DATASETS_KEYWORDS, VIDORE_DATASETS_KEYWORDS, get_datasets_nickname

BLOCKLIST = ["impactframes"]


class ModelHandler:
    def __init__(self, model_infos_path="model_infos.json"):
        self.api = HfApi()
        self.model_infos_path = model_infos_path
        self.model_infos = self._load_model_infos()

    def _load_model_infos(self) -> Dict:
        if os.path.exists(self.model_infos_path):
            with open(self.model_infos_path) as f:
                return json.load(f)
        return {}

    def _save_model_infos(self):
        with open(self.model_infos_path, "w") as f:
            json.dump(self.model_infos, f)

    def _are_results_in_new_vidore_format(self, results: Dict[str, Any]) -> bool:
        return "metadata" in results and "metrics" in results

    def _is_baseline_repo(self, repo_id: str) -> bool:
        return repo_id == "vidore/baseline-results"

    def sanitize_model_name(self, model_name):
        # "/" and "." are unsafe in keys/filenames; "." is encoded reversibly.
        return model_name.replace("/", "_").replace(".", "-thisisapoint-")

    def fuze_model_infos(self, model_name, results):
        # Merge in only the datasets that are missing, so existing
        # (non-baseline) results are never overwritten.
        for dataset, metrics in results.items():
            if dataset not in self.model_infos[model_name]["results"]:
                self.model_infos[model_name]["results"][dataset] = metrics

    def get_vidore_data(self, metric="ndcg_at_5"):
        models = self.api.list_models(filter="vidore")
        repositories = [model.modelId for model in models]  # type: ignore

        # Process non-baseline repos first so their results take priority.
        repositories.sort(key=self._is_baseline_repo)

        for repo_id in repositories:
            org_name = repo_id.split("/")[0]
            if org_name in BLOCKLIST:
                continue

            files = [
                f
                for f in self.api.list_repo_files(repo_id)
                if f.endswith("_metrics.json") or f == "results.json"
            ]
            if len(files) == 0:
                continue

            for file in files:
                if file.endswith("results.json"):
                    model_name = self.sanitize_model_name(repo_id)
                else:
                    model_name = self.sanitize_model_name(file.split("_metrics.json")[0])

                readme_path = hf_hub_download(repo_id, filename="README.md")
                meta = metadata_load(readme_path)
                try:
                    result_path = hf_hub_download(repo_id, filename=file)
                    with open(result_path) as f:
                        results = json.load(f)

                    if self._are_results_in_new_vidore_format(results):
                        metadata = results["metadata"]  # currently unused
                        results = results["metrics"]

                    # Handles the case where the model is both in baseline and
                    # outside of it (prioritizes the non-baseline results).
                    if self._is_baseline_repo(repo_id) and model_name in self.model_infos:
                        self.fuze_model_infos(model_name, results)
                    else:
                        self.model_infos[model_name] = {"meta": meta, "results": results}
                except Exception as e:
                    print(f"Error loading {model_name} - {e}")
                    continue

    # Keep only the models that report results on the requested benchmark.
    def filter_models_by_benchmark(self, benchmark_version=1):
        filtered_model_infos = {}
        keywords = VIDORE_DATASETS_KEYWORDS if benchmark_version == 1 else VIDORE_2_DATASETS_KEYWORDS
        for model, info in self.model_infos.items():
            results = info["results"]
            if any(any(keyword in dataset for keyword in keywords) for dataset in results):
                filtered_model_infos[model] = info
        return filtered_model_infos

    # Collect one column per benchmark dataset holding the given metric;
    # the cross-dataset average itself is added later by `add_rank`.
    def compute_averages(self, metric="ndcg_at_5", benchmark_version=1):
        filtered_model_infos = self.filter_models_by_benchmark(benchmark_version)
        if len(filtered_model_infos) == 0:
            return pd.DataFrame()

        keywords = VIDORE_DATASETS_KEYWORDS if benchmark_version == 1 else VIDORE_2_DATASETS_KEYWORDS
        model_res = {}
        for model, info in filtered_model_infos.items():
            res = info["results"]
            dataset_res = {}
            for dataset in res:
                if not any(keyword in dataset for keyword in keywords):
                    continue
                dataset_nickname = get_datasets_nickname(dataset)
                dataset_res[dataset_nickname] = res[dataset][metric]
            model_res[model] = dataset_res

        return pd.DataFrame(model_res).T

    @staticmethod
    def add_rank(df, benchmark_version=1):
        df.fillna(0.0, inplace=True)
        cols_to_rank = [
            col
            for col in df.columns
            if col
            not in [
                "Model",
                "Model Size (Million Parameters)",
                "Memory Usage (GB, fp32)",
                "Embedding Dimensions",
                "Max Tokens",
            ]
        ]
        if len(cols_to_rank) == 1:
            df.sort_values(cols_to_rank[0], ascending=False, inplace=True)
        else:
            df.insert(len(df.columns) - len(cols_to_rank), "Average", df[cols_to_rank].mean(axis=1, skipna=False))
            df.sort_values("Average", ascending=False, inplace=True)
        df.insert(0, "Rank", list(range(1, len(df) + 1)))

        # Convert float scores to percentages, rounded to 1 decimal place.
        for col in df.columns:
            if df[col].dtype == "float64":
                df[col] = df[col].apply(lambda x: round(x * 100, 1))
        return df
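

# --- Usage sketch (not part of the original module) ---
# A minimal example of how this handler is typically driven, assuming network
# access to the Hugging Face Hub and that `dataset_handler` is available as a
# sibling module. Because of the relative import above, run it as a module
# (e.g. `python -m <package>.model_handler`) rather than as a plain script.
if __name__ == "__main__":
    handler = ModelHandler()

    # Scrape results from all "vidore"-tagged repos on the Hub.
    handler.get_vidore_data(metric="ndcg_at_5")

    # Build the ViDoRe v1 leaderboard table: one row per model, one column per
    # dataset nickname, plus the "Rank" and "Average" columns from add_rank.
    df = handler.compute_averages(metric="ndcg_at_5", benchmark_version=1)
    df = ModelHandler.add_rank(df, benchmark_version=1)
    print(df.head())

    # Persist the scraped results so the next run can start from cache.
    handler._save_model_infos()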