Spaces:
Runtime error
Runtime error
import os | |
import gradio as gr | |
import requests | |
import pandas as pd | |
import re | |
import time | |
import json | |
import base64 | |
from typing import Dict, Any, List, Optional, Tuple | |
from io import StringIO, BytesIO | |
import openpyxl | |
from PIL import Image | |
import PyPDF2 | |
import ast | |
import math | |
import statistics | |
from datetime import datetime, timedelta | |
# Base URL of the scoring service; run_enhanced_gaia_evaluation appends
# "/questions" and "/submit" to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
class FileProcessor:
    """Load the attachment types that GAIA questions may reference.

    Every method is a ``@staticmethod``: none of them uses instance state,
    and the decorator lets them be called both on the class and on an
    instance (``solver.file_processor.process_pdf_file(path)``). Without it
    an instance call binds the instance to the first parameter and fails
    with ``TypeError``.
    """

    @staticmethod
    def process_excel_file(file_path: str) -> Dict[str, Any]:
        """Read every worksheet of an Excel workbook.

        Args:
            file_path: Path of the workbook, opened with ``openpyxl``.

        Returns:
            A mapping of sheet name to the list of its rows that contain at
            least one non-``None`` cell (each row is a tuple of cell
            values). An empty dict is returned when anything goes wrong;
            the error is printed rather than raised.
        """
        try:
            workbook = openpyxl.load_workbook(file_path, data_only=True)
            excel_data = {}
            for sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                excel_data[sheet_name] = [
                    row
                    for row in sheet.iter_rows(values_only=True)
                    if any(cell is not None for cell in row)
                ]
            return excel_data
        except Exception as e:
            print(f"Excel processing error: {e}")
            return {}

    @staticmethod
    def process_python_code(code_content: str) -> str:
        """Run a Python snippet and return what it printed.

        Args:
            code_content: Source text to execute.

        Returns:
            The captured standard output with surrounding whitespace
            stripped, or ``"Code execution error: <message>"`` when the
            snippet raises.
        """
        import contextlib
        import io

        # NOTE(security): exec() with a trimmed ``__builtins__`` is NOT a
        # sandbox. It only hides names such as ``open`` and ``__import__``
        # from well-behaved snippets; do not feed it hostile code.
        restricted_globals = {
            '__builtins__': {
                'print': print, 'len': len, 'range': range, 'sum': sum,
                'max': max, 'min': min, 'abs': abs, 'round': round,
                'int': int, 'float': float, 'str': str, 'list': list,
                'dict': dict, 'set': set, 'tuple': tuple,
            },
            'math': math,
            'statistics': statistics,
        }
        captured_output = io.StringIO()
        try:
            # redirect_stdout restores sys.stdout even when the snippet raises.
            with contextlib.redirect_stdout(captured_output):
                exec(code_content, restricted_globals)
        except Exception as e:
            return f"Code execution error: {e}"
        return captured_output.getvalue().strip()

    @staticmethod
    def process_pdf_file(file_path: str) -> str:
        """Extract the text of every page of a PDF.

        Args:
            file_path: Path of the PDF, read with ``PyPDF2.PdfReader``.

        Returns:
            The page texts joined with newlines and stripped, or
            ``"PDF processing error: <message>"`` on failure.
        """
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # ``or ""`` keeps one text-less page from aborting the whole
                # document should extract_text() yield None for it.
                page_texts = [
                    (page.extract_text() or "") + "\n"
                    for page in pdf_reader.pages
                ]
            return "".join(page_texts).strip()
        except Exception as e:
            return f"PDF processing error: {e}"
