Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import gradio as gr | |
| import pickle | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| import pandas as pd | |
| import tempfile | |
| import os | |
# Cell styling lives in two small per-column functions so that each one can
# be applied to its own column through the Styler `subset` argument.
def style_feasible_column(val):
    """Return the CSS declaration for one cell of the 'Feasible?' column.

    'Success' cells get a light-green background and 'Failed' cells a
    light-coral one, both with white text. Any other value is left
    unstyled (empty string).
    """
    outcome_backgrounds = (
        ('Success', 'lightgreen'),
        ('Failed', 'lightcoral'),
    )
    for outcome, background in outcome_backgrounds:
        if val == outcome:
            return f'color:white;background-color: {background}'
    return ''
def style_size_column(val):
    """Return the CSS declaration for one cell of the 'Size (um)' column.

    The cell is drawn as a horizontal bar whose filled width shrinks
    linearly with the absolute distance of the value from the 3.0 target
    (full width on target, empty at 2.5 or more away). Values below the
    target are teal, values at or above it are orange, and the bar becomes
    more transparent as the value moves away from the target.

    Args:
        val: Cell value; anything accepted by ``float()``.

    Returns:
        A CSS string, or '' when ``val`` cannot be converted to a finite
        number (so no 'nan%' / 'inf%' ends up in the stylesheet).
    """
    target = 3.0
    max_distance = 2.5  # distance at which the bar is completely empty

    try:
        size = float(val)
    except (ValueError, TypeError):
        return ''
    if not np.isfinite(size):
        # NaN/inf would otherwise be formatted straight into the CSS.
        return ''

    distance = size - target  # signed: negative means below the target
    abs_distance = abs(distance)

    width_pct = 100 - min(abs_distance / max_distance * 100, 100)

    # Opacity fades from 1.0 on target down to 0.4 at max_distance; clamp so
    # values far from the target never yield a negative alpha.
    alpha = max(0.0, min(1.0, 0.4 + 0.6 * (1 - abs_distance / max_distance)))
    if distance < 0:
        color = f"rgba(0, 128, 128, {alpha})"  # Teal for below
    else:
        color = f"rgba(230, 97, 0, {alpha})"  # Orange for above

    # Text colour tracks how much of the bar is filled behind it.
    if abs_distance > 3:
        text_color = "grey"
    elif abs_distance > 1:
        text_color = "black"
    else:
        text_color = "white"
    font_weight = "bold" if abs_distance < 0.5 else "normal"

    return (
        f"background: linear-gradient(90deg, {color} {width_pct}%, transparent {width_pct}%); "
        f"color: {text_color}; "
        f"font-weight: {font_weight}; "
    )
# Simulation function for electrospraying
def sim_espray_constrained(x, noise_se=None):
    """Simulate particle size and feasibility for electrospray settings.

    Args:
        x: One experiment (1-D) or several (2-D, one row each). The first
            four columns are read as concentration, flow rate, voltage and
            solvent code.
        noise_se: Optional scale of Gaussian noise added to the diameter;
            ``None`` leaves the simulation deterministic.

    Returns:
        Array of shape (n_experiments, 2): column 0 is the diameter in
        micrometers, column 1 is 1.0 when the feasibility constraint holds
        and 0.0 otherwise.
    """
    params = np.array(x, dtype=float)
    if params.ndim == 1:
        # A single experiment is promoted to a one-row batch.
        params = params[np.newaxis, :]

    conc, flow_rate, voltage, solvent = (params[:, col] for col in range(4))

    spray_term = np.sqrt(conc) * np.sqrt(flow_rate) / np.log2(voltage) * 10
    diameter = spray_term + 0.4 + solvent  # Diameter in micrometers
    if noise_se is not None:
        diameter = diameter + noise_se * np.random.randn(*diameter.shape)

    constraint_ok = np.log(flow_rate) * (solvent - 0.5) + 1.40 >= 0
    return np.column_stack((diameter, constraint_ok.astype(float)))
# Seed experiments shown to the player before the first round.
# Column order: concentration, flow rate, voltage, solvent code (0 or 1).
X_init = np.array([
    [0.5, 15, 10, 0],
    [0.5, 0.1, 10, 1],
    [3, 20, 15, 0],
    [1, 20, 10, 1],
    [0.2, 0.02, 10, 1],
])
Y_init = sim_espray_constrained(X_init)

exp_record_df = pd.DataFrame(
    X_init,
    columns=['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent'],
)
exp_record_df['Size (um)'] = Y_init[:, 0]
# Replace the numeric codes with the labels shown in the table.
exp_record_df['Solvent'] = exp_record_df['Solvent'].map(
    lambda code: 'DMAc' if code == 0 else 'CHCl3'
)
exp_record_df['Feasible?'] = [
    'Success' if flag == 1 else 'Failed' for flag in Y_init[:, 1]
]

