Spaces:
Runtime error
Runtime error
# app.py - Improved GAIA Agent with GPT-NeoX-20B + LoRA | |
from llama_index.llms.huggingface import HuggingFaceLLM | |
from llama_index.core.agent import ReActAgent | |
from llama_index.core.tools import FunctionTool | |
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig | |
from peft import LoraConfig, get_peft_model | |
import os | |
import gradio as gr | |
import requests | |
import pandas as pd | |
import traceback | |
import torch | |
import re | |
import json | |
# Import real tool dependencies | |
try: | |
from duckduckgo_search import DDGS | |
except ImportError: | |
print("Warning: duckduckgo_search not installed. Web search will be limited.") | |
DDGS = None | |
try: | |
from sympy import sympify, solve, simplify, N, symbols | |
from sympy.core.sympify import SympifyError | |
except ImportError: | |
print("Warning: sympy not installed. Math calculator will be limited.") | |
sympify = None | |
SympifyError = Exception | |
# --- Constants --- | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
def print_trainable_parameters(model): | |
"""Print trainable parameters info""" | |
trainable_parameters = 0 | |
all_parameters = 0 | |
for _, param in model.named_parameters(): | |
all_parameters += param.numel() | |
if param.requires_grad: | |
trainable_parameters += param.numel() | |
print( | |
f"Trainable: {trainable_parameters} || All: {all_parameters} || Trainable %: {100 * trainable_parameters / all_parameters:.2f}%" | |
) | |
class ImprovedGAIAAgent: | |
def __init__(self): | |
print("🚀 Initializing Improved GAIA Agent with GPT-NeoX-20B...") | |
if not torch.cuda.is_available(): | |
raise RuntimeError("❌ CUDA required for GPT-NeoX-20B. Please use a GPU environment.") | |
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9 | |
print(f"🔥 GPU Memory: {gpu_memory:.1f}GB") | |
# Model configuration | |
self.model_name = "EleutherAI/gpt-neox-20b" | |
# 4-bit quantization config for memory efficiency | |
self.bnb_config = BitsAndBytesConfig( | |
load_in_4bit=True, | |
bnb_4bit_use_double_quant=True, | |
bnb_4bit_quant_type="nf4", | |
bnb_4bit_compute_dtype=torch.bfloat16 | |
) | |
# LoRA configuration for efficient fine-tuning capability | |
self.lora_config = LoraConfig( | |
r=16, # Increased for better performance | |
lora_alpha=32, | |
target_modules=["query_key_value", "dense", "dense_h_to_4h", "dense_4h_to_h"], # More comprehensive targets | |
lora_dropout=0.1, | |
bias="none", | |
task_type="CAUSAL_LM" | |
) | |
self.load_model() | |
self.setup_tools() | |
self.create_agent() | |
def load_model(self): | |
"""Load and configure the model""" | |
print("📥 Loading tokenizer...") | |
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) | |
# Add padding token if not present | |
if self.tokenizer.pad_token is None: | |
self.tokenizer.pad_token = self.tokenizer.eos_token | |
print("📥 Loading model with 4-bit quantization...") | |
self.model = AutoModelForCausalLM.from_pretrained( | |
self.model_name, | |
quantization_config=self.bnb_config, | |
device_map="auto", | |
trust_remote_code=True, | |
torch_dtype=torch.bfloat16 | |
) | |
print("🔧 Applying LoRA configuration...") | |
self.model = get_peft_model(self.model, self.lora_config) | |
print_trainable_parameters(self.model) | |
# Create LlamaIndex LLM wrapper | |
print("🔗 Creating LlamaIndex LLM wrapper...") | |
self.llm = HuggingFaceLLM( | |
model=self.model, | |
tokenizer=self.tokenizer, | |
context_window=2048, # GPT-NeoX context length | |
max_new_tokens=512, | |
generate_kwargs={ | |
"temperature": 0.1, | |
"do_sample": True, | |
"top_p": 0.9, | |
"repetition_penalty": 1.1, | |
"pad_token_id": self.tokenizer.eos_token_id, | |
}, | |
# Improved system message for GAIA tasks | |
system_message="""You are a helpful AI assistant that can search the web and perform calculations. | |
When answering questions: | |
1. Think step by step | |
2. Use tools when you need current information or calculations | |
3. Be precise and factual | |
4. For numerical answers, provide exact numbers when possible | |
5. Always show your reasoning | |
Available tools: web_search, math_calculator""" | |
) | |
def setup_tools(self): | |
"""Setup enhanced tools for GAIA benchmark""" | |
self.tools = [ | |
FunctionTool.from_defaults( | |
fn=self.enhanced_web_search, | |
name="web_search", | |
description="Search the web for current information, facts, people, events, or recent data. Use specific keywords." | |
), | |
FunctionTool.from_defaults( | |
fn=self.advanced_calculator, | |
name="math_calculator", | |
description="Perform mathematical calculations, solve equations, handle percentages, averages, and complex math operations." | |
), | |
FunctionTool.from_defaults( | |
fn=self.fact_checker, | |
name="fact_checker", | |
description="Verify facts and get detailed information about people, places, events, or concepts." | |
) | |
] | |
def enhanced_web_search(self, query: str) -> str: | |
"""Enhanced web search with better result processing""" | |
print(f"🔍 Enhanced search: {query}") | |
if not DDGS: | |
return "Web search unavailable - duckduckgo_search not installed" | |
try: | |
with DDGS() as ddgs: | |
# Get both regular results and news if relevant | |
results = list(ddgs.text(query, max_results=8, region='wt-wt')) | |
if not results: | |
return f"No results found for: {query}" | |
# Process and format results | |
formatted_results = [] | |
for i, result in enumerate(results, 1): | |
title = result.get('title', 'No title') | |
body = result.get('body', '').strip() | |
url = result.get('href', '') | |
# Extract key information | |
if len(body) > 300: | |
body = body[:300] + "..." | |
formatted_results.append(f"""Result {i}: {title} | |
Content: {body} | |
Source: {url} | |
""") | |
search_summary = f"Search results for '{query}':\n\n" + "\n".join(formatted_results) | |
# Try to extract specific answers for common question types | |
if any(keyword in query.lower() for keyword in ['how many', 'when was', 'who is', 'what year']): | |
# Look for numbers and dates in results | |
all_text = " ".join([r.get('body', '') for r in results]) | |
# Extract years | |
years = re.findall(r'\b(19|20)\d{2}\b', all_text) | |
if years and 'when' in query.lower(): | |
search_summary += f"\n\nExtracted years: {', '.join(set(years))}" | |
# Extract numbers | |
numbers = re.findall(r'\b\d+\b', all_text) | |
if numbers and 'how many' in query.lower(): | |
search_summary += f"\n\nExtracted numbers: {', '.join(set(numbers)[:5])}" | |
return search_summary | |
except Exception as e: | |
print(f"❌ Search error: {e}") | |
return f"Search failed: {str(e)}" | |
def advanced_calculator(self, expression: str) -> str: | |
"""Advanced calculator with symbolic math""" | |
print(f"🧮 Advanced calculation: {expression}") | |
try: | |
# Clean and normalize the expression | |
clean_expr = expression.replace('^', '**').replace('×', '*').replace('÷', '/') | |
clean_expr = re.sub(r'(\d)\s*\(', r'\1*(', clean_expr) # Add implicit multiplication | |
if sympify: | |
try: | |
# Try symbolic computation first | |
expr = sympify(clean_expr, evaluate=False) | |
result = simplify(expr) | |
numerical = N(result, 15) # High precision | |
# Handle different result types | |
if result.is_number: | |
return f"Calculation: {expression} = {numerical}" | |
else: | |
return f"Calculation: {expression} = {result} ≈ {numerical}" | |
except SympifyError: | |
# Fallback to numerical evaluation | |
result = eval(clean_expr) | |
return f"Calculation: {expression} = {result}" | |
else: | |
# Basic evaluation | |
result = eval(clean_expr) | |
return f"Calculation: {expression} = {result}" | |
except Exception as e: | |
return f"Could not calculate '{expression}': {str(e)}" | |
def fact_checker(self, query: str) -> str: | |
"""Specialized fact checking with multiple search strategies""" | |
print(f"✅ Fact checking: {query}") | |
# Try different search strategies | |
search_variations = [ | |
query, | |
f"{query} facts", | |
f"{query} biography" if any(word in query.lower() for word in ['who is', 'person', 'artist']) else f"{query} information", | |
] | |
all_results = [] | |
for search_query in search_variations[:2]: # Limit to avoid rate limiting | |
result = self.enhanced_web_search(search_query) | |
if "No results found" not in result: | |
all_results.append(f"Search: {search_query}\n{result}") | |
return "\n\n" + "="*50 + "\n\n".join(all_results) if all_results else f"Could not verify facts about: {query}" | |
def create_agent(self): | |
"""Create the ReAct agent""" | |
print("🤖 Creating ReAct agent...") | |
try: | |
self.agent = ReActAgent.from_tools( | |
tools=self.tools, | |
llm=self.llm, | |
verbose=True, | |
max_iterations=5, # Allow more iterations for complex problems | |
react_chat_formatter=None, # Use default formatter | |
) | |
print("✅ ReAct Agent created successfully") | |
except Exception as e: | |
print(f"❌ Agent creation failed: {e}") | |
traceback.print_exc() | |
raise | |
def __call__(self, question: str) -> str: | |
"""Process question through the agent""" | |
print(f"\n" + "="*60) | |
print(f"🤔 Processing: {question}") | |
print("="*60) | |
try: | |
# Use the agent to process the question | |
response = self.agent.query(question) | |
answer = str(response).strip() | |
# Validate response quality | |
if len(answer) < 10 or answer.lower() in ['error', 'none', 'unknown']: | |
print("⚠️ Poor response, trying direct approach...") | |
return self._direct_approach(question) | |
print(f"✅ Agent response: {answer[:200]}...") | |
return answer | |
except Exception as e: | |
print(f"❌ Agent error: {e}") | |
print("🔄 Falling back to direct approach...") | |
return self._direct_approach(question) | |
def _direct_approach(self, question: str) -> str: | |
"""Direct approach when agent fails""" | |
question_lower = question.lower() | |
# Determine approach based on question type | |
if any(term in question_lower for term in ['calculate', 'compute', 'math', '+', '-', '*', '/', '=', 'percentage', 'average']): | |
# Math-focused approach | |
math_result = self.advanced_calculator(question) | |
return math_result | |
elif any(term in question_lower for term in ['who is', 'when was', 'where is', 'what is', 'how many']): | |
# Search-focused approach | |
search_result = self.enhanced_web_search(question) | |
fact_result = self.fact_checker(question) | |
return f"{search_result}\n\nFact Check:\n{fact_result}" | |
else: | |
# General approach | |
search_result = self.enhanced_web_search(question) | |
return search_result | |
def cleanup_memory(): | |
"""Clean up GPU memory""" | |
if torch.cuda.is_available(): | |
torch.cuda.empty_cache() | |
print("🧹 Memory cleaned") | |
def run_and_submit_all(profile: gr.OAuthProfile | None): | |
"""Run evaluation with improved agent""" | |
if not profile: | |
return "❌ Please login to Hugging Face first", None | |
username = profile.username | |
print(f"👤 User: {username}") | |
# API endpoints | |
api_url = DEFAULT_API_URL | |
questions_url = f"{api_url}/questions" | |
submit_url = f"{api_url}/submit" | |
cleanup_memory() | |
# Initialize improved agent | |
try: | |
print("🚀 Initializing Improved GAIA Agent...") | |
agent = ImprovedGAIAAgent() | |
print("✅ Agent initialized successfully") | |
except Exception as e: | |
error_msg = f"❌ Agent initialization failed: {str(e)}\n{traceback.format_exc()}" | |
print(error_msg) | |
return error_msg, None | |
# Get space info | |
space_id = os.getenv("SPACE_ID", "unknown") | |
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" | |
# Fetch questions | |
try: | |
print("📥 Fetching questions...") | |
response = requests.get(questions_url, timeout=30) | |
response.raise_for_status() | |
questions_data = response.json() | |
print(f"📋 Got {len(questions_data)} questions") | |
except Exception as e: | |
return f"❌ Failed to fetch questions: {str(e)}", None | |
# Process all questions | |
results_log = [] | |
answers_payload = [] | |
print("\n" + "="*50) | |
print("🚀 STARTING GAIA EVALUATION") | |
print("="*50) | |
for i, item in enumerate(questions_data, 1): | |
task_id = item.get("task_id") | |
question_text = item.get("question") | |
if not task_id or not question_text: | |
continue | |
print(f"\n📝 Question {i}/{len(questions_data)}") | |
print(f"🆔 ID: {task_id}") | |
print(f"❓ Question: {question_text}") | |
try: | |
# Get answer from improved agent | |
answer = agent(question_text) | |
# Ensure answer is meaningful | |
if not answer or len(answer.strip()) < 5: | |
answer = f"Unable to determine answer for: {question_text[:100]}..." | |
print(f"✅ Answer: {answer[:200]}...") | |
# Store results | |
answers_payload.append({ | |
"task_id": task_id, | |
"submitted_answer": answer | |
}) | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text[:150] + ("..." if len(question_text) > 150 else ""), | |
"Answer": answer[:200] + ("..." if len(answer) > 200 else "") | |
}) | |
# Memory cleanup every few questions | |
if i % 3 == 0: | |
cleanup_memory() | |
except Exception as e: | |
print(f"❌ Error processing {task_id}: {e}") | |
error_answer = f"Processing error: {str(e)[:150]}" | |
answers_payload.append({ | |
"task_id": task_id, | |
"submitted_answer": error_answer | |
}) | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text[:150] + "...", | |
"Answer": error_answer | |
}) | |
print(f"\n📤 Submitting {len(answers_payload)} answers...") | |
# Submit answers | |
submission_data = { | |
"username": username, | |
"agent_code": agent_code, | |
"answers": answers_payload | |
} | |
try: | |
response = requests.post(submit_url, json=submission_data, timeout=180) | |
response.raise_for_status() | |
result_data = response.json() | |
score = result_data.get('score', 0) | |
correct = result_data.get('correct_count', 0) | |
total = result_data.get('total_attempted', len(answers_payload)) | |
message = result_data.get('message', '') | |
# Create final status message | |
final_status = f"""🎉 IMPROVED GAIA EVALUATION COMPLETE! | |
👤 User: {username} | |
🤖 Model: GPT-NeoX-20B + LoRA + 4-bit Quantization | |
📊 Final Score: {score}% | |
✅ Correct: {correct}/{total} | |
🎯 Target: 30%+ {'🎉 ACHIEVED!' if score >= 30 else '📈 Significant improvement expected!'} | |
📝 Message: {message} | |
🔧 Improvements Made: | |
- ✅ Proper causal LM (GPT-NeoX-20B) instead of encoder-decoder | |
- ✅ 4-bit quantization for memory efficiency | |
- ✅ LoRA for better parameter efficiency | |
- ✅ Enhanced tools with fact checking | |
- ✅ Better reasoning prompts | |
- ✅ Multi-strategy search approach | |
""" | |
print(f"\n🏆 FINAL SCORE: {score}%") | |
return final_status, pd.DataFrame(results_log) | |
except Exception as e: | |
error_msg = f"❌ Submission failed: {str(e)}" | |
print(error_msg) | |
return error_msg, pd.DataFrame(results_log) | |
# --- Gradio Interface --- | |
with gr.Blocks(title="Improved GAIA Agent", theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# 🚀 Improved GAIA Agent - GPT-NeoX-20B + LoRA") | |
gr.Markdown(""" | |
**Major Improvements:** | |
- 🧠 **GPT-NeoX-20B**: 20B parameter causal language model (vs 220M FLAN-T5) | |
- ⚡ **4-bit Quantization**: Memory efficient loading with BitsAndBytes | |
- 🎯 **LoRA**: Parameter-efficient fine-tuning ready | |
- 🔍 **Enhanced Tools**: Multi-strategy search + fact checking + advanced math | |
- 🤖 **Better ReAct**: Improved reasoning prompts and error handling | |
- 📈 **Expected**: Significant improvement over 0% baseline | |
**Requirements**: CUDA GPU with 16GB+ VRAM | |
""") | |
with gr.Row(): | |
gr.LoginButton() | |
with gr.Row(): | |
run_button = gr.Button( | |
"🚀 Run Improved GAIA Evaluation", | |
variant="primary", | |
size="lg" | |
) | |
status_output = gr.Textbox( | |
label="📊 Evaluation Results", | |
lines=15, | |
interactive=False | |
) | |
results_table = gr.DataFrame( | |
label="📝 Detailed Results", | |
wrap=True | |
) | |
run_button.click( | |
fn=run_and_submit_all, | |
outputs=[status_output, results_table] | |
) | |
if __name__ == "__main__": | |
print("🚀 Starting Improved GAIA Agent...") | |
print("💪 Using GPT-NeoX-20B + LoRA + 4-bit Quantization") | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
show_error=True | |
) |