# Source: Hugging Face Space file view - author leofltt, commit cc70c39
# ("now should be working"), file size 14.3 kB.
# app.py (Final Version)
import os
import re
import gradio as gr
import requests
import pandas as pd
import logging
import numexpr
from typing import TypedDict, Annotated
# --- Langchain & HF Imports ---
from langchain_huggingface import HuggingFaceEndpoint
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.tools import tool
from langgraph.graph import StateGraph, END
from langchain_community.document_loaders.youtube import YoutubeLoader
from transformers.pipelines import pipeline as hf_pipeline # Renamed to avoid conflict
# --- Constants ---
# Base URL of the scoring service; run_and_submit_all appends "/questions"
# and "/submit" to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Instruction preamble that GaiaAgent.__init__ places at the start of its
# prompt template. It names the four tools, the `tool_name("argument")` call
# format that GaiaAgent._call_tools parses, and the "FINAL ANSWER:" marker
# that GaiaAgent uses to stop the loop and extract the answer.
SYSTEM_PROMPT = """You are a helpful and expert assistant named GAIA, designed to answer questions accurately. To do this, you have access to a set of tools. Based on the user's question, you must decide which tool to use, if any. Your process is:
1. **Analyze the Question**: Understand what is being asked.
2. **Select a Tool**: If necessary, choose the best tool. Your available tools are: `web_search`, `math_calculator`, `image_analyzer`, `youtube_transcript_reader`.
3. **Call the Tool**: Output a tool call in the format `tool_name("argument")`. For example: `web_search("what is the weather in Paris?")`.
4. **Analyze the Result**: Look at the tool's output.
5. **Final Answer**: If you have enough information, provide the final answer. If not, you can use another tool.
When you have the final answer, you **must** output it in the following format, and nothing else:
FINAL ANSWER: [YOUR FINAL ANSWER]"""
# --- Tool Definitions ---
# Cached image-captioning pipeline. Stays None until image_analyzer builds it
# on its first call, so the model is only loaded when it is actually needed.
image_to_text_pipeline = None
@tool
def web_search(query: str) -> str:
    """Searches the web using DuckDuckGo for up-to-date information."""
    # The docstring above is kept verbatim: the @tool decorator may expose it
    # as the tool's description, so it is effectively runtime text.
    #
    # A fresh DuckDuckGoSearchRun is created per call and its textual result
    # is returned unchanged. Exceptions are not caught here; they propagate
    # to the caller.
    logging.info(f"--- Calling Web Search Tool with query: {query} ---")
    return DuckDuckGoSearchRun().run(query)
@tool
def math_calculator(expression: str) -> str:
    """Calculates the result of a mathematical expression."""
    # The docstring above is kept verbatim: the @tool decorator may expose it
    # as the tool's description, so it is effectively runtime text.
    #
    # Only digits, ".", "+", "-", "*", "/", parentheses and whitespace are
    # accepted; anything else is rejected before reaching numexpr. Every
    # failure is reported as an "Error: ..." string instead of being raised.
    logging.info(f"--- Calling Math Calculator Tool with expression: {expression} ---")
    try:
        is_arithmetic = re.match(r"^[0-9\.\+\-\*\/\(\)\s]+$", expression)
        if is_arithmetic:
            # numexpr returns a zero-dimensional array; .item() unwraps it to
            # a plain Python number before stringifying.
            value = numexpr.evaluate(expression).item()
            return str(value)
        return "Error: Invalid characters in expression."
    except Exception as exc:
        logging.error(f"Calculator error: {exc}")
        return f"Error: {exc}"
@tool
def image_analyzer(image_url: str) -> str:
    """Analyzes an image from a URL and returns a text description."""
    # The docstring above is kept verbatim: the @tool decorator may expose it
    # as the tool's description, so it is effectively runtime text.
    global image_to_text_pipeline
    logging.info(f"--- Calling Image Analyzer Tool with URL: {image_url} ---")
    try:
        # Build the captioning pipeline on first use only and cache it in the
        # module-level variable for subsequent calls.
        if image_to_text_pipeline is None:
            logging.info(
                "--- Initializing Image Analyzer pipeline (lazy loading)... ---"
            )
            image_to_text_pipeline = hf_pipeline(
                "image-to-text", model="Salesforce/blip-image-captioning-base"
            )
            logging.info("--- Image Analyzer pipeline initialized. ---")

        captions = image_to_text_pipeline(image_url)

        # Anything other than a non-empty list is treated as a failed analysis.
        if not (captions and isinstance(captions, list)):
            return "Error: Could not analyze image."

        # Use the first caption; fall back to an error text if the expected
        # key is missing from it.
        return captions[0].get("generated_text", "Error: Could not generate text.")
    except Exception as exc:
        logging.error(f"Error analyzing image: {exc}")
        return f"Error analyzing image: {exc}"
