LamiaYT's picture
Fixing
dfcd4f6
raw
history blame
23 kB
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import json
import re
import time
from typing import List, Dict, Any, Optional
from datetime import datetime
import threading
import queue
from ctransformers import AutoModelForCausalLM
import logging
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
class WebSearchTool:
"""Web search tool using Serper API for real-time information retrieval"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://google.serper.dev/search"
def search(self, query: str, num_results: int = 5) -> Dict[str, Any]:
"""Perform web search and return structured results"""
try:
headers = {
'X-API-KEY': self.api_key,
'Content-Type': 'application/json'
}
payload = {
'q': query,
'num': num_results,
'gl': 'us',
'hl': 'en'
}
response = requests.post(self.base_url, json=payload, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
# Extract and format results
results = []
if 'organic' in data:
for item in data['organic'][:num_results]:
results.append({
'title': item.get('title', ''),
'snippet': item.get('snippet', ''),
'link': item.get('link', ''),
'position': item.get('position', 0)
})
return {
'success': True,
'results': results,
'query': query,
'total_results': len(results)
}
except Exception as e:
logger.error(f"Web search error: {e}")
return {
'success': False,
'error': str(e),
'results': [],
'query': query,
'total_results': 0
}
class CalculatorTool:
"""Enhanced calculator tool for mathematical operations"""
def calculate(self, expression: str) -> Dict[str, Any]:
"""Safely evaluate mathematical expressions"""
try:
# Clean the expression
expression = expression.strip()
# Replace common mathematical functions
expression = expression.replace('^', '**') # Power operator
expression = re.sub(r'\b(\d+)x(\d+)\b', r'\1*\2', expression) # Handle multiplication like 5x3
# Allow only safe mathematical operations
allowed_chars = set('0123456789+-*/().,eE pi')
allowed_funcs = ['abs', 'round', 'min', 'max', 'sum', 'pow', 'sqrt']
# Basic safety check
if any(char.isalpha() and char not in 'pie' for char in expression):
# Check if it contains allowed function names
import math
safe_dict = {
"__builtins__": {},
"abs": abs, "round": round, "min": min, "max": max,
"sum": sum, "pow": pow, "sqrt": math.sqrt,
"pi": math.pi, "e": math.e,
"sin": math.sin, "cos": math.cos, "tan": math.tan,
"log": math.log, "log10": math.log10,
"exp": math.exp, "floor": math.floor, "ceil": math.ceil
}
result = eval(expression, safe_dict)
else:
result = eval(expression)
return {
'success': True,
'result': result,
'expression': expression
}
except Exception as e:
logger.error(f"Calculator error: {e}")
return {
'success': False,
'error': str(e),
'expression': expression,
'result': None
}
class LocalLLMManager:
"""Manages local quantized LLM for reasoning"""
def __init__(self):
self.model = None
self.model_loaded = False
self.load_lock = threading.Lock()
def load_model(self):
"""Load quantized model optimized for CPU inference"""
with self.load_lock:
if self.model_loaded:
return
try:
logger.info("Loading quantized model...")
# Use Phi-3-mini for better performance on CPU with limited resources
self.model = AutoModelForCausalLM.from_pretrained(
"microsoft/Phi-3-mini-4k-instruct-gguf",
model_file="Phi-3-mini-4k-instruct-q4.gguf",
model_type="phi3",
gpu_layers=0, # CPU only
context_length=3072, # Reduced context to save memory
max_new_tokens=512,
temperature=0.1,
top_p=0.9,
repetition_penalty=1.1
)
self.model_loaded = True
logger.info("Model loaded successfully")
except Exception as e:
logger.error(f"Error loading model: {e}")
# Fallback to a smaller model if Phi-3 fails
try:
logger.info("Trying fallback model...")
self.model = AutoModelForCausalLM.from_pretrained(
"TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF",
model_file="tinyllama-1.1b-chat-v1.0.q4_k_m.gguf",
model_type="llama",
gpu_layers=0,
context_length=2048,
max_new_tokens=256
)
self.model_loaded = True
logger.info("Fallback model loaded successfully")
except Exception as e2:
logger.error(f"Fallback model also failed: {e2}")
raise
def generate(self, prompt: str, max_tokens: int = 256) -> str:
"""Generate response from local model"""
if not self.model_loaded:
self.load_model()
if not self.model:
return "Error: Model not available"
try:
# Format prompt for Phi-3
formatted_prompt = f"<|user|>\n{prompt}<|end|>\n<|assistant|>\n"
response = self.model(
formatted_prompt,
max_new_tokens=min(max_tokens, 256), # Limit tokens for speed
temperature=0.1,
stop=["<|end|>", "<|user|>"]
)
# Clean response
response = response.replace(formatted_prompt, "").strip()
if "<|end|>" in response:
response = response.split("<|end|>")[0].strip()
return response
except Exception as e:
logger.error(f"Generation error: {e}")
return f"Error generating response: {e}"
class GAIAAgent:
"""Advanced GAIA agent with reasoning, tools, and multi-step problem solving"""
def __init__(self):
# Initialize tools
self.serper_api_key = os.getenv("SERPER_API_KEY")
if not self.serper_api_key:
logger.warning("SERPER_API_KEY not found. Web search will be disabled.")
self.web_search = None
else:
self.web_search = WebSearchTool(self.serper_api_key)
self.calculator = CalculatorTool()
self.llm = LocalLLMManager()
# Agent configuration
self.max_iterations = 5
self.max_reasoning_length = 1000
logger.info("GAIA Agent initialized")
def _identify_question_type(self, question: str) -> str:
"""Identify the type of question to determine approach"""
question_lower = question.lower()
if any(word in question_lower for word in ['calculate', 'compute', 'math', '+', '-', '*', '/', '=', 'sum', 'multiply', 'divide']):
return 'mathematical'
elif any(word in question_lower for word in ['current', 'latest', 'recent', 'today', 'now', '2024', '2025']):
return 'current_info'
elif any(word in question_lower for word in ['who', 'what', 'where', 'when', 'why', 'how']):
return 'factual'
elif any(word in question_lower for word in ['analyze', 'compare', 'explain', 'reason']):
return 'analytical'
else:
return 'general'
def _use_web_search(self, query: str) -> str:
"""Use web search tool and format results"""
if not self.web_search:
return "Web search not available (API key missing)"
results = self.web_search.search(query, num_results=3)
if not results['success']:
return f"Search failed: {results.get('error', 'Unknown error')}"
if not results['results']:
return "No search results found"
formatted_results = f"Search results for '{query}':\n"
for i, result in enumerate(results['results'], 1):
formatted_results += f"{i}. {result['title']}\n {result['snippet']}\n\n"
return formatted_results
def _use_calculator(self, expression: str) -> str:
"""Use calculator tool and format result"""
result = self.calculator.calculate(expression)
if result['success']:
return f"Calculation: {result['expression']} = {result['result']}"
else:
return f"Calculation error: {result['error']}"
def _generate_reasoning(self, question: str, context: str = "") -> str:
"""Generate reasoning step using local LLM"""
reasoning_prompt = f"""Question: {question}
Context: {context}
Think step by step about this question. Consider:
1. What information do I need?
2. What tools might help?
3. How should I approach this problem?
Provide a clear reasoning step:"""
try:
reasoning = self.llm.generate(reasoning_prompt, max_tokens=200)
return reasoning
except Exception as e:
logger.error(f"Reasoning generation error: {e}")
return "Unable to generate reasoning step"
def _generate_final_answer(self, question: str, context: str, reasoning_steps: List[str]) -> str:
"""Generate final answer using all available information"""
all_reasoning = "\n".join([f"Step {i+1}: {step}" for i, step in enumerate(reasoning_steps)])
answer_prompt = f"""Question: {question}
Context and Information:
{context}
Reasoning Steps:
{all_reasoning}
Based on all the information and reasoning above, provide a clear, concise, and accurate final answer to the question:"""
try:
answer = self.llm.generate(answer_prompt, max_tokens=200)
return answer.strip()
except Exception as e:
logger.error(f"Answer generation error: {e}")
return "Unable to generate final answer"
def __call__(self, question: str) -> str:
"""Main agent execution method"""
logger.info(f"Processing question: {question[:100]}...")
try:
# Initialize
context = ""
reasoning_steps = []
question_type = self._identify_question_type(question)
logger.info(f"Question type identified: {question_type}")
# Step 1: Initial reasoning
initial_reasoning = self._generate_reasoning(question)
reasoning_steps.append(initial_reasoning)
context += f"Initial reasoning: {initial_reasoning}\n\n"
# Step 2: Apply tools based on question type
if question_type == 'mathematical':
# Try to extract mathematical expressions
math_matches = re.findall(r'[\d\+\-\*/\(\)\.\s\^]+', question)
for match in math_matches:
if len(match.strip()) > 3: # Avoid single digits
calc_result = self._use_calculator(match.strip())
context += f"Calculation: {calc_result}\n"
elif question_type in ['current_info', 'factual']:
# Use web search for factual or current information
search_result = self._use_web_search(question)
context += f"Web search results: {search_result}\n"
# Step 3: Additional reasoning with context
if context:
additional_reasoning = self._generate_reasoning(question, context)
reasoning_steps.append(additional_reasoning)
context += f"Additional reasoning: {additional_reasoning}\n\n"
# Step 4: Generate final answer
final_answer = self._generate_final_answer(question, context, reasoning_steps)
logger.info(f"Generated answer: {final_answer[:100]}...")
return final_answer
except Exception as e:
logger.error(f"Agent execution error: {e}")
return f"Error processing question: {str(e)}"
def run_and_submit_all(profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the GAIA Agent on them, submits all answers,
and displays the results.
"""
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code
if profile:
username = f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent
try:
print("Initializing GAIA Agent...")
agent = GAIAAgent()
print("GAIA Agent initialized successfully")
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None
# Agent code link
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(f"Agent code: {agent_code}")
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
print(f"Fetched {len(questions_data)} questions.")
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response from questions endpoint: {e}")
print(f"Response text: {response.text[:500]}")
return f"Error decoding server response for questions: {e}", None
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred fetching questions: {e}", None
# 3. Run GAIA Agent
results_log = []
answers_payload = []
print(f"Running GAIA agent on {len(questions_data)} questions...")
for i, item in enumerate(questions_data):
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
print(f"Processing question {i+1}/{len(questions_data)}: {task_id}")
try:
start_time = time.time()
submitted_answer = agent(question_text)
processing_time = time.time() - start_time
print(f"Question {task_id} processed in {processing_time:.2f}s")
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
"Submitted Answer": submitted_answer[:200] + "..." if len(submitted_answer) > 200 else submitted_answer,
"Processing Time (s)": f"{processing_time:.2f}"
})
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
results_log.append({
"Task ID": task_id,
"Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
"Submitted Answer": f"AGENT ERROR: {e}",
"Processing Time (s)": "Error"
})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=120)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks(title="GAIA Agent Evaluation") as demo:
gr.Markdown("# GAIA Agent Evaluation Runner")
gr.Markdown(
"""
**Advanced GAIA Agent Features:**
- ๐Ÿง  Local quantized LLM for reasoning (Phi-3-mini optimized for CPU)
- ๐Ÿ” Web search capabilities via Serper API
- ๐Ÿงฎ Mathematical calculation tools
- ๐ŸŽฏ Multi-step problem solving approach
- ๐Ÿš€ Optimized for 16GB RAM / 2 vCPU constraints
**Instructions:**
1. Ensure your SERPER_API_KEY environment variable is set for web search
2. Log in to your Hugging Face account using the button below
3. Click 'Run GAIA Evaluation' to start the comprehensive evaluation
**Note:** Initial model loading may take 1-2 minutes. Subsequent questions will be processed faster.
"""
)
gr.LoginButton()
run_button = gr.Button("๐Ÿš€ Run GAIA Evaluation & Submit All Answers", variant="primary")
status_output = gr.Textbox(label="๐Ÿ“Š Evaluation Status & Results", lines=8, interactive=False)
results_table = gr.DataFrame(label="๐Ÿ“‹ Detailed Question Results", wrap=True)
# Add system info
with gr.Accordion("๐Ÿ”ง System Information", open=False):
gr.Markdown(f"""
- **Environment**: Hugging Face Space
- **Resources**: 16GB RAM, 2 vCPU
- **Model**: Phi-3-mini-4k-instruct (quantized)
- **Web Search**: {'โœ… Enabled' if os.getenv('SERPER_API_KEY') else 'โŒ Disabled (no API key)'}
- **Calculator**: โœ… Enabled
- **Timestamp**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}
""")
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table]
)
if __name__ == "__main__":
print("\n" + "="*70)
print("๐Ÿš€ GAIA AGENT EVALUATION SYSTEM STARTING")
print("="*70)
# Environment check
space_host = os.getenv("SPACE_HOST")
space_id = os.getenv("SPACE_ID")
serper_key = os.getenv("SERPER_API_KEY")
if space_host:
print(f"โœ… SPACE_HOST: {space_host}")
print(f" ๐ŸŒ Runtime URL: https://{space_host}.hf.space")
else:
print("โ„น๏ธ Running locally (SPACE_HOST not found)")
if space_id:
print(f"โœ… SPACE_ID: {space_id}")
print(f" ๐Ÿ“ Repo URL: https://huggingface.co/spaces/{space_id}")
else:
print("โ„น๏ธ SPACE_ID not found")
if serper_key:
print("โœ… SERPER_API_KEY: Configured")
else:
print("โš ๏ธ SERPER_API_KEY: Not found - Web search will be disabled")
print("="*70)
print("๐Ÿ“š GAIA Agent Features:")
print(" ๐Ÿง  Local LLM reasoning")
print(" ๐Ÿ” Web search integration")
print(" ๐Ÿงฎ Mathematical calculations")
print(" ๐ŸŽฏ Multi-step problem solving")
print("="*70 + "\n")
print("๐ŸŽฏ Launching GAIA Agent Evaluation Interface...")
demo.launch(debug=True, share=False)