BioRAG / interface.py
C2MV's picture
Upload 2 files
a622789 verified
raw
history blame
13.7 kB
# interface.py
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg') # Backend no interactivo
import matplotlib.pyplot as plt
from PIL import Image
import io
import json
import traceback # Para traceback detallado
# Importar BioprocessModel de TU models.py (el que usa sympy)
from models import BioprocessModel
# from decorators import gpu_decorator # No es necesario con Modal
# Module-level settings that are meant to be "injected" (overwritten) from
# outside this module. call_llm_analysis_service() takes the Modal path only
# when the flag is truthy AND the callable is set; otherwise it falls back to
# loading a model locally.
USE_MODAL_FOR_LLM_ANALYSIS = False
generate_analysis_from_modal = None
def create_error_image(message="Error", width=600, height=400):
    """Create a plain PIL image used as a placeholder when processing fails.

    The image has a light-red background. The error message is logged to
    stdout and, on a best-effort basis, drawn on the image using Pillow's
    built-in default font (no font file is required for that).

    Args:
        message: Text describing the error.
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        An RGB ``PIL.Image.Image`` of size ``(width, height)``.
    """
    import textwrap

    img = Image.new('RGB', (width, height), color=(255, 200, 200))  # light red
    print(f"Generando imagen de error: {message}")

    try:
        from PIL import ImageDraw

        # Approximation: assumes roughly 6 px per glyph for the default font,
        # so long messages are wrapped instead of running off the image.
        chars_per_line = max(1, (width - 20) // 6)
        wrapped = "\n".join(
            textwrap.fill(line, chars_per_line)
            for line in str(message).splitlines()
        )
        if wrapped:
            ImageDraw.Draw(img).text((10, 10), wrapped, fill=(0, 0, 0))
    except Exception as draw_err:
        # Rendering the text is purely cosmetic; never let it mask the
        # original error. The plain coloured image is still returned.
        print(f"No se pudo dibujar el mensaje en la imagen de error: {draw_err}")

    return img
def parse_bounds_str(bounds_str_input, num_params):
    """Parse a textual list of ``(low, high)`` pairs into two bound lists.

    Accepted spellings include ``"(0, 10), (None, inf)"``,
    ``"[(0, 10), (0, np.inf)]"`` and ``"[0, 10], [0, 5]"``. Parsing is
    case-insensitive. ``None`` or NaN on either side means "unbounded" on
    that side.

    Args:
        bounds_str_input: The bounds text (converted with ``str()`` first).
        num_params: Number of parameters; exactly this many pairs must be
            present.

    Returns:
        ``(lower_bounds, upper_bounds)``: two lists of ``num_params`` floats.
        If the text is empty, or on any parsing/validation failure, a message
        is printed and fully unbounded limits ``(-inf, inf)`` are returned
        instead of raising.
    """
    bounds_str = str(bounds_str_input).strip()
    if not bounds_str:
        print(f"Cadena de límites vacía para {num_params} params. Usando (-inf, inf).")
        return [-np.inf] * num_params, [np.inf] * num_params

    try:
        # Lower-casing makes "Inf"/"NONE" etc. acceptable; the lower-case
        # names are then resolved through the namespace below. No textual
        # replacement of "inf" is done, so "np.inf" is not mangled into
        # "np.np.inf".
        bounds_str = bounds_str.lower()
        if not (bounds_str.startswith('[') and bounds_str.endswith(']')):
            bounds_str = f"[{bounds_str}]"

        namespace = {
            'np': np,
            'inf': np.inf,
            'infty': np.inf,
            'infinity': np.inf,
            'nan': np.nan,
            'none': None,
        }
        # SECURITY: eval() executes arbitrary Python. This text comes from a
        # UI field; if it can be supplied by untrusted users this must be
        # replaced by a real parser. Flagged for review, behaviour kept.
        parsed_bounds_list = eval(bounds_str, namespace)

        # "[0, 1], [2, 3]" already starts/ends with brackets, so it is not
        # wrapped above and evaluates to a tuple of pairs.
        if isinstance(parsed_bounds_list, tuple):
            parsed_bounds_list = list(parsed_bounds_list)
        if not isinstance(parsed_bounds_list, list):
            raise ValueError("Cadena de límites no evaluó a una lista.")
        if len(parsed_bounds_list) != num_params:
            raise ValueError(f"Num límites ({len(parsed_bounds_list)}) != num params ({num_params}).")

        def _is_missing(value):
            # None or a float NaN both mean "no bound on this side".
            return value is None or (isinstance(value, float) and np.isnan(value))

        lower_bounds, upper_bounds = [], []
        for item in parsed_bounds_list:
            if not (isinstance(item, (tuple, list)) and len(item) == 2):
                raise ValueError(f"Límite debe ser (low, high). Se encontró: {item}")
            raw_low, raw_high = item
            lower_bounds.append(-np.inf if _is_missing(raw_low) else float(raw_low))
            upper_bounds.append(np.inf if _is_missing(raw_high) else float(raw_high))
        return lower_bounds, upper_bounds
    except Exception as e:
        # Deliberate best-effort: a malformed bounds string must not abort
        # the fit, it only disables the bounds.
        print(f"Error al parsear límites '{bounds_str_input}': {e}. Usando por defecto (-inf, inf).")
        return [-np.inf] * num_params, [np.inf] * num_params
def call_llm_analysis_service(prompt: str) -> str:
    """Generate an analysis text for *prompt* via Modal or a local model.

    When ``USE_MODAL_FOR_LLM_ANALYSIS`` is truthy and
    ``generate_analysis_from_modal`` is set, the prompt is forwarded to that
    callable. Otherwise a causal LM is loaded locally (settings come from the
    project's ``config`` module) and used to sample a completion.

    Args:
        prompt: Full prompt text sent to the model.

    Returns:
        The generated text. Errors from either path are caught, logged with a
        traceback, and reported as a (Spanish) error string in the return
        value rather than raised.
    """
    if USE_MODAL_FOR_LLM_ANALYSIS and generate_analysis_from_modal:
        print("interface.py: Usando la función de análisis LLM de Modal...")
        try:
            return generate_analysis_from_modal(prompt)
        except Exception as e_modal_call:
            print(f"Error llamando a la función Modal LLM: {e_modal_call}")
            traceback.print_exc()
            return f"Error al contactar el servicio de análisis IA (Modal): {e_modal_call}"

    print("interface.py: Usando la función de análisis LLM local (fallback)...")
    try:
        # Imported lazily so the heavy dependencies are only required when
        # the local fallback is actually used.
        from config import MODEL_PATH, MAX_LENGTH, DEVICE
        from transformers import AutoTokenizer, AutoModelForCausalLM
        import torch

        # NOTE(review): the model is loaded on every call; consider caching
        # if the fallback is used repeatedly.
        print(f"Fallback: Cargando modelo {MODEL_PATH} localmente en {DEVICE}...")
        tokenizer_local = AutoTokenizer.from_pretrained(MODEL_PATH)
        model_local = AutoModelForCausalLM.from_pretrained(MODEL_PATH).to(DEVICE)

        # Some configs define these attributes but leave them as None; fall
        # through to the next candidate instead of failing on arithmetic.
        model_context_window = (
            getattr(model_local.config, 'max_position_embeddings', None)
            or getattr(model_local.config, 'sliding_window', None)
            or 4096
        )

        # Leave room for the generated tokens plus a small safety margin.
        max_prompt_len = model_context_window - MAX_LENGTH - 50
        if max_prompt_len <= 0:
            max_prompt_len = model_context_window // 2

        inputs = tokenizer_local(
            prompt, return_tensors="pt", truncation=True, max_length=max_prompt_len
        ).to(DEVICE)

        # A pad token id of 0 is valid, so test against None rather than
        # truthiness before falling back to the EOS id.
        pad_token_id = tokenizer_local.pad_token_id
        if pad_token_id is None:
            pad_token_id = tokenizer_local.eos_token_id

        with torch.no_grad():
            outputs = model_local.generate(
                **inputs,
                max_new_tokens=MAX_LENGTH,
                eos_token_id=tokenizer_local.eos_token_id,
                pad_token_id=pad_token_id,
                do_sample=True,
                temperature=0.6,
                top_p=0.9,
            )

        # Decode only the newly generated tokens, not the echoed prompt.
        input_len = inputs.input_ids.shape[1]
        analysis = tokenizer_local.decode(outputs[0][input_len:], skip_special_tokens=True)
        return analysis.strip()
    except Exception as e_local_llm:
        print(f"Error en el fallback LLM local: {e_local_llm}")
        traceback.print_exc()
        return f"Análisis (fallback local): Error al cargar/ejecutar modelo LLM local: {e_local_llm}."
def _render_fit_figure(time_data, panels, legend_position, show_legend, show_params):
    """Draw one stacked subplot per panel and return the figure as a PIL image.

    Args:
        time_data: x values shared by every subplot.
        panels: Sequence of ``(exp_data, ylabel, fit_results)`` tuples, where
            ``fit_results`` is a list of dicts with keys ``'y_pred'``,
            ``'R2'`` and ``'params'``.
        legend_position: Matplotlib legend ``loc`` value.
        show_legend: Whether to draw a legend on each subplot.
        show_params: Whether to draw a text box with the fitted parameters.

    Returns:
        The rendered PNG as a fully loaded ``PIL.Image.Image``.
    """
    # The parameter box goes to the opposite vertical side of the legend.
    legend_on_top = 'upper' in str(legend_position)

    fig, axs = plt.subplots(len(panels), 1, figsize=(10, 18), sharex=True)
    try:
        for ax, (data_actual, ylabel, plot_results) in zip(np.atleast_1d(axs), panels):
            has_data = (
                isinstance(data_actual, np.ndarray)
                and data_actual.size > 0
                and np.any(np.isfinite(data_actual))
            )
            if has_data:
                ax.plot(time_data, data_actual, 'o', label=f'Datos {ylabel}', markersize=5, alpha=0.7)
            else:
                ax.text(0.5, 0.5, f"No hay datos para {ylabel}", transform=ax.transAxes, ha='center', va='center')

            for idx, res_detail in enumerate(plot_results):
                label = f'Modelo {idx+1} (R²:{res_detail.get("R2", float("nan")):.3f})'
                ax.plot(time_data, res_detail['y_pred'], '-', label=label, linewidth=2)

            ax.set_xlabel('Tiempo')
            ax.set_ylabel(ylabel)
            ax.grid(True, linestyle=':', alpha=0.7)
            if show_legend:
                ax.legend(loc=legend_position, fontsize='small')

            if show_params and plot_results:
                param_display_texts = [
                    f"Modelo {idx+1}:\n"
                    + "\n".join(f" {k}: {v:.4g}" for k, v in res_detail.get('params', {}).items())
                    for idx, res_detail in enumerate(plot_results)
                ]
                ax.text(
                    0.02,
                    0.02 if legend_on_top else 0.98,
                    "\n---\n".join(param_display_texts),
                    transform=ax.transAxes,
                    fontsize=7,
                    verticalalignment='bottom' if legend_on_top else 'top',
                    bbox=dict(boxstyle='round,pad=0.3', fc='lightyellow', alpha=0.8),
                )

        # Reserve the top 4% of the canvas for the figure title.
        fig.tight_layout(rect=[0, 0, 1, 0.96])
        fig.suptitle("Resultados del Ajuste de Modelos Cinéticos", fontsize=16)

        buf = io.BytesIO()
        fig.savefig(buf, format='png', dpi=150)
        buf.seek(0)
        image_pil = Image.open(buf)
        image_pil.load()  # decode now so the image does not depend on lazy reads
        return image_pil
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)