@tool
def youtube_transcript_reader(youtube_url: str) -> str:
    """Reads the transcript of a YouTube video from its URL."""
    # The docstring above is kept verbatim: the @tool decorator may expose it
    # as the tool's description, so it is effectively runtime text.
    logging.info(
        f"--- Calling YouTube Transcript Reader Tool with URL: {youtube_url} ---"
    )
    try:
        documents = YoutubeLoader.from_youtube_url(
            youtube_url, add_video_info=False
        ).load()
        full_text = " ".join(document.page_content for document in documents)
        # Return at most the first 4000 characters of the joined transcript.
        return full_text[:4000]
    except Exception as exc:
        logging.error(f"Error reading YouTube transcript: {exc}")
        return f"Error: {exc}"
def _append_messages(existing: list, new: list) -> list:
    """Reducer for ``AgentState.messages``: return *existing* followed by *new*."""
    return existing + new


class AgentState(TypedDict):
    """State dictionary passed between the nodes of the agent graph."""

    # The question the agent is answering.
    question: str
    # Running transcript of strings; the attached reducer concatenates each
    # node's returned list onto the current one.
    messages: Annotated[list, _append_messages]
    # Label of whoever produced the latest update ("user", "agent" or "tools").
    sender: str
class GaiaAgent:
    """Agent that loops between an LLM node and a tool-executing node.

    The graph has two nodes: ``agent`` (calls the LLM with the system prompt,
    the message history and the question) and ``tools`` (parses a
    ``tool_name("argument")`` call out of the last LLM message and runs the
    matching tool). The loop ends once the LLM emits a ``FINAL ANSWER:``
    marker. Instances are callable: ``agent(question)`` returns the answer
    text.
    """

    # Final-answer marker. Case-insensitive so the routing decision in
    # ``_decide_action`` agrees with the extraction done in ``__call__``.
    _FINAL_ANSWER_RE = re.compile(r"FINAL ANSWER:\s*(.*)", re.IGNORECASE | re.DOTALL)
    # Start of a candidate call: an identifier followed by "(".
    _CALL_HEAD_RE = re.compile(r"(\w+)\s*\(")

    def __init__(self):
        """Create the tool list, the LLM chain and the compiled graph."""
        logging.info("Initializing GaiaAgent...")
        self.tools = [
            web_search,
            math_calculator,
            image_analyzer,
            youtube_transcript_reader,
        ]
        logging.info("Initializing LLM via modern HuggingFaceEndpoint...")
        llm = HuggingFaceEndpoint(
            repo_id="HuggingFaceH4/zephyr-7b-beta",
            temperature=0.1,
            max_new_tokens=1024,
            huggingfacehub_api_token=os.getenv("HUGGINGFACEHUB_API_TOKEN"),
        )
        logging.info("LLM initialized successfully.")
        # The prompt is the system instructions followed by the transcript so
        # far and the original question.
        prompt = PromptTemplate(
            template=SYSTEM_PROMPT
            + "\nHere is the current conversation:\n{messages}\n\nQuestion: {question}",
            input_variables=["messages", "question"],
        )
        self.agent = prompt | llm | StrOutputParser()
        self.graph = self._create_graph()
        logging.info("GaiaAgent initialized successfully.")

    def _create_graph(self):
        """Build and compile the agent/tools state graph."""
        graph = StateGraph(AgentState)
        graph.add_node("agent", self._call_agent)
        graph.add_node("tools", self._call_tools)
        # After the agent speaks, either stop or hand over to the tools node.
        graph.add_conditional_edges(
            "agent", self._decide_action, {END: END, "tools": "tools"}
        )
        # Tool output always goes back to the agent.
        graph.add_edge("tools", "agent")
        graph.set_entry_point("agent")
        return graph.compile()

    def _call_agent(self, state: "AgentState"):
        """Run the LLM chain on the current transcript and return its reply."""
        logging.info("--- Calling Agent ---")
        message_history = "\n".join(state["messages"])
        response = self.agent.invoke(
            {"messages": message_history, "question": state["question"]}
        )
        return {"messages": [response], "sender": "agent"}

    def _decide_action(self, state: "AgentState"):
        """Return ``END`` if the last message has a final answer, else ``"tools"``."""
        logging.info("--- Deciding Action ---")
        if self._FINAL_ANSWER_RE.search(state["messages"][-1]):
            return END
        return "tools"

    @staticmethod
    def _read_argument(text, start):
        """Return the raw argument text of a call whose "(" ends at *start*.

        A quoted argument runs to the matching quote that is followed by the
        closing parenthesis, so parentheses inside the quotes are kept. An
        unquoted argument runs to the parenthesis that balances the opening
        one. If neither works, the text up to the first ")" is used. Returns
        ``None`` when there is no closing parenthesis at all.
        """
        remainder = text[start:]
        offset = start + len(remainder) - len(remainder.lstrip())
        opener = text[offset:offset + 1]
        if opener in ('"', "'"):
            closing = re.compile(re.escape(opener) + r"\s*\)").search(text, offset + 1)
            if closing:
                # Include the closing quote; the caller strips the quote pair.
                return text[start:closing.start() + 1]

        depth = 0
        for index in range(start, len(text)):
            char = text[index]
            if char == "(":
                depth += 1
            elif char == ")":
                if depth == 0:
                    return text[start:index]
                depth -= 1

        # Unbalanced parentheses: fall back to the first ")" after the call.
        first_close = text.find(")", start)
        return text[start:first_close] if first_close != -1 else None

    def _find_tool_call(self, text):
        """Return ``(name, raw_argument)`` for the best call found in *text*.

        The first call whose name matches a registered tool wins. If no call
        names a registered tool, the first call-like construct is returned so
        the caller can report the unknown name. Returns ``None`` when *text*
        contains nothing that looks like a call.
        """
        known_names = {t.name for t in self.tools}
        fallback = None
        for head in self._CALL_HEAD_RE.finditer(text):
            argument = self._read_argument(text, head.end())
            if argument is None:
                continue
            candidate = (head.group(1), argument.strip())
            if candidate[0] in known_names:
                return candidate
            if fallback is None:
                fallback = candidate
        return fallback

    def _call_tools(self, state: "AgentState"):
        """Parse a tool call from the last message, run it, and return the output.

        Every outcome (result, parse failure, unknown tool, tool exception) is
        returned as a single message string so the LLM can react to it.
        """
        logging.info("--- Calling Tools ---")
        parsed = self._find_tool_call(state["messages"][-1])
        if parsed is None:
            logging.warning("No valid tool call found in agent response.")
            return {
                "messages": [
                    'No valid tool call found. Please format your response as `tool_name("argument")` or provide a `FINAL ANSWER:`.'
                ],
                "sender": "tools",
            }

        tool_name, tool_input_str = parsed
        # Drop one pair of matching surrounding quotes, if present.
        if (tool_input_str.startswith('"') and tool_input_str.endswith('"')) or (
            tool_input_str.startswith("'") and tool_input_str.endswith("'")
        ):
            tool_input = tool_input_str[1:-1]
        else:
            tool_input = tool_input_str

        tool_to_call = next((t for t in self.tools if t.name == tool_name), None)
        if tool_to_call is None:
            logging.warning(f"Tool '{tool_name}' not found.")
            return {"messages": [f"Tool '{tool_name}' not found."], "sender": "tools"}

        try:
            result = tool_to_call.run(tool_input)
        except Exception as e:
            logging.error(f"Error executing tool {tool_name}: {e}")
            return {
                "messages": [f"Error executing tool {tool_name}: {e}"],
                "sender": "tools",
            }
        return {"messages": [str(result)], "sender": "tools"}

    def __call__(self, question: str) -> str:
        """Answer *question* by running the graph.

        Returns the text after the ``FINAL ANSWER:`` marker of the last
        message, the last message itself when no marker is present, or an
        ``"Error during agent invocation: ..."`` string if the run raises.
        """
        logging.info(f"Agent received question: {question[:100]}...")
        try:
            initial_state = {"question": question, "messages": [], "sender": "user"}
            final_state = self.graph.invoke(initial_state, {"recursion_limit": 15})
            final_response = final_state["messages"][-1]
            match = self._FINAL_ANSWER_RE.search(final_response)
            if match:
                extracted_answer = match.group(1).strip()
                logging.info(f"Agent returning final answer: {extracted_answer}")
                return extracted_answer
            logging.warning(
                "Agent could not find a final answer. Returning the last message."
            )
            return final_response
        except Exception as e:
            logging.error(f"Error during agent invocation: {e}", exc_info=True)
            return f"Error during agent invocation: {e}"
