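"""Gradio app for a GAIA benchmark agent built on LlamaIndex and a local LLM.

Runs as a Hugging Face Space: it lets you test single GAIA questions, process
the full question set, and submit answers for official scoring.
"""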
import os
import sys
import json
import traceback
from typing import List, Dict
import warnings

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

import gradio as gr

# --- Environment variable setup to fix permission issues ---
def setup_environment():
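    """Point cache directories at writable /tmp paths.

    On Spaces the default cache locations can be read-only, so each variable
    is redirected before the libraries that use it are imported.
    """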
    env_vars = {
        "NLTK_DATA": "/tmp/nltk_data",
        "MPLCONFIGDIR": "/tmp/matplotlib_cache",
        "HF_HOME": "/tmp/huggingface_cache",
        "TORCH_HOME": "/tmp/torch_cache",
        "TRANSFORMERS_CACHE": "/tmp/huggingface_cache"
    }
    for var, path in env_vars.items():
        os.environ[var] = path
        # Create directory if it doesn't exist
        try:
            os.makedirs(path, exist_ok=True)
            print(f"✅ Created/verified directory: {path}")
        except PermissionError:
            print(f"⚠️ Permission denied for {path}, using /tmp fallback")
            fallback_path = f"/tmp/{var.lower()}"
            os.environ[var] = fallback_path
            os.makedirs(fallback_path, exist_ok=True)
        except Exception as e:
            print(f"❌ Error setting up {var}: {e}")

# Setup environment first
setup_environment()

# Import nltk AFTER setting environment variables
try:
    import nltk
    # Download required NLTK data upfront
    nltk.download('punkt', download_dir=os.environ["NLTK_DATA"], quiet=True)
    nltk.download('stopwords', download_dir=os.environ["NLTK_DATA"], quiet=True)
    print("✅ NLTK data downloaded successfully")
except Exception as e:
    print(f"⚠️ NLTK setup warning: {e}")

# Add current directory to path for local imports
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Import dependencies with better error handling
try:
    from utils.gaia_api import GaiaAPI
    print("✅ GaiaAPI imported successfully")
except ImportError as e:
    print(f"⚠️ Failed to import GaiaAPI: {e}")

    # Create a fallback GaiaAPI
    class GaiaAPI:
        """Minimal stub used when utils.gaia_api is unavailable."""

        @classmethod
        def get_questions(cls):
            return [{"task_id": "fallback", "question": "What is 2+2?"}]

        @classmethod
        def get_random_question(cls):
            return {"task_id": "fallback", "question": "What is 2+2?"}

        @classmethod
        def submit_answers(cls, username, code_url, answers):
            return {"error": "GaiaAPI not available", "score": 0}

# Initialize global agent state
AGENT_READY = False
agent = None
initialization_error = None
agent_info = {}

def initialize_agent():
    """Initialize the LlamaIndex agent with comprehensive error handling."""
    global agent, AGENT_READY, initialization_error, agent_info
    try:
        print("🚀 Starting agent initialization...")

        # Import agent-related modules
        print("📦 Importing modules...")
        from agent.local_llm import LocalLLM
        from agent.tools import gaia_tools
        from llama_index.core.agent import ReActAgent
        from llama_index.core.memory import ChatMemoryBuffer
        agent_info["modules_imported"] = True
        print("✅ All modules imported successfully!")

        print("🤖 Initializing Local LLM...")
        local_llm = LocalLLM()
        llm = local_llm.get_llm()
        agent_info["llm_type"] = llm.__class__.__name__

        print("🧠 Creating ReAct Agent...")
        memory = ChatMemoryBuffer.from_defaults(token_limit=2000)

        # Check if we have a proper LLM or mock
        if hasattr(llm, 'chat') and llm.__class__.__name__ != 'MockLLM':
            agent = ReActAgent.from_tools(
                tools=gaia_tools,
                llm=llm,
                memory=memory,
                verbose=True,
                max_iterations=3
            )
            agent_info["agent_type"] = "ReActAgent"
            print("✅ ReAct Agent initialized successfully!")
        else:
            agent = llm  # Use the mock LLM directly
            agent_info["agent_type"] = "MockLLM"
            print("⚠️ Using mock mode - agent partially ready")

        agent_info["tools_count"] = len(gaia_tools) if 'gaia_tools' in locals() else 0
        AGENT_READY = True
        print("🎉 Agent initialization complete!")
    except Exception as e:
        error_msg = f"Failed to initialize agent: {str(e)}"
        print(f"❌ {error_msg}")
        traceback.print_exc()
        AGENT_READY = False
        agent = None
        initialization_error = error_msg
        agent_info["error"] = error_msg

# Initialize agent
initialize_agent()

def process_single_question(question_text: str) -> str:
    """Process a single GAIA question through the agent"""
    if not AGENT_READY:
        error_msg = "❌ Agent not ready. "
        if initialization_error:
            error_msg += f"Error: {initialization_error}"
        return error_msg

    if not question_text.strip():
        return "❌ Please enter a question."

    try:
        enhanced_prompt = f"""
Answer the following question directly and concisely. Do not include "FINAL ANSWER" or any other prefixes in your response. Just provide the answer.

Question: {question_text}
"""
print(f"π€ Processing question: {question_text[:50]}...") | |
# FIXED: Use .complete() instead of .chat() to avoid chat template errors | |
if hasattr(agent, 'query'): | |
response = agent.query(enhanced_prompt) | |
elif hasattr(agent, 'complete'): | |
# Use complete() method for models without chat templates | |
response = agent.complete(enhanced_prompt) | |
answer = response.text if hasattr(response, 'text') else str(response) | |
        elif hasattr(agent, 'chat'):
            # Only use chat if it's the MockLLM or a proper chat model
            try:
                response = agent.chat([{"role": "user", "content": enhanced_prompt}])
                answer = response.message.content if hasattr(response, 'message') else str(response)
            except Exception as chat_error:
                # Fallback to complete if chat fails
                print(f"⚠️ Chat method failed, trying complete: {chat_error}")
                if hasattr(agent, 'complete'):
                    response = agent.complete(enhanced_prompt)
                    answer = response.text if hasattr(response, 'text') else str(response)
                else:
                    raise chat_error
        else:
            answer = "Mock response: I would analyze this question and provide an answer."

        # Every branch above assigns `answer`; normalize it before cleanup
        answer = str(answer).strip()
        # Remove common prefixes from the answer
        for prefix in ["FINAL ANSWER:", "Answer:", "The answer is:", "Final answer:"]:
            if answer.startswith(prefix):
                answer = answer[len(prefix):].strip()

        print(f"✅ Generated answer: {answer[:50]}...")
        return answer
    except Exception as e:
        error_msg = f"❌ Error processing question: {str(e)}"
        print(error_msg)
        return error_msg

def process_all_questions() -> str:
    """Process all GAIA questions and prepare answers for submission"""
    if not AGENT_READY:
        return "❌ Agent not ready."

    try:
        print("📥 Fetching all GAIA questions...")
        questions = GaiaAPI.get_questions()
        processed_answers = []

        print(f"📊 Processing {len(questions)} questions...")
        for i, question in enumerate(questions):
            print(f"Processing question {i + 1}/{len(questions)}: {question['task_id']}")
            answer = process_single_question(question['question'])
            processed_answers.append({
                "task_id": question['task_id'],
                "submitted_answer": answer
            })

        # Save answers to file
        output_file = "/app/gaia_answers.json"
        with open(output_file, "w") as f:
            json.dump(processed_answers, f, indent=2)
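        # The saved file is a JSON list of {"task_id": ..., "submitted_answer": ...}
        # records, the same structure submit_to_gaia() later loads and passes to
        # GaiaAPI.submit_answers().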
summary = f"β Processed {len(processed_answers)} questions.\n" | |
summary += f"πΎ Answers saved to {output_file}\n" | |
summary += "π First 3 answers:\n" | |
for ans in processed_answers[:3]: | |
summary += f"- {ans['task_id']}: {ans['submitted_answer'][:50]}...\n" | |
print(summary) | |
return summary | |
except Exception as e: | |
error_msg = f"β Error processing questions: {str(e)}" | |
print(error_msg) | |
traceback.print_exc() | |
return error_msg | |

def submit_to_gaia(username: str, code_url: str) -> str:
    """Submit answers to GAIA benchmark"""
    if not AGENT_READY:
        return "❌ Agent not ready."
    if not username or not code_url:
        return "❌ Please provide both username and code URL."

    try:
        answers_file = "/app/gaia_answers.json"
        with open(answers_file, "r") as f:
            answers = json.load(f)
        print(f"📤 Submitting {len(answers)} answers...")
    except FileNotFoundError:
        return "❌ No processed answers found. Please process them first."

    try:
        result = GaiaAPI.submit_answers(username, code_url, answers)
        if "error" in result:
            return f"❌ Submission failed: {result['error']}"
        score = result.get("score", "Unknown")
        success_msg = f"✅ Submission successful!\n📊 Score: {score}"
        print(success_msg)
        return success_msg
    except Exception as e:
        error_msg = f"❌ Submission error: {str(e)}"
        print(error_msg)
        return error_msg

def get_sample_question() -> str:
    """Load a sample question for testing"""
    try:
        question = GaiaAPI.get_random_question()
        return question['question']
    except Exception as e:
        return f"Error loading sample question: {str(e)}"

def get_system_status() -> str:
    """Get detailed system status for debugging"""
    status = "🔍 System Status:\n\n"

    # Agent status
    status += f"🤖 Agent Ready: {'✅ Yes' if AGENT_READY else '❌ No'}\n"
    if initialization_error:
        status += f"❌ Error: {initialization_error}\n"

    # Agent info
    status += f"🧠 LLM Type: {agent_info.get('llm_type', 'Unknown')}\n"
    status += f"🔧 Agent Type: {agent_info.get('agent_type', 'Unknown')}\n"
    status += f"🛠️ Tools Count: {agent_info.get('tools_count', 0)}\n"

    # Environment
    status += "\n🌍 Environment Variables:\n"
    for var in ["NLTK_DATA", "HF_HOME", "MPLCONFIGDIR", "TORCH_HOME"]:
        path = os.environ.get(var, 'Not set')
        exists = "✅" if os.path.exists(path) else "❌"
        status += f"  {var}: {path} {exists}\n"

    # Directory permissions
    status += "\n📁 Directory Status:\n"
    for path in ["/app", "/tmp"]:
        try:
            writable = os.access(path, os.W_OK)
            status += f"  {path}: {'✅ Writable' if writable else '❌ Not writable'}\n"
        except Exception:
            status += f"  {path}: ❌ Error checking\n"

    return status

# ---------- Gradio UI ----------
with gr.Blocks(title="🦙 GAIA LlamaIndex Agent", theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"""
    # 🦙 GAIA Benchmark Agent with LlamaIndex

    This agent uses LlamaIndex with a local LLM to tackle GAIA benchmark questions.

    **Status:** {"✅ Ready" if AGENT_READY else "❌ Not Ready"}
    {f"**Error:** {initialization_error}" if initialization_error else ""}
    """)
with gr.Tab("π¬ Test Single Question"): | |
gr.Markdown("Test the agent with individual questions") | |
with gr.Row(): | |
with gr.Column(): | |
question_input = gr.Textbox( | |
label="Question", | |
placeholder="Enter a GAIA question or click 'Load Sample'", | |
lines=3 | |
) | |
with gr.Row(): | |
sample_btn = gr.Button("π² Load Sample Question") | |
process_btn = gr.Button("π Process Question", variant="primary") | |
with gr.Column(): | |
answer_output = gr.Textbox( | |
label="Agent Answer", | |
lines=5, | |
interactive=False | |
) | |
sample_btn.click(get_sample_question, outputs=question_input) | |
process_btn.click(process_single_question, inputs=question_input, outputs=answer_output) | |
with gr.Tab("π Full Evaluation"): | |
gr.Markdown("Process all GAIA questions and prepare for submission") | |
process_all_btn = gr.Button("π Process All Questions", variant="primary") | |
processing_output = gr.Textbox(label="Processing Status", lines=10, interactive=False) | |
process_all_btn.click(process_all_questions, outputs=processing_output) | |
with gr.Tab("π Submit to GAIA"): | |
gr.Markdown(""" | |
Submit your processed answers to the GAIA benchmark for official scoring. | |
**Requirements:** | |
1. Your Hugging Face username | |
2. Link to your Space code (e.g., https://huggingface.co/spaces/your-username/gaia-agent) | |
""") | |
with gr.Row(): | |
with gr.Column(): | |
username_input = gr.Textbox(label="HF Username", placeholder="your-username") | |
code_url_input = gr.Textbox(label="Space Code URL", placeholder="https://huggingface.co/spaces/your-username/gaia-agent") | |
submit_btn = gr.Button("π― Submit to GAIA", variant="primary") | |
with gr.Column(): | |
submission_output = gr.Textbox(label="Submission Result", lines=5, interactive=False) | |
submit_btn.click(submit_to_gaia, inputs=[username_input, code_url_input], outputs=submission_output) | |
with gr.Tab("βΉοΈ System Status"): | |
gr.Markdown("## System Information and Debugging") | |
refresh_btn = gr.Button("π Refresh Status") | |
status_output = gr.Textbox(label="System Status", lines=20, interactive=False) | |
# Load initial status | |
demo.load(get_system_status, outputs=status_output) | |
refresh_btn.click(get_system_status, outputs=status_output) | |

if __name__ == "__main__":
    print("🚀 Starting Gradio interface...")
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )