LamiaYT's picture
Deploy GAIA agent
7963312
raw
history blame
19.3 kB
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import json
import re
import io
import base64
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
# SmolaAgent imports
from smolagents import CodeAgent, tool, DuckDuckGoSearchTool, PythonInterpreterTool
from smolagents.models import LiteLLMModel
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Enhanced Tools for GAIA ---
@tool
def web_search_tool(query: str) -> str:
"""
Search the web for information using DuckDuckGo.
Args:
query: The search query string
Returns:
String containing search results
"""
try:
search_tool = DuckDuckGoSearchTool()
results = search_tool(query)
return str(results)
except Exception as e:
return f"Search failed: {str(e)}"
@tool
def calculator_tool(expression: str) -> str:
"""
Evaluate mathematical expressions safely.
Args:
expression: Mathematical expression as string
Returns:
Result of the calculation
"""
try:
# Safe evaluation - only allow basic math operations
allowed_chars = set('0123456789+-*/.() ')
if not all(c in allowed_chars for c in expression.replace(' ', '')):
return "Error: Expression contains invalid characters"
result = eval(expression)
return str(result)
except Exception as e:
return f"Calculation error: {str(e)}"
@tool
def image_analyzer_tool(image_path: str) -> str:
"""
Analyze images and extract information.
Args:
image_path: Path to the image file
Returns:
Description of image content
"""
try:
if not os.path.exists(image_path):
return "Error: Image file not found"
img = Image.open(image_path)
# Basic image analysis
width, height = img.size
mode = img.mode
format_info = img.format if img.format else "Unknown"
# Simple color analysis
if mode == 'RGB':
colors = img.getcolors(maxcolors=256*256*256)
if colors:
dominant_color = max(colors, key=lambda x: x[0])[1]
color_info = f"Dominant color: RGB{dominant_color}"
else:
color_info = "Complex color palette"
else:
color_info = f"Color mode: {mode}"
analysis = f"""Image Analysis:
- Dimensions: {width}x{height} pixels
- Format: {format_info}
- {color_info}
- File size: {os.path.getsize(image_path)} bytes
"""
return analysis
except Exception as e:
return f"Image analysis error: {str(e)}"
@tool
def file_reader_tool(file_path: str) -> str:
"""
Read and analyze various file types (text, CSV, JSON, etc.).
Args:
file_path: Path to the file
Returns:
File content or analysis
"""
try:
if not os.path.exists(file_path):
return "Error: File not found"
file_ext = Path(file_path).suffix.lower()
if file_ext == '.csv':
df = pd.read_csv(file_path)
return f"CSV file with {len(df)} rows and {len(df.columns)} columns.\nColumns: {list(df.columns)}\nFirst 5 rows:\n{df.head().to_string()}"
elif file_ext == '.json':
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return f"JSON file content:\n{json.dumps(data, indent=2)[:1000]}..."
elif file_ext in ['.txt', '.md', '.py', '.js', '.html', '.css']:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return f"Text file content ({len(content)} characters):\n{content[:1000]}..."
else:
return f"Binary file: {file_ext}, size: {os.path.getsize(file_path)} bytes"
except Exception as e:
return f"File reading error: {str(e)}"
@tool
def data_processor_tool(data: str, operation: str) -> str:
"""
Process data with various operations (sort, filter, calculate statistics).
Args:
data: Data as string (JSON, CSV format, or numbers)
operation: Operation to perform (sort, sum, average, count, etc.)
Returns:
Processed data result
"""
try:
# Try to parse as JSON first
try:
parsed_data = json.loads(data)
except:
# Try to parse as numbers
try:
parsed_data = [float(x.strip()) for x in data.replace(',', ' ').split() if x.strip()]
except:
return "Error: Could not parse data"
if operation.lower() == 'sum' and isinstance(parsed_data, list):
return str(sum([x for x in parsed_data if isinstance(x, (int, float))]))
elif operation.lower() == 'average' and isinstance(parsed_data, list):
nums = [x for x in parsed_data if isinstance(x, (int, float))]
return str(sum(nums) / len(nums) if nums else 0)
elif operation.lower() == 'count':
return str(len(parsed_data))
elif operation.lower() == 'sort' and isinstance(parsed_data, list):
return str(sorted(parsed_data))
elif operation.lower() == 'max' and isinstance(parsed_data, list):
nums = [x for x in parsed_data if isinstance(x, (int, float))]
return str(max(nums) if nums else "No numbers found")
elif operation.lower() == 'min' and isinstance(parsed_data, list):
nums = [x for x in parsed_data if isinstance(x, (int, float))]
return str(min(nums) if nums else "No numbers found")
else:
return f"Unsupported operation: {operation}"
except Exception as e:
return f"Data processing error: {str(e)}"
# --- Enhanced GAIA Agent ---
class GAIAAgent:
def __init__(self):
print("GAIAAgent initialized with SmolaAgent framework.")
# Initialize model - using a lightweight model for resource efficiency
try:
# Use HuggingFace's free inference API or local model
self.model = LiteLLMModel(
model_id="microsoft/DialoGPT-medium", # Lightweight model
max_tokens=512,
temperature=0.1
)
except:
# Fallback to a basic model
print("Warning: Using fallback model configuration")
self.model = None
# Initialize tools
self.tools = [
web_search_tool,
calculator_tool,
image_analyzer_tool,
file_reader_tool,
data_processor_tool,
PythonInterpreterTool()
]
# Initialize the agent
try:
self.agent = CodeAgent(
tools=self.tools,
model=self.model,
max_iterations=5,
verbosity_level=1
)
except Exception as e:
print(f"Agent initialization error: {e}")
self.agent = None
def __call__(self, question: str) -> str:
print(f"GAIAAgent processing question: {question[:100]}...")
if not self.agent:
# Fallback logic if agent failed to initialize
return self._fallback_processing(question)
try:
# Enhanced prompt for GAIA tasks
enhanced_prompt = f"""
You are a helpful AI assistant designed to solve complex real-world problems that may require:
- Web searching for current information
- Mathematical calculations
- Image analysis
- File processing
- Multi-step reasoning
Question: {question}
Please approach this systematically:
1. Analyze what type of problem this is
2. Determine what tools/information you need
3. Use available tools to gather information
4. Reason through the problem step by step
5. Provide a clear, concise final answer
Remember to be precise and factual in your response.
"""
response = self.agent.run(enhanced_prompt)
# Extract the final answer if it's in the response
if isinstance(response, str):
# Look for common answer patterns
answer_patterns = [
r"Final answer:?\s*(.+)",
r"Answer:?\s*(.+)",
r"The answer is:?\s*(.+)",
r"Result:?\s*(.+)"
]
for pattern in answer_patterns:
match = re.search(pattern, response, re.IGNORECASE)
if match:
return match.group(1).strip()
# If no pattern found, return the last sentence or the whole response
sentences = response.split('.')
return sentences[-1].strip() if sentences else response
return str(response)
except Exception as e:
print(f"Error in agent processing: {e}")
return self._fallback_processing(question)
def _fallback_processing(self, question: str) -> str:
"""Fallback processing when main agent fails"""
try:
# Simple heuristic-based processing
question_lower = question.lower()
# Math questions
if any(op in question for op in ['+', '-', '*', '/', 'calculate', 'sum', 'average']):
# Extract numbers and try basic calculation
numbers = re.findall(r'-?\d+\.?\d*', question)
if len(numbers) >= 2:
try:
if 'sum' in question_lower or '+' in question:
result = sum(float(n) for n in numbers)
return str(result)
elif 'average' in question_lower:
result = sum(float(n) for n in numbers) / len(numbers)
return str(result)
except:
pass
# Search-based questions
if any(word in question_lower for word in ['what', 'who', 'when', 'where', 'how', 'why']):
try:
search_result = web_search_tool(question)
# Extract key information from search results
lines = search_result.split('\n')
relevant_lines = [line for line in lines if len(line.strip()) > 20]
return relevant_lines[0] if relevant_lines else "Unable to find specific information"
except:
pass
# Default response
return "I need more context or tools to answer this question accurately."
except Exception as e:
return f"Processing error: {str(e)}"
def run_and_submit_all(profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the GAIAAgent on them, submits all answers,
and displays the results.
"""
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID")
if profile:
username = f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent
try:
agent = GAIAAgent()
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
print(f"Fetched {len(questions_data)} questions.")
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response from questions endpoint: {e}")
print(f"Response text: {response.text[:500]}")
return f"Error decoding server response for questions: {e}", None
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred fetching questions: {e}", None
# 3. Run GAIA Agent
results_log = []
answers_payload = []
print(f"Running GAIA agent on {len(questions_data)} questions...")
for i, item in enumerate(questions_data):
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
print(f"Processing question {i+1}/{len(questions_data)}: {task_id}")
try:
submitted_answer = agent(question_text)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
"Submitted Answer": submitted_answer
})
print(f"Answer for {task_id}: {submitted_answer[:50]}...")
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
error_answer = f"AGENT ERROR: {e}"
answers_payload.append({"task_id": task_id, "submitted_answer": error_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
"Submitted Answer": error_answer
})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
status_update = f"GAIA Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
gr.Markdown("# GAIA Agent Evaluation Runner")
gr.Markdown(
"""
**Enhanced GAIA Agent with SmolaAgent Framework**
This agent is equipped with:
- ๐Ÿ” Web search capabilities (DuckDuckGo)
- ๐Ÿงฎ Mathematical calculator
- ๐Ÿ–ผ๏ธ Image analysis
- ๐Ÿ“ File processing (CSV, JSON, text files)
- ๐Ÿ“Š Data processing and statistics
- ๐Ÿ Python code execution
**Instructions:**
1. Log in to your Hugging Face account using the button below
2. Click 'Run GAIA Evaluation & Submit All Answers' to start the evaluation
3. The agent will process each question systematically using available tools
**Note:** Processing may take time as the agent analyzes each question thoroughly.
"""
)
gr.LoginButton()
run_button = gr.Button("Run GAIA Evaluation & Submit All Answers", variant="primary")
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table]
)
if __name__ == "__main__":
print("\n" + "-"*30 + " GAIA Agent Starting " + "-"*30)
space_host_startup = os.getenv("SPACE_HOST")
space_id_startup = os.getenv("SPACE_ID")
if space_host_startup:
print(f"โœ… SPACE_HOST found: {space_host_startup}")
print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
else:
print("โ„น๏ธ SPACE_HOST environment variable not found (running locally?).")
if space_id_startup:
print(f"โœ… SPACE_ID found: {space_id_startup}")
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
else:
print("โ„น๏ธ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
print("-"*(60 + len(" GAIA Agent Starting ")) + "\n")
print("Launching Gradio Interface for GAIA Agent Evaluation...")
demo.launch(debug=True, share=False)