Spaces:
Runtime error
Runtime error
import os | |
from transformers import pipeline | |
import gradio as gr | |
import requests | |
import inspect | |
import pandas as pd | |
from smolagents import CodeAgent | |
from smolagents.tools import PythonInterpreterTool | |
import json | |
import tempfile | |
import urllib.parse | |
from pathlib import Path | |
from duckduckgo_search import DDGS | |
# --- Constants --- | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
class HfApiModel: | |
""" | |
Simple wrapper for Hugging Face pipeline as a replacement for smolagents.HfApiModel | |
""" | |
def __init__(self, model_id: str, token: str = None): | |
self.model_id = model_id | |
self.token = token or os.getenv("HUGGINGFACE_INFERENCE_TOKEN") | |
self.pipe = pipeline("text-generation", model=model_id, token=self.token) | |
def __call__(self, prompt: str) -> str: | |
outputs = self.pipe(prompt, max_new_tokens=512, do_sample=True) | |
return outputs[0]["generated_text"] | |
class DuckDuckGoSearchTool: | |
name = "duckduckgo_search" | |
description = "Use DuckDuckGo to search the web." | |
def __call__(self, query: str) -> str: | |
try: | |
results = [] | |
with DDGS() as ddgs: | |
for r in ddgs.text(query, max_results=3): | |
results.append(f"Title: {r['title']}\nURL: {r['href']}\nSnippet: {r['body']}\n---") | |
return "\n".join(results) if results else "No results found." | |
except Exception as e: | |
return f"Error using DuckDuckGoSearchTool: {e}" | |
# --- Custom Tools --- | |
class SerperSearchTool: | |
"""Enhanced search tool using Serper API for more reliable results""" | |
name = "serper_search" | |
description = "Search the web using Serper API. Use this for finding current information, facts, and data." | |
def __init__(self): | |
self.api_key = os.getenv("SERPER_API_KEY") | |
if not self.api_key: | |
print("Warning: SERPER_API_KEY not found, falling back to DuckDuckGo") | |
def __call__(self, query: str) -> str: | |
"""Search the web and return formatted results""" | |
if not self.api_key: | |
return f"Search query: {query} - API key not available" | |
try: | |
url = "https://google.serper.dev/search" | |
payload = json.dumps({ | |
"q": query, | |
"num": 5 | |
}) | |
headers = { | |
'X-API-KEY': self.api_key, | |
'Content-Type': 'application/json' | |
} | |
response = requests.post(url, headers=headers, data=payload, timeout=10) | |
response.raise_for_status() | |
data = response.json() | |
results = [] | |
if 'organic' in data: | |
for item in data['organic'][:3]: | |
results.append(f"Title: {item.get('title', 'N/A')}") | |
results.append(f"Content: {item.get('snippet', 'N/A')}") | |
results.append(f"URL: {item.get('link', 'N/A')}") | |
results.append("---") | |
if 'answerBox' in data: | |
answer = data['answerBox'] | |
results.insert(0, f"Answer: {answer.get('answer', answer.get('snippet', 'N/A'))}") | |
results.insert(1, "---") | |
return "\n".join(results) if results else f"No results found for: {query}" | |
except Exception as e: | |
print(f"Serper search error: {e}") | |
return f"Search error for '{query}': {str(e)}" | |
class MathCalculatorTool: | |
"""Tool for mathematical calculations and computations""" | |
name = "math_calculator" | |
description = "Perform mathematical calculations, solve equations, and handle numerical computations." | |
def __call__(self, expression: str) -> str: | |
"""Safely evaluate mathematical expressions""" | |
try: | |
# Import math functions for calculations | |
import math | |
import operator | |
# Safe evaluation context | |
safe_dict = { | |
"abs": abs, "round": round, "min": min, "max": max, | |
"sum": sum, "pow": pow, "sqrt": math.sqrt, | |
"sin": math.sin, "cos": math.cos, "tan": math.tan, | |
"log": math.log, "log10": math.log10, "exp": math.exp, | |
"pi": math.pi, "e": math.e | |
} | |
# Clean the expression | |
expression = expression.replace("^", "**") # Handle exponents | |
result = eval(expression, {"__builtins__": {}}, safe_dict) | |
return f"Result: {result}" | |
except Exception as e: | |
return f"Math calculation error: {str(e)}" | |
class FileProcessorTool: | |
"""Tool for processing various file formats""" | |
name = "file_processor" | |
description = "Process and extract information from files (text, CSV, JSON, etc.)" | |
def __call__(self, file_path: str, action: str = "read") -> str: | |
"""Process files based on action type""" | |
try: | |
if not os.path.exists(file_path): | |
return f"File not found: {file_path}" | |
file_ext = Path(file_path).suffix.lower() | |
if file_ext in ['.txt', '.md']: | |
with open(file_path, 'r', encoding='utf-8') as f: | |
content = f.read() | |
return f"File content ({len(content)} chars):\n{content[:1000]}..." | |
elif file_ext == '.csv': | |
import pandas as pd | |
df = pd.read_csv(file_path) | |
return f"CSV file with {len(df)} rows and {len(df.columns)} columns:\n{df.head().to_string()}" | |
elif file_ext == '.json': | |
with open(file_path, 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
return f"JSON data:\n{json.dumps(data, indent=2)[:1000]}..." | |
else: | |
return f"Unsupported file type: {file_ext}" | |
except Exception as e: | |
return f"File processing error: {str(e)}" | |
# --- Enhanced Agent Definition --- | |
class GAIAAgent: | |
def __init__(self): | |
"""Initialize the GAIA agent with tools and model""" | |
print("Initializing GAIA Agent...") | |
# Initialize model | |
try: | |
hf_token = os.getenv("HUGGINGFACE_INFERENCE_TOKEN") | |
if not hf_token: | |
print("Warning: HUGGINGFACE_INFERENCE_TOKEN not found") | |
# Use a good model for reasoning | |
model = HfApiModel( | |
model_id="meta-llama/Llama-3.1-70B-Instruct", | |
token=hf_token | |
) | |
# Initialize tools | |
self.tools = [ | |
SerperSearchTool(), | |
PythonInterpreterTool(), | |
MathCalculatorTool(), | |
FileProcessorTool(), | |
DuckDuckGoSearchTool() # Backup search | |
] | |
# Initialize the agent | |
self.agent = CodeAgent( | |
tools=self.tools, | |
model=model, | |
max_steps=10, | |
verbosity_level=1 | |
) | |
print("GAIA Agent initialized successfully with tools:", [tool.name for tool in self.tools]) | |
except Exception as e: | |
print(f"Error initializing GAIA Agent: {e}") | |
# Fallback to basic setup | |
try: | |
model = HfApiModel(model_id="microsoft/DialoGPT-medium") | |
self.agent = CodeAgent(tools=[PythonInterpreterTool()], model=model) | |
print("Fallback agent initialized") | |
except Exception as fallback_error: | |
print(f"Fallback initialization failed: {fallback_error}") | |
self.agent = None | |
def __call__(self, question: str) -> str: | |
"""Process a question using the GAIA agent""" | |
print(f"Processing question: {question[:100]}...") | |
if not self.agent: | |
return "Agent initialization failed. Please check your configuration." | |
try: | |
# Enhanced prompt for better reasoning | |
enhanced_prompt = f""" | |
You are an AI assistant designed to answer questions accurately and thoroughly. | |
You have access to web search, Python interpreter, math calculator, and file processing tools. | |
Question: {question} | |
Please think step by step: | |
1. Analyze what type of question this is | |
2. Determine what tools or information you need | |
3. Use appropriate tools to gather information | |
4. Reason through the problem | |
5. Provide a clear, accurate answer | |
If the question requires: | |
- Current information or facts: Use search tools | |
- Calculations: Use the math calculator or Python interpreter | |
- File analysis: Use the file processor tool | |
- Multi-step reasoning: Break it down systematically | |
Answer:""" | |
# Run the agent | |
result = self.agent.run(enhanced_prompt) | |
# Extract the final answer if it's structured | |
if isinstance(result, dict) and 'output' in result: | |
answer = result['output'] | |
else: | |
answer = str(result) | |
# Clean up the answer | |
if "Answer:" in answer: | |
answer = answer.split("Answer:")[-1].strip() | |
print(f"Agent response: {answer[:100]}...") | |
return answer | |
except Exception as e: | |
error_msg = f"Error processing question: {str(e)}" | |
print(error_msg) | |
# Fallback to basic response | |
try: | |
basic_response = f"I encountered an error while processing this question: {question}. Error: {str(e)}" | |
return basic_response | |
except: | |
return "Unable to process this question due to technical difficulties." | |
def run_and_submit_all(profile: gr.OAuthProfile | None): | |
""" | |
Fetches all questions, runs the GAIA Agent on them, submits all answers, | |
and displays the results. | |
""" | |
# --- Determine HF Space Runtime URL and Repo URL --- | |
space_id = os.getenv("SPACE_ID") | |
if profile: | |
username = f"{profile.username}" | |
print(f"User logged in: {username}") | |
else: | |
print("User not logged in.") | |
return "Please Login to Hugging Face with the button.", None | |
api_url = DEFAULT_API_URL | |
questions_url = f"{api_url}/questions" | |
submit_url = f"{api_url}/submit" | |
# 1. Instantiate Agent | |
try: | |
agent = GAIAAgent() | |
if not agent.agent: | |
return "Failed to initialize GAIA Agent. Please check your tokens and try again.", None | |
except Exception as e: | |
print(f"Error instantiating agent: {e}") | |
return f"Error initializing agent: {e}", None | |
# Agent code URL | |
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local" | |
print(f"Agent code: {agent_code}") | |
# 2. Fetch Questions | |
print(f"Fetching questions from: {questions_url}") | |
try: | |
response = requests.get(questions_url, timeout=15) | |
response.raise_for_status() | |
questions_data = response.json() | |
if not questions_data: | |
print("Fetched questions list is empty.") | |
return "Fetched questions list is empty or invalid format.", None | |
print(f"Fetched {len(questions_data)} questions.") | |
except requests.exceptions.RequestException as e: | |
print(f"Error fetching questions: {e}") | |
return f"Error fetching questions: {e}", None | |
except requests.exceptions.JSONDecodeError as e: | |
print(f"Error decoding JSON response from questions endpoint: {e}") | |
return f"Error decoding server response for questions: {e}", None | |
except Exception as e: | |
print(f"An unexpected error occurred fetching questions: {e}") | |
return f"An unexpected error occurred fetching questions: {e}", None | |
# 3. Run GAIA Agent | |
results_log = [] | |
answers_payload = [] | |
print(f"Running GAIA agent on {len(questions_data)} questions...") | |
for i, item in enumerate(questions_data): | |
task_id = item.get("task_id") | |
question_text = item.get("question") | |
if not task_id or question_text is None: | |
print(f"Skipping item with missing task_id or question: {item}") | |
continue | |
try: | |
print(f"Processing question {i+1}/{len(questions_data)}: {task_id}") | |
submitted_answer = agent(question_text) | |
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, | |
"Submitted Answer": submitted_answer[:200] + "..." if len(submitted_answer) > 200 else submitted_answer | |
}) | |
except Exception as e: | |
print(f"Error running agent on task {task_id}: {e}") | |
error_answer = f"AGENT ERROR: {e}" | |
answers_payload.append({"task_id": task_id, "submitted_answer": error_answer}) | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, | |
"Submitted Answer": error_answer | |
}) | |
if not answers_payload: | |
print("Agent did not produce any answers to submit.") | |
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) | |
# 4. Prepare Submission | |
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} | |
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." | |
print(status_update) | |
# 5. Submit | |
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") | |
try: | |
response = requests.post(submit_url, json=submission_data, timeout=120) # Increased timeout | |
response.raise_for_status() | |
result_data = response.json() | |
final_status = ( | |
f"Submission Successful!\n" | |
f"User: {result_data.get('username')}\n" | |
f"Overall Score: {result_data.get('score', 'N/A')}% " | |
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" | |
f"Message: {result_data.get('message', 'No message received.')}" | |
) | |
print("Submission successful.") | |
results_df = pd.DataFrame(results_log) | |
return final_status, results_df | |
except requests.exceptions.HTTPError as e: | |
error_detail = f"Server responded with status {e.response.status_code}." | |
try: | |
error_json = e.response.json() | |
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" | |
except requests.exceptions.JSONDecodeError: | |
error_detail += f" Response: {e.response.text[:500]}" | |
status_message = f"Submission Failed: {error_detail}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
except Exception as e: | |
status_message = f"An unexpected error occurred during submission: {e}" | |
print(status_message) | |
results_df = pd.DataFrame(results_log) | |
return status_message, results_df | |
# --- Build Gradio Interface --- | |
with gr.Blocks(title="GAIA Agent Evaluation") as demo: | |
gr.Markdown("# GAIA Benchmark Agent Evaluation") | |
gr.Markdown( | |
""" | |
**Enhanced GAIA Agent with Multiple Tools:** | |
- ๐ Web Search (Serper API + DuckDuckGo fallback) | |
- ๐ Python Interpreter for calculations | |
- ๐งฎ Mathematical calculator | |
- ๐ File processor for various formats | |
- ๐ง Advanced reasoning with Llama-3.1-70B | |
**Instructions:** | |
1. Make sure you have SERPER_API_KEY and HUGGINGFACE_INFERENCE_TOKEN set | |
2. Log in to your Hugging Face account | |
3. Click 'Run GAIA Evaluation' to start the benchmark | |
**Target:** >40% accuracy on GAIA benchmark questions | |
""" | |
) | |
gr.LoginButton() | |
run_button = gr.Button("๐ Run GAIA Evaluation & Submit", variant="primary") | |
status_output = gr.Textbox( | |
label="Evaluation Status & Results", | |
lines=6, | |
interactive=False, | |
placeholder="Click the button above to start evaluation..." | |
) | |
results_table = gr.DataFrame( | |
label="Questions and Agent Responses", | |
wrap=True, | |
interactive=False | |
) | |
run_button.click( | |
fn=run_and_submit_all, | |
outputs=[status_output, results_table] | |
) | |
if __name__ == "__main__": | |
print("\n" + "="*50) | |
print("๐ค GAIA Agent Evaluation System Starting") | |
print("="*50) | |
# Check environment variables | |
serper_key = os.getenv("SERPER_API_KEY") | |
hf_token = os.getenv("HUGGINGFACE_INFERENCE_TOKEN") | |
space_id = os.getenv("SPACE_ID") | |
print(f"โ SERPER_API_KEY: {'Found' if serper_key else 'Missing (will use fallback search)'}") | |
print(f"โ HF_TOKEN: {'Found' if hf_token else 'Missing (required for model access)'}") | |
print(f"โ SPACE_ID: {space_id if space_id else 'Not found (running locally)'}") | |
if space_id: | |
print(f"๐ Space URL: https://huggingface.co/spaces/{space_id}") | |
print("="*50) | |
print("๐ฏ Target: >40% accuracy on GAIA benchmark") | |
print("๐ ๏ธ Tools: Search, Python, Math, File Processing") | |
print("๐ง Model: Llama-3.1-70B-Instruct") | |
print("="*50 + "\n") | |
demo.launch(debug=True, share=False) |