# Hugging Face file-viewer header captured with the source (not program code):
#   LamiaYT's picture
#   Deploy GAIA agent
#   bf833c0
#   raw
#   history blame
#   11.3 kB
# app.py
import os
import gradio as gr
import requests
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
from smolagents import CodeAgent, tool
# --- Constants ---
# Base URL of the remote evaluation API. run_and_submit_all appends
# "/questions" (GET) and "/submit" (POST) to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Simple Web Search Tool ---
@tool
def simple_search(query: str) -> str:
    """
    Performs a DuckDuckGo search and returns the top 3 results.

    Args:
        query (str): The search query text.

    Returns:
        str: Titles and links of the top 3 search results, "No results found."
            when nothing matches, or a "Search error: ..." message on failure.
    """
    # A blank query cannot match anything, so skip the network round trip.
    if not query or not query.strip():
        return "No results found."

    try:
        # Imported locally so the tool body stays self-contained.
        from urllib.parse import parse_qs, urlparse

        from bs4 import BeautifulSoup

        resp = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
        )
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        entries = []
        for anchor in soup.select("a.result__a")[:3]:
            # .get() avoids failing the whole search on an anchor without href.
            href = anchor.get("href", "")
            # Result anchors may point at a DuckDuckGo redirect that carries the
            # real destination in a "uddg" query parameter; unwrap it when
            # present, otherwise keep the href unchanged.
            wrapped = parse_qs(urlparse(href).query).get("uddg")
            link = wrapped[0] if wrapped else href
            entries.append(f"{anchor.get_text()}\n{link}")
        return "\n\n".join(entries) or "No results found."
    except Exception as e:
        # Best-effort tool: report the failure as text instead of raising.
        return f"Search error: {e}"
# --- Wikipedia Search Tool ---
@tool
def wikipedia_search(query: str) -> str:
    """
    Searches Wikipedia for information.

    Args:
        query (str): The search query text.

    Returns:
        str: Up to two entries (title, 3-sentence summary and URL),
            "No Wikipedia results found." / "No detailed results found." when
            nothing usable is available, or a "Wikipedia search error: ..."
            message on failure.
    """
    # A blank query has nothing to look up; answer without touching the network.
    if not query or not query.strip():
        return "No Wikipedia results found."

    try:
        # Imported locally so the tool body stays self-contained.
        import wikipedia

        wikipedia.set_lang("en")
        titles = wikipedia.search(query, results=3)
        if not titles:
            return "No Wikipedia results found."

        wanted = 2
        summaries = []
        # Walk every candidate until two summaries are collected, so one
        # unloadable hit (e.g. a disambiguation page) is replaced by the next.
        for title in titles:
            if len(summaries) >= wanted:
                break
            try:
                # Titles come straight from search, so suggestions are disabled
                # to load exactly the page that was returned.
                page = wikipedia.page(title, auto_suggest=False)
                summary = wikipedia.summary(title, sentences=3, auto_suggest=False)
            except Exception:
                # Skip pages that cannot be loaded; the remaining hits still count.
                continue
            summaries.append(f"**{title}**\n{summary}\nURL: {page.url}")
        return "\n\n".join(summaries) if summaries else "No detailed results found."
    except Exception as e:
        # Best-effort tool: report the failure as text instead of raising.
        return f"Wikipedia search error: {e}"
