# NOTE: page residue captured with this file (not part of app.py) -- Hugging Face Spaces status: "Runtime error".
# app.py | |
import os | |
import gradio as gr | |
import requests | |
import pandas as pd | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
import torch | |
from smolagents import CodeAgent, tool | |
# --- Constants --- | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
# --- Simple Web Search Tool --- | |
def simple_search(query: str) -> str:
    """
    Query DuckDuckGo's HTML endpoint and report the first three hits.

    Args:
        query (str): The search query text.

    Returns:
        str: One "title, newline, link" entry per hit with a blank line
            between entries; "No results found." when no result anchors
            are present; or "Search error: <details>" if anything raised
            (HTTP failure, timeout, missing bs4, ...).
    """
    try:
        response = requests.get(
            "https://html.duckduckgo.com/html/",
            params={"q": query},
            timeout=10,
        )
        response.raise_for_status()

        # Imported lazily so a missing bs4 is reported as a search error
        # string rather than breaking module import.
        from bs4 import BeautifulSoup

        anchors = BeautifulSoup(response.text, "html.parser").select("a.result__a")
        entries = []
        for anchor in anchors[:3]:
            entries.append(f"{anchor.get_text()}\n{anchor['href']}")

        formatted = "\n\n".join(entries)
        if formatted:
            return formatted
        return "No results found."
    except Exception as err:
        return f"Search error: {err}"
# --- Wikipedia Search Tool --- | |
def wikipedia_search(query: str) -> str:
    """
    Search English Wikipedia and summarise the best-matching pages.

    Up to three titles are requested for the query and the first two are
    summarised in three sentences each. A title whose page or summary
    cannot be fetched is skipped so that one bad hit does not discard the
    others.

    Args:
        query (str): The search query text.

    Returns:
        str: One "**title**, summary, URL" entry per page, entries
            separated by a blank line; "No Wikipedia results found." when
            the search returns nothing; "No detailed results found." when
            every candidate page failed to load; or
            "Wikipedia search error: <details>" for any other failure
            (including the `wikipedia` package not being installed).
    """
    try:
        # Imported lazily so a missing package becomes an error string
        # instead of breaking module import.
        import wikipedia

        wikipedia.set_lang("en")
        results = wikipedia.search(query, results=3)
        if not results:
            return "No Wikipedia results found."

        summaries = []
        for title in results[:2]:  # Only the top 2 hits are summarised
            try:
                page = wikipedia.page(title)
                summary = wikipedia.summary(title, sentences=3)
                summaries.append(f"**{title}**\n{summary}\nURL: {page.url}")
            except Exception:
                # Best effort: skip this title. `Exception` (not a bare
                # `except`) so Ctrl-C / SystemExit still propagate.
                continue

        return "\n\n".join(summaries) if summaries else "No detailed results found."
    except Exception as e:
        return f"Wikipedia search error: {e}"