# Apply each styling function to its specific column
prior_experiments_display = (
    exp_record_df.style
    .map(style_feasible_column, subset=['Feasible?'])
    .map(style_size_column, subset=['Size (um)'])
    .format(precision=3)
)
# Functions for data processing and visualization
def import_results():
    """Load the stored benchmark runs and return them in long format.

    Reads ``best_distances.pkl`` from the current working directory. The
    pickle is expected to map each strategy key to a sequence of per-trial
    arrays that ``np.vstack`` can stack (one row per trial, one column per
    iteration). Stored values are negated to obtain the 'regret' column.

    The number of trials is taken from the data for each strategy, so the
    file does not have to contain exactly 20 trials per strategy.

    Returns:
        DataFrame with columns 'strategy', 'trial', 'iteration', 'regret'.
    """
    # Pickle key -> label shown in the plot legend.
    strategy_labels = {
        'qEI': 'Vanilla BO',
        'qEI_vi_mixed_con': 'Constrained BO',
        'qEICF_vi_mixed_con': 'CCBO',
        'rnd': 'Random',
    }

    # NOTE: pickle.load executes arbitrary code from the file; this is only
    # acceptable because the file ships with the app and is trusted.
    with open('best_distances.pkl', 'rb') as f:
        best_distances = pickle.load(f)

    per_strategy = [np.vstack(best_distances[key]) for key in strategy_labels]
    trial_counts = [len(block) for block in per_strategy]

    best_distances_all_trials_df = pd.DataFrame(-np.vstack(per_strategy))
    best_distances_all_trials_df['strategy'] = np.repeat(
        list(strategy_labels.values()), trial_counts
    )
    best_distances_all_trials_df['trial'] = np.concatenate(
        [np.arange(count) for count in trial_counts]
    )

    return pd.melt(
        best_distances_all_trials_df,
        id_vars=['strategy', 'trial'],
        var_name='iteration',
        value_name='regret',
    )
def calc_human_performance(df):
    """Compute the player's best-so-far distance to the target after each round.

    The player's experiments are replayed two at a time on top of the seed
    experiments (``X_init`` / ``Y_init``). Before the first round and after
    every round, the smallest absolute distance between a feasible
    experiment's simulated size and the 3.0 target is recorded.

    A trailing unpaired experiment (odd number of rows) counts as a final
    partial round, so it is scored as well instead of being dropped.

    Args:
        df: Player results with the columns 'Concentration (%w/v)',
            'Flow Rate (mL/h)', 'Voltage (kV)' and 'Solvent' (labels
            'DMAc' / anything else). The frame is not modified.

    Returns:
        1-D array of length ``n_rounds + 1`` with the best distance so far
        (index 0 is the seed data alone).
    """
    TARGET_SIZE = 3.0
    numeric_cols = ['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent']

    # Work on a copy so the caller's frame keeps its solvent labels.
    df_copy = df.copy()
    # convert back solvent to 0 and 1
    df_copy['Solvent'] = [0 if x == 'DMAc' else 1 for x in df_copy['Solvent']]
    for col in numeric_cols:
        df_copy[col] = pd.to_numeric(df_copy[col])
    X_human = df_copy[numeric_cols].values

    # Ceiling division: an odd row count yields one extra, partial round.
    n_rounds = -(-len(X_human) // 2)

    X_seen = X_init
    Y_seen = Y_init
    best_human_distance = []
    for round_idx in range(n_rounds + 1):
        closeness = -np.abs(Y_seen[:, 0] - TARGET_SIZE)
        infeasible = ~Y_seen[:, 1].astype(bool)
        # Failed experiments are masked out so they can never be the best.
        best_human_distance.append(
            np.ma.masked_array(closeness, mask=infeasible).max()
        )

        # Reveal this round's experiments (slicing past the end is empty).
        new_x = X_human[2 * round_idx:2 * (round_idx + 1)]
        if len(new_x) > 0:
            X_seen = np.vstack([X_seen, new_x])
            Y_seen = np.vstack([Y_seen, sim_espray_constrained(new_x)])

    return -np.array(best_human_distance)
def plot_results(exp_data_df):
    """Build the regret-vs-iteration figure for the player and the benchmarks.

    For every strategy returned by ``import_results`` the mean regret per
    iteration is drawn as a line with a translucent band of one standard
    deviation either side. The player's curve from
    ``calc_human_performance`` is drawn on top in brown.

    Args:
        exp_data_df: The player's results table.

    Returns:
        A ``plotly.graph_objects.Figure``.
    """
    human_curve = calc_human_performance(exp_data_df)
    benchmark_long = import_results()

    fig = go.Figure()
    palette = px.colors.qualitative.Set2

    for position, strategy in enumerate(benchmark_long['strategy'].unique()):
        rows = benchmark_long[benchmark_long['strategy'] == strategy]
        regret_per_iteration = rows.groupby('iteration')['regret']
        mean_regret = regret_per_iteration.mean()
        std_regret = regret_per_iteration.std()
        iterations = mean_regret.index
        color = palette[position]

        # Mean line
        fig.add_trace(go.Scatter(
            x=iterations,
            y=mean_regret,
            mode='lines',
            name=strategy,
            line=dict(width=2, color=color)
        ))

        # Shaded band: trace the upper edge forwards, then the lower edge
        # backwards, so that 'toself' fills the area in between.
        upper_edge = mean_regret + std_regret
        lower_edge = mean_regret - std_regret
        fig.add_trace(go.Scatter(
            x=list(iterations) + list(iterations[::-1]),
            y=list(upper_edge) + list(lower_edge[::-1]),
            fill='toself',
            fillcolor=color.replace('rgb', 'rgba').replace(')', ',0.2)'),
            line=dict(color='rgba(255,255,255,0)'),
            showlegend=False,
            name=f'{strategy} (std dev)'
        ))

    # Player curve
    fig.add_trace(go.Scatter(
        x=list(range(len(human_curve))),
        y=human_curve,
        mode='lines+markers',
        name='Human',
        line=dict(width=2, color='brown')
    ))

    legend_box = dict(
        x=0.01,
        y=0.01,
        bgcolor='rgba(255, 255, 255, 0.5)',
        bordercolor='rgba(0, 0, 0, 0.5)',
        borderwidth=1
    )
    fig.update_layout(
        title='Performance Comparison',
        xaxis_title='Iteration',
        yaxis_title='Regret (μm)',
        legend_title='Strategy',
        template='plotly_white',
        legend=legend_box
    )
    return fig
# Add function to calculate AUC
def calculate_auc(human_performance_values):
    """Calculate the Area Under the Curve for a user's performance.

    Uses the trapezoidal rule with unit spacing between points.

    Args:
        human_performance_values: Sequence of per-round performance values.

    Returns:
        The area rounded to 4 decimals, or 0 when fewer than two points
        are given (no interval to integrate over).
    """
    if len(human_performance_values) <= 1:
        return 0

    # np.trapezoid only exists in NumPy >= 2.0; older releases provide the
    # same function under the name np.trapz.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz
    auc_value = trapezoid(human_performance_values, dx=1)
    return round(auc_value, 4)
# Prediction function - simplified signature by removing unnecessary text params
def predict(state, conc1, flow_rate1, voltage1, solvent1, conc2, flow_rate2, voltage2, solvent2):
    """Run one round (two experiments) and return the refreshed UI outputs.

    Args:
        state: Results accumulated so far for this session (DataFrame), or
            ``None`` before the first submission.
        conc1, flow_rate1, voltage1, solvent1: Settings of experiment 1;
            ``solvent1`` is the dropdown label ('DMAc' or 'CHCl3').
        conc2, flow_rate2, voltage2, solvent2: Settings of experiment 2.

    Returns:
        An 8-tuple in the order of the submit button's outputs: updated
        results frame, prior-experiments table, styled results table,
        performance figure, download-button update, completion-message
        update, AUC update and result-file update.
    """
    columns = ['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent', 'Size (um)', 'Feasible?']

    # Simulate both proposals and build their display rows.
    rows = []
    for conc, flow_rate, voltage, solvent in (
        (conc1, flow_rate1, voltage1, solvent1),
        (conc2, flow_rate2, voltage2, solvent2),
    ):
        solvent_value = 0 if solvent == 'DMAc' else 1
        size, feasible = sim_espray_constrained([conc, flow_rate, voltage, solvent_value])[0]
        rows.append([
            conc, flow_rate, voltage,
            'DMAc' if solvent_value == 0 else 'CHCl3',
            size,
            'Success' if feasible == 1 else 'Failed',
        ])
    results_df = pd.DataFrame(rows, columns=columns)

    # Before the first round (None) and right after a reset (empty frame)
    # there is nothing to append to. Concatenating an empty frame would
    # raise a pandas FutureWarning and may affect the resulting dtypes.
    if state is None or len(state) == 0:
        results_storage = results_df
    else:
        results_storage = pd.concat([state, results_df], ignore_index=True)

    # Apply each styling function to its specific column
    results_display = (
        results_storage.style
        .map(style_feasible_column, subset=['Feasible?'])
        .map(style_size_column, subset=['Size (um)'])
        .format(precision=3)
    )

    # Check if user has completed 5 rounds (10 experiments)
    completed = len(results_storage) >= 10
    message = ""
    auc_value = 0
    if completed:
        # NOTE(review): the AUC is taken over every submitted round, so it
        # keeps growing if the player continues past round 5 - confirm
        # whether that is intended.
        human_performance = calc_human_performance(results_storage)
        auc_value = calculate_auc(human_performance)

        # beginner = 2.62, intermediate = 1.60, expert = 1.03
        skill_levels = (
            (4.10, "randomly playing!"),
            (1.80, "a beginner."),
            (1.30, "an intermediate user."),
            (0.90, "an advanced user."),
        )
        usr_level = next(
            (label for threshold, label in skill_levels if auc_value > threshold),
            "... come on, you must have cheated (or you are extremely lucky)!",
        )
        # Bold markers must touch the text: '** 1.40 **' is not rendered as
        # bold by Markdown.
        message = (
            f"🎉 Congratulations! You've completed all 5 rounds. "
            f"Your performance AUC is **{auc_value:.2f}** and CCBO was **1.40**. "
            f"You seem to be {usr_level} Now you can download your results or try again!"
        )

    # Return updated state and UI updates
    return (
        results_storage,
        gr.DataFrame(value=prior_experiments_display, label="Prior Experiments"),
        gr.DataFrame(value=results_display, label="Your Results"),
        plot_results(results_storage),
        gr.update(visible=completed),  # Show download button when completed
        gr.update(value=message, visible=completed),  # Show message when completed
        gr.update(value=auc_value),  # Update AUC value
        gr.update(visible=completed)  # Show result file component
    )
# Reset results function
def reset_results(state):
    """Clear the session's results and return the UI to its initial state.

    Args:
        state: Current results state; accepted because the reset button
            passes it in, but not read.

    Returns:
        An 8-tuple in the order of the reset button's outputs: an empty
        results frame, the prior-experiments table, the (empty) styled
        results table, the figure without player rounds, and updates that
        hide the download button, completion message and result file and
        set the AUC back to 0.
    """
    empty_results = pd.DataFrame(
        columns=['Concentration (%w/v)', 'Flow Rate (mL/h)', 'Voltage (kV)', 'Solvent', 'Size (um)', 'Feasible?']
    )

    # Figure for a session with no player experiments yet.
    baseline_plot = plot_results(empty_results)

    # Same per-column styling as the populated table.
    empty_display = (
        empty_results.style
        .map(style_feasible_column, subset=['Feasible?'])
        .map(style_size_column, subset=['Size (um)'])
        .format(precision=3)
    )

    hidden = dict(visible=False)
    return (
        empty_results,  # results_state
        gr.DataFrame(value=prior_experiments_display, label="Prior Experiments"),  # prior_experiments
        gr.DataFrame(value=empty_display, label="Your Results"),  # results_df
        baseline_plot,  # perf_plot
        gr.update(**hidden),  # download_btn visibility
        gr.update(value="", **hidden),  # completion_message
        gr.update(value=0),  # auc_state
        gr.update(**hidden)  # result_file visibility
    )
# Function to prepare results for download
def prepare_results_for_download(results):
    """Write the player's results plus an AUC summary row to a CSV file.

    Args:
        results: The session's results DataFrame, or ``None``.

    Returns:
        Path of the written CSV, or ``None`` when there are no results.
    """
    if results is None or len(results) == 0:
        return None

    auc_value = calculate_auc(calc_human_performance(results))

    # Summary row: label in the first column, AUC in the second.
    summary_df = pd.DataFrame({
        'Concentration (%w/v)': ["Performance AUC:"],
        'Flow Rate (mL/h)': [auc_value],
        'Voltage (kV)': [""],
        'Solvent': [""],
        'Size (um)': [""],
        'Feasible?': [""]
    })
    combined_df = pd.concat([results, summary_df], ignore_index=True)

    # Every export gets its own directory: a single fixed path in the shared
    # temp dir would let concurrent sessions overwrite each other's file.
    # The file name itself stays the same for the download.
    export_dir = tempfile.mkdtemp(prefix="electrospray_")
    output_path = os.path.join(export_dir, "electrospray_results.csv")
    combined_df.to_csv(output_path, index=False)
    return output_path
# Application description (HTML rendered through gr.Markdown).
# Split into one literal per element; the stray closing </p> that followed
# </ul> without a matching <p> has been removed.
description = (
    "<h3>Welcome, challenger! 🎉</h3>"
    "<p> If you think you may perform better than <strong>CCBO</strong>, try this interactive game to optimize electrospray!</p>"
    "<p> Rules are simple:</p> "
    "<ul>"
    "<li>🔍 Examine! The initial experiments are on the right (or below on your phone), remember, your target size is <u><i><strong>3.000 um</strong></i></u></li>"
    "<li>⚠️ Be aware! Experiment may <u><i><strong>fail</strong></i></u> due to incompatible parameters. You want to avoid those conditions.</li>"
    "<li>💡 Propose! Set your parameters, you have <strong>2</strong> chances in each round, use them wisely!</li>"
    "<li>🚀 <strong>Submit</strong> to see the results, reflect and improve your selection!</li>"
    "<li>🔄 Repeat! Run the process for <strong>5</strong> rounds to see if you can beat CCBO!</li>"
    "</ul>"
    "<p>Your data will not be stored, so feel free to play again, good luck! 🍀</p>"
    "<p>Impressed by CCBO? Check our <a href='https://github.com/FrankWanger/CCBO'>implementation</a> and <a href='https://arxiv.org/abs/2411.10471'>paper!</a></p>"
)
# Create Gradio interface
with gr.Blocks() as demo:
    # Per-session state: the player's results table and the latest AUC.
    results_state = gr.State()
    auc_state = gr.State(value=0)

    with gr.Row():
        # Left column: instructions, experiment inputs and action buttons
        with gr.Column():
            gr.Markdown("## Human vs CCBO Campaign - Optimize Electrospray")
            gr.Markdown(description)

            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Experiment 1")
                    conc1 = gr.Slider(
                        minimum=0.05, maximum=5.0, value=1.2, step=0.001,
                        label="Concentration (%w/v)",
                    )
                    flow_rate1 = gr.Slider(
                        minimum=0.01, maximum=60.0, value=20.0, step=0.001,
                        label="Flow Rate (mL/h)",
                    )
                    voltage1 = gr.Slider(
                        minimum=10.0, maximum=18.0, value=15.0, step=0.001,
                        label="Voltage (kV)",
                    )
                    solvent1 = gr.Dropdown(['DMAc', 'CHCl3'], value='DMAc', label='Solvent')
                with gr.Column():
                    gr.Markdown("### Experiment 2")
                    conc2 = gr.Slider(
                        minimum=0.05, maximum=5.0, value=2.8, step=0.001,
                        label="Concentration (%w/v)",
                    )
                    flow_rate2 = gr.Slider(
                        minimum=0.01, maximum=60.0, value=20.0, step=0.001,
                        label="Flow Rate (mL/h)",
                    )
                    voltage2 = gr.Slider(
                        minimum=10.0, maximum=18.0, value=15.0, step=0.001,
                        label="Voltage (kV)",
                    )
                    solvent2 = gr.Dropdown(['DMAc', 'CHCl3'], value='CHCl3', label='Solvent')

            # All buttons share one row; Submit uses the highlighted variant.
            with gr.Row():
                submit_btn = gr.Button("🚀 Submit", variant="primary")
                reset_btn = gr.Button("Reset")
                download_btn = gr.Button("📥 Download Results", visible=False)

            # File output sits below the buttons, hidden until completion.
            result_file = gr.File(label="Download Results CSV", visible=False)

            # Completion notice (initially hidden)
            completion_message = gr.Markdown(visible=False)

        # Right column: tables and performance plot
        with gr.Column():
            prior_experiments = gr.DataFrame(value=prior_experiments_display, label="Prior Experiments")
            results_df = gr.DataFrame(label="Your Results")
            perf_plot = gr.Plot(label="Performance Comparison")

    # Submit and Reset refresh the same set of components, in this order.
    round_outputs = [
        results_state, prior_experiments, results_df, perf_plot,
        download_btn, completion_message, auc_state, result_file
    ]

    # Submit runs one round through predict
    submit_btn.click(
        fn=predict,
        inputs=[
            results_state,
            conc1, flow_rate1, voltage1, solvent1,
            conc2, flow_rate2, voltage2, solvent2
        ],
        outputs=round_outputs
    )

    # Reset clears the session through reset_results
    reset_btn.click(
        fn=reset_results,
        inputs=[results_state],
        outputs=round_outputs
    )

    # Download writes the CSV and hands its path to the file component
    download_btn.click(
        fn=prepare_results_for_download,
        inputs=[results_state],
        outputs=[result_file]
    )

demo.launch()