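"""GAIA final-assignment agent.

Fetches the assignment questions from the scoring API, lets an orchestrator
agent delegate each question to a specialised sub-agent (code execution, web
search, image, YouTube), and submits the collected answers.
"""
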
import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Dict
from zoneinfo import ZoneInfo

import requests
from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import FunctionTool, agent_tool, built_in_code_execution, google_search
from google.genai import types

logging.basicConfig(level=logging.ERROR)

def answer_questions():
    """Fetches the assignment questions from the scoring API and builds one prompt per task."""
    url = 'https://agents-course-unit4-scoring.hf.space/questions'
    headers = {'accept': 'application/json'}
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # Raise an exception for bad status codes
    prompts = []
    for item in response.json():
        task_id = item['task_id']
        question = item['question']
        if item['file_name']:
            # Questions with an attachment expose the file at /files/<task_id>.
            url_file = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
            prompt = f"{task_id}:{question} and the file is {url_file}, give the final answer only"
        else:
            prompt = f"{task_id}:{question} give the final answer only"
        prompts.append(prompt)
    return prompts
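
# Each prompt produced above has one of these shapes (placeholders, not real task data):
#   "<task_id>:<question> and the file is https://agents-course-unit4-scoring.hf.space/files/<task_id>, give the final answer only"
#   "<task_id>:<question> give the final answer only"
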
def submit_questions(answers: list[Dict[str, Any]]) -> Dict[str, Any]:
    """Submits the collected answers to the scoring API."""
    url = 'https://agents-course-unit4-scoring.hf.space/submit'
    payload = {
        "username": "ashishja",
        "agent_code": "https://huggingface.co/spaces/ashishja/Agents_Course_Final_Assignment_Ashish",
        "answers": answers,
    }
    headers = {'accept': 'application/json', "Content-Type": "application/json"}
    print("Submitting the following payload:")
    print(json.dumps(payload, indent=2))
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code == 200:
        print("Submission successful!")
        return response.json()
    print(f"Submission failed with status {response.status_code}: {response.text}")
    response.raise_for_status()


# Wrap the plain functions so the orchestrator agent can call them as tools.
responses_api = FunctionTool(func=answer_questions)
submit_api = FunctionTool(func=submit_questions)
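
# Expected shape of the `answers` argument, matching the keys the root agent is
# instructed to produce below:
#   [{"task_id": "<task_id>", "submitted_answer": "<answer>"}, ...]
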
# class QuestionAnswerer(LlmAgent):
#     async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
#         questions_to_answer = ctx.session_service.get('fetched_questions', [])
#         for q in questions_to_answer:
#             answer = await self._llm(messages=[types.ChatMessage(role="user", parts=[types.Part(text=q)])])
#             yield Event(author=self.name, content=answer.content)
#
# qa = QuestionAnswerer(name='qa_1', model="gemini-1.5-flash-latest", description="Question Answerer")
APP_NAME="final_assignment_agent"
USER_ID="user1234"
SESSION_ID="5678"

code_agent = LlmAgent(
    name='codegaiaAgent',
    model="gemini-2.0-flash",
    description=(
        "You are a smart agent that can write and execute code to answer questions. "
        "Use this for questions involving code files (.py) or data files (.csv, .xlsx, .json, .txt)."
    ),
    instruction=(
        "If the question contains a file with .py, get the code file and, depending on the question and the file provided, "
        "execute the code and provide the final answer. "
        "If the question contains a spreadsheet file like .xlsx or .csv, get the file, use pandas to analyze it, and provide the final answer. "
        "If the question contains a file with .txt or .json, get the file and use code to parse it and answer the question. "
        "Always use the code execution tool to run your code and provide only the final answer."
    ),
    tools=[built_in_code_execution],
)

search_agent = LlmAgent(
    name='searchgaiaAgent',
    model="gemini-2.0-flash",
    description=(
        "You are a smart agent that can search the web to answer questions."
    ),
    instruction=(
        "Get the URL associated with the question, perform a web search, consolidate the information, and answer the provided question."
    ),
    tools=[google_search],
)

image_agent = LlmAgent(
    name='imagegaiaAgent',
    model="gemini-2.0-flash",
    description=(
        "You are a smart agent that can analyze an image file and answer any questions related to it."
    ),
    instruction=(
        "Get the image file from the link provided in the prompt. Use your multimodal capabilities to understand the image and answer the question."
    ),
)

youtube_agent = LlmAgent(
    name='youtubegaiaAgent',
    model="gemini-2.0-flash",
    description=(
        "You are a smart agent that can watch a YouTube video and answer any questions related to it."
    ),
    instruction=(
        "Get the YouTube link from the prompt. Use your multimodal capabilities to watch the video and answer the provided question."
    ),
)

root_agent = LlmAgent(
    name='basegaiaAgent',
    model="gemini-2.0-flash",
    description=(
        "You are a master agent that orchestrates sub-agents to answer various types of questions."
    ),
    instruction=(
        "You are a helpful orchestrator agent. Your primary goal is to answer a series of questions and submit them. "
        "First, invoke your tool 'answer_questions' to retrieve the list of questions. "
        "Once you receive the list, iterate through each question. For each one, delegate to the most appropriate sub-agent "
        "(code, search, youtube, image) based on its description and the question's content (e.g., file type). "
        "After getting the answer from the sub-agent, format it into a dictionary with 'task_id' and 'submitted_answer' keys. "
        "The task_id is at the beginning of each question string, separated by a colon. "
        "Collect all these answer dictionaries into a single list. "
        "Finally, pass this complete list of dictionaries to the 'submit_questions' tool to submit all answers at once."
    ),
    tools=[
        responses_api,
        submit_api,
        agent_tool.AgentTool(agent=code_agent),
        agent_tool.AgentTool(agent=search_agent),
        agent_tool.AgentTool(agent=youtube_agent),
        agent_tool.AgentTool(agent=image_agent),
    ],
)
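
# agent_tool.AgentTool exposes each sub-agent to the root agent as a callable
# tool, so delegation happens through ordinary tool calls rather than agent transfers.
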
session_service = InMemorySessionService()
runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)
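
# InMemorySessionService keeps conversation state in memory only (lost on restart);
# the Runner drives root_agent and streams its events back to the caller.
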

async def process_questions_and_answer():
    """
    Orchestrates the entire process of fetching questions, answering them
    using the agent, and submitting the final answers.
    """
    session = await session_service.create_session(
        app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
    )
    print(f"===== Application Startup at {datetime.now(ZoneInfo('UTC'))} =====")
    print(f"Session created: {session.id}")
    # Initial prompt to kick off the agent's task
    initial_prompt = (
        "Please get all the questions, answer each one by delegating to the correct tool "
        "or sub-agent, format the answers, and then submit the final list."
    )
    print("\nSending initial prompt to the agent...")
    print(f"Prompt: '{initial_prompt}'")
    # Run the agent and stream events until the task completes.
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=types.Content(role="user", parts=[types.Part(text=initial_prompt)]),
    ):
        if event.content and event.content.parts and event.content.parts[0].function_response:
            # Tool results come back as function_response parts.
            function_response = event.content.parts[0].function_response
            print(f"\n<-- Tool Output from {function_response.name}:")
            for key, value in (function_response.response or {}).items():
                print(f"  {key}: {value}")
        elif event.is_final_response() and event.author == root_agent.name and event.content and event.content.parts:
            print(f"\nFinal Agent Response: {event.content.parts[0].text}")
    print("\n===== Task Complete =====")

async def main():
    """Main entry point for the application."""
    await process_questions_and_answer()


if __name__ == "__main__":
    asyncio.run(main())