# NOTE: captured from a Hugging Face Space page whose status banner read "Runtime error".
import os | |
import gradio as gr | |
import requests | |
import inspect | |
import pandas as pd | |
import json | |
import re | |
from typing import Dict, List, Any, Optional | |
import urllib.parse | |
from datetime import datetime | |
import math | |
# Transformers and torch imports | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
import torch | |
# --- Constants ---
# Base URL of the scoring service; the "/questions" and "/submit" endpoint
# URLs used by run_and_submit_all are built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
class EnhancedGAIAAgent:
    """Answer benchmark questions with a Mistral-7B-Instruct chat pipeline.

    Each question is classified into a coarse task type, wrapped in a
    task-specific prompt and sent to the model. When the model could not be
    loaded, or generation fails or yields nothing, a rule-based fallback
    answer is returned instead of an internal error string.

    Attributes:
        tokenizer: Tokenizer instance, or None until it is loaded successfully.
        model: Causal LM instance, or None until it is loaded successfully.
        pipe: Text-generation pipeline, or None when the model is unavailable.
        tools: Mapping of tool name to the bound helper implementing it.
    """

    _MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"

    # Signed integer/decimal literals embedded in free text.
    _NUMBER_RE = re.compile(r'-?\d+\.?\d*')
    # Maximal runs of characters that may form an arithmetic expression.
    _MATH_SPAN_RE = re.compile(r"[\d\s+\-*/().]+")
    # Sentence terminators followed by whitespace or end of text, so the dot
    # inside a decimal such as "3.14" does not end a sentence.
    _SENTENCE_END_RE = re.compile(r"[.!?]+(?:\s+|$)")

    # Task-type detectors. Keywords must start a word so that "research",
    # "metadata" or "aftermath" do not match; "-" and "/" only count as
    # operators between digits so hyphenated words and URLs are ignored.
    _CALC_RE = re.compile(r"\b(?:calculate|compute|math)|[+*=]|\d\s*[-/]\s*\d")
    _SEARCH_RE = re.compile(r"\b(?:search|find|lookup|google)")
    _DATA_RE = re.compile(r"\b(?:data|csv|json|table|parse)")
    _WORD_PROBLEM_RE = re.compile(r"\b(?:percent|average)|\b(?:mean|sum)\b|%")

    # Prompt templates keyed by task type; "{question}" is filled in per call.
    _PROMPT_TEMPLATES = {
        "calculation": (
            "\n"
            "You are a precise mathematical assistant. Solve this step-by-step:\n"
            "Question: {question}\n"
            "Provide a clear, accurate answer. If calculation is needed, show your work.\n"
            "Answer:"
        ),
        "math_word_problem": (
            "\n"
            "You are solving a math word problem. Break it down step by step:\n"
            "Question: {question}\n"
            "Steps:\n"
            "1. Identify what is being asked\n"
            "2. Extract the relevant numbers\n"
            "3. Determine the operation needed\n"
            "4. Calculate the result\n"
            "5. Provide the final answer\n"
            "Answer:"
        ),
        "data_analysis": (
            "\n"
            "You are analyzing data. Approach this systematically:\n"
            "Question: {question}\n"
            "Consider:\n"
            "- What type of data is involved?\n"
            "- What analysis is needed?\n"
            "- What tools or methods should be used?\n"
            "Provide a clear, structured answer.\n"
            "Answer:"
        ),
    }
    # Used for every task type without a dedicated template (e.g. "search").
    _DEFAULT_PROMPT_TEMPLATE = (
        "\n"
        "You are a helpful assistant that provides accurate, well-reasoned answers.\n"
        "Question: {question}\n"
        "Think through this step-by-step and provide a clear, comprehensive answer.\n"
        "Answer:"
    )

    def __init__(self):
        """Load the model and pipeline; on any failure leave ``pipe`` as None."""
        print("Initializing Enhanced GAIA Agent with Mistral-7B...")
        # Define every attribute up front so a failed load still leaves the
        # instance in a consistent state.
        self.tokenizer = None
        self.model = None
        self.pipe = None
        try:
            print("Loading Mistral-7B-Instruct model...")
            use_cuda = torch.cuda.is_available()
            self.tokenizer = AutoTokenizer.from_pretrained(self._MODEL_ID)
            self.model = AutoModelForCausalLM.from_pretrained(
                self._MODEL_ID,
                torch_dtype=torch.float16 if use_cuda else torch.float32,
                device_map="auto" if use_cuda else None,
            )
            # Create pipeline for easier use
            self.pipe = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                max_new_tokens=512,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
            print("โ Mistral model loaded successfully!")
        except Exception as e:
            # Deliberately broad: any load problem degrades to fallback answers.
            print(f"โ Error loading Mistral model: {e}")
            print("Falling back to basic responses...")
            self.pipe = None
        # Tool functions for GAIA tasks
        self.tools = {
            "calculate": self._calculate,
            "search_web": self._search_web,
            "parse_data": self._parse_data,
            "analyze_text": self._analyze_text,
            "solve_math": self._solve_math,
        }

    def _calculate(self, expression: str) -> str:
        """Evaluate an arithmetic expression and return the result as text.

        Every character other than digits, ``+ - * / ( ) .`` and whitespace is
        removed before evaluation. Returns ``"Calculation error: ..."`` when
        the cleaned expression cannot be evaluated.
        """
        try:
            # Clean and validate expression
            expression = re.sub(r'[^0-9+\-*/().\s]', '', expression)
            # SECURITY NOTE(review): eval() runs on text that may come from a
            # question or model output. The whitelist above removes all
            # letters, so no names or attributes can be referenced, but "**"
            # still passes and a huge exponent could stall the process.
            # Consider an ast-based evaluator if this tool gets wired in.
            result = eval(expression, {"__builtins__": {}}, {})
            return str(result)
        except Exception as e:
            return f"Calculation error: {e}"

    def _search_web(self, query: str) -> str:
        """Return a placeholder search result string for *query*."""
        # This is a placeholder - integrate with actual search API
        return f"Search results for '{query}': [This would contain real search results]"

    def _parse_data(self, data: str) -> str:
        """Summarize *data*: element count for JSON, line/word counts otherwise.

        Input whose first non-blank character is ``{`` or ``[`` is parsed as
        JSON. Returns ``"Data parsing error: ..."`` on failure.
        """
        try:
            stripped = data.strip()
            if stripped.startswith(('{', '[')):
                parsed = json.loads(data)
                size = len(parsed) if isinstance(parsed, (list, dict)) else 1
                return f"Parsed data structure with {size} elements"
            # Basic text analysis
            lines = data.split('\n')
            return f"Text data with {len(lines)} lines, {len(data.split())} words"
        except Exception as e:
            return f"Data parsing error: {e}"

    def _analyze_text(self, text: str) -> str:
        """Return word and sentence counts for *text*.

        Sentences are non-empty segments ended by ``.``, ``!`` or ``?``; a
        trailing terminator does not add an empty sentence and empty text has
        zero sentences.
        """
        words = text.split()
        sentences = [s for s in self._SENTENCE_END_RE.split(text) if s.strip()]
        return f"Text analysis: {len(words)} words, {len(sentences)} sentences"

    def _solve_math(self, problem: str) -> str:
        """Solve simple percent, average or plain arithmetic problems.

        Percent problems multiply the first two numbers and divide by 100;
        average problems return the mean of all numbers. Anything else
        evaluates the longest arithmetic-looking span of the text, operators
        included. Returns an error string when nothing can be computed.
        """
        try:
            # Extract numbers and operations
            numbers = self._NUMBER_RE.findall(problem)
            lowered = problem.lower()
            # Handle common math patterns
            if ("percent" in lowered or "%" in problem) and len(numbers) >= 2:
                base = float(numbers[0])
                percent = float(numbers[1])
                return str(base * (percent / 100))
            if ("average" in lowered or "mean" in lowered) and numbers:
                nums = [float(n) for n in numbers]
                return str(sum(nums) / len(nums))
            # Default calculation: keep the operators by evaluating the longest
            # span of math characters that contains a digit. Joining only the
            # extracted numbers would discard the operators.
            spans = [s for s in self._MATH_SPAN_RE.findall(problem) if re.search(r'\d', s)]
            # A sentence-final period would otherwise turn "4." into a float.
            expression = max(spans, key=len).strip().rstrip('.').rstrip() if spans else ""
            return self._calculate(expression)
        except Exception as e:
            return f"Math solving error: {e}"

    def _run_pipeline(self, prompt: str) -> Optional[str]:
        """Send *prompt* as a single user message and extract the reply.

        Returns the first assistant message's content (or the raw generated
        text when the pipeline returns a plain value), or None when the
        pipeline produced no usable output. Pipeline exceptions propagate.
        """
        messages = [{"role": "user", "content": prompt}]
        response = self.pipe(messages, max_new_tokens=512, temperature=0.7)
        if not response:
            return None
        generated_text = response[0]['generated_text']
        if isinstance(generated_text, list):
            # Chat-style output: a list of role/content messages.
            for msg in generated_text:
                if msg.get('role') == 'assistant':
                    return msg.get('content', '')
            return None
        if isinstance(generated_text, str):
            return generated_text
        return str(generated_text)

    def _generate_response(self, prompt: str) -> str:
        """Generate a response for *prompt*, always returning a string.

        Returns fixed explanatory strings when the model is unavailable, when
        nothing was generated, or when generation raised.
        """
        if not self.pipe:
            return "Model not available - using fallback response."
        try:
            text = self._run_pipeline(prompt)
        except Exception as e:
            print(f"Error generating response: {e}")
            return f"Error in response generation: {e}"
        return "No response generated." if text is None else text

    def _detect_task_type(self, question: str) -> str:
        """Classify *question* to choose a prompting strategy.

        Returns one of ``"calculation"``, ``"search"``, ``"data_analysis"``,
        ``"math_word_problem"`` or ``"general_reasoning"``; the first matching
        category in that order wins.
        """
        question_lower = question.lower()
        detectors = (
            (self._CALC_RE, "calculation"),
            (self._SEARCH_RE, "search"),
            (self._DATA_RE, "data_analysis"),
            (self._WORD_PROBLEM_RE, "math_word_problem"),
        )
        for pattern, task_type in detectors:
            if pattern.search(question_lower):
                return task_type
        return "general_reasoning"

    def __call__(self, question: str) -> str:
        """Answer *question* and return the stripped answer text.

        Falls back to ``_handle_fallback`` when the model is unavailable,
        generation raises, or no assistant reply is produced.
        """
        print(f"Agent processing question (first 100 chars): {question[:100]}...")
        # Detect task type
        task_type = self._detect_task_type(question)
        print(f"Detected task type: {task_type}")
        if not self.pipe:
            # No model: answer with rules rather than a "model not available" string.
            return self._handle_fallback(question, task_type)
        # Build enhanced prompt based on task type
        template = self._PROMPT_TEMPLATES.get(task_type, self._DEFAULT_PROMPT_TEMPLATE)
        enhanced_prompt = template.format(question=question)
        # Generate response using the model
        try:
            response = self._run_pipeline(enhanced_prompt)
        except Exception as e:
            print(f"Error in agent processing: {e}")
            return self._handle_fallback(question, task_type)
        if response is None:
            return self._handle_fallback(question, task_type)
        print(f"Agent returning response (first 100 chars): {response[:100]}...")
        return response.strip()

    def _handle_fallback(self, question: str, task_type: str) -> str:
        """Provide a rule-based answer when the model cannot be used.

        For calculation tasks with at least two numbers, sums them when the
        question contains ``+`` or multiplies them when it contains ``*`` or
        "multiply". Otherwise returns a generic acknowledgement.
        """
        if task_type == "calculation":
            # Try to extract and calculate simple expressions
            try:
                values = [float(n) for n in self._NUMBER_RE.findall(question)]
            except ValueError:
                values = []
            if len(values) >= 2:
                if "+" in question:
                    return f"The sum is {sum(values)}"
                if "*" in question or "multiply" in question.lower():
                    product = 1
                    for value in values:
                        product *= value
                    return f"The product is {product}"
        return f"I understand you're asking about: {question}. This appears to be a {task_type} task. Let me provide my best analysis based on the available information."
