leaderboard / src /populate.py
Martin Jurkovic
Update dataset names
7891337
import json
import os
import pandas as pd
import numpy as np
from src.display.formatting import make_clickable_model
from src.display.utils import EvalQueueColumn
from src.about import Tasks, SingleTableTasks, SingleColumnTasks
# Display names for the generative methods. Keys are upper-cased method
# identifiers because lookups are done on ``method_name.upper()``.
model_names = {
    "CLAVADDPM": "ClavaDDPM",
    "RGCLD": "RGCLD",
    "MOSTLYAI": "TabularARGN",
    "RCTGAN": "RCTGAN",
    "REALTABFORMER": "REaLTabFormer",
    "SDV": "SDV",
}
# Display names for the benchmark datasets, keyed by the raw dataset
# identifier. The lookup is an exact, case-sensitive match.
dataset_names = {
    "airbnb-simplified_subsampled": "Airbnb",
    "Berka_subsampled": "Berka",
    "Biodegradability_v1": "Biodegradability",
    "CORA_v1": "Cora",
    "imdb_MovieLens_v1": "IMDB",
    "rossmann_subsampled": "Rossmann",
    "walmart_subsampled": "Walmart",
    "f1_subsampled": "F1",
}
# def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
# """Creates a dataframe from all the individual experiment results"""
# raw_data = get_raw_eval_results(results_path, requests_path)
# all_data_json = [v.to_dict() for v in raw_data]
# df = pd.DataFrame.from_records(all_data_json)
# df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False)
# df = df[cols].round(decimals=2)
# # filter out if any of the benchmarks have not been produced
# df = df[has_no_nan_values(df, benchmark_cols)]
# return df
def strip_emoji(text: str) -> str:
    """Return *text* without non-ASCII characters and trailing whitespace.

    Emoji are removed because they are non-ASCII, but so is every other
    character outside the ASCII range (accented letters, for example).
    Leading whitespace is kept; only the right-hand side is stripped.
    """
    ascii_only = "".join(char for char in text if ord(char) < 128)
    return ascii_only.rstrip()
def _load_result_files(res_path: str) -> list:
    """Parse every ``*.json`` file located directly inside *res_path*."""
    results = []
    # os.listdir() yields entries in arbitrary order; sorting keeps the row
    # order of the resulting tables reproducible between runs.
    for entry in sorted(os.listdir(res_path)):
        if not entry.endswith(".json"):
            continue
        with open(os.path.join(res_path, entry), encoding="utf-8") as fp:
            results.append(json.load(fp))
    return results


def _gather_scores(entries: dict, score_keys: tuple) -> list:
    """Collect the values stored under any of *score_keys* in each entry."""
    return [
        entry[key]
        for entry in entries.values()
        for key in score_keys
        if key in entry
    ]


def _rounded_mean(values: list) -> float:
    """Return the mean of *values* rounded to two decimals, NaN when empty."""
    if not values:
        # np.mean([]) would also give NaN, but emits a RuntimeWarning.
        return np.nan
    return np.mean(values).round(decimals=2)


