import inspect
import os
import re

import gradio as gr
import pandas as pd
import requests


# The Google ADK stack is optional. When either ``google.genai`` or the local
# ``agent`` module cannot be imported, GOOGLE_ADK_AVAILABLE is set to False and
# the rest of the app uses the keyword-based SimpleAgent instead.
try:
    from google.genai import types
    from agent import session_service, APP_NAME, USER_ID, SESSION_ID, runner

    GOOGLE_ADK_AVAILABLE = True
    print("✅ Google ADK components loaded successfully")
except ImportError as e:
    print(f"⚠️ Google ADK not available: {e}")
    print("Falling back to simple HTTP-based agent")
    GOOGLE_ADK_AVAILABLE = False


# Base URL of the scoring service; the "/questions", "/submit" and
# "/files/<task_id>" endpoints used below are built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


class SimpleAgent:
    """Keyword-driven fallback agent used when Google ADK cannot be imported.

    The agent does not really answer questions. It classifies each question by
    case-insensitive substring matching and returns a canned explanation tagged
    with ``[SimpleAgent ...]``. The only thing it computes is the sum or the
    product of the first two numbers found in an arithmetic question.
    """

    # A question counts as "complex" when at least _COMPLEX_THRESHOLD of these
    # phrases occur in it.
    _COMPLEX_INDICATORS = (
        'painting', 'film', 'movie', 'ocean liner', 'ship', 'menu',
        'clockwise', 'order', 'arrangement', 'position',
        'comma-separated', 'list', 'plural form',
        'served as part of', 'later used as', 'floating prop',
    )
    _COMPLEX_THRESHOLD = 3

    # (trigger phrases, step label) pairs reported by _handle_complex_question,
    # in the order they are listed in the answer.
    _COMPLEX_STEPS = (
        (('painting',), "🎨 Analyze painting/image"),
        (('film', 'movie'), "🎬 Research film information"),
        (('ocean liner', 'ship'), "🚢 Research ship/vessel details"),
        (('menu',), "Find historical menu information"),
        (('clockwise', 'order', 'arrangement'), "Analyze spatial arrangement"),
    )

    # Trigger phrases for the simple categories, checked in this order.
    _MATH_KEYWORDS = ('calculate', 'sum', 'total', 'add', 'multiply', 'divide')
    _COUNT_KEYWORDS = ('how many', 'count', 'number of')
    _FILE_KEYWORDS = ('file', 'excel', 'csv')
    _LOOKUP_KEYWORDS = ('who is', 'who are', 'what is')

    # Unsigned integer or decimal literal, e.g. "3", "3.5" or "3.".
    _NUMBER_PATTERN = re.compile(r'\d+\.?\d*')

    def __init__(self):
        print("SimpleAgent initialized - using basic HTTP requests")

    @staticmethod
    def _contains_any(text, phrases):
        """Return True when any of *phrases* is a substring of *text*."""
        return any(phrase in text for phrase in phrases)

    def __call__(self, question: str) -> str:
        """Classify *question* and return the matching canned response.

        Categories are tried in a fixed order: complex GAIA question, math,
        counting, file analysis, fact lookup, then a generic fallback. Any
        exception raised while classifying is reported in the returned string
        instead of propagating.
        """
        print(f"SimpleAgent received question (first 50 chars): {question[:50]}...")

        try:
            question_lower = question.lower()

            if self._is_complex_gaia_question(question):
                return self._handle_complex_question(question)

            if self._contains_any(question_lower, self._MATH_KEYWORDS):
                return self._try_basic_math(question)

            if self._contains_any(question_lower, self._COUNT_KEYWORDS):
                return "I would need to analyze the data to count the items. [SimpleAgent - limited capabilities]"

            if self._contains_any(question_lower, self._FILE_KEYWORDS):
                return "I would need to download and analyze the file to answer this question. [SimpleAgent - limited capabilities]"

            if self._contains_any(question_lower, self._LOOKUP_KEYWORDS):
                return "I would need to search for information about this topic. [SimpleAgent - limited capabilities]"

            return f"I received your question but need more advanced capabilities to answer it properly. Question: {question[:200]}... [SimpleAgent - Google ADK not available]"

        except Exception as e:
            # Boundary handler: the caller expects a string for every question.
            return f"Error processing question: {str(e)} [SimpleAgent]"

    def _is_complex_gaia_question(self, question):
        """Detect if this is a complex GAIA-style question requiring multiple steps."""
        question_lower = question.lower()
        hits = sum(1 for indicator in self._COMPLEX_INDICATORS if indicator in question_lower)
        return hits >= self._COMPLEX_THRESHOLD

    def _handle_complex_question(self, question):
        """Describe the steps a complex GAIA question would need (without solving it)."""
        question_lower = question.lower()

        steps_needed = [
            label
            for triggers, label in self._COMPLEX_STEPS
            if self._contains_any(question_lower, triggers)
        ]

        lines = ["This appears to be a complex GAIA question requiring multiple steps:"]
        lines.extend(f"{i}. {step}" for i, step in enumerate(steps_needed, 1))
        lines.extend([
            "",
            "I would need advanced capabilities including:",
            "- Image analysis for visual content",
            "- Web search for historical/factual information",
            "- Multi-step reasoning to connect different pieces of information",
            "",
            "[SimpleAgent - Complex GAIA question detected but cannot solve]",
        ])
        return "\n".join(lines)

    def _try_basic_math(self, question):
        """Add or multiply the first two numbers found in *question*.

        Only questions containing 'add'/'sum' (addition) or 'multiply'
        (multiplication) are computed; anything else gets a canned
        "limited math" message.
        """
        try:
            numbers = self._NUMBER_PATTERN.findall(question)
            if len(numbers) >= 2:
                first, second = (float(n) for n in numbers[:2])
                question_lower = question.lower()

                if 'add' in question_lower or 'sum' in question_lower:
                    result = first + second
                    return f"Basic calculation: {first} + {second} = {result} [SimpleAgent - basic math]"

                if 'multiply' in question_lower:
                    result = first * second
                    return f"Basic calculation: {first} × {second} = {result} [SimpleAgent - basic math]"

            return "I can see this involves math but need more advanced capabilities to solve it. [SimpleAgent - limited math]"
        except (TypeError, ValueError, OverflowError):
            # Non-string input or an unparsable number: stay best-effort.
            return "I can see this involves math but couldn't parse it. [SimpleAgent - limited math]"


class GoogleADKAgent:
    """Agent that forwards each question to the pre-configured Google ADK runner.

    The class relies on the module-level ``runner``, ``session_service``,
    ``APP_NAME``, ``USER_ID``, ``SESSION_ID`` and ``types`` names brought in by
    the guarded ADK import, so it can only be constructed when that import
    succeeded.
    """

    # Sentinel returned when no event carried any text.
    _NO_RESPONSE = "No response generated."

    # Responses starting with one of these are treated as planning chatter.
    _PLANNING_PREFIXES = ("I'll", "Let me", "I need to")
    # Responses containing one of these (lower-cased) are skipped when a
    # cleaner candidate exists.
    _PLANNING_PHRASES = ("let me", "i need to", "first", "next", "then", "now i'll")

    # Boilerplate lead-ins stripped from the final answer (case-insensitive).
    _ANSWER_PREFIXES = (
        "The answer is: ",
        "Answer: ",
        "Final answer: ",
        "FINAL ANSWER: ",
        "Based on my analysis, ",
        "The result is: ",
    )

    # Match only the LAST bracketed / parenthesised group at the end of the
    # text; the character classes stop the match from starting at an earlier
    # opening bracket and swallowing everything after it.
    _TRAILING_BRACKETS = re.compile(r'\s*\[[^\[\]]*\]\s*$')
    _TRAILING_PARENS = re.compile(r'\s*\([^()]*\)\s*$')

    def __init__(self):
        """Bind the module-level ADK runner and session service to this agent.

        Raises:
            Exception: whatever went wrong while binding (for example a
                ``NameError`` when the ADK import failed); it is logged and
                re-raised with ``initialized`` left False.
        """
        print("GoogleADKAgent initialized with Google ADK runner and agents.")
        self.initialized = False

        try:
            self.runner = runner
            self.session_service = session_service
            self.app_name = APP_NAME
            self.user_id = USER_ID
            self.question_counter = 0
            self.initialized = True
            print("✅ Google ADK Agent successfully initialized using pre-configured runner")
        except Exception as e:
            print(f"❌ Failed to initialize Google ADK Agent: {e}")
            self.initialized = False
            raise

    def __call__(self, question: str) -> str:
        """Run *question* through the ADK runner and return the cleaned answer.

        A fresh session id (``SESSION_ID`` plus a per-agent counter) is
        requested for every question; if creating it fails, the shared
        ``SESSION_ID`` is used instead. Any exception raised while running the
        agent is reported in the returned string instead of propagating.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")

        if not self.initialized:
            return "Google ADK Agent not properly initialized"

        try:
            self.question_counter += 1
            unique_session_id = f"{SESSION_ID}_q{self.question_counter}"

            try:
                self.session_service.create_session(
                    app_name=self.app_name,
                    user_id=self.user_id,
                    session_id=unique_session_id,
                )
                print(f"✅ Created session: {unique_session_id}")
            except Exception as session_error:
                print(f"⚠️ Session creation error: {session_error}")
                # Best effort: fall back to the shared session id.
                unique_session_id = SESSION_ID

            query_content = types.Content(
                role='user',
                parts=[types.Part(text=question)],
            )

            print(f"Running agent with session: {unique_session_id}")
            events = list(self.runner.run(
                user_id=self.user_id,
                session_id=unique_session_id,
                new_message=query_content,
            ))
            print(f"Generated {len(events)} events")
            self._log_events(events)

            final_answer = self._extract_gaia_answer(events)
            final_answer = self._clean_answer_for_exact_match(final_answer)

            print(f"Agent returning answer: {final_answer[:100]}...")
            return final_answer

        except Exception as e:
            error_msg = f"Error running Google ADK agent: {str(e)}"
            print(error_msg)
            return error_msg

    @staticmethod
    def _log_events(events):
        """Print author, content type and a text preview for every event."""
        for i, event in enumerate(events):
            content = getattr(event, 'content', None)
            print(f"Event {i}: author={getattr(event, 'author', 'unknown')}, content_type={type(content)}")
            for j, part in enumerate(getattr(content, 'parts', None) or ()):
                text = getattr(part, 'text', None)
                if text:
                    print(f" Part {j}: {text[:100]}...")

    @staticmethod
    def _event_texts(event):
        """Return the stripped, non-empty text of each part of *event*.

        Missing ``content``, ``parts`` or ``text`` attributes are tolerated
        and simply yield no text.
        """
        content = getattr(event, 'content', None)
        texts = []
        for part in getattr(content, 'parts', None) or ():
            text = getattr(part, 'text', None)
            if text and text.strip():
                texts.append(text.strip())
        return texts

    def _extract_gaia_answer(self, events):
        """Extract the final answer from events with GAIA-specific logic.

        Selection order:
        1. the latest response longer than 10 characters that neither starts
           with a planning prefix nor contains a planning phrase;
        2. otherwise the latest response that passed the prefix/length filter;
        3. otherwise the first text part of the last event that has any text;
        4. otherwise the "No response generated." sentinel.
        """
        all_responses = [
            text
            for event in events
            for text in self._event_texts(event)
            if len(text) > 10 and not text.startswith(self._PLANNING_PREFIXES)
        ]

        if all_responses:
            for response in reversed(all_responses):
                lowered = response.lower()
                if not any(phrase in lowered for phrase in self._PLANNING_PHRASES):
                    return response
            # Every candidate looked like planning text; take the latest one.
            return all_responses[-1]

        for event in reversed(events):
            texts = self._event_texts(event)
            if texts:
                return texts[0]

        return self._NO_RESPONSE

    def _clean_answer_for_exact_match(self, answer):
        """Clean the answer for exact matching requirements.

        Strips known lead-ins such as "Final answer: " (case-insensitively and
        repeatedly, so stacked lead-ins are all removed), then one trailing
        ``[...]`` group and one trailing ``(...)`` group. A trailing group is
        kept when removing it would leave nothing, so an answer that is itself
        bracketed (e.g. ``[1, 2, 3]``) is not erased. Falsy answers and the
        "No response generated." sentinel are returned unchanged.
        """
        if not answer or answer == self._NO_RESPONSE:
            return answer

        cleaned = answer.strip()

        # Keep stripping until no known lead-in remains.
        while True:
            for prefix in self._ANSWER_PREFIXES:
                if cleaned[:len(prefix)].lower() == prefix.lower():
                    cleaned = cleaned[len(prefix):].lstrip()
                    break
            else:
                break

        for pattern in (self._TRAILING_BRACKETS, self._TRAILING_PARENS):
            shortened = pattern.sub('', cleaned).strip()
            if shortened:
                cleaned = shortened

        return cleaned.strip()


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, answer them with the active agent, and submit.

    The agent is ``GoogleADKAgent`` when the ADK import succeeded and
    ``SimpleAgent`` otherwise (also used as a fallback when constructing the
    ADK agent fails).

    Args:
        profile: the logged-in Hugging Face profile, or None when the user is
            not logged in.

    Returns:
        A ``(status_message, results)`` pair for the two Gradio outputs.
        ``results`` is a DataFrame with one row per attempted question, or
        None when the run stopped before any question was attempted.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent, degrading to SimpleAgent on failure.
    try:
        if GOOGLE_ADK_AVAILABLE:
            agent = GoogleADKAgent()
            print("✅ Using Google ADK Agent")
        else:
            agent = SimpleAgent()
            print("⚠️ Using Simple Agent (Google ADK not available)")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        try:
            agent = SimpleAgent()
            print("Fallback to Simple Agent due to error")
        except Exception as e2:
            print(f"Error with fallback agent: {e2}")
            return f"Error initializing any agent: {e}, {e2}", None

    # NOTE: when SPACE_ID is unset this renders as ".../spaces/None/tree/main".
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch the questions.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must precede RequestException: requests' JSONDecodeError is a
        # subclass of it, so the generic handler would otherwise shadow this.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent on every question.
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_name = item.get("file_name", "")

        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # Tell the agent where an attached file can be downloaded from.
        enhanced_question = question_text
        if file_name:
            file_url = f"{api_url}/files/{task_id}"
            enhanced_question = f"{question_text}\n\nFile available at: {file_url}"

        try:
            submitted_answer = agent(enhanced_question)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # Log the failure in the results table but keep going; failed
            # tasks are not included in the submission payload.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare the submission.
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit; every outcome returns a status string plus the results table.
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        status_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)

    return status_message, pd.DataFrame(results_log)


# Gradio UI: a status banner, instructions, a login button, and a single
# button that runs the whole evaluation and shows the status and results.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 GAIA Benchmark Agent Evaluation")

    # Tell the user which agent implementation this process will use.
    if GOOGLE_ADK_AVAILABLE:
        status_msg = "✅ **Google ADK Agent Active** - Full capabilities for complex GAIA questions including multi-step reasoning, web search, code execution, file analysis, and multimodal understanding."
    else:
        status_msg = "⚠️ **Simple Agent Active** - Limited capabilities. Google ADK not available in this environment. Can detect GAIA question types but cannot solve them."

    gr.Markdown(f"**Agent Status:** {status_msg}")

    gr.Markdown(
        """
        ## About GAIA Benchmark

        This evaluation uses questions from the **GAIA benchmark** - a challenging dataset that tests AI agents on:
        - **Multi-step reasoning** across different domains
        - 🖼️ **Multimodal understanding** (text, images, files)
        - **Multi-hop information retrieval**
        - **Structured output formatting**
        - 🎯 **Exact answer matching**

        **Example GAIA Question:**
        *"Which of the fruits shown in the 2008 painting 'Embroidery from Uzbekistan' were served as part of the October 1949 breakfast menu for the ocean liner that was later used as a floating prop for the film 'The Last Voyage'?"*

        ---

        **Instructions:**
        1. **Clone this space** and customize the agent code for your approach
        2. **Log in** to your Hugging Face account using the button below
        3. **Run Evaluation** to test your agent on 20 filtered GAIA questions
        4. **Submit answers** for scoring with exact match evaluation

        **Target:** Aim for ~30% accuracy on Level 1 GAIA questions (current benchmark performance)

        ---
        **Note:** Evaluation may take several minutes as the agent processes complex multi-step questions.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)

    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs are wired to the click handler; run_and_submit_all
    # takes only the ``gr.OAuthProfile | None`` parameter.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )


if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    # Both variables are read only to print startup diagnostics.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # Accept SPACE_HOST with or without the ".hf.space" suffix so the
        # printed URL never ends in ".hf.space.hf.space".
        runtime_host = space_host_startup.removesuffix(".hf.space")
        print(f" Runtime URL should be: https://{runtime_host}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)