def process_and_plot(
    file_obj,
    biomass_eq1_ui, biomass_eq2_ui, biomass_eq3_ui,
    biomass_param1_ui, biomass_param2_ui, biomass_param3_ui,
    biomass_bound1_ui, biomass_bound2_ui, biomass_bound3_ui,
    substrate_eq1_ui, substrate_eq2_ui, substrate_eq3_ui,
    substrate_param1_ui, substrate_param2_ui, substrate_param3_ui,
    substrate_bound1_ui, substrate_bound2_ui, substrate_bound3_ui,
    product_eq1_ui, product_eq2_ui, product_eq3_ui,
    product_param1_ui, product_param2_ui, product_param3_ui,
    product_bound1_ui, product_bound2_ui, product_bound3_ui,
    legend_position_ui,
    show_legend_ui,
    show_params_ui,
    biomass_eq_count_ui,
    substrate_eq_count_ui,
    product_eq_count_ui
):
    """Fit the user-supplied kinetic models to Excel data, plot, and analyse.

    Args:
        file_obj: Uploaded file object; its ``.name`` attribute is read as the
            path of an Excel file containing the columns ``Tiempo``,
            ``Biomasa``, ``Sustrato`` and ``Producto``.
        biomass_eq*_ui / substrate_eq*_ui / product_eq*_ui: Equation strings
            (up to three per variable).
        *_param*_ui: Parameter-name strings matching each equation.
        *_bound*_ui: Bounds strings, parsed with ``parse_bounds_str``.
        legend_position_ui: Matplotlib legend ``loc`` value.
        show_legend_ui: Whether to draw legends.
        show_params_ui: Whether to draw the fitted-parameter text boxes.
        biomass_eq_count_ui / substrate_eq_count_ui / product_eq_count_ui:
            How many of the three equation slots are active for each variable.

    Returns:
        ``(image, text)``: a PIL image with the fit plots and the LLM analysis
        text. On any failure the image is an error placeholder and the text
        describes the error; this function does not raise.
    """
    # Default placeholder image returned by the early-exit error paths.
    error_img = create_error_image("Error en procesamiento")
    try:
        if file_obj is None:
            return error_img, "Error: Por favor, sube un archivo Excel."

        try:
            df = pd.read_excel(file_obj.name)
        except Exception as e:
            return error_img, f"Error al leer el archivo Excel: {e}\n{traceback.format_exc()}"

        expected_cols = ['Tiempo', 'Biomasa', 'Sustrato', 'Producto']
        for col in expected_cols:
            if col not in df.columns:
                return error_img, f"Error: La columna '{col}' no se encuentra en el archivo Excel."

        time_data = df['Tiempo'].values
        biomass_data_exp = df['Biomasa'].values
        substrate_data_exp = df['Sustrato'].values
        product_data_exp = df['Producto'].values

        # Make sure the equation counters are valid, non-negative integers
        # (a negative value would otherwise slice from the end of the list).
        try:
            active_biomass_eqs = max(0, int(float(biomass_eq_count_ui)))
            active_substrate_eqs = max(0, int(float(substrate_eq_count_ui)))
            active_product_eqs = max(0, int(float(product_eq_count_ui)))
        except (TypeError, ValueError, OverflowError):
            return error_img, "Error: Número de ecuaciones inválido."

        # 'biomass' must come first: its fitted parameters feed the
        # substrate and product fits below (dicts keep insertion order).
        all_eq_inputs = {
            'biomass': (
                [biomass_eq1_ui, biomass_eq2_ui, biomass_eq3_ui][:active_biomass_eqs],
                [biomass_param1_ui, biomass_param2_ui, biomass_param3_ui][:active_biomass_eqs],
                [biomass_bound1_ui, biomass_bound2_ui, biomass_bound3_ui][:active_biomass_eqs],
                biomass_data_exp,
            ),
            'substrate': (
                [substrate_eq1_ui, substrate_eq2_ui, substrate_eq3_ui][:active_substrate_eqs],
                [substrate_param1_ui, substrate_param2_ui, substrate_param3_ui][:active_substrate_eqs],
                [substrate_bound1_ui, substrate_bound2_ui, substrate_bound3_ui][:active_substrate_eqs],
                substrate_data_exp,
            ),
            'product': (
                [product_eq1_ui, product_eq2_ui, product_eq3_ui][:active_product_eqs],
                [product_param1_ui, product_param2_ui, product_param3_ui][:active_product_eqs],
                [product_bound1_ui, product_bound2_ui, product_bound3_ui][:active_product_eqs],
                product_data_exp,
            ),
        }

        model_handler = BioprocessModel()
        fitted_results_for_plot = {'biomass': [], 'substrate': [], 'product': []}
        results_for_llm_prompt = {'biomass': [], 'substrate': [], 'product': []}
        # Parameters of the first successfully fitted biomass model; passed
        # on to every substrate/product fit.
        biomass_params_for_s_p = None

        for model_type, (eq_list, param_str_list, bound_str_list, exp_data) in all_eq_inputs.items():
            if not (isinstance(exp_data, np.ndarray) and exp_data.size > 0 and np.any(np.isfinite(exp_data))):
                print(f"Datos experimentales para {model_type} no válidos o vacíos, saltando ajuste.")
                continue

            for i, (eq_str, param_s, bound_s) in enumerate(zip(eq_list, param_str_list, bound_str_list)):
                if not eq_str or not param_s:
                    continue
                try:
                    model_handler.set_model(model_type, eq_str, param_s)
                    num_p = len(model_handler.models[model_type]['params'])
                    l_b, u_b = parse_bounds_str(bound_s, num_p)
                    current_biomass_p = biomass_params_for_s_p if model_type in ['substrate', 'product'] else None
                    y_pred, popt = model_handler.fit_model(
                        model_type, time_data, exp_data,
                        bounds=(l_b, u_b), biomass_params_fitted=current_biomass_p,
                    )

                    # Read the fit outputs back from the handler.
                    current_params = model_handler.params.get(model_type, {})
                    r2_val = model_handler.r2.get(model_type, float('nan'))
                    rmse_val = model_handler.rmse.get(model_type, float('nan'))

                    fitted_results_for_plot[model_type].append(
                        {'equation': eq_str, 'y_pred': y_pred, 'params': current_params, 'R2': r2_val}
                    )
                    results_for_llm_prompt[model_type].append(
                        {'equation': eq_str, 'params_fitted': current_params, 'R2': r2_val, 'RMSE': rmse_val}
                    )

                    if model_type == 'biomass' and biomass_params_for_s_p is None and current_params:
                        biomass_params_for_s_p = current_params
                except Exception as e_fit:
                    error_msg = f"Error ajustando {model_type} #{i+1} ('{eq_str}'): {e_fit}\n{traceback.format_exc()}"
                    print(error_msg)
                    return error_img, error_msg

        # Build the plot. The result lookups use the same English keys the
        # results were stored under; only the axis labels are Spanish.
        panels = [
            (biomass_data_exp, 'Biomasa', fitted_results_for_plot['biomass']),
            (substrate_data_exp, 'Sustrato', fitted_results_for_plot['substrate']),
            (product_data_exp, 'Producto', fitted_results_for_plot['product']),
        ]
        image_pil = _render_fit_figure(
            time_data, panels, legend_position_ui, show_legend_ui, show_params_ui
        )

        # Build the prompt and call the LLM.
        prompt_intro = "Eres un experto en modelado cinético de bioprocesos...\n\n"
        prompt_details = json.dumps(results_for_llm_prompt, indent=2, ensure_ascii=False)
        prompt_instructions = "\n\nPor favor, proporciona un análisis detallado...\n"
        full_prompt = prompt_intro + prompt_details + prompt_instructions
        analysis_text_llm = call_llm_analysis_service(full_prompt)

        return image_pil, analysis_text_llm
    except Exception as general_e:
        # Top-level boundary for the UI callback: report instead of raising.
        error_trace = traceback.format_exc()
        error_message_full = f"Error inesperado en process_and_plot: {general_e}\n{error_trace}"
        print(error_message_full)
        return create_error_image(f"Error: {general_e}"), error_message_full