import ast
import json
import math
import os
import re
import time
from typing import Any, Callable, Dict, List, Tuple

import gradio as gr
import html2text
import numexpr
import pandas as pd
import pdfminer
import requests
import torch
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from transformers import AutoModelForCausalLM, AutoTokenizer

# --- Load Environment Variables ---
load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 6
MAX_TOKENS = 256
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
# Timeout (seconds) for outbound tool HTTP requests so a stalled server
# cannot hang the whole evaluation run.
TOOL_REQUEST_TIMEOUT = 10
# Marker the model is instructed to emit before its final answer.
FINAL_ANSWER_MARKER = "Final Answer:"

# --- Configure Environment for Hugging Face Spaces ---
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("Loading model (CPU-compatible)...")
start_time = time.time()
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=torch.float32,  # Use float32 for CPU compatibility
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")


# --- Tools for GAIA Agent ---
def web_search(query: str) -> str:
    """Search the web and return up to three results as a JSON list of strings.

    Uses the Serper API when ``SERPER_API_KEY`` is set, otherwise DuckDuckGo.
    Each entry has the form ``"<title>: <snippet>"``.

    Args:
        query: Free-text search query.

    Returns:
        A JSON-encoded list of result strings, ``"No results found"`` when the
        Serper response has no ``organic`` section, or ``"Search error: ..."``
        if anything raises.
    """
    try:
        if SERPER_API_KEY:
            # Use Serper API if key is available
            params = {"q": query, "num": 3, "hl": "en", "gl": "us"}
            headers = {
                "X-API-KEY": SERPER_API_KEY,
                "Content-Type": "application/json",
            }
            response = requests.post(
                "https://google.serper.dev/search",
                headers=headers,
                json=params,
                timeout=TOOL_REQUEST_TIMEOUT,
            )
            # Surface HTTP failures (bad key, quota) as a search error rather
            # than silently reporting "No results found".
            response.raise_for_status()
            results = response.json()
            if "organic" in results:
                # .get(): some results lack a snippet; one partial hit should
                # not discard the whole result list.
                return json.dumps(
                    [
                        r.get("title", "") + ": " + r.get("snippet", "")
                        for r in results["organic"][:3]
                    ]
                )
            return "No results found"

        # Fallback to DuckDuckGo
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=3))
        return json.dumps(
            [r.get("title", "") + ": " + r.get("body", "") for r in results]
        )
    except Exception as e:
        return f"Search error: {str(e)}"


def calculator(expression: str) -> str:
    """Evaluate a numeric expression with ``numexpr`` and return it as text.

    Args:
        expression: Expression string, e.g. ``"67800000 / 643801"``.

    Returns:
        The stringified result, or ``"Calculation error: ..."`` on failure.
    """
    # NOTE(review): `expression` is model-generated (untrusted) text. Some
    # numexpr releases reportedly evaluate the string via Python's eval;
    # confirm the installed version sanitizes its input.
    try:
        return str(numexpr.evaluate(expression))
    except Exception as e:
        return f"Calculation error: {str(e)}"


def read_pdf(file_path: str) -> str:
    """Extract text from a PDF file, limited to the first 2000 characters.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        The extracted text prefix, or ``"PDF read error: ..."`` on failure.
    """
    try:
        return extract_text(file_path)[:2000]  # Limit to first 2000 characters
    except Exception as e:
        return f"PDF read error: {str(e)}"


def read_webpage(url: str) -> str:
    """Fetch a web page and return its visible text (first 2000 characters).

    Args:
        url: Address of the page to fetch.

    Returns:
        Whitespace-joined page text, or ``"Webpage read error: ..."`` when the
        request fails or the server answers with an HTTP error status.
    """
    try:
        response = requests.get(url, timeout=TOOL_REQUEST_TIMEOUT)
        # Report 4xx/5xx as an error instead of handing the agent the text of
        # an error page as if it were the requested content.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        return soup.get_text(separator=" ", strip=True)[:2000]  # Limit text
    except Exception as e:
        return f"Webpage read error: {str(e)}"


TOOLS = {
    "web_search": web_search,
    "calculator": calculator,
    "read_pdf": read_pdf,
    "read_webpage": read_webpage,
}


