"""GAIA agent evaluation app.

Loads a causal language model on CPU, exposes a handful of tools (web search,
calculator, PDF reader, webpage reader) to a simple think/act/observe agent,
runs that agent over the questions served by the scoring API, submits the
answers, and shows the outcome in a Gradio interface.
"""

import ast
import gc
import json
import math
import os
import re
import time
from typing import Dict, Any, List, Tuple, Callable, Optional

import gradio as gr
import html2text
import numexpr
import pandas as pd
import pdfminer
import requests
import torch
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig

# --- Load Environment Variables ---
load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 6
MAX_TOKENS = 256
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"

# Matches a fenced ```json ... ``` block and captures the object inside it.
_TOOL_CALL_RE = re.compile(r'```json\s*({.*?})\s*```', re.DOTALL)

# --- Configure Environment for Hugging Face Spaces ---
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("Loading model (CPU-compatible)...")
start_time = time.time()

# Load model with explicit configuration for better compatibility
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=torch.float32,  # Use float32 for CPU compatibility
    device_map="cpu",  # Explicitly set to CPU
    low_cpu_mem_usage=True,  # Optimize for low memory usage
    use_cache=False,  # Disable cache to avoid DynamicCache issues
)

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=False,
    trust_remote_code=True,
)

# Ensure pad token is set
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")


# --- Tools for GAIA Agent ---
def _format_hits(hits, snippet_key: str) -> str:
    """Serialize search hits as a JSON list of "title: snippet" strings.

    Missing fields are rendered as empty strings so that one incomplete hit
    does not discard the whole result set.
    """
    return json.dumps(
        [f"{hit.get('title', '')}: {hit.get(snippet_key, '')}" for hit in hits]
    )


def web_search(query: str) -> str:
    """Search the web using the Serper API, or DuckDuckGo when no key is set.

    Args:
        query: Free-text search query.

    Returns:
        A JSON-encoded list of up to three "title: snippet" strings,
        "No results found" when the backend returned nothing, or a
        "Search error: ..." message when the lookup failed.
    """
    try:
        if SERPER_API_KEY:
            # Use Serper API if key is available
            params = {'q': query, 'num': 3, 'hl': 'en', 'gl': 'us'}
            headers = {
                'X-API-KEY': SERPER_API_KEY,
                'Content-Type': 'application/json',
            }
            response = requests.post(
                'https://google.serper.dev/search',
                headers=headers,
                json=params,
                timeout=10,
            )
            # Surface HTTP failures (bad key, quota, ...) as a search error
            # instead of reporting them as an empty result set.
            response.raise_for_status()
            hits = response.json().get('organic') or []
            snippet_key = 'snippet'
        else:
            # Fallback to DuckDuckGo
            with DDGS() as ddgs:
                hits = list(ddgs.text(query, max_results=3) or [])
            snippet_key = 'body'

        if not hits:
            return "No results found"
        return _format_hits(hits[:3], snippet_key)
    except Exception as e:
        return f"Search error: {str(e)}"


def calculator(expression: str) -> str:
    """Evaluate an arithmetic expression with numexpr.

    Every character other than digits, ``+ - * / ( ) .`` and whitespace is
    stripped before evaluation, so names and function calls never reach
    numexpr.

    Args:
        expression: The expression to evaluate, e.g. ``"67800000 / 643801"``.

    Returns:
        The result as a string, or a "Calculation error: ..." message.
    """
    try:
        # Clean the expression
        cleaned = re.sub(r'[^\d+\-*/().\s]', '', expression)
        if not cleaned.strip():
            return "Calculation error: empty expression"
        return str(numexpr.evaluate(cleaned))
    except Exception as e:
        return f"Calculation error: {str(e)}"


def read_pdf(file_path: str) -> str:
    """Extract text from a PDF file.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The first 2000 characters of the extracted text, a placeholder
        message when no text was extracted, or a "PDF read error: ..."
        message.
    """
    try:
        text = extract_text(file_path)
    except Exception as e:
        return f"PDF read error: {str(e)}"
    if not text:
        return "No text found in PDF"
    return text[:2000]


def read_webpage(url: str) -> str:
    """Fetch a web page and return its visible text.

    Args:
        url: Address of the page to download.

    Returns:
        The first 2000 characters of the page text with ``<script>`` and
        ``<style>`` content removed, a placeholder message when the page has
        no text, or a "Webpage read error: ..." message.
    """
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, timeout=10, headers=headers)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        # Remove script and style elements
        for element in soup(["script", "style"]):
            element.decompose()

        text = soup.get_text(separator=' ', strip=True)
        return text[:2000] if text else "No text found on webpage"
    except Exception as e:
        return f"Webpage read error: {str(e)}"


TOOLS = {
    "web_search": web_search,
    "calculator": calculator,
    "read_pdf": read_pdf,
    "read_webpage": read_webpage,
}


# --- GAIA Agent Implementation ---
class GAIA_Agent:
    """Think/act/observe agent that answers one question per call.

    Each call runs at most ``MAX_STEPS`` model generations. A generation
    either contains ``Final Answer:`` (which ends the run), a fenced JSON
    tool call (which is executed and its result fed back), or free text
    (which is recorded as a thought).
    """

    def __init__(self):
        self.tools = TOOLS
        self.history = []
        self.system_prompt = (
            "You are an expert GAIA problem solver. Use these tools: {web_search, calculator, read_pdf, read_webpage}.\n"
            "Guidelines:\n"
            "1. Think step-by-step. Explain reasoning\n"
            "2. Use tools for calculations, searches, or file operations\n"
            "3. Tools must be called as: ```json\n{'tool': 'tool_name', 'args': {'arg1': value}}```\n"
            "4. Final Answer must be exact and standalone\n\n"
            "Example:\n"
            "Question: \"What's the population density of France? (File: france_data.pdf)\"\n"
            "Thought: Need population and area. Read PDF first.\n"
            "Action: ```json\n{'tool': 'read_pdf', 'args': {'file_path': 'france_data.pdf'}}```\n"
            "Observation: Population: 67.8M, Area: 643,801 km²\n"
            "Thought: Now calculate density: 67,800,000 / 643,801\n"
            "Action: ```json\n{'tool': 'calculator', 'args': {'expression': '67800000 / 643801'}}```\n"
            "Observation: 105.32\n"
            "Final Answer: 105.32 people/km²"
        )

    def __call__(self, question: str) -> str:
        """Answer ``question`` and return the final answer text.

        Returns:
            The text following the last ``Final Answer:`` marker, a fixed
            message when the step limit is reached, or an "Agent error: ..."
            message when an unexpected exception occurs.
        """
        print(f"\nProcessing: {question[:80]}...")
        self.history = [f"Question: {question}"]

        try:
            for step in range(MAX_STEPS):
                response = self._call_model(self._build_prompt())

                # Test for the same marker that is used to split, so a bare
                # "Final Answer" mention inside a thought does not end the run.
                if "Final Answer:" in response:
                    answer = response.split("Final Answer:")[-1].strip()
                    print(f"Final Answer: {answer}")
                    return answer

                tool_call = self._parse_tool_call(response)
                if tool_call:
                    tool_name, args = tool_call
                    observation = self._use_tool(tool_name, args)
                    # Keep the request next to its result; otherwise the next
                    # prompt shows an observation with no matching action.
                    self.history.append(response)
                    self.history.append(f"Observation: {observation}")
                else:
                    self.history.append(f"Thought: {response}")

                # Clean up memory after each step
                if step % 2 == 0:
                    gc.collect()

            return "Agent couldn't find solution within step limit"
        except Exception as e:
            print(f"Error in agent execution: {str(e)}")
            return f"Agent error: {str(e)}"

    def _build_prompt(self) -> str:
        """Render the system prompt and the history with chat role markers."""
        return (
            "<|system|>\n" + self.system_prompt + "<|end|>\n"
            + "<|user|>\n" + "\n".join(self.history) + "<|end|>\n"
            + "<|assistant|>"
        )

    def _call_model(self, prompt: str) -> str:
        """Generate a completion for ``prompt`` and return only the new text.

        Returns:
            The decoded continuation, or a "Generation error: ..." message
            when tokenization or generation raises.
        """
        start_time = time.time()

        try:
            # Tokenize input
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                return_attention_mask=True,
                truncation=True,
                max_length=3072,  # Leave room for generation
            )

            # Move to same device as model
            inputs = {k: v.to(model.device) for k, v in inputs.items()}
            prompt_length = inputs["input_ids"].shape[1]

            # Create generation config
            generation_config = GenerationConfig(
                max_new_tokens=MAX_TOKENS,
                temperature=0.01,
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False,  # Disable cache to avoid DynamicCache issues
            )

            # Generate response
            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    generation_config=generation_config,
                )

            # The returned sequence starts with the prompt tokens. Decode only
            # what follows them: splitting the decoded text on "<|assistant|>"
            # is unreliable because skip_special_tokens removes that marker,
            # which would leave the whole prompt (including the example
            # "Final Answer:" from the system prompt) in the response.
            new_tokens = outputs[0][prompt_length:]
            token_count = len(new_tokens)
            response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

            gen_time = time.time() - start_time
            print(f"Generated {token_count} tokens in {gen_time:.2f}s: {response[:60]}...")

            # Clean up
            del inputs, outputs, new_tokens
            gc.collect()

            return response
        except Exception as e:
            print(f"Model generation error: {str(e)}")
            return f"Generation error: {str(e)}"

    def _parse_tool_call(self, text: str) -> Optional[Tuple[str, Dict]]:
        """Extract a tool request from a fenced ```json block in ``text``.

        The system prompt demonstrates calls written as single-quoted
        Python-style dicts, which are not valid JSON. Strict JSON is tried
        first and a Python literal is accepted as a fallback, so both forms
        are understood. ``ast.literal_eval`` only builds literals; it does
        not execute code.

        Returns:
            ``(tool_name, args)`` when the block holds an object with both a
            "tool" and an "args" key, otherwise ``None``.
        """
        match = _TOOL_CALL_RE.search(text)
        if not match:
            return None

        payload = match.group(1)
        try:
            tool_call = json.loads(payload)
        except json.JSONDecodeError:
            try:
                tool_call = ast.literal_eval(payload)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
                print(f"Tool parse error: {str(e)}")
                return None

        if isinstance(tool_call, dict) and "tool" in tool_call and "args" in tool_call:
            return tool_call["tool"], tool_call["args"]
        return None

    def _use_tool(self, tool_name: str, args: Dict) -> str:
        """Run a registered tool and return its output as an observation.

        Returns:
            The tool output truncated to 500 characters, or an error message
            when the tool is unknown, the arguments are not a mapping, or the
            tool raises.
        """
        # A non-string name (e.g. a list) would raise on the dict lookup and
        # abort the whole question; report it to the model instead.
        if not isinstance(tool_name, str) or tool_name not in self.tools:
            return f"Error: Unknown tool {tool_name}"
        if not isinstance(args, dict):
            return "Tool error: args must be an object mapping argument names to values"

        print(f"Using tool: {tool_name}({args})")
        try:
            start_time = time.time()
            result = self.tools[tool_name](**args)
            exec_time = time.time() - start_time
            print(f"Tool executed in {exec_time:.2f}s")
            return str(result)[:500]  # Truncate long outputs
        except Exception as e:
            return f"Tool error: {str(e)}"


