import os
import sys
import re
import gradio as gr
import json
import tempfile
import base64
import io
from typing import List, Dict, Any, Optional, Tuple, Union
import logging
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from flask import Flask, request, jsonify
import uuid

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask app initialization
flask_app = Flask(__name__)

# Additional imports required by the SQL agent and database layer
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.agent_toolkits import create_sql_agent
from langchain_community.utilities import SQLDatabase
from langgraph.prebuilt import create_react_agent

# Load environment variables
load_dotenv()


def initialize_llm():
    """Initialize the Google Gemini LLM."""
    try:
        api_key = os.getenv('GOOGLE_API_KEY')
        if not api_key:
            return None, "No se encontró GOOGLE_API_KEY en las variables de entorno"

        llm = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            google_api_key=api_key,
            temperature=0.1,
            convert_system_message_to_human=True
        )
        return llm, None
    except Exception as e:
        return None, str(e)


def setup_database_connection():
    """Set up the database connection."""
    try:
        # Read the database configuration from environment variables
        db_user = os.getenv('DB_USER')
        db_password = os.getenv('DB_PASSWORD')
        db_host = os.getenv('DB_HOST', 'localhost')
        db_name = os.getenv('DB_NAME')

        if not all([db_user, db_password, db_name]):
            return None, "Faltan variables de entorno para la conexión a la base de datos"

        # Build the connection string
        connection_string = f"mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}"

        # Create the SQLAlchemy engine and test the connection
        engine = create_engine(connection_string)
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))

        return connection_string, None
    except Exception as e:
        return None, str(e)


def create_agent(llm, connection_string):
    """Create the SQL agent."""
    try:
        if not llm or not connection_string:
            return None, "LLM o conexión a base de datos no proporcionados"

        # Create the SQL database wrapper
        db = SQLDatabase.from_uri(connection_string)

        # Create the SQL agent
        agent = create_sql_agent(
            llm=llm,
            db=db,
            agent_type="zero-shot-react-description",
            verbose=True,
            return_intermediate_steps=True
        )
        return agent, None
    except Exception as e:
        return None, str(e)


def stream_agent_response(question: str, chat_history: Optional[List[List[str]]] = None) -> Tuple[str, Optional[go.Figure]]:
    """Run the agent on a question and build a visualization when possible."""
    try:
        # Initialize the components
        llm, llm_error = initialize_llm()
        if llm_error:
            return f"Error al inicializar LLM: {llm_error}", None

        connection_string, db_error = setup_database_connection()
        if db_error:
            return f"Error de conexión a base de datos: {db_error}", None

        agent, agent_error = create_agent(llm, connection_string)
        if agent_error:
            return f"Error al crear el agente: {agent_error}", None

        # Run the query through the agent
        response = agent.invoke({"input": question})

        # Extract the answer text
        if hasattr(response, 'output'):
            response_text = response.output
        elif isinstance(response, dict) and 'output' in response:
            response_text = response['output']
        else:
            response_text = str(response)

        # Check whether any SQL results can be visualized
        chart_fig = None
        if isinstance(response, dict):
            intermediate_steps = response.get('intermediate_steps', [])
        else:
            intermediate_steps = getattr(response, 'intermediate_steps', []) or []
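        # NOTE: the chart step below is a best-effort heuristic rather than part of the
        # agent API: it scans the intermediate steps for a SELECT statement, re-executes
        # it with pandas and, when the result has at least two columns, plots the first
        # two as a bar chart. Any failure in this block is skipped silently.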
        for step in intermediate_steps:
            if len(step) > 1 and 'sql_query' in str(step[0]).lower():
                # Try to re-run the query and build a chart
                try:
                    query = str(step[0]).split('sql_query:')[1].split('\n')[0].strip()
                    if 'SELECT' in query.upper():
                        df = pd.read_sql_query(query, create_engine(connection_string))
                        if len(df.columns) >= 2:
                            chart_fig = px.bar(df, x=df.columns[0], y=df.columns[1])
                except Exception:
                    pass

        return response_text, chart_fig
    except Exception as e:
        return f"Error al procesar la solicitud: {str(e)}", None


def create_ui():
    """Create the Gradio user interface."""
    with gr.Blocks(title="🤖 Asistente SQL con Gemini", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🤖 Asistente de Base de Datos SQL")
        gr.Markdown("Pregunta cualquier cosa sobre tu base de datos en lenguaje natural")

        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(
                    label="Chat",
                    type="messages",
                    height=400
                )
                with gr.Row():
                    question_input = gr.Textbox(
                        label="Tu pregunta",
                        placeholder="Ej: ¿Cuántos usuarios hay registrados?",
                        lines=2,
                        scale=4
                    )
                    submit_button = gr.Button("📤 Enviar", scale=1)

            with gr.Column(scale=1):
                chart_display = gr.Plot(
                    label="Visualización de datos",
                    height=400
                )

        # Hidden field for the streaming output state
        streaming_output_display = gr.HTML(visible=False)

    return demo, chatbot, chart_display, question_input, submit_button, streaming_output_display


# In-memory message store
message_store: Dict[str, str] = {}


@flask_app.route('/user_message', methods=['POST'])
def handle_user_message():
    try:
        data = request.get_json()
        if not data or 'message' not in data:
            return jsonify({'error': 'Se requiere el campo message'}), 400

        user_message = data['message']

        # Generate a unique ID for this message
        message_id = str(uuid.uuid4())

        # Store the message
        message_store[message_id] = user_message

        return jsonify({
            'message_id': message_id,
            'status': 'success'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@flask_app.route('/ask', methods=['POST'])
def handle_ask():
    try:
        data = request.get_json()
        if not data or 'message_id' not in data:
            return jsonify({'error': 'Se requiere el campo message_id'}), 400

        message_id = data['message_id']

        # Retrieve the stored message
        if message_id not in message_store:
            return jsonify({'error': 'ID de mensaje no encontrado'}), 404

        user_message = message_store[message_id]

        # Initialize the required components
        llm, llm_error = initialize_llm()
        if llm_error:
            return jsonify({'error': f'Error al inicializar LLM: {llm_error}'}), 500

        connection_string, db_error = setup_database_connection()
        if db_error:
            return jsonify({'error': f'Error de conexión a la base de datos: {db_error}'}), 500

        agent, agent_error = create_agent(llm, connection_string)
        if agent_error:
            return jsonify({'error': f'Error al crear el agente: {agent_error}'}), 500

        # Get the agent's answer
        response = agent.invoke({"input": user_message})

        # Extract the answer text
        if hasattr(response, 'output') and response.output:
            response_text = response.output
        elif isinstance(response, str):
            response_text = response
        elif isinstance(response, dict) and 'output' in response:
            response_text = response['output']
        else:
            response_text = str(response)

        # Delete the stored message once processed
        del message_store[message_id]

        return jsonify({
            'response': response_text,
            'status': 'success'
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
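# Example of the two-step API flow exposed above (a sketch, assuming the Flask server
# is running locally on port 5000; "<uuid>" stands for the message_id returned by the
# first call):
#
#   curl -X POST http://localhost:5000/user_message \
#        -H "Content-Type: application/json" \
#        -d '{"message": "¿Cuántos usuarios hay registrados?"}'
#   # -> {"message_id": "<uuid>", "status": "success"}
#
#   curl -X POST http://localhost:5000/ask \
#        -H "Content-Type: application/json" \
#        -d '{"message_id": "<uuid>"}'
#   # -> {"response": "...", "status": "success"}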
def create_application():
    """Create and configure the Gradio application."""
    # Create the UI components
    demo, chatbot, chart_display, question_input, submit_button, streaming_output_display = create_ui()

    # Mount the Flask API alongside the Gradio app when running on Hugging Face Spaces.
    # NOTE: gr.mount_gradio_app expects an ASGI (FastAPI-style) host application, so this
    # branch assumes flask_app is exposed through a compatible adapter. The return value
    # is intentionally not assigned back to `demo`, which must remain the gr.Blocks
    # instance used for the event wiring below.
    if os.getenv('SPACE_ID'):
        gr.mount_gradio_app(
            flask_app,
            demo,
            "/api"  # Prefix for the API endpoints
        )

    def user_message(user_input: str, chat_history: List[Dict[str, str]]) -> Tuple[str, List[Dict[str, str]]]:
        """Add user message to chat history (messages format) and clear input."""
        if not user_input.strip():
            return "", chat_history

        logger.info(f"User message: {user_input}")

        if chat_history is None:
            chat_history = []

        # Append user message in messages format
        chat_history.append({"role": "user", "content": user_input})
        return "", chat_history

    def bot_response(chat_history: List[Dict[str, str]]) -> Tuple[List[Dict[str, str]], Optional[go.Figure]]:
        """Generate bot response for messages-format chat history and return optional chart figure."""
        if not chat_history:
            return chat_history, None

        # Ensure last message is a user turn awaiting assistant reply
        last = chat_history[-1]
        if not isinstance(last, dict) or last.get("role") != "user" or not last.get("content"):
            return chat_history, None

        try:
            question = last["content"]
            logger.info(f"Processing question: {question}")

            # Convert prior messages to pair history for stream_agent_response()
            pair_history: List[List[str]] = []
            i = 0
            while i < len(chat_history) - 1:
                m1 = chat_history[i]
                m2 = chat_history[i + 1] if i + 1 < len(chat_history) else None
                if (
                    isinstance(m1, dict) and m1.get("role") == "user"
                    and isinstance(m2, dict) and m2.get("role") == "assistant"
                ):
                    pair_history.append([m1.get("content", ""), m2.get("content", "")])
                    i += 2
                else:
                    i += 1

            # Call the agent for this new user question
            assistant_message, chart_fig = stream_agent_response(question, pair_history)

            # Append assistant message back into messages history
            chat_history.append({"role": "assistant", "content": assistant_message})

            logger.info("Response generation complete")
            return chat_history, chart_fig
        except Exception as e:
            error_msg = f"## ❌ Error\n\nError al procesar la solicitud:\n\n```\n{str(e)}\n```"
            logger.error(error_msg, exc_info=True)
            # Ensure we add an assistant error message for the UI
            chat_history.append({"role": "assistant", "content": error_msg})
            return chat_history, None

    # Event handlers
    with demo:
        # Handle form submission
        msg_submit = question_input.submit(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=False
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot, chart_display],
            api_name="ask"
        )

        # Handle button click
        btn_click = submit_button.click(
            fn=user_message,
            inputs=[question_input, chatbot],
            outputs=[question_input, chatbot],
            queue=False
        ).then(
            fn=bot_response,
            inputs=[chatbot],
            outputs=[chatbot, chart_display]
        )

    return demo


# Create the application
demo = create_application()


# Hugging Face Spaces configuration
def get_app():
    """Return the Gradio application instance for Hugging Face Spaces."""
    # Check whether we are running inside a Hugging Face Space
    if os.getenv('SPACE_ID'):
        # Space-specific configuration
        demo.title = "🤖 Asistente de Base de Datos SQL (Demo)"
        demo.description = """
        Este es un demo del asistente de base de datos SQL.
        Para usar la versión completa con conexión a base de datos,
        clona este espacio y configura las variables de entorno.
        """
    return demo


# Local development entry point
if __name__ == "__main__":
    # Decide whether to run the Flask API or the Gradio UI
    if os.environ.get('RUN_FLASK', 'false').lower() == 'true':
        # Run only the Flask server
        port = int(os.environ.get('PORT', 5000))
        flask_app.run(host='0.0.0.0', port=port)
    else:
        # Local development configuration - simplified for Gradio 5.x
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            debug=True,
            share=False
        )