# Hugging Face Spaces page header captured with this source: status "Runtime error".
import os | |
import gradio as gr | |
import requests | |
import json | |
import re | |
import numexpr | |
import pandas as pd | |
import math | |
import pdfminer | |
from duckduckgo_search import DDGS | |
from pdfminer.high_level import extract_text | |
from bs4 import BeautifulSoup | |
import html2text | |
from typing import Dict, Any, List, Tuple, Callable, Optional | |
from dotenv import load_dotenv | |
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig | |
import torch | |
import time | |
import gc | |
import warnings | |
# Suppress warnings
# Silences every Python warning raised after this point in the process.
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# --- Load Environment Variables ---
# load_dotenv() is called before the key is read so a local .env file can supply it.
load_dotenv()
# Optional: web_search uses Serper when this is set and DuckDuckGo otherwise.
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- Balanced Constants ---
# Base URL of the scoring service; "/questions" and "/submit" are appended to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 4  # Reasonable steps (upper bound on generate/tool rounds per question)
MAX_TOKENS = 150  # Enough for reasoning (passed as max_new_tokens to generation)
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
TIMEOUT_PER_QUESTION = 25  # 25 seconds - enough time (checked before each step)
MAX_CONTEXT = 1500  # Reasonable context (tokenizer max_length for the prompt)

# --- Configure Environment ---
# Environment flags for pip / huggingface_hub / bitsandbytes.
# NOTE(review): these are set after the libraries above were imported - confirm
# they are still read at this point.
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"
# Load the model and tokenizer once at import time; the agent's
# _generate_response method uses these module-level objects on every call.
print("Loading model (BALANCED FAST mode)...")
start_time = time.time()
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=torch.float32,  # full precision on CPU
    device_map="cpu",
    low_cpu_mem_usage=True,
    use_cache=False
)
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=True,
    trust_remote_code=True
)
# Generation passes pad_token_id explicitly, so make sure one exists.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")
# --- Reliable Tools --- | |
def web_search(query: str) -> str:
    """Search the web and return up to two results as a single line of text.

    Uses the Serper API when ``SERPER_API_KEY`` is set and DuckDuckGo
    otherwise.

    Args:
        query: Search terms. Only the first 150 characters are sent to Serper;
            DuckDuckGo receives the query unchanged.

    Returns:
        ``"title: snippet"`` entries joined by ``" | "``, a "no results"
        message when nothing was found, or ``"Search failed: <reason>"`` when
        the request or parsing raises.
    """
    try:
        if SERPER_API_KEY:
            response = requests.post(
                'https://google.serper.dev/search',
                headers={'X-API-KEY': SERPER_API_KEY, 'Content-Type': 'application/json'},
                json={'q': query[:150], 'num': 2},
                timeout=8,
            )
            # Surface HTTP failures (bad key, exhausted quota) as a search
            # error instead of misreporting them as "no results".
            response.raise_for_status()
            organic = response.json().get('organic') or []
            # .get() so that one hit without a snippet does not discard the rest.
            hits = [
                f"{r.get('title', '')}: {r.get('snippet', '')}"
                for r in organic[:2]
            ]
            return " | ".join(hits) if hits else "No search results found"

        with DDGS() as ddgs:
            hits = [
                f"{r.get('title', '')}: {(r.get('body') or '')[:200]}"
                for r in ddgs.text(query, max_results=2)
            ]
        return " | ".join(hits) if hits else "No search results"
    except Exception as e:
        # Tools report failures as text so the agent loop can keep going.
        return f"Search failed: {str(e)}"
