"""Multi-agent runner that fetches evaluation questions, answers them, and submits them.

An orchestrator ``LlmAgent`` fetches the question list from the scoring API,
delegates each question to a specialist agent (code, search, image, YouTube),
collects the answers, and posts them back to the scoring API.
"""

import asyncio
import json
import logging
from typing import AsyncGenerator, Dict, Any

import google.genai.types as types
import requests
from google.adk.agents import BaseAgent, LlmAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import (
    FunctionTool,
    ToolContext,
    agent_tool,
    # built_in_code_execution,
    google_search,
)

# Only surface errors; library INFO/DEBUG output is suppressed.
logging.basicConfig(level=logging.ERROR)

# Base URL of the scoring service used by both API helpers below.
API_BASE_URL = "https://agents-course-unit4-scoring.hf.space"

# Upper bound (seconds) for each HTTP call so a stalled server cannot hang the run.
REQUEST_TIMEOUT_SECONDS = 60


# --- API Interaction Functions ---

def answer_questions() -> list[str]:
    """Fetches all questions from the remote server.

    Each returned string has the form ``"<task_id>:<question>"``. When the
    question has an associated file, the download URL for that file is
    appended to the string.

    Returns:
        The list of prompt strings, or an empty list if the request failed.
    """
    print("Attempting to fetch questions from the API...")
    url = f"{API_BASE_URL}/questions"
    headers = {'accept': 'application/json'}

    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()  # Raise an exception for bad status codes
        questions_data = response.json()
        print(f"Successfully fetched {len(questions_data)} questions.")

        prompts = []
        for item in questions_data:
            task_id = item['task_id']
            question_text = item['question']
            if item.get('file_name'):
                # Attached files are served per task id by the scoring service.
                file_url = f"{API_BASE_URL}/files/{task_id}"
                prompt = f"{task_id}:{question_text} The URL for the associated file is: {file_url}"
            else:
                prompt = f"{task_id}:{question_text}"
            prompts.append(prompt)
        return prompts
    except requests.exceptions.RequestException as e:
        # Best-effort: report the failure and hand back an empty list.
        print(f"Error fetching questions: {e}")
        return []


def submit_questions(answers: list[Dict[str, Any]]) -> Dict[str, Any]:
    """Submits the final list of answers to the remote server for scoring.

    Args:
        answers: A list of dictionaries, where each dictionary contains
            a 'task_id' and a 'submitted_answer'.

    Returns:
        The decoded JSON body of the scoring service's response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            server answers with an error status; re-raised after logging.
    """
    # !!! IMPORTANT !!!
    # REPLACE the username and agent_code with your own details.
    username = "YOUR_HUGGING_FACE_USERNAME"
    agent_code_url = "https://huggingface.co/spaces/YOUR_USERNAME/YOUR_SPACE_NAME/tree/main"

    print(f"Attempting to submit {len(answers)} answers for user '{username}'...")
    url = f"{API_BASE_URL}/submit"
    payload = {
        "username": username,
        "agent_code": agent_code_url,
        "answers": answers,
    }
    headers = {'accept': 'application/json', "Content-Type": "application/json"}

    try:
        response = requests.post(
            url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        result = response.json()  # Parse once; reused for logging and return.
        print("Submission successful!")
        print("Response:", result)
        return result
    except requests.exceptions.RequestException as e:
        print(f"Error submitting answers: {e}")
        # A Response is falsy for 4xx/5xx statuses, so compare against None
        # explicitly; otherwise the error body would never be printed.
        body = e.response.text if e.response is not None else 'No response'
        print(f"Response Body: {body}")
        raise


# Wrap API functions in ADK Tools.
# FunctionTool takes only the callable; the tool name and description shown
# to the model come from the function's name and docstring.
responses_api = FunctionTool(func=answer_questions)
submit_api = FunctionTool(func=submit_questions)


# --- Agent Definitions ---

APP_NAME = "gaia_challenge_agent"
USER_ID = "test_user"
SESSION_ID = "main_session"

# A specialized agent for tasks requiring code execution or data analysis
# NOTE(review): no code-execution tool is attached (the line below is
# commented out), so this agent can only reason about code — confirm intent.
code_agent = LlmAgent(
    name='CodeAgent',
    model="gemini-1.5-pro-latest",  # Using Pro for complex code generation
    description="Executes code and analyzes data files (.csv, .xlsx, .json, .py) to answer a question. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert in data analysis and code execution. Given a question and a file URL, "
        "write Python code to find the answer. "
        "Use pandas for data files. Fetch remote files using requests. "
        "Your final output must be only the answer to the question, with no extra text or explanation."
    ),
    # tools=[built_in_code_execution],
)

