import os import gradio as gr import requests import inspect import pandas as pd import json import re from typing import Dict, List, Any, Optional import urllib.parse from datetime import datetime import math # Transformers and torch imports from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline import torch # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" class EnhancedGAIAAgent: def __init__(self): print("Initializing Enhanced GAIA Agent with Mistral-7B...") # Initialize Mistral model try: print("Loading Mistral-7B-Instruct model...") self.tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.3") self.model = AutoModelForCausalLM.from_pretrained( "mistralai/Mistral-7B-Instruct-v0.3", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto" if torch.cuda.is_available() else None ) # Create pipeline for easier use self.pipe = pipeline( "text-generation", model=self.model, tokenizer=self.tokenizer, max_new_tokens=512, temperature=0.7, do_sample=True, pad_token_id=self.tokenizer.eos_token_id ) print("✅ Mistral model loaded successfully!") except Exception as e: print(f"❌ Error loading Mistral model: {e}") print("Falling back to basic responses...") self.pipe = None # Tool functions for GAIA tasks self.tools = { "calculate": self._calculate, "search_web": self._search_web, "parse_data": self._parse_data, "analyze_text": self._analyze_text, "solve_math": self._solve_math } def _calculate(self, expression: str) -> str: """Safe calculator for mathematical expressions""" try: # Clean and validate expression expression = re.sub(r'[^0-9+\-*/().\s]', '', expression) result = eval(expression) return str(result) except Exception as e: return f"Calculation error: {e}" def _search_web(self, query: str) -> str: """Simulate web search (placeholder - you'd integrate real search API)""" # This is a placeholder - integrate with actual search API return f"Search results for '{query}': [This would contain real search results]" def _parse_data(self, data: str) -> str: """Parse and analyze structured data""" try: # Try to parse as JSON if data.strip().startswith('{') or data.strip().startswith('['): parsed = json.loads(data) return f"Parsed data structure with {len(parsed) if isinstance(parsed, (list, dict)) else 1} elements" else: # Basic text analysis lines = data.split('\n') return f"Text data with {len(lines)} lines, {len(data.split())} words" except Exception as e: return f"Data parsing error: {e}" def _analyze_text(self, text: str) -> str: """Analyze text content""" words = text.split() sentences = text.split('.') return f"Text analysis: {len(words)} words, {len(sentences)} sentences" def _solve_math(self, problem: str) -> str: """Enhanced math problem solver""" try: # Extract numbers and operations numbers = re.findall(r'-?\d+\.?\d*', problem) # Handle common math patterns if "percent" in problem.lower() or "%" in problem: if len(numbers) >= 2: base = float(numbers[0]) percent = float(numbers[1]) result = base * (percent / 100) return str(result) if "average" in problem.lower() or "mean" in problem.lower(): if numbers: nums = [float(n) for n in numbers] return str(sum(nums) / len(nums)) # Default calculation return self._calculate(" ".join(numbers)) except Exception as e: return f"Math solving error: {e}" def _generate_response(self, prompt: str) -> str: """Generate response using Mistral model""" if not self.pipe: return "Model not available - using fallback response." try: messages = [ {"role": "user", "content": prompt} ] response = self.pipe(messages, max_new_tokens=512, temperature=0.7) # Extract the generated text if response and len(response) > 0: generated_text = response[0]['generated_text'] # Get only the assistant's response (after the user message) if isinstance(generated_text, list): # Find the assistant's response for msg in generated_text: if msg.get('role') == 'assistant': return msg.get('content', '') elif isinstance(generated_text, str): return generated_text else: return str(generated_text) return "No response generated." except Exception as e: print(f"Error generating response: {e}") return f"Error in response generation: {e}" def _detect_task_type(self, question: str) -> str: """Detect the type of task to apply appropriate strategy""" question_lower = question.lower() if any(word in question_lower for word in ["calculate", "compute", "math", "+", "-", "*", "/", "="]): return "calculation" elif any(word in question_lower for word in ["search", "find", "lookup", "google"]): return "search" elif any(word in question_lower for word in ["data", "csv", "json", "table", "parse"]): return "data_analysis" elif any(word in question_lower for word in ["percent", "%", "average", "mean", "sum"]): return "math_word_problem" else: return "general_reasoning" def __call__(self, question: str) -> str: print(f"Agent processing question (first 100 chars): {question[:100]}...") # Detect task type task_type = self._detect_task_type(question) print(f"Detected task type: {task_type}") # Build enhanced prompt based on task type if task_type == "calculation": enhanced_prompt = f""" You are a precise mathematical assistant. Solve this step-by-step: Question: {question} Provide a clear, accurate answer. If calculation is needed, show your work. Answer:""" elif task_type == "math_word_problem": enhanced_prompt = f""" You are solving a math word problem. Break it down step by step: Question: {question} Steps: 1. Identify what is being asked 2. Extract the relevant numbers 3. Determine the operation needed 4. Calculate the result 5. Provide the final answer Answer:""" elif task_type == "data_analysis": enhanced_prompt = f""" You are analyzing data. Approach this systematically: Question: {question} Consider: - What type of data is involved? - What analysis is needed? - What tools or methods should be used? Provide a clear, structured answer. Answer:""" else: enhanced_prompt = f""" You are a helpful assistant that provides accurate, well-reasoned answers. Question: {question} Think through this step-by-step and provide a clear, comprehensive answer. Answer:""" # Generate response using the model try: response = self._generate_response(enhanced_prompt) # Post-process response for specific task types if task_type in ["calculation", "math_word_problem"]: # Try to extract and verify any calculations numbers_in_response = re.findall(r'-?\d+\.?\d*', response) if numbers_in_response: # Attempt to verify calculation if simple enough pass print(f"Agent returning response (first 100 chars): {response[:100]}...") return response.strip() except Exception as e: print(f"Error in agent processing: {e}") fallback_response = self._handle_fallback(question, task_type) return fallback_response def _handle_fallback(self, question: str, task_type: str) -> str: """Provide fallback responses when the main model fails""" if task_type == "calculation": # Try to extract and calculate simple expressions try: numbers = re.findall(r'-?\d+\.?\d*', question) if len(numbers) >= 2: if "+" in question: result = sum(float(n) for n in numbers) return f"The sum is {result}" elif "*" in question or "multiply" in question.lower(): result = 1 for n in numbers: result *= float(n) return f"The product is {result}" except: pass return f"I understand you're asking about: {question}. This appears to be a {task_type} task. Let me provide my best analysis based on the available information." def run_and_submit_all(profile: gr.OAuthProfile | None): """ Fetches all questions, runs the EnhancedGAIAAgent on them, submits all answers, and displays the results. """ # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") if profile: username = f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # 1. Instantiate Enhanced Agent try: print("Initializing Enhanced GAIA Agent...") agent = EnhancedGAIAAgent() print("✅ Agent initialized successfully!") except Exception as e: print(f"❌ Error instantiating agent: {e}") return f"Error initializing agent: {e}", None agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" print(f"Agent code URL: {agent_code}") # 2. Fetch Questions print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"✅ Fetched {len(questions_data)} questions.") except requests.exceptions.RequestException as e: print(f"❌ Error fetching questions: {e}") return f"Error fetching questions: {e}", None except requests.exceptions.JSONDecodeError as e: print(f"❌ Error decoding JSON response from questions endpoint: {e}") return f"Error decoding server response for questions: {e}", None except Exception as e: print(f"❌ An unexpected error occurred fetching questions: {e}") return f"An unexpected error occurred fetching questions: {e}", None # 3. Run Enhanced Agent results_log = [] answers_payload = [] print(f"🚀 Running enhanced agent on {len(questions_data)} questions...") for i, item in enumerate(questions_data, 1): task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"⚠️ Skipping item with missing task_id or question: {item}") continue print(f"📝 Processing question {i}/{len(questions_data)} (ID: {task_id})") try: submitted_answer = agent(question_text) answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({ "Task ID": task_id, "Question": question_text[:200] + "..." if len(question_text) > 200 else question_text, "Submitted Answer": submitted_answer[:300] + "..." if len(submitted_answer) > 300 else submitted_answer }) print(f"✅ Completed question {i}") except Exception as e: print(f"❌ Error running agent on task {task_id}: {e}") error_response = f"AGENT ERROR: {e}" answers_payload.append({"task_id": task_id, "submitted_answer": error_response}) results_log.append({ "Task ID": task_id, "Question": question_text[:200] + "..." if len(question_text) > 200 else question_text, "Submitted Answer": error_response }) if not answers_payload: print("❌ Agent did not produce any answers to submit.") return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) # 4. Prepare Submission submission_data = { "username": username.strip(), "agent_code": agent_code, "answers": answers_payload } print(f"📤 Submitting {len(answers_payload)} answers for user '{username}'...") # 5. Submit try: response = requests.post(submit_url, json=submission_data, timeout=120) # Increased timeout response.raise_for_status() result_data = response.json() final_status = ( f"🎉 Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("✅ Submission successful!") results_df = pd.DataFrame(results_log) return final_status, results_df except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"❌ Submission Failed: {error_detail}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except Exception as e: status_message = f"❌ An unexpected error occurred during submission: {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df # --- Build Gradio Interface using Blocks --- with gr.Blocks(title="Enhanced GAIA Agent") as demo: gr.Markdown("# 🚀 Enhanced GAIA Agent with Mistral-7B") gr.Markdown( """ **Enhanced Features:** - 🧠 **Mistral-7B-Instruct** for advanced reasoning - 🔧 **Tool Integration** for calculations and data processing - 📊 **Task Type Detection** for optimized responses - 🎯 **GAIA-Optimized** prompting strategies **Instructions:** 1. Clone this space and ensure you have access to Mistral-7B-Instruct 2. Log in to your Hugging Face account using the button below 3. Click 'Run Enhanced Evaluation' to process all questions with the enhanced agent **Note:** The enhanced agent uses Mistral-7B which requires significant computational resources. Processing may take several minutes depending on the number of questions. """ ) with gr.Row(): gr.LoginButton() with gr.Row(): run_button = gr.Button("🚀 Run Enhanced Evaluation & Submit All Answers", variant="primary") status_output = gr.Textbox( label="📊 Run Status / Submission Result", lines=8, interactive=False ) results_table = gr.DataFrame( label="📝 Questions and Agent Answers", wrap=True, height=400 ) run_button.click( fn=run_and_submit_all, outputs=[status_output, results_table] ) if __name__ == "__main__": print("\n" + "="*50) print("🚀 ENHANCED GAIA AGENT STARTING") print("="*50) # Environment check space_host = os.getenv("SPACE_HOST") space_id = os.getenv("SPACE_ID") if space_host: print(f"✅ SPACE_HOST: {space_host}") print(f"🌐 Runtime URL: https://{space_host}.hf.space") else: print("ℹ️ Running locally - SPACE_HOST not found") if space_id: print(f"✅ SPACE_ID: {space_id}") print(f"📁 Repo URL: https://huggingface.co/spaces/{space_id}") else: print("ℹ️ SPACE_ID not found") # GPU/CPU check if torch.cuda.is_available(): print(f"🎮 GPU Available: {torch.cuda.get_device_name()}") print(f"💾 GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB") else: print("💻 Running on CPU (GPU not available)") print("="*50) print("🚀 Launching Enhanced GAIA Agent Interface...") demo.launch(debug=True, share=False)