import os
import gradio as gr
import requests
import json
import re
import numexpr
import pandas as pd
from duckduckgo_search import DDGS
from pdfminer.high_level import extract_text
from bs4 import BeautifulSoup
from typing import List, Optional
from dotenv import load_dotenv
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
import torch
import time
import gc
import warnings

# Suppress warnings
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# --- Load Environment Variables ---
load_dotenv()
SERPER_API_KEY = os.getenv("SERPER_API_KEY")

# --- Balanced Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MAX_STEPS = 4  # Maximum reasoning/tool-use iterations per question
MAX_TOKENS = 150  # New tokens generated per reasoning step
MODEL_NAME = "microsoft/Phi-3-mini-4k-instruct"
TIMEOUT_PER_QUESTION = 25  # Per-question wall-clock budget in seconds
MAX_CONTEXT = 1500  # Prompt truncation length in tokens

# --- Configure Environment ---
os.environ["PIP_BREAK_SYSTEM_PACKAGES"] = "1"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
os.environ["BITSANDBYTES_NOWELCOME"] = "1"

print("Loading model (BALANCED FAST mode)...")
start_time = time.time()

model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=torch.float32,
    device_map="cpu",
    low_cpu_mem_usage=True,
    use_cache=False
)

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    use_fast=True,
    trust_remote_code=True
)

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

load_time = time.time() - start_time
print(f"Model loaded in {load_time:.2f} seconds")


# --- Reliable Tools ---
def web_search(query: str) -> str:
    """Fast but reliable web search: Serper if a key is set, else DuckDuckGo."""
    try:
        if SERPER_API_KEY:
            params = {'q': query[:150], 'num': 2}
            headers = {'X-API-KEY': SERPER_API_KEY, 'Content-Type': 'application/json'}
            response = requests.post(
                'https://google.serper.dev/search',
                headers=headers,
                json=params,
                timeout=8
            )
            results = response.json()
            if 'organic' in results and results['organic']:
                output = []
                for r in results['organic'][:2]:
                    output.append(f"{r['title']}: {r['snippet']}")
                return " | ".join(output)
            return "No search results found"
        else:
            with DDGS() as ddgs:
                results = []
                for r in ddgs.text(query, max_results=2):
                    results.append(f"{r['title']}: {r['body'][:200]}")
                return " | ".join(results) if results else "No search results"
    except Exception as e:
        return f"Search failed: {str(e)}"


def calculator(expression: str) -> str:
    """Reliable calculator for arithmetic expressions."""
    try:
        # Strip everything except digits, arithmetic operators, parentheses, and whitespace
        clean_expr = re.sub(r'[^0-9+\-*/().\s]', '', str(expression))
        if not clean_expr.strip():
            return "Invalid mathematical expression"
        # Use numexpr instead of eval() for safety
        result = numexpr.evaluate(clean_expr)
        return str(float(result))
    except Exception as e:
        return f"Calculation error: {str(e)}"


def read_pdf(file_path: str) -> str:
    """PDF reader with error handling."""
    try:
        text = extract_text(file_path)
        if text:
            return text[:800]  # Truncate, but keep enough text for context
        return "No text could be extracted from PDF"
    except Exception as e:
        return f"PDF reading error: {str(e)}"


def read_webpage(url: str) -> str:
    """Reliable webpage reader."""
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        response = requests.get(url, timeout=8, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        for script in soup(["script", "style"]):
            script.decompose()
        text = soup.get_text(separator=' ', strip=True)
        return text[:800] if text else "No content found on webpage"
    except Exception as e:
        return f"Webpage error: {str(e)}"
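
# Optional offline sanity check for the tools above (a minimal sketch; the
# RUN_TOOL_SELF_TEST flag is an assumption, not part of the original script,
# and only the calculator is exercised because it needs no network access):
if os.getenv("RUN_TOOL_SELF_TEST") == "1":
    assert calculator("15 * 23") == "345.0"
    assert calculator("no math here") == "Invalid mathematical expression"
    print("🧪 Tool self-test passed")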

TOOLS = {
    "web_search": web_search,
    "calculator": calculator,
    "read_pdf": read_pdf,
    "read_webpage": read_webpage
}


# --- Balanced GAIA Agent ---
class BalancedGAIA_Agent:
    def __init__(self):
        self.tools = TOOLS
        self.system_prompt = (
            "You are a GAIA problem solver. Available tools: web_search, calculator, read_pdf, read_webpage.\n"
            "Think step by step and use tools when needed.\n\n"
            "Tool usage format:\n"
            "```json\n{\"tool\": \"tool_name\", \"args\": {\"parameter\": \"value\"}}\n```\n\n"
            "Always end with: Final Answer: [your exact answer]\n\n"
            "Example:\n"
            "Question: What is 15 * 23?\n"
            "I need to calculate 15 * 23.\n"
            "```json\n{\"tool\": \"calculator\", \"args\": {\"expression\": \"15 * 23\"}}\n```\n"
            "Final Answer: 345"
        )

    def __call__(self, question: str) -> str:
        start_time = time.time()
        print(f"šŸ¤” Solving: {question[:60]}...")
        try:
            conversation = [f"Question: {question}"]
            for step in range(MAX_STEPS):
                # Enforce the per-question time budget
                if time.time() - start_time > TIMEOUT_PER_QUESTION:
                    print(f"ā° Timeout after {TIMEOUT_PER_QUESTION}s")
                    return "TIMEOUT: Question took too long to solve"

                # Generate the next reasoning step
                response = self._generate_response(conversation)
                print(f"Step {step+1}: {response[:80]}...")

                # Check for a final answer
                if "Final Answer:" in response:
                    answer = self._extract_final_answer(response)
                    elapsed = time.time() - start_time
                    print(f"āœ… Solved in {elapsed:.1f}s: {answer[:50]}...")
                    return answer

                # Otherwise, try to execute any tool call in the response
                tool_result = self._execute_tools(response)
                if tool_result:
                    conversation.append(f"Tool used: {tool_result}")
                    print(f"šŸ”§ Tool result: {tool_result[:60]}...")
                else:
                    conversation.append(f"Reasoning: {response}")

                # Keep the conversation short enough to fit the context window
                if len(" ".join(conversation)) > 1200:
                    conversation = conversation[-3:]  # Keep the last 3 entries

            print("āŒ No solution found within step limit")
            return "Could not solve within step limit"
        except Exception as e:
            print(f"šŸ’„ Agent error: {str(e)}")
            return f"Agent error: {str(e)}"

    def _generate_response(self, conversation: List[str]) -> str:
        try:
            # Build the Phi-3 chat prompt; chr(10) is "\n", which cannot be
            # written as a backslash escape inside an f-string expression on
            # Python < 3.12
            prompt = f"<|system|>\n{self.system_prompt}<|end|>\n"
            prompt += f"<|user|>\n{chr(10).join(conversation)}<|end|>\n"
            prompt += "<|assistant|>"

            # Tokenize
            inputs = tokenizer(
                prompt,
                return_tensors="pt",
                truncation=True,
                max_length=MAX_CONTEXT,
                padding=False
            )

            # Generate
            generation_config = GenerationConfig(
                max_new_tokens=MAX_TOKENS,
                temperature=0.2,  # Low temperature for more focused responses
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False
            )
            with torch.no_grad():
                outputs = model.generate(
                    inputs.input_ids,
                    generation_config=generation_config,
                    attention_mask=inputs.attention_mask
                )

            # Decode only the newly generated tokens. Splitting the full decoded
            # string on "<|assistant|>" would fail here, because
            # skip_special_tokens=True strips that marker from the output.
            new_tokens = outputs[0][inputs.input_ids.shape[1]:]
            response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

            # Cleanup
            del inputs, outputs
            gc.collect()

            return response
        except Exception as e:
            return f"Generation error: {str(e)}"

    def _extract_final_answer(self, text: str) -> str:
        """Extract the final answer reliably."""
        try:
            if "Final Answer:" in text:
                answer_part = text.split("Final Answer:")[-1].strip()
                # Take only the first line of the answer
                answer = answer_part.split('\n')[0].strip()
                return answer if answer else "No answer provided"
            return "No final answer found"
        except Exception:
            return "Answer extraction failed"
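
    # Illustrative behavior of the extractor above (a doctest-style sketch,
    # not executed):
    #   _extract_final_answer("15 * 23 = 345\nFinal Answer: 345")  -> "345"
    #   _extract_final_answer("no marker in this text")            -> "No final answer found"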

    def _execute_tools(self, text: str) -> Optional[str]:
        """Execute the first valid tool call found in the response."""
        try:
            # Look for JSON tool calls. The non-greedy pattern is needed because
            # the "args" object nests braces; a pattern like \{[^}]*\} would stop
            # at the first closing brace and never match the documented format.
            json_pattern = r'```json\s*(\{.*?\})\s*```'
            matches = re.findall(json_pattern, text, re.DOTALL)
            for match in matches:
                try:
                    tool_call = json.loads(match)
                    tool_name = tool_call.get("tool")
                    args = tool_call.get("args", {})
                    if tool_name in self.tools:
                        print(f"šŸ”§ Executing {tool_name} with {args}")
                        result = self.tools[tool_name](**args)
                        return f"{tool_name}: {str(result)[:400]}"
                except json.JSONDecodeError:
                    continue
                except Exception as e:
                    return f"Tool execution error: {str(e)}"
            return None
        except Exception as e:
            return f"Tool parsing error: {str(e)}"
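
# Minimal local smoke test for the agent (a sketch; the RUN_AGENT_SMOKE_TEST
# flag is an assumption, not part of the original script, and it is off by
# default because CPU generation is slow):
if os.getenv("RUN_AGENT_SMOKE_TEST") == "1":
    _test_agent = BalancedGAIA_Agent()
    print(_test_agent("What is 15 * 23?"))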

# --- Efficient Runner ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    if not profile:
        return "āŒ Please login to Hugging Face first", None
    username = profile.username
    print(f"šŸš€ Starting evaluation for user: {username}")

    # Initialize agent
    try:
        agent = BalancedGAIA_Agent()
    except Exception as e:
        return f"āŒ Failed to initialize agent: {e}", None

    # Setup
    api_url = DEFAULT_API_URL
    space_id = os.getenv("SPACE_ID", "unknown")

    # Fetch questions
    try:
        print("šŸ“„ Fetching questions...")
        response = requests.get(f"{api_url}/questions", timeout=15)
        response.raise_for_status()
        questions = response.json()
        print(f"šŸ“ Retrieved {len(questions)} questions")
    except Exception as e:
        return f"āŒ Failed to fetch questions: {e}", None

    # Process questions
    results = []
    answers = []
    total_start = time.time()

    for i, item in enumerate(questions):
        task_id = item.get("task_id")
        question = item.get("question", "")
        if not task_id:
            continue
        print(f"\nšŸ“‹ [{i+1}/{len(questions)}] Task: {task_id}")
        try:
            answer = agent(question)
            answers.append({"task_id": task_id, "submitted_answer": answer})
            # Truncate for display
            q_display = question[:80] + "..." if len(question) > 80 else question
            a_display = answer[:100] + "..." if len(answer) > 100 else answer
            results.append({
                "Task": task_id[:8] + "...",
                "Question": q_display,
                "Answer": a_display,
                "Status": "āœ…" if "error" not in answer.lower() and "timeout" not in answer.lower() else "āŒ"
            })
        except Exception as e:
            error_answer = f"PROCESSING_ERROR: {str(e)}"
            answers.append({"task_id": task_id, "submitted_answer": error_answer})
            results.append({
                "Task": task_id[:8] + "...",
                "Question": question[:80] + "..." if len(question) > 80 else question,
                "Answer": error_answer,
                "Status": "šŸ’„"
            })
        # Periodic memory cleanup
        if i % 3 == 0:
            gc.collect()

    total_time = time.time() - total_start
    avg_time = total_time / max(len(questions), 1)  # Guard against an empty question list
    print(f"\nā±ļø Total processing time: {total_time:.1f}s ({avg_time:.1f}s per question)")

    # Submit results
    try:
        print("šŸ“¤ Submitting results...")
        submission = {
            "username": username,
            "agent_code": f"https://huggingface.co/spaces/{space_id}/tree/main",
            "answers": answers
        }
        response = requests.post(f"{api_url}/submit", json=submission, timeout=60)
        response.raise_for_status()
        result = response.json()

        # Calculate local success rate (questions processed without error/timeout)
        successful = sum(1 for r in results if r["Status"] == "āœ…")
        success_rate = (successful / max(len(results), 1)) * 100

        status = (
            f"šŸŽÆ EVALUATION COMPLETED\n"
            f"šŸ‘¤ User: {result.get('username', username)}\n"
            f"šŸ“Š Score: {result.get('score', 'N/A')}% "
            f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')} correct)\n"
            f"⚔ Processing: {total_time:.1f}s total, {avg_time:.1f}s/question\n"
            f"āœ… Success Rate: {success_rate:.1f}% ({successful}/{len(results)} processed)\n"
            f"šŸ’¬ Message: {result.get('message', 'Evaluation completed!')}"
        )
        return status, pd.DataFrame(results)
    except Exception as e:
        error_status = (
            f"āŒ SUBMISSION FAILED\n"
            f"Error: {str(e)}\n"
            f"ā±ļø Processing completed in {total_time:.1f}s\n"
            f"āœ… Questions processed: {len(results)}"
        )
        return error_status, pd.DataFrame(results)


# --- Clean UI ---
with gr.Blocks(title="GAIA Agent - Balanced Fast") as demo:
    gr.Markdown("# ⚔ GAIA Agent - Balanced Fast Mode")
    gr.Markdown(
        """
        **Optimized for reliability and speed:**
        - 4 reasoning steps max
        - 25 second timeout per question
        - 150 token responses
        - Enhanced error handling
        """
    )
    with gr.Row():
        gr.LoginButton()
    with gr.Row():
        run_btn = gr.Button("šŸš€ Run Balanced Evaluation", variant="primary", size="lg")
    with gr.Row():
        status = gr.Textbox(
            label="šŸ“Š Evaluation Status & Results",
            lines=8,
            interactive=False,
            placeholder="Ready to run evaluation. Please login first."
        )
    with gr.Row():
        table = gr.DataFrame(
            label="šŸ“‹ Question Results",
            interactive=False,
            wrap=True
        )

    run_btn.click(
        fn=run_and_submit_all,
        outputs=[status, table],
        show_progress=True
    )

if __name__ == "__main__":
    print("⚔ GAIA Agent - Balanced Fast Mode Starting...")
    print(f"āš™ļø Settings: {MAX_STEPS} steps, {MAX_TOKENS} tokens, {TIMEOUT_PER_QUESTION}s timeout")
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        debug=False,
        show_error=True
    )
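
# Usage notes (inferred from the endpoints and launch settings above):
#   - Running this file directly launches the Gradio UI on port 7860.
#   - The scoring API can be probed headlessly, e.g.:
#       curl https://agents-course-unit4-scoring.hf.space/questions
#     which should return the question list that run_and_submit_all consumes.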