# --- Evaluation Runner (Gradio callback) ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch the questions, run the agent on the first one, and submit the answer.

    Currently runs in debug mode: only the first fetched question is
    processed and submitted.

    Args:
        profile: The logged-in Hugging Face profile, or ``None`` when the
            user has not logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame of
        the processed questions and answers, or ``None`` when the run stopped
        before any question was processed.
    """
    if not profile:
        return "Please Login to Hugging Face with the button.", None

    username = profile.username
    logging.info(f"User logged in: {username}")

    # SPACE_ID is only used to build the link to the agent code. Fall back to
    # a fixed id when it is not set, so space_id is never empty below.
    space_id = os.getenv("SPACE_ID")
    if not space_id:
        space_id = "leofltt/HF_Agents_Final_Assignment"
        logging.warning(f"SPACE_ID not found, using fallback for local run: {space_id}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = GaiaAgent()
    except Exception as e:
        logging.critical(f"Fatal error instantiating agent: {e}", exc_info=True)
        return f"Fatal error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    logging.info(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=20)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        # A JSON object (e.g. an error payload) cannot be indexed with [0]
        # below, so report it instead of crashing.
        if not isinstance(questions_data, list):
            return (
                "Error fetching questions: expected a list, "
                f"got {type(questions_data).__name__}.",
                None,
            )
        logging.info(f"Successfully fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # --- DEBUG MODE: only the first question is processed. ---
    questions_to_process = [questions_data[0]]
    logging.info(
        f"DEBUG MODE: Processing only the first question out of {len(questions_data)}."
    )

    results_log = []
    answers_payload = []
    for item in questions_to_process:
        task_id = item.get("task_id")
        question_text = item.get("question")
        logging.info(f"--- Processing question (Task ID: {task_id}) ---")
        # Skip entries that lack an id or a question.
        if not task_id or question_text is None:
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append(
                {"task_id": task_id, "submitted_answer": submitted_answer}
            )
            results_log.append(
                {
                    "Task ID": task_id,
                    "Question": question_text,
                    "Submitted Answer": submitted_answer,
                }
            )
        except Exception as e:
            logging.error(f"Error running agent on task {task_id}: {e}", exc_info=True)
            results_log.append(
                {
                    "Task ID": task_id,
                    "Question": question_text,
                    "Submitted Answer": f"AGENT ERROR: {e}",
                }
            )
            # Also return the error in the status for immediate feedback.
            return f"Agent failed on the first question with error: {e}", pd.DataFrame(
                results_log
            )

    if not answers_payload:
        return "Agent did not produce an answer for the first question.", pd.DataFrame(
            results_log
        )

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    logging.info(f"Submitting {len(answers_payload)} answer for user '{username}'...")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful (for one question)!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        # Surface the server's own status code and body text.
        error_detail = f"Server responded with status {e.response.status_code}. Detail: {e.response.text}"
        return f"Submission Failed: {error_detail}", pd.DataFrame(results_log)
    except Exception as e:
        return f"An unexpected error occurred during submission: {e}", pd.DataFrame(
            results_log
        )
# Gradio UI: a title, a short description, a login button, a run button, and
# two outputs (status text and results table) filled by run_and_submit_all.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        "This agent uses LangGraph and Mistral-7B to answer questions from the GAIA benchmark."
    )
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Read-only box that shows the status string returned by the callback.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    # Table that shows the DataFrame returned by the callback.
    results_table = gr.DataFrame(
        label="Questions and Agent Answers",
        wrap=True,
    )

    # No explicit inputs: the callback's only parameter is the OAuth profile.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Configure INFO-level logging with timestamps, then start the Gradio app.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    logging.info("App Starting (Final Version)...")
    demo.launch()