import asyncio
import gc
import os
import subprocess
import threading
import time

import psutil
import torch
import uvicorn
from fastapi import FastAPI, HTTPException
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import END, StateGraph
from pydantic import BaseModel
from typing_extensions import TypedDict

# Make sure the Ollama libraries are installed:
# pip install ollama langchain langchain_community langgraph

# Best-effort install at import time; the exit status is deliberately ignored.
os.system("curl -fsSL https://ollama.com/install.sh | sh")


# Install Ollama
def install_ollama():
    """Install Ollama by piping the official install script into ``sh``.

    Raises:
        subprocess.CalledProcessError: If the shell pipeline exits with a
            non-zero status.
    """
    print("Instalando Ollama...")
    try:
        # check=True so a failing installer actually reaches the handler
        # below instead of being reported as a success.
        subprocess.run(
            "curl -fsSL https://ollama.com/install.sh | sh",
            shell=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error al instalar Ollama: {e}")
        raise
    print("Ollama instalado con éxito.")


async def async_download_ollamahkvh():
    """Run :func:`install_ollama` in a worker thread.

    ``install_ollama`` is a blocking, synchronous function, so it is
    offloaded with ``asyncio.to_thread`` rather than passed to
    ``asyncio.run`` (which only accepts coroutines and cannot be called from
    inside a running event loop).
    """
    await asyncio.to_thread(install_ollama)


# Start the Ollama service (meant to run in its own thread)
def ollama_service_thread():
    """Run ``ollama serve`` and block until that process exits."""
    print("Iniciando el servicio de Ollama")
    subprocess.run(["ollama", "serve"])


# Download the Ollama model
async def download_ollama_model(model_name='hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'):
    """Pull ``model_name`` with ``ollama pull``.

    The blocking subprocess call is executed in a worker thread so the event
    loop is not stalled while the model downloads.

    Args:
        model_name: Model identifier passed to ``ollama pull``.

    Raises:
        subprocess.CalledProcessError: If ``ollama pull`` exits with a
            non-zero status.
    """
    print(f"Descargando el modelo: {model_name}")
    try:
        await asyncio.to_thread(
            subprocess.run, ["ollama", "pull", model_name], check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Error al descargar el modelo: {e}")
        raise


# Async entry point for the model download
async def async_download_ollama_model():
    """Download the default model.

    ``download_ollama_model`` is a coroutine function, so it must be awaited
    directly; handing it to ``asyncio.to_thread`` would only create a
    coroutine object that never runs.
    """
    await download_ollama_model()


# Start Ollama in a background thread
OLLAMA_SERVICE_THREAD = threading.Thread(target=ollama_service_thread)
OLLAMA_SERVICE_THREAD.start()

# Wait for Ollama to be ready
print("Esperando a que Ollama inicie...")
time.sleep(10)

# Download the Hugging Face model if it is not available yet
asyncio.run(async_download_ollama_model())

# FastAPI application instance
app = FastAPI()


# Data model for the queries received by the API
class QueryRequest(BaseModel):
    """Request body of the ``/query`` endpoint."""

    query: str


# Ollama language model (no 'temperature' set)
local_llm = 'hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'
llama3 = ChatOllama(model=local_llm)

# Web search tool backed by DuckDuckGo
wrapper = DuckDuckGoSearchAPIWrapper(max_results=1)
web_search_tool = DuckDuckGoSearchRun(api_wrapper=wrapper)

# Prompts for generation and routing
generate_prompt = PromptTemplate(
    template="""
    <|begin_of_text|>
    <|start_header_id|>system<|end_header_id|>
    You are an AI assistant for Research Question Tasks, that synthesizes web search results.
    Strictly use the following pieces of web search context to answer the question. If you don't know the answer, just say that you don't know.
    Keep the answer concise, but provide all of the details you can in the form of a research report.
    Only make direct references to material if provided in the context.
    <|eot_id|>
    <|start_header_id|>user<|end_header_id|>
    Question: {question}
    Web Search Context: {context}
    Answer:
    <|eot_id|>
    <|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)
generate_chain = generate_prompt | llama3 | StrOutputParser()

router_prompt = PromptTemplate(
    template="""
    <|begin_of_text|>
    <|start_header_id|>system<|end_header_id|>
    You are an expert at routing a user question to either the generation stage or web search.
    Use the web search for questions that require more context for a better answer, or recent events.
    Otherwise, you can skip and go straight to the generation phase to respond.
    You do not need to be stringent with the keywords in the question related to these topics.
    Give a binary choice 'web_search' or 'generate' based on the question.
    Return the JSON with a single key 'choice' with no premable or explanation.
    Question to route: {question}
    <|eot_id|>
    <|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
question_router = router_prompt | llama3 | JsonOutputParser()

query_prompt = PromptTemplate(
    template="""
    <|begin_of_text|>
    <|start_header_id|>system<|end_header_id|>
    You are an expert at crafting web search queries for research questions.
    More often than not, a user will ask a basic question that they wish to learn more about, however it might not be in the best format.
    Reword their query to be the most effective web search string possible.
    Return the JSON with a single key 'query' with no premable or explanation.
    Question to transform: {question}
    <|eot_id|>
    <|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
query_chain = query_prompt | llama3 | JsonOutputParser()


# Graph state
class GraphState(TypedDict):
    """State dictionary passed between the graph nodes."""

    question: str
    generation: str
    search_query: str
    context: str


# Processing nodes
async def generate(state):
    """Produce the final answer from the question and any search context.

    Args:
        state: Graph state; ``question`` is required, ``context`` is optional.

    Returns:
        A state update with the ``generation`` key.
    """
    print("Step: Generating Final Response")
    question = state["question"]
    # "context" is not set when the router skips the web search, or when the
    # search produced nothing, so fall back to an empty context.
    context = state.get("context") or ""
    generation = await asyncio.to_thread(
        generate_chain.invoke, {"context": context, "question": question}
    )
    return {"generation": generation}


async def transform_query(state):
    """Rewrite the user question into a web search query.

    Args:
        state: Graph state containing ``question``.

    Returns:
        A state update with the ``search_query`` key. The original question
        is used when the model output has no usable ``query`` value.
    """
    print("Step: Optimizing Query for Web Search")
    question = state['question']
    gen_query = await asyncio.to_thread(query_chain.invoke, {"question": question})
    # Make sure we read the expected key; the parsed JSON may not be a dict.
    search_query = gen_query.get("query", "") if isinstance(gen_query, dict) else ""
    if not search_query:
        # Searching for an empty string is useless; use the raw question.
        search_query = question
    return {"search_query": search_query}


async def web_search(state):
    """Run the web search tool for ``state['search_query']``.

    Args:
        state: Graph state containing ``search_query``.

    Returns:
        A state update with the ``context`` key. The context is empty when
        the search fails or returns an unsupported type, so the downstream
        ``generate`` node can still run.
    """
    search_query = state['search_query']
    print(f'Step: Searching the Web for: "{search_query}"')
    try:
        search_result = await asyncio.to_thread(web_search_tool.invoke, search_query)
    except Exception as e:
        # Best-effort: a failed search must not abort the whole request.
        print(f"Web search failed: {e}")
        return {"context": ""}

    if isinstance(search_result, str):
        print(f"Respuesta de búsqueda web es cadena: {search_result}")
        return {"context": search_result}
    if isinstance(search_result, dict):
        # A dictionary result is used as the context directly.
        return {"context": search_result}

    print("Web search failed: Respuesta de búsqueda web no es válida")
    return {"context": ""}


async def route_question(state):
    """Decide whether the question needs a web search.

    Args:
        state: Graph state containing ``question``.

    Returns:
        ``"websearch"`` or ``"generate"``. Any missing or unrecognised choice
        falls back to ``"generate"`` so the result is always a key of the
        conditional-edge mapping.
    """
    print("Step: Routing Query")
    question = state['question']
    output = await asyncio.to_thread(question_router.invoke, {"question": question})
    choice = output.get('choice') if isinstance(output, dict) else None
    if choice == "web_search":
        print("Step: Routing Query to Web Search")
        return "websearch"
    print("Step: Routing Query to Generation")
    return "generate"


# Build the state graph
workflow = StateGraph(GraphState)
workflow.add_node("websearch", web_search)
workflow.add_node("transform_query", transform_query)
workflow.add_node("generate", generate)

workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "websearch")
workflow.add_edge("websearch", "generate")
workflow.add_edge("generate", END)

# Compile the agent
local_agent = workflow.compile()


# Run the agent
async def run_agent_parallel(query):
    """Run the compiled graph for ``query`` and return the generated answer.

    The graph nodes are coroutine functions, so the graph is driven through
    its async API instead of the synchronous ``invoke``.
    """
    output = await local_agent.ainvoke({"question": query})
    return output['generation']


# FastAPI server logic
@app.post("/query")
async def query_endpoint(request: QueryRequest):
    """Answer the submitted query with the local agent."""
    query = request.query
    return {"response": await run_agent_parallel(query)}


# Resource handling
def release_resources():
    """Empty the CUDA cache and run the garbage collector (best-effort)."""
    try:
        torch.cuda.empty_cache()
        gc.collect()
    except Exception as e:
        print(f"Failed to release resources: {e}")


def resource_manager(
    max_ram_percent=1,
    max_cpu_percent=1,
    max_gpu_percent=1,
    max_ram_mb=1,
    poll_interval=5.0,
):
    """Poll RAM, CPU and GPU usage forever and release resources when needed.

    This function never returns; run it in a background thread.

    Args:
        max_ram_percent: RAM usage (percent) above which resources are released.
        max_cpu_percent: CPU usage (percent) above which the CPU branch runs.
        max_gpu_percent: GPU memory usage (percent) above which resources are
            released.
        max_ram_mb: Used RAM (MiB) above which resources are released.
        poll_interval: Seconds to sleep between two checks.
    """
    while True:
        try:
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)  # bytes -> MiB

            if current_ram_percent > max_ram_percent or current_ram_mb > max_ram_mb:
                release_resources()

            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > max_cpu_percent:
                # NOTE(review): nice() without an argument only reads the
                # priority; pass a value here if lowering it was intended.
                psutil.Process(os.getpid()).nice()

            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                free_bytes, total_bytes = torch.cuda.mem_get_info(gpu)
                gpu_mem = 100.0 * (total_bytes - free_bytes) / total_bytes
                if gpu_mem > max_gpu_percent:
                    release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")
        # Sleep between checks so the loop does not spin a CPU core.
        time.sleep(poll_interval)


# The monitor loops forever, so it runs in a daemon thread; calling it
# directly here would block before the server below is ever started.
RESOURCE_MANAGER_THREAD = threading.Thread(target=resource_manager, daemon=True)
RESOURCE_MANAGER_THREAD.start()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)