def calculator(expression: str) -> str:
    """Evaluate a basic arithmetic expression and return the result as text.

    Only digits, ``+ - * / ( ) .`` and whitespace are evaluated. Free-standing
    words and symbols around the arithmetic (for example ``"What is 15 * 23?"``)
    are stripped. Letters that are part of the arithmetic itself - a function
    call such as ``sqrt(16)`` or notation such as ``2e3`` - are rejected,
    because stripping them would silently evaluate a different expression.

    Args:
        expression: The expression to evaluate; converted with ``str()``.

    Returns:
        The result formatted via ``str(float(...))``, a message starting with
        ``"Invalid mathematical expression"`` for unusable input, or
        ``"Calculation error: <reason>"`` when evaluation raises.
    """
    try:
        raw = str(expression)
        # A name directly followed by "(" or letters wedged between digits
        # cannot be removed without changing the value of the expression.
        attached = re.search(r'[A-Za-z_]\w*\(|\d[A-Za-z_]+[+\-]?\d', raw)
        if attached:
            return (
                "Invalid mathematical expression: "
                f"unsupported '{attached.group(0)}'"
            )
        # Clean the expression: keep digits, arithmetic operators, parentheses,
        # the decimal point and whitespace only.
        clean_expr = re.sub(r'[^0-9+\-*/().\s]', '', raw)
        if not clean_expr.strip():
            return "Invalid mathematical expression"
        # Use numexpr for safety
        result = numexpr.evaluate(clean_expr)
        return str(float(result))
    except Exception as e:
        return f"Calculation error: {str(e)}"
def read_pdf(file_path: str) -> str:
    """Return the first 800 characters of text extracted from a PDF.

    Args:
        file_path: Path of the PDF, passed to pdfminer's ``extract_text``.

    Returns:
        Up to 800 characters of extracted text with surrounding whitespace
        removed, ``"No text could be extracted from PDF"`` when the document
        yields no visible text, or ``"PDF reading error: <reason>"`` when
        extraction raises.
    """
    try:
        text = extract_text(file_path)
        # Extraction can yield only whitespace (e.g. page-break form feeds)
        # for documents without a text layer; treat that as "no text" instead
        # of returning blank content.
        text = text.strip() if text else ""
        if text:
            return text[:800]  # More text for context
        return "No text could be extracted from PDF"
    except Exception as e:
        return f"PDF reading error: {str(e)}"
def read_webpage(url: str) -> str:
    """Download a page and return the first 800 characters of its visible text.

    Script and style elements are removed before the text is collected.
    Failures are reported as a ``"Webpage error: <reason>"`` string rather
    than raised.
    """
    browser_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    try:
        page = requests.get(url, timeout=8, headers=browser_headers)
        page.raise_for_status()

        document = BeautifulSoup(page.text, 'html.parser')
        # Drop non-visible content before extracting text.
        for hidden_tag in document(["script", "style"]):
            hidden_tag.decompose()

        visible_text = document.get_text(separator=' ', strip=True)
        if not visible_text:
            return "No content found on webpage"
        return visible_text[:800]
    except Exception as e:
        return f"Webpage error: {str(e)}"
