"""Multi-agent runner for the agents-course unit 4 scoring API.

Fetches the evaluation questions, lets an orchestrator agent delegate each
one to a specialist agent, and submits the collected answers for scoring.
"""

import asyncio
import json
import logging
from typing import AsyncGenerator, Dict, Any

import google.genai.types as types
import requests
from google.adk.agents import BaseAgent, LlmAgent
from google.adk.agents.invocation_context import InvocationContext
from google.adk.events import Event, EventActions
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import (
    FunctionTool,
    ToolContext,
    agent_tool,
    # built_in_code_execution,
    google_search,
)

# Configure logging to suppress verbose output: only records at ERROR level
# or above pass the root logger's threshold.
logging.basicConfig(level=logging.ERROR)

# --- API Interaction Functions ---

def answer_questions() -> list[str]:
    """
    Fetch every evaluation question from the scoring API as prompt strings.

    Each prompt has the form ``"<task_id>:<question>"``. When the API reports
    a ``file_name`` for a task, a sentence giving the file download URL is
    appended to the prompt.

    Returns:
        The list of prompt strings, or an empty list when the request fails,
        times out, or the response body cannot be decoded as JSON.
    """
    print("Attempting to fetch questions from the API...")
    url = 'https://agents-course-unit4-scoring.hf.space/questions'
    headers = {'accept': 'application/json'}
    try:
        # Without a timeout a stalled server would block the whole run forever.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes
        questions_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # Depending on the installed requests version, an undecodable body may
        # surface as a plain ValueError rather than a RequestException.
        print(f"Error fetching questions: {e}")
        return []

    print(f"Successfully fetched {len(questions_data)} questions.")
    prompts = []
    for item in questions_data:
        task_id = item['task_id']
        prompt = f"{task_id}:{item['question']}"
        if item.get('file_name'):
            # Tasks with an attachment get the download URL appended.
            file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
            prompt = f"{prompt} The URL for the associated file is: {file_url}"
        prompts.append(prompt)
    return prompts

