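# NOTE: the following lines are residue from the Hugging Face file viewer,
# kept as comments so the module parses:
#   LamiaYT's picture | fixing | c913a81 | raw | history blame | 18.1 kB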
import ast
import inspect
import json
import math
import operator
import os
import re
import urllib.parse
from datetime import datetime
from typing import Dict, List, Any, Optional

import gradio as gr
import pandas as pd
import requests
# Transformers and torch imports
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
# --- Constants ---
# Base URL of the scoring service; run_and_submit_all appends "/questions"
# and "/submit" to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
class EnhancedGAIAAgent:
    """Question-answering agent that prompts Mistral-7B-Instruct per task type.

    Calling an instance with a question string returns an answer string.
    When the model cannot be loaded, ``self.pipe`` is ``None`` and the agent
    answers with the rule-based ``_handle_fallback`` instead.
    """

    MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.3"

    # Signed integer/decimal literal, as used by every number-extraction step.
    _NUMBER_RE = re.compile(r'-?\d+\.?\d*')
    # "-" and "/" only signal arithmetic when they sit between digits;
    # otherwise hyphenated words and URLs would all look like calculations.
    _DIGIT_OPERATOR_RE = re.compile(r'\d\s*[-/]\s*\d')

    # Operators the calculator is allowed to evaluate.
    _BINARY_OPS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Pow: operator.pow,
    }
    _UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    # Upper bound on the bit size of an integer power result.
    _MAX_POWER_BITS = 10_000

    def __init__(self):
        """Load the model and build the generation pipeline (best effort)."""
        print("Initializing Enhanced GAIA Agent with Mistral-7B...")
        # Define all model attributes up front so a failed load still leaves
        # the instance in a consistent "no model" state.
        self.tokenizer = None
        self.model = None
        self.pipe = None
        # Initialize Mistral model
        try:
            print("Loading Mistral-7B-Instruct model...")
            use_gpu = torch.cuda.is_available()
            self.tokenizer = AutoTokenizer.from_pretrained(self.MODEL_NAME)
            self.model = AutoModelForCausalLM.from_pretrained(
                self.MODEL_NAME,
                torch_dtype=torch.float16 if use_gpu else torch.float32,
                device_map="auto" if use_gpu else None,
            )
            # Create pipeline for easier use
            self.pipe = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                max_new_tokens=512,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
            print("โœ… Mistral model loaded successfully!")
        except Exception as e:
            print(f"โŒ Error loading Mistral model: {e}")
            print("Falling back to basic responses...")
            self.pipe = None
        # Tool functions for GAIA tasks
        self.tools = {
            "calculate": self._calculate,
            "search_web": self._search_web,
            "parse_data": self._parse_data,
            "analyze_text": self._analyze_text,
            "solve_math": self._solve_math,
        }

    @classmethod
    def _eval_node(cls, node):
        """Evaluate one node of a parsed arithmetic expression.

        Only numeric literals and the operators in ``_BINARY_OPS`` /
        ``_UNARY_OPS`` are accepted; anything else raises ``ValueError``.
        """
        if isinstance(node, ast.Constant):
            value = node.value
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
            raise ValueError("unsupported literal")
        if isinstance(node, ast.UnaryOp) and type(node.op) in cls._UNARY_OPS:
            return cls._UNARY_OPS[type(node.op)](cls._eval_node(node.operand))
        if isinstance(node, ast.BinOp) and type(node.op) in cls._BINARY_OPS:
            left = cls._eval_node(node.left)
            right = cls._eval_node(node.right)
            if (
                isinstance(node.op, ast.Pow)
                and isinstance(left, int)
                and isinstance(right, int)
                and right > 0
                and left.bit_length() * right > cls._MAX_POWER_BITS
            ):
                # Reject inputs such as 9**9**9 before they consume the CPU.
                raise ValueError("exponent too large")
            return cls._BINARY_OPS[type(node.op)](left, right)
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    def _calculate(self, expression: str) -> str:
        """Evaluate an arithmetic expression and return the result as text.

        Characters other than digits, ``+ - * / ( ) .`` and whitespace are
        removed first. The remainder is parsed with ``ast`` and evaluated by
        ``_eval_node`` rather than ``eval``, so only plain arithmetic runs and
        oversized powers are refused.

        Returns:
            ``str(result)``, or ``"Calculation error: ..."`` on any failure.
        """
        try:
            # Clean and validate expression
            expression = re.sub(r'[^0-9+\-*/().\s]', '', expression)
            tree = ast.parse(expression.strip(), mode="eval")
            return str(self._eval_node(tree.body))
        except Exception as e:
            return f"Calculation error: {e}"

    def _search_web(self, query: str) -> str:
        """Simulate web search (placeholder - you'd integrate real search API)"""
        # This is a placeholder - integrate with actual search API
        return f"Search results for '{query}': [This would contain real search results]"

    def _parse_data(self, data: str) -> str:
        """Summarise structured data: JSON element count, or line/word counts."""
        try:
            # Try to parse as JSON
            if data.strip().startswith(('{', '[')):
                parsed = json.loads(data)
                count = len(parsed) if isinstance(parsed, (list, dict)) else 1
                return f"Parsed data structure with {count} elements"
            # Basic text analysis
            lines = data.split('\n')
            return f"Text data with {len(lines)} lines, {len(data.split())} words"
        except Exception as e:
            return f"Data parsing error: {e}"

    def _analyze_text(self, text: str) -> str:
        """Report word and sentence counts for ``text``.

        Sentences are the non-empty fragments between runs of ``.``, ``!``
        and ``?``, so a trailing full stop does not add a phantom sentence.
        """
        words = text.split()
        sentences = [part for part in re.split(r'[.!?]+', text) if part.strip()]
        return f"Text analysis: {len(words)} words, {len(sentences)} sentences"

    def _solve_math(self, problem: str) -> str:
        """Solve simple percent, average and arithmetic problems.

        Returns the result as text, or an error string starting with
        ``"Calculation error"`` / ``"Math solving error"``.
        """
        try:
            # Extract numbers and operations
            numbers = self._NUMBER_RE.findall(problem)
            lowered = problem.lower()
            # Handle common math patterns
            if ("percent" in lowered or "%" in problem) and len(numbers) >= 2:
                base = float(numbers[0])
                percent = float(numbers[1])
                return str(base * (percent / 100))
            if ("average" in lowered or "mean" in lowered) and numbers:
                nums = [float(n) for n in numbers]
                return str(sum(nums) / len(nums))
            # Default calculation: evaluate the arithmetic written in the
            # problem itself. Joining the bare numbers with spaces (e.g.
            # "3 4") is not a valid expression, so it is only the last resort.
            direct = self._calculate(problem)
            if not direct.startswith("Calculation error"):
                return direct
            return self._calculate(" ".join(numbers))
        except Exception as e:
            return f"Math solving error: {e}"

    def _generate_response(self, prompt: str) -> str:
        """Generate a reply to ``prompt`` with the text-generation pipeline.

        Returns the assistant message content when the pipeline yields a
        message list, the raw text when it yields a string, and a
        human-readable notice when no model is loaded, nothing was generated
        or generation raised.
        """
        if not self.pipe:
            return "Model not available - using fallback response."
        try:
            messages = [{"role": "user", "content": prompt}]
            response = self.pipe(messages, max_new_tokens=512, temperature=0.7)
            # Extract the generated text
            if response:
                generated_text = response[0]['generated_text']
                if isinstance(generated_text, list):
                    # The newest assistant turn is the reply to this prompt.
                    for msg in reversed(generated_text):
                        if msg.get('role') == 'assistant':
                            return msg.get('content') or ''
                elif isinstance(generated_text, str):
                    return generated_text
                else:
                    return str(generated_text)
            return "No response generated."
        except Exception as e:
            print(f"Error generating response: {e}")
            return f"Error in response generation: {e}"

    def _detect_task_type(self, question: str) -> str:
        """Classify ``question`` so a matching prompt strategy can be used.

        Returns one of ``"calculation"``, ``"search"``, ``"data_analysis"``,
        ``"math_word_problem"`` or ``"general_reasoning"``.
        """
        question_lower = question.lower()

        def mentions(*keywords):
            return any(word in question_lower for word in keywords)

        if (
            mentions("calculate", "compute", "math", "+", "*", "=")
            or self._DIGIT_OPERATOR_RE.search(question_lower)
        ):
            return "calculation"
        if mentions("search", "find", "lookup", "google"):
            return "search"
        if mentions("data", "csv", "json", "table", "parse"):
            return "data_analysis"
        if mentions("percent", "%", "average", "mean", "sum"):
            return "math_word_problem"
        return "general_reasoning"

    def _build_prompt(self, question: str, task_type: str) -> str:
        """Return the task-specific prompt wrapped around ``question``."""
        if task_type == "calculation":
            return (
                "\n"
                "You are a precise mathematical assistant. Solve this step-by-step:\n"
                f"Question: {question}\n"
                "Provide a clear, accurate answer. If calculation is needed, show your work.\n"
                "Answer:"
            )
        if task_type == "math_word_problem":
            return (
                "\n"
                "You are solving a math word problem. Break it down step by step:\n"
                f"Question: {question}\n"
                "Steps:\n"
                "1. Identify what is being asked\n"
                "2. Extract the relevant numbers\n"
                "3. Determine the operation needed\n"
                "4. Calculate the result\n"
                "5. Provide the final answer\n"
                "Answer:"
            )
        if task_type == "data_analysis":
            return (
                "\n"
                "You are analyzing data. Approach this systematically:\n"
                f"Question: {question}\n"
                "Consider:\n"
                "- What type of data is involved?\n"
                "- What analysis is needed?\n"
                "- What tools or methods should be used?\n"
                "Provide a clear, structured answer.\n"
                "Answer:"
            )
        return (
            "\n"
            "You are a helpful assistant that provides accurate, well-reasoned answers.\n"
            f"Question: {question}\n"
            "Think through this step-by-step and provide a clear, comprehensive answer.\n"
            "Answer:"
        )

    def __call__(self, question: str) -> str:
        """Answer ``question`` and return the answer text."""
        print(f"Agent processing question (first 100 chars): {question[:100]}...")
        # Detect task type
        task_type = self._detect_task_type(question)
        print(f"Detected task type: {task_type}")
        if not getattr(self, "pipe", None):
            # No model was loaded: use the rule-based fallback rather than
            # returning the generic "model not available" sentence.
            print("Model unavailable - answering with rule-based fallback.")
            return self._handle_fallback(question, task_type)
        # Build enhanced prompt based on task type
        enhanced_prompt = self._build_prompt(question, task_type)
        # Generate response using the model
        try:
            response = self._generate_response(enhanced_prompt)
            print(f"Agent returning response (first 100 chars): {response[:100]}...")
            return response.strip()
        except Exception as e:
            print(f"Error in agent processing: {e}")
            return self._handle_fallback(question, task_type)

    def _handle_fallback(self, question: str, task_type: str) -> str:
        """Provide fallback responses when the main model fails.

        For ``"calculation"`` questions with at least two numbers, returns
        their sum (when ``+`` is present) or product (``*`` / "multiply");
        otherwise returns a generic acknowledgement.
        """
        if task_type == "calculation":
            # Try to extract and calculate simple expressions
            try:
                numbers = [float(n) for n in self._NUMBER_RE.findall(question)]
            except ValueError:
                numbers = []
            if len(numbers) >= 2:
                if "+" in question:
                    return f"The sum is {sum(numbers)}"
                if "*" in question or "multiply" in question.lower():
                    return f"The product is {math.prod(numbers)}"
        return f"I understand you're asking about: {question}. This appears to be a {task_type} task. Let me provide my best analysis based on the available information."
def _truncate_for_display(text, limit):
    """Return ``text`` cut to ``limit`` characters plus "..." when it is longer."""
    return text[:limit] + "..." if len(text) > limit else text


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the EnhancedGAIAAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in (in which case nothing is fetched or submitted).

    Returns:
        A ``(status_message, results)`` tuple where ``results`` is a pandas
        DataFrame of questions and answers, or None when the run stopped
        before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Enhanced Agent
    try:
        print("Initializing Enhanced GAIA Agent...")
        agent = EnhancedGAIAAgent()
        print("โœ… Agent initialized successfully!")
    except Exception as e:
        print(f"โŒ Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(f"Agent code URL: {agent_code}")

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        # Anything other than a non-empty list cannot be iterated as questions.
        if not questions_data or not isinstance(questions_data, list):
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"โœ… Fetched {len(questions_data)} questions.")
    # JSONDecodeError derives from RequestException, so it has to be caught
    # first or its dedicated message can never be produced.
    except requests.exceptions.JSONDecodeError as e:
        print(f"โŒ Error decoding JSON response from questions endpoint: {e}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"โŒ Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"โŒ An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run Enhanced Agent
    results_log = []
    answers_payload = []
    print(f"๐Ÿš€ Running enhanced agent on {len(questions_data)} questions...")
    for i, item in enumerate(questions_data, 1):
        task_id = item.get("task_id") if isinstance(item, dict) else None
        question_text = item.get("question") if isinstance(item, dict) else None
        if not task_id or question_text is None:
            print(f"โš ๏ธ Skipping item with missing task_id or question: {item}")
            continue
        print(f"๐Ÿ“ Processing question {i}/{len(questions_data)} (ID: {task_id})")
        # Only the agent call is guarded, so each task gets exactly one
        # payload entry whether it succeeds or fails.
        try:
            submitted_answer = agent(question_text)
        except Exception as e:
            print(f"โŒ Error running agent on task {task_id}: {e}")
            submitted_answer = f"AGENT ERROR: {e}"
            displayed_answer = submitted_answer
            succeeded = False
        else:
            displayed_answer = _truncate_for_display(str(submitted_answer), 300)
            succeeded = True
        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({
            "Task ID": task_id,
            "Question": _truncate_for_display(question_text, 200),
            "Submitted Answer": displayed_answer,
        })
        if succeeded:
            print(f"โœ… Completed question {i}")

    if not answers_payload:
        print("โŒ Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"๐Ÿ“ค Submitting {len(answers_payload)} answers for user '{username}'...")

    # 5. Submit
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120)  # Increased timeout
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"๐ŸŽ‰ Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("โœ… Submission successful!")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"โŒ Submission Failed: {error_detail}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"โŒ An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
# --- Build Gradio Interface using Blocks ---
# Layout: title, feature/instruction text, login button, run button, then the
# status box and the results table that the run button fills in.
with gr.Blocks(title="Enhanced GAIA Agent") as demo:
    gr.Markdown("# ๐Ÿš€ Enhanced GAIA Agent with Mistral-7B")
    gr.Markdown(
        """
**Enhanced Features:**
- ๐Ÿง  **Mistral-7B-Instruct** for advanced reasoning
- ๐Ÿ”ง **Tool Integration** for calculations and data processing
- ๐Ÿ“Š **Task Type Detection** for optimized responses
- ๐ŸŽฏ **GAIA-Optimized** prompting strategies
**Instructions:**
1. Clone this space and ensure you have access to Mistral-7B-Instruct
2. Log in to your Hugging Face account using the button below
3. Click 'Run Enhanced Evaluation' to process all questions with the enhanced agent
**Note:** The enhanced agent uses Mistral-7B which requires significant computational resources.
Processing may take several minutes depending on the number of questions.
"""
    )

    with gr.Row():
        gr.LoginButton()

    with gr.Row():
        run_button = gr.Button(
            "๐Ÿš€ Run Enhanced Evaluation & Submit All Answers",
            variant="primary",
        )

    # Text area receiving the status string returned by run_and_submit_all.
    status_output = gr.Textbox(
        interactive=False,
        lines=8,
        label="๐Ÿ“Š Run Status / Submission Result",
    )
    # Table receiving the per-question results returned by run_and_submit_all.
    results_table = gr.DataFrame(
        height=400,
        wrap=True,
        label="๐Ÿ“ Questions and Agent Answers",
    )

    # No explicit inputs are declared for the click handler.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Startup banner: report the Space environment and available hardware,
    # then launch the Gradio app.
    divider = "=" * 50
    print("\n" + divider)
    print("๐Ÿš€ ENHANCED GAIA AGENT STARTING")
    print(divider)

    # Environment check
    space_host = os.getenv("SPACE_HOST")
    space_id = os.getenv("SPACE_ID")

    if not space_host:
        print("โ„น๏ธ Running locally - SPACE_HOST not found")
    else:
        print(f"โœ… SPACE_HOST: {space_host}")
        print(f"๐ŸŒ Runtime URL: https://{space_host}.hf.space")

    if not space_id:
        print("โ„น๏ธ SPACE_ID not found")
    else:
        print(f"โœ… SPACE_ID: {space_id}")
        print(f"๐Ÿ“ Repo URL: https://huggingface.co/spaces/{space_id}")

    # GPU/CPU check
    if not torch.cuda.is_available():
        print("๐Ÿ’ป Running on CPU (GPU not available)")
    else:
        print(f"๐ŸŽฎ GPU Available: {torch.cuda.get_device_name()}")
        print(f"๐Ÿ’พ GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

    print(divider)
    print("๐Ÿš€ Launching Enhanced GAIA Agent Interface...")
    demo.launch(debug=True, share=False)