Spaces:
Runtime error
Runtime error
import json | |
import os | |
import pandas as pd | |
import numpy as np | |
from src.display.formatting import make_clickable_model | |
from src.display.utils import EvalQueueColumn | |
from src.about import Tasks, SingleTableTasks, SingleColumnTasks | |
# Display names for synthetic-data methods.
# Keys are the UPPER-CASED method identifiers: get_leaderboard_df looks a
# method up via ``method_name.upper()``. Methods not listed here keep the
# name found in the result file.
model_names = {
    "CLAVADDPM": "ClavaDDPM",
    "RGCLD": "RGCLD",
    "MOSTLYAI": "TabularARGN",
    "RCTGAN": "RCTGAN",
    "REALTABFORMER": "REaLTabFormer",
    "SDV": "SDV",
}
# Display names for benchmark datasets.
# Keys are matched exactly (case-sensitive) against the ``dataset_name``
# stored in each result file. Datasets not listed here keep their raw name.
dataset_names = {
    "airbnb-simplified_subsampled": "Airbnb",
    "Berka_subsampled": "Berka",
    "Biodegradability_v1": "Biodegradability",
    "CORA_v1": "Cora",
    "imdb_MovieLens_v1": "IMDB",
    "rossmann_subsampled": "Rossmann",
    "walmart_subsampled": "Walmart",
    "f1_subsampled": "F1",
}
# def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame: | |
# """Creates a dataframe from all the individual experiment results""" | |
# raw_data = get_raw_eval_results(results_path, requests_path) | |
# all_data_json = [v.to_dict() for v in raw_data] | |
# df = pd.DataFrame.from_records(all_data_json) | |
# df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False) | |
# df = df[cols].round(decimals=2) | |
# # filter out if any of the benchmarks have not been produced | |
# df = df[has_no_nan_values(df, benchmark_cols)] | |
# return df | |
def strip_emoji(text: str) -> str:
    """Return *text* reduced to its ASCII characters, without trailing whitespace.

    Despite the name, this drops *every* non-ASCII character (emoji,
    accented letters, typographic quotes, ...), not only emoji. Only
    trailing whitespace is stripped afterwards; leading whitespace, e.g.
    the space left behind by a removed leading emoji, is preserved.

    Args:
        text: The string to clean, typically a metric label that carries a
            decorative emoji.

    Returns:
        The ASCII-only, right-stripped string.
    """
    # Round-tripping through the ASCII codec with errors="ignore" silently
    # discards anything that cannot be encoded as ASCII.
    return text.encode("ascii", "ignore").decode("ascii").rstrip()
def _load_result_files(res_path):
    """Parse every ``*.json`` file directly inside *res_path*, in sorted filename order."""
    results = []
    # os.listdir() order is arbitrary; sorting keeps the leaderboard row
    # order stable between runs and machines.
    for entry in sorted(os.listdir(res_path)):
        if entry.endswith(".json"):
            with open(os.path.join(res_path, entry), encoding="utf-8") as fp:
                results.append(json.load(fp))
    return results


def _mean_metric(entries, value_keys):
    """Average all values stored under *value_keys* across *entries*, rounded to 2 decimals."""
    values = [
        entry[key]
        for entry in entries
        for key in value_keys
        if key in entry
    ]
    if not values:
        # np.mean([]) would also yield NaN, but with a RuntimeWarning.
        return np.nan
    return np.round(np.mean(values), decimals=2)


def _aggregate_table_metrics(section, metric_mapping, value_keys, row):
    """Fill *row* with one per-table average (or NaN) for each metric in *metric_mapping*."""
    for metric, display_name in metric_mapping.items():
        per_table = section.get(strip_emoji(metric))
        if per_table is None:
            row[display_name] = np.nan
        else:
            row[display_name] = _mean_metric(per_table.values(), value_keys)
    return row


def get_leaderboard_df(
    results_path: str, cols: list, benchmark_cols: list
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Build the leaderboard tables from all individual experiment results.

    Every ``*.json`` file in ``<results_path>/demo-leaderboard/syntherela-demo``
    is read. Each file must provide ``method_name`` and ``dataset_name`` and
    may provide the sections ``multi_table_metrics``, ``single_table_metrics``
    and ``single_column_metrics``. A missing section or metric yields NaN in
    the corresponding cells.

    Args:
        results_path: Root directory of the downloaded results.
        cols: Unused; kept so existing callers keep working.
        benchmark_cols: Unused; kept so existing callers keep working.

    Returns:
        A tuple ``(singlecolumn_df, singletable_df, multitable_df)``:

        * ``singlecolumn_df`` has one row per (dataset, model, table) with
          columns ``Dataset``, ``Table``, ``Model`` plus one column per
          single-column task.
        * ``singletable_df`` and ``multitable_df`` have one row per result
          file with columns ``Dataset``, ``Model`` plus one column per task.

        Metric cells hold the mean over tables (or over columns for the
        single-column frame) rounded to two decimals.

    Raises:
        FileNotFoundError: If the results directory does not exist.
        KeyError: If a result file lacks ``method_name`` or ``dataset_name``.
    """
    res_path = os.path.join(results_path, "demo-leaderboard", "syntherela-demo")
    all_data_json = _load_result_files(res_path)

    # Map each task's raw metric identifier to its display column name.
    multi_table_mapping = {task.value.metric: task.value.col_name for task in Tasks}
    single_table_mapping = {task.value.metric: task.value.col_name for task in SingleTableTasks}
    single_column_mapping = {task.value.metric: task.value.col_name for task in SingleColumnTasks}

    multi_table_columns = ["Dataset", "Model"] + [task.value.col_name for task in Tasks]
    single_table_columns = ["Dataset", "Model"] + [task.value.col_name for task in SingleTableTasks]
    single_column_columns = ["Dataset", "Table", "Model"] + [
        task.value.col_name for task in SingleColumnTasks
    ]

    # Rows are collected in plain Python containers and turned into frames
    # once at the end; growing a DataFrame with pd.concat inside the loop
    # copies the whole frame on every iteration.
    multi_table_rows = []
    single_table_rows = []
    # Keyed by (dataset, model, table) so that several metrics, or several
    # result files for the same dataset/model, update one shared row.
    single_column_rows = {}

    for data in all_data_json:
        # Translate raw identifiers to display names where a mapping exists.
        model = data["method_name"]
        model = model_names.get(model.upper(), model)
        dataset = data["dataset_name"]
        dataset = dataset_names.get(dataset, dataset)

        # ``or {}`` also covers a section that is present but null.
        multi_section = data.get("multi_table_metrics") or {}
        single_section = data.get("single_table_metrics") or {}
        column_section = data.get("single_column_metrics") or {}

        # --- multi-table metrics -------------------------------------------
        multi_row = {"Dataset": dataset, "Model": model}
        regular_multi_metrics = {}
        for metric, display_name in multi_table_mapping.items():
            if "CardinalityShapeSimilarity" in metric:
                # Special case: stored as a single value under
                # "Trends" -> "cardinality" and copied through as-is.
                trends = multi_section.get("Trends") or {}
                multi_row[display_name] = trends.get("cardinality", np.nan)
            else:
                regular_multi_metrics[metric] = display_name
        _aggregate_table_metrics(
            multi_section, regular_multi_metrics, ("accuracy", "statistic"), multi_row
        )
        multi_table_rows.append(multi_row)

        # --- single-table metrics ------------------------------------------
        single_table_rows.append(
            _aggregate_table_metrics(
                single_section,
                single_table_mapping,
                ("accuracy", "value"),
                {"Dataset": dataset, "Model": model},
            )
        )

        # --- single-column metrics (one row per table) ---------------------
        for metric, display_name in single_column_mapping.items():
            per_table = column_section.get(strip_emoji(metric))
            if not per_table:
                continue
            for table, per_column in per_table.items():
                row = single_column_rows.setdefault(
                    (dataset, model, table),
                    {"Dataset": dataset, "Table": table, "Model": model},
                )
                row[display_name] = _mean_metric(
                    per_column.values(), ("accuracy", "value", "statistic")
                )

    # ``columns=`` fixes the column order and fills metrics a row never
    # received with NaN; it also yields correctly-shaped empty frames.
    multitable_df = pd.DataFrame(multi_table_rows, columns=multi_table_columns)
    singletable_df = pd.DataFrame(single_table_rows, columns=single_table_columns)
    singlecolumn_df = pd.DataFrame(
        list(single_column_rows.values()), columns=single_column_columns
    )
    return singlecolumn_df, singletable_df, multitable_df