import subprocess |
import gradio as gr |
import zipfile |
import os |
import shutil |
import pandas as pd |
from apscheduler.schedulers.background import BackgroundScheduler |
from huggingface_hub import snapshot_download, Repository, HfFolder |
from src.about import ( |
) |
from src.display.css_html_js import custom_css |
from src.display.utils import ( |
AutoEvalColumn, |
ModelType, |
fields, |
WeightType, |
Precision |
) |
from src.populate import get_evaluation_queue_df, get_leaderboard_df |
from src.submission.submit import add_new_eval |
from src.submission.evaluate import calculate_metrics |
import json |
def _validate_model_name(model_name):
    """Return an error message for an unusable model name, or ``None`` if it is fine.

    Only the name itself is inspected here; the leaderboard duplicate check
    stays with the caller.
    """
    if not model_name:
        return "Please enter a model name."
    if not isinstance(model_name, str):
        return "Model name must be a string."
    # Any whitespace (including whitespace-only or padded names) violates the
    # "single word" rule.
    if any(ch.isspace() for ch in model_name):
        return "Model name should be a single word with hyphens."
    # The name is joined into local and repository paths, so it must not be
    # able to point outside the intended directory.
    separators = {"/", "\\", os.sep}
    if os.altsep:
        separators.add(os.altsep)
    if model_name in {".", ".."} or any(sep in model_name for sep in separators):
        return "Model name must not contain path separators."
    return None


def handle_new_eval_submission(model_name, model_zip, model_link=None) -> str:
    """Process a leaderboard submission and return a status message for the UI.

    Steps, in order: validate the model name, reject names already present in
    the leaderboard, extract the uploaded zip under
    ``EVAL_RESULTS_PATH_BACKEND/<model_name>``, run ``calculate_metrics`` on
    the extracted folder, check that ``results.json`` exists and parses as
    JSON, upload it to ``RESULTS_REPO`` and finally request a Space restart.

    Args:
        model_name: Name typed by the submitter; used as a directory name.
        model_zip: Uploaded archive (anything ``zipfile`` accepts), or ``None``.
        model_link: Accepted for interface compatibility; not used.

    Returns:
        A human-readable message. Failures are reported through the returned
        text rather than by raising.
    """
    try:
        name_error = _validate_model_name(model_name)
        if name_error is not None:
            return name_error
        if model_name in leaderboard_df[AutoEvalColumn.model.name].values:
            return "Model name already exists in the leaderboard. Please choose a different name."
        if model_zip is None:
            return "Please provide a zip file."
        if not zipfile.is_zipfile(model_zip):
            return "Please upload a valid zip file."

        extraction_path = os.path.join(EVAL_RESULTS_PATH_BACKEND, model_name)
        os.makedirs(extraction_path, exist_ok=True)
        try:
            with zipfile.ZipFile(model_zip, 'r') as zip_ref:
                zip_ref.extractall(extraction_path)
        except zipfile.BadZipFile:
            return "The uploaded file is not a valid zip file."
        except Exception as e:
            return f"An error occurred while extracting the zip file: {str(e)}"
        print("File unzipped successfully to:", extraction_path)

        try:
            calculate_metrics(extraction_path, model_name)
        except Exception as e:
            return f"An error occurred while calculating metrics: {str(e)}"

        # The results file is looked up here after metrics have been computed.
        results_file_path = os.path.join(
            os.getcwd(), EVAL_RESULTS_PATH, '3d-pope', model_name, 'results.json'
        )
        if not os.path.exists(results_file_path):
            return f"Results file not found at {results_file_path}"

        # Parse once purely to make sure the file is well-formed before upload.
        try:
            with open(results_file_path, 'r', encoding="utf-8") as f:
                json.load(f)
        except json.JSONDecodeError:
            return "The results file is not a valid JSON file."

        try:
            API.upload_file(
                path_or_fileobj=results_file_path,
                path_in_repo=os.path.join('3d-pope', model_name, 'results.json'),
                repo_id=RESULTS_REPO,
                repo_type="dataset",
            )
        except Exception as e:
            return f"An error occurred while uploading results: {str(e)}"

        try:
            restart_space()
        except Exception as e:
            return f"An error occurred while restarting the space: {str(e)}"

        return "Submission received and results are being processed. Please check the leaderboard for updates."
    except Exception as e:
        # UI boundary: surface anything unexpected as text instead of crashing
        # the callback.
        return f"An unexpected error occurred: {str(e)}"
def restart_space():
    """Request a restart of the Space ``REPO_ID`` through the shared ``API`` client."""
    API.restart_space(repo_id=REPO_ID)
# Pull a local copy of the evaluation-request dataset; if the download fails,
# request a Space restart and carry on.
try:
    snapshot_download(
        repo_id=QUEUE_REPO,
        local_dir=EVAL_REQUESTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
        token=TOKEN,
    )
