import os import gradio as gr import requests import inspect import pandas as pd import json import re import io import base64 from PIL import Image import matplotlib.pyplot as plt import numpy as np from pathlib import Path from duckduckgo_search import DDGS # SmolaAgent imports from smolagents import CodeAgent, tool, PythonInterpreterTool from smolagents.models import LiteLLMModel # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" # --- Enhanced Tools for GAIA --- from smolagents import tool from duckduckgo_search import DDGS @tool def web_search_tool(query: str) -> str: """ Perform a web search using DuckDuckGo and return top results. Args: query (str): Search query. Returns: str: Formatted search result string. """ try: with DDGS() as ddgs: results = ddgs.text(query, max_results=3) output = [] for r in results: output.append(f"Title: {r['title']}\nURL: {r['href']}\nSnippet: {r['body']}") return "\n\n".join(output) except Exception as e: return f"Web search failed: {str(e)}" @tool def calculator_tool(expression: str) -> str: """ Evaluate mathematical expressions safely. Args: expression: Mathematical expression as string Returns: Result of the calculation """ try: # Safe evaluation - only allow basic math operations allowed_chars = set('0123456789+-*/.() ') if not all(c in allowed_chars for c in expression.replace(' ', '')): return "Error: Expression contains invalid characters" result = eval(expression) return str(result) except Exception as e: return f"Calculation error: {str(e)}" @tool def image_analyzer_tool(image_path: str) -> str: """ Analyze images and extract information. Args: image_path: Path to the image file Returns: Description of image content """ try: if not os.path.exists(image_path): return "Error: Image file not found" img = Image.open(image_path) # Basic image analysis width, height = img.size mode = img.mode format_info = img.format if img.format else "Unknown" # Simple color analysis if mode == 'RGB': colors = img.getcolors(maxcolors=256*256*256) if colors: dominant_color = max(colors, key=lambda x: x[0])[1] color_info = f"Dominant color: RGB{dominant_color}" else: color_info = "Complex color palette" else: color_info = f"Color mode: {mode}" analysis = f"""Image Analysis: - Dimensions: {width}x{height} pixels - Format: {format_info} - {color_info} - File size: {os.path.getsize(image_path)} bytes """ return analysis except Exception as e: return f"Image analysis error: {str(e)}" @tool def file_reader_tool(file_path: str) -> str: """ Read and analyze various file types (text, CSV, JSON, etc.). Args: file_path: Path to the file Returns: File content or analysis """ try: if not os.path.exists(file_path): return "Error: File not found" file_ext = Path(file_path).suffix.lower() if file_ext == '.csv': df = pd.read_csv(file_path) return f"CSV file with {len(df)} rows and {len(df.columns)} columns.\nColumns: {list(df.columns)}\nFirst 5 rows:\n{df.head().to_string()}" elif file_ext == '.json': with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) return f"JSON file content:\n{json.dumps(data, indent=2)[:1000]}..." elif file_ext in ['.txt', '.md', '.py', '.js', '.html', '.css']: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() return f"Text file content ({len(content)} characters):\n{content[:1000]}..." else: return f"Binary file: {file_ext}, size: {os.path.getsize(file_path)} bytes" except Exception as e: return f"File reading error: {str(e)}" @tool def data_processor_tool(data: str, operation: str) -> str: """ Process data with various operations (sort, filter, calculate statistics). Args: data: Data as string (JSON, CSV format, or numbers) operation: Operation to perform (sort, sum, average, count, etc.) Returns: Processed data result """ try: # Try to parse as JSON first try: parsed_data = json.loads(data) except: # Try to parse as numbers try: parsed_data = [float(x.strip()) for x in data.replace(',', ' ').split() if x.strip()] except: return "Error: Could not parse data" if operation.lower() == 'sum' and isinstance(parsed_data, list): return str(sum([x for x in parsed_data if isinstance(x, (int, float))])) elif operation.lower() == 'average' and isinstance(parsed_data, list): nums = [x for x in parsed_data if isinstance(x, (int, float))] return str(sum(nums) / len(nums) if nums else 0) elif operation.lower() == 'count': return str(len(parsed_data)) elif operation.lower() == 'sort' and isinstance(parsed_data, list): return str(sorted(parsed_data)) elif operation.lower() == 'max' and isinstance(parsed_data, list): nums = [x for x in parsed_data if isinstance(x, (int, float))] return str(max(nums) if nums else "No numbers found") elif operation.lower() == 'min' and isinstance(parsed_data, list): nums = [x for x in parsed_data if isinstance(x, (int, float))] return str(min(nums) if nums else "No numbers found") else: return f"Unsupported operation: {operation}" except Exception as e: return f"Data processing error: {str(e)}" # --- Enhanced GAIA Agent --- class GAIAAgent: def __init__(self): print("GAIAAgent initialized with SmolaAgent framework.") # Initialize model - using a lightweight model for resource efficiency try: # Use HuggingFace's free inference API or local model self.model = LiteLLMModel( model_id="microsoft/DialoGPT-medium", # Lightweight model max_tokens=512, temperature=0.1 ) except: # Fallback to a basic model print("Warning: Using fallback model configuration") self.model = None # Initialize tools self.tools = [ web_search_tool, calculator_tool, image_analyzer_tool, file_reader_tool, data_processor_tool, PythonInterpreterTool() ] # Initialize the agent try: self.agent = CodeAgent( tools=self.tools, model=self.model, verbosity_level=1 ) except Exception as e: print(f"Agent initialization error: {e}") self.agent = None def __call__(self, question: str) -> str: print(f"GAIAAgent processing question: {question[:100]}...") if not self.agent: # Fallback logic if agent failed to initialize return self._fallback_processing(question) try: # Enhanced prompt for GAIA tasks enhanced_prompt = f""" You are a helpful AI assistant designed to solve complex real-world problems that may require: - Web searching for current information - Mathematical calculations - Image analysis - File processing - Multi-step reasoning Question: {question} Please approach this systematically: 1. Analyze what type of problem this is 2. Determine what tools/information you need 3. Use available tools to gather information 4. Reason through the problem step by step 5. Provide a clear, concise final answer Remember to be precise and factual in your response. """ response = self.agent.run(enhanced_prompt) # Extract the final answer if it's in the response if isinstance(response, str): # Look for common answer patterns answer_patterns = [ r"Final answer:?\s*(.+)", r"Answer:?\s*(.+)", r"The answer is:?\s*(.+)", r"Result:?\s*(.+)" ] for pattern in answer_patterns: match = re.search(pattern, response, re.IGNORECASE) if match: return match.group(1).strip() # If no pattern found, return the last sentence or the whole response sentences = response.split('.') return sentences[-1].strip() if sentences else response return str(response) except Exception as e: print(f"Error in agent processing: {e}") return self._fallback_processing(question) def _fallback_processing(self, question: str) -> str: """Fallback processing when main agent fails""" try: # Simple heuristic-based processing question_lower = question.lower() # Math questions if any(op in question for op in ['+', '-', '*', '/', 'calculate', 'sum', 'average']): # Extract numbers and try basic calculation numbers = re.findall(r'-?\d+\.?\d*', question) if len(numbers) >= 2: try: if 'sum' in question_lower or '+' in question: result = sum(float(n) for n in numbers) return str(result) elif 'average' in question_lower: result = sum(float(n) for n in numbers) / len(numbers) return str(result) except: pass # Search-based questions if any(word in question_lower for word in ['what', 'who', 'when', 'where', 'how', 'why']): try: search_result = web_search_tool(question) # Extract key information from search results lines = search_result.split('\n') relevant_lines = [line for line in lines if len(line.strip()) > 20] return relevant_lines[0] if relevant_lines else "Unable to find specific information" except: pass # Default response return "I need more context or tools to answer this question accurately." except Exception as e: return f"Processing error: {str(e)}" def run_and_submit_all(profile: gr.OAuthProfile | None): """ Fetches all questions, runs the GAIAAgent on them, submits all answers, and displays the results. """ # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") if profile: username = f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # 1. Instantiate Agent try: agent = GAIAAgent() except Exception as e: print(f"Error instantiating agent: {e}") return f"Error initializing agent: {e}", None agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" print(agent_code) # 2. Fetch Questions print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"Fetched {len(questions_data)} questions.") except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None except requests.exceptions.JSONDecodeError as e: print(f"Error decoding JSON response from questions endpoint: {e}") print(f"Response text: {response.text[:500]}") return f"Error decoding server response for questions: {e}", None except Exception as e: print(f"An unexpected error occurred fetching questions: {e}") return f"An unexpected error occurred fetching questions: {e}", None # 3. Run GAIA Agent results_log = [] answers_payload = [] print(f"Running GAIA agent on {len(questions_data)} questions...") for i, item in enumerate(questions_data): task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue print(f"Processing question {i+1}/{len(questions_data)}: {task_id}") try: submitted_answer = agent(question_text) answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({ "Task ID": task_id, "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, "Submitted Answer": submitted_answer }) print(f"Answer for {task_id}: {submitted_answer[:50]}...") except Exception as e: print(f"Error running agent on task {task_id}: {e}") error_answer = f"AGENT ERROR: {e}" answers_payload.append({"task_id": task_id, "submitted_answer": error_answer}) results_log.append({ "Task ID": task_id, "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, "Submitted Answer": error_answer }) if not answers_payload: print("Agent did not produce any answers to submit.") return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) # 4. Prepare Submission submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} status_update = f"GAIA Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." print(status_update) # 5. Submit print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") results_df = pd.DataFrame(results_log) return final_status, results_df except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"Submission Failed: {error_detail}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.Timeout: status_message = "Submission Failed: The request timed out." print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.RequestException as e: status_message = f"Submission Failed: Network error - {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df # --- Build Gradio Interface using Blocks --- with gr.Blocks() as demo: gr.Markdown("# GAIA Agent Evaluation Runner") gr.Markdown( """ **Enhanced GAIA Agent with SmolaAgent Framework** This agent is equipped with: - 🔍 Web search capabilities (DuckDuckGo) - **FIXED** - 🧮 Mathematical calculator - đŸ–ŧī¸ Image analysis - 📁 File processing (CSV, JSON, text files) - 📊 Data processing and statistics - 🐍 Python code execution **Instructions:** 1. Log in to your Hugging Face account using the button below 2. Click 'Run GAIA Evaluation & Submit All Answers' to start the evaluation 3. The agent will process each question systematically using available tools **Note:** Processing may take time as the agent analyzes each question thoroughly. """ ) gr.LoginButton() run_button = gr.Button("Run GAIA Evaluation & Submit All Answers", variant="primary") status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) run_button.click( fn=run_and_submit_all, outputs=[status_output, results_table] ) if __name__ == "__main__": print("\n" + "-"*30 + " GAIA Agent Starting " + "-"*30) space_host_startup = os.getenv("SPACE_HOST") space_id_startup = os.getenv("SPACE_ID") if space_host_startup: print(f"✅ SPACE_HOST found: {space_host_startup}") print(f" Runtime URL should be: https://{space_host_startup}.hf.space") else: print("â„šī¸ SPACE_HOST environment variable not found (running locally?).") if space_id_startup: print(f"✅ SPACE_ID found: {space_id_startup}") print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") else: print("â„šī¸ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") print("-"*(60 + len(" GAIA Agent Starting ")) + "\n") print("Launching Gradio Interface for GAIA Agent Evaluation...") demo.launch(debug=True, share=False)