class AdvancedWebSearchEngine:
    """Query the Serper search API and pull short answers out of the results."""

    def __init__(self):
        """Create the HTTP session, read ``SERPER_API_KEY`` and start an empty cache."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.serper_api_key = os.getenv("SERPER_API_KEY")
        self.search_cache = {}

    def search_with_serper(self, query: str, search_type: str = "search") -> Dict[str, Any]:
        """Run one Serper query, reusing a cached response when available.

        Args:
            query: Search text sent as ``q``.
            search_type: Path segment appended to ``https://google.serper.dev/``.

        Returns:
            The decoded JSON response for an HTTP 200 reply. An empty dict
            is returned when no API key is configured, when the status is
            not 200 (this empty result is cached too), or when the request
            raises (the error is printed and nothing is cached).
        """
        if not self.serper_api_key:
            return {}

        cache_key = f"{query}_{search_type}"
        if cache_key in self.search_cache:
            return self.search_cache[cache_key]

        try:
            url = f"https://google.serper.dev/{search_type}"
            payload = {
                "q": query,
                "num": 15,
                "gl": "us",
                "hl": "en",
            }
            headers = {
                "X-API-KEY": self.serper_api_key,
                "Content-Type": "application/json",
            }
            response = self.session.post(url, json=payload, headers=headers, timeout=20)
            result = response.json() if response.status_code == 200 else {}
            self.search_cache[cache_key] = result
            return result
        except Exception as e:
            print(f"Serper API error: {e}")
            return {}

    def multi_strategy_search(self, query: str) -> Dict[str, Any]:
        """Search for ``query`` and, if needed, for a few variations of it.

        Variations (exact phrase, Wikipedia-only, "facts information") are
        tried in order only while fewer than two result sets were collected.

        Returns:
            A dict with an optional ``'primary'`` entry and optional
            ``'variation_<i>'`` entries, each holding one Serper response.
        """
        results = {}

        primary = self.search_with_serper(query)
        if primary:
            results['primary'] = primary

        variations = [
            f'"{query}"',
            f"{query} site:wikipedia.org",
            f"{query} facts information",
        ]
        for i, variation in enumerate(variations):
            if len(results) >= 2:
                break
            var_result = self.search_with_serper(variation)
            if var_result and var_result != primary:
                results[f'variation_{i}'] = var_result
        return results

    def extract_answer_from_results(self, results: Dict[str, Any], question: str) -> str:
        """Pick an answer out of the collected search responses.

        The first response carrying an ``answerBox`` with an ``answer`` or
        ``snippet`` wins outright. Otherwise knowledge-graph descriptions
        and ``"title: snippet"`` lines of organic hits are concatenated and
        handed to :meth:`extract_specific_answer`.
        """
        all_content = []
        for data in results.values():
            if "answerBox" in data:
                answer_box = data["answerBox"]
                if "answer" in answer_box:
                    return answer_box["answer"]
                elif "snippet" in answer_box:
                    return answer_box["snippet"]

            if "knowledgeGraph" in data:
                kg = data["knowledgeGraph"]
                if "description" in kg:
                    all_content.append(kg["description"])

            for organic in data.get("organic", []):
                title = organic.get("title", "")
                snippet = organic.get("snippet", "")
                if title and snippet:
                    all_content.append(f"{title}: {snippet}")

        combined_content = "\n".join(all_content)
        return self.extract_specific_answer(combined_content, question)

    def extract_specific_answer(self, content: str, question: str) -> str:
        """Extract a short answer from ``content`` based on the question type.

        The question is classified by keyword and the first regex hit of
        the matching category is returned. Categories are tried in this
        order: quantity, person name, date/year, country code, place. When
        none yields a hit, the first sentence longer than 20 characters is
        returned, or ``"Answer not found in search results"``.
        """
        q_lower = question.lower()

        # Numbers and quantities. "count" is matched as a whole word so that
        # "country" questions are not mistaken for counting questions.
        asks_quantity = (
            any(phrase in q_lower for phrase in ('how many', 'how much', 'number of'))
            or re.search(r'\bcount(?:s|ed|ing)?\b', q_lower) is not None
        )
        if asks_quantity:
            numbers = re.findall(r'\b\d{1,10}\b', content)
            if numbers:
                # The first number found is used as the most likely one.
                return numbers[0]

        # Names and people: runs of capitalized words.
        if any(word in q_lower for word in ['who', 'whom', 'name', 'person']):
            names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+(?:\s[A-Z][a-z]+)*\b', content)
            if names:
                if 'first name' in q_lower:
                    return names[0].split()[0]
                elif 'last name' in q_lower or 'surname' in q_lower:
                    return names[0].split()[-1]
                else:
                    return names[0]

        # Dates and years.
        if any(word in q_lower for word in ['when', 'year', 'date']):
            # Non-capturing group: with a capturing group re.findall would
            # return only the century prefix ("19"/"20"), not the year.
            years = re.findall(r'\b(?:19|20)\d{2}\b', content)
            if years:
                return years[0]
            dates = re.findall(r'\b\w+ \d{1,2}, \d{4}\b', content)
            if dates:
                return dates[0]

        # Country codes are checked before places: "country code" also
        # contains "country", so the broader place rule would shadow it.
        if 'country code' in q_lower:
            codes = re.findall(r'\b[A-Z]{2,3}\b', content)
            if codes:
                return codes[0]

        # Places and locations.
        if any(word in q_lower for word in ['where', 'location', 'place', 'country']):
            places = re.findall(r'\b[A-Z][a-z]+(?:\s[A-Z][a-z]+)*(?:\s(?:City|State|Country|Province|Region))?\b', content)
            if places:
                return places[0]

        # Default: first meaningful sentence.
        sentences = [s.strip() for s in content.split('.') if len(s.strip()) > 20]
        return sentences[0] if sentences else "Answer not found in search results"
class EnhancedQuestionSolver:
    """Route a question to a file, math, multi-step or plain search handler."""

    # Sentinel returned by the search engine when nothing could be extracted.
    _NOT_FOUND = "Answer not found in search results"

    # Binary operators accepted by handle_math_question.
    _ARITHMETIC = {
        '+': lambda a, b: a + b,
        '-': lambda a, b: a - b,
        '*': lambda a, b: a * b,
        '/': lambda a, b: a / b,
    }

    def __init__(self):
        """Create the search engine and file processor used by the handlers."""
        self.search_engine = AdvancedWebSearchEngine()
        self.file_processor = FileProcessor()

    def solve_question(self, question: str, files: Optional[List[str]] = None) -> str:
        """Answer ``question``, trying the handlers in a fixed order.

        Order: attached files, file references in the text, arithmetic,
        multi-step reasoning, then a general search.

        Args:
            question: The question text.
            files: Optional paths of attached files.

        Returns:
            The answer string produced by the first handler that applies.
        """
        print(f"๐ค Analyzing: {question[:100]}...")

        if files:
            file_answer = self.handle_file_based_question(question, files)
            # Both failure forms ("File processing failed" and
            # "File processing failed: <error>") must fall through to the
            # other strategies instead of being submitted as the answer.
            if file_answer and not file_answer.startswith("File processing failed"):
                return file_answer

        if self.has_file_references(question):
            return self.handle_file_reference_question(question)

        if self.is_math_question(question):
            return self.handle_math_question(question)

        if self.needs_multi_step_reasoning(question):
            return self.handle_multi_step_question(question)

        return self.handle_structured_question(question)

    def has_file_references(self, question: str) -> bool:
        """Return True when the question text mentions an attachment keyword."""
        file_indicators = [
            "attached", "excel file", "python code", "pdf", "image",
            "spreadsheet", "document", "file contains", "in the file"
        ]
        q_lower = question.lower()
        return any(indicator in q_lower for indicator in file_indicators)

    def handle_file_reference_question(self, question: str) -> str:
        """Return a fixed message for questions whose referenced file is unavailable."""
        q_lower = question.lower()
        if "excel file" in q_lower and "sales" in q_lower:
            return "Unable to access attached Excel file. Please ensure file is properly uploaded."
        elif "python code" in q_lower:
            return "Unable to access attached Python code. Please ensure file is properly uploaded."
        else:
            return "File referenced but not accessible. Please provide the file."

    def handle_file_based_question(self, question: str, files: List[str]) -> str:
        """Answer from the first attached file with a supported extension.

        Supported: ``.xlsx``/``.xls`` (analysed as a table), ``.py`` (run and
        its output returned) and ``.pdf`` (text extracted, then analysed).
        The extension check ignores case.

        Returns:
            The handler's answer, ``"File processing failed: <error>"`` when
            a handler raises, or ``"File processing failed"`` when no file
            has a supported extension.
        """
        try:
            for file_path in files:
                lowered = file_path.lower()
                if lowered.endswith(('.xlsx', '.xls')):
                    excel_data = self.file_processor.process_excel_file(file_path)
                    return self.analyze_excel_data(excel_data, question)
                elif lowered.endswith('.py'):
                    with open(file_path, 'r') as f:
                        code_content = f.read()
                    return self.file_processor.process_python_code(code_content)
                elif lowered.endswith('.pdf'):
                    pdf_text = self.file_processor.process_pdf_file(file_path)
                    return self.analyze_text_content(pdf_text, question)
        except Exception as e:
            return f"File processing failed: {e}"
        return "File processing failed"

    def analyze_excel_data(self, excel_data: Dict, question: str) -> str:
        """Answer "total"/"average" sales questions from worksheet rows.

        Each non-empty sheet is loaded into a DataFrame with its first row
        as header; for a question mentioning "sales", the sum ("total") or
        mean ("average") of the first numeric column is returned as text.

        Returns:
            The aggregate as a string, ``"No data found in Excel file"`` for
            empty input, ``"Could not analyze Excel data for this question"``
            when no sheet yields a result, or ``"Excel analysis error: ..."``.
        """
        if not excel_data:
            return "No data found in Excel file"

        q_lower = question.lower()
        # "total" takes precedence over "average" when both words appear.
        aggregate = None
        if "sales" in q_lower:
            if "total" in q_lower:
                aggregate = "sum"
            elif "average" in q_lower:
                aggregate = "mean"

        try:
            for sheet_name, data in excel_data.items():
                if not data:
                    continue
                df = pd.DataFrame(data[1:], columns=data[0])  # first row as header
                if aggregate is None:
                    continue
                numeric_cols = df.select_dtypes(include=[int, float]).columns
                if len(numeric_cols) > 0:
                    return str(getattr(df[numeric_cols[0]], aggregate)())
            return "Could not analyze Excel data for this question"
        except Exception as e:
            return f"Excel analysis error: {e}"

    def analyze_text_content(self, text: str, question: str) -> str:
        """Answer from extracted document text, falling back to a web search."""
        q_lower = question.lower()
        if "surname" in q_lower or "last name" in q_lower:
            names = re.findall(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', text)
            if names:
                return names[0].split()[-1]

        # Seed the search with the start of the document for context.
        search_query = f"{question} {text[:100]}"
        results = self.search_engine.multi_strategy_search(search_query)
        return self.search_engine.extract_answer_from_results(results, question)

    def is_math_question(self, question: str) -> bool:
        """Return True when the question contains a math-related keyword."""
        math_indicators = [
            'calculate', 'compute', 'sum', 'average', 'mean',
            'total', 'how many', 'how much', 'solve', 'equation'
        ]
        q_lower = question.lower()
        return any(indicator in q_lower for indicator in math_indicators)

    def handle_math_question(self, question: str) -> str:
        """Evaluate the first ``<int> <op> <int>`` expression in the question.

        The operands are parsed as integers and combined explicitly, so no
        text from the question is ever passed to ``eval``. ``/`` is true
        division. An expression that divides by zero is skipped. When no
        expression can be evaluated, the question is answered by search.
        """
        for match in re.finditer(r'\b(\d+)\s*([\+\-\*\/])\s*(\d+)\b', question):
            left, symbol, right = match.groups()
            try:
                return str(self._ARITHMETIC[symbol](int(left), int(right)))
            except ZeroDivisionError:
                continue

        results = self.search_engine.multi_strategy_search(question)
        return self.search_engine.extract_answer_from_results(results, question)

    def needs_multi_step_reasoning(self, question: str) -> bool:
        """Return True when the question contains a chaining/sequence keyword."""
        multi_step_indicators = [
            "who played", "actor who", "person who", "after",
            "before", "then", "subsequently", "following"
        ]
        q_lower = question.lower()
        return any(indicator in q_lower for indicator in multi_step_indicators)

    def handle_multi_step_question(self, question: str) -> str:
        """Dispatch a multi-step question to the matching specialised handler."""
        q_lower = question.lower()
        if "actor who played" in q_lower:
            return self.handle_actor_chain_question(question)
        elif "before and after" in q_lower:
            return self.handle_sequence_question(question)
        else:
            return self.handle_structured_question(question)

    def handle_actor_chain_question(self, question: str) -> str:
        """Resolve "actor who played X in Y play in Z" with two searches.

        The question is parsed case-insensitively into role, source work and
        target work. Step 1 searches for the actor; step 2 searches for that
        actor's role in the target work. If the question does not have this
        shape, or no actor is found, a single search on the whole question
        is used instead.
        """
        chain = re.search(
            r'actor who played\s+(?P<role>.+?)\s+in\s+(?P<source>.+?)'
            r'\s+play(?:ed)?\s+in\s+(?P<target>.+)',
            question,
            flags=re.IGNORECASE,
        )
        if chain:
            first_search = f"actor who played {chain['role']} in {chain['source']}"
            results1 = self.search_engine.multi_strategy_search(first_search)
            actor_name = self.search_engine.extract_answer_from_results(results1, "who is the actor")
            if actor_name and actor_name != self._NOT_FOUND:
                # Drop trailing punctuation so it does not pollute the query.
                target = chain['target'].strip().rstrip('?.!').strip()
                second_search = f"{actor_name} role in {target}"
                results2 = self.search_engine.multi_strategy_search(second_search)
                return self.search_engine.extract_answer_from_results(results2, f"what role did {actor_name} play")

        results = self.search_engine.multi_strategy_search(question)
        return self.search_engine.extract_answer_from_results(results, question)

    def handle_sequence_question(self, question: str) -> str:
        """Answer a before/after question with a single multi-strategy search."""
        results = self.search_engine.multi_strategy_search(question)
        return self.search_engine.extract_answer_from_results(results, question)

    def handle_structured_question(self, question: str) -> str:
        """Search for the question, retrying with rephrasings when nothing is found."""
        results = self.search_engine.multi_strategy_search(question)
        answer = self.search_engine.extract_answer_from_results(results, question)

        if answer == self._NOT_FOUND:
            for rq in self.rephrase_question(question):
                results = self.search_engine.multi_strategy_search(rq)
                answer = self.search_engine.extract_answer_from_results(results, question)
                if answer != self._NOT_FOUND:
                    break
        return answer

    def rephrase_question(self, question: str) -> List[str]:
        """Return up to three alternative phrasings of ``question``.

        Candidates, in order: the question with a trailing ``?`` added, the
        lower-cased question with each present question phrase removed, and
        the question prefixed by a context phrase.
        """
        rephrased = []

        if not question.endswith('?'):
            rephrased.append(question + '?')

        q_lower = question.lower()
        words_to_remove = ['what is', 'who is', 'where is', 'when is', 'how many', 'how much']
        for word in words_to_remove:
            if word in q_lower:
                rephrased.append(q_lower.replace(word, '').strip())

        context_words = ['information about', 'facts about', 'details about']
        for context in context_words:
            rephrased.append(f"{context} {question}")

        return rephrased[:3]
def get_enhanced_api_status():
    """Build a three-line status report for the search key and file libraries.

    Line 1 reflects whether ``SERPER_API_KEY`` is set in the environment;
    lines 2 and 3 reflect whether ``openpyxl`` and ``PyPDF2`` can be
    imported. The lines are joined with newlines.
    """
    if os.getenv("SERPER_API_KEY"):
        serper_line = "โ Serper API: Configured"
    else:
        serper_line = "โ Serper API: Missing - Get key at serper.dev"
    report = [serper_line]

    # (module to probe, line when importable, line when missing)
    optional_libraries = (
        ("openpyxl",
         "โ Excel Processing: Available",
         "โ Excel Processing: openpyxl not available"),
        ("PyPDF2",
         "โ PDF Processing: Available",
         "โ PDF Processing: PyPDF2 not available"),
    )
    for module_name, available_line, missing_line in optional_libraries:
        try:
            __import__(module_name)
        except ImportError:
            report.append(missing_line)
        else:
            report.append(available_line)

    return "\n".join(report)
def _truncate(text: str, limit: int) -> str:
    """Return ``text`` cut to ``limit`` characters plus "..." when it is longer."""
    return text[:limit] + "..." if len(text) > limit else text


def run_enhanced_gaia_evaluation(profile: gr.OAuthProfile | None):
    """Fetch the GAIA questions, answer each one and submit the answers.

    Args:
        profile: The logged-in Hugging Face profile, or ``None`` when the
            user is not logged in.

    Returns:
        A ``(message, table)`` pair. ``message`` is the result summary or an
        error description; ``table`` is a DataFrame with one row per
        processed question, or ``None`` when the run stopped before any
        question was processed.
    """
    if not profile:
        return "Please log in to Hugging Face first.", None

    api_status = get_enhanced_api_status()
    # Check the environment directly instead of searching the status text:
    # the status lines for a configured and a missing key start with the
    # same glyph in this file, so a substring test cannot tell them apart
    # and would abort every run.
    if not os.getenv("SERPER_API_KEY"):
        return f"โ ๏ธ Serper API not configured!\n\n{api_status}", None

    username = profile.username
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    try:
        solver = EnhancedQuestionSolver()
        print("โ Enhanced question solver initialized")
    except Exception as e:
        return f"โ Initialization failed: {e}", None

    try:
        print("๐ฅ Fetching questions...")
        r = requests.get(questions_url, timeout=30)
        r.raise_for_status()
        questions = r.json()
        print(f"โ Got {len(questions)} questions")
    except Exception as e:
        return f"โ Failed to fetch questions: {e}", None

    answers = []
    logs = []
    for i, item in enumerate(questions):
        task_id = item.get("task_id")
        question = item.get("question")
        files = item.get("files", [])  # attached files, if any
        if not task_id or not question:
            continue

        print(f"\n๐ Processing {i+1}/{len(questions)}: {task_id}")
        print(f"๐ Question: {_truncate(question, 100)}")
        if files:
            print(f"๐ Files: {files}")

        start_time = time.time()
        try:
            # str(): search answers may come back as non-string JSON values,
            # and everything below slices the answer as text.
            answer = str(solver.solve_question(question, files))
        except Exception as e:
            solved = False
            answer = f"Error: {str(e)}"
            log_answer = answer
            time_label = "Error"
            print(f"โ Error: {e}")
        else:
            solved = True
            log_answer = _truncate(answer, 100)
            time_label = f"{time.time() - start_time:.2f}"
            print(f"โ Answer: {_truncate(answer, 80)}")

        # Recorded exactly once per task, outside the try block, so a late
        # failure can never add a second answer for the same task_id.
        answers.append({"task_id": task_id, "submitted_answer": answer})
        logs.append({
            "Task ID": task_id,
            "Question": _truncate(question, 150),
            "Answer": log_answer,
            "Files": len(files) if files else 0,
            "Time (s)": time_label
        })
        if solved:
            time.sleep(0.5)  # rate limiting for the search API

    print(f"\n๐ค Submitting {len(answers)} answers...")
    payload = {
        "username": username,
        "agent_code": f"https://huggingface.co/spaces/{os.getenv('SPACE_ID', '')}/tree/main",
        "answers": answers
    }
    try:
        resp = requests.post(submit_url, json=payload, timeout=300)
        resp.raise_for_status()
        data = resp.json()
        score = data.get('score', 'N/A')
        correct = data.get('correct_count', '?')
        total = data.get('total_attempted', '?')
        result_message = f"""๐ฏ ENHANCED GAIA EVALUATION RESULTS
