Spaces:
Runtime error
Runtime error
| import os | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # Set protobuf implementation to avoid C++ extension issues | |
| os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python" | |
| # Load keys from environment | |
| hf_token = os.getenv("HUGGINGFACE_INFERENCE_TOKEN") | |
| serper_api_key = os.getenv("SERPER_API_KEY") | |
| # ---- Imports ---- | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import tools_condition, ToolNode | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_core.documents import Document | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from langchain_core.language_models.base import BaseLanguageModel | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain.vectorstores import Chroma | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain.schema import Document | |
| import json | |
| import requests | |
| from typing import List, Dict, Any | |
| import re | |
| import math | |
| from datetime import datetime | |
| # Custom HuggingFace LLM wrapper | |
class SimpleHuggingFaceLLM(BaseLanguageModel):
    """Small client for the hosted Hugging Face Inference API.

    The prompt sent to the API is the content of the *last* message only;
    earlier messages are ignored. Failures (non-200 responses, network
    errors, unexpected payloads) are never raised: they are returned as
    generated text of the form ``"Error: ..."``.

    NOTE(review): ``BaseLanguageModel`` is defined outside this file. If it
    validates attribute assignment or declares abstract methods that are not
    implemented here, constructing this class may fail -- confirm against
    the installed ``langchain_core`` version.
    """

    def __init__(self, repo_id: str, hf_token: str):
        """Store the model id and token and derive the endpoint and headers.

        Args:
            repo_id: Hugging Face model repository id, e.g. ``"org/model"``.
            hf_token: API token sent as a ``Bearer`` authorization header.
        """
        super().__init__()
        self.repo_id = repo_id
        self.hf_token = hf_token
        self.api_url = f"https://api-inference.huggingface.co/models/{repo_id}"
        self.headers = {"Authorization": f"Bearer {hf_token}"}

    def _generate(self, messages, stop=None, run_manager=None, **kwargs):
        """Send one prompt to the Inference API and wrap the reply.

        Args:
            messages: Either a list of message objects (the last one's
                ``content`` is used) or any other value, which is converted
                with ``str()``.
            stop: Accepted for interface compatibility; not used.
            run_manager: Accepted for interface compatibility; not used.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            An ``LLMResult`` holding a single ``Generation``. On failure the
            generation text starts with ``"Error: "``.
        """
        # Imported before any branching so both names are bound on every
        # path, including the non-200 and exception paths below.
        from langchain_core.outputs import LLMResult, Generation

        if isinstance(messages, list):
            prompt = messages[-1].content if messages else ""
        else:
            prompt = str(messages)

        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 512,
                "temperature": 0.1,
                "return_full_text": False,
            },
        }

        try:
            # Without a timeout a stalled connection would block the caller
            # indefinitely; a timeout surfaces as an "Error: ..." result.
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,
                timeout=60,
            )
            if response.status_code != 200:
                generated_text = f"Error: {response.status_code}"
            else:
                result = response.json()
                if isinstance(result, list) and len(result) > 0:
                    generated_text = result[0].get('generated_text', '')
                else:
                    # Unexpected payload shape: return it verbatim as text.
                    generated_text = str(result)
        except Exception as e:
            generated_text = f"Error: {str(e)}"

        return LLMResult(generations=[[Generation(text=generated_text)]])

    def invoke(self, input, config=None, **kwargs):
        """Generate a reply for ``input`` and return it as an ``AIMessage``.

        Args:
            input: A list of message objects (only the last one's ``content``
                is used) or any other value, which is converted with ``str()``.
            config: Accepted for interface compatibility; not used.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            ``AIMessage`` whose content is the generated (or error) text.
        """
        if isinstance(input, list):
            prompt = input[-1].content if input else ""
        else:
            prompt = str(input)
        result = self._generate(prompt)
        generated_text = result.generations[0][0].text
        return AIMessage(content=generated_text)

    def _llm_type(self):
        """Return the identifier string for this wrapper."""
        return "huggingface_custom"
