# --- START OF MODIFIED FILE app.py ---

import gradio as gr
import pandas as pd
import plotly.express as px
import time
from datasets import load_dataset  # Import the datasets library

# --- Constants ---
# REMOVED the old MODEL_SIZE_RANGES dictionary.
# NEW: Define the discrete steps for the parameter range slider.
PARAM_CHOICES = ['< 1B', '1B', '5B', '12B', '32B', '64B', '128B', '256B', '> 500B']
PARAM_CHOICES_DEFAULT = [PARAM_CHOICES[0], PARAM_CHOICES[-1]]
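# NOTE: the default selection spans the full range ['< 1B', '> 500B'] and is
# treated as "no parameter filtering" downstream in make_treemap_data().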

# The Hugging Face dataset ID to load.
HF_DATASET_ID = "evijit/orgstats_daily_data"

TAG_FILTER_CHOICES = [
    "Audio & Speech", "Time series", "Robotics", "Music", "Video", "Images",
    "Text", "Biomedical", "Sciences"
]

PIPELINE_TAGS = [
    'text-generation', 'text-to-image', 'text-classification', 'text2text-generation',
    'audio-to-audio', 'feature-extraction', 'image-classification', 'translation',
    'reinforcement-learning', 'fill-mask', 'text-to-speech', 'automatic-speech-recognition',
    'image-text-to-text', 'token-classification', 'sentence-similarity', 'question-answering',
    'image-feature-extraction', 'summarization', 'zero-shot-image-classification',
    'object-detection', 'image-segmentation', 'image-to-image', 'image-to-text',
    'audio-classification', 'visual-question-answering', 'text-to-video',
    'zero-shot-classification', 'depth-estimation', 'text-ranking', 'image-to-video',
    'multiple-choice', 'unconditional-image-generation', 'video-classification',
    'text-to-audio', 'time-series-forecasting', 'any-to-any', 'video-text-to-text',
    'table-question-answering',
]

def load_models_data():
    """
    Loads the pre-processed models data using the HF datasets library.

    Returns a tuple of (DataFrame, success_flag, status_message).
    """
    overall_start_time = time.time()
    print(f"Attempting to load dataset from Hugging Face Hub: {HF_DATASET_ID}")

    expected_cols = [
        'id', 'downloads', 'downloadsAllTime', 'likes', 'pipeline_tag', 'tags', 'params',
        'organization', 'has_audio', 'has_speech', 'has_music',
        'has_robot', 'has_bio', 'has_med', 'has_series', 'has_video', 'has_image',
        'has_text', 'has_science', 'is_audio_speech', 'is_biomed',
        'data_download_timestamp'
    ]

    try:
        dataset_dict = load_dataset(HF_DATASET_ID)
        if not dataset_dict:
            raise ValueError(f"Dataset '{HF_DATASET_ID}' loaded but appears empty.")
        split_name = list(dataset_dict.keys())[0]
        print(f"Using dataset split: '{split_name}'. Converting to Pandas.")
        df = dataset_dict[split_name].to_pandas()
        elapsed = time.time() - overall_start_time

        missing_cols = [col for col in expected_cols if col not in df.columns]
        if missing_cols:
            # The 'params' column is crucial for the new slider.
            if 'params' in missing_cols:
                raise ValueError("FATAL: Loaded dataset is missing the crucial 'params' column.")
            print(f"Warning: Loaded dataset is missing some expected columns: {missing_cols}.")

        # Ensure the 'params' column is numeric, coercing errors to NaN and then filling with 0.
        # This is important for filtering. Assumes params are in billions.
        if 'params' in df.columns:
            df['params'] = pd.to_numeric(df['params'], errors='coerce').fillna(0)
        else:
            # If 'params' is missing after all, create a dummy column to prevent crashes.
            df['params'] = 0
            print("CRITICAL WARNING: 'params' column not found in data. Parameter filtering will not work.")

        msg = f"Successfully loaded dataset '{HF_DATASET_ID}' (split: {split_name}) from HF Hub in {elapsed:.2f}s. Shape: {df.shape}"
        print(msg)
        return df, True, msg
    except Exception as e:
        err_msg = f"Failed to load dataset '{HF_DATASET_ID}' from Hugging Face Hub. Error: {e}"
        print(err_msg)
        return pd.DataFrame(), False, err_msg

# --- NEW: Helper function to parse slider labels into numerical values ---
def get_param_range_values(param_range_labels):
    """Converts a list of two string labels from the slider into a numerical min/max tuple."""
    if not param_range_labels or len(param_range_labels) != 2:
        return None, None
    min_label, max_label = param_range_labels
    # Min value logic: '< 1B' becomes 0, otherwise parse the number.
    min_val = 0.0 if '<' in min_label else float(min_label.replace('B', ''))
    # Max value logic: '> 500B' becomes infinity, otherwise parse the number.
    max_val = float('inf') if '>' in max_label else float(max_label.replace('B', ''))
    return min_val, max_val
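# Examples (values are in billions of parameters, matching the 'params' column):
#   get_param_range_values(['5B', '64B'])      -> (5.0, 64.0)
#   get_param_range_values(['< 1B', '> 500B']) -> (0.0, float('inf'))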

# --- MODIFIED: Function signature and filtering logic updated for parameter range ---
def make_treemap_data(df, count_by, top_k=25, tag_filter=None, pipeline_filter=None, param_range=None, skip_orgs=None):
    if df is None or df.empty: return pd.DataFrame()
    filtered_df = df.copy()

    col_map = {"Audio & Speech": "is_audio_speech", "Music": "has_music", "Robotics": "has_robot",
               "Biomedical": "is_biomed", "Time series": "has_series", "Sciences": "has_science",
               "Video": "has_video", "Images": "has_image", "Text": "has_text"}
    if tag_filter and tag_filter in col_map:
        target_col = col_map[tag_filter]
        if target_col in filtered_df.columns:
            filtered_df = filtered_df[filtered_df[target_col]]
        else:
            print(f"Warning: Tag filter column '{target_col}' not found in DataFrame.")

    if pipeline_filter:
        if "pipeline_tag" in filtered_df.columns:
            filtered_df = filtered_df[filtered_df["pipeline_tag"].astype(str) == pipeline_filter]
        else:
            print("Warning: 'pipeline_tag' column not found for filtering.")

    # --- MODIFIED: Filtering logic now uses the numerical parameter range ---
    if param_range:
        min_params, max_params = get_param_range_values(param_range)
        is_default_range = (param_range == PARAM_CHOICES_DEFAULT)
        # Only filter if the range is not the default full range.
        if not is_default_range and 'params' in filtered_df.columns:
            # The 'params' column is in billions, so the values match our slider.
            if min_params is not None:
                filtered_df = filtered_df[filtered_df['params'] >= min_params]
            if max_params is not None and max_params != float('inf'):
                # The upper bound is exclusive, e.g., 5B to 64B is [5, 64).
                filtered_df = filtered_df[filtered_df['params'] < max_params]
        elif 'params' not in filtered_df.columns:
            print("Warning: 'params' column not found for filtering.")

    if skip_orgs and len(skip_orgs) > 0:
        if "organization" in filtered_df.columns:
            filtered_df = filtered_df[~filtered_df["organization"].isin(skip_orgs)]
        else:
            print("Warning: 'organization' column not found for filtering.")

    if filtered_df.empty: return pd.DataFrame()

    if count_by not in filtered_df.columns:
        print(f"Warning: Metric column '{count_by}' not found. Using 0.")
        filtered_df[count_by] = 0.0
    filtered_df[count_by] = pd.to_numeric(filtered_df[count_by], errors="coerce").fillna(0.0)

    org_totals = filtered_df.groupby("organization")[count_by].sum().nlargest(top_k, keep='first')
    top_orgs_list = org_totals.index.tolist()
    treemap_data = filtered_df[filtered_df["organization"].isin(top_orgs_list)][["id", "organization", count_by]].copy()
    treemap_data["root"] = "models"
    treemap_data[count_by] = pd.to_numeric(treemap_data[count_by], errors="coerce").fillna(0.0)
    return treemap_data
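# The returned DataFrame has one row per model with columns
# ['id', 'organization', <count_by>, 'root'], which create_treemap() consumes
# as the hierarchy path ["root", "organization", "id"].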

def create_treemap(treemap_data, count_by, title=None):
    if treemap_data.empty:
        fig = px.treemap(names=["No data matches filters"], parents=[""], values=[1])
        fig.update_layout(title="No data matches the selected filters", margin=dict(t=50, l=25, r=25, b=25))
        return fig
    fig = px.treemap(
        treemap_data, path=["root", "organization", "id"], values=count_by,
        title=title or f"HuggingFace Models - {count_by.capitalize()} by Organization",
        color_discrete_sequence=px.colors.qualitative.Plotly
    )
    fig.update_layout(margin=dict(t=50, l=25, r=25, b=25))
    fig.update_traces(textinfo="label+value+percent root", hovertemplate="<b>%{label}</b><br>%{value:,} " + count_by + "<br>%{percentRoot:.2%} of total<extra></extra>")
    return fig

with gr.Blocks(title="HuggingFace Model Explorer", fill_width=True) as demo:
    models_data_state = gr.State(pd.DataFrame())
    loading_complete_state = gr.State(False)

    with gr.Row(): gr.Markdown("# HuggingFace Models TreeMap Visualization")
    with gr.Row():
        with gr.Column(scale=1):
            count_by_dropdown = gr.Dropdown(label="Metric", choices=[("Downloads (last 30 days)", "downloads"), ("Downloads (All Time)", "downloadsAllTime"), ("Likes", "likes")], value="downloads")
            filter_choice_radio = gr.Radio(label="Filter Type", choices=["None", "Tag Filter", "Pipeline Filter"], value="None")
            tag_filter_dropdown = gr.Dropdown(label="Select Tag", choices=TAG_FILTER_CHOICES, value=None, visible=False)
            pipeline_filter_dropdown = gr.Dropdown(label="Select Pipeline Tag", choices=PIPELINE_TAGS, value=None, visible=False)

            # --- MODIFIED: Replaced Dropdown with RangeSlider and a Reset Button ---
            with gr.Group():
                with gr.Row():
                    gr.Markdown("<div style='padding-top: 10px; font-weight: 500;'>Parameters</div>")
                    reset_params_button = gr.Button("🔄 Reset", visible=False, size="sm", min_width=80)
                param_range_slider = gr.RangeSlider(
                    label=None,  # Label is handled by the Markdown above.
                    choices=PARAM_CHOICES,
                    value=PARAM_CHOICES_DEFAULT,
                )
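                # NOTE: assumes a Gradio build or custom range-slider component that
                # exposes gr.RangeSlider with two handles over discrete string choices;
                # stock gr.Slider only provides a single numeric handle.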
            # --- END OF MODIFICATION ---

            top_k_slider = gr.Slider(label="Number of Top Organizations", minimum=5, maximum=50, value=25, step=5)
            skip_orgs_textbox = gr.Textbox(label="Organizations to Skip (comma-separated)", value="TheBloke,MaziyarPanahi,unsloth,modularai,Gensyn,bartowski")
            generate_plot_button = gr.Button(value="Generate Plot", variant="primary", interactive=False)
        with gr.Column(scale=3):
            plot_output = gr.Plot()
            status_message_md = gr.Markdown("Initializing...")
            data_info_md = gr.Markdown("")

    # --- NEW: Event handlers for the new parameter slider and reset button ---
    def _update_reset_button_visibility(current_range):
        """Shows the reset button only if the slider is not at its default full range."""
        is_default = (current_range == PARAM_CHOICES_DEFAULT)
        return gr.update(visible=not is_default)

    def _reset_param_slider_and_button():
        """Resets the slider to its default value and hides the reset button."""
        return gr.update(value=PARAM_CHOICES_DEFAULT), gr.update(visible=False)

    param_range_slider.release(fn=_update_reset_button_visibility, inputs=param_range_slider, outputs=reset_params_button)
    reset_params_button.click(fn=_reset_param_slider_and_button, outputs=[param_range_slider, reset_params_button])
    # --- END OF NEW EVENT HANDLERS ---

    def _update_button_interactivity(is_loaded_flag):
        return gr.update(interactive=is_loaded_flag)
    loading_complete_state.change(fn=_update_button_interactivity, inputs=loading_complete_state, outputs=generate_plot_button)

    def _toggle_filters_visibility(choice):
        return gr.update(visible=choice == "Tag Filter"), gr.update(visible=choice == "Pipeline Filter")
    filter_choice_radio.change(fn=_toggle_filters_visibility, inputs=filter_choice_radio, outputs=[tag_filter_dropdown, pipeline_filter_dropdown])

    def ui_load_data_controller(progress=gr.Progress()):
        progress(0, desc=f"Loading dataset '{HF_DATASET_ID}' from Hugging Face Hub...")
        print("ui_load_data_controller called.")
        status_msg_ui = "Loading data..."
        data_info_text = ""
        current_df = pd.DataFrame()
        load_success_flag = False
        try:
            current_df, load_success_flag, status_msg_from_load = load_models_data()
            if load_success_flag:
                progress(0.9, desc="Processing loaded data...")
                if 'data_download_timestamp' in current_df.columns and not current_df.empty and pd.notna(current_df['data_download_timestamp'].iloc[0]):
                    timestamp_from_parquet = pd.to_datetime(current_df['data_download_timestamp'].iloc[0]).tz_localize('UTC')
                    data_as_of_date_display = timestamp_from_parquet.strftime('%B %d, %Y, %H:%M:%S %Z')
                else:
                    data_as_of_date_display = "Pre-processed (date unavailable)"

                # --- MODIFIED: Removed the old size category distribution text ---
                param_count = (current_df['params'] > 0).sum() if 'params' in current_df.columns else 0
                data_info_text = (f"### Data Information\n"
                                  f"- Source: `{HF_DATASET_ID}`\n"
                                  f"- Overall Status: {status_msg_from_load}\n"
                                  f"- Total models loaded: {len(current_df):,}\n"
                                  f"- Models with parameter counts: {param_count:,}\n"
                                  f"- Data as of: {data_as_of_date_display}\n")
                status_msg_ui = "Data loaded successfully. Ready to generate plot."
            else:
                data_info_text = f"### Data Load Failed\n- {status_msg_from_load}"
                status_msg_ui = status_msg_from_load
        except Exception as e:
            status_msg_ui = f"An unexpected error occurred in ui_load_data_controller: {str(e)}"
            data_info_text = f"### Critical Error\n- {status_msg_ui}"
            print(f"Critical error in ui_load_data_controller: {e}")
            load_success_flag = False
        return current_df, load_success_flag, data_info_text, status_msg_ui

    # --- MODIFIED: Updated controller signature and logic to handle the new slider ---
    def ui_generate_plot_controller(metric_choice, filter_type, tag_choice, pipeline_choice,
                                    param_range_choice, k_orgs, skip_orgs_input, df_current_models, progress=gr.Progress()):
        if df_current_models is None or df_current_models.empty:
            empty_fig = create_treemap(pd.DataFrame(), metric_choice, "Error: Model Data Not Loaded")
            error_msg = "Model data is not loaded or is empty. Please wait for data to load."
            gr.Warning(error_msg)
            return empty_fig, error_msg

        progress(0.1, desc="Preparing data for visualization...")
        tag_to_use = tag_choice if filter_type == "Tag Filter" else None
        pipeline_to_use = pipeline_choice if filter_type == "Pipeline Filter" else None
        orgs_to_skip = [org.strip() for org in skip_orgs_input.split(',') if org.strip()] if skip_orgs_input else []

        # Pass the param_range_choice directly to make_treemap_data.
        treemap_df = make_treemap_data(df_current_models, metric_choice, k_orgs, tag_to_use, pipeline_to_use, param_range_choice, orgs_to_skip)

        progress(0.7, desc="Generating Plotly visualization...")
        title_labels = {"downloads": "Downloads (last 30 days)", "downloadsAllTime": "Downloads (All Time)", "likes": "Likes"}
        chart_title = f"HuggingFace Models - {title_labels.get(metric_choice, metric_choice)} by Organization"
        plotly_fig = create_treemap(treemap_df, metric_choice, chart_title)

        if treemap_df.empty:
            plot_stats_md = "No data matches the selected filters. Try adjusting your filters."
        else:
            total_items_in_plot = len(treemap_df['id'].unique())
            total_value_in_plot = treemap_df[metric_choice].sum()
            plot_stats_md = (f"## Plot Statistics\n- **Models shown**: {total_items_in_plot:,}\n- **Total {metric_choice}**: {int(total_value_in_plot):,}")
        return plotly_fig, plot_stats_md

    demo.load(
        fn=ui_load_data_controller,
        inputs=[],
        outputs=[models_data_state, loading_complete_state, data_info_md, status_message_md]
    )

    # --- MODIFIED: Updated the inputs list for the click event ---
    generate_plot_button.click(
        fn=ui_generate_plot_controller,
        inputs=[count_by_dropdown, filter_choice_radio, tag_filter_dropdown, pipeline_filter_dropdown,
                param_range_slider, top_k_slider, skip_orgs_textbox, models_data_state],
        outputs=[plot_output, status_message_md]
    )

if __name__ == "__main__":
    print(f"Application starting. Data will be loaded from Hugging Face dataset: {HF_DATASET_ID}")
    demo.queue().launch()

# --- END OF MODIFIED FILE app.py ---