# A specialized agent for web searches
search_agent = LlmAgent(
    name='SearchAgent',
    model="gemini-1.5-flash-latest",  # Flash is efficient for search-and-answer
    description="Searches the web to answer questions about current events, facts, or general knowledge. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert web researcher. You will be given a question. "
        "Use your search tool to find the most accurate information. "
        "Synthesize the findings and provide a concise, direct answer to the question. "
        "Your final output must be only the answer, with no extra text."
    ),
    tools=[google_search],
)

# A specialized agent for image analysis
image_agent = LlmAgent(
    name='ImageAgent',
    model="gemini-1.5-flash-latest",  # Flash model has vision capabilities
    description="Analyzes an image to answer a question about its content. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert image analyst. You will be given a question and a URL to an image. "
        "Analyze the image content to answer the question. "
        "Your final output must be only the answer, with no extra text."
    ),
)

# A specialized agent for video analysis
youtube_agent = LlmAgent(
    name='YouTubeAgent',
    model="gemini-1.5-flash-latest",  # Flash model has vision capabilities
    description="Watches a YouTube video to answer a question about its content. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert video analyst. You will be given a question and a URL to a YouTube video. "
        "Analyze the video content to answer the question. "
        "Your final output must be only the answer, with no extra text."
    ),
)

# The main orchestrator agent
root_agent = LlmAgent(
    name='OrchestratorAgent',
    model="gemini-1.5-pro-latest",  # Pro for robust orchestration
    description="Manages a team of specialized agents to answer a list of questions and submits them for scoring.",
    instruction=(
        "You are the project manager. Your goal is to answer a series of questions and submit them. "
        "1. **FETCH**: Start by using the `answer_questions` tool to get the list of all tasks. "
        "2. **DELEGATE**: For each task string, which contains a 'task_id:question', extract the task_id and the question. "
        " - Determine the best specialized agent for the job (Code, Search, Image, YouTube) based on the question and any file URLs. "
        " - Invoke that agent with the question and necessary context (like the file URL). "
        "3. **COLLECT**: Get the precise answer back from the specialist agent. Create a dictionary: `{'task_id': 'the_id', 'submitted_answer': 'the_answer'}`. The answer must be exact, without any extra formatting or text. "
        "4. **SUBMIT**: After processing all questions, gather all the answer dictionaries into a single list. Call the `submit_questions` tool with this list to complete the assignment."
    ),
    tools=[
        responses_api,
        submit_api,
        agent_tool.AgentTool(agent=code_agent),
        agent_tool.AgentTool(agent=search_agent),
        agent_tool.AgentTool(agent=image_agent),
        agent_tool.AgentTool(agent=youtube_agent),
    ],
)


# --- Application Runner ---

session_service = InMemorySessionService()
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)


async def run_agent_process():
    """Create a session, send the kickoff prompt, and print events as they stream.

    Prints the name of every tool whose response arrives and every text part
    authored by the orchestrator agent.
    """
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
    # The session identifier lives on ``Session.id``.
    print(f"===== Agent Process Started for session: {session.id} =====")

    initial_prompt = "Get all the questions, answer each one using your specialized agents, and submit the final list of answers for scoring."
    print(f"\nSending initial prompt to the Orchestrator Agent:\n'{initial_prompt}'")

    # run_async is keyword-only and needs the session's owner plus the user
    # message passed as ``new_message``.
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=session.id,
        new_message=types.Content(role="user", parts=[types.Part(text=initial_prompt)]),
    ):
        # Optional: Print events for debugging
        if not (event.content and event.content.parts):
            continue
        for part in event.content.parts:
            if part.function_response:
                # A tool (or agent-as-tool) returned a result.
                print(f"\n<-- [Tool Output] from `{part.function_response.name}`")
            elif part.text and event.author == root_agent.name:
                print(f"\n[Orchestrator Response]: {part.text}")

    print("\n===== Agent Process Finished =====")


async def main():
    """Main entry point for the application."""
    await run_agent_process()


if __name__ == "__main__":
    asyncio.run(main())