# --- Evaluation Runner ---
def _question_preview(question_text: str, limit: int = 100) -> str:
    """Return ``question_text`` cut to ``limit`` characters plus "..." if longer."""
    if len(question_text) > limit:
        return question_text[:limit] + "..."
    return question_text


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetches questions, runs agent, submits answers, and displays results

    Args:
        profile: The logged-in Hugging Face profile, or ``None`` when the
            user is not logged in.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame of
        the questions and submitted answers, or ``None`` when the run stopped
        before any question was processed.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = GAIA_Agent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(f"Agent code URL: {agent_code}")

    # Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=30)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # Run Agent
    results_log = []
    answers_payload = []
    total = len(questions_data)
    print(f"Running agent on {total} questions...")

    for i, item in enumerate(questions_data):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        try:
            print(f"Processing question {i+1}/{total}")
            submitted_answer = agent(question_text)

            # Clean up memory periodically
            if i % 5 == 0:
                gc.collect()
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            # A failed question is still submitted so the task is accounted for.
            submitted_answer = f"AGENT ERROR: {str(e)}"

        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({
            "Task ID": task_id,
            "Question": _question_preview(question_text),
            "Submitted Answer": submitted_answer,
        })

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df


# --- Gradio Interface ---
with gr.Blocks(title="GAIA Agent Evaluation") as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Log in to your Hugging Face account using the button below
        2. Click 'Run Evaluation & Submit All Answers' to start the evaluation
        3. View results and score in the output sections

        **Agent Information:**
        - Model: Phi-3-mini-4k-instruct (CPU optimized)
        - Tools: Web Search, Calculator, PDF Reader, Webpage Reader
        - Max Steps: 6 per question
        - Memory: Optimized for 2vCPU/16GB environment
        """
    )

    with gr.Row():
        gr.LoginButton()

    with gr.Row():
        run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary", size="lg")

    with gr.Row():
        status_output = gr.Textbox(
            label="Evaluation Status & Submission Result",
            lines=5,
            interactive=False,
            placeholder="Click the button above to start evaluation...",
        )

    with gr.Row():
        results_table = gr.DataFrame(
            label="Questions and Agent Answers",
            wrap=True,
            interactive=False,
        )

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
        show_progress=True,
    )

if __name__ == "__main__":
    print("\n" + "="*50)
    print("GAIA Agent Evaluation System Starting")
    print("="*50)

    space_host = os.getenv("SPACE_HOST")
    space_id = os.getenv("SPACE_ID")

    if space_host:
        print(f"✅ SPACE_HOST found: {space_host}")
    else:
        print("⚠️ SPACE_HOST not found")

    if space_id:
        print(f"✅ SPACE_ID found: {space_id}")
    else:
        print("⚠️ SPACE_ID not found")

    print("="*50)
    print("Launching Gradio Interface...")

    demo.launch(
        debug=False,  # Disable debug in production
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )