|
|
|
import numpy as np |
|
import pandas as pd |
|
import matplotlib |
|
matplotlib.use('Agg') |
|
import matplotlib.pyplot as plt |
|
from PIL import Image |
|
import io |
|
import json |
|
import traceback |
|
|
|
|
|
from models import BioprocessModel |
|
|
|
|
|
|
|
USE_MODAL_FOR_LLM_ANALYSIS = False |
|
generate_analysis_from_modal = None |
|
|
|
def create_error_image(message="Error", width=600, height=400):
    """Create a simple PIL image displaying an error message.

    Args:
        message: Text to render on the image (also echoed to stdout).
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        PIL.Image.Image: RGB image with a light-red background and the
        error message drawn on it.
    """
    img = Image.new('RGB', (width, height), color=(255, 200, 200))
    try:
        # BUGFIX: actually draw the message on the image — previously the
        # function returned a blank placeholder and only printed the text,
        # so the user never saw why the plot failed.
        from PIL import ImageDraw
        draw = ImageDraw.Draw(img)
        draw.multiline_text((10, 10), str(message), fill=(120, 0, 0))
    except Exception:
        # Best-effort: if drawing fails (e.g. font issues), still return
        # the placeholder image rather than raising from the error path.
        pass
    print(f"Generando imagen de error: {message}")
    return img
|
|
|
def parse_bounds_str(bounds_str_input, num_params):
    """Parse a user-supplied bounds string into lower/upper bound lists.

    Accepts strings such as ``"(0, 10), (1, inf)"`` or
    ``"[(0, None), (0, np.inf)]"``. The tokens ``inf``, ``np.inf`` and
    ``None``/``none`` are recognized; ``None`` (or NaN) on either side
    means unbounded on that side.

    Args:
        bounds_str_input: Raw bounds text from the UI (coerced to ``str``).
        num_params: Expected number of ``(low, high)`` pairs.

    Returns:
        tuple[list[float], list[float]]: ``(lower_bounds, upper_bounds)``.
        On any parse error, fully unbounded ``(-inf, inf)`` defaults are
        returned (best-effort UI behavior, not an exception).
    """
    bounds_str = str(bounds_str_input).strip()
    if not bounds_str:
        print(f"Cadena de límites vacía para {num_params} params. Usando (-inf, inf).")
        return [-np.inf] * num_params, [np.inf] * num_params
    try:
        # Lower-case once; the eval namespace below resolves 'inf', 'np'
        # and 'none' directly. BUGFIX: the previous textual substitution
        # replace('inf', 'np.inf') corrupted inputs that already contained
        # 'np.inf' into 'np.np.inf', making valid bounds fail to parse.
        bounds_str = bounds_str.lower()
        if not (bounds_str.startswith('[') and bounds_str.endswith(']')):
            bounds_str = f"[{bounds_str}]"
        # SECURITY NOTE(review): eval on UI-provided text is dangerous even
        # with a restricted namespace; consider ast.literal_eval plus token
        # mapping if this can ever receive untrusted input.
        parsed_bounds_list = eval(
            bounds_str, {'np': np, 'inf': np.inf, 'none': None, 'None': None}
        )

        if not isinstance(parsed_bounds_list, list):
            raise ValueError("Cadena de límites no evaluó a una lista.")
        if len(parsed_bounds_list) != num_params:
            raise ValueError(f"Num límites ({len(parsed_bounds_list)}) != num params ({num_params}).")

        lower_bounds, upper_bounds = [], []
        for item in parsed_bounds_list:
            if not (isinstance(item, (tuple, list)) and len(item) == 2):
                raise ValueError(f"Límite debe ser (low, high). Se encontró: {item}")
            # None or NaN on a side means "unbounded" on that side.
            low = -np.inf if (item[0] is None or (isinstance(item[0], float) and np.isnan(item[0]))) else float(item[0])
            high = np.inf if (item[1] is None or (isinstance(item[1], float) and np.isnan(item[1]))) else float(item[1])
            lower_bounds.append(low)
            upper_bounds.append(high)
        return lower_bounds, upper_bounds
    except Exception as e:
        print(f"Error al parsear límites '{bounds_str_input}': {e}. Usando por defecto (-inf, inf).")
        return [-np.inf] * num_params, [np.inf] * num_params
|
|
|
def call_llm_analysis_service(prompt: str) -> str:
    """Send the analysis prompt to the LLM service (Modal or local fallback).

    Uses the Modal-hosted function when ``USE_MODAL_FOR_LLM_ANALYSIS`` is
    enabled and available; otherwise loads a local transformers model.
    Failures never raise: an error-description string is returned instead.

    Args:
        prompt: Full text prompt for the model.

    Returns:
        Generated analysis text, or a human-readable error message.
    """
    if USE_MODAL_FOR_LLM_ANALYSIS and generate_analysis_from_modal:
        print("interface.py: Usando la función de análisis LLM de Modal...")
        try:
            return generate_analysis_from_modal(prompt)
        except Exception as e_modal_call:
            print(f"Error llamando a la función Modal LLM: {e_modal_call}")
            traceback.print_exc()
            return f"Error al contactar el servicio de análisis IA (Modal): {e_modal_call}"
    else:
        print("interface.py: Usando la función de análisis LLM local (fallback)...")
        try:
            from config import MODEL_PATH, MAX_LENGTH, DEVICE
            from transformers import AutoTokenizer, AutoModelForCausalLM
            import torch

            # NOTE(review): tokenizer and model are re-loaded from disk on
            # every call; cache them at module level if this path is hot.
            print(f"Fallback: Cargando modelo {MODEL_PATH} localmente en {DEVICE}...")
            tokenizer_local = AutoTokenizer.from_pretrained(MODEL_PATH)
            model_local = AutoModelForCausalLM.from_pretrained(MODEL_PATH).to(DEVICE)

            # Leave room in the context window for the generated tokens.
            model_context_window = getattr(model_local.config, 'max_position_embeddings', getattr(model_local.config, 'sliding_window', 4096))
            max_prompt_len = model_context_window - MAX_LENGTH - 50
            if max_prompt_len <= 0:
                max_prompt_len = model_context_window // 2

            inputs = tokenizer_local(prompt, return_tensors="pt", truncation=True, max_length=max_prompt_len).to(DEVICE)
            with torch.no_grad():
                outputs = model_local.generate(
                    **inputs, max_new_tokens=MAX_LENGTH,
                    eos_token_id=tokenizer_local.eos_token_id,
                    # BUGFIX: pad_token_id may legitimately be 0 (falsy);
                    # compare against None instead of relying on truthiness.
                    pad_token_id=tokenizer_local.pad_token_id if tokenizer_local.pad_token_id is not None else tokenizer_local.eos_token_id,
                    do_sample=True, temperature=0.6, top_p=0.9
                )
            # Decode only the newly generated tokens, not the echoed prompt.
            input_len = inputs.input_ids.shape[1]
            analysis = tokenizer_local.decode(outputs[0][input_len:], skip_special_tokens=True)
            return analysis.strip()
        except Exception as e_local_llm:
            print(f"Error en el fallback LLM local: {e_local_llm}")
            traceback.print_exc()
            return f"Análisis (fallback local): Error al cargar/ejecutar modelo LLM local: {e_local_llm}."
|
|
|
|
|
def process_and_plot(
    file_obj,
    biomass_eq1_ui, biomass_eq2_ui, biomass_eq3_ui,
    biomass_param1_ui, biomass_param2_ui, biomass_param3_ui,
    biomass_bound1_ui, biomass_bound2_ui, biomass_bound3_ui,
    substrate_eq1_ui, substrate_eq2_ui, substrate_eq3_ui,
    substrate_param1_ui, substrate_param2_ui, substrate_param3_ui,
    substrate_bound1_ui, substrate_bound2_ui, substrate_bound3_ui,
    product_eq1_ui, product_eq2_ui, product_eq3_ui,
    product_param1_ui, product_param2_ui, product_param3_ui,
    product_bound1_ui, product_bound2_ui, product_bound3_ui,
    legend_position_ui,
    show_legend_ui,
    show_params_ui,
    biomass_eq_count_ui,
    substrate_eq_count_ui,
    product_eq_count_ui
):
    """Fit user-defined kinetic models to Excel data and plot the results.

    Reads 'Tiempo'/'Biomasa'/'Sustrato'/'Producto' columns from the uploaded
    Excel file, fits up to three user-supplied equations per variable with
    ``BioprocessModel``, renders a 3-panel matplotlib figure, and requests a
    textual analysis of the fit results from the LLM service.

    Args:
        file_obj: Uploaded file object exposing a ``.name`` path to an Excel file.
        *_eq{1..3}_ui: Equation strings per variable (biomass/substrate/product).
        *_param{1..3}_ui: Parameter-name strings matching each equation.
        *_bound{1..3}_ui: Bounds strings parsed by ``parse_bounds_str``.
        legend_position_ui: matplotlib legend ``loc`` value.
        show_legend_ui: Whether to draw per-axis legends.
        show_params_ui: Whether to overlay fitted parameter values on each axis.
        *_eq_count_ui: Number of active equations (1-3) per variable.

    Returns:
        tuple[PIL.Image.Image, str]: Rendered figure and LLM analysis text;
        on failure, an error image and an error message string.
    """
    error_img = create_error_image("Error en procesamiento")
    error_analysis_text = "No se pudo generar el análisis debido a un error."

    try:
        if file_obj is None:
            return error_img, "Error: Por favor, sube un archivo Excel."

        try:
            df = pd.read_excel(file_obj.name)
        except Exception as e:
            return error_img, f"Error al leer el archivo Excel: {e}\n{traceback.format_exc()}"

        expected_cols = ['Tiempo', 'Biomasa', 'Sustrato', 'Producto']
        for col in expected_cols:
            if col not in df.columns:
                return error_img, f"Error: La columna '{col}' no se encuentra en el archivo Excel."

        time_data = df['Tiempo'].values
        biomass_data_exp = df['Biomasa'].values
        substrate_data_exp = df['Sustrato'].values
        product_data_exp = df['Producto'].values

        try:
            active_biomass_eqs = int(float(biomass_eq_count_ui))
            active_substrate_eqs = int(float(substrate_eq_count_ui))
            active_product_eqs = int(float(product_eq_count_ui))
        except (TypeError, ValueError):
            return error_img, "Error: Número de ecuaciones inválido."

        # Per-variable UI inputs, truncated to the active equation count:
        # (equations, parameter strings, bounds strings, experimental data).
        all_eq_inputs = {
            'biomass': (
                [biomass_eq1_ui, biomass_eq2_ui, biomass_eq3_ui][:active_biomass_eqs],
                [biomass_param1_ui, biomass_param2_ui, biomass_param3_ui][:active_biomass_eqs],
                [biomass_bound1_ui, biomass_bound2_ui, biomass_bound3_ui][:active_biomass_eqs],
                biomass_data_exp
            ),
            'substrate': (
                [substrate_eq1_ui, substrate_eq2_ui, substrate_eq3_ui][:active_substrate_eqs],
                [substrate_param1_ui, substrate_param2_ui, substrate_param3_ui][:active_substrate_eqs],
                [substrate_bound1_ui, substrate_bound2_ui, substrate_bound3_ui][:active_substrate_eqs],
                substrate_data_exp
            ),
            'product': (
                [product_eq1_ui, product_eq2_ui, product_eq3_ui][:active_product_eqs],
                [product_param1_ui, product_param2_ui, product_param3_ui][:active_product_eqs],
                [product_bound1_ui, product_bound2_ui, product_bound3_ui][:active_product_eqs],
                product_data_exp
            )
        }

        model_handler = BioprocessModel()

        fitted_results_for_plot = {'biomass': [], 'substrate': [], 'product': []}
        results_for_llm_prompt = {'biomass': [], 'substrate': [], 'product': []}
        # First successful biomass parameter set; substrate/product models
        # may depend on it (coupled kinetics).
        biomass_params_for_s_p = None

        for model_type, (eq_list, param_str_list, bound_str_list, exp_data) in all_eq_inputs.items():
            # Skip variables with no usable experimental data.
            if not (isinstance(exp_data, np.ndarray) and exp_data.size > 0 and np.any(np.isfinite(exp_data))):
                print(f"Datos experimentales para {model_type} no válidos o vacíos, saltando ajuste.")
                continue

            for i in range(len(eq_list)):
                eq_str, param_s, bound_s = eq_list[i], param_str_list[i], bound_str_list[i]
                if not eq_str or not param_s:
                    continue

                try:
                    model_handler.set_model(model_type, eq_str, param_s)
                    num_p = len(model_handler.models[model_type]['params'])
                    l_b, u_b = parse_bounds_str(bound_s, num_p)
                    current_biomass_p = biomass_params_for_s_p if model_type in ['substrate', 'product'] else None

                    y_pred, popt = model_handler.fit_model(model_type, time_data, exp_data, bounds=(l_b, u_b), biomass_params_fitted=current_biomass_p)

                    current_params = model_handler.params.get(model_type, {})
                    r2_val = model_handler.r2.get(model_type, float('nan'))
                    rmse_val = model_handler.rmse.get(model_type, float('nan'))

                    fitted_results_for_plot[model_type].append({'equation': eq_str, 'y_pred': y_pred, 'params': current_params, 'R2': r2_val})
                    results_for_llm_prompt[model_type].append({'equation': eq_str, 'params_fitted': current_params, 'R2': r2_val, 'RMSE': rmse_val})

                    if model_type == 'biomass' and biomass_params_for_s_p is None and current_params:
                        biomass_params_for_s_p = current_params
                except Exception as e_fit:
                    error_msg = f"Error ajustando {model_type} #{i+1} ('{eq_str}'): {e_fit}\n{traceback.format_exc()}"
                    print(error_msg)
                    return error_img, error_msg

        fig, axs = plt.subplots(3, 1, figsize=(10, 18), sharex=True)
        # BUGFIX: the results dict is keyed 'substrate'/'product' (English);
        # the previous Spanish keys 'sustrato'/'producto' raised KeyError.
        # BUGFIX: iterate a list of 4-tuples — the previous dict .items()
        # loop unpacked each (key, value) pair into four names (ValueError).
        plot_config = [
            (axs[0], biomass_data_exp, 'Biomasa', fitted_results_for_plot['biomass']),
            (axs[1], substrate_data_exp, 'Sustrato', fitted_results_for_plot['substrate']),
            (axs[2], product_data_exp, 'Producto', fitted_results_for_plot['product']),
        ]

        for ax, data_actual, ylabel, plot_results in plot_config:
            if isinstance(data_actual, np.ndarray) and data_actual.size > 0 and np.any(np.isfinite(data_actual)):
                ax.plot(time_data, data_actual, 'o', label=f'Datos {ylabel}', markersize=5, alpha=0.7)
            else:
                ax.text(0.5, 0.5, f"No hay datos para {ylabel}", transform=ax.transAxes, ha='center', va='center')

            for idx, res_detail in enumerate(plot_results):
                label = f'Modelo {idx+1} (R²:{res_detail.get("R2", float("nan")):.3f})'
                ax.plot(time_data, res_detail['y_pred'], '-', label=label, linewidth=2)
            ax.set_xlabel('Tiempo')
            ax.set_ylabel(ylabel)
            ax.grid(True, linestyle=':', alpha=0.7)
            if show_legend_ui:
                ax.legend(loc=legend_position_ui, fontsize='small')

            if show_params_ui and plot_results:
                # Place the parameter box opposite the legend to avoid overlap.
                param_display_texts = [f"Modelo {idx+1}:\n" + "\n".join([f"  {k}: {v:.4g}" for k, v in res_detail.get('params', {}).items()]) for idx, res_detail in enumerate(plot_results)]
                ax.text(0.02, 0.98 if not ('upper' in legend_position_ui) else 0.02, "\n---\n".join(param_display_texts),
                        transform=ax.transAxes, fontsize=7, verticalalignment='top' if not ('upper' in legend_position_ui) else 'bottom',
                        bbox=dict(boxstyle='round,pad=0.3', fc='lightyellow', alpha=0.8))

        plt.tight_layout(rect=[0, 0, 1, 0.96])
        fig.suptitle("Resultados del Ajuste de Modelos Cinéticos", fontsize=16)
        # Render the figure to a PIL image and free the matplotlib resources.
        buf = io.BytesIO()
        plt.savefig(buf, format='png', dpi=150)
        buf.seek(0)
        image_pil = Image.open(buf)
        plt.close(fig)

        prompt_intro = "Eres un experto en modelado cinético de bioprocesos...\n\n"
        prompt_details = json.dumps(results_for_llm_prompt, indent=2, ensure_ascii=False)
        prompt_instructions = "\n\nPor favor, proporciona un análisis detallado...\n"
        full_prompt = prompt_intro + prompt_details + prompt_instructions
        analysis_text_llm = call_llm_analysis_service(full_prompt)

        return image_pil, analysis_text_llm

    except Exception as general_e:
        error_trace = traceback.format_exc()
        error_message_full = f"Error inesperado en process_and_plot: {general_e}\n{error_trace}"
        print(error_message_full)
        return create_error_image(f"Error: {general_e}"), error_message_full