# Registry of tools the agent may call, keyed by the name the model must use
# in its JSON tool call (each key equals the function's own name).
TOOLS = {
    tool.__name__: tool
    for tool in (web_search, calculator, read_pdf, read_webpage)
}
# --- Balanced GAIA Agent --- | |
class BalancedGAIA_Agent:
    """Tool-using question answering loop around the module-level model.

    Each call runs at most ``MAX_STEPS`` rounds. A round generates a reply,
    returns as soon as the reply contains ``"Final Answer:"``, and otherwise
    executes the first valid JSON tool call found in the reply and feeds the
    result back into the conversation.
    """

    def __init__(self):
        # Name -> callable registry consulted by _execute_tools.
        self.tools = TOOLS
        self.system_prompt = (
            "You are a GAIA problem solver. Available tools: web_search, calculator, read_pdf, read_webpage.\n"
            "Think step by step and use tools when needed.\n\n"
            "Tool usage format:\n"
            "```json\n{\"tool\": \"tool_name\", \"args\": {\"parameter\": \"value\"}}\n```\n\n"
            "Always end with: Final Answer: [your exact answer]\n\n"
            "Example:\n"
            "Question: What is 15 * 23?\n"
            "I need to calculate 15 * 23.\n"
            "```json\n{\"tool\": \"calculator\", \"args\": {\"expression\": \"15 * 23\"}}\n```\n"
            "Final Answer: 345"
        )

    def __call__(self, question: str) -> str:
        """Answer ``question`` and return the answer (or a failure message).

        Returns:
            The extracted final answer, ``"TIMEOUT: ..."`` when
            ``TIMEOUT_PER_QUESTION`` seconds have elapsed before a step starts,
            ``"Could not solve within step limit"`` after ``MAX_STEPS`` rounds,
            or ``"Agent error: <reason>"`` if anything raises.
        """
        start_time = time.time()
        print(f"π€ Solving: {question[:60]}...")
        try:
            conversation = [f"Question: {question}"]
            for step in range(MAX_STEPS):
                # Check timeout but be more generous
                if time.time() - start_time > TIMEOUT_PER_QUESTION:
                    print(f"β° Timeout after {TIMEOUT_PER_QUESTION}s")
                    return "TIMEOUT: Question took too long to solve"

                # Generate response
                response = self._generate_response(conversation)
                print(f"Step {step+1}: {response[:80]}...")

                # Check for final answer
                # NOTE(review): a reply holding both a tool call and a final
                # answer returns the answer without running the tool.
                if "Final Answer:" in response:
                    answer = self._extract_final_answer(response)
                    elapsed = time.time() - start_time
                    print(f"β Solved in {elapsed:.1f}s: {answer[:50]}...")
                    return answer

                # Try to use tools
                tool_result = self._execute_tools(response)
                if tool_result:
                    conversation.append(f"Tool used: {tool_result}")
                    print(f"π§ Tool result: {tool_result[:60]}...")
                else:
                    conversation.append(f"Reasoning: {response}")

                # Keep conversation manageable
                conversation = self._trim_conversation(conversation)

            print("β No solution found within step limit")
            return "Could not solve within step limit"
        except Exception as e:
            print(f"π₯ Agent error: {str(e)}")
            return f"Agent error: {str(e)}"

    @staticmethod
    def _trim_conversation(conversation: List[str]) -> List[str]:
        """Shorten an over-long history without losing the question.

        Histories of at most 1200 characters (entries joined by spaces) are
        returned unchanged. Longer ones are reduced to three entries: the
        question (entry 0) plus the two most recent entries.
        """
        if len(" ".join(conversation)) <= 1200:
            return conversation
        # Entry 0 is the question; dropping it would leave the model nothing
        # to answer on later steps.
        return conversation[:1] + conversation[1:][-2:]

    def _generate_response(self, conversation: List[str]) -> str:
        """Generate the assistant's next reply for the given history.

        Returns:
            Only the newly generated text, stripped, or
            ``"Generation error: <reason>"`` if tokenisation or generation
            raises.
        """
        try:
            # Build prompt
            prompt = f"<|system|>\n{self.system_prompt}<|end|>\n"
            prompt += f"<|user|>\n{chr(10).join(conversation)}<|end|>\n"
            prompt += "<|assistant|>"

            # Tokenize
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=MAX_CONTEXT,
                padding=False
            )
            prompt_length = inputs.input_ids.shape[1]

            # Generate
            generation_config = GenerationConfig(
                max_new_tokens=MAX_TOKENS,
                temperature=0.2,  # Lower temperature for more focused responses
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False
            )
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    generation_config=generation_config,
                    attention_mask=inputs.attention_mask
                )

            # Decode only the tokens produced after the prompt. Splitting the
            # full decoded text on "<|assistant|>" is unreliable: with
            # skip_special_tokens=True the tag can be removed, which would
            # return the system prompt (including its example
            # "Final Answer: 345") as if it were the model's reply.
            new_tokens = outputs[0][prompt_length:]
            response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

            # Cleanup
            del inputs, outputs, new_tokens
            gc.collect()
            return response
        except Exception as e:
            return f"Generation error: {str(e)}"

    def _extract_final_answer(self, text: str) -> str:
        """Return the first line following the last ``"Final Answer:"`` marker.

        Returns:
            The answer line, ``"No answer provided"`` when the marker is
            followed by nothing, ``"No final answer found"`` when the marker is
            absent, or ``"Answer extraction failed"`` for non-string input.
        """
        marker = "Final Answer:"
        try:
            if marker not in text:
                return "No final answer found"
            answer_part = text.split(marker)[-1].strip()
            # Take first line of the answer
            answer = answer_part.split('\n')[0].strip()
            return answer if answer else "No answer provided"
        except (TypeError, AttributeError):
            return "Answer extraction failed"

    def _execute_tools(self, text: str) -> Optional[str]:
        """Run the first valid tool call found in ``text``.

        Tool calls are fenced ``json`` blocks holding an object with a
        ``"tool"`` name and an ``"args"`` object of keyword arguments.

        Returns:
            ``"<tool>: <result>"`` with the result cut to 400 characters,
            ``"Tool execution error: <reason>"`` when the call raises,
            ``"Tool parsing error: <reason>"`` when scanning raises, or
            ``None`` when no call to a known tool is present.
        """
        try:
            # Look for JSON tool calls. The lazy ".*?" runs to the brace that
            # precedes the closing fence, so the nested "args" object is
            # captured too; "[^}]*" stopped at the first "}" and never matched
            # the documented call format.
            json_pattern = r'```json\s*(\{.*?\})\s*```'
            for match in re.findall(json_pattern, text, re.DOTALL):
                try:
                    tool_call = json.loads(match)
                    tool_name = tool_call.get("tool")
                    args = tool_call.get("args", {})
                    if tool_name in self.tools:
                        print(f"π§ Executing {tool_name} with {args}")
                        result = self.tools[tool_name](**args)
                        return f"{tool_name}: {str(result)[:400]}"
                except json.JSONDecodeError:
                    # Malformed block: try the next one.
                    continue
                except Exception as e:
                    return f"Tool execution error: {str(e)}"
            return None
        except Exception as e:
            return f"Tool parsing error: {str(e)}"
