import gradio as gr import pandas as pd from glob import glob import matplotlib.pyplot as plt import seaborn as sns from matplotlib.colors import ListedColormap, BoundaryNorm from glob import glob import os import matplotlib.pyplot as plt import seaborn as sns from matplotlib.colors import ListedColormap, BoundaryNorm import pandas as pd # Load text benchmark results noncot_results = glob("results/*.pkl") noncot_results_qwen = glob("results_qwen/*.pkl") # Load vision benchmark results vision_results = glob("results-vision/*.pkl") # Load CoT text benchmark results cot_text_results = glob("results-cot/*.pkl") # Load CoT vision benchmark results # cot_vision_results = glob("results-vision-CoT/*.pkl") # Function to load data, add model type and name def load_data(files, model_type): data = [] for file in files: df = pd.read_pickle(file) df["Model Type"] = model_type df["Model Name"] = file.split("/")[-1].replace(".pkl", "") data.append(df) return pd.concat(data, ignore_index=True) # Load and label all data data = load_data(noncot_results, "Text Only") data_qwen = load_data(noncot_results_qwen, "Text Only") vision_data = load_data(vision_results, "Vision") cot_text_data = load_data(cot_text_results, "CoT Text Only") # cot_vision_data = load_data(cot_vision_results, "CoT Vision") # Combine all data into a single DataFrame all_data = pd.concat([data_qwen, vision_data, cot_text_data], ignore_index=True) all_model_names = all_data["Model Name"].unique() all_text_only_model_names = list( all_data[all_data["Model Type"] == "Text Only"]["Model Name"].unique() ) all_cot_text_only_models = list( all_data[all_data["Model Type"] == "CoT Text Only"]["Model Name"].unique() ) text_only_filtered_raw = None text_only_filtered_raw_cot = None ## Continue with the cold code -- # TODO: Update me to read from all_data for later # Load the csv files into a dict with keys being name of the file and values being the data data = {file: pd.read_pickle(file) for file in noncot_results} # Load the vision files into a dict vision_data = {file: pd.read_pickle(file) for file in vision_results} # Load the CoT text files into a dict cot_text_data = {file: pd.read_pickle(file) for file in cot_text_results} # Load the CoT vision files into a dict # cot_vision_data = {file: pd.read_pickle(file) for file in cot_vision_results} data_qwen = {file: pd.read_pickle(file) for file in noncot_results_qwen} intersection_df = pd.read_pickle( "./intersection_results/gpt-3.5-judge-by_Qwen_5times_intersection_subset_1.pkl" ) # accuracy for each model intersection_df_acc = ( intersection_df.groupby("model_name")["parsed_judge_response"].mean().reset_index() ) intersection_df_acc["Accuracy"] = intersection_df_acc["parsed_judge_response"] * 100 intersection_df_acc.drop("parsed_judge_response", axis=1, inplace=True) intersection_df_acc.sort_values("Accuracy", ascending=False, inplace=True) def calculate_accuracy(df): return df["parsed_judge_response"].mean() * 100 def accuracy_breakdown(df): # 4 level accuracy return (df.groupby("difficulty_level")["parsed_judge_response"].mean() * 100).values # Define the column names with icons headers_with_icons = [ "🤖 Model Name", "⭐ Overall", "📈 Level 1", "🔍 Level 2", "📘 Level 3", "🔬 Level 4", ] column_names = [ "Model Name", "Overall Accuracy", "Level 1 Accuracy", "Level 2 Accuracy", "Level 3 Accuracy", "Level 4 Accuracy", ] # Function to process data def process_data(data): data_for_df = [] for file, df in data.items(): overall_accuracy = round(calculate_accuracy(df), 2) breakdown_accuracy = [round(acc, 2) for acc in accuracy_breakdown(df)] model_name = file.split("/")[-1].replace(".pkl", "") data_for_df.append([model_name, overall_accuracy] + breakdown_accuracy) return data_for_df # Process all data text_data_for_df = process_data(data) text_data_for_df_qwen = process_data(data_qwen) vision_data_for_df = process_data(vision_data) cot_text_data_for_df = process_data(cot_text_data) # cot_vision_data_for_df = process_data(cot_vision_data) # Create DataFrames accuracy_df = pd.DataFrame(text_data_for_df, columns=column_names) accuracy_df_qwen = pd.DataFrame(text_data_for_df_qwen, columns=column_names) vision_accuracy_df = pd.DataFrame(vision_data_for_df, columns=column_names) cot_text_accuracy_df = pd.DataFrame(cot_text_data_for_df, columns=column_names) # cot_vision_accuracy_df = pd.DataFrame(cot_vision_data_for_df, columns=column_names) # Function to finalize DataFrame def finalize_df(df): df = df.round(1) # Round to one decimal place df = df.applymap(lambda x: f"{x:.1f}" if isinstance(x, (int, float)) else x) df.columns = headers_with_icons df.sort_values(by="⭐ Overall", ascending=False, inplace=True) # add a new column with the order (index) df["#"] = range(1, len(df) + 1) # bring rank to the first column cols = df.columns.tolist() cols = cols[-1:] + cols[:-1] df = df[cols] return df # Finalize all DataFrames accuracy_df = finalize_df(accuracy_df) accuracy_df_qwen = finalize_df(accuracy_df_qwen) vision_accuracy_df = finalize_df(vision_accuracy_df) cot_text_accuracy_df = finalize_df(cot_text_accuracy_df) # cot_vision_accuracy_df = finalize_df(cot_vision_accuracy_df) def load_heatmap(evt: gr.SelectData): heatmap_image = gr.Image(f"results/{evt.value}.jpg") return heatmap_image def load_heatmap_qwen(evt: gr.SelectData): heatmap_image = gr.Image(f"results_qwen/{evt.value}.jpg") return heatmap_image def load_vision_heatmap(evt: gr.SelectData): heatmap_image = gr.Image(f"results-vision/{evt.value}.jpg") return heatmap_image def load_cot_heatmap(evt: gr.SelectData): heatmap_image = gr.Image(f"results-cot/{evt.value}.jpg") return heatmap_image def load_cot_vision_heatmap(evt: gr.SelectData): heatmap_image = gr.Image(f"results-vision-CoT/{evt.value}.jpg") return heatmap_image def calculate_order_by_first_substring(selected_models): global text_only_filtered_raw first_columns = all_data[all_data["substring_index"] == 1] query_ids_df = first_columns[first_columns["Model Type"] == "Text Only"] query_ids_df = query_ids_df[query_ids_df["Model Name"].isin(selected_models)] query_ids_df = query_ids_df.groupby("query_id").filter( lambda x: x["parsed_judge_response"].eq(1).all() ) fsm_ids = query_ids_df.fsm_id.unique() text_only = all_data[all_data["Model Type"] == "Text Only"] text_only_filtered = text_only[text_only["fsm_id"].isin(fsm_ids)] text_only_filtered_raw = text_only_filtered.copy() query_ids = text_only_filtered.query_id.unique() text_only_filtered = ( text_only_filtered.groupby(["Model Name"])["parsed_judge_response"] .mean() .reset_index() ) text_only_filtered["Accuracy"] = text_only_filtered["parsed_judge_response"] * 100 text_only_filtered.drop("parsed_judge_response", axis=1, inplace=True) text_only_filtered["Accuracy"] = text_only_filtered["Accuracy"].apply( lambda x: round(x, 2) ) text_only_filtered.sort_values("Accuracy", ascending=False, inplace=True) number_of_queries = len(query_ids) number_of_fsms = len(fsm_ids) return text_only_filtered, number_of_queries, number_of_fsms def calculate_order_by_first_substring_cot(selected_models): global text_only_filtered_raw_cot first_columns = all_data[all_data["substring_index"] == 1] query_ids_df = first_columns[first_columns["Model Type"] == "CoT Text Only"] query_ids_df = query_ids_df[query_ids_df["Model Name"].isin(selected_models)] query_ids_df = query_ids_df.groupby("query_id").filter( lambda x: x["parsed_judge_response"].eq(1).all() ) fsm_ids = query_ids_df.fsm_id.unique() text_only = all_data[all_data["Model Type"] == "CoT Text Only"] text_only_filtered = text_only[text_only["fsm_id"].isin(fsm_ids)] text_only_filtered_raw_cot = text_only_filtered.copy() query_ids = text_only_filtered.query_id.unique() text_only_filtered = ( text_only_filtered.groupby(["Model Name"])["parsed_judge_response"] .mean() .reset_index() ) text_only_filtered["Accuracy"] = text_only_filtered["parsed_judge_response"] * 100 text_only_filtered.drop("parsed_judge_response", axis=1, inplace=True) text_only_filtered["Accuracy"] = text_only_filtered["Accuracy"].apply( lambda x: round(x, 2) ) text_only_filtered.sort_values("Accuracy", ascending=False, inplace=True) number_of_queries = len(query_ids) number_of_fsms = len(fsm_ids) return text_only_filtered, number_of_queries, number_of_fsms def generate_heatmap_for_specific_model(model_name): global text_only_filtered_raw cmap = ListedColormap(["lightblue", "red", "green"]) bounds = [-1.5, -0.5, 0.5, 1.5] norm = BoundaryNorm(bounds, cmap.N) model_df = text_only_filtered_raw[ text_only_filtered_raw["Model Name"] == model_name ] model_df["fsm_info"] = model_df.apply( lambda x: f"{x['num_states']} states, {x['num_alphabet']} alphabet", axis=1 ) model_df = model_df.sort_values(by=["num_states", "num_alphabet"]) pivot_df = ( model_df.pivot_table( index="fsm_info", columns="substring_index", values="parsed_judge_response", aggfunc="first", ) .fillna(-1) .astype(float) ) # Dynamically adjust figure size num_rows, num_cols = pivot_df.shape fig_width = max(12, num_cols * 0.5) # Adjust width per column fig_height = max(8, num_rows * 0.4) # Adjust height per row fig, ax = plt.subplots(figsize=(fig_width, fig_height)) sns.heatmap( pivot_df, cmap=cmap, linewidths=1, linecolor="black", norm=norm, cbar=False, square=True, ax=ax, ) plt.title(f"Heatmap for Model: {model_name}", fontsize=12) plt.xlabel("Substring Index") plt.ylabel("FSM (States, Alphabet)") plt.xticks(rotation=45) sns.despine(ax=ax, top=True, right=True, left=True, bottom=True) return fig def generate_heatmap_for_specific_model_cot(model_name): global text_only_filtered_raw_cot cmap = ListedColormap(["lightblue", "red", "green"]) bounds = [-1.5, -0.5, 0.5, 1.5] norm = BoundaryNorm(bounds, cmap.N) model_df = text_only_filtered_raw_cot[ text_only_filtered_raw_cot["Model Name"] == model_name ] model_df["fsm_info"] = model_df.apply( lambda x: f"{x['num_states']} states, {x['num_alphabet']} alphabet", axis=1 ) model_df = model_df.sort_values(by=["num_states", "num_alphabet"]) pivot_df = ( model_df.pivot_table( index="fsm_info", columns="substring_index", values="parsed_judge_response", aggfunc="first", ) .fillna(-1) .astype(float) ) # Dynamically adjust figure size num_rows, num_cols = pivot_df.shape fig_width = max(12, num_cols * 0.5) # Adjust width per column fig_height = max(8, num_rows * 0.4) # Adjust height per row fig, ax = plt.subplots(figsize=(fig_width, fig_height)) sns.heatmap( pivot_df, cmap=cmap, linewidths=1, linecolor="black", norm=norm, cbar=False, square=True, ax=ax, ) plt.title(f"Heatmap for Model: {model_name}", fontsize=12) plt.xlabel("Substring Index") plt.ylabel("FSM (States, Alphabet)") plt.xticks(rotation=45) sns.despine(ax=ax, top=True, right=True, left=True, bottom=True) return fig def generate_heatmap_for_intersection_model(model_name): global intersection_df cmap = ListedColormap(["lightblue", "red", "green"]) bounds = [-1.5, -0.5, 0.5, 1.5] norm = BoundaryNorm(bounds, cmap.N) # Filter for a specific model model_df = intersection_df[intersection_df["model_name"] == model_name].copy() if model_df.empty: print(f"No data found for model {model_name}. Skipping heatmap generation.") return None model_df["fsm_info"] = model_df.apply( lambda x: f"{x['num_states']} states, {x['num_alphabet']} alphabet", axis=1 ) model_df = model_df.sort_values(by=["num_states", "num_alphabet"]) pivot_df = ( model_df.pivot_table( index="fsm_info", columns="substring_index", values="parsed_judge_response", aggfunc="first", ) .fillna(-1) .astype(float) ) # Dynamically adjust figure size num_rows, num_cols = pivot_df.shape fig_width = max(12, num_cols * 0.5) fig_height = max(8, num_rows * 0.4) fig, ax = plt.subplots(figsize=(fig_width, fig_height)) sns.heatmap( pivot_df, cmap=cmap, linewidths=1, linecolor="black", norm=norm, cbar=False, square=True, ax=ax, ) plt.title(f"Heatmap for Model: {model_name}", fontsize=12) plt.xlabel("Substring Index") plt.ylabel("FSM (States, Alphabet)") plt.xticks(rotation=45) sns.despine(ax=ax, top=True, right=True, left=True, bottom=True) plt.close(fig) return fig def show_constraint_heatmap(evt: gr.SelectData): model_name = evt.value return generate_heatmap_for_specific_model(model_name) def show_constraint_heatmap_cot(evt: gr.SelectData): model_name = evt.value return generate_heatmap_for_specific_model_cot(model_name) def show_intersection_heatmap(evt: gr.SelectData): model_name = evt.value return generate_heatmap_for_intersection_model(model_name) with gr.Blocks() as demo: gr.Markdown("# FSM Benchmark Leaderboard") with gr.Tab("Text-only Benchmark"): gr.Markdown("# Text-only Leaderboard (Judged by Qwen)") leader_board = gr.Dataframe(accuracy_df_qwen, headers=headers_with_icons) gr.Markdown("## Heatmap") heatmap_image_qwen = gr.Image(label="", show_label=False) leader_board.select(fn=load_heatmap_qwen, outputs=[heatmap_image_qwen]) with gr.Tab("Vision Benchmark", visible=False): gr.Markdown("# Vision Benchmark Leaderboard") leader_board_vision = gr.Dataframe( vision_accuracy_df, headers=headers_with_icons ) gr.Markdown("## Heatmap") heatmap_image_vision = gr.Image(label="", show_label=False) leader_board_vision.select( fn=load_vision_heatmap, outputs=[heatmap_image_vision] ) with gr.Tab("Text-only Benchmark (CoT)", visible=False): gr.Markdown("# Text-only Leaderboard (CoT)") cot_leader_board_text = gr.Dataframe( cot_text_accuracy_df, headers=headers_with_icons ) gr.Markdown("## Heatmap") cot_heatmap_image_text = gr.Image(label="", show_label=False) cot_leader_board_text.select( fn=load_cot_heatmap, outputs=[cot_heatmap_image_text] ) # with gr.Tab("Vision Benchmark (CoT)"): # gr.Markdown("# Vision Benchmark Leaderboard (CoT)") # cot_leader_board_vision = gr.Dataframe( # cot_vision_accuracy_df, headers=headers_with_icons # ) # gr.Markdown("## Heatmap") # cot_heatmap_image_vision = gr.Image(label="", show_label=False) # cot_leader_board_vision.select( # fn=load_cot_vision_heatmap, outputs=[cot_heatmap_image_vision] # ) with gr.Tab("Constraint Text-only Results"): gr.Markdown("## Constraint Text-only Leaderboard by first substring") included_models = gr.CheckboxGroup( label="Models to include", choices=all_text_only_model_names, value=all_text_only_model_names, interactive=True, ) with gr.Row(): number_of_queries = gr.Textbox(label="Number of included queries") number_of_fsms = gr.Textbox(label="Number of included FSMs") constrained_leader_board_text = gr.Dataframe() constrained_leader_board_plot = gr.Plot() included_models.select( fn=calculate_order_by_first_substring, inputs=[included_models], outputs=[constrained_leader_board_text, number_of_queries, number_of_fsms], queue=True, ) with gr.Tab("Constraint Text-only Results (CoT)", visible=False): gr.Markdown("## Constraint Text-only Leaderboard by first substrin (CoT)") included_models_cot = gr.CheckboxGroup( label="Models to include", choices=all_cot_text_only_models, value=all_cot_text_only_models, interactive=True, ) with gr.Row(): number_of_queries_cot = gr.Textbox(label="Number of included queries") number_of_fsms_cot = gr.Textbox(label="Number of included FSMs") constrained_leader_board_text_cot = gr.Dataframe() constrained_leader_board_plot_cot = gr.Plot() with gr.Tab("Majority Vote (Subset 1)", visible=False): gr.Markdown("## Majority Vote (Subset 1)") intersection_leader_board = gr.Dataframe( intersection_df_acc, headers=headers_with_icons ) heatmap_image = gr.Plot(label="Model Heatmap") with gr.Tab("Text-only Benchmark (deprecated)", visible=False): gr.Markdown("# Text-only Leaderboard") leader_board = gr.Dataframe(accuracy_df, headers=headers_with_icons) gr.Markdown("## Heatmap") heatmap_image = gr.Image(label="", show_label=False) leader_board.select(fn=load_heatmap, outputs=[heatmap_image]) # ============ Callbacks ============ included_models_cot.select( fn=calculate_order_by_first_substring_cot, inputs=[included_models_cot], outputs=[ constrained_leader_board_text_cot, number_of_queries_cot, number_of_fsms_cot, ], queue=True, ) constrained_leader_board_text.select( fn=show_constraint_heatmap, outputs=[constrained_leader_board_plot] ) constrained_leader_board_text_cot.select( fn=show_constraint_heatmap_cot, outputs=[constrained_leader_board_plot_cot] ) intersection_leader_board.select( fn=show_intersection_heatmap, outputs=[heatmap_image] ) demo.launch()