def submit_questions(answers: list[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Submits the collected answers to the scoring API.

    Args:
        answers: A list of dictionaries, where each dictionary contains
                 a 'task_id' and a 'submitted_answer'.

    Returns:
        The JSON body of the scoring API's response, decoded.

    Raises:
        requests.exceptions.RequestException: Re-raised after the error (and
            the server's response body, when one exists) has been printed.
    """
    # !!! IMPORTANT !!!
    # REPLACE the username and agent_code with your own details.
    username = "YOUR_HUGGING_FACE_USERNAME"
    agent_code_url = "https://huggingface.co/spaces/YOUR_USERNAME/YOUR_SPACE_NAME/tree/main"

    print(f"Attempting to submit {len(answers)} answers for user '{username}'...")
    url = 'https://agents-course-unit4-scoring.hf.space/submit'
    payload = {
        "username": username,
        "agent_code": agent_code_url,
        "answers": answers
    }
    headers = {'accept': 'application/json', "Content-Type": "application/json"}

    try:
        # Bound the wait so a stalled server cannot hang the run indefinitely.
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        print("Submission successful!")
        result = response.json()  # decode once and reuse for print + return
        print("Response:", result)
        return result
    except requests.exceptions.RequestException as e:
        print(f"Error submitting answers: {e}")
        # A Response is falsy for 4xx/5xx status codes, so a truthiness test
        # would hide the body of exactly the responses we want to see;
        # compare against None instead.
        error_body = e.response.text if e.response is not None else 'No response'
        print(f"Response Body: {error_body}")
        raise

# Wrap API functions in ADK Tools.
# FunctionTool is constructed from the callable alone; the tool name and the
# description presented to the model come from the function's name and
# docstring, so no description argument is passed here.
responses_api = FunctionTool(func=answer_questions)
submit_api = FunctionTool(func=submit_questions)

# --- Agent Definitions ---

# Identifiers passed to the Runner and to session creation further down.
APP_NAME = "gaia_challenge_agent"
USER_ID = "test_user"
SESSION_ID = "main_session"

# A specialized agent for tasks requiring code execution or data analysis.
# NOTE(review): the code-execution tool is commented out below, so this agent
# is constructed without any tools even though its instruction asks it to
# write and run Python -- confirm whether that is intentional.
code_agent = LlmAgent(
    name='CodeAgent',
    model="gemini-1.5-pro-latest", # Using Pro for complex code generation
    description="Executes code and analyzes data files (.csv, .xlsx, .json, .py) to answer a question. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert in data analysis and code execution. Given a question and a file URL, "
        "write Python code to find the answer. "
        "Use pandas for data files. Fetch remote files using requests. "
        "Your final output must be only the answer to the question, with no extra text or explanation."
    ),
    # tools=[built_in_code_execution],
)

# A specialized agent for web searches; it is the only specialist that is
# given a tool (google_search).
search_agent = LlmAgent(
    name='SearchAgent',
    model="gemini-1.5-flash-latest", # Flash is efficient for search-and-answer
    description="Searches the web to answer questions about current events, facts, or general knowledge. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert web researcher. You will be given a question. "
        "Use your search tool to find the most accurate information. "
        "Synthesize the findings and provide a concise, direct answer to the question. "
        "Your final output must be only the answer, with no extra text."
    ),
    tools=[google_search],
)

# A specialized agent for image analysis.
# NOTE(review): no tools are attached and the instruction only promises a URL
# in the prompt; nothing here downloads the image -- confirm the model can
# actually access the image content from a bare URL.
image_agent = LlmAgent(
    name='ImageAgent',
    model="gemini-1.5-flash-latest", # Flash model has vision capabilities
    description="Analyzes an image to answer a question about its content. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert image analyst. You will be given a question and a URL to an image. "
        "Analyze the image content to answer the question. "
        "Your final output must be only the answer, with no extra text."
    ),
)

# A specialized agent for video analysis.
# NOTE(review): no tools are attached and the video is supplied only as a URL
# in the prompt text -- confirm the model can access the video content that way.
youtube_agent = LlmAgent(
    name='YouTubeAgent',
    model="gemini-1.5-flash-latest", # Flash model has vision capabilities
    description="Watches a YouTube video to answer a question about its content. Responds with only the final, exact answer.",
    instruction=(
        "You are an expert video analyst. You will be given a question and a URL to a YouTube video. "
        "Analyze the video content to answer the question. "
        "Your final output must be only the answer, with no extra text."
    ),
)

# The main orchestrator agent. Its instruction walks the model through four
# steps (fetch, delegate, collect, submit); its tool list holds the two API
# FunctionTools plus each specialist agent wrapped in an AgentTool so the
# orchestrator can invoke them as tools.
root_agent = LlmAgent(
    name='OrchestratorAgent',
    model="gemini-1.5-pro-latest", # Pro for robust orchestration
    description="Manages a team of specialized agents to answer a list of questions and submits them for scoring.",
    instruction=(
        "You are the project manager. Your goal is to answer a series of questions and submit them. "
        "1. **FETCH**: Start by using the `answer_questions` tool to get the list of all tasks. "
        "2. **DELEGATE**: For each task string, which contains a 'task_id:question', extract the task_id and the question. "
        "   - Determine the best specialized agent for the job (Code, Search, Image, YouTube) based on the question and any file URLs. "
        "   - Invoke that agent with the question and necessary context (like the file URL). "
        "3. **COLLECT**: Get the precise answer back from the specialist agent. Create a dictionary: `{'task_id': 'the_id', 'submitted_answer': 'the_answer'}`. The answer must be exact, without any extra formatting or text. "
        "4. **SUBMIT**: After processing all questions, gather all the answer dictionaries into a single list. Call the `submit_questions` tool with this list to complete the assignment."
    ),
    tools=[
        responses_api,
        submit_api,
        agent_tool.AgentTool(agent=code_agent),
        agent_tool.AgentTool(agent=search_agent),
        agent_tool.AgentTool(agent=image_agent),
        agent_tool.AgentTool(agent=youtube_agent),
    ],
)

# --- Application Runner ---

# Module-level session service and runner used by run_agent_process(); the
# runner drives root_agent and shares the same session service instance.
session_service = InMemorySessionService()
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)

async def run_agent_process():
    """Create a session, send the kickoff prompt, and print progress events.

    The orchestrator is driven through ``runner.run_async``; while events
    stream back, the name of every tool that returned a result and any text
    authored by the orchestrator are printed for debugging.
    """
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
    # The Session object exposes its identifier as `id`.
    print(f"===== Agent Process Started for session: {session.id} =====")

    initial_prompt = "Get all the questions, answer each one using your specialized agents, and submit the final list of answers for scoring."

    print(f"\nSending initial prompt to the Orchestrator Agent:\n'{initial_prompt}'")

    # run_async is keyword-only and needs the user id alongside the session id;
    # the user's message is passed as `new_message`.
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=session.id,
        new_message=types.Content(role="user", parts=[types.Part(text=initial_prompt)]),
    ):
        # Optional: Print events for debugging
        tool_responses = event.get_function_responses()
        if tool_responses:
            # Tool results arrive as function-response parts on the event.
            for tool_response in tool_responses:
                print(f"\n<-- [Tool Output] from `{tool_response.name}`")
        elif event.author == root_agent.name and event.content and event.content.parts:
            # Function-call parts carry no text, so only print real text.
            text = event.content.parts[0].text
            if text:
                print(f"\n[Orchestrator Response]: {text}")

    print("\n===== Agent Process Finished =====")

async def main():
    """Run the agent process to completion; the application's async entry point."""
    agent_run = run_agent_process()
    await agent_run

# Only start the event loop when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())