# --- GAIA Agent Implementation ---
class GAIA_Agent:
    """Thought/Action/Observation agent loop around the module-level model.

    Each call runs at most ``MAX_STEPS`` generations. A generation either
    contains a final answer, a fenced tool call (which is executed and its
    output fed back as an observation), or free text recorded as a thought.
    """

    def __init__(self):
        self.tools = TOOLS
        self.history = []
        self.system_prompt = (
            "You are an expert GAIA problem solver. Use these tools: {web_search, calculator, read_pdf, read_webpage}.\n"
            "Guidelines:\n"
            "1. Think step-by-step. Explain reasoning\n"
            "2. Use tools for calculations, searches, or file operations\n"
            "3. Tools must be called as: ```json\n{'tool': 'tool_name', 'args': {'arg1': value}}```\n"
            "4. Final Answer must be exact and standalone\n\n"
            "Example:\n"
            "Question: \"What's the population density of France? (File: france_data.pdf)\"\n"
            "Thought: Need population and area. Read PDF first.\n"
            "Action: ```json\n{'tool': 'read_pdf', 'args': {'file_path': 'france_data.pdf'}}```\n"
            "Observation: Population: 67.8M, Area: 643,801 km²\n"
            "Thought: Now calculate density: 67,800,000 / 643,801\n"
            "Action: ```json\n{'tool': 'calculator', 'args': {'expression': '67800000 / 643801'}}```\n"
            "Observation: 105.32\n"
            "Final Answer: 105.32 people/km²"
        )

    def __call__(self, question: str) -> str:
        """Answer one question, resetting the conversation history first.

        Args:
            question: The task text.

        Returns:
            The text after the last ``Final Answer:`` marker, or a fixed
            failure message when no answer appears within ``MAX_STEPS``.
        """
        print(f"\nProcessing: {question[:80]}...")
        self.history = [f"Question: {question}"]

        for _ in range(MAX_STEPS):
            prompt = self._build_prompt()
            response = self._call_model(prompt)

            # Test for the exact marker that is split on, so a response that
            # merely mentions "Final Answer" is not returned wholesale.
            if FINAL_ANSWER_MARKER in response:
                answer = response.split(FINAL_ANSWER_MARKER)[-1].strip()
                print(f"Final Answer: {answer}")
                return answer

            tool_call = self._parse_tool_call(response)
            if tool_call:
                tool_name, args = tool_call
                observation = self._use_tool(tool_name, args)
                # Keep the model's own thought/action in the transcript so
                # the next step can see what the observation answers.
                self.history.append(response)
                self.history.append(f"Observation: {observation}")
            else:
                self.history.append(f"Thought: {response}")

        return "Agent couldn't find solution within step limit"

    def _build_prompt(self) -> str:
        """Render the system prompt and history with the chat role markers."""
        prompt = "<|system|>\n" + self.system_prompt + "<|end|>\n"
        prompt += "<|user|>\n" + "\n".join(self.history) + "<|end|>\n"
        prompt += "<|assistant|>"
        return prompt

    def _call_model(self, prompt: str) -> str:
        """Generate a completion for ``prompt`` and return only the new text.

        Args:
            prompt: Fully rendered chat prompt.

        Returns:
            The decoded completion, stripped, without the prompt.
        """
        start_time = time.time()

        inputs = tokenizer(
            prompt, return_tensors="pt", return_attention_mask=True
        ).to(model.device)
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_TOKENS,
            temperature=0.01,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
        )

        # The generated sequence starts with the prompt tokens. Decode only
        # what follows them: skip_special_tokens can remove the role markers,
        # so splitting the decoded text on "<|assistant|>" is unreliable and
        # would leave the system prompt (whose example contains
        # "Final Answer:") inside the response.
        prompt_length = inputs["input_ids"].shape[1]
        new_tokens = outputs[0][prompt_length:]
        response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

        gen_time = time.time() - start_time
        print(f"Generated {len(new_tokens)} tokens in {gen_time:.2f}s: {response[:60]}...")
        return response

    def _parse_tool_call(self, text: str) -> Tuple[str, Dict] | None:
        """Extract a ``(tool_name, args)`` pair from a fenced json block.

        Accepts strict JSON and also Python-literal dicts with single quotes,
        which is the form shown in the system prompt.

        Args:
            text: Model output to scan.

        Returns:
            ``(tool, args)`` for the first fenced block found, or ``None``
            when there is no block or it cannot be parsed.
        """
        try:
            json_match = re.search(r'```json\s*({.*?})\s*```', text, re.DOTALL)
            if json_match:
                payload = json_match.group(1)
                try:
                    tool_call = json.loads(payload)
                except json.JSONDecodeError:
                    # literal_eval only builds literals; it never runs code.
                    tool_call = ast.literal_eval(payload)
                return tool_call["tool"], tool_call["args"]
        except Exception as e:
            print(f"Tool parse error: {str(e)}")
        return None

    def _use_tool(self, tool_name: str, args: Dict) -> str:
        """Run a registered tool and return its output as a short string.

        Args:
            tool_name: Key into ``self.tools``.
            args: Keyword arguments forwarded to the tool.

        Returns:
            The tool result truncated to 500 characters, or an error message
            for an unknown tool or a tool that raises.
        """
        if tool_name not in self.tools:
            return f"Error: Unknown tool {tool_name}"

        print(f"Using tool: {tool_name}({args})")
        try:
            start_time = time.time()
            result = self.tools[tool_name](**args)
            exec_time = time.time() - start_time
            print(f"Tool executed in {exec_time:.2f}s")
            return str(result)[:500]  # Truncate long outputs
        except Exception as e:
            return f"Tool error: {str(e)}"


# --- Evaluation Runner ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch the questions, run the agent on each, and submit the answers.

    Args:
        profile: OAuth profile of the logged-in user, or ``None`` when nobody
            is logged in.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame of
        per-task answers, or ``None`` when the run stops before any question
        is attempted.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = GAIA_Agent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # Run Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # One failing task is logged and skipped; the run continues.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df


# --- Gradio Interface ---
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Log in to your Hugging Face account
        2. Click 'Run Evaluation & Submit All Answers'
        3. View results and score

        **Agent Info:**
        - Model: Phi-3-mini-4k-instruct (float32, CPU)
        - Tools: Web Search, Calculator, PDF Reader, Webpage Reader
        - Max Steps: 6
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )

if __name__ == "__main__":
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    space_host = os.getenv("SPACE_HOST")
    space_id = os.getenv("SPACE_ID")

    if space_host:
        print(f"✅ SPACE_HOST found: {space_host}")
    if space_id:
        print(f"✅ SPACE_ID found: {space_id}")

    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface...")
    demo.launch(debug=True, share=False)