๐ Final Score: {score}% ({correct}/{total} correct)
๐ง System Status:
{api_status}
๐ Enhanced Features:
โข Multi-strategy web search with result caching
โข Advanced file processing (Excel, PDF, Python)
โข Multi-step reasoning for complex questions
โข Context-aware answer extraction
โข Question rephrasing for better results
โข Specialized handlers for different question types
๐ Performance Improvements:
โข Better search result processing
โข Enhanced name/number extraction
โข Improved mathematical computation
โข File-based question handling
โข Actor chain and sequence reasoning"""
        return result_message, pd.DataFrame(logs)
    except Exception as e:
        return f"โ Submission failed: {str(e)}", pd.DataFrame(logs)
# Enhanced Gradio Interface
# Layout, top to bottom: intro markdown, login button, a row with the status
# box and the run button, the results text box, the per-question table and a
# status refresh button. Components render in the order they are created.
with gr.Blocks(title="Enhanced GAIA Agent", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# ๐ง Enhanced GAIA Benchmark Agent v2.0
**๐ง Required Setup:**
- `SERPER_API_KEY` environment variable - Get 2500 free searches/month at [serper.dev](https://serper.dev)
**โก Advanced Capabilities:**
- ๐ Multi-strategy web search with intelligent caching
- ๐ Excel/CSV file processing and analysis
- ๐ Python code execution for computational questions
- ๐ PDF document text extraction and analysis
- ๐งฎ Advanced mathematical problem solving
- ๐ญ Multi-step reasoning for complex actor/person chains
- ๐ฏ Context-aware answer extraction with multiple fallbacks
- ๐ Question rephrasing for better search results
**๐ Expected Performance:**
- Significantly improved accuracy on GAIA benchmark
- Better handling of file-based questions
- Enhanced name/number/date extraction
- Robust error handling and fallback strategies
""")
    gr.LoginButton()
    with gr.Row():
        with gr.Column():
            # Filled once at build time; the refresh button below updates it.
            api_status_display = gr.Textbox(
                label="๐ง System Status",
                value=get_enhanced_api_status(),
                lines=4,
                interactive=False
            )
        run_button = gr.Button(
            "๐ Run Enhanced GAIA Evaluation",
            variant="primary",
            size="lg"
        )
    with gr.Row():
        # Receives the first element returned by run_enhanced_gaia_evaluation.
        results_display = gr.Textbox(
            label="๐ Evaluation Results",
            lines=15,
            interactive=False
        )
    with gr.Row():
        # Receives the second element (the per-question DataFrame or None).
        detailed_results = gr.DataFrame(
            label="๐ Detailed Question Analysis",
            wrap=True,
            interactive=False
        )
    # Refresh status button
    refresh_status = gr.Button("๐ Refresh Status", size="sm")
    refresh_status.click(
        lambda: get_enhanced_api_status(),
        outputs=[api_status_display]
    )
    # No `inputs` are wired; presumably Gradio supplies the `profile`
    # argument from the login state based on its gr.OAuthProfile annotation
    # — TODO confirm against the Gradio OAuth documentation.
    run_button.click(
        run_enhanced_gaia_evaluation,
        outputs=[results_display, detailed_results]
    )
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(share=True, debug=True)