import os, time, random
from dotenv import load_dotenv
from typing import List, Dict, Any, TypedDict, Annotated
import operator

# Load environment variables
load_dotenv()

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver

# LangChain imports
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_nvidia_ai_endpoints import ChatNVIDIA
from langchain_core.rate_limiters import InMemoryRateLimiter

# Tavily import
from tavily import TavilyClient

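# The .env file is expected to provide GROQ_API_KEY, GOOGLE_API_KEY,
# NVIDIA_API_KEY, and TAVILY_API_KEY (all read via os.getenv below).
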
# Advanced rate limiter (silent: sleeps without logging),
# implemented as a sliding window over the last 60 seconds
class AdvancedRateLimiter:
    def __init__(self, requests_per_minute: int):
        self.requests_per_minute = requests_per_minute
        self.request_times = []

    def wait_if_needed(self):
        current_time = time.time()
        # Drop requests older than 1 minute from the window
        self.request_times = [t for t in self.request_times if current_time - t < 60]
        # If the window is full, wait until the oldest request expires (plus jitter)
        if len(self.request_times) >= self.requests_per_minute:
            wait_time = 60 - (current_time - self.request_times[0]) + random.uniform(2, 8)
            time.sleep(wait_time)
        # Record this request at the time it is actually made
        # (not the stale pre-sleep timestamp)
        self.request_times.append(time.time())

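# Usage sketch (hypothetical limit), mirroring the calls made in the tool and
# node functions below:
#   limiter = AdvancedRateLimiter(requests_per_minute=10)
#   limiter.wait_if_needed()  # blocks until a slot is free, then records the call
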
# Initialize rate limiters for free tiers
groq_limiter = AdvancedRateLimiter(requests_per_minute=30)
gemini_limiter = AdvancedRateLimiter(requests_per_minute=2)
nvidia_limiter = AdvancedRateLimiter(requests_per_minute=5)  # NVIDIA free tier
tavily_limiter = AdvancedRateLimiter(requests_per_minute=50)

# LangChain-native rate limiter, shared by the NVIDIA chat models
nvidia_rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.083,  # ~5 requests per minute
    check_every_n_seconds=0.1,
    max_bucket_size=5
)

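# InMemoryRateLimiter is a token bucket: tokens refill at requests_per_second,
# and max_bucket_size caps how large a burst can be after an idle period.
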
# Initialize LLMs with free-tier models
groq_llm = ChatGroq(
    model="llama-3.3-70b-versatile",
    api_key=os.getenv("GROQ_API_KEY"),
    temperature=0
)

gemini_llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-thinking-exp",
    api_key=os.getenv("GOOGLE_API_KEY"),
    temperature=0
)

# NVIDIA models, one per specialty
nvidia_general_llm = ChatNVIDIA(
    model="meta/llama3-70b-instruct",  # general-purpose model
    api_key=os.getenv("NVIDIA_API_KEY"),
    temperature=0,
    max_tokens=4000,
    rate_limiter=nvidia_rate_limiter
)

nvidia_code_llm = ChatNVIDIA(
    model="meta/codellama-70b",  # code generation model
    api_key=os.getenv("NVIDIA_API_KEY"),
    temperature=0,
    max_tokens=4000,
    rate_limiter=nvidia_rate_limiter
)

nvidia_math_llm = ChatNVIDIA(
    model="mistralai/mixtral-8x22b-instruct-v0.1",  # reasoning model
    api_key=os.getenv("NVIDIA_API_KEY"),
    temperature=0,
    max_tokens=4000,
    rate_limiter=nvidia_rate_limiter
)

# Initialize Tavily client
tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

# Define the shared graph state
class AgentState(TypedDict):
    messages: Annotated[List[HumanMessage | AIMessage], operator.add]
    query: str
    agent_type: str
    final_answer: str

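# Because `messages` is annotated with the operator.add reducer, LangGraph
# *concatenates* whatever a node returns under "messages" onto the existing
# list. Nodes below therefore return only the new message(s), never the full
# history, and they never spread **state back into the return value.
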
# Custom Tools | |
def multiply_tool(a: float, b: float) -> float: | |
"""Multiply two numbers together""" | |
return a * b | |
def add_tool(a: float, b: float) -> float: | |
"""Add two numbers together""" | |
return a + b | |
def subtract_tool(a: float, b: float) -> float: | |
"""Subtract two numbers""" | |
return a - b | |
def divide_tool(a: float, b: float) -> float: | |
"""Divide two numbers""" | |
if b == 0: | |
raise ValueError("Cannot divide by zero.") | |
return a / b | |
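# Note: if a tool raises (e.g. the divide-by-zero above), recent langgraph
# versions return the error text to the model as a ToolMessage instead of
# crashing the graph; the exact behavior depends on the installed version.
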
@tool
def tavily_search_tool(query: str) -> str:
    """Search the web using Tavily for current information"""
    try:
        tavily_limiter.wait_if_needed()
        response = tavily_client.search(
            query=query,
            max_results=3,
            search_depth="basic",
            include_answer=False
        )
        # Format results as title/content pairs
        results = []
        for result in response.get('results', []):
            results.append(f"Title: {result.get('title', '')}\nContent: {result.get('content', '')}")
        return "\n\n---\n\n".join(results)
    except Exception as e:
        return f"Tavily search failed: {str(e)}"

@tool
def wiki_search_tool(query: str) -> str:
    """Search Wikipedia for encyclopedic information"""
    try:
        time.sleep(random.uniform(1, 3))  # polite jitter before hitting Wikipedia
        from langchain_community.document_loaders import WikipediaLoader
        loader = WikipediaLoader(query=query, load_max_docs=1)
        data = loader.load()
        return "\n\n---\n\n".join([doc.page_content[:1000] for doc in data])
    except Exception as e:
        return f"Wikipedia search failed: {str(e)}"

# Define tool sets for each agent type
math_tools = [multiply_tool, add_tool, subtract_tool, divide_tool]
research_tools = [tavily_search_tool, wiki_search_tool]
coordinator_tools = [tavily_search_tool, wiki_search_tool]

# Node functions
def router_node(state: AgentState) -> AgentState:
    """Route queries to the appropriate agent type via keyword matching"""
    query = state["query"].lower()
    if any(word in query for word in ['calculate', 'math', 'multiply', 'add', 'subtract', 'divide', 'compute']):
        agent_type = "math"
    elif any(word in query for word in ['code', 'program', 'python', 'javascript', 'function', 'algorithm']):
        agent_type = "code"
    elif any(word in query for word in ['search', 'find', 'research', 'what is', 'who is', 'when', 'where']):
        agent_type = "research"
    else:
        agent_type = "coordinator"
    # Return only the updated key; spreading **state here would re-append
    # the whole message history through the operator.add reducer
    return {"agent_type": agent_type}

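# Routing examples: "Calculate 12 * 7" -> math, "Write a Python function" -> code,
# "Who is Ada Lovelace" -> research, anything else -> coordinator.
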
def math_agent_node(state: AgentState) -> AgentState:
    """Mathematical specialist agent using NVIDIA Mixtral"""
    nvidia_limiter.wait_if_needed()
    system_message = SystemMessage(content="""You are a mathematical specialist with access to calculation tools.
    Use the appropriate math tools for calculations.
    Show your work step by step.
    Always provide precise numerical answers.
    Finish with: FINAL ANSWER: [numerical result]""")

    # Create a math agent with the NVIDIA reasoning model
    math_agent = create_react_agent(nvidia_math_llm, math_tools)

    # Process the query
    messages = [system_message, HumanMessage(content=state["query"])]
    config = {"configurable": {"thread_id": "math_thread"}}
    try:
        result = math_agent.invoke({"messages": messages}, config)
        final_message = result["messages"][-1].content
        return {
            "messages": [AIMessage(content=final_message)],  # reducer appends this
            "final_answer": final_message
        }
    except Exception as e:
        error_msg = f"Math agent error: {str(e)}"
        return {
            "messages": [AIMessage(content=error_msg)],
            "final_answer": error_msg
        }

def code_agent_node(state: AgentState) -> AgentState:
    """Code generation specialist agent using NVIDIA CodeLlama"""
    nvidia_limiter.wait_if_needed()
    system_message = SystemMessage(content="""You are an expert coding AI specialist.
    Generate clean, efficient, and well-documented code.
    Explain your code solutions clearly.
    Always provide working code examples.
    Finish with: FINAL ANSWER: [your code solution]""")

    # Create a code agent; no tools are attached, so it answers directly
    code_agent = create_react_agent(nvidia_code_llm, [])

    # Process the query
    messages = [system_message, HumanMessage(content=state["query"])]
    config = {"configurable": {"thread_id": "code_thread"}}
    try:
        result = code_agent.invoke({"messages": messages}, config)
        final_message = result["messages"][-1].content
        return {
            "messages": [AIMessage(content=final_message)],
            "final_answer": final_message
        }
    except Exception as e:
        error_msg = f"Code agent error: {str(e)}"
        return {
            "messages": [AIMessage(content=error_msg)],
            "final_answer": error_msg
        }

def research_agent_node(state: AgentState) -> AgentState:
    """Research specialist agent using Gemini"""
    gemini_limiter.wait_if_needed()
    system_message = SystemMessage(content="""You are a research specialist with access to web search and Wikipedia.
    Use appropriate search tools to gather comprehensive information.
    Always cite sources and provide well-researched answers.
    Synthesize information from multiple sources when possible.
    Finish with: FINAL ANSWER: [your researched answer]""")

    # Create a research agent with the Gemini model
    research_agent = create_react_agent(gemini_llm, research_tools)

    # Process the query
    messages = [system_message, HumanMessage(content=state["query"])]
    config = {"configurable": {"thread_id": "research_thread"}}
    try:
        result = research_agent.invoke({"messages": messages}, config)
        final_message = result["messages"][-1].content
        return {
            "messages": [AIMessage(content=final_message)],
            "final_answer": final_message
        }
    except Exception as e:
        error_msg = f"Research agent error: {str(e)}"
        return {
            "messages": [AIMessage(content=error_msg)],
            "final_answer": error_msg
        }

def coordinator_agent_node(state: AgentState) -> AgentState:
    """Coordinator agent using NVIDIA Llama3"""
    nvidia_limiter.wait_if_needed()
    system_message = SystemMessage(content="""You are the main coordinator agent.
    Analyze queries and provide comprehensive responses.
    Use search tools for factual information when needed.
    Always finish with: FINAL ANSWER: [your final answer]""")

    # Create a coordinator agent with the NVIDIA general-purpose model
    coordinator_agent = create_react_agent(nvidia_general_llm, coordinator_tools)

    # Process the query
    messages = [system_message, HumanMessage(content=state["query"])]
    config = {"configurable": {"thread_id": "coordinator_thread"}}
    try:
        result = coordinator_agent.invoke({"messages": messages}, config)
        final_message = result["messages"][-1].content
        return {
            "messages": [AIMessage(content=final_message)],
            "final_answer": final_message
        }
    except Exception as e:
        error_msg = f"Coordinator agent error: {str(e)}"
        return {
            "messages": [AIMessage(content=error_msg)],
            "final_answer": error_msg
        }

# Conditional routing function
def route_agent(state: AgentState) -> str:
    """Return the node name matching the routed agent_type"""
    agent_type = state.get("agent_type", "coordinator")
    if agent_type == "math":
        return "math_agent"
    elif agent_type == "code":
        return "code_agent"
    elif agent_type == "research":
        return "research_agent"
    else:
        return "coordinator_agent"

# LangGraph multi-agent system
class LangGraphMultiAgentSystem:
    def __init__(self):
        self.request_count = 0
        self.last_request_time = time.time()
        self.graph = self._create_graph()

    def _create_graph(self) -> StateGraph:
        """Create the LangGraph workflow"""
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("router", router_node)
        workflow.add_node("math_agent", math_agent_node)
        workflow.add_node("code_agent", code_agent_node)
        workflow.add_node("research_agent", research_agent_node)
        workflow.add_node("coordinator_agent", coordinator_agent_node)

        # Add edges
        workflow.set_entry_point("router")
        workflow.add_conditional_edges(
            "router",
            route_agent,
            {
                "math_agent": "math_agent",
                "code_agent": "code_agent",
                "research_agent": "research_agent",
                "coordinator_agent": "coordinator_agent"
            }
        )

        # All agents end the workflow
        workflow.add_edge("math_agent", END)
        workflow.add_edge("code_agent", END)
        workflow.add_edge("research_agent", END)
        workflow.add_edge("coordinator_agent", END)

        # Compile the graph with an in-memory checkpointer
        memory = MemorySaver()
        return workflow.compile(checkpointer=memory)

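    # Resulting graph shape:
    #   router -> (math_agent | code_agent | research_agent | coordinator_agent) -> END
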
    def process_query(self, query: str) -> str:
        """Process a query using the LangGraph multi-agent system"""
        # Global rate limiting (silent): reset the counter every hour
        current_time = time.time()
        if current_time - self.last_request_time > 3600:
            self.request_count = 0
            self.last_request_time = current_time
        self.request_count += 1

        # Add a randomized delay between consecutive requests (silent)
        if self.request_count > 1:
            time.sleep(random.uniform(3, 10))

        # Initial state
        initial_state = {
            "messages": [HumanMessage(content=query)],
            "query": query,
            "agent_type": "",
            "final_answer": ""
        }

        # Each request gets its own checkpointer thread
        config = {"configurable": {"thread_id": f"thread_{self.request_count}"}}
        try:
            # Run the graph
            final_state = self.graph.invoke(initial_state, config)
            return final_state.get("final_answer", "No response generated")
        except Exception as e:
            return f"Error: {str(e)}"

# Main functions
def main(query: str) -> str:
    """Entry point: run the LangGraph multi-agent system on a query"""
    langgraph_system = LangGraphMultiAgentSystem()
    return langgraph_system.process_query(query)

def get_final_answer(query: str) -> str:
    """Extract only the FINAL ANSWER portion of the response"""
    full_response = main(query)
    if "FINAL ANSWER:" in full_response:
        return full_response.split("FINAL ANSWER:")[-1].strip()
    else:
        return full_response.strip()

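# Example (assumes the API keys above are set):
#   get_final_answer("Calculate 12 * 7")  # routed to the math agent
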
if __name__ == "__main__":
    # Test the LangGraph system - clean output only
    result = get_final_answer("What are the names of the US presidents who were assassinated?")
    print(result)