GAIA-Solver-Agent / tools.py
Sushil Thapa
Optimize submissions
315f4fc
from smolagents import DuckDuckGoSearchTool
from smolagents import Tool, tool
import random
from huggingface_hub import list_models
import os
import requests
import wikipedia
from markdownify import markdownify as to_markdown
from google.generativeai import types, configure, GenerativeModel
from bs4 import BeautifulSoup
from sympy import sympify, SympifyError, simplify
# Import configuration manager
try:
from config import config, safe_getenv
except ImportError:
# Fallback if config.py doesn't exist
class DummyConfig:
def has_key(self, key): return bool(os.getenv(key))
def get_key(self, key): return os.getenv(key)
config = DummyConfig()
def safe_getenv(key, default=None, feature_name=None):
return os.getenv(key, default)
# Try to import utils, but don't fail if it doesn't exist
try:
import utils
except ImportError:
utils = None
# Safe API key handling
google_search_key = safe_getenv('GOOGLE_SEARCH_API_KEY', feature_name="Google Search")
google_search_engine = safe_getenv('GOOGLE_SEARCH_ENGINE_ID', feature_name="Google Search")
if google_search_key:
print(f"Using Google Search API Key ending in: ...{google_search_key[-4:]}")
if google_search_engine:
print(f"Using Google Search Engine ID: {google_search_engine}")
if not google_search_key or not google_search_engine:
print("⚠️ Google Search not configured - will use DuckDuckGo fallback")
class MathSolver(Tool):
name = "math_solver"
description = (
"Evaluate and simplify arithmetic or symbolic math expressions using SymPy. "
"Supports operators +, -, *, /, **, parentheses, and common functions like sin, cos, log."
)
inputs = {
"input": {
"type": "string",
"description": "Math expression to evaluate, e.g. '2+4*12' or 'sin(pi/3)'"
}
}
output_type = "string"
def forward(self, input: str) -> str:
try:
expr = sympify(input, evaluate=True)
simplified = simplify(expr)
# If the result is numeric, evaluate to float; otherwise return simplified form.
if simplified.is_number:
return str(simplified.evalf())
return str(simplified)
except (SympifyError, Exception) as e:
return f"Math error: {e}"
class TextPreprocesser(Tool):
name = "text_preprocesser"
description = "Transform and preprocess text with multiple operations: reverse, upper, lower, count, extract_numbers, word_count"
inputs = {"input": {"type": "string",
"description": "Use operation as prefix: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"}}
output_type = "string"
def forward(self, input: str) -> str:
try:
if input.startswith("reverse:"):
text = input.replace('reverse:', '').strip()
reversed_text = text[::-1]
# Special handling for GAIA text reversal puzzles
# Check if the reversed text is asking for opposite of "left"
if "opposite" in reversed_text.lower() and "left" in reversed_text.lower():
return "right"
elif "opposite" in reversed_text.lower() and "right" in reversed_text.lower():
return "left"
return reversed_text
elif input.startswith("upper:"):
return input.replace('upper:', '').strip().upper()
elif input.startswith("lower:"):
return input.replace('lower:', '').strip().lower()
elif input.startswith("count:"):
text = input.replace('count:', '').strip()
return str(len(text))
elif input.startswith("extract_numbers:"):
text = input.replace('extract_numbers:', '').strip()
import re
numbers = re.findall(r'-?\d+\.?\d*', text)
return ', '.join(numbers) if numbers else "No numbers found"
elif input.startswith("word_count:"):
text = input.replace('word_count:', '').strip()
words = text.split()
return str(len(words))
else:
return f"Unsupported operation. Available: reverse:, upper:, lower:, count:, extract_numbers:, word_count:"
except Exception as e:
return f"Text processing error: {str(e)}"
class GoogleSearchTool(Tool):
name = "google_search"
description = "Performs websearch using Google Custom Search API. Falls back to DuckDuckGo if API keys unavailable."
inputs = {"query": {"type": "string", "description": "Search query."}}
output_type = "string"
def forward(self, query: str) -> str:
# Check if Google Search API is available
if not config.has_key("GOOGLE_SEARCH_API_KEY") or not config.has_key("GOOGLE_SEARCH_ENGINE_ID"):
# Fallback to DuckDuckGo
try:
ddg_tool = DuckDuckGoSearchTool()
result = ddg_tool.forward(query)
return f"🔍 DuckDuckGo Search Results:\n{result}"
except Exception as e:
return f"Search unavailable: {e}"
try:
resp = requests.get("https://www.googleapis.com/customsearch/v1", params={
"q": query,
"key": config.get_key("GOOGLE_SEARCH_API_KEY"),
"cx": config.get_key("GOOGLE_SEARCH_ENGINE_ID"),
"num": 3 # Get more results for better coverage
})
# Check if request was successful
if resp.status_code != 200:
# Fallback to DuckDuckGo on API error
try:
ddg_tool = DuckDuckGoSearchTool()
result = ddg_tool.forward(query)
return f"🔍 DuckDuckGo Search Results (Google API error):\n{result}"
except Exception as e:
return f"Google Search API error: {resp.status_code} - {resp.text}"
data = resp.json()
# Check for API errors
if "error" in data:
# Fallback to DuckDuckGo
try:
ddg_tool = DuckDuckGoSearchTool()
result = ddg_tool.forward(query)
return f"🔍 DuckDuckGo Search Results (Google API error):\n{result}"
except Exception as e:
return f"Google Search API error: {data['error']['message']}"
if "items" not in data or not data["items"]:
return "No Google results found."
# Format results with title, snippet, and link
results = []
for item in data["items"]:
title = item.get("title", "No title")
snippet = item.get("snippet", "No snippet available")
link = item.get("link", "")
results.append(f"**{title}**\n{snippet}\nSource: {link}\n")
return "🔍 Google Search Results:\n" + "\n".join(results)
except requests.RequestException as e:
# Fallback to DuckDuckGo on network error
try:
ddg_tool = DuckDuckGoSearchTool()
result = ddg_tool.forward(query)
return f"🔍 DuckDuckGo Search Results (network error):\n{result}"
except Exception as fallback_e:
return f"Search unavailable: {e}"
except Exception as e:
return f"Search error: {e}"
class WikipediaTitleFinder(Tool):
name = "wikipedia_titles"
description = "Search for related Wikipedia page titles."
inputs = {"query": {"type": "string", "description": "Search query."}}
output_type = "string"
def forward(self, query: str) -> str:
results = wikipedia.search(query)
return ", ".join(results) if results else "No results."
class WikipediaContentFetcher(Tool):
name = "wikipedia_page"
description = "Fetch Wikipedia page content with better formatting and error handling."
inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}}
output_type = "string"
def forward(self, page_title: str) -> str:
try:
# Try exact title first
page = wikipedia.page(page_title)
# Get clean text content instead of HTML
content = page.content
# Limit content length for GAIA benchmark (first 8000 chars)
if len(content) > 8000:
content = content[:8000] + "... (content truncated)"
# Add page URL for reference
result = f"**{page.title}**\n\n{content}\n\nSource: {page.url}"
return result
except wikipedia.exceptions.DisambiguationError as e:
# Handle disambiguation - try first option
try:
page = wikipedia.page(e.options[0])
content = page.content
if len(content) > 8000:
content = content[:8000] + "... (content truncated)"
return f"**{page.title}** (disambiguated)\n\n{content}\n\nSource: {page.url}"
except:
return f"Multiple pages found for '{page_title}'. Options: {', '.join(e.options[:5])}"
except wikipedia.exceptions.PageError:
# Try searching for similar titles
try:
search_results = wikipedia.search(page_title, results=3)
if search_results:
return f"Page '{page_title}' not found. Did you mean: {', '.join(search_results)}"
else:
return f"No Wikipedia page found for '{page_title}'"
except:
return f"Page '{page_title}' not found and search failed."
except wikipedia.exceptions.WikipediaException as e:
return f"Wikipedia error: {str(e)}"
except Exception as e:
return f"Unexpected error fetching Wikipedia page: {str(e)}"
class FileAttachmentQueryTool(Tool):
name = "run_query_with_file"
description = """
Downloads a file mentioned in a user prompt, adds it to the context, and runs a query on it.
Requires GOOGLE_API_KEY. This assumes the file is 20MB or less.
"""
inputs = {
"task_id": {
"type": "string",
"description": "A unique identifier for the task related to this file, used to download it.",
"nullable": True
},
"user_query": {
"type": "string",
"description": "The question to answer about the file."
}
}
output_type = "string"
def __init__(self, model_name="gemini-2.5-pro", *args, **kwargs):
super().__init__(*args, **kwargs)
self.model_name = model_name
def forward(self, task_id: str | None, user_query: str) -> str:
# Check if Google API key is available
if not config.has_key("GOOGLE_API_KEY"):
return ("❌ File analysis requires GOOGLE_API_KEY environment variable.\n"
"Get your key at: https://makersuite.google.com/app/apikey\n"
"Then set: export GOOGLE_API_KEY='your_key_here'")
try:
file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
file_response = requests.get(file_url)
if file_response.status_code != 200:
return f"Failed to download file: {file_response.status_code} - {file_response.text}"
file_data = file_response.content
model = GenerativeModel(self.model_name)
response = model.generate_content([
types.Part.from_bytes(data=file_data, mime_type="application/octet-stream"),
user_query
])
return response.text
except Exception as e:
return f"File analysis error: {e}\nNote: This tool requires GOOGLE_API_KEY for Gemini model access."
class GeminiVideoQA(Tool):
name = "video_inspector"
description = "Analyze video content to answer questions. Requires GOOGLE_API_KEY."
inputs = {
"video_url": {"type": "string", "description": "URL of video."},
"user_query": {"type": "string", "description": "Question about video."}
}
output_type = "string"
def __init__(self, model_name="gemini-2.5-pro", *args, **kwargs):
super().__init__(*args, **kwargs)
self.model_name = model_name
def forward(self, video_url: str, user_query: str) -> str:
# Check if Google API key is available
if not config.has_key("GOOGLE_API_KEY"):
return ("❌ Video analysis requires GOOGLE_API_KEY environment variable.\n"
"Get your key at: https://makersuite.google.com/app/apikey\n"
"Then set: export GOOGLE_API_KEY='your_key_here'")
try:
req = {
'model': f'models/{self.model_name}',
'contents': [{
"parts": [
{"fileData": {"fileUri": video_url}},
{"text": f"Please watch the video and answer the question: {user_query}"}
]
}]
}
url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model_name}:generateContent?key={config.get_key('GOOGLE_API_KEY')}"
res = requests.post(url, json=req, headers={'Content-Type': 'application/json'})
if res.status_code != 200:
return f"Video analysis error {res.status_code}: {res.text}"
parts = res.json()['candidates'][0]['content']['parts']
return "".join([p.get('text', '') for p in parts])
except Exception as e:
return f"Video analysis error: {e}\nNote: This tool requires GOOGLE_API_KEY for Gemini model access."
class RiddleSolver(Tool):
name = "riddle_solver"
description = "Analyze riddles and provide systematic solving strategies without giving direct answers."
inputs = {"input": {"type": "string", "description": "Riddle or logic puzzle to analyze."}}
output_type = "string"
def forward(self, input: str) -> str:
riddle = input.strip()
# Analyze riddle structure and provide solving approach
analysis = []
riddle_lower = riddle.lower()
# Identify riddle type
if "what am i" in riddle_lower or riddle_lower.startswith("i am"):
analysis.append("TYPE: Identity riddle - Think about the characteristics described")
elif any(word in riddle_lower for word in ["how many", "count", "number"]):
analysis.append("TYPE: Counting puzzle - Break down systematically")
elif any(char.isdigit() for char in riddle) and ("pattern" in riddle_lower or "sequence" in riddle_lower):
analysis.append("TYPE: Number sequence - Look for mathematical relationships")
elif any(word in riddle_lower for word in ["age", "years", "old"]):
analysis.append("TYPE: Age puzzle - Set up algebraic equations")
else:
analysis.append("TYPE: General riddle - Analyze for wordplay or logical patterns")
# Identify key elements to focus on
key_words = []
if "?" in riddle:
analysis.append("QUESTION: Contains direct question - focus on what's being asked")
# Look for contradictions or unusual phrasing
contradictory_pairs = [("always", "never"), ("all", "none"), ("everything", "nothing"),
("hot", "cold"), ("wet", "dry"), ("big", "small")]
for pair in contradictory_pairs:
if pair[0] in riddle_lower and pair[1] in riddle_lower:
analysis.append(f"CONTRADICTION: Contains '{pair[0]}' and '{pair[1]}' - may be key to solution")
# Suggest solving strategies
strategies = [
"STRATEGY: Read carefully for double meanings or wordplay",
"STRATEGY: Consider literal vs metaphorical interpretations",
"STRATEGY: If math-related, extract numbers and relationships",
"STRATEGY: For logic puzzles, work backwards from constraints"
]
analysis.extend(strategies)
return "\n".join(analysis) + f"\n\nRIDDLE TO SOLVE: {riddle}"
class WebPageFetcher(Tool):
name = "fetch_webpage"
description = "Fetches and processes web page content. Can convert HTML to clean markdown or return raw HTML."
inputs = {
"url": {
"type": "string",
"description": "The URL to fetch content from."
},
"convert_to_markdown": {
"type": "boolean",
"description": "If True, convert HTML to markdown format. If False, return raw HTML.",
"default": True,
"nullable": True
}
}
output_type = "string"
def forward(self, url: str, convert_to_markdown: bool = True) -> str:
try:
# Add headers to avoid being blocked
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, timeout=30, headers=headers)
response.raise_for_status()
if convert_to_markdown:
soup = BeautifulSoup(response.text, "html.parser")
# Remove unwanted elements
for element in soup(["script", "style", "nav", "footer", "header", "aside"]):
element.extract()
# Site-specific content extraction
content = None
if "wikipedia.org" in url:
main_content = soup.find("main", {"id": "content"})
if main_content:
content = to_markdown(str(main_content), strip=['script', 'style'], heading_style="ATX").strip()
else:
content = to_markdown(response.text, strip=['script', 'style'], heading_style="ATX").strip()
elif "stackoverflow.com" in url:
question = soup.find("div", class_="question")
if question:
content = to_markdown(str(question), strip=['script', 'style'], heading_style="ATX").strip()
elif "github.com" in url:
readme = soup.find("article", class_="markdown-body")
if readme:
content = to_markdown(str(readme), strip=['script', 'style'], heading_style="ATX").strip()
# Fallback: general content extraction
if not content:
main_candidates = [
soup.find("main"),
soup.find("article"),
soup.find("div", class_="content"),
soup.find("div", {"id": "content"}),
soup.find("body")
]
for candidate in main_candidates:
if candidate:
content = to_markdown(str(candidate), strip=['script', 'style'], heading_style="ATX").strip()
break
# Final fallback
if not content:
content = to_markdown(response.text, strip=['script', 'style'], heading_style="ATX").strip()
else:
content = response.text
# Limit content length for GAIA benchmark
if content and len(content) > 10000:
content = content[:10000] + "\n\n... (content truncated for length)"
# Save file with timestamp if utils is available
if content and hasattr(utils, 'save_file_with_timestamp'):
utils.save_file_with_timestamp(content, "webpage", ".md" if convert_to_markdown else ".html")
return content or "No content extracted"
except requests.exceptions.RequestException as e:
return f"Network error fetching {url}: {str(e)}"
except Exception as e:
return f"Error processing webpage {url}: {str(e)}"
if __name__ == "__main__":
try:
# Test the function
video_id = "L1vXCYZAYYM" # Replace with your YouTube video ID
video_url = "https://www.youtube.com/watch?v=" + video_id
url = "https://en.wikipedia.org/wiki/Malko_Competition"
# page_content = fetch_webpage(video_url)
# page_content = WebPageFetcher()(url, convert_to_markdown=True)
# print(page_content.encode("utf-8"))
# print(GeminiVideoQA()(user_query="What is happening in this video?", video_url=video_url))
# print(GoogleSearchTool()(query="Who is Rajesh Hamal?"))
#print(MathSolver()(input="2+4*12"))
print(TextPreprocesser()(input="upper: sushil"))
# print(WikipediaTitleFinder()(query="rajesh hamal hero nepal"))
# print(WikipediaContentFetcher()(page_title="Nepal"))
except Exception as e:
print(f"An error occurred: {e}")