# --- Calculator Tool ---
@tool
def calculator(expression: str) -> str:
    """
    Evaluates mathematical expressions safely.

    Supports numbers, parentheses, unary +/- and the binary operators
    +, -, *, /, // and **. The expression is parsed into a syntax tree and only
    those node types are evaluated; nothing is passed to eval().

    Args:
        expression (str): Mathematical expression to evaluate.

    Returns:
        str: Result of the calculation, "Error: Invalid characters in expression"
            for disallowed characters, or a "Calculation error: ..." message.
    """
    # Imported locally so the tool body stays self-contained.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the estimated bit size of an integer power, so inputs such
    # as 9**9**9**9 are rejected instead of exhausting CPU and memory.
    max_power_bits = 10_000

    def evaluate(node):
        """Recursively evaluate a whitelisted arithmetic syntax-tree node."""
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and abs(left).bit_length() * right > max_power_bits
            ):
                raise ValueError("result of exponentiation is too large")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("unsupported expression")

    try:
        # Basic safety check (kept so the caller-visible message is unchanged).
        allowed_chars = set('0123456789+-*/.() ')
        if not all(c in allowed_chars for c in expression):
            return "Error: Invalid characters in expression"

        # Surrounding spaces are harmless to the arithmetic but can upset the parser.
        text = expression.strip()
        if not text:
            raise ValueError("empty expression")

        tree = ast.parse(text, mode="eval")
        return str(evaluate(tree.body))
    except Exception as e:
        return f"Calculation error: {e}"