| # ---- Enhanced Tools ---- | |
def multiply(a: float, b: float) -> float:
    """Return the product of ``a`` and ``b``."""
    product = a * b
    return product
def add(a: float, b: float) -> float:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Return ``a`` minus ``b``."""
    difference = a - b
    return difference
def divide(a: float, b: float) -> float:
    """Return the true quotient ``a / b``.

    Raises:
        ValueError: If ``b`` equals zero.
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
def modulus(a: int, b: int) -> int:
    """Return the remainder of ``a`` divided by ``b`` using Python's ``%``."""
    remainder = a % b
    return remainder
def power(a: float, b: float) -> float:
    """Return ``a`` raised to the exponent ``b``."""
    raised = a ** b
    return raised
def square_root(a: float) -> float:
    """Return the square root of ``a`` as computed by ``math.sqrt``."""
    root = math.sqrt(a)
    return root
def factorial(n: int) -> int:
    """Return ``n!`` for a non-negative integer ``n``.

    Args:
        n: The non-negative integer whose factorial is wanted.

    Returns:
        The product ``1 * 2 * ... * n``; ``1`` when ``n`` is 0 or 1.

    Raises:
        ValueError: If ``n`` is negative.
    """
    # Checked explicitly so the error message stays the one callers already see.
    if n < 0:
        raise ValueError("Factorial is not defined for negative numbers")
    # The standard library implementation replaces the hand-written loop.
    return math.factorial(n)
def gcd(a: int, b: int) -> int:
    """Return the greatest common divisor of ``a`` and ``b``.

    Uses the Euclidean algorithm. The result is never negative, whatever
    the signs of the inputs; ``gcd(0, 0)`` is ``0``.
    """
    while b:
        a, b = b, a % b
    # Python's % takes the sign of the divisor, so the loop can finish on a
    # negative value (e.g. for (4, -6)); report the magnitude instead.
    return abs(a)
def lcm(a: int, b: int) -> int:
    """Return the least common multiple of ``a`` and ``b``.

    The result is never negative. If either argument is zero the result is
    zero (instead of dividing by a zero divisor).
    """
    if a == 0 or b == 0:
        # Covers lcm(0, 0), where the common divisor would itself be zero.
        return abs(a * b)
    # Euclidean algorithm on magnitudes, so the divisor is always positive
    # and the result's sign cannot flip for mixed-sign inputs.
    divisor, remainder = abs(a), abs(b)
    while remainder:
        divisor, remainder = remainder, divisor % remainder
    return abs(a * b) // divisor
def percentage(part: float, whole: float) -> float:
    """Return ``part`` expressed as a percentage of ``whole``."""
    ratio = part / whole
    return ratio * 100
def compound_interest(principal: float, rate: float, time: float, n: int = 1) -> float:
    """Return the accumulated amount after compounding.

    Computes ``principal * (1 + rate / n) ** (n * time)``, i.e. the principal
    plus the interest accrued, not the interest alone.

    Args:
        principal: Starting amount.
        rate: Interest rate per unit of ``time`` as a fraction (0.05 for 5%).
        time: Number of time units.
        n: Compounding periods per unit of ``time``.
    """
    growth_per_period = 1 + rate / n
    period_count = n * time
    return principal * growth_per_period ** period_count
def calculate_average(numbers: str) -> float:
    """Return the arithmetic mean of a comma-separated list of numbers.

    Args:
        numbers: Text such as ``"1, 2, 3"``; whitespace around each value is
            ignored.

    Returns:
        The mean of the parsed values, or ``0.0`` when the input cannot be
        parsed (empty string, non-numeric token, or a non-string argument).
    """
    try:
        nums = [float(x.strip()) for x in numbers.split(',')]
        return sum(nums) / len(nums)
    except (AttributeError, TypeError, ValueError):
        # Best-effort by design: unparsable input yields 0.0. Only parsing
        # failures are caught, so interrupts and exit requests still propagate.
        return 0.0
def wiki_search(query: str) -> str:
    """Look up ``query`` on Wikipedia and return a text digest.

    Loads at most two documents through ``WikipediaLoader``. Each entry shows
    the page title and the first 1500 characters of its content; entries are
    separated by a ``---`` line. Any exception is reported in the returned
    string rather than raised.
    """
    try:
        pages = WikipediaLoader(query=query, load_max_docs=2).load()
        if not pages:
            return "No Wikipedia results found."
        sections = []
        for page in pages:
            title = page.metadata.get("title", "Unknown")
            excerpt = page.page_content[:1500]
            sections.append(f'Wikipedia: {title}\n{excerpt}')
        return "\n\n---\n\n".join(sections)
    except Exception as e:
        return f"Wikipedia search error: {str(e)}"
def web_search(query: str) -> str:
    """Search the web using Tavily and return a text digest.

    Requests at most two results from ``TavilySearchResults``. Each entry
    shows a title (or the URL when no title is present) and the first 1500
    characters of the content; entries are separated by a ``---`` line.
    Any exception is reported in the returned string rather than raised.

    Args:
        query: The search query text.

    Returns:
        The formatted results, ``"No web search results found."`` when the
        tool returns nothing, or a string starting with
        ``"Web search error:"`` on failure.
    """
    try:
        # The tool input is the first positional argument of ``invoke``;
        # passing it as ``query=`` leaves that argument unfilled and raises
        # TypeError on every call.
        search_docs = TavilySearchResults(max_results=2).invoke(query)
        if not search_docs:
            return "No web search results found."
        if isinstance(search_docs, str):
            # NOTE(review): presumably the tool hands back an error
            # description as plain text when the search fails -- confirm.
            return f"Web search error: {search_docs}"
        formatted = "\n\n---\n\n".join([
            # Some result shapes may carry only "url" and "content"; fall
            # back to the URL so the entry is still identifiable.
            f'Web: {doc.get("title") or doc.get("url", "Unknown")}\n{doc.get("content", "")[:1500]}'
            for doc in search_docs
        ])
        return formatted
    except Exception as e:
        return f"Web search error: {str(e)}"
