import gradio as gr import pandas as pd import os from qatch.connectors.sqlite_connector import SqliteConnector from qatch.generate_dataset.orchestrator_generator import OrchestratorGenerator from qatch.evaluate_dataset.orchestrator_evaluator import OrchestratorEvaluator from predictor.orchestrator_predictor import OrchestratorPredictor import utilities as us import plotly.express as px import plotly.graph_objects as go with open('style.css', 'r') as file: css = file.read() # DataFrame di default df_default = pd.DataFrame({ 'Name': ['Alice', 'Bob', 'Charlie'], 'Age': [25, 30, 35], 'City': ['New York', 'Los Angeles', 'Chicago'] }) models_path = "models.csv" # Variabile globale per tenere traccia dei dati correnti df_current = df_default.copy() input_data = { 'input_method': "", 'data_path': "", 'db_name': "", 'data': { 'data_frames': {}, # dictionary of dataframes 'db': None # SQLITE3 database object }, 'models': [] } def load_data(file, path, use_default): """Carica i dati da un file, un percorso o usa il DataFrame di default.""" global df_current if use_default: input_data["input_method"] = 'default' input_data["data_path"] = os.path.join(".", "data", "datainterface", "mytable.sqlite") input_data["db_name"] = os.path.splitext(os.path.basename(input_data["data_path"]))[0] input_data["data"]['data_frames'] = {'MyTable': df_current} #TODO assegna il db a input_data["data"]['db'] df_current = df_default.copy() # Ripristina i dati di default return input_data["data"]['data_frames'] selected_inputs = sum([file is not None, bool(path), use_default]) if selected_inputs > 1: return 'Errore: Selezionare solo un metodo di input alla volta.' if file is not None: try: input_data["input_method"] = 'uploaded_file' input_data["db_name"] = os.path.splitext(os.path.basename(file))[0] input_data["data_path"] = os.path.join(".", "data", f"data_interface{input_data['db_name']}.sqlite") input_data["data"] = us.load_data(input_data["data_path"], input_data["db_name"]) df_current = input_data["data"]['data_frames'].get('MyTable', df_default) # Carica il DataFrame print(df_current) print(input_data["data"]) if( input_data["data"]['data_frames'] and not input_data["data"]['db']): table2primary_key = {} print("ok") for table_name, df in input_data["data"]['data_frames'].items(): # Assign primary keys for each table table2primary_key[table_name] = 'id' print("ok2") input_data["data"]["db"] = SqliteConnector( relative_db_path=input_data["data_path"], db_name=input_data["db_name"], tables= input_data["data"]['data_frames'], table2primary_key=table2primary_key ) print(input_data["data"]["db"]) return input_data["data"]['data_frames'] except Exception as e: return f'Errore nel caricamento del file: {e}' if path: if not os.path.exists(path): return 'Errore: Il percorso specificato non esiste.' try: input_data["input_method"] = 'uploaded_file' input_data["data_path"] = path input_data["db_name"] = os.path.splitext(os.path.basename(path))[0] input_data["data"] = us.load_data(input_data["data_path"], input_data["db_name"]) df_current = input_data["data"]['data_frames'].get('MyTable', df_default) # Carica il DataFrame return input_data["data"]['data_frames'] except Exception as e: return f'Errore nel caricamento del file dal percorso: {e}' return input_data["data"]['data_frames'] def preview_default(use_default): """Mostra il DataFrame di default se il checkbox è selezionato.""" if use_default: return df_default # Mostra il DataFrame di default return df_current # Mostra il DataFrame corrente, che potrebbe essere stato modificato def update_df(new_df): """Aggiorna il DataFrame corrente.""" global df_current # Usa la variabile globale per aggiornarla df_current = new_df return df_current def open_accordion(target): # Apre uno e chiude l'altro if target == "reset": return gr.update(open=True), gr.update(open=False, visible=False), gr.update(open=False, visible=False), gr.update(open=False, visible=False), gr.update(open=False, visible=False) elif target == "model_selection": return gr.update(open=False), gr.update(open=False), gr.update(open=True, visible=True), gr.update(open=False), gr.update(open=False) # Interfaccia Gradio interface = gr.Blocks(theme='d8ahazard/rd_blue', css_paths='style.css') with interface: gr.Markdown("# QATCH") data_state = gr.State(None) # Memorizza i dati caricati upload_acc = gr.Accordion("Upload your data section", open=True, visible=True) select_table_acc = gr.Accordion("Select tables", open=False, visible=False) select_model_acc = gr.Accordion("Select models", open=False, visible=False) qatch_acc = gr.Accordion("QATCH execution", open=False, visible=False) metrics_acc = gr.Accordion("Metrics", open=False, visible=False) ################################# # PARTE DI INSERIMENTO DEL DB # ################################# with upload_acc: gr.Markdown("## Caricamento dei Dati") file_input = gr.File(label="Trascina e rilascia un file", file_types=[".csv", ".xlsx", ".sqlite"]) path_input = gr.Textbox(label="Oppure inserisci il percorso locale del file") with gr.Row(): default_checkbox = gr.Checkbox(label="Usa DataFrame di default") preview_output = gr.DataFrame(interactive=True, visible=True, value=df_default) submit_button = gr.Button("Carica Dati", interactive=False) # Disabilitato di default output = gr.JSON(visible=False) # Output dizionario # Funzione per abilitare il bottone se sono presenti dati da caricare def enable_submit(file, path, use_default): return gr.update(interactive=bool(file or path or use_default)) # Abilita il bottone quando i campi di input sono valorizzati file_input.change(fn=enable_submit, inputs=[file_input, path_input, default_checkbox], outputs=[submit_button]) path_input.change(fn=enable_submit, inputs=[file_input, path_input, default_checkbox], outputs=[submit_button]) default_checkbox.change(fn=enable_submit, inputs=[file_input, path_input, default_checkbox], outputs=[submit_button]) # Mostra l'anteprima del DataFrame di default quando il checkbox è selezionato default_checkbox.change(fn=preview_default, inputs=[default_checkbox], outputs=[preview_output]) preview_output.change(fn=update_df, inputs=[preview_output], outputs=[preview_output]) def handle_output(file, path, use_default): """Gestisce l'output quando si preme il bottone 'Carica Dati'.""" result = load_data(file, path, use_default) if isinstance(result, dict): # Se result è un dizionario di DataFrame if len(result) == 1: # Se c'è solo una tabella return ( gr.update(visible=False), # Nasconde l'output JSON result, # Salva lo stato dei dati gr.update(visible=False), # Nasconde la selezione tabella result, # Mantiene lo stato dei dati gr.update(interactive=False), # Disabilita il pulsante di submit gr.update(visible=True, open=True), # Passa direttamente a select_model_acc gr.update(visible=True, open=False) ) else: return ( gr.update(visible=False), result, gr.update(open=True, visible=True), result, gr.update(interactive=False), gr.update(visible=False), # Mantiene il comportamento attuale gr.update(visible=True, open=True) ) else: return ( gr.update(visible=False), None, gr.update(open=False, visible=True), None, gr.update(interactive=True), gr.update(visible=False), gr.update(visible=True, open=True) ) submit_button.click( fn=handle_output, inputs=[file_input, path_input, default_checkbox], outputs=[output, output, select_table_acc, data_state, submit_button, select_model_acc, upload_acc] ) ###################################### # PARTE DI SELEZIONE DELLE TABELLE # ###################################### with select_table_acc: table_selector = gr.CheckboxGroup(choices=[], label="Seleziona le tabelle da visualizzare", value=[]) table_outputs = [gr.DataFrame(label=f"Tabella {i+1}", interactive=True, visible=False) for i in range(5)] selected_table_names = gr.Textbox(label="Tabelle selezionate", visible=False, interactive=False) # Bottone di selezione modelli (inizialmente disabilitato) open_model_selection = gr.Button("Choose your models", interactive=False) def update_table_list(data): """Aggiorna dinamicamente la lista delle tabelle disponibili.""" if isinstance(data, dict) and data: table_names = list(data.keys()) # Ritorna solo i nomi delle tabelle return gr.update(choices=table_names, value=[]) # Reset delle selezioni return gr.update(choices=[], value=[]) def show_selected_tables(data, selected_tables): """Mostra solo le tabelle selezionate dall'utente e abilita il bottone.""" updates = [] if isinstance(data, dict) and data: available_tables = list(data.keys()) # Nomi effettivamente disponibili selected_tables = [t for t in selected_tables if t in available_tables] # Filtra selezioni valide tables = {name: data[name] for name in selected_tables} # Filtra i DataFrame for i, (name, df) in enumerate(tables.items()): updates.append(gr.update(value=df, label=f"Tabella: {name}", visible=True)) # Se ci sono meno di 5 tabelle, nascondi gli altri DataFrame for _ in range(len(tables), 5): updates.append(gr.update(visible=False)) else: updates = [gr.update(value=pd.DataFrame(), visible=False) for _ in range(5)] # Abilitare/disabilitare il bottone in base alle selezioni button_state = bool(selected_tables) # True se almeno una tabella è selezionata, False altrimenti updates.append(gr.update(interactive=button_state)) # Aggiorna stato bottone return updates def show_selected_table_names(selected_tables): """Mostra i nomi delle tabelle selezionate quando si preme il bottone.""" if selected_tables: return gr.update(value=", ".join(selected_tables), visible=False) return gr.update(value="", visible=False) # Aggiorna automaticamente la lista delle checkbox quando `data_state` cambia data_state.change(fn=update_table_list, inputs=[data_state], outputs=[table_selector]) # Aggiorna le tabelle visibili e lo stato del bottone in base alle selezioni dell'utente table_selector.change(fn=show_selected_tables, inputs=[data_state, table_selector], outputs=table_outputs + [open_model_selection]) # Mostra la lista delle tabelle selezionate quando si preme "Choose your models" open_model_selection.click(fn=show_selected_table_names, inputs=[table_selector], outputs=[selected_table_names]) open_model_selection.click(open_accordion, inputs=gr.State("model_selection"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc]) #################################### # PARTE DI SELEZIONE DEL MODELLO # #################################### with select_model_acc: gr.Markdown("**Model Selection**") # Supponiamo che `us.read_models_csv` restituisca anche il percorso dell'immagine model_list_dict = us.read_models_csv(models_path) model_list = [model["name"] for model in model_list_dict] model_images = [model["image_path"] for model in model_list_dict] # Creazione dinamica di checkbox con immagini model_checkboxes = [] for model, image_path in zip(model_list, model_images): with gr.Row(): with gr.Column(scale=1): gr.Image(image_path, show_label=False) with gr.Column(scale=2): model_checkboxes.append(gr.Checkbox(label=model, value=False)) selected_models_output = gr.JSON(visible = False) # Funzione per ottenere i modelli selezionati def get_selected_models(*model_selections): selected_models = [model for model, selected in zip(model_list, model_selections) if selected] input_data['models'] = selected_models button_state = bool(selected_models) # True se almeno un modello è selezionato, False altrimenti return selected_models, gr.update(open=True, visible=True), gr.update(interactive=button_state) # Bottone di submit (inizialmente disabilitato) submit_models_button = gr.Button("Submit Models", interactive=False) # Collegamento dei checkbox agli eventi di selezione for checkbox in model_checkboxes: checkbox.change( fn=get_selected_models, inputs=model_checkboxes, outputs=[selected_models_output, select_model_acc, submit_models_button] ) submit_models_button.click( fn=lambda *args: (get_selected_models(*args), gr.update(open=False, visible=True), gr.update(open=True, visible=True)), inputs=model_checkboxes, outputs=[selected_models_output, select_model_acc, qatch_acc] ) reset_data = gr.Button("Open upload data section") reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc]) ############################### # PARTE DI ESECUZIONE QATCH # ############################### with qatch_acc: selected_models_display = gr.JSON(label="Modelli selezionati") submit_models_button.click( fn=lambda: gr.update(value=input_data), outputs=[selected_models_display] ) proceed_to_metrics_button = gr.Button("Proceed to Metrics") proceed_to_metrics_button.click( fn=lambda: (gr.update(open=False, visible=True), gr.update(open=True, visible=True)), outputs=[qatch_acc, metrics_acc] ) reset_data = gr.Button("Open upload data section") reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc]) ####################################### # PARTE DI VISUALIZZAZIONE METRICHE # ####################################### with metrics_acc: confirmation_text = gr.Markdown("## Metrics successfully loaded") data_path = 'metrics_random2.csv' def load_data_csv_es(): return pd.read_csv(data_path) def calculate_average_metrics(df, selected_metrics): df['avg_metric'] = df[selected_metrics].mean(axis=1) return df def plot_metric(df, selected_metrics, group_by, selected_models): df = df[df['model'].isin(selected_models)] df = calculate_average_metrics(df, selected_metrics) avg_metrics = df.groupby(group_by)['avg_metric'].mean().reset_index() fig = px.bar( avg_metrics, x=group_by[0], y='avg_metric', color=group_by[-1], barmode='group', title=f'Media metrica per {group_by[0]}', labels={group_by[0]: group_by[0].capitalize(), 'avg_metric': 'Media Metrica'}, template='plotly_dark' ) return fig def plot_radar(df, selected_models): radar_data = [] for model in selected_models: model_df = df[df['model'] == model] valid_efficiency = model_df['valid_efficiency_score'].mean() avg_time = model_df['time'].mean() avg_tuple_order = model_df['tuple_order'].dropna().mean() radar_data.append({ 'model': model, 'valid_efficiency_score': valid_efficiency, 'time': avg_time, 'tuple_order': avg_tuple_order }) radar_df = pd.DataFrame(radar_data) categories = ['valid_efficiency_score', 'time', 'tuple_order'] # Calcola il range dinamico per il grafico min_val = radar_df[categories].min().min() max_val = radar_df[categories].max().max() radar_df[categories] = (radar_df[categories] - min_val) / (max_val - min_val) fig = go.Figure() for _, row in radar_df.iterrows(): fig.add_trace(go.Scatterpolar( r=[row[cat] for cat in categories], theta=categories, fill='toself', name=row['model'] )) fig.update_layout( polar=dict(radialaxis=dict(visible=True, range=[min_val, max_val])), title='Radar Plot delle Metriche per Modello', template='plotly_dark', width=700, height=700 ) return fig def plot_query_rate(df, selected_models, show_labels): df = df[df['model'].isin(selected_models)] fig = go.Figure() for model in selected_models: model_df = df[df['model'] == model].copy() model_df['cumulative_time'] = model_df['time'].cumsum() model_df['query_rate'] = 1 / model_df['time'] fig.add_trace(go.Scatter( x=model_df['cumulative_time'], y=model_df['query_rate'], mode='lines+markers', name=model, line=dict(width=2) )) if show_labels: prev_category = None prev_time = -float('inf') y_positions = [1.1, 1.3] y_idx = 0 for i, row in model_df.iterrows(): current_category = row['test_category'] if current_category != prev_category and row['cumulative_time'] - prev_time > 5: fig.add_vline(x=row['cumulative_time'], line_width=1, line_dash="dash", line_color="gray") fig.add_annotation( x=row['cumulative_time'], y=max(model_df['query_rate']) * y_positions[y_idx % 2], text=current_category, showarrow=False, font=dict(size=10, color="white"), textangle=45, yshift=10, bgcolor="rgba(0,0,0,0.6)" ) prev_category = current_category prev_time = row['cumulative_time'] y_idx += 1 fig.update_layout( title="Rate di Generazione delle Query per Modello", xaxis_title="Tempo Cumulativo (s)", yaxis_title="Query al Secondo", template='plotly_dark', legend_title="Modelli" ) return fig def update_plot(selected_metrics, group_by, selected_models): df = load_data_csv_es() return plot_metric(df, selected_metrics, group_by, selected_models) def update_radar(selected_models): df = load_data_csv_es() return plot_radar(df, selected_models) def update_query_rate(selected_models, show_labels): df = load_data_csv_es() return plot_query_rate(df, selected_models, show_labels) def plot_query_time_evolution(df, selected_models): # Filtriamo i dati per i modelli selezionati df = df[df['model'].isin(selected_models)] # Ordinare per modello e tempo per tracciare l'evoluzione df_sorted = df.sort_values(by=['model', 'time']) fig = go.Figure() # Aggiungiamo una traccia per ogni modello for model in selected_models: model_df = df_sorted[df_sorted['model'] == model] fig.add_trace(go.Scatter( x=model_df.index, y=model_df['time'], mode='lines+markers', name=model, line=dict(shape='linear'), text=model_df['model'] )) fig.update_layout( title="Evoluzione del Tempo di Generazione per Modello", xaxis_title="Indice della Query", yaxis_title="Tempo (s)", template='plotly_dark' ) return fig metrics = ["cell_precision", "cell_recall", "execution_accuracy", "tuple_cardinality", "tuple_constraint"] group_options = { "SQL Category": ["test_category", "model"], "Tabella": ["tbl_name", "model"], "Modello": ["model"] } df_initial = load_data_csv_es() models = df_initial['model'].unique().tolist() #with gr.Blocks(theme=gr.themes.Default(primary_hue='blue')) as demo: gr.Markdown("""## Analisi delle prestazioni dei modelli Seleziona una o più metriche per calcolare la media e visualizzare gli istogrammi e radar plots. """) # Sezione di selezione delle opzioni with gr.Row(): metric_multiselect = gr.CheckboxGroup(choices=metrics, label="Seleziona le metriche") model_multiselect = gr.CheckboxGroup(choices=models, label="Seleziona i modelli", value=models) group_radio = gr.Radio(choices=list(group_options.keys()), label="Seleziona il raggruppamento", value="SQL Category") #show_labels_checkbox = gr.Checkbox(label="Mostra etichette test category", value=True) with gr.Row(): output_plot = gr.Plot() # Dividi la pagina in due colonne with gr.Row(): with gr.Column(scale=1): # Imposta la colonna a occupare metà della larghezza radar_plot = gr.Plot(value=update_radar(models)) with gr.Column(scale=2): # Imposta la seconda colonna a occupare l'altra metà show_labels_checkbox = gr.Checkbox(label="Mostra etichette test category", value=True) query_rate_plot = gr.Plot(value=update_query_rate(models, True)) # Funzioni di callback per il cambiamento dei grafici def on_change(selected_metrics, selected_group, selected_models): return update_plot(selected_metrics, group_options[selected_group], selected_models) def on_radar_change(selected_models): return update_radar(selected_models) show_labels_checkbox.change(update_query_rate, inputs=[model_multiselect, show_labels_checkbox], outputs=query_rate_plot) metric_multiselect.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) group_radio.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) model_multiselect.change(on_change, inputs=[metric_multiselect, group_radio, model_multiselect], outputs=output_plot) model_multiselect.change(on_radar_change, inputs=model_multiselect, outputs=radar_plot) model_multiselect.change(update_query_rate, inputs=[model_multiselect, show_labels_checkbox], outputs=query_rate_plot) reset_data = gr.Button("Open upload data section") reset_data.click(open_accordion, inputs=gr.State("reset"), outputs=[upload_acc, select_table_acc, select_model_acc, qatch_acc, metrics_acc]) interface.launch()