# --- Custom HuggingFace Model Wrapper ---
class HuggingFaceModel:
    """Thin wrapper around a local causal language model and its tokenizer.

    Loads the requested checkpoint (falling back to a smaller one when loading
    fails) and exposes a single ``generate`` method that returns only the newly
    generated text for a prompt.
    """

    # Checkpoint used when the requested model cannot be loaded.
    FALLBACK_MODEL = "distilgpt2"
    # Prompts are truncated to this many tokens before generation.
    MAX_PROMPT_TOKENS = 400
    # Upper bound on tokens generated per call.
    MAX_NEW_TOKENS = 200
    # Lower bound, so a long prompt can never leave a zero generation budget.
    MIN_NEW_TOKENS = 16

    def __init__(self, model_name="microsoft/DialoGPT-small"):
        """
        Initialize with a lightweight model that fits in 16GB RAM

        Args:
            model_name: Hugging Face model id or local path of the causal LM.
        """
        print(f"Loading model: {model_name}")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        try:
            # Use a smaller, more efficient model
            self.tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
            if self.tokenizer.pad_token is None:
                # Reuse EOS as the padding token when the tokenizer defines none.
                self.tokenizer.pad_token = self.tokenizer.eos_token
            # NOTE(review): trust_remote_code=True lets the checkpoint's
            # repository run arbitrary Python at load time. It is kept for
            # backward compatibility; confirm it is really needed for the
            # models used here and drop it if not.
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                device_map="auto" if self.device == "cuda" else None,
                trust_remote_code=True,
            )
            if self.device == "cpu":
                self.model = self.model.to(self.device)
            print(f"Model loaded successfully on {self.device}")
        except Exception as e:
            print(f"Error loading model: {e}")
            # Fallback to an even smaller model
            print(f"Falling back to {self.FALLBACK_MODEL}...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.FALLBACK_MODEL)
            self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model = AutoModelForCausalLM.from_pretrained(self.FALLBACK_MODEL)
            if self.device == "cuda":
                self.model = self.model.to(self.device)

    def generate(self, prompt: str, max_length: int = 512) -> str:
        """
        Generate text response from the model

        Args:
            prompt: Text the model should continue.
            max_length: Target cap on prompt plus generated tokens. At least
                ``MIN_NEW_TOKENS`` and at most ``MAX_NEW_TOKENS`` tokens are
                generated regardless, so a long prompt still gets an answer.

        Returns:
            The generated continuation without the prompt, a fixed fallback
            sentence when the model produced nothing, or a
            "Generation error: ..." message if anything failed.
        """
        try:
            # Encode the prompt; `truncation=True` is the keyword that bounds
            # the input at max_length.
            inputs = self.tokenizer.encode(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=self.MAX_PROMPT_TOKENS,
            )
            # Follow the model's actual placement rather than a device string.
            inputs = inputs.to(self.model.device)
            prompt_tokens = inputs.size(1)

            # Budget is expressed in *new* tokens so it can never collapse to
            # zero when the prompt alone reaches max_length.
            new_tokens = max(
                self.MIN_NEW_TOKENS,
                min(self.MAX_NEW_TOKENS, max_length - prompt_tokens),
            )

            # Generate response
            with torch.no_grad():
                outputs = self.model.generate(
                    inputs,
                    max_new_tokens=new_tokens,
                    num_return_sequences=1,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                    attention_mask=torch.ones_like(inputs),
                )

            # Drop the prompt by token position: decoding does not always
            # reproduce the prompt text exactly (e.g. after truncation), so a
            # string-prefix comparison would leave the prompt in the answer.
            completion = outputs[0][prompt_tokens:]
            response = self.tokenizer.decode(completion, skip_special_tokens=True).strip()
            return response if response else "I need more information to answer this question."
        except Exception as e:
            # Best-effort: callers receive the failure as text.
            return f"Generation error: {e}"
# --- Simple Agent Implementation ---
class BasicAgent:
    """Rule-based agent that answers a question in three stages.

    1. Arithmetic found in the question is sent to the calculator tool.
    2. Questions containing lookup keywords go to Wikipedia and/or web search.
    3. Everything else (or anything the tools could not answer) goes to the
       local language model.

    Tools are always invoked through ``self.tools``.
    """

    # Substrings that route a question to the lookup stage.
    _LOOKUP_HINTS = ('current', 'recent', 'latest', 'today', 'news', 'when', 'who', 'what')
    # Substrings that make the lookup stage try Wikipedia before web search.
    _WIKI_HINTS = ('who is', 'what is', 'born', 'died', 'biography')
    # Prefixes with which the tools report "nothing found" or a failure; such
    # results must not be returned as the answer.
    _FAILURE_PREFIXES = (
        "Error:",
        "Calculation error:",
        "Search error:",
        "Wikipedia search error:",
        "No results found",
        "No Wikipedia results",
        "No detailed results",
    )

    def __init__(self):
        print("BasicAgent initializing with HuggingFace model...")
        self.model = HuggingFaceModel("microsoft/DialoGPT-medium")  # Changed to medium for better performance
        self.tools = {
            "search": simple_search,
            "wikipedia": wikipedia_search,
            "calculator": calculator
        }

    def __call__(self, question: str) -> str:
        """Answer ``question``; failures are reported as "Agent error: ..." text."""
        print(f"Question: {question[:60]}...")
        try:
            answer = self._try_calculation(question)
            if answer is None:
                answer = self._try_lookup(question)
            if answer is None:
                answer = self._ask_model(question)
            return answer
        except Exception as e:
            return f"Agent error: {e}"

    @classmethod
    def _is_usable(cls, result: str) -> bool:
        """Return True when a tool result is real content, not an error or empty notice."""
        return bool(result) and not result.startswith(cls._FAILURE_PREFIXES)

    def _try_calculation(self, question: str):
        """Return a calculation answer, or None when the question holds no valid arithmetic."""
        import re  # local import keeps this class independent of the file's import block

        for span in re.findall(r'[\d\+\-\*/\.\(\)\s]+', question):
            # Trim whitespace and a trailing sentence period ("3 * 4." -> "3 * 4").
            candidate = span.strip().rstrip(".").strip()
            # Require an operator with a digit on each side, so a stray hyphen
            # or slash in ordinary prose is not treated as a calculation.
            if not re.search(r"\d.*[-+*/].*\d", candidate):
                continue
            result = self.tools["calculator"](candidate)
            if self._is_usable(result):
                return f"The calculation result is: {result}"
            # A failed candidate is skipped rather than returned as the answer.
        return None

    def _try_lookup(self, question: str):
        """Return Wikipedia or web-search text, or None when no lookup applies or succeeds."""
        lowered = question.lower()
        if not any(hint in lowered for hint in self._LOOKUP_HINTS):
            return None

        # Try Wikipedia first for factual questions
        if any(hint in lowered for hint in self._WIKI_HINTS):
            wiki_result = self.tools["wikipedia"](question)
            if self._is_usable(wiki_result):
                return wiki_result

        # Fall back to web search
        search_result = self.tools["search"](question)
        if self._is_usable(search_result):
            return search_result
        return None

    def _ask_model(self, question: str) -> str:
        """Ask the language model, retrying once with a more directive prompt."""
        prompt = (
            f"Question: {question}\n"
            "Please provide a clear and accurate answer. If you're not sure about something, say so.\n"
            "Answer:"
        )
        response = self.model.generate(prompt, max_length=400)

        # If the response is too short or generic, try to enhance it
        if len(response.split()) < 5:
            enhanced_prompt = (
                "You are a helpful assistant. Answer this question with specific details:\n"
                f"{question}\n"
                "Provide a comprehensive answer:"
            )
            response = self.model.generate(enhanced_prompt, max_length=500)

        cleaned = response.strip()
        return cleaned if cleaned else "I need more information to answer this question properly."
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, answer them with BasicAgent and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        A ``(status, table)`` pair: ``status`` is a human-readable message and
        ``table`` is a DataFrame of per-question logs, or None when the run
        stopped before any question was processed.
    """
    if not profile:
        return "Please log in to Hugging Face to submit answers.", None

    username = profile.username
    space_id = os.getenv("SPACE_ID", "")
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    # Fetch the questions before building the agent: the request is cheap,
    # while agent construction loads a language model, so an unreachable API
    # is reported without paying for the model load.
    try:
        r = requests.get(questions_url, timeout=15)
        r.raise_for_status()
        questions = r.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Agent initialization failed: {e}", None

    logs, answers = [], []
    for i, item in enumerate(questions):
        # Ignore malformed entries instead of crashing on `.get`.
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question = item.get("question")
        if not task_id or question is None:
            continue

        print(f"Processing question {i+1}/{len(questions)}: {task_id}")
        try:
            ans = agent(question)
        except Exception as e:
            # One failing question must not discard the answers already
            # collected; record the failure in the same text form the agent
            # uses for its own errors.
            ans = f"Agent error: {e}"

        answers.append({"task_id": task_id, "submitted_answer": ans})
        logs.append({
            "Task ID": task_id,
            "Question": question[:100] + "..." if len(question) > 100 else question,
            "Submitted Answer": ans[:200] + "..." if len(ans) > 200 else ans,
        })

    if not answers:
        return "Agent produced no answers.", pd.DataFrame(logs)

    payload = {"username": username, "agent_code": agent_code, "answers": answers}
    try:
        resp = requests.post(submit_url, json=payload, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        status = (
            f"✅ Submission Successful!\n"
            f"Score: {data.get('score','N/A')}% "
            f"({data.get('correct_count','?')}/{data.get('total_attempted','?')})\n"
            f"{data.get('message','')}"
        )
        return status, pd.DataFrame(logs)
    except Exception as e:
        return f"Submission failed: {e}", pd.DataFrame(logs)
# --- Gradio Interface ---
with gr.Blocks() as demo:
    # Page header and a short description of what the app does.
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown("This agent uses HuggingFace models locally (no API calls) to answer GAIA benchmark questions.")

    # Login control; the click handler below receives the resulting profile.
    gr.LoginButton()

    # The trigger button sits in its own row.
    with gr.Row():
        run_button = gr.Button(
            "Run Evaluation & Submit All Answers",
            variant="primary",
        )

    # Read-only outputs: the status message and the per-question log table.
    status_box = gr.Textbox(
        label="Status / Submission Result",
        lines=8,
        interactive=False,
    )
    result_table = gr.DataFrame(
        label="Questions & Agent Answers",
        wrap=True,
    )

    # The handler takes no UI inputs and fills both outputs.
    run_button.click(
        run_and_submit_all,
        outputs=[status_box, result_table],
    )

if __name__ == "__main__":
    print("Launching Gradio app...")
    demo.launch(share=False, debug=True)