import copy
import glob
import json
import os
import hashlib

import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, snapshot_download

from compare_significance import check_significance, SUPPORTED_METRICS

VISIBLE_METRICS = SUPPORTED_METRICS + ["macro_f1"]

api = HfApi()

ORG = "xdolez52"
REPO = f"{ORG}/LLM_benchmark_data"
HF_TOKEN = os.environ.get("HF_TOKEN")

TASKS_METADATA_PATH = "./tasks_metadata.json"
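
# Expected shape of tasks_metadata.json, inferred from how it is accessed below
# (the task key and values are only illustrative, not taken from the repo):
# {
#     "some_task": {"name": "Some Task (display name)", "category": "Some category"},
#     ...
# }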


class LeaderboardServer:
    def __init__(self):
        self.server_address = REPO
        self.repo_type = "dataset"
        self.local_leaderboard = snapshot_download(
            self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
            local_dir="./",
        )
        self.submission_id_to_file = {}  # Map submission ids to file paths
        self.tasks_metadata = json.load(open(TASKS_METADATA_PATH))
        self.tasks_categories = {self.tasks_metadata[task]["category"] for task in self.tasks_metadata}
        self.submission_ids = set()
        self.fetch_existing_models()
        self.tournament_results = self.load_tournament_results()
        self.pre_submit = None

    def update_leaderboard(self):
        self.local_leaderboard = snapshot_download(
            self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
            local_dir="./",
        )
        self.fetch_existing_models()
        self.tournament_results = self.load_tournament_results()

    def load_tournament_results(self):
        metadata_rank_paths = os.path.join(self.local_leaderboard, "tournament.json")
        if not os.path.exists(metadata_rank_paths):
            return {}
        with open(metadata_rank_paths) as ranks_file:
            results = json.load(ranks_file)
        return results

    def fetch_existing_models(self):
        # Models data
        for submission_file in glob.glob(os.path.join(self.local_leaderboard, "data") + "/*.json"):
            data = json.load(open(submission_file))
            metadata = data.get('metadata')
            if metadata is None:
                continue
            submission_id = metadata["submission_id"]
            self.submission_ids.add(submission_id)
            self.submission_id_to_file[submission_id] = submission_file
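
    # tournament.json, as used below, appears to map
    # submission_id -> opponent submission_id -> task -> bool, where True means
    # the submission beat that opponent on that task with statistical
    # significance; get_leaderboard counts these per-task wins.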

    def get_leaderboard(self, tournament_results=None):
        results = tournament_results if tournament_results else self.tournament_results

        if len(results) == 0:
            return pd.DataFrame(columns=['No submissions yet'])
        else:
            processed_results = []
            for submission_id in results.keys():
                path = self.submission_id_to_file.get(submission_id)
                if path is None:
                    if self.pre_submit and submission_id == self.pre_submit[1]:
                        data = json.load(open(self.pre_submit[2]))
                    else:
                        raise gr.Error(f"Internal error: Submission [{submission_id}] not found")
                elif path:
                    data = json.load(open(path))
                else:
                    raise gr.Error(f"Submission [{submission_id}] not found")

                if submission_id != data["metadata"]["submission_id"]:
                    raise gr.Error(f"Proper submission [{submission_id}] not found")

                local_results = {}
                for task in self.tasks_metadata.keys():
                    local_results[task] = 0
                    for model in results[submission_id].keys():
                        if results[submission_id][model][task]:
                            local_results[task] += 1

                    for metric in VISIBLE_METRICS:
                        metric_value = data['results'][task].get(metric)
                        if metric_value is not None:
                            local_results[task + "_" + metric] = metric_value

                local_results["submission_id"] = submission_id
                if self.pre_submit and submission_id == self.pre_submit[1]:
                    processed_results.insert(0, local_results)
                else:
                    processed_results.append(local_results)

            dataframe = pd.DataFrame.from_records(processed_results)

            df_order = (
                ["submission_id"]
                + list(self.tasks_metadata.keys())
                + [
                    col
                    for col in dataframe.columns
                    if col != "submission_id" and col not in self.tasks_metadata.keys()
                ]
            )
            dataframe = dataframe[df_order]
            dataframe = dataframe.rename(
                columns={key: value["name"] for key, value in self.tasks_metadata.items()}
            )
            return dataframe
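
    # check_significance is assumed (from its use below) to return a mapping
    # task -> {"significant": bool, ...} comparing the first file's model
    # against the second; each new submission is therefore compared against
    # every stored submission in both directions.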

    def start_tournament(self, new_submission_id, new_model_file):
        new_tournament = copy.deepcopy(self.tournament_results)
        new_tournament[new_submission_id] = {}
        new_tournament[new_submission_id][new_submission_id] = {
            task: False for task in self.tasks_metadata.keys()
        }

        for submission_id in self.submission_ids:
            res = check_significance(new_model_file, self.submission_id_to_file[submission_id])
            res_inverse = check_significance(self.submission_id_to_file[submission_id], new_model_file)
            new_tournament[new_submission_id][submission_id] = {
                task: data["significant"] for task, data in res.items()
            }
            new_tournament[submission_id][new_submission_id] = {
                task: data["significant"] for task, data in res_inverse.items()
            }
        return new_tournament

    @staticmethod
    def create_submission_id(metadata):
        # The ID length must be limited, because it is used in the file name
        submission_id = "_".join([metadata[key][:7] for key in (
            "team_name",
            "model_name",
            "model_predictions_sha256",
            "model_results_sha256",
        )])
        return submission_id
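
    # Illustrative example (made-up values): team_name "TeamABC", model_name
    # "Model-X" and two sha256 digests would produce an id such as
    # "TeamABC_Model-X_1a2b3c4_5d6e7f8".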

    @staticmethod
    def get_sha256_hexdigest(obj):
        data = json.dumps(
            obj,
            separators=(',', ':'),
            sort_keys=True,
            ensure_ascii=True,
        ).encode()
        result = hashlib.sha256(data).hexdigest()
        return result
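
    # Note: because the dump above uses sort_keys=True and compact separators,
    # the digest is canonical, e.g. get_sha256_hexdigest({"a": 1, "b": 2}) and
    # get_sha256_hexdigest({"b": 2, "a": 1}) yield the same hex string.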

    def prepare_model_for_submission(self, file, metadata) -> None:
        with open(file, "r") as f:
            data = json.load(f)
        data["metadata"] = metadata

        metadata["model_predictions_sha256"] = self.get_sha256_hexdigest(data["predictions"])
        metadata["model_results_sha256"] = self.get_sha256_hexdigest(data["results"])

        # The ID length must be limited, because it is used in the file name
        submission_id = self.create_submission_id(metadata)
        metadata["submission_id"] = submission_id

        with open(file, "w") as f:
            json.dump(data, f, separators=(',', ':'))  # compact JSON

        tournament_results = self.start_tournament(submission_id, file)
        self.pre_submit = tournament_results, submission_id, file
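
    # prepare_model_for_submission only stages the submission in self.pre_submit
    # as (tournament_results, submission_id, file); nothing reaches the dataset
    # repo until save_pre_submit uploads both the submission file and
    # tournament.json.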

    def save_pre_submit(self):
        if self.pre_submit:
            tournament_results, submission_id, file = self.pre_submit
            api.upload_file(
                path_or_fileobj=file,
                path_in_repo=f"data/{submission_id}.json",
                repo_id=self.server_address,
                repo_type=self.repo_type,
                token=HF_TOKEN,
            )

            # Temporarily save the tournament results locally, then upload them
            tournament_results_path = os.path.join(self.local_leaderboard, "tournament.json")
            with open(tournament_results_path, "w") as f:
                json.dump(tournament_results, f, sort_keys=True, indent=2)  # readable JSON
            api.upload_file(
                path_or_fileobj=tournament_results_path,
                path_in_repo="tournament.json",
                repo_id=self.server_address,
                repo_type=self.repo_type,
                token=HF_TOKEN,
            )

    def get_model_detail(self, submission_id):
        path = self.submission_id_to_file.get(submission_id)
        if path is None:
            raise gr.Error(f"Submission [{submission_id}] not found")
        data = json.load(open(path))
        return data["metadata"]
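

# Minimal usage sketch (not part of the original Space): build the server and
# show the current leaderboard in a bare-bones Gradio app. The Blocks layout
# and component choices below are assumptions for illustration only; running
# it requires a valid HF_TOKEN with access to the dataset repo.
if __name__ == "__main__":
    server = LeaderboardServer()
    with gr.Blocks() as demo:
        gr.Markdown("# LLM benchmark leaderboard")
        gr.Dataframe(value=server.get_leaderboard())
    demo.launch()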