import os
import subprocess
import threading
import time
import asyncio
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import END, StateGraph
from typing_extensions import TypedDict
from fastapi.responses import StreamingResponse
from ollama import AsyncClient
import gc
import psutil
import torch
from functools import lru_cache
# Make sure the Ollama binary and the required Python libraries are installed
# pip install ollama langchain langchain_community langgraph
# Path to the Ollama binary
OLLAMA = os.path.expanduser("~/ollama")
if not os.path.exists(OLLAMA):
    print("Ollama not found, downloading...")
    subprocess.run("curl -L https://ollama.com/download/ollama-linux-amd64 -o ~/ollama", shell=True)
    os.chmod(OLLAMA, 0o755)
# Start the Ollama server in the background
def ollama_service_thread():
    print("Starting the Ollama service")
    # subprocess.run blocks, so this call keeps the server running for the lifetime of the thread
    subprocess.run("~/ollama serve", shell=True)

# Start the Ollama service thread
print("Creating and starting the Ollama service thread")
OLLAMA_SERVICE_THREAD = threading.Thread(target=ollama_service_thread, daemon=True)
OLLAMA_SERVICE_THREAD.start()
# Wait for Ollama to start
print("Waiting for Ollama to start...")
time.sleep(10)
# Download the model from Hugging Face if it is not available locally.
# This is a plain (synchronous) function: subprocess.run blocks anyway, and as an
# un-awaited coroutine the download would never actually run.
def download_ollama_model(model_name='hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'):
    try:
        print(f"Downloading model: {model_name}")
        subprocess.run(["ollama", "pull", model_name], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error downloading the model: {e}")
        raise

# Download the Ollama model in the main thread
download_ollama_model("hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S")
# Create the FastAPI application
app = FastAPI()

# Data model for the queries received by the API
class QueryRequest(BaseModel):
    query: str
# Ollama language model
local_llm = 'hf.co/MaziyarPanahi/Llama-3.2-3B-Instruct-uncensored-GGUF:IQ1_S'
llama3 = ChatOllama(model=local_llm)

# Web search tool backed by DuckDuckGo
wrapper = DuckDuckGoSearchAPIWrapper(max_results=1)
web_search_tool = DuckDuckGoSearchRun(api_wrapper=wrapper)
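# Illustrative direct use of the tool (assumption: invoke returns a plain string of result snippets):
#   snippets = web_search_tool.invoke("LangGraph state machines")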
# Cache previous search results so repeated queries skip the network call.
# lru_cache does not work with coroutines (it would cache a one-shot coroutine object),
# so the cached function is synchronous and is run in a worker thread from the async nodes.
@lru_cache(maxsize=1024)  # bounded cache of previous search results
def cached_search(query):
    return web_search_tool.invoke(query)
# Prompts for generation and routing
generate_prompt = PromptTemplate(
template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an AI assistant for research question tasks that synthesizes web search results.
Strictly use the following pieces of web search context to answer the question. If you don't know the answer, just say that you don't know.
Keep the answer concise, but provide all of the details you can in the form of a research report.
Only make direct references to material if provided in the context.
<|eot_id|>
<|start_header_id|>user<|end_header_id|>
Question: {question}
Web Search Context: {context}
Answer:
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
input_variables=["question", "context"],
)
generate_chain = generate_prompt | llama3 | StrOutputParser()
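# Illustrative use of the chain above (hypothetical inputs; both values are plain strings):
#   answer = generate_chain.invoke({"question": "What is LangGraph?", "context": "<web search snippets>"})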
router_prompt = PromptTemplate(
template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an expert at routing a user question to either the generation stage or web search.
Use the web search for questions that require more context for a better answer, or recent events.
Otherwise, you can skip and go straight to the generation phase to respond.
You do not need to be stringent with the keywords in the question related to these topics.
Give a binary choice 'web_search' or 'generate' based on the question.
Return the JSON with a single key 'choice' with no preamble or explanation.
Question to route: {question}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
input_variables=["question"],
)
question_router = router_prompt | llama3 | JsonOutputParser()
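# Illustrative router output, assuming the model follows the JSON instruction above:
#   question_router.invoke({"question": "Who won the latest F1 race?"})  ->  {"choice": "web_search"}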
query_prompt = PromptTemplate(
template="""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
You are an expert at crafting web search queries for research questions.
More often than not, a user will ask a basic question that they wish to learn more about; however, it might not be in the best format.
Reword their query to be the most effective web search string possible.
Return the JSON with a single key 'query' with no preamble or explanation.
Question to transform: {question}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>""",
input_variables=["question"],
)
query_chain = query_prompt | llama3 | JsonOutputParser()
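# Illustrative query-chain output, assuming the model follows the JSON instruction above:
#   query_chain.invoke({"question": "tell me about llama 3"})  ->  {"query": "Llama 3 model overview"}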
# Graph state definition
class GraphState(TypedDict):
    question: str
    generation: str
    search_query: str
    context: str
# Processing nodes
async def generate(state):
    print("Step: Generating Final Response")
    question = state["question"]
    context = state["context"]
    # Use the async LangChain entry point (ainvoke) inside async nodes
    generation = await generate_chain.ainvoke({"context": context, "question": question})
    return {"generation": generation}
async def transform_query(state):
    print("Step: Optimizing Query for Web Search")
    question = state['question']
    gen_query = await query_chain.ainvoke({"question": question})
    search_query = gen_query.get("query", "")  # make sure we read the expected key
    return {"search_query": search_query}
async def web_search(state):
    search_query = state['search_query']
    print(f'Step: Searching the Web for: "{search_query}"')
    try:
        # Use the cache; the synchronous search runs in a worker thread so it does not block the event loop
        search_result = await asyncio.to_thread(cached_search, search_query)
        if isinstance(search_result, str):  # a plain string is used directly as context
            print(f"Web search returned a string: {search_result}")
            return {"context": search_result}
        elif isinstance(search_result, dict):  # a dict is passed through as-is
            return {"context": search_result}
        else:
            raise ValueError("Web search returned an unexpected result type")
    except Exception as e:
        print(f"Web search failed: {e}")
        return {"context": ""}  # if the search fails, continue with an empty context
async def route_question(state):
    print("Step: Routing Query")
    question = state['question']
    output = await question_router.ainvoke({"question": question})
    if output.get('choice') == "web_search":
        print("Step: Routing Query to Web Search")
        return "websearch"
    elif output.get('choice') == 'generate':
        print("Step: Routing Query to Generation")
        return "generate"
    # Fall back to web search if the router output is not one of the expected choices
    print("Step: Router output not recognized, defaulting to Web Search")
    return "websearch"
# Build the state graph
workflow = StateGraph(GraphState)
workflow.add_node("websearch", web_search)
workflow.add_node("transform_query", transform_query)
workflow.add_node("generate", generate)
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "websearch")
workflow.add_edge("websearch", "generate")
workflow.add_edge("generate", END)
# Compile the graph into a runnable agent
local_agent = workflow.compile()
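# Resulting flow, as a quick reference for the routing and edges configured above:
#   route_question -> "websearch"  =>  transform_query -> websearch -> generate -> END
#   route_question -> "generate"   =>  generate -> END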
# CPU, RAM and GPU resource management
def release_resources():
    try:
        torch.cuda.empty_cache()
        gc.collect()
    except Exception as e:
        print(f"Failed to release resources: {e}")

def resource_manager():
    MAX_RAM_PERCENT = 90   # adjust as needed
    MAX_CPU_PERCENT = 90
    MAX_GPU_MB = 1024      # adjust to the available GPU memory
    MAX_RAM_MB = 4096      # adjust to the available RAM
    while True:
        try:
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)  # convert to MB
            if current_ram_percent > MAX_RAM_PERCENT or current_ram_mb > MAX_RAM_MB:
                release_resources()
            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > MAX_CPU_PERCENT:
                # Lower the priority of this process (a positive nice value means lower priority on Unix)
                psutil.Process(os.getpid()).nice(10)
            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                gpu_mem = torch.cuda.memory_allocated(gpu) / (1024 * 1024)  # convert to MB
                if gpu_mem > MAX_GPU_MB:
                    release_resources()
        except Exception as e:
            print(f"Error in the resource manager: {e}")
        time.sleep(5)  # check periodically instead of spinning

# Run the resource manager in a background daemon thread so it does not block the server
threading.Thread(target=resource_manager, daemon=True).start()
# Run the query through the graph
async def process_query_in_parallel(query):
    try:
        state = GraphState(question=query, generation="", search_query="", context="")
        # Invoke the compiled graph asynchronously
        return await local_agent.ainvoke(state)
    except Exception as e:
        print(f"Error while running the graph: {e}")
        raise
# API route that handles queries
@app.post("/query")
async def query_handler(request: QueryRequest):
    try:
        query = request.query
        result = await process_query_in_parallel(query)  # asynchronous call
        return {"results": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run the FastAPI server
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
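# A minimal client sketch (not part of the app) showing how the /query endpoint could be called
# once the server is running; the host and port match the uvicorn.run call above, the question is
# made up, and the "generation" key assumes the graph finished the generate node:
#
#   import requests
#   resp = requests.post(
#       "http://localhost:8000/query",
#       json={"query": "What are the latest developments in quantum computing?"},
#   )
#   print(resp.json()["results"]["generation"])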