import os
import subprocess
import asyncio
import time
import threading
import gc
import psutil
import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
# Make sure the Ollama library is installed
# pip install ollama langchain langchain_community langgraph
os.system("sudo curl -fsSL https://ollama.com/install.sh | sh")
# Function to install Ollama
def install_ollama():
    try:
        print("Installing Ollama...")
        subprocess.run("curl -fsSL https://ollama.com/install.sh | sh", shell=True, check=True)
        print("Ollama installed successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error installing Ollama: {e}")
        raise
async def async_download_ollamahkvh():
    # Run the blocking installer in a worker thread so it can be awaited.
    await asyncio.to_thread(install_ollama)
# Start the Ollama service in a thread
def ollama_service_thread():
    print("Starting the Ollama service")
    subprocess.run("ollama serve", shell=True)
# Function to download the Ollama model (synchronous, so it can be offloaded with asyncio.to_thread)
def download_ollama_model(model_name='hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'):
    try:
        print(f"Downloading model: {model_name}")
        subprocess.run(["ollama", "pull", model_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error downloading the model: {e}")
        raise
# Asynchronous wrapper that handles the model download
async def async_download_ollama_model():
    await asyncio.to_thread(download_ollama_model)
# Create a thread to start Ollama
OLLAMA_SERVICE_THREAD = threading.Thread(target=ollama_service_thread)
OLLAMA_SERVICE_THREAD.start()
# Wait for Ollama to be ready
print("Waiting for Ollama to start...")
time.sleep(10)
# Download the Hugging Face model if it is not already available
asyncio.run(async_download_ollama_model())
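# Optional: instead of a fixed sleep, the startup could poll the local Ollama HTTP endpoint
# until it responds. This is a minimal sketch (not part of the original flow), assuming the
# default Ollama address http://127.0.0.1:11434:
#
#   import urllib.request
#   for _ in range(30):
#       try:
#           urllib.request.urlopen("http://127.0.0.1:11434", timeout=2)
#           break
#       except Exception:
#           time.sleep(1)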
# Create the FastAPI instance
app = FastAPI()
# Data model for the queries received by the API
class QueryRequest(BaseModel):
    query: str
# Define the Ollama language model (without 'temperature')
local_llm = 'hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'
llama3 = ChatOllama(model=local_llm)
# Define the web search tool using DuckDuckGo
wrapper = DuckDuckGoSearchAPIWrapper(max_results=1)
web_search_tool = DuckDuckGoSearchRun(api_wrapper=wrapper)
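# Note: DuckDuckGoSearchRun.invoke(query) typically returns a plain string of concatenated
# result snippets rather than structured data; the web_search node below handles both string
# and dict results defensively.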
# Prompts for generation and routing
generate_prompt = PromptTemplate(
    template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an AI assistant for Research Question Tasks, that synthesizes web search results.
Strictly use the following pieces of web search context to answer the question. If you don't know the answer, just say that you don't know.
Keep the answer concise, but provide all of the details you can in the form of a research report.
Only make direct references to material if provided in the context.
<|eot_id|>
<|start_header_id|>user<|end_header_id|>
Question: {question}
Web Search Context: {context}
Answer:
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "context"],
)
generate_chain = generate_prompt | llama3 | StrOutputParser()
router_prompt = PromptTemplate(
    template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an expert at routing a user question to either the generation stage or web search.
Use the web search for questions that require more context for a better answer, or recent events.
Otherwise, you can skip and go straight to the generation phase to respond.
You do not need to be stringent with the keywords in the question related to these topics.
Give a binary choice 'web_search' or 'generate' based on the question.
Return the JSON with a single key 'choice' with no preamble or explanation.
Question to route: {question}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
question_router = router_prompt | llama3 | JsonOutputParser()
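# Illustrative only (not executed here): per the prompt above, the router chain is expected
# to emit JSON such as {"choice": "web_search"} or {"choice": "generate"}, which
# JsonOutputParser turns into the Python dict consumed by route_question below.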
query_prompt = PromptTemplate(
    template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an expert at crafting web search queries for research questions.
More often than not, a user will ask a basic question that they wish to learn more about, however it might not be in the best format.
Reword their query to be the most effective web search string possible.
Return the JSON with a single key 'query' with no preamble or explanation.
Question to transform: {question}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
query_chain = query_prompt | llama3 | JsonOutputParser()
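# Illustrative only (not executed here): the query chain should emit JSON such as
# {"query": "latest LangGraph release notes"} (the query text is hypothetical);
# transform_query below reads the 'query' key from that dict.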
# Graph state definition
class GraphState(TypedDict):
    question: str
    generation: str
    search_query: str
    context: str
# Processing nodes
async def generate(state):
    print("Step: Generating Final Response")
    question = state["question"]
    context = state.get("context", "")  # May be empty if the router skipped web search
    generation = await asyncio.to_thread(generate_chain.invoke, {"context": context, "question": question})
    return {"generation": generation}

async def transform_query(state):
    print("Step: Optimizing Query for Web Search")
    question = state['question']
    gen_query = await asyncio.to_thread(query_chain.invoke, {"question": question})
    search_query = gen_query.get("query", "")  # Make sure we read the expected key
    return {"search_query": search_query}
async def web_search(state):
    search_query = state['search_query']
    print(f'Step: Searching the Web for: "{search_query}"')
    try:
        search_result = await asyncio.to_thread(web_search_tool.invoke, search_query)
        if isinstance(search_result, str):  # If the response is a string, use it directly as context
            print(f"Web search response is a string: {search_result}")
            return {"context": search_result}
        elif isinstance(search_result, dict):  # If it is a dictionary, use it as-is
            return {"context": search_result}
        else:
            raise ValueError("Web search response is not valid")
    except Exception as e:
        print(f"Web search failed: {e}")
        return {"context": ""}  # If the search fails, fall back to an empty context
async def route_question(state):
    print("Step: Routing Query")
    question = state['question']
    output = await asyncio.to_thread(question_router.invoke, {"question": question})
    if output.get('choice') == "web_search":
        print("Step: Routing Query to Web Search")
        return "websearch"
    # Fall back to generation for any other (or missing) choice
    print("Step: Routing Query to Generation")
    return "generate"
# Build the state graph
workflow = StateGraph(GraphState)
workflow.add_node("websearch", web_search)
workflow.add_node("transform_query", transform_query)
workflow.add_node("generate", generate)
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "websearch")
workflow.add_edge("websearch", "generate")
workflow.add_edge("generate", END)
# Compile the agent
local_agent = workflow.compile()
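# Minimal sketch for testing the compiled graph directly, outside the FastAPI server
# (the question text is only an example):
#
#   result = asyncio.run(local_agent.ainvoke({"question": "What is LangGraph?"}))
#   print(result["generation"])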
# Function to run the agent
async def run_agent_parallel(query):
    # The graph nodes are coroutines, so use the asynchronous entry point of the compiled graph.
    output = await local_agent.ainvoke({"question": query})
    return output['generation']
# FastAPI server logic: register the endpoint so the route is actually exposed
@app.post("/query")
async def query_endpoint(request: QueryRequest):
    query = request.query
    return {"response": await run_agent_parallel(query)}
# Resource management logic
def release_resources():
    try:
        torch.cuda.empty_cache()
        gc.collect()
    except Exception as e:
        print(f"Failed to release resources: {e}")
def resource_manager():
    # Thresholds for triggering cleanup; adjust to the resources of the host.
    MAX_RAM_PERCENT = 90
    MAX_CPU_PERCENT = 90
    MAX_GPU_PERCENT = 90
    MAX_RAM_MB = 15000
    while True:
        try:
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)  # Convert to MB
            if current_ram_percent > MAX_RAM_PERCENT or current_ram_mb > MAX_RAM_MB:
                release_resources()
            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > MAX_CPU_PERCENT:
                # Lower this process's priority when CPU usage is high
                psutil.Process(os.getpid()).nice(10)
            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                total_gpu_mem = torch.cuda.get_device_properties(gpu).total_memory
                gpu_mem_percent = torch.cuda.memory_allocated(gpu) / total_gpu_mem * 100
                if gpu_mem_percent > MAX_GPU_PERCENT:
                    release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")
        time.sleep(5)  # Avoid a busy loop

# Run the resource manager in a background daemon thread so it does not block server startup.
threading.Thread(target=resource_manager, daemon=True).start()
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)