# --- Calculator Tool --- | |
def calculator(expression: str) -> str:
    """
    Evaluate an arithmetic expression without executing arbitrary code.

    The expression is parsed with `ast` and only numeric literals, unary
    +/- and the binary operators + - * / // ** are evaluated. Nothing is
    passed to `eval`, and integer powers whose result would be enormous
    are rejected up front, so inputs such as "9**9**9**9" return an error
    instead of hanging the process.

    Args:
        expression (str): Mathematical expression to evaluate. Only the
            characters 0-9 + - * / . ( ) and space are accepted.

    Returns:
        str: The result converted with `str`;
            "Error: Invalid characters in expression" if any other
            character is present; or "Calculation error: <details>" for
            syntax errors, unsupported constructs, division by zero, etc.
    """
    # Function-scope imports: the module does not import these at top level.
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Cap on the estimated bit size of an integer power result.
    max_result_bits = 100_000

    def evaluate(node):
        """Recursively evaluate a whitelisted arithmetic AST node."""
        if isinstance(node, ast.Expression):
            return evaluate(node.body)
        if isinstance(node, ast.Constant):
            value = node.value
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
            raise ValueError("unsupported constant")
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](evaluate(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = evaluate(node.left)
            right = evaluate(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > max_result_bits
            ):
                # Refuse before computing: the result would be huge.
                raise ValueError("result too large")
            return binary_ops[type(node.op)](left, right)
        raise ValueError("unsupported expression")

    try:
        # Basic safety check, kept as a cheap first filter
        allowed_chars = set('0123456789+-*/.() ')
        if not all(c in allowed_chars for c in expression):
            return "Error: Invalid characters in expression"
        # strip(): unlike eval(), ast.parse rejects leading whitespace.
        tree = ast.parse(expression.strip(), mode="eval")
        return str(evaluate(tree))
    except Exception as e:
        return f"Calculation error: {e}"
# --- Custom HuggingFace Model Wrapper --- | |
class HuggingFaceModel:
    """
    Minimal wrapper around a locally loaded causal language model.

    Loads a tokenizer/model pair with `transformers`, falling back to
    `FALLBACK_MODEL` if the requested checkpoint cannot be loaded, and
    exposes a single `generate` method that returns plain text.
    """

    # Checkpoint loaded when the requested one fails to load.
    FALLBACK_MODEL = "distilgpt2"
    # Prompts are truncated to this many tokens before generation.
    MAX_PROMPT_TOKENS = 400
    # Upper bound on tokens generated beyond the prompt.
    MAX_NEW_TOKENS = 200

    def __init__(self, model_name="microsoft/DialoGPT-small"):
        """
        Load the tokenizer and model, preferring CUDA when available.

        Args:
            model_name (str): Hugging Face model id to load.

        Raises:
            Exception: Whatever `transformers` raises if the fallback
                checkpoint cannot be loaded either.
        """
        print(f"Loading model: {model_name}")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
            if self.tokenizer.pad_token is None:
                # Reuse EOS as padding when the tokenizer defines no pad token.
                self.tokenizer.pad_token = self.tokenizer.eos_token
            # NOTE(review): trust_remote_code=True executes code shipped with
            # the checkpoint; confirm it is needed for the models used here.
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                device_map="auto" if self.device == "cuda" else None,
                trust_remote_code=True,
            )
            if self.device == "cpu":
                self.model = self.model.to(self.device)
            print(f"Model loaded successfully on {self.device}")
        except Exception as e:
            print(f"Error loading model: {e}")
            # Fallback to an even smaller model
            print(f"Falling back to {self.FALLBACK_MODEL}...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.FALLBACK_MODEL)
            self.tokenizer.pad_token = self.tokenizer.eos_token
            self.model = AutoModelForCausalLM.from_pretrained(self.FALLBACK_MODEL)
            if self.device == "cuda":
                self.model = self.model.to(self.device)

    def generate(self, prompt: str, max_length: int = 512) -> str:
        """
        Generate a sampled continuation of `prompt`.

        Args:
            prompt (str): Text to continue; truncated to
                `MAX_PROMPT_TOKENS` tokens.
            max_length (int): Cap on prompt + generated tokens. At most
                `MAX_NEW_TOKENS` tokens are generated, and at least one
                even when the prompt already fills the cap.

        Returns:
            str: The newly generated text only (prompt excluded), a stock
                "I need more information..." sentence if that text is
                empty, or "Generation error: <details>" if anything
                raised.
        """
        try:
            # `truncation=True` is the keyword that enables truncation.
            inputs = self.tokenizer.encode(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=self.MAX_PROMPT_TOKENS,
            )
            if self.device == "cuda":
                inputs = inputs.to(self.device)

            prompt_len = inputs.size(1)
            total_cap = min(max_length, prompt_len + self.MAX_NEW_TOKENS)
            # Express the cap as new tokens so a prompt that already reaches
            # `max_length` still leaves room to generate something.
            new_tokens = max(1, total_cap - prompt_len)

            with torch.no_grad():
                outputs = self.model.generate(
                    inputs,
                    max_new_tokens=new_tokens,
                    num_return_sequences=1,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                    attention_mask=torch.ones_like(inputs),
                )

            # Decode only the tokens after the prompt; comparing decoded text
            # with the prompt string breaks when the prompt was truncated or
            # does not round-trip through the tokenizer.
            response = self.tokenizer.decode(
                outputs[0][prompt_len:], skip_special_tokens=True
            ).strip()
            return response if response else "I need more information to answer this question."
        except Exception as e:
            return f"Generation error: {e}"
# --- Simple Agent Implementation --- | |
class BasicAgent:
    """
    Keyword-routed question answerer.

    A question is tried, in order, against the calculator, then
    Wikipedia / web search, then the local language model. A tool result
    is returned only if it looks like a real answer; tool error strings
    and "no results" messages fall through to the next stage.
    """

    _MATH_KEYWORDS = ('calculate', 'compute', 'math', '+', '-', '*', '/', 'sum', 'total')
    _LOOKUP_KEYWORDS = ('current', 'recent', 'latest', 'today', 'news', 'when', 'who', 'what')
    _WIKI_PHRASES = ('who is', 'what is', 'born', 'died', 'biography')
    # Runs of digits, operators, dots, parentheses and whitespace.
    _MATH_PATTERN = r'[\d\+\-\*/\.\(\)\s]+'
    # A digit, an operator, then (eventually) another digit: rejects lone
    # hyphens/slashes ("well-known") and signed numbers ("-2000").
    _BINARY_OP_PATTERN = r'\d[\s\.\)]*[\+\-\*/][\s\(\.\+\-\*/]*\d'
    # Prefixes of the strings the tools return when they did not succeed.
    _CALC_FAILURE_PREFIXES = ("Error:", "Calculation error:")
    _LOOKUP_FAILURE_PREFIXES = (
        "No Wikipedia results",
        "No detailed results",
        "Wikipedia search error:",
        "No results found",
        "Search error:",
    )

    def __init__(self):
        """Load the language model and register the available tools."""
        print("BasicAgent initializing with HuggingFace model...")
        self.model = HuggingFaceModel("microsoft/DialoGPT-medium")  # Changed to medium for better performance
        self.tools = {
            "search": simple_search,
            "wikipedia": wikipedia_search,
            "calculator": calculator
        }

    def __call__(self, question: str) -> str:
        """
        Answer a single question.

        Args:
            question (str): The question text.

        Returns:
            str: A calculator, Wikipedia, web-search or model answer, or
                "Agent error: <details>" if anything raised.
        """
        print(f"Question: {question[:60]}...")
        try:
            question_lower = question.lower()

            if any(word in question_lower for word in self._MATH_KEYWORDS):
                answer = self._try_calculation(question)
                if answer is not None:
                    return answer

            if any(word in question_lower for word in self._LOOKUP_KEYWORDS):
                answer = self._try_lookup(question, question_lower)
                if answer is not None:
                    return answer

            return self._ask_model(question)
        except Exception as e:
            return f"Agent error: {e}"

    def _try_calculation(self, question):
        """Return a formatted result for the first computable expression, else None."""
        import re

        for candidate in re.findall(self._MATH_PATTERN, question):
            candidate = candidate.strip()
            if not re.search(self._BINARY_OP_PATTERN, candidate):
                continue
            calc_result = self.tools["calculator"](candidate)
            # A failed evaluation (e.g. a date such as 2020-01-05) is not an
            # answer; keep looking instead of returning the error text.
            if not calc_result.startswith(self._CALC_FAILURE_PREFIXES):
                return f"The calculation result is: {calc_result}"
        return None

    def _try_lookup(self, question, question_lower):
        """Return a Wikipedia or web-search result, or None if both came back empty or failed."""
        # Try Wikipedia first for factual questions
        if any(phrase in question_lower for phrase in self._WIKI_PHRASES):
            wiki_result = self.tools["wikipedia"](question)
            if not wiki_result.startswith(self._LOOKUP_FAILURE_PREFIXES):
                return wiki_result
        # Fall back to web search
        search_result = self.tools["search"](question)
        if not search_result.startswith(self._LOOKUP_FAILURE_PREFIXES):
            return search_result
        return None

    def _ask_model(self, question):
        """Query the language model, retrying once with a richer prompt if the reply is very short."""
        prompt = (
            f"Question: {question}\n"
            "Please provide a clear and accurate answer. If you're not sure about something, say so.\n"
            "Answer:"
        )
        response = self.model.generate(prompt, max_length=400)
        # If the response is too short or generic, try to enhance it
        if len(response.split()) < 5:
            enhanced_prompt = (
                "You are a helpful assistant. Answer this question with specific details:\n"
                f"{question}\n"
                "Provide a comprehensive answer:"
            )
            response = self.model.generate(enhanced_prompt, max_length=500)
        cleaned = response.strip()
        return cleaned if cleaned else "I need more information to answer this question properly."
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Run the agent over every question from the scoring API and submit the answers.

    Args:
        profile (gr.OAuthProfile | None): The logged-in Hugging Face user,
            or None when nobody is logged in.

    Returns:
        tuple: (status message, results table). The table is a
            `pandas.DataFrame` of task id / question / answer rows, or
            None when the run stopped before any question was processed.
    """
    if not profile:
        return "Please log in to Hugging Face to submit answers.", None

    def clip(text, limit):
        # Shorten long text for display, marking the cut with an ellipsis.
        return text[:limit] + "..." if len(text) > limit else text

    username = profile.username
    space_id = os.getenv("SPACE_ID", "")
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    try:
        agent = BasicAgent()
    except Exception as init_err:
        return f"Agent initialization failed: {init_err}", None

    # Link to this Space's source, sent along with the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    try:
        fetched = requests.get(questions_url, timeout=15)
        fetched.raise_for_status()
        questions = fetched.json()
    except Exception as fetch_err:
        return f"Error fetching questions: {fetch_err}", None

    rows = []
    submissions = []
    for position, entry in enumerate(questions, start=1):
        task_id = entry.get("task_id")
        question = entry.get("question")
        # Entries lacking an id or a question are silently skipped.
        if task_id and question is not None:
            print(f"Processing question {position}/{len(questions)}: {task_id}")
            answer = agent(question)
            submissions.append({"task_id": task_id, "submitted_answer": answer})
            rows.append(
                {
                    "Task ID": task_id,
                    "Question": clip(question, 100),
                    "Submitted Answer": clip(answer, 200),
                }
            )

    if not submissions:
        return "Agent produced no answers.", pd.DataFrame(rows)

    payload = {"username": username, "agent_code": agent_code, "answers": submissions}
    try:
        reply = requests.post(submit_url, json=payload, timeout=60)
        reply.raise_for_status()
        data = reply.json()
        score = data.get('score', 'N/A')
        correct = data.get('correct_count', '?')
        attempted = data.get('total_attempted', '?')
        message = data.get('message', '')
        status = f"✅ Submission Successful!\nScore: {score}% ({correct}/{attempted})\n{message}"
        return status, pd.DataFrame(rows)
    except Exception as submit_err:
        return f"Submission failed: {submit_err}", pd.DataFrame(rows)
# --- Gradio Interface --- | |
# The UI is built at import time so that `demo` exists as a module-level name.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown("This agent uses HuggingFace models locally (no API calls) to answer GAIA benchmark questions.")
    gr.LoginButton()
    # NOTE(review): the original indentation was lost; only the button is
    # assumed to sit inside the row -- confirm against the deployed layout.
    with gr.Row():
        run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")
    status_box = gr.Textbox(label="Status / Submission Result", lines=8, interactive=False)
    result_table = gr.DataFrame(label="Questions & Agent Answers", wrap=True)
    # No `inputs=` are wired: the handler's only parameter is the
    # `gr.OAuthProfile | None` profile (presumably supplied by Gradio from
    # the login state -- verify against the Gradio OAuth docs).
    run_button.click(run_and_submit_all, outputs=[status_box, result_table])

if __name__ == "__main__":
    print("Launching Gradio app...")
    demo.launch(debug=True, share=False)