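# Page residue from the Hugging Face file viewer, kept as a comment so the module parses:
# LamiaYT's picture | Last | 0f20e93 | raw | history blame | 26.4 kB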
import os
import gradio as gr
import requests
import pandas as pd
import re
import time
import json
import base64
from typing import Dict, Any, List, Optional, Tuple
from io import StringIO, BytesIO
import openpyxl
from PIL import Image
import PyPDF2
import ast
import math
import statistics
from datetime import datetime, timedelta
# Base URL of the scoring service; "/questions" and "/submit" are appended to it
# when fetching tasks and posting answers.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
class FileProcessor:
    """Handle various file types that GAIA questions might reference.

    Every method is static and best-effort: failures are reported through the
    return value (an empty dict or an error string) instead of being raised.
    """

    @staticmethod
    def process_excel_file(file_path: str) -> Dict[str, Any]:
        """Read every worksheet of an Excel workbook into plain Python rows.

        Args:
            file_path: Path of the workbook to open with openpyxl.

        Returns:
            Mapping of sheet name to a list of row tuples (cell values only).
            Rows whose cells are all ``None`` are dropped. An empty dict is
            returned (and the error printed) when the workbook cannot be read.
        """
        workbook = None
        try:
            # data_only=True asks openpyxl for stored values instead of formulas.
            workbook = openpyxl.load_workbook(file_path, data_only=True)
            return {
                sheet_name: [
                    row
                    for row in workbook[sheet_name].iter_rows(values_only=True)
                    if any(cell is not None for cell in row)
                ]
                for sheet_name in workbook.sheetnames
            }
        except Exception as e:
            print(f"Excel processing error: {e}")
            return {}
        finally:
            # Release the underlying file handle even when reading fails midway.
            if workbook is not None:
                workbook.close()

    @staticmethod
    def process_python_code(code_content: str) -> str:
        """Execute Python source with a reduced builtin set and return its stdout.

        Args:
            code_content: Source code to run.

        Returns:
            Everything the code printed, stripped of surrounding whitespace, or
            ``"Code execution error: <message>"`` when execution raises.
        """
        import contextlib
        import io

        # SECURITY NOTE: trimming __builtins__ is NOT a real sandbox. Code run
        # through exec() can still reach arbitrary objects via introspection.
        # Only feed this trusted input, or move execution into an isolated
        # process/container.
        restricted_globals = {
            '__builtins__': {
                'print': print, 'len': len, 'range': range, 'sum': sum,
                'max': max, 'min': min, 'abs': abs, 'round': round,
                'int': int, 'float': float, 'str': str, 'list': list,
                'dict': dict, 'set': set, 'tuple': tuple
            },
            'math': math,
            'statistics': statistics
        }
        captured_output = io.StringIO()
        try:
            # redirect_stdout restores sys.stdout on exit, including on error.
            with contextlib.redirect_stdout(captured_output):
                exec(code_content, restricted_globals)
        except Exception as e:
            return f"Code execution error: {e}"
        return captured_output.getvalue().strip()

    @staticmethod
    def process_pdf_file(file_path: str) -> str:
        """Extract the text of every page of a PDF.

        Args:
            file_path: Path of the PDF to open with PyPDF2.

        Returns:
            Page texts joined by newlines and stripped, or
            ``"PDF processing error: <message>"`` when reading fails.
        """
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # "or ''" guards against a page that yields no text at all, so
                # one empty page cannot abort the whole document.
                page_texts = [page.extract_text() or "" for page in pdf_reader.pages]
            return "\n".join(page_texts).strip()
        except Exception as e:
            return f"PDF processing error: {e}"
class AdvancedWebSearchEngine:
    """Serper-backed web search with query variations and heuristic answer extraction."""

    def __init__(self):
        """Create the HTTP session, read SERPER_API_KEY and start with an empty cache."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.serper_api_key = os.getenv("SERPER_API_KEY")
        self.search_cache = {}

    def search_with_serper(self, query: str, search_type: str = "search") -> Dict[str, Any]:
        """Run one Serper query, reusing a cached response when available.

        Args:
            query: Search string sent as ``q``.
            search_type: Path segment appended to ``https://google.serper.dev/``.

        Returns:
            The decoded JSON response, or ``{}`` when no API key is set, the
            request fails, or the service answers with a non-200 status.
        """
        if not self.serper_api_key:
            return {}

        cache_key = f"{query}_{search_type}"
        if cache_key in self.search_cache:
            return self.search_cache[cache_key]

        url = f"https://google.serper.dev/{search_type}"
        payload = {
            "q": query,
            "num": 15,   # Get more results
            "gl": "us",  # US results
            "hl": "en",  # English language
        }
        headers = {
            "X-API-KEY": self.serper_api_key,
            "Content-Type": "application/json"
        }
        try:
            response = self.session.post(url, json=payload, headers=headers, timeout=20)
            if response.status_code != 200:
                # Do not cache failures: a transient error (rate limit, 5xx)
                # must not poison later lookups of the same query.
                return {}
            result = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"Serper API error: {e}")
            return {}

        self.search_cache[cache_key] = result
        return result

    def multi_strategy_search(self, query: str) -> Dict[str, Any]:
        """Search for ``query`` and, if needed, a few reformulations of it.

        Variations are tried only until two result sets have been collected.

        Returns:
            Dict keyed ``'primary'`` / ``'variation_<i>'`` holding the non-empty
            responses; empty when nothing was found.
        """
        results = {}

        primary = self.search_with_serper(query)
        if primary:
            results['primary'] = primary

        variations = [
            f'"{query}"',                    # Exact phrase
            f"{query} site:wikipedia.org",   # Wikipedia specific
            f"{query} facts information",    # More specific
        ]
        for i, variation in enumerate(variations):
            if len(results) >= 2:  # Don't overdo it
                break
            var_result = self.search_with_serper(variation)
            if var_result and var_result != primary:
                results[f'variation_{i}'] = var_result
        return results

    def extract_answer_from_results(self, results: Dict[str, Any], question: str) -> str:
        """Pick an answer out of the result sets produced by multi_strategy_search.

        An answer box wins immediately. Otherwise knowledge-graph descriptions
        and organic "title: snippet" pairs are gathered and handed to
        extract_specific_answer.
        """
        all_content = []
        for data in results.values():
            if not isinstance(data, dict):
                continue  # unexpected payload shape; nothing to read from it

            answer_box = data.get("answerBox") or {}
            if answer_box.get("answer"):
                return str(answer_box["answer"])
            if answer_box.get("snippet"):
                return str(answer_box["snippet"])

            description = (data.get("knowledgeGraph") or {}).get("description")
            if description:
                all_content.append(description)

            for organic in data.get("organic", []):
                title = organic.get("title", "")
                snippet = organic.get("snippet", "")
                if title and snippet:
                    all_content.append(f"{title}: {snippet}")

        combined_content = "\n".join(all_content)
        return self.extract_specific_answer(combined_content, question)

    @staticmethod
    def _mentions(text: str, keywords: List[str]) -> bool:
        """Return True if any keyword occurs in ``text`` as a whole word or phrase.

        A trailing plural "s" is tolerated. Whole-word matching keeps 'count'
        from firing on "country" or 'person' on "personal".
        """
        alternatives = '|'.join(re.escape(keyword) for keyword in keywords)
        return re.search(rf'\b(?:{alternatives})s?\b', text) is not None

    def extract_specific_answer(self, content: str, question: str) -> str:
        """Extract a short answer from ``content`` based on the kind of question.

        Categories are checked in this order: quantities, people, dates,
        country codes, places. The first category that both matches the
        question and finds something in ``content`` decides the answer.

        Returns:
            The extracted fragment, else the first sentence longer than 20
            characters, else ``"Answer not found in search results"``.
        """
        q_lower = question.lower()

        # Numbers and quantities
        if self._mentions(q_lower, ['how many', 'how much', 'number of', 'count']):
            numbers = re.findall(r'\b\d{1,10}\b', content)
            if numbers:
                # Return the most likely number (often the first one found)
                return numbers[0]

        # Names and people
        if self._mentions(q_lower, ['who', 'whom', 'whose', 'name', 'person']):
            # Look for proper names (capitalized words)
            names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s[A-Z][a-z]+)*\b', content)
            if names:
                if 'first name' in q_lower:
                    return names[0].split()[0]
                if 'last name' in q_lower or 'surname' in q_lower:
                    return names[0].split()[-1]
                return names[0]

        # Dates and years
        if self._mentions(q_lower, ['when', 'year', 'date']):
            # Non-capturing group: with a capturing group findall() would
            # return only the century prefix ('19' / '20'), not the year.
            years = re.findall(r'\b(?:19|20)\d{2}\b', content)
            if years:
                return years[0]
            dates = re.findall(r'\b\w+ \d{1,2}, \d{4}\b', content)
            if dates:
                return dates[0]

        # Country codes - checked before places, because the places pattern
        # matches nearly any capitalized word and would otherwise always win.
        if 'country code' in q_lower:
            codes = re.findall(r'\b[A-Z]{2,3}\b', content)
            if codes:
                return codes[0]

        # Places and locations
        if self._mentions(q_lower, ['where', 'location', 'place', 'country']):
            places = re.findall(
                r'\b[A-Z][a-z]+(?:\s[A-Z][a-z]+)*(?:\s(?:City|State|Country|Province|Region))?\b',
                content,
            )
            if places:
                return places[0]

        # Default: return first meaningful sentence
        sentences = [s.strip() for s in content.split('.') if len(s.strip()) > 20]
        return sentences[0] if sentences else "Answer not found in search results"
class EnhancedQuestionSolver:
    """Route a question to a file, math, multi-step or plain-search strategy."""

    # Every failure message of handle_file_based_question starts with this text.
    _FILE_FAILURE_PREFIX = "File processing failed"

    # "<...> actor who played ROLE in SOURCE play(ed) in TARGET"
    _ACTOR_CHAIN_PATTERN = re.compile(
        r'actor who played\s+(?P<role>.+?)\s+in\s+(?P<source>.+?)'
        r'\s+play(?:ed|s)?\s+in\s+(?P<target>.+)',
        re.IGNORECASE,
    )

    # Two unsigned integers joined by one of + - * /
    _ARITHMETIC_PATTERN = re.compile(r'\b(\d+)\s*([+\-*/])\s*(\d+)\b')

    def __init__(self):
        """Create the search engine and file processor used by the handlers."""
        self.search_engine = AdvancedWebSearchEngine()
        self.file_processor = FileProcessor()

    def solve_question(self, question: str, files: Optional[List[str]] = None) -> str:
        """Answer ``question``, trying the most specific strategy first.

        Order: attached files, file references without files, arithmetic /
        quantity questions, multi-step questions, then a general search.

        Args:
            question: The question text.
            files: Optional paths of files attached to the question.

        Returns:
            The answer string produced by the selected handler.
        """
        print(f"๐Ÿค” Analyzing: {question[:100]}...")

        # Handle file-based questions first
        if files:
            file_answer = self.handle_file_based_question(question, files)
            # Prefix test, not equality: the handler also returns
            # "File processing failed: <error>", which is not an answer either.
            if file_answer and not file_answer.startswith(self._FILE_FAILURE_PREFIX):
                return file_answer

        # Detect file references in question text
        if self.has_file_references(question):
            return self.handle_file_reference_question(question)

        # Handle mathematical calculations
        if self.is_math_question(question):
            return self.handle_math_question(question)

        # Handle multi-step reasoning questions
        if self.needs_multi_step_reasoning(question):
            return self.handle_multi_step_question(question)

        # Handle specific structured questions
        return self.handle_structured_question(question)

    def has_file_references(self, question: str) -> bool:
        """Return True if the question text mentions an attachment or document."""
        file_indicators = [
            "attached", "excel file", "python code", "pdf", "image",
            "spreadsheet", "document", "file contains", "in the file"
        ]
        q_lower = question.lower()
        return any(indicator in q_lower for indicator in file_indicators)

    def handle_file_reference_question(self, question: str) -> str:
        """Explain that a referenced file is unavailable (no file was provided)."""
        q_lower = question.lower()
        if "excel file" in q_lower and "sales" in q_lower:
            return "Unable to access attached Excel file. Please ensure file is properly uploaded."
        if "python code" in q_lower:
            return "Unable to access attached Python code. Please ensure file is properly uploaded."
        return "File referenced but not accessible. Please provide the file."

    def handle_file_based_question(self, question: str, files: List[str]) -> str:
        """Answer from the first attached file with a supported extension.

        Supported: ``.xlsx``/``.xls`` (analysed), ``.py`` (executed) and
        ``.pdf`` (text searched). Extensions are matched case-insensitively.

        Returns:
            The handler's answer, ``"File processing failed: <error>"`` if an
            exception occurred, or ``"File processing failed"`` when no file
            had a supported extension.
        """
        try:
            for file_path in files:
                suffix_source = file_path.lower()
                if suffix_source.endswith(('.xlsx', '.xls')):
                    excel_data = self.file_processor.process_excel_file(file_path)
                    return self.analyze_excel_data(excel_data, question)
                if suffix_source.endswith('.py'):
                    # errors='replace' keeps an oddly encoded script readable
                    # instead of failing the whole question.
                    with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                        code_content = f.read()
                    return self.file_processor.process_python_code(code_content)
                if suffix_source.endswith('.pdf'):
                    pdf_text = self.file_processor.process_pdf_file(file_path)
                    return self.analyze_text_content(pdf_text, question)
        except Exception as e:
            return f"File processing failed: {e}"
        return "File processing failed"

    def analyze_excel_data(self, excel_data: Dict, question: str) -> str:
        """Answer "total sales" / "average sales" questions from sheet rows.

        Args:
            excel_data: Mapping of sheet name to rows; the first row of each
                sheet is used as the header.
            question: The question text.

        Returns:
            Sum or mean of the first numeric column of the first sheet that has
            one, as a string, or an explanatory message when nothing applies.
        """
        if not excel_data:
            return "No data found in Excel file"

        q_lower = question.lower()
        if "sales" in q_lower and "total" in q_lower:
            aggregate = "sum"
        elif "sales" in q_lower and "average" in q_lower:
            aggregate = "mean"
        else:
            return "Could not analyze Excel data for this question"

        try:
            for data in excel_data.values():
                if not data:
                    continue
                df = pd.DataFrame(data[1:], columns=list(data[0]))  # First row as header
                # "number" selects every numeric dtype regardless of platform width.
                numeric = df.select_dtypes(include="number")
                if numeric.shape[1] == 0:
                    continue
                first_numeric = numeric.iloc[:, 0]
                value = first_numeric.sum() if aggregate == "sum" else first_numeric.mean()
                return str(value)
            return "Could not analyze Excel data for this question"
        except Exception as e:
            return f"Excel analysis error: {e}"

    def analyze_text_content(self, text: str, question: str) -> str:
        """Answer from extracted document text, falling back to a web search."""
        q_lower = question.lower()
        if "surname" in q_lower or "last name" in q_lower:
            names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', text)
            if names:
                return names[0].split()[-1]

        # Use search to find more specific information
        search_query = f"{question} {text[:100]}"
        results = self.search_engine.multi_strategy_search(search_query)
        return self.search_engine.extract_answer_from_results(results, question)

    def is_math_question(self, question: str) -> bool:
        """Return True if the question contains a calculation/quantity keyword."""
        math_indicators = [
            'calculate', 'compute', 'sum', 'average', 'mean',
            'total', 'how many', 'how much', 'solve', 'equation'
        ]
        q_lower = question.lower()
        return any(indicator in q_lower for indicator in math_indicators)

    def handle_math_question(self, question: str) -> str:
        """Evaluate the first ``<int> <op> <int>`` expression, else search.

        The expression is computed directly instead of through ``eval``.
        A division by zero is skipped and the next expression is tried.
        """
        for match in self._ARITHMETIC_PATTERN.finditer(question):
            left, operator_symbol, right = int(match[1]), match[2], int(match[3])
            if operator_symbol == '+':
                return str(left + right)
            if operator_symbol == '-':
                return str(left - right)
            if operator_symbol == '*':
                return str(left * right)
            if right != 0:  # '/' - true division, like the Python operator
                return str(left / right)

        # For word problems, search for the answer
        results = self.search_engine.multi_strategy_search(question)
        return self.search_engine.extract_answer_from_results(results, question)

    def needs_multi_step_reasoning(self, question: str) -> bool:
        """Return True if the question contains a chaining/sequence keyword."""
        multi_step_indicators = [
            "who played", "actor who", "person who", "after",
            "before", "then", "subsequently", "following"
        ]
        q_lower = question.lower()
        return any(indicator in q_lower for indicator in multi_step_indicators)

    def handle_multi_step_question(self, question: str) -> str:
        """Dispatch a multi-step question to the matching specialised handler."""
        q_lower = question.lower()
        if "actor who played" in q_lower:
            return self.handle_actor_chain_question(question)
        if "before and after" in q_lower:
            return self.handle_sequence_question(question)
        return self.handle_structured_question(question)

    @classmethod
    def _parse_actor_chain(cls, question: str) -> Optional[Tuple[str, str, str]]:
        """Split an actor-chain question into ``(role, source, target)``.

        Returns ``None`` when the question does not have the shape
        "... actor who played ROLE in SOURCE play in TARGET".
        """
        match = cls._ACTOR_CHAIN_PATTERN.search(question)
        if match is None:
            return None
        target = match['target'].strip().rstrip('?').strip()
        return match['role'].strip(), match['source'].strip(), target

    def handle_actor_chain_question(self, question: str) -> str:
        """Resolve "what did the actor who played X in A play in B" questions.

        Step 1 searches for the actor of the role in the source work, step 2
        searches for that actor's role in the target work. If the question
        cannot be parsed or no actor is found, a single plain search is used.
        """
        parsed = self._parse_actor_chain(question)
        if parsed is not None:
            role, source, target = parsed

            # Step 1: Find the initial actor/role
            first_search = f"actor who played {role} in {source}"
            results1 = self.search_engine.multi_strategy_search(first_search)
            actor_name = self.search_engine.extract_answer_from_results(results1, "who is the actor")

            if actor_name and actor_name != "Answer not found in search results":
                # Step 2: Find what this actor played in the target show/movie
                second_search = f"{actor_name} role in {target}"
                results2 = self.search_engine.multi_strategy_search(second_search)
                return self.search_engine.extract_answer_from_results(
                    results2, f"what role did {actor_name} play"
                )

        # Fallback to single search
        results = self.search_engine.multi_strategy_search(question)
        return self.search_engine.extract_answer_from_results(results, question)

    def handle_sequence_question(self, question: str) -> str:
        """Handle questions about sequences (before/after) with a plain search."""
        results = self.search_engine.multi_strategy_search(question)
        return self.search_engine.extract_answer_from_results(results, question)

    def handle_structured_question(self, question: str) -> str:
        """Search for the question and retry with rephrasings if nothing is found."""
        not_found = "Answer not found in search results"
        results = self.search_engine.multi_strategy_search(question)
        answer = self.search_engine.extract_answer_from_results(results, question)

        # If no good answer found, try rephrasing the question
        if answer == not_found:
            for rq in self.rephrase_question(question):
                results = self.search_engine.multi_strategy_search(rq)
                answer = self.search_engine.extract_answer_from_results(results, question)
                if answer != not_found:
                    break
        return answer

    def rephrase_question(self, question: str) -> List[str]:
        """Generate up to three alternative phrasings of the question."""
        rephrased = []

        # Add question marks if missing
        if not question.endswith('?'):
            rephrased.append(question + '?')

        # Remove question words for factual search
        q_lower = question.lower()
        words_to_remove = ['what is', 'who is', 'where is', 'when is', 'how many', 'how much']
        for word in words_to_remove:
            if word in q_lower:
                rephrased.append(q_lower.replace(word, '').strip())

        # Add context words
        context_words = ['information about', 'facts about', 'details about']
        rephrased.extend(f"{context} {question}" for context in context_words)

        # An empty query (question consisted only of the removed words) is useless.
        return [candidate for candidate in rephrased if candidate][:3]
def get_enhanced_api_status():
    """Build a three-line status report: Serper key, Excel support, PDF support.

    Returns:
        The status lines joined with newlines. The first line reflects whether
        SERPER_API_KEY is set; the other two reflect whether openpyxl and
        PyPDF2 can be imported.
    """
    report = [
        "โœ… Serper API: Configured"
        if os.getenv("SERPER_API_KEY")
        else "โŒ Serper API: Missing - Get key at serper.dev"
    ]

    # (module to probe, line when importable, line when missing)
    optional_modules = (
        ("openpyxl",
         "โœ… Excel Processing: Available",
         "โŒ Excel Processing: openpyxl not available"),
        ("PyPDF2",
         "โœ… PDF Processing: Available",
         "โŒ PDF Processing: PyPDF2 not available"),
    )
    for module_name, available_line, missing_line in optional_modules:
        try:
            __import__(module_name)
        except ImportError:
            report.append(missing_line)
        else:
            report.append(available_line)

    return "\n".join(report)
def run_enhanced_gaia_evaluation(profile: gr.OAuthProfile | None):
    """Fetch the questions, answer each one, submit the answers and report the score.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in. Its ``username`` is sent with the submission.

    Returns:
        A ``(message, table)`` pair: ``message`` is the text for the results
        box, ``table`` is a DataFrame with one row per processed question, or
        ``None`` when the run stopped before any question was processed.
    """

    def clip(text, limit):
        """Shorten ``text`` to ``limit`` characters, marking the cut with '...'."""
        return text[:limit] + "..." if len(text) > limit else text

    if not profile:
        return "Please log in to Hugging Face first.", None

    # Check API status. The gate reads the environment directly instead of
    # searching the human-readable status text, so rewording that text cannot
    # silently disable the check.
    api_status = get_enhanced_api_status()
    if not os.getenv("SERPER_API_KEY"):
        return f"โš ๏ธ Serper API not configured!\n\n{api_status}", None

    username = profile.username
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    try:
        solver = EnhancedQuestionSolver()
        print("โœ… Enhanced question solver initialized")
    except Exception as e:
        return f"โŒ Initialization failed: {e}", None

    try:
        print("๐Ÿ“ฅ Fetching questions...")
        r = requests.get(questions_url, timeout=30)
        r.raise_for_status()
        questions = r.json()
        print(f"โœ… Got {len(questions)} questions")
    except Exception as e:
        return f"โŒ Failed to fetch questions: {e}", None

    answers = []
    logs = []
    for i, item in enumerate(questions):
        if not isinstance(item, dict):
            continue  # malformed entry; nothing to answer
        task_id = item.get("task_id")
        question = item.get("question")
        files = item.get("files", [])  # Get attached files if any
        if not task_id or not question:
            continue

        print(f"\n๐Ÿ”„ Processing {i+1}/{len(questions)}: {task_id}")
        print(f"๐Ÿ“ Question: {clip(question, 100)}")
        if files:
            print(f"๐Ÿ“Ž Files: {files}")

        log_row = {
            "Task ID": task_id,
            "Question": clip(question, 150),
            "Files": len(files) if files else 0,
        }
        try:
            start_time = time.time()
            # A handler may hand back a non-string (e.g. a numeric answer box
            # value); normalise so slicing and submission always see text.
            answer = str(solver.solve_question(question, files))
            processing_time = time.time() - start_time
            answers.append({"task_id": task_id, "submitted_answer": answer})
            log_row["Answer"] = clip(answer, 100)
            log_row["Time (s)"] = f"{processing_time:.2f}"
            print(f"โœ… Answer: {clip(answer, 80)}")
            time.sleep(0.5)  # Rate limiting for API
        except Exception as e:
            # One failing question must not abort the run; submit the error text.
            error_msg = f"Error: {str(e)}"
            answers.append({"task_id": task_id, "submitted_answer": error_msg})
            log_row["Answer"] = error_msg
            log_row["Time (s)"] = "Error"
            print(f"โŒ Error: {e}")
        logs.append(log_row)

    if not answers:
        # Nothing usable came back from the questions endpoint.
        return "No answers were produced, so nothing was submitted.", pd.DataFrame(logs)

    # Submit answers
    print(f"\n๐Ÿ“ค Submitting {len(answers)} answers...")
    payload = {
        "username": username,
        "agent_code": f"https://huggingface.co/spaces/{os.getenv('SPACE_ID', '')}/tree/main",
        "answers": answers
    }
    try:
        resp = requests.post(submit_url, json=payload, timeout=300)  # Increased timeout
        resp.raise_for_status()
        data = resp.json()
        score = data.get('score', 'N/A')
        correct = data.get('correct_count', '?')
        total = data.get('total_attempted', '?')
        result_message = f"""๐ŸŽฏ ENHANCED GAIA EVALUATION RESULTS
๐Ÿ“Š Final Score: {score}% ({correct}/{total} correct)
๐Ÿ”ง System Status:
{api_status}
๐Ÿš€ Enhanced Features:
โ€ข Multi-strategy web search with result caching
โ€ข Advanced file processing (Excel, PDF, Python)
โ€ข Multi-step reasoning for complex questions
โ€ข Context-aware answer extraction
โ€ข Question rephrasing for better results
โ€ข Specialized handlers for different question types
๐Ÿ“ˆ Performance Improvements:
โ€ข Better search result processing
โ€ข Enhanced name/number extraction
โ€ข Improved mathematical computation
โ€ข File-based question handling
โ€ข Actor chain and sequence reasoning"""
        return result_message, pd.DataFrame(logs)
    except Exception as e:
        return f"โŒ Submission failed: {str(e)}", pd.DataFrame(logs)
# Enhanced Gradio Interface
# Builds the page top to bottom: intro text, login button, status box and run
# button, results text box, per-question table, and a status refresh button.
# NOTE(review): the nesting of the `with` blocks below is reconstructed - the
# original indentation was lost - so confirm the intended layout.
with gr.Blocks(title="Enhanced GAIA Agent", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# ๐Ÿง  Enhanced GAIA Benchmark Agent v2.0
**๐Ÿ”ง Required Setup:**
- `SERPER_API_KEY` environment variable - Get 2500 free searches/month at [serper.dev](https://serper.dev)
**โšก Advanced Capabilities:**
- ๐Ÿ” Multi-strategy web search with intelligent caching
- ๐Ÿ“Š Excel/CSV file processing and analysis
- ๐Ÿ Python code execution for computational questions
- ๐Ÿ“„ PDF document text extraction and analysis
- ๐Ÿงฎ Advanced mathematical problem solving
- ๐ŸŽญ Multi-step reasoning for complex actor/person chains
- ๐ŸŽฏ Context-aware answer extraction with multiple fallbacks
- ๐Ÿ“ Question rephrasing for better search results
**๐Ÿ“ˆ Expected Performance:**
- Significantly improved accuracy on GAIA benchmark
- Better handling of file-based questions
- Enhanced name/number/date extraction
- Robust error handling and fallback strategies
""")
    # The evaluation callback receives the OAuth profile and refuses to run
    # without one, so a login control is offered first.
    gr.LoginButton()
    with gr.Row():
        with gr.Column():
            # Read-only status text, computed once when the UI is built.
            api_status_display = gr.Textbox(
                label="๐Ÿ”ง System Status",
                value=get_enhanced_api_status(),
                lines=4,
                interactive=False
            )
            run_button = gr.Button(
                "๐Ÿš€ Run Enhanced GAIA Evaluation",
                variant="primary",
                size="lg"
            )
    with gr.Row():
        # Receives the first value returned by run_enhanced_gaia_evaluation.
        results_display = gr.Textbox(
            label="๐Ÿ“Š Evaluation Results",
            lines=15,
            interactive=False
        )
    with gr.Row():
        # Receives the second value returned by run_enhanced_gaia_evaluation.
        detailed_results = gr.DataFrame(
            label="๐Ÿ“‹ Detailed Question Analysis",
            wrap=True,
            interactive=False
        )
    # Refresh status button
    refresh_status = gr.Button("๐Ÿ”„ Refresh Status", size="sm")
    # Recompute the status text on demand (e.g. after the API key was set).
    refresh_status.click(
        lambda: get_enhanced_api_status(),
        outputs=[api_status_display]
    )
    # No explicit inputs are listed here.
    # NOTE(review): presumably Gradio supplies `profile` from the function's
    # gr.OAuthProfile annotation - confirm against the Gradio version in use.
    run_button.click(
        run_enhanced_gaia_evaluation,
        outputs=[results_display, detailed_results]
    )
if __name__ == "__main__":
    # NOTE(review): share=True / debug=True look like development settings -
    # confirm they are wanted where this app is hosted.
    demo.launch(share=True, debug=True)