def get_leaderboard_df(results_path: str, cols: list, benchmark_cols: list) -> tuple:
    """Build the three leaderboard tables from the stored result files.

    Every ``*.json`` file in ``<results_path>/demo-leaderboard/syntherela-demo``
    is read. Method and dataset identifiers are translated through
    ``model_names`` / ``dataset_names`` when a mapping exists. Metric keys are
    looked up in the result files with non-ASCII characters removed
    (``strip_emoji``) and reported under the display name of the task.

    Args:
        results_path: Root directory that contains the
            ``demo-leaderboard/syntherela-demo`` result folder.
        cols: Unused; kept so existing callers keep working.
        benchmark_cols: Unused; kept so existing callers keep working.

    Returns:
        A tuple ``(singlecolumn_df, singletable_df, multitable_df)``:

        * ``singlecolumn_df`` - one row per (dataset, model, table) with the
          columns ``Dataset``, ``Table``, ``Model`` and one column per
          single-column metric.
        * ``singletable_df`` - one row per result file with ``Dataset``,
          ``Model`` and one column per single-table metric.
        * ``multitable_df`` - one row per result file with ``Dataset``,
          ``Model`` and one column per multi-table metric.

        Metric cells hold the mean of the collected scores rounded to two
        decimals, or NaN when the metric is absent from the result file.

    Raises:
        KeyError: If a result file lacks ``method_name``, ``dataset_name`` or
            one of the ``*_metrics`` sections that is being read.
        OSError: If the result directory or one of its files cannot be read.
    """
    res_path = os.path.join(results_path, "demo-leaderboard", "syntherela-demo")
    all_data_json = _load_result_files(res_path)

    multi_table_metrics = [task.value.metric for task in Tasks]
    single_table_metrics = [task.value.metric for task in SingleTableTasks]
    single_column_metrics = [task.value.metric for task in SingleColumnTasks]

    multi_table_metric_names = [task.value.col_name for task in Tasks]
    single_table_metric_names = [task.value.col_name for task in SingleTableTasks]
    single_column_metric_names = [task.value.col_name for task in SingleColumnTasks]

    # Metric key -> display name used as the column header.
    multi_table_metric_mapping = dict(zip(multi_table_metrics, multi_table_metric_names))
    single_table_metric_mapping = dict(zip(single_table_metrics, single_table_metric_names))
    single_column_metric_mapping = dict(zip(single_column_metrics, single_column_metric_names))

    # Rows are accumulated in plain Python containers and turned into frames
    # once at the end; concatenating a frame per row copies the whole table
    # on every iteration.
    multitable_rows = []
    singletable_rows = []
    # (dataset, model, table) -> row. Insertion order gives first-seen row
    # order, and a repeated key updates the existing row in place.
    singlecolumn_rows = {}

    for data in all_data_json:
        model = data["method_name"]
        model = model_names.get(model.upper(), model)
        dataset = data["dataset_name"]
        dataset = dataset_names.get(dataset, dataset)

        # Multi-table metrics: one value per metric, averaged over the entries.
        multitable_row = {"Dataset": dataset, "Model": model}
        for metric in multi_table_metrics:
            section = data["multi_table_metrics"]
            display_name = multi_table_metric_mapping[metric]
            # CardinalityShapeSimilarity is stored under "Trends" -> "cardinality"
            # and is reported as-is (no averaging, no rounding).
            if "CardinalityShapeSimilarity" in metric:
                if "Trends" in section and "cardinality" in section["Trends"]:
                    multitable_row[display_name] = section["Trends"]["cardinality"]
                else:
                    multitable_row[display_name] = np.nan
                continue
            stripped_metric = strip_emoji(metric)
            if stripped_metric in section:
                scores = _gather_scores(section[stripped_metric], ("accuracy", "statistic"))
                multitable_row[display_name] = _rounded_mean(scores)
            else:
                multitable_row[display_name] = np.nan
        multitable_rows.append(multitable_row)

        # Single-table metrics: one value per metric, averaged over the tables.
        singletable_row = {"Dataset": dataset, "Model": model}
        for metric in single_table_metrics:
            section = data["single_table_metrics"]
            display_name = single_table_metric_mapping[metric]
            stripped_metric = strip_emoji(metric)
            if stripped_metric in section:
                scores = _gather_scores(section[stripped_metric], ("accuracy", "value"))
                singletable_row[display_name] = _rounded_mean(scores)
            else:
                singletable_row[display_name] = np.nan
        singletable_rows.append(singletable_row)

        # Single-column metrics: one row per table, averaged over its columns.
        for metric in single_column_metrics:
            section = data["single_column_metrics"]
            display_name = single_column_metric_mapping[metric]
            stripped_metric = strip_emoji(metric)
            if stripped_metric not in section:
                continue
            for table, columns in section[stripped_metric].items():
                table_row = singlecolumn_rows.setdefault(
                    (dataset, model, table),
                    {"Dataset": dataset, "Model": model, "Table": table},
                )
                scores = _gather_scores(columns, ("accuracy", "value", "statistic"))
                table_row[display_name] = _rounded_mean(scores)

    multitable_df = pd.DataFrame(
        multitable_rows, columns=["Dataset", "Model"] + multi_table_metric_names
    )
    singletable_df = pd.DataFrame(
        singletable_rows, columns=["Dataset", "Model"] + single_table_metric_names
    )
    singlecolumn_df = pd.DataFrame(
        list(singlecolumn_rows.values()),
        columns=["Dataset", "Table", "Model"] + single_column_metric_names,
    )
    return singlecolumn_df, singletable_df, multitable_df