except Exception:
    restart_space()

# Same for the evaluation-results dataset.
try:
    snapshot_download(
        repo_id=RESULTS_REPO,
        local_dir=EVAL_RESULTS_PATH,
        repo_type="dataset",
        tqdm_class=None,
        etag_timeout=30,
        token=TOKEN,
    )
except Exception:
    restart_space()

# Build the leaderboard frame; `leaderboard_df` is a separate copy of
# `original_df`.
raw_data, original_df = get_leaderboard_df(
    EVAL_RESULTS_PATH,
    EVAL_REQUESTS_PATH,
    COLS,
    BENCHMARK_COLS,
)
leaderboard_df = original_df.copy(deep=True)
def custom_format(x):
    """Render a numeric cell as a compact display string.

    Missing values (as reported by ``pd.isna``) and values that cannot be
    converted to ``float`` are returned unchanged. Whole numbers are rendered
    without a fractional part; anything else is rounded to two decimals with
    trailing zeros (and a dangling decimal point) removed.
    """
    try:
        # pd.isna on a list-like yields an array whose truth value may be
        # ambiguous (ValueError); treat that like any other non-numeric input.
        if pd.isna(x):
            return x
        number = float(x)
    except (TypeError, ValueError, OverflowError):
        return x

    if number.is_integer():
        return f"{int(number)}"

    text = f"{number:.2f}".rstrip('0').rstrip('.')
    # Small negatives round to "-0.00", which would otherwise strip to "-0".
    return "0" if text == "-0" else text
# Format every float column of the displayed leaderboard with `custom_format`.
numeric_cols = [
    col
    for col in leaderboard_df.columns
    if leaderboard_df[col].dtype in ['float64', 'float32']
]
if numeric_cols:
    # Column-wise Series.map replaces DataFrame.applymap, which is deprecated
    # in recent pandas releases; this form works on old and new versions alike.
    leaderboard_df[numeric_cols] = leaderboard_df[numeric_cols].apply(
        lambda column: column.map(custom_format)
    )