def _truncate_for_display(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters plus "..." when it is longer."""
    return text[:limit] + "..." if len(text) > limit else text


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the EnhancedGAIAAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in (in which case nothing is fetched or submitted).

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        one row per processed question, or None when the run stopped before
        any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Enhanced Agent
    try:
        print("Initializing Enhanced GAIA Agent...")
        agent = EnhancedGAIAAgent()
        print("โ Agent initialized successfully!")
    except Exception as e:
        print(f"โ Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # NOTE(review): when SPACE_ID is unset (e.g. a local run) this link
    # contains the literal text "None".
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(f"Agent code URL: {agent_code}")

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"โ Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must come before RequestException: requests' JSONDecodeError is a
        # subclass of it, so the reverse order makes this branch unreachable.
        print(f"โ Error decoding JSON response from questions endpoint: {e}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"โ Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"โ An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run Enhanced Agent
    results_log = []
    answers_payload = []
    total = len(questions_data)
    print(f"๐ Running enhanced agent on {total} questions...")
    for i, item in enumerate(questions_data, 1):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"โ ๏ธ Skipping item with missing task_id or question: {item}")
            continue
        print(f"๐ Processing question {i}/{total} (ID: {task_id})")
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": _truncate_for_display(question_text, 200),
                "Submitted Answer": _truncate_for_display(submitted_answer, 300),
            })
            print(f"โ Completed question {i}")
        except Exception as e:
            # One failing question must not abort the run; the error text is
            # submitted as that task's answer.
            print(f"โ Error running agent on task {task_id}: {e}")
            error_response = f"AGENT ERROR: {e}"
            answers_payload.append({"task_id": task_id, "submitted_answer": error_response})
            results_log.append({
                "Task ID": task_id,
                "Question": _truncate_for_display(question_text, 200),
                "Submitted Answer": error_response,
            })
    if not answers_payload:
        print("โ Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"๐ค Submitting {len(answers_payload)} answers for user '{username}'...")

    # 5. Submit
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120)  # Increased timeout
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"๐ Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("โ Submission successful!")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            # The error body is not guaranteed to be a JSON object.
            if isinstance(error_json, dict):
                detail = error_json.get('detail', e.response.text)
            else:
                detail = e.response.text
            error_detail += f" Detail: {detail}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"โ Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"โ An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
# Introductory text rendered under the page title.
_INTRO_MARKDOWN = """
**Enhanced Features:**
- ๐ง **Mistral-7B-Instruct** for advanced reasoning
- ๐ง **Tool Integration** for calculations and data processing
- ๐ **Task Type Detection** for optimized responses
- ๐ฏ **GAIA-Optimized** prompting strategies
**Instructions:**
1. Clone this space and ensure you have access to Mistral-7B-Instruct
2. Log in to your Hugging Face account using the button below
3. Click 'Run Enhanced Evaluation' to process all questions with the enhanced agent
**Note:** The enhanced agent uses Mistral-7B which requires significant computational resources.
Processing may take several minutes depending on the number of questions.
"""

with gr.Blocks(title="Enhanced GAIA Agent") as demo:
    # Header and usage notes.
    gr.Markdown("# ๐ Enhanced GAIA Agent with Mistral-7B")
    gr.Markdown(_INTRO_MARKDOWN)

    # Login comes first: run_and_submit_all refuses to run without a profile.
    with gr.Row():
        gr.LoginButton()

    with gr.Row():
        run_button = gr.Button(
            "๐ Run Enhanced Evaluation & Submit All Answers",
            variant="primary",
        )

    # Read-only status box and the per-question results table.
    status_output = gr.Textbox(
        label="๐ Run Status / Submission Result",
        lines=8,
        interactive=False,
    )
    results_table = gr.DataFrame(
        label="๐ Questions and Agent Answers",
        wrap=True,
        height=400,
    )

    # The handler's two return values fill the status box and the table.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Script entry point: print environment diagnostics, then launch the UI.
    banner = "=" * 50
    print("\n" + banner)
    print("๐ ENHANCED GAIA AGENT STARTING")
    print(banner)

    # Environment check
    space_host = os.getenv("SPACE_HOST")
    space_id = os.getenv("SPACE_ID")
    if space_host:
        # SPACE_HOST normally already ends in ".hf.space"; append the suffix
        # only when it is missing so the printed URL is never doubled.
        if space_host.endswith(".hf.space"):
            runtime_host = space_host
        else:
            runtime_host = f"{space_host}.hf.space"
        print(f"โ SPACE_HOST: {space_host}")
        print(f"๐ Runtime URL: https://{runtime_host}")
    else:
        print("โน๏ธ Running locally - SPACE_HOST not found")
    if space_id:
        print(f"โ SPACE_ID: {space_id}")
        print(f"๐ Repo URL: https://huggingface.co/spaces/{space_id}")
    else:
        print("โน๏ธ SPACE_ID not found")

    # GPU/CPU check
    if torch.cuda.is_available():
        print(f"๐ฎ GPU Available: {torch.cuda.get_device_name()}")
        print(f"๐พ GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    else:
        print("๐ป Running on CPU (GPU not available)")
    print(banner)
    print("๐ Launching Enhanced GAIA Agent Interface...")
    demo.launch(debug=True, share=False)