def simple_calculation(expression: str) -> str:
    """Evaluate an arithmetic expression and return the result as text.

    Only digits, ``+ - * / . ( )`` and spaces are accepted.

    Args:
        expression: The arithmetic expression, e.g. ``"2 * (3 + 4)"``.

    Returns:
        ``str(result)`` on success, ``"Invalid characters in expression"``
        when a disallowed character is present, or a string starting with
        ``"Calculation error:"`` when evaluation fails (syntax error,
        division by zero, ...).
    """
    try:
        # Whitelist: no letters or underscores means no names, attribute
        # access or calls can appear in the expression.
        safe_chars = set('0123456789+-*/.() ')
        if not all(c in safe_chars for c in expression):
            return "Invalid characters in expression"
        # SECURITY: eval() on tool input. The whitelist above is the main
        # barrier; evaluating with empty globals and no builtins is defence
        # in depth. The whitelist still admits "**", so an input such as
        # "9**9**9**9" can consume unbounded CPU/memory -- do not expose
        # this to untrusted callers without an additional size limit.
        result = eval(expression, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"Calculation error: {str(e)}"
| # ---- Embedding & Vector Store Setup ---- | |
def setup_vector_store():
    """Create the Chroma vector store used for similar-question lookup.

    If ``metadata.jsonl`` exists in the working directory, every line that
    parses to a JSON object with non-empty ``"Question"`` and
    ``"Final answer"`` entries becomes one document. With at least one
    document the store is built from them; otherwise an empty store is
    opened on the same directory and collection.

    Returns:
        The Chroma vector store, or ``None`` if anything in the setup fails
        (the error is printed).
    """
    try:
        embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

        documents = []
        if os.path.exists('metadata.jsonl'):
            # JSON text is UTF-8; do not depend on the platform's default encoding.
            with open('metadata.jsonl', 'r', encoding='utf-8') as jsonl_file:
                for line in jsonl_file:
                    if not line.strip():  # Skip empty lines
                        continue
                    try:
                        sample = json.loads(line)
                    except json.JSONDecodeError:
                        # A malformed line is skipped; the rest of the file is still used.
                        continue
                    if not isinstance(sample, dict):
                        # Valid JSON that is not an object has no fields to
                        # read; skip it rather than aborting the whole setup.
                        continue
                    if sample.get('Question') and sample.get('Final answer'):
                        documents.append(Document(
                            page_content=f"Question: {sample['Question']}\n\nAnswer: {sample['Final answer']}",
                            metadata={"source": sample.get("task_id", "unknown")}
                        ))

        if documents:
            # NOTE(review): documents are added on every start-up while the
            # directory persists between runs, which presumably accumulates
            # duplicates -- confirm and consider passing stable ids.
            store = Chroma.from_documents(
                documents=documents,
                embedding=embeddings,
                persist_directory="./chroma_db",
                collection_name="my_collection"
            )
            # NOTE(review): persist() may be deprecated or unnecessary in
            # newer Chroma integrations -- confirm for the installed version.
            store.persist()
            print(f"Vector store created with {len(documents)} documents")
            return store

        # Create empty vector store if no data
        store = Chroma(
            embedding_function=embeddings,
            persist_directory="./chroma_db",
            collection_name="my_collection"
        )
        print("Empty vector store created")
        return store
    except Exception as e:
        # Top-level boundary: the agent keeps working without a vector store.
        print(f"Vector store setup error: {e}")
        return None


# Built once at import time; None when setup failed.
vector_store = setup_vector_store()
def similar_question_search(query: str) -> str:
    """Return up to two stored Q&A entries most similar to ``query``.

    Reads the module-level ``vector_store``. Each hit contributes the first
    800 characters of its content under a ``Similar Q&A:`` heading. Lookup
    failures are reported in the returned string rather than raised.
    """
    if not vector_store:
        return "No similar questions available"
    try:
        hits = vector_store.similarity_search(query, k=2)
        if hits:
            return "\n\n".join(
                f'Similar Q&A:\n{hit.page_content[:800]}' for hit in hits
            )
        return "No similar questions found"
    except Exception as e:
        return f"Similar question search error: {str(e)}"
| # ---- Enhanced System Prompt ---- | |
# Instructions given to the model at the start of every conversation.
system_prompt = """
You are an expert assistant that can solve various types of questions using available tools.
Available tools:
- Math: add, subtract, multiply, divide, modulus, power, square_root, factorial, gcd, lcm, percentage, compound_interest, calculate_average, simple_calculation
- Search: wiki_search, web_search, similar_question_search
Instructions:
1. Read the question carefully
2. Break down complex problems into steps
3. Use appropriate tools to gather information or perform calculations
4. Think step by step and show your reasoning
5. Provide accurate, concise answers
IMPORTANT: Always end your response with:
FINAL ANSWER: [your answer here]
For the final answer:
- Numbers: Use plain digits (no commas, units, or symbols unless requested)
- Text: Use exact names without articles
- Lists: Comma-separated values
Think carefully and use tools when needed.
"""
sys_msg = SystemMessage(content=system_prompt)

# ---- Tool List ----
# Grouped by purpose; ``tools`` keeps the same members in the same order.
_MATH_TOOLS = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    power,
    square_root,
    factorial,
    gcd,
    lcm,
    percentage,
    compound_interest,
    calculate_average,
    simple_calculation,
]
_SEARCH_TOOLS = [
    wiki_search,
    web_search,
    similar_question_search,
]
tools = [*_MATH_TOOLS, *_SEARCH_TOOLS]
| # ---- Graph Definition ---- | |
def build_graph(provider: str = "huggingface"):
    """Build and compile the two-step agent graph (retriever -> assistant).

    The retriever step adds the system prompt and, when the vector store has
    a match, one similar Q&A example to the conversation. The assistant step
    sends a single combined prompt to the Hugging Face model wrapper and
    appends its reply. Tools are not routed in this simplified graph.

    Args:
        provider: LLM backend to use. Only ``"huggingface"`` is accepted.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.

    Raises:
        ValueError: If ``provider`` is not ``"huggingface"`` or no model
            wrapper could be constructed.
    """
    # END is not among the file-level langgraph imports, so import it here.
    from langgraph.graph import END

    if provider == "huggingface":
        # Use custom HuggingFace LLM with fallback models
        models_to_try = [
            "google/flan-t5-base",
            "microsoft/DialoGPT-medium",
            "bigscience/bloom-560m",
        ]
        llm = None
        for model_id in models_to_try:
            try:
                llm = SimpleHuggingFaceLLM(repo_id=model_id, hf_token=hf_token)
            except Exception as e:
                print(f"Failed to initialize {model_id}: {e}")
                continue
            print(f"Successfully initialized model: {model_id}")
            break
        if llm is None:
            raise ValueError("Failed to initialize any HuggingFace model")
    else:
        raise ValueError("Only 'huggingface' provider is supported")

    # Marks the retrieved example so the assistant can place it before the
    # user's question when assembling the prompt.
    similar_example_tag = "similar_example"

    # Simple tool binding simulation
    def llm_with_tools(messages):
        """Combine all messages into one prompt and query the model.

        The model wrapper in this file reads only the last message it is
        given, so the system prompt, the retrieved example and the
        conversation are joined into a single message, with the user's
        question last.
        """
        system_parts = []
        example_parts = []
        dialogue_parts = []
        for message in messages:
            text = str(message.content)
            if isinstance(message, SystemMessage):
                system_parts.append(text)
            elif getattr(message, "name", None) == similar_example_tag:
                example_parts.append(text)
            else:
                dialogue_parts.append(text)
        prompt = "\n\n".join(system_parts + example_parts + dialogue_parts)
        return llm.invoke([HumanMessage(content=prompt)])

    def assistant(state: MessagesState):
        """Assistant node: query the model, falling back to a fixed reply on error."""
        try:
            messages = state["messages"]
            response = llm_with_tools(messages)
            return {"messages": [response]}
        except Exception as e:
            print(f"Assistant error: {e}")
            fallback_response = AIMessage(content="I encountered an error processing your request. Let me try a simpler approach.")
            return {"messages": [fallback_response]}

    def retriever(state: MessagesState):
        """Retriever node: add the system prompt and one similar example."""
        messages = state["messages"]
        user_query = messages[-1].content if messages else ""
        context_messages = [sys_msg]
        # Add similar question context if available
        if vector_store:
            try:
                similar = vector_store.similarity_search(user_query, k=1)
                if similar:
                    context_msg = HumanMessage(
                        content=f"Here's a similar example:\n{similar[0].page_content[:500]}",
                        name=similar_example_tag,
                    )
                    context_messages.append(context_msg)
            except Exception as e:
                print(f"Retriever error: {e}")
        # The MessagesState reducer merges by message id: the existing
        # messages keep their positions and the new context messages are
        # appended after them, so the user's question is no longer last in
        # the state. llm_with_tools restores the intended order.
        return {"messages": context_messages + messages}

    # Build simplified graph (without complex tool routing for now)
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    builder.add_node("assistant", assistant)
    # Simple linear flow
    builder.add_edge(START, "retriever")
    builder.add_edge("retriever", "assistant")
    # Explicit finish point so the graph has a defined path to its end.
    builder.add_edge("assistant", END)
    return builder.compile()