# Evaluation-queue frames, as returned by get_evaluation_queue_df.
(
    finished_eval_queue_df,
    running_eval_queue_df,
    pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
def update_table(
    hidden_df: pd.DataFrame,
    columns: list,
    query: str,
):
    """Return the table to display: rows filtered by ``query``, then narrowed to ``columns``.

    ``hidden_df`` is passed through ``filter_queries`` first and the result is
    handed to ``select_columns``.
    """
    return select_columns(filter_queries(query, hidden_df), columns)
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows of ``df`` whose model name contains ``query`` (case-insensitive).

    ``query`` is treated as a regular expression when it compiles as one;
    otherwise it is matched as plain text so that input such as ``"("`` does
    not crash the search. Rows with a missing model name never match.
    """
    import re  # local import: only needed to probe whether the query is a valid pattern

    try:
        re.compile(query)
        use_regex = True
    except re.error:
        use_regex = False

    model_names = df[AutoEvalColumn.model.name]
    # na=False keeps the mask strictly boolean even when names are missing.
    mask = model_names.str.contains(query, case=False, na=False, regex=use_regex)
    return df[mask]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Project ``df`` onto the model-name column plus the requested columns.

    Requested columns are emitted in ``COLS`` order and only when they are
    present in ``df``.
    """
    pinned = [AutoEvalColumn.model.name]
    requested = [name for name in COLS if name in df.columns and name in columns]
    return df[pinned + requested]
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
    """Filter ``filtered_df`` by one or more ``;``-separated search terms.

    Each non-blank term is looked up with ``search_table``; the matches are
    concatenated and de-duplicated on the model/precision/revision columns
    that are present.

    Returns:
        ``filtered_df`` unchanged when there is nothing to search for (empty,
        ``None`` or separator-only query); an empty frame with the same
        columns when terms were given but none matched; otherwise the
        de-duplicated union of the matches.
    """
    if not query:
        return filtered_df

    terms = [term for term in (part.strip() for part in query.split(";")) if term]
    if not terms:
        return filtered_df

    matches = [search_table(filtered_df, term) for term in terms]
    matches = [match for match in matches if len(match) > 0]
    if not matches:
        # A real search that matched nothing must not fall back to "show all".
        return filtered_df.iloc[0:0]

    combined = pd.concat(matches)
    # A row can match several terms; keep one copy per identifying key.
    key_columns = [
        col
        for col in [
            AutoEvalColumn.model.name,
            AutoEvalColumn.precision.name,
            AutoEvalColumn.revision.name,
        ]
        if col in combined.columns
    ]
    return combined.drop_duplicates(subset=key_columns)
def filter_models(
    df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool
) -> pd.DataFrame:
    """Keep the rows whose ``params`` value falls inside one of the selected size intervals.

    Args:
        df: Frame containing the ``AutoEvalColumn.params`` column.
        type_query: Accepted for interface compatibility; not used.
        size_query: Keys into ``NUMERIC_INTERVALS`` selecting the intervals to keep.
        precision_query: Accepted for interface compatibility; not used.
        show_deleted: Accepted for interface compatibility; not used.

    Returns:
        The subset of ``df`` whose ``params`` value, coerced to a number, lies
        in at least one selected interval. Values that cannot be coerced
        become NaN.
    """
    size_intervals = pd.IntervalIndex(sorted(NUMERIC_INTERVALS[s] for s in size_query))
    params_column = pd.to_numeric(df[AutoEvalColumn.params.name], errors="coerce")
    mask = params_column.apply(lambda value: any(size_intervals.contains(value)))
    return df.loc[mask]
# Gradio UI: leaderboard tab, about tab, submission tab and a citation box.
# NOTE(review): the "π" glyphs in the labels below look like emoji that were
# mis-decoded at some point; they are kept as found. Confirm against the
# intended labels before changing them.
demo = gr.Blocks(css=custom_css)
with demo:
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tabs(elem_classes="tab-buttons") as tabs:
        # --- Leaderboard tab -------------------------------------------------
        with gr.TabItem("π 3D-POPE Benchmark", elem_id="llm-benchmark-tab-table", id=0):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        search_bar = gr.Textbox(
                            placeholder=" π Search for your model (separate multiple queries with `;`) and press ENTER...",
                            show_label=False,
                            elem_id="search-bar",
                        )
                    with gr.Row():
                        # Only columns that are neither hidden nor pinned can be toggled.
                        shown_columns = gr.CheckboxGroup(
                            choices=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if not c.hidden and not c.never_hidden
                            ],
                            value=[
                                c.name
                                for c in fields(AutoEvalColumn)
                                if c.displayed_by_default and not c.hidden and not c.never_hidden
                            ],
                            label="Select columns to show",
                            elem_id="column-select",
                            interactive=True,
                        )

            # Visible table: pinned columns followed by the default selection.
            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_df[
                    [c.name for c in fields(AutoEvalColumn) if c.never_hidden]
                    + shown_columns.value
                ],
                headers=[c.name for c in fields(AutoEvalColumn) if c.never_hidden] + shown_columns.value,
                datatype=TYPES,
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )

            # Invisible full table used as the input for search and column updates.
            hidden_leaderboard_table_for_search = gr.components.Dataframe(
                value=original_df[COLS],
                headers=COLS,
                datatype=TYPES,
                visible=False,
            )

            search_bar.submit(
                update_table,
                [
                    hidden_leaderboard_table_for_search,
                    shown_columns,
                    search_bar,
                ],
                leaderboard_table,
            )
            shown_columns.change(
                update_table,
                [
                    hidden_leaderboard_table_for_search,
                    shown_columns,
                    search_bar,
                ],
                leaderboard_table,
                queue=True,
            )

        # --- About tab -------------------------------------------------------
        with gr.TabItem("π About", elem_id="llm-benchmark-tab-table", id=2):
            gr.Markdown(LLM_BENCHMARKS_TEXT, elem_classes="markdown-text")

        # --- Submission tab --------------------------------------------------
        with gr.TabItem("π Submit here! ", elem_id="llm-benchmark-tab-table", id=3):
            with gr.Column():
                with gr.Row():
                    gr.Markdown(EVALUATION_QUEUE_TEXT, elem_classes="markdown-text")

            with gr.Row():
                gr.Markdown("# π Submit your results here!", elem_classes="markdown-text")

            with gr.Row():
                model_name_textbox = gr.Textbox(label="Model name")
                model_zip_file = gr.File(label="Upload model prediction result ZIP file")

            with gr.Row():
                # Empty columns on both sides centre the submit controls.
                gr.Column()
                with gr.Column(scale=2):
                    submit_button = gr.Button("Submit Model")
                    submission_result = gr.Markdown()
                    submit_button.click(
                        handle_new_eval_submission,
                        [model_name_textbox, model_zip_file],
                        submission_result
                    )
                gr.Column()

    with gr.Row():
        with gr.Accordion("π Citation", open=False):
            # NOTE(review): no value or label is passed here, so the box
            # presumably renders empty; confirm whether citation text is
            # meant to be supplied.
            citation_button = gr.Textbox(
                lines=20,
                elem_id="citation-button",
                show_copy_button=True,
            )
# Restart the Space on a fixed 1800-second interval from a background
# scheduler, then serve the UI.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, trigger="interval", seconds=1800)
scheduler.start()

demo.queue(default_concurrency_limit=40).launch()