ashishja's picture
Update app.py
7d1b068 verified
raw
history blame
9.29 kB
import asyncio
import json
import logging
from typing import AsyncGenerator, Dict, Any
import google.genai.types as types
import requests
from google.adk.agents import BaseAgent, LlmAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import (
FunctionTool,
ToolContext,
agent_tool,
# built_in_code_execution,
google_search,
)
# Configure the root logger to emit only ERROR and above, so library log
# output stays quiet next to this script's own print() progress messages.
logging.basicConfig(level=logging.ERROR)
# --- API Interaction Functions ---
def answer_questions() -> list[str]:
    """Fetch the evaluation questions from the scoring API as prompt strings.

    Each prompt has the form ``"<task_id>:<question>"``. When an item carries a
    non-empty ``file_name``, a sentence with the URL of the associated file is
    appended to the prompt.

    Returns:
        One prompt string per question, or an empty list when the request
        fails or the response body is not valid JSON.
    """
    print("Attempting to fetch questions from the API...")
    base_url = 'https://agents-course-unit4-scoring.hf.space'
    headers = {'accept': 'application/json'}
    try:
        # requests has no default timeout; without one a stalled connection
        # would block the whole agent run indefinitely.
        response = requests.get(f"{base_url}/questions", headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes
        questions_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decode error is not a RequestException subclass.
        print(f"Error fetching questions: {e}")
        return []

    print(f"Successfully fetched {len(questions_data)} questions.")
    prompts = []
    for item in questions_data:
        task_id = item['task_id']
        prompt = f"{task_id}:{item['question']}"
        if item.get('file_name'):
            file_url = f"{base_url}/files/{task_id}"
            prompt = f"{prompt} The URL for the associated file is: {file_url}"
        prompts.append(prompt)
    return prompts
def submit_questions(answers: list[Dict[str, Any]]) -> Dict[str, Any]:
    """Submit the collected answers to the scoring API.

    Args:
        answers: A list of dictionaries, where each dictionary contains
            a 'task_id' and a 'submitted_answer'.

    Returns:
        The decoded JSON body of the scoring API's response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status. The error is printed and
            then re-raised.
    """
    # !!! IMPORTANT !!!
    # REPLACE the username and agent_code with your own details.
    username = "YOUR_HUGGING_FACE_USERNAME"
    agent_code_url = "https://huggingface.co/spaces/YOUR_USERNAME/YOUR_SPACE_NAME/tree/main"
    print(f"Attempting to submit {len(answers)} answers for user '{username}'...")
    url = 'https://agents-course-unit4-scoring.hf.space/submit'
    payload = {
        "username": username,
        "agent_code": agent_code_url,
        "answers": answers,
    }
    headers = {'accept': 'application/json', "Content-Type": "application/json"}
    try:
        # requests has no default timeout; bound the wait so a stalled
        # connection cannot hang the run forever.
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        result = response.json()  # parse once and reuse
    except requests.exceptions.RequestException as e:
        print(f"Error submitting answers: {e}")
        # A Response is falsy for 4xx/5xx statuses, so a plain truthiness
        # check would hide the body exactly when the server reported an
        # error. Compare against None instead.
        body = e.response.text if e.response is not None else 'No response'
        print(f"Response Body: {body}")
        raise
    print("Submission successful!")
    print("Response:", result)
    return result
# Wrap API functions in ADK Tools.
# FunctionTool accepts only the callable: the tool's name and description are
# taken from the function's __name__ and docstring, so no `description`
# keyword is passed (the constructor does not accept one).
responses_api = FunctionTool(func=answer_questions)
submit_api = FunctionTool(func=submit_questions)
# --- Agent Definitions ---
# Identifiers passed to the Runner and to session creation further down.
APP_NAME = "gaia_challenge_agent"  # app_name for the Runner and the session
USER_ID = "test_user"  # user_id the session is created under
SESSION_ID = "main_session"  # session_id requested when creating the session
# A specialized agent for tasks requiring code execution or data analysis.
# It is exposed to the orchestrator as a tool; its `description` advertises
# what it handles and its `instruction` is the prompt it runs under.
# NOTE(review): the code-execution tool below is commented out, so this agent
# is constructed without any tools even though its instruction asks it to
# write Python code — confirm whether that is intended.
code_agent = LlmAgent(
    name='CodeAgent',
    model="gemini-1.5-pro-latest", # Using Pro for complex code generation
    description="Executes code and analyzes data files (.csv, .xlsx, .json, .py) to answer a question. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert in data analysis and code execution. Given a question and a file URL, "
        "write Python code to find the answer. "
        "Use pandas for data files. Fetch remote files using requests. "
        "Your final output must be only the answer to the question, with no extra text or explanation."
    ),
    # tools=[built_in_code_execution],
)
# A specialized agent for web searches.
# It is the only specialist that is given a tool (`google_search`); the
# instruction tells it to reply with the bare answer only.
search_agent = LlmAgent(
    name='SearchAgent',
    model="gemini-1.5-flash-latest", # Flash is efficient for search-and-answer
    description="Searches the web to answer questions about current events, facts, or general knowledge. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert web researcher. You will be given a question. "
        "Use your search tool to find the most accurate information. "
        "Synthesize the findings and provide a concise, direct answer to the question. "
        "Your final output must be only the answer, with no extra text."
    ),
    tools=[google_search],
)
# A specialized agent for image analysis.
# NOTE(review): no tools are attached, and the instruction says the agent is
# given the image as a URL — presumably this relies on the model handling
# the URL itself; verify that the image content actually reaches the model.
image_agent = LlmAgent(
    name='ImageAgent',
    model="gemini-1.5-flash-latest", # Flash model has vision capabilities
    description="Analyzes an image to answer a question about its content. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert image analyst. You will be given a question and a URL to an image. "
        "Analyze the image content to answer the question. "
        "Your final output must be only the answer, with no extra text."
    ),
)
# A specialized agent for video analysis.
# NOTE(review): no tools are attached, and the instruction says the agent is
# given the video as a YouTube URL — presumably this relies on the model
# handling the URL itself; verify that the video content actually reaches it.
youtube_agent = LlmAgent(
    name='YouTubeAgent',
    model="gemini-1.5-flash-latest", # Flash model has vision capabilities
    description="Watches a YouTube video to answer a question about its content. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert video analyst. You will be given a question and a URL to a YouTube video. "
        "Analyze the video content to answer the question. "
        "Your final output must be only the answer, with no extra text."
    ),
)
# The main orchestrator agent.
# Its instruction lays out a four-step workflow (fetch, delegate, collect,
# submit). Its tools are the two API wrappers plus each specialist agent
# wrapped in an AgentTool so the orchestrator can invoke it like a tool.
root_agent = LlmAgent(
    name='OrchestratorAgent',
    model="gemini-1.5-pro-latest", # Pro for robust orchestration
    description="Manages a team of specialized agents to answer a list of questions and submits them for scoring.",
    instruction=(
        "You are the project manager. Your goal is to answer a series of questions and submit them. "
        "1. **FETCH**: Start by using the `answer_questions` tool to get the list of all tasks. "
        "2. **DELEGATE**: For each task string, which contains a 'task_id:question', extract the task_id and the question. "
        " - Determine the best specialized agent for the job (Code, Search, Image, YouTube) based on the question and any file URLs. "
        " - Invoke that agent with the question and necessary context (like the file URL). "
        "3. **COLLECT**: Get the precise answer back from the specialist agent. Create a dictionary: `{'task_id': 'the_id', 'submitted_answer': 'the_answer'}`. The answer must be exact, without any extra formatting or text. "
        "4. **SUBMIT**: After processing all questions, gather all the answer dictionaries into a single list. Call the `submit_questions` tool with this list to complete the assignment."
    ),
    tools=[
        # API wrappers: fetch the questions / submit the answers.
        responses_api,
        submit_api,
        # Specialist agents, each callable by the orchestrator as a tool.
        agent_tool.AgentTool(agent=code_agent),
        agent_tool.AgentTool(agent=search_agent),
        agent_tool.AgentTool(agent=image_agent),
        agent_tool.AgentTool(agent=youtube_agent),
    ],
)
# --- Application Runner ---
# Sessions are held by an in-memory session service; the Runner executes the
# orchestrator agent against sessions from that service.
session_service = InMemorySessionService()
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
async def run_agent_process():
    """Initialize a session and run the orchestrator agent's main task.

    Creates the session, sends a single kick-off prompt to the runner, and
    streams the resulting events, printing the name of every tool that
    returned a result and the orchestrator's final text response.
    """
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
    # ADK's Session exposes its identifier as `id`; it has no `session_id`
    # attribute.
    print(f"===== Agent Process Started for session: {session.id} =====")
    initial_prompt = "Get all the questions, answer each one using your specialized agents, and submit the final list of answers for scoring."
    print(f"\nSending initial prompt to the Orchestrator Agent:\n'{initial_prompt}'")
    user_message = types.Content(role="user", parts=[types.Part(text=initial_prompt)])

    # Runner.run_async is keyword-only and requires the user id, the session
    # id, and the user's message passed as `new_message`.
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=session.id,
        new_message=user_message,
    ):
        # Optional: Print events for debugging.
        # Events have no `action` discriminator; tool results are detected
        # through the function responses carried in the event content, and
        # the agent's answer through is_final_response().
        function_responses = event.get_function_responses()
        if function_responses:
            for function_response in function_responses:
                print(f"\n<-- [Tool Output] from `{function_response.name}`")
        elif event.is_final_response() and event.author == root_agent.name:
            if event.content and event.content.parts:
                # Join all text parts; non-text parts have text=None.
                text = "".join(part.text for part in event.content.parts if part.text)
                print(f"\n[Orchestrator Response]: {text}")
    print("\n===== Agent Process Finished =====")
async def main() -> None:
    """Main entry point for the application.

    Awaits run_agent_process() and returns once it has finished.
    """
    await run_agent_process()
# Start the event loop and run main() only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())