# --- Efficient Runner --- | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, answer them with the agent and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``(status_text, results_table)`` pair. ``results_table`` is a
        ``pandas.DataFrame`` with one row per processed question, or ``None``
        when the run stopped before any question was processed.
    """
    if not profile:
        return "β Please login to Hugging Face first", None
    username = profile.username
    print(f"π Starting evaluation for user: {username}")

    # Initialize agent
    try:
        agent = BalancedGAIA_Agent()
    except Exception as e:
        return f"β Failed to initialize agent: {e}", None

    # Setup
    api_url = DEFAULT_API_URL
    space_id = os.getenv("SPACE_ID", "unknown")

    # Fetch questions
    try:
        print("π₯ Fetching questions...")
        response = requests.get(f"{api_url}/questions", timeout=15)
        response.raise_for_status()
        questions = response.json()
        print(f"π Retrieved {len(questions)} questions")
    except Exception as e:
        return f"β Failed to fetch questions: {e}", None

    # An empty list would otherwise divide by zero when averaging below.
    if not questions:
        return "No questions received from the scoring API", None

    # Status marker for a clean answer; reused when counting successes.
    ok_status = "β "

    # Process questions
    results = []
    answers = []
    total_start = time.time()
    for i, item in enumerate(questions):
        task_id = item.get("task_id")
        question = item.get("question", "")
        if not task_id:
            continue
        print(f"\nπ [{i+1}/{len(questions)}] Task: {task_id}")

        # str() so a non-string id cannot raise while building a table row
        # (including inside the error handler below).
        task_display = str(task_id)[:8] + "..."
        q_display = question[:80] + "..." if len(question) > 80 else question
        try:
            answer = agent(question)
            answers.append({"task_id": task_id, "submitted_answer": answer})
            # Truncate for display
            a_display = answer[:100] + "..." if len(answer) > 100 else answer
            lowered = answer.lower()
            clean = "error" not in lowered and "timeout" not in lowered
            results.append({
                "Task": task_display,
                "Question": q_display,
                "Answer": a_display,
                "Status": ok_status if clean else "β"
            })
        except Exception as e:
            error_answer = f"PROCESSING_ERROR: {str(e)}"
            answers.append({"task_id": task_id, "submitted_answer": error_answer})
            results.append({
                "Task": task_display,
                "Question": q_display,
                "Answer": error_answer,
                "Status": "π₯"
            })

        # Memory cleanup
        if i % 3 == 0:
            gc.collect()

    total_time = time.time() - total_start
    processed = len(results)
    if not processed:
        # Every item lacked a task_id, so there is nothing to submit.
        return "No valid questions to process (missing task_id)", None
    # Average over questions actually processed; skipped items take no time.
    avg_time = total_time / processed
    print(f"\nβ±οΈ Total processing time: {total_time:.1f}s ({avg_time:.1f}s per question)")

    # Submit results
    try:
        print("π€ Submitting results...")
        submission = {
            "username": username,
            "agent_code": f"https://huggingface.co/spaces/{space_id}/tree/main",
            "answers": answers
        }
        response = requests.post(f"{api_url}/submit", json=submission, timeout=60)
        response.raise_for_status()
        result = response.json()

        # Calculate success rate (processed > 0 is guaranteed above).
        successful = sum(1 for r in results if r["Status"] == ok_status)
        success_rate = (successful / processed) * 100
        status = (
            f"π― EVALUATION COMPLETED\n"
            f"π€ User: {result.get('username', username)}\n"
            f"π Score: {result.get('score', 'N/A')}% "
            f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')} correct)\n"
            f"β‘ Processing: {total_time:.1f}s total, {avg_time:.1f}s/question\n"
            f"β Success Rate: {success_rate:.1f}% ({successful}/{processed} processed)\n"
            f"π¬ Message: {result.get('message', 'Evaluation completed!')}"
        )
        return status, pd.DataFrame(results)
    except Exception as e:
        error_status = (
            f"β SUBMISSION FAILED\n"
            f"Error: {str(e)}\n"
            f"β±οΈ Processing completed in {total_time:.1f}s\n"
            f"β Questions processed: {processed}"
        )
        return error_status, pd.DataFrame(results)
# --- Clean UI --- | |
# Page layout: two Markdown headers, then one row each for the login button,
# the run button, the status text box and the results table.
with gr.Blocks(title="GAIA Agent - Balanced Fast") as demo:
    gr.Markdown("# β‘ GAIA Agent - Balanced Fast Mode")
    gr.Markdown(
        """
