import gradio as gr import pandas as pd import os # # https://discuss.huggingface.co/t/issues-with-sadtalker-zerogpu-spaces-inquiry-about-community-grant/110625/10 # if os.environ.get("SPACES_ZERO_GPU") is not None: # import spaces # else: # class spaces: # @staticmethod # def GPU(func): # def wrapper(*args, **kwargs): # return func(*args, **kwargs) # return wrapper import sys from qatch.connectors.sqlite_connector import SqliteConnector from qatch.generate_dataset.orchestrator_generator import OrchestratorGenerator from qatch.evaluate_dataset.orchestrator_evaluator import OrchestratorEvaluator #from predictor.orchestrator_predictor import OrchestratorPredictor import utils_get_db_tables_info import utilities as us import time import plotly.express as px import plotly.graph_objects as go import plotly.colors as pc # @spaces.GPU # def model_prediction(): # pass with open('style.css', 'r') as file: css = file.read() # DataFrame di default df_default = pd.DataFrame({ 'Name': ['Alice', 'Bob', 'Charlie'], 'Age': [25, 30, 35], 'City': ['New York', 'Los Angeles', 'Chicago'] }) models_path = "models.csv" # Variabile globale per tenere traccia dei dati correnti df_current = df_default.copy() input_data = { 'input_method': "", 'data_path': "", 'db_name': "", 'data': { 'data_frames': {}, # dictionary of dataframes 'db': None # SQLITE3 database object }, 'models': [] } def load_data(file, path, use_default): """Carica i dati da un file, un percorso o usa il DataFrame di default.""" global df_current if use_default: input_data["input_method"] = 'default' input_data["data_path"] = os.path.join(".", "data", "data_interface", "mytable.sqlite") input_data["db_name"] = os.path.splitext(os.path.basename(input_data["data_path"]))[0] input_data["data"]['data_frames'] = {'MyTable': df_current} if( input_data["data"]['data_frames']): table2primary_key = {} for table_name, df in input_data["data"]['data_frames'].items(): # Assign primary keys for each table table2primary_key[table_name] = 'id' input_data["data"]["db"] = SqliteConnector( relative_db_path=input_data["data_path"], db_name=input_data["db_name"], tables= input_data["data"]['data_frames'], table2primary_key=table2primary_key ) df_current = df_default.copy() # Ripristina i dati di default return input_data["data"]['data_frames'] selected_inputs = sum([file is not None, bool(path), use_default]) if selected_inputs > 1: return 'Errore: Selezionare solo un metodo di input alla volta.' if file is not None: try: input_data["input_method"] = 'uploaded_file' input_data["db_name"] = os.path.splitext(os.path.basename(file))[0] input_data["data_path"] = os.path.join(".", "data", "data_interface",f"{input_data['db_name']}.sqlite") input_data["data"] = us.load_data(file, input_data["db_name"]) df_current = input_data["data"]['data_frames'].get('MyTable', df_default) # Carica il DataFrame if( input_data["data"]['data_frames']): table2primary_key = {} for table_name, df in input_data["data"]['data_frames'].items(): # Assign primary keys for each table table2primary_key[table_name] = 'id' input_data["data"]["db"] = SqliteConnector( relative_db_path=input_data["data_path"], db_name=input_data["db_name"], tables= input_data["data"]['data_frames'], table2primary_key=table2primary_key ) return input_data["data"]['data_frames'] except Exception as e: return f'Errore nel caricamento del file: {e}' """ if path: if not os.path.exists(path): return 'Errore: Il percorso specificato non esiste.' try: input_data["input_method"] = 'uploaded_file' input_data["data_path"] = path input_data["db_name"] = os.path.splitext(os.path.basename(path))[0] input_data["data"] = us.load_data(input_data["data_path"], input_data["db_name"]) df_current = input_data["data"]['data_frames'].get('MyTable', df_default) # Carica il DataFrame return input_data["data"]['data_frames'] except Exception as e: return f'Errore nel caricamento del file dal percorso: {e}' """ return input_data["data"]['data_frames'] def preview_default(use_default): """Mostra il DataFrame di default se il checkbox รจ selezionato.""" if use_default: return df_default # Mostra il DataFrame di default return df_current # Mostra il DataFrame corrente, che potrebbe essere stato modificato def update_df(new_df): """Aggiorna il DataFrame corrente.""" global df_current # Usa la variabile globale per aggiornarla df_current = new_df return df_current def open_accordion(target): # Apre uno e chiude l'altro if target == "reset": df_current = df_default.copy() input_data['input_method'] = "" input_data['data_path'] = "" input_data['db_name'] = "" input_data['data']['data_frames'] = {} input_data['data']['db'] = None input_data['models'] = [] return gr.update(open=True), gr.update(open=False, visible=False), gr.update(open=False, visible=False), gr.update(open=False, visible=False), gr.update(open=False, visible=False), gr.update(value=False), gr.update(value=None) elif target == "model_selection": return gr.update(open=False), gr.update(open=False), gr.update(open=True, visible=True), gr.update(open=False), gr.update(open=False) # Interfaccia Gradio with gr.Blocks(theme='d8ahazard/rd_blue', css_paths='style.css') as interface: with gr.Row(): gr.Column(scale=1) gr.Image( value="https://github.com/CristianDegni01/Automatic-LLM-Benchmark-Analysis-for-Text2SQL-GRADIO/blob/master/models_logo/QATCH.png?raw=true", show_label=False, container=False, height=200, # in pixel width=400 ) gr.Column(scale=1) data_state = gr.State(None) # Memorizza i dati caricati upload_acc = gr.Accordion("Upload your data section", open=True, visible=True) select_table_acc = gr.Accordion("Select tables", open=False, visible=False) select_model_acc = gr.Accordion("Select models", open=False, visible=False) qatch_acc = gr.Accordion("QATCH execution", open=False, visible=False) metrics_acc = gr.Accordion("Metrics", open=False, visible=False) #metrics_acc = gr.Accordion("Metrics", open=False, visible=False, render=False) ################################# # DATABASE INSERTION # ################################# with upload_acc: gr.Markdown("## Data Upload") file_input = gr.File(label="Drag and drop a file", file_types=[".csv", ".xlsx", ".sqlite"]) with gr.Row(): default_checkbox = gr.Checkbox(label="Use default DataFrame") preview_output = gr.DataFrame(interactive=True, visible=True, value=df_default) submit_button = gr.Button("Load Data", interactive=False) # Disabled by default output = gr.JSON(visible=False) # Dictionary output # Function to enable the button if there is data to load def enable_submit(file, use_default): return gr.update(interactive=bool(file or use_default)) # Function to uncheck the checkbox if a file is uploaded def deselect_default(file): if file: return gr.update(value=False) return gr.update() # Enable the button when inputs are provided file_input.change(fn=enable_submit, inputs=[file_input, default_checkbox], outputs=[submit_button]) default_checkbox.change(fn=enable_submit, inputs=[file_input, default_checkbox], outputs=[submit_button]) # Show preview of the default DataFrame when checkbox is selected default_checkbox.change(fn=preview_default, inputs=[default_checkbox], outputs=[preview_output]) preview_output.change(fn=update_df, inputs=[preview_output], outputs=[preview_output]) # Uncheck the checkbox when a file is uploaded file_input.change(fn=deselect_default, inputs=[file_input], outputs=[default_checkbox]) def handle_output(file, use_default): """Handles the output when the 'Load Data' button is pressed.""" result = load_data(file, None, use_default) if isinstance(result, dict): # If result is a dictionary of DataFrames if len(result) == 1: # If there's only one table return ( gr.update(visible=False), # Hide JSON output result, # Save the data state gr.update(visible=False), # Hide table selection result, # Maintain the data state gr.update(interactive=False), # Disable the submit button gr.update(visible=True, open=True), # Proceed to select_model_acc gr.update(visible=True, open=False) ) else: return ( gr.update(visible=False), result, gr.update(open=True, visible=True), result, gr.update(interactive=False), gr.update(visible=False), # Keep current behavior gr.update(visible=True, open=True) ) else: return ( gr.update(visible=False), None, gr.update(open=False, visible=True), None, gr.update(interactive=True), gr.update(visible=False), gr.update(visible=True, open=True) ) submit_button.click( fn=handle_output, inputs=[file_input, default_checkbox], outputs=[output, output, select_table_acc, data_state, submit_button, select_model_acc, upload_acc] ) ###################################### # TABLE SELECTION PART # ###################################### with select_table_acc: table_selector = gr.CheckboxGroup(choices=[], label="Select tables to display", value=[]) table_outputs = [gr.DataFrame(label=f"Table {i+1}", interactive=True, visible=False) for i in range(5)] selected_table_names = gr.Textbox(label="Selected tables", visible=False, interactive=False) # Model selection button (initially disabled) open_model_selection = gr.Button("Choose your models", interactive=False) def update_table_list(data): """Dynamically updates the list of available tables.""" if isinstance(data, dict) and data: table_names = list(data.keys()) # Return only the table names return gr.update(choices=table_names, value=[]) # Reset selections return gr.update(choices=[], value=[]) def show_selected_tables(data, selected_tables): """Displays only the tables selected by the user and enables the button.""" updates = [] if isinstance(data, dict) and data: available_tables = list(data.keys()) # Actually available names selected_tables = [t for t in selected_tables if t in available_tables] # Filter valid selections tables = {name: data[name] for name in selected_tables} # Filter the DataFrames for i, (name, df) in enumerate(tables.items()): updates.append(gr.update(value=df, label=f"Table: {name}", visible=True)) # If there are fewer than 5 tables, hide the other DataFrames for _ in range(len(tables), 5): updates.append(gr.update(visible=False)) else: updates = [gr.update(value=pd.DataFrame(), visible=False) for _ in range(5)] # Enable/disable the button based on selections button_state = bool(selected_tables) # True if at least one table is selected, False otherwise updates.append(gr.update(interactive=button_state)) # Update button state return updates def show_selected_table_names(selected_tables): """Displays the names of the selected tables when the button is pressed.""" if selected_tables: return gr.update(value=", ".join(selected_tables), visible=False) return gr.update(value="", visible=False) # Automatically updates the checkbox list when `data_state` changes data_state.change(fn=update_table_list, inputs=[data_state], outputs=[table_selector]) # Updates the visible tables and the button state based on user selections table_selector.change(fn=show_selected_tables, inputs=[data_state, table_selector], outputs=table_outputs + [open_model_selection]) # Shows the list of selected tables when "Choose your models" is clicked open_model_selection.click(fn=show_selected_table_names, inputs=[table_selector], outputs=[selected_table_names]) open_model_selection.click(open_accordion, inputs=gr.State("model_selection"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc]) #################################### # MODEL SELECTION PART # #################################### with select_model_acc: gr.Markdown("**Model Selection**") # Assume that `us.read_models_csv` also returns the image path model_list_dict = us.read_models_csv(models_path) model_list = [model["code"] for model in model_list_dict] model_images = [model["image_path"] for model in model_list_dict] model_checkboxes = [] rows = [] # Dynamically create checkboxes with images (3 per row) for i in range(0, len(model_list), 3): with gr.Row(): cols = [] for j in range(3): if i + j < len(model_list): model = model_list[i + j] image_path = model_images[i + j] with gr.Column(): gr.Image(image_path, show_label=False) checkbox = gr.Checkbox(label=model, value=False) model_checkboxes.append(checkbox) cols.append(checkbox) rows.append(cols) selected_models_output = gr.JSON(visible=False) # Function to get selected models def get_selected_models(*model_selections): selected_models = [model for model, selected in zip(model_list, model_selections) if selected] input_data['models'] = selected_models button_state = bool(selected_models) # True if at least one model is selected, False otherwise return selected_models, gr.update(open=True, visible=True), gr.update(interactive=button_state) # Submit button (initially disabled) submit_models_button = gr.Button("Submit Models", interactive=False) # Link checkboxes to selection events for checkbox in model_checkboxes: checkbox.change( fn=get_selected_models, inputs=model_checkboxes, outputs=[selected_models_output, select_model_acc, submit_models_button] ) submit_models_button.click( fn=lambda *args: (get_selected_models(*args), gr.update(open=False, visible=True), gr.update(open=True, visible=True)), inputs=model_checkboxes, outputs=[selected_models_output, select_model_acc, qatch_acc] ) def enable_disable(enable): return ( *[gr.update(interactive=enable) for _ in model_checkboxes], gr.update(interactive=enable), gr.update(interactive=enable), gr.update(interactive=enable), gr.update(interactive=enable), gr.update(interactive=enable), gr.update(interactive=enable), *[gr.update(interactive=enable) for _ in table_outputs], gr.update(interactive=enable) ) reset_data = gr.Button("Back to upload data section") submit_models_button.click( fn=enable_disable, inputs=[gr.State(False)], outputs=[ *model_checkboxes, submit_models_button, preview_output, submit_button, file_input, default_checkbox, table_selector, *table_outputs, open_model_selection ] ) reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc, default_checkbox, file_input]) reset_data.click( fn=enable_disable, inputs=[gr.State(True)], outputs=[ *model_checkboxes, submit_models_button, preview_output, submit_button, file_input, default_checkbox, table_selector, *table_outputs, open_model_selection ] ) ############################# # QATCH EXECUTION # ############################# with qatch_acc: def change_text(text): return text loading_symbols= {1:"๐“†Ÿ", 2: "๐“†ž ๐“†Ÿ", 3: "๐“†Ÿ ๐“†ž ๐“†Ÿ", 4: "๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", 5: "๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", 6: "๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", 7: "๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", 8: "๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", 9: "๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", 10:"๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ ๐“†ž ๐“†Ÿ", } def generate_loading_text(percent): num_symbols = (round(percent) % 11) + 1 symbols = loading_symbols.get(num_symbols, "๐“†Ÿ") mirrored_symbols = f'{symbols.strip()}' css_symbols = f'{symbols.strip()}' return f"
{css_symbols} Generation {percent}%{mirrored_symbols}
" #return f"{css_symbols}"+f"# Loading {percent}% #"+f"{mirrored_symbols}" def qatch_flow(): orchestrator_generator = OrchestratorGenerator() # TODO: add to target_df column target_df["columns_used"], tables selection # print(input_data['data']['db']) target_df = orchestrator_generator.generate_dataset(connector=input_data['data']['db']) schema_text = utils_get_db_tables_info.utils_extract_db_schema_as_string( db_id = input_data["db_name"], base_path = input_data["data_path"], normalize=False, sql=None ) # TODO: QUERY PREDICTION predictions_dict = {model: pd.DataFrame(columns=['id', 'question', 'predicted_sql', 'time', 'query', 'db_path']) for model in model_list} metrics_conc = pd.DataFrame() for model in input_data["models"]: model_image_path = next((m["image_path"] for m in model_list_dict if m["code"] == model), None) yield gr.Image(model_image_path), gr.Markdown(), gr.Markdown(), gr.Markdown(), metrics_conc, *[predictions_dict[model] for model in model_list] for index, row in target_df.iterrows(): percent_complete = round(((index+1) / len(target_df)) * 100, 2) load_text = f"{generate_loading_text(percent_complete)}" question = row['question'] display_question = f"
Natural Language:
{row['question']}
" # yield gr.Textbox(question), gr.Textbox(), *[predictions_dict[model] for model in input_data["models"]], None yield gr.Image(), gr.Markdown(load_text), gr.Markdown(display_question), gr.Markdown(), metrics_conc, *[predictions_dict[model] for model in model_list] start_time = time.time() # Simulate prediction time.sleep(0.4) prediction = "Prediction_placeholder" display_prediction = f"
Generated SQL:
{prediction}
" # Run real prediction here # prediction = predictor.run(model, schema_text, question) end_time = time.time() # Create a new row as dataframe new_row = pd.DataFrame([{ 'id': index, 'question': question, 'predicted_sql': prediction, 'time': end_time - start_time, 'query': row["query"], 'db_path': input_data["data_path"] }]).dropna(how="all") # Remove only completely empty rows # TODO: use a for loop for col in target_df.columns: if col not in new_row.columns: new_row[col] = row[col] # Update model's prediction dataframe incrementally if not new_row.empty: predictions_dict[model] = pd.concat([predictions_dict[model], new_row], ignore_index=True) # yield gr.Textbox(), gr.Textbox(prediction), *[predictions_dict[model] for model in input_data["models"]], None yield gr.Image(), gr.Markdown(load_text), gr.Markdown(), gr.Markdown(display_prediction), metrics_conc, *[predictions_dict[model] for model in model_list] yield gr.Image(), gr.Markdown(load_text), gr.Markdown(), gr.Markdown(display_prediction), metrics_conc, *[predictions_dict[model] for model in model_list] # END evaluator = OrchestratorEvaluator() for model in input_data["models"]: metrics_df_model = evaluator.evaluate_df( df=predictions_dict[model], target_col_name="query", prediction_col_name="predicted_sql", db_path_name="db_path" ) metrics_df_model['model'] = model metrics_conc = pd.concat([metrics_conc, metrics_df_model], ignore_index=True) if 'valid_efficiency_score' not in metrics_conc.columns: metrics_conc['valid_efficiency_score'] = metrics_conc['VES'] yield gr.Image(), gr.Markdown(), gr.Markdown(), gr.Markdown(), metrics_conc, *[predictions_dict[model] for model in model_list] # Loading Bar with gr.Row(): # progress = gr.Progress() variable = gr.Markdown() # NL -> MODEL -> Generated Query with gr.Row(): with gr.Column(): with gr.Column(): question_display = gr.Markdown() with gr.Column(): gr.Markdown("
โคด
") with gr.Column(): model_logo = gr.Image(visible=True, show_label=False) with gr.Column(): with gr.Column(): prediction_display = gr.Markdown() with gr.Column(): gr.Markdown("
โคด
") dataframe_per_model = {} with gr.Tabs() as model_tabs: tab_dict = {} for model in model_list: with gr.TabItem(model, visible=(model in input_data["models"])) as tab: gr.Markdown(f"**Results for {model}**") tab_dict[model] = tab dataframe_per_model[model] = gr.DataFrame() # download_pred_model = gr.DownloadButton(label="Download Prediction per Model", visible=False) def change_tab(): return [gr.update(visible=(model in input_data["models"])) for model in model_list] submit_models_button.click( change_tab, inputs=[], outputs=[tab_dict[model] for model in model_list] # Update TabItem visibility ) selected_models_display = gr.JSON(label="Final input data", visible=False) metrics_df = gr.DataFrame(visible=False) metrics_df_out = gr.DataFrame(visible=False) submit_models_button.click( fn=qatch_flow, inputs=[], outputs=[model_logo, variable, question_display, prediction_display, metrics_df] + list(dataframe_per_model.values()) ) submit_models_button.click( fn=lambda: gr.update(value=input_data), outputs=[selected_models_display] ) # Works for METRICS metrics_df.change(fn=change_text, inputs=[metrics_df], outputs=[metrics_df_out]) proceed_to_metrics_button = gr.Button("Proceed to Metrics") proceed_to_metrics_button.click( fn=lambda: (gr.update(open=False, visible=True), gr.update(open=True, visible=True)), outputs=[qatch_acc, metrics_acc] ) def allow_download(metrics_df_out): path = os.path.join(".", "data", "data_results", "results.csv") metrics_df_out.to_csv(path, index=False) return gr.update(value=path, visible=True) download_metrics = gr.DownloadButton(label="Download Metrics Evaluation", visible=False) submit_models_button.click( fn=lambda: gr.update(visible=False), outputs=[download_metrics] ) #TODO WHY? # download_metrics.click( # fn=lambda: gr.update(open=True, visible=True), # outputs=[download_metrics] # ) metrics_df_out.change(fn=allow_download, inputs=[metrics_df_out], outputs=[download_metrics]) reset_data = gr.Button("Back to upload data section") reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc, default_checkbox, file_input]) #WHY NOT WORKING? reset_data.click( fn=lambda: gr.update(visible=False), outputs=[download_metrics] ) reset_data.click( fn=enable_disable, inputs=[gr.State(True)], outputs=[ *model_checkboxes, submit_models_button, preview_output, submit_button, file_input, default_checkbox, table_selector, *table_outputs, open_model_selection ] ) ########################################## # METRICS VISUALIZATION SECTION # ########################################## with metrics_acc: #confirmation_text = gr.Markdown("## Metrics successfully loaded") data_path = 'test_results.csv' @gr.render(inputs=metrics_df_out) def function_metrics(metrics_df_out): def load_data_csv_es(): return pd.read_csv(data_path) #return metrics_df_out def calculate_average_metrics(df, selected_metrics): df['avg_metric'] = df[selected_metrics].mean(axis=1) return df def generate_model_colors(): """Generates a unique color map for models in the dataset.""" df = load_data_csv_es() unique_models = df['model'].unique() # Extract unique models num_models = len(unique_models) # Use the Plotly color scale (you can change it if needed) color_palette = pc.qualitative.Plotly # ['#636EFA', '#EF553B', '#00CC96', ...] # If there are more models than colors, cycle through them colors = {model: color_palette[i % len(color_palette)] for i, model in enumerate(unique_models)} return colors MODEL_COLORS = generate_model_colors() # BAR CHART FOR AVERAGE METRICS WITH UPDATE FUNCTION def plot_metric(df, selected_metrics, group_by, selected_models): df = df[df['model'].isin(selected_models)] df = calculate_average_metrics(df, selected_metrics) # Ensure the group_by value is always valid if group_by not in [["tbl_name", "model"], ["model"]]: group_by = ["tbl_name", "model"] # Default avg_metrics = df.groupby(group_by)['avg_metric'].mean().reset_index() fig = px.bar( avg_metrics, x=group_by[0], y='avg_metric', color='model', color_discrete_map=MODEL_COLORS, barmode='group', title=f'Average metric per {group_by[0]} ๐Ÿ“Š', labels={group_by[0]: group_by[0].capitalize(), 'avg_metric': 'Average Metric'}, template='plotly_dark' ) return gr.Plot(fig, visible=True) def update_plot(selected_metrics, group_by, selected_models): df = load_data_csv_es() return plot_metric(df, selected_metrics, group_by, selected_models) # RADAR CHART FOR AVERAGE METRICS PER MODEL WITH UPDATE FUNCTION def plot_radar(df, selected_models): # Filter only selected models df = df[df['model'].isin(selected_models)] # Select relevant metrics selected_metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"] # Compute average metrics per test_category and model df = calculate_average_metrics(df, selected_metrics) avg_metrics = df.groupby(['model', 'test_category'])['avg_metric'].mean().reset_index() # Check if data is available if avg_metrics.empty: print("Error: No data available to compute averages.") return go.Figure() fig = go.Figure() categories = avg_metrics['test_category'].unique() for model in selected_models: model_data = avg_metrics[avg_metrics['model'] == model] # Build a list of values for each category (if a value is missing, set it to 0) values = [ model_data[model_data['test_category'] == cat]['avg_metric'].values[0] if cat in model_data['test_category'].values else 0 for cat in categories ] fig.add_trace(go.Scatterpolar( r=values, theta=categories, fill='toself', name=model, line=dict(color=MODEL_COLORS.get(model, "gray")) )) fig.update_layout( polar=dict(radialaxis=dict(visible=True, range=[0, max(avg_metrics['avg_metric'].max(), 0.5)])), # Set the radar range title='โ‡๏ธ Radar Plot of Metrics per Model (Average per Category) โ‡๏ธ ', template='plotly_dark', width=700, height=700 ) return fig def update_radar(selected_models): df = load_data_csv_es() return plot_radar(df, selected_models) # LINE CHART FOR CUMULATIVE TIME WITH UPDATE FUNCTION def plot_cumulative_flow(df, selected_models): df = df[df['model'].isin(selected_models)] fig = go.Figure() for model in selected_models: model_df = df[df['model'] == model].copy() # Calculate cumulative time model_df['cumulative_time'] = model_df['time'].cumsum() # Calculate cumulative number of queries over time model_df['cumulative_queries'] = range(1, len(model_df) + 1) # Select a color for the model color = MODEL_COLORS.get(model, "gray") # Assigned model color fillcolor = color.replace("rgb", "rgba").replace(")", ", 0.2)") # ๐Ÿ”น Makes the area semi-transparent #color = f"rgba({hash(model) % 256}, {hash(model * 2) % 256}, {hash(model * 3) % 256}, 1)" fig.add_trace(go.Scatter( x=model_df['cumulative_time'], y=model_df['cumulative_queries'], mode='lines+markers', name=model, line=dict(width=2, color=color) )) # Adds the underlying colored area (same color but transparent) """ fig.add_trace(go.Scatter( x=model_df['cumulative_time'], y=model_df['cumulative_queries'], fill='tozeroy', mode='none', showlegend=False, # Hides the area in the legend fillcolor=fillcolor )) """ fig.update_layout( title="Cumulative Query Flow Chart ๐Ÿ“ˆ", xaxis_title="Cumulative Time (s)", yaxis_title="Number of Queries Completed", template='plotly_dark', legend_title="Models" ) return fig def update_query_rate(selected_models): df = load_data_csv_es() return plot_cumulative_flow(df, selected_models) # RANKING FOR THE TOP 3 MODELS WITH UPDATE FUNCTION def ranking_text(df, selected_models, ranking_type): #df = load_data_csv_es() df = df[df['model'].isin(selected_models)] df['valid_efficiency_score'] = pd.to_numeric(df['valid_efficiency_score'], errors='coerce') if ranking_type == "valid_efficiency_score": rank_df = df.groupby('model')['valid_efficiency_score'].mean().reset_index() #rank_df = df.groupby('model')['valid_efficiency_score'].mean().reset_index() ascending_order = False # Higher is better elif ranking_type == "time": rank_df = df.groupby('model')['time'].sum().reset_index() rank_df["Ranking Value"] = rank_df["time"].round(2).astype(str) + " s" # Adds "s" for seconds ascending_order = True # For time, lower is better elif ranking_type == "metrics": selected_metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"] df = calculate_average_metrics(df, selected_metrics) rank_df = df.groupby('model')['avg_metric'].mean().reset_index() ascending_order = False # Higher is better if ranking_type != "time": rank_df.rename(columns={rank_df.columns[1]: "Ranking Value"}, inplace=True) rank_df["Ranking Value"] = rank_df["Ranking Value"].round(2) # Round values except for time # Sort based on the selected criterion rank_df = rank_df.sort_values(by="Ranking Value", ascending=ascending_order).reset_index(drop=True) # Select only the top 3 models rank_df = rank_df.head(3) # Add medal icons for the top 3 medals = ["๐Ÿฅ‡", "๐Ÿฅˆ", "๐Ÿฅ‰"] rank_df.insert(0, "Rank", medals[:len(rank_df)]) # Build the formatted ranking string ranking_str = "## ๐Ÿ† Model Ranking\n" for _, row in rank_df.iterrows(): ranking_str += f"{row['Rank']} {row['model']} ({row['Ranking Value']})
\n" return ranking_str def update_ranking_text(selected_models, ranking_type): df = load_data_csv_es() return ranking_text(df, selected_models, ranking_type) # RANKING FOR THE 3 WORST RESULTS WITH UPDATE FUNCTION def worst_cases_text(df, selected_models): df = df[df['model'].isin(selected_models)] selected_metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"] df = calculate_average_metrics(df, selected_metrics) worst_cases_df = df.groupby(['model', 'tbl_name', 'test_category', 'question', 'query', 'predicted_sql'])['avg_metric'].mean().reset_index() worst_cases_df = worst_cases_df.sort_values(by="avg_metric", ascending=True).reset_index(drop=True) worst_cases_top_3 = worst_cases_df.head(3) worst_cases_top_3["avg_metric"] = worst_cases_top_3["avg_metric"].round(2) worst_str = "## โŒ Top 3 Worst Cases\n" medals = ["๐Ÿฅ‡", "๐Ÿฅˆ", "๐Ÿฅ‰"] for i, row in worst_cases_top_3.iterrows(): worst_str += ( f"{medals[i]} {row['model']} - {row['tbl_name']} - {row['test_category']} ({row['avg_metric']}) \n" f"- Question: {row['question']} \n" f"- Original Query: `{row['query']}` \n" f"- Predicted SQL: `{row['predicted_sql']}` \n\n" ) return worst_str def update_worst_cases_text(selected_models): df = load_data_csv_es() return worst_cases_text(df, selected_models) metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"] group_options = { "Table": ["tbl_name", "model"], "Model": ["model"] } df_initial = load_data_csv_es() models = df_initial['model'].unique().tolist() #with gr.Blocks(theme=gr.themes.Default(primary_hue='blue')) as demo: gr.Markdown("""## ๐Ÿ“Š Model Performance Analysis ๐Ÿ“Š Select one or more metrics to calculate the average and visualize histograms and radar plots. """) # Options selection section with gr.Row(): metric_multiselect = gr.CheckboxGroup(choices=metrics, label="Select metrics", value=metrics) model_multiselect = gr.CheckboxGroup(choices=models, label="Select models", value=models) group_radio = gr.Radio(choices=list(group_options.keys()), label="Select grouping", value="Table") output_plot = gr.Plot(visible=False) query_rate_plot = gr.Plot(value=update_query_rate(models)) with gr.Row(): with gr.Column(scale=1): radar_plot = gr.Plot(value=update_radar(models)) with gr.Column(scale=1): ranking_type_radio = gr.Radio( ["valid_efficiency_score", "time", "metrics"], label="Choose ranking criteria", value="valid_efficiency_score" ) ranking_text_display = gr.Markdown(value=update_ranking_text(models, "valid_efficiency_score")) worst_cases_display = gr.Markdown(value=update_worst_cases_text(models)) # Callback functions for updating charts def on_change(selected_metrics, selected_group, selected_models): return update_plot(selected_metrics, group_options[selected_group], selected_models) def on_radar_change(selected_models): return update_radar(selected_models) #metrics_df_out.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) proceed_to_metrics_button.click(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) proceed_to_metrics_button.click(update_query_rate, inputs=[model_multiselect], outputs=query_rate_plot) metric_multiselect.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) group_radio.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) model_multiselect.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) model_multiselect.change(update_radar, inputs=model_multiselect, outputs=radar_plot) model_multiselect.change(update_ranking_text, inputs=[model_multiselect, ranking_type_radio], outputs=ranking_text_display) ranking_type_radio.change(update_ranking_text, inputs=[model_multiselect, ranking_type_radio], outputs=ranking_text_display) model_multiselect.change(update_worst_cases_text, inputs=model_multiselect, outputs=worst_cases_display) model_multiselect.change(update_query_rate, inputs=[model_multiselect], outputs=query_rate_plot) reset_data = gr.Button("Back to upload data section") reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc, default_checkbox, file_input]) reset_data.click( fn=lambda: gr.update(visible=False), outputs=[download_metrics] ) reset_data.click( fn=lambda: gr.update(visible=False), outputs=[download_metrics] ) reset_data.click( fn=enable_disable, inputs=[gr.State(True)], outputs=[ *model_checkboxes, submit_models_button, preview_output, submit_button, file_input, default_checkbox, table_selector, *table_outputs, open_model_selection ] ) interface.launch()