import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Set protobuf implementation to avoid C++ extension issues
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

# Load keys from environment
hf_token = os.getenv("HUGGINGFACE_INFERENCE_TOKEN")
serper_api_key = os.getenv("SERPER_API_KEY")
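# Expected .env contents (placeholder values; TAVILY_API_KEY is an assumption
# on my part, added because the web_search tool below uses Tavily, which reads
# its key from the environment):
#
#   HUGGINGFACE_INFERENCE_TOKEN=hf_xxxxxxxxxxxx
#   SERPER_API_KEY=xxxxxxxxxxxx
#   TAVILY_API_KEY=tvly-xxxxxxxxxxxx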
# ---- Imports ----
import json
import math
import re
import requests
from datetime import datetime
from typing import Any, Dict, List

from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
try:
    from langchain_community.vectorstores import Chroma
except ImportError:
    from langchain.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.outputs import LLMResult, Generation
from langchain_core.tools import tool
from langchain.tools.retriever import create_retriever_tool
try:
    from langchain.embeddings import HuggingFaceEmbeddings as LegacyHFEmbeddings
except ImportError:
    LegacyHFEmbeddings = HuggingFaceEmbeddings
from langchain.schema import Document as LegacyDocument
# Custom HuggingFace LLM wrapper with better error handling.
# NOTE: this is a plain class rather than a langchain_core BaseLanguageModel
# subclass. The base class is a pydantic model with several abstract methods,
# so assigning attributes in __init__ as below raises a validation error at
# instantiation time. The graph below only ever calls .invoke(), so a plain
# class is sufficient.
class SimpleHuggingFaceLLM:
    def __init__(self, repo_id: str, hf_token: str):
        self.repo_id = repo_id
        self.hf_token = hf_token
        self.api_url = f"https://api-inference.huggingface.co/models/{repo_id}"
        self.headers = {"Authorization": f"Bearer {hf_token}"}
        # Test the connection up front so a bad token or model fails fast
        self._test_connection()

    def _test_connection(self):
        """Test whether the model is accessible via the Inference API."""
        payload = {
            "inputs": "Hello",
            "parameters": {
                "max_new_tokens": 10,
                "temperature": 0.1,
                "return_full_text": False
            }
        }
        try:
            response = requests.post(self.api_url, headers=self.headers, json=payload, timeout=30)
            if response.status_code != 200:
                print(f"Model {self.repo_id} test failed with status {response.status_code}: {response.text}")
                raise Exception(f"Model not accessible: {response.status_code}")
            print(f"Model {self.repo_id} test successful")
        except Exception as e:
            print(f"Model {self.repo_id} connection test failed: {e}")
            raise

    def _generate(self, prompt: str, stop=None, run_manager=None, **kwargs) -> LLMResult:
        """Call the Inference API with a plain-text prompt."""
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 512,
                "temperature": 0.1,
                "return_full_text": False,
                "do_sample": False
            }
        }
        try:
            response = requests.post(self.api_url, headers=self.headers, json=payload, timeout=60)
            if response.status_code == 200:
                result = response.json()
                # The API returns either a list of generations or a single dict
                if isinstance(result, list) and len(result) > 0:
                    generated_text = result[0].get("generated_text", "")
                elif isinstance(result, dict):
                    generated_text = result.get("generated_text", str(result))
                else:
                    generated_text = str(result)
                return LLMResult(generations=[[Generation(text=generated_text)]])
            error_msg = f"API Error {response.status_code}: {response.text[:200]}"
            print(error_msg)
            return LLMResult(generations=[[Generation(text=f"Error: {error_msg}")]])
        except Exception as e:
            error_msg = f"Request failed: {str(e)}"
            print(error_msg)
            return LLMResult(generations=[[Generation(text=error_msg)]])

    def invoke(self, input, config=None, **kwargs) -> AIMessage:
        # Join all message contents so the system prompt and any injected
        # context are not silently dropped (the original used only the last
        # message, discarding the retriever node's work).
        if isinstance(input, list):
            prompt = "\n\n".join(
                m.content for m in input if isinstance(getattr(m, "content", None), str)
            )
        else:
            prompt = str(input)
        result = self._generate(prompt)
        return AIMessage(content=result.generations[0][0].text)

    def _llm_type(self):
        return "huggingface_custom"

    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        """Legacy string-in/string-out method, kept for compatibility."""
        result = self._generate(prompt)
        return result.generations[0][0].text
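# A minimal usage sketch (the model choice and prompt are illustrative only):
#
#   llm = SimpleHuggingFaceLLM(repo_id="google/flan-t5-base", hf_token=hf_token)
#   reply = llm.invoke("What is the capital of France?")
#   print(reply.content)  # AIMessage text from the Inference API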
# ---- Enhanced Tools ----
def multiply(a: float, b: float) -> float:
    """Multiply two numbers"""
    return a * b

def add(a: float, b: float) -> float:
    """Add two numbers"""
    return a + b

def subtract(a: float, b: float) -> float:
    """Subtract two numbers"""
    return a - b

def divide(a: float, b: float) -> float:
    """Divide two numbers"""
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b

def modulus(a: int, b: int) -> int:
    """Calculate modulus of two integers"""
    return a % b

def power(a: float, b: float) -> float:
    """Calculate a raised to the power of b"""
    return a ** b

def square_root(a: float) -> float:
    """Calculate square root of a number"""
    return math.sqrt(a)

def factorial(n: int) -> int:
    """Calculate factorial of a non-negative integer"""
    if n < 0:
        raise ValueError("Factorial is not defined for negative numbers")
    if n in (0, 1):
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

def gcd(a: int, b: int) -> int:
    """Calculate greatest common divisor (Euclidean algorithm)"""
    while b:
        a, b = b, a % b
    return a

def lcm(a: int, b: int) -> int:
    """Calculate least common multiple"""
    return abs(a * b) // gcd(a, b)
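# Example: gcd(12, 18) == 6, so lcm(12, 18) == abs(12 * 18) // 6 == 36.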
def percentage(part: float, whole: float) -> float:
    """Calculate what percentage `part` is of `whole`"""
    if whole == 0:
        raise ValueError("Cannot compute a percentage of zero.")
    return (part / whole) * 100

def compound_interest(principal: float, rate: float, time: float, n: int = 1) -> float:
    """Calculate the compounded amount A = P * (1 + r/n)^(n*t)"""
    return principal * (1 + rate / n) ** (n * time)
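# Worked example: 1000 at a 5% annual rate, compounded monthly (n=12) for
# 2 years, gives 1000 * (1 + 0.05/12) ** 24 ≈ 1104.94.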
def calculate_average(numbers: str) -> float:
    """Calculate the average of comma-separated numbers"""
    try:
        nums = [float(x.strip()) for x in numbers.split(",")]
        return sum(nums) / len(nums)
    except (ValueError, ZeroDivisionError):
        return 0.0
def wiki_search(query: str) -> str:
    """Search Wikipedia for information"""
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
        if not search_docs:
            return "No Wikipedia results found."
        formatted = "\n\n---\n\n".join([
            f'Wikipedia: {doc.metadata.get("title", "Unknown")}\n{doc.page_content[:1500]}'
            for doc in search_docs
        ])
        return formatted
    except Exception as e:
        return f"Wikipedia search error: {str(e)}"

def web_search(query: str) -> str:
    """Search the web using Tavily (requires TAVILY_API_KEY in the environment)"""
    try:
        # Tools take a single positional input; the original passed `query=`
        # as a keyword argument, which raises a TypeError.
        search_docs = TavilySearchResults(max_results=2).invoke({"query": query})
        if not search_docs:
            return "No web search results found."
        formatted = "\n\n---\n\n".join([
            f'Web: {doc.get("title", "Unknown")}\n{doc.get("content", "")[:1500]}'
            for doc in search_docs
        ])
        return formatted
    except Exception as e:
        return f"Web search error: {str(e)}"
def simple_calculation(expression: str) -> str:
    """Safely evaluate simple arithmetic expressions"""
    try:
        # Whitelist arithmetic characters only, then evaluate with builtins
        # disabled as an extra safety measure
        safe_chars = set("0123456789+-*/.() ")
        if not all(c in safe_chars for c in expression):
            return "Invalid characters in expression"
        result = eval(expression, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"Calculation error: {str(e)}"
# ---- Embedding & Vector Store Setup with better error handling ----
def setup_vector_store():
    try:
        # Try embedding models in order of preference
        embedding_models = [
            "sentence-transformers/all-MiniLM-L6-v2",
            "sentence-transformers/all-mpnet-base-v2"
        ]
        embeddings = None
        for model_name in embedding_models:
            try:
                embeddings = HuggingFaceEmbeddings(model_name=model_name)
                print(f"Successfully loaded embeddings: {model_name}")
                break
            except Exception as e:
                print(f"Failed to load embeddings {model_name}: {e}")
                continue
        if embeddings is None:
            print("Could not load any embedding model, skipping vector store setup")
            return None

        # Check if metadata.jsonl exists and load it
        if os.path.exists("metadata.jsonl"):
            json_QA = []
            with open("metadata.jsonl", "r") as jsonl_file:
                for line in jsonl_file:
                    if line.strip():
                        try:
                            json_QA.append(json.loads(line))
                        except json.JSONDecodeError:
                            continue
            if json_QA:
                documents = []
                for sample in json_QA:
                    if sample.get("Question") and sample.get("Final answer"):
                        doc = Document(
                            page_content=f"Question: {sample['Question']}\n\nAnswer: {sample['Final answer']}",
                            metadata={"source": sample.get("task_id", "unknown")}
                        )
                        documents.append(doc)
                if documents:
                    try:
                        vector_store = Chroma.from_documents(
                            documents=documents,
                            embedding=embeddings,
                            persist_directory="./chroma_db",
                            collection_name="my_collection"
                        )
                        # persist() is deprecated/a no-op on newer Chroma
                        # versions, which persist automatically
                        try:
                            vector_store.persist()
                        except Exception:
                            pass
                        print(f"Vector store created with {len(documents)} documents")
                        return vector_store
                    except Exception as e:
                        print(f"Error creating vector store with documents: {e}")

        # Fall back to an empty vector store if no data was loaded
        try:
            vector_store = Chroma(
                embedding_function=embeddings,
                persist_directory="./chroma_db",
                collection_name="my_collection"
            )
            print("Empty vector store created")
            return vector_store
        except Exception as e:
            print(f"Error creating empty vector store: {e}")
            return None
    except Exception as e:
        print(f"Vector store setup error: {e}")
        return None

# Try to set up the vector store, but don't fail the app if it doesn't work
vector_store = setup_vector_store()
def similar_question_search(query: str) -> str:
    """Search for similar questions in the knowledge base"""
    if not vector_store:
        return "No similar questions available"
    try:
        matched_docs = vector_store.similarity_search(query, k=2)
        if not matched_docs:
            return "No similar questions found"
        formatted = "\n\n".join([
            f'Similar Q&A:\n{doc.page_content[:800]}'
            for doc in matched_docs
        ])
        return formatted
    except Exception as e:
        return f"Similar question search error: {str(e)}"
# ---- Enhanced System Prompt ----
system_prompt = """
You are an expert assistant that can solve various types of questions using available tools.

Available tools:
- Math: add, subtract, multiply, divide, modulus, power, square_root, factorial, gcd, lcm, percentage, compound_interest, calculate_average, simple_calculation
- Search: wiki_search, web_search, similar_question_search

Instructions:
1. Read the question carefully
2. Break complex problems down into steps
3. Use appropriate tools to gather information or perform calculations
4. Think step by step and show your reasoning
5. Provide accurate, concise answers

IMPORTANT: Always end your response with:
FINAL ANSWER: [your answer here]

For the final answer:
- Numbers: use plain digits (no commas, units, or symbols unless requested)
- Text: use exact names without articles
- Lists: comma-separated values

Think carefully and use tools when needed.
"""
sys_msg = SystemMessage(content=system_prompt)

# ---- Tool List ----
# NOTE: the simplified graph below does not actually bind these tools to the
# LLM; the list is kept for when proper tool binding is restored.
tools = [
    # Math tools
    multiply, add, subtract, divide, modulus, power, square_root,
    factorial, gcd, lcm, percentage, compound_interest, calculate_average, simple_calculation,
    # Search tools
    wiki_search, web_search, similar_question_search
]
# ---- Graph Definition with better error handling ----
def build_graph(provider: str = "huggingface"):
    """Build the agent graph with the custom HuggingFace integration"""
    if provider == "huggingface":
        if not hf_token:
            raise ValueError("HUGGINGFACE_INFERENCE_TOKEN is required but not found in environment variables")
        # Try several hosted models in order until one initializes
        models_to_try = [
            "microsoft/DialoGPT-medium",
            "google/flan-t5-base",
            "facebook/blenderbot-400M-distill",
            "microsoft/DialoGPT-small"
        ]
        llm = None
        for model_id in models_to_try:
            try:
                print(f"Trying to initialize model: {model_id}")
                llm = SimpleHuggingFaceLLM(repo_id=model_id, hf_token=hf_token)
                print(f"Successfully initialized model: {model_id}")
                break
            except Exception as e:
                print(f"Failed to initialize {model_id}: {e}")
                continue
        if llm is None:
            raise ValueError("Failed to initialize any HuggingFace model. Please check your HUGGINGFACE_INFERENCE_TOKEN and internet connection.")
    else:
        raise ValueError("Only 'huggingface' provider is supported")

    # Placeholder for tool binding: the custom wrapper cannot bind tools,
    # so this simply forwards the messages to the LLM
    def llm_with_tools(messages):
        return llm.invoke(messages)

    def assistant(state: MessagesState):
        """Assistant node with enhanced error handling"""
        try:
            messages = state["messages"]
            response = llm_with_tools(messages)
            return {"messages": [response]}
        except Exception as e:
            print(f"Assistant error: {e}")
            fallback_response = AIMessage(content="I encountered an error processing your request. Let me try a simpler approach.")
            return {"messages": [fallback_response]}

    def retriever(state: MessagesState):
        """Retriever node that injects the system prompt and similar-question context"""
        messages = state["messages"]
        user_query = messages[-1].content if messages else ""
        context_messages = [sys_msg]
        # Add a similar-question example as context if the store is available
        if vector_store:
            try:
                similar = vector_store.similarity_search(user_query, k=1)
                if similar:
                    context_msg = HumanMessage(
                        content=f"Here's a similar example:\n{similar[0].page_content[:500]}"
                    )
                    context_messages.append(context_msg)
            except Exception as e:
                print(f"Retriever error: {e}")
        return {"messages": context_messages + messages}

    # Build the simplified graph: a linear retriever -> assistant flow
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    builder.add_node("assistant", assistant)
    builder.add_edge(START, "retriever")
    builder.add_edge("retriever", "assistant")
    return builder.compile()
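
# A minimal local usage sketch (assumes the env vars above are set; the
# question is illustrative only):
if __name__ == "__main__":
    graph = build_graph()
    result = graph.invoke({"messages": [HumanMessage(content="What is 12 * 7?")]})
    # The compiled graph returns the accumulated message list; the last
    # entry is the assistant's reply
    print(result["messages"][-1].content)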