**Optimized for reliability and speed:**
- 4 reasoning steps max
- 25 second timeout per question
- 150 token responses
- Enhanced error handling
"""
    )
    with gr.Row():
        gr.LoginButton()
    with gr.Row():
        run_btn = gr.Button("π Run Balanced Evaluation", variant="primary", size="lg")
    with gr.Row():
        # Receives the first value returned by run_and_submit_all.
        status = gr.Textbox(
            label="π Evaluation Status & Results",
            lines=8,
            interactive=False,
            placeholder="Ready to run evaluation. Please login first."
        )
    with gr.Row():
        # Receives the second value returned by run_and_submit_all.
        table = gr.DataFrame(
            label="π Question Results",
            interactive=False,
            wrap=True
        )
    # No inputs are wired here.
    # NOTE(review): presumably Gradio supplies the handler's `profile` argument
    # from its OAuthProfile type annotation - confirm against the Gradio docs.
    run_btn.click(
        fn=run_and_submit_all,
        outputs=[status, table],
        show_progress=True
    )
if __name__ == "__main__":
    # Startup banner showing the limits this build runs with.
    print("β‘ GAIA Agent - Balanced Fast Mode Starting...")
    print(f"βοΈ Settings: {MAX_STEPS} steps, {MAX_TOKENS} tokens, {TIMEOUT_PER_QUESTION}s timeout")

    # Serve on all interfaces at port 7860 with a share link requested.
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "debug": False,
        "show_error": True,
    }
    demo.launch(**launch_options)