# main.py - FastAPI application for Pokemon Livestream
import asyncio
import os
import random
import time
import traceback
import logging
# --- Additions for last_action route ---
import datetime
import html
from typing import List, Dict, Optional, Set, Callable # Added Callable
# ---------------------------------------
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
# --- Imports for poke_env and agents ---
from poke_env.player import Player, ActionType, ForfeitAction, MoveOrder, SwitchOrder, DefaultOrder # Import action types
from poke_env import AccountConfiguration, ServerConfiguration
from poke_env.environment.battle import Battle
# Import the actual agent classes
from agents import OpenAIAgent, GeminiAgent, MistralAgent
# --- Configuration ---
# Endpoints of the custom Showdown deployment; both are handed to
# ServerConfiguration below.
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket"
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?'
# Viewer URL; `{battle_id}` is the placeholder for a battle tag.
CUSTOM_BATTLE_VIEW_URL_TEMPLATE = "https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}"
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL)
DEFAULT_BATTLE_FORMAT = "gen9randombattle"

# Define available agents with their corresponding classes.
# "password_env_var" names the environment variable holding that agent's password.
AGENT_CONFIGS = {
    "OpenAIAgent": {"class": OpenAIAgent, "password_env_var": "OPENAI_AGENT_PASSWORD"},
    "GeminiAgent": {"class": GeminiAgent, "password_env_var": "GEMINI_AGENT_PASSWORD"},
    "MistralAgent": {"class": MistralAgent, "password_env_var": "MISTRAL_AGENT_PASSWORD"},
}

# Filter out agents with missing passwords (unset or empty variable).
AVAILABLE_AGENT_NAMES = [
    name for name, cfg in AGENT_CONFIGS.items()
    if os.environ.get(cfg.get("password_env_var", ""))
]

if not AVAILABLE_AGENT_NAMES:
    print("FATAL ERROR: No agent configurations have their required password environment variables set. Exiting.")
    # `exit` is a helper the `site` module adds for interactive sessions and
    # is not guaranteed to exist; raising SystemExit has the same effect.
    raise SystemExit(1)
# --- Global State Variables ---
# Handle of the lifecycle manager task created by the startup hook.
background_task_handle: Optional[asyncio.Task] = None

# The agent currently in rotation: its configured name, the player object
# and the task in which it waits for a challenge.
active_agent_name: Optional[str] = None
active_agent_instance: Optional[Player] = None
active_agent_task: Optional[asyncio.Task] = None

# Battle being shown to viewers, if any.
current_battle_instance: Optional[Battle] = None

# Most recent action reported through update_last_action_callback.
last_llm_action: Optional[Dict] = None

# --- Create FastAPI app ---
app = FastAPI(title="Pokemon Battle Livestream")
# --- NEW: Callback function for agents ---
def update_last_action_callback(action_info: Dict):
    """Callback for agents to report their chosen action.

    Args:
        action_info: Mapping describing the action. The keys ``agent``,
            ``action_str`` and ``turn`` are read (with fallbacks) for the log
            line. The mapping is updated in place with a ``timestamp_utc``
            entry and then stored, by reference, in the module-level
            ``last_llm_action``.
    """
    global last_llm_action
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value; take an aware UTC "now" instead. The rendered text is unchanged.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    action_info["timestamp_utc"] = now_utc.strftime('%Y-%m-%d %H:%M:%S UTC')
    last_llm_action = action_info
    # Log the update for debugging
    print(f"ACTION_LOG: Agent '{action_info.get('agent', 'Unknown')}' chose action: {action_info.get('action_str', 'N/A')} (Turn: {action_info.get('turn', '?')})")
# ---------------------------------------
# --- Helper Functions ---
def get_active_battle(agent: Player) -> Optional[Battle]:
    """Return the first unfinished battle of ``agent`` whose tag is ready.

    A tag is considered ready when the battle exposes a non-empty
    ``battle_tag`` that starts with ``"battle-"``. Every unfinished battle is
    examined, so one whose tag is not ready yet no longer hides a later
    battle that is.

    Args:
        agent: Player whose ``_battles`` mapping is inspected; may be falsy.

    Returns:
        The matching battle object, or ``None`` when the agent is falsy, has
        no battles, or no unfinished battle has a ready tag.
    """
    if not agent or not agent._battles:
        return None
    for battle in agent._battles.values():
        if battle.finished:
            continue
        # The battle object can exist before its tag has been filled in.
        tag = getattr(battle, "battle_tag", None)
        if tag and tag.startswith("battle-"):
            return battle
    return None
def create_battle_iframe(battle_id: str, url_template: Optional[str] = None) -> str:
    """Creates JUST the HTML for the battle iframe tag.

    Args:
        battle_id: Battle tag substituted for ``{battle_id}`` in the template.
        url_template: Optional viewer URL template containing a
            ``{battle_id}`` placeholder. Defaults to the module-level
            ``CUSTOM_BATTLE_VIEW_URL_TEMPLATE``.

    Returns:
        An HTML fragment holding a single ``<iframe>`` that points at the
        battle viewer.
    """
    print("Creating iframe content for battle ID: ", battle_id)
    if url_template is None:
        # Single source of truth for the viewer URL.
        url_template = CUSTOM_BATTLE_VIEW_URL_TEMPLATE
    battle_url = url_template.format(battle_id=battle_id)
    # quote=True keeps the value safe inside the double-quoted src attribute.
    src = html.escape(battle_url, quote=True)
    return f"""
<iframe src="{src}" title="Pokemon battle" style="width:100%;height:100%;border:0" allowfullscreen></iframe>
"""
def create_idle_html(status_message: str, instruction: str) -> str:
    """Build the idle-screen fragment from a status line and an instruction line.

    Args:
        status_message: Text placed on the first content line.
        instruction: Text placed on the second content line.

    Returns:
        The two texts, each on its own line, wrapped in a leading and a
        trailing newline. Neither value is escaped.
    """
    layout = "\n{}\n{}\n"
    return layout.format(status_message, instruction)
def create_error_html(error_msg: str) -> str:
    """Creates HTML fragment to display an error message.

    Args:
        error_msg: Message placed under the heading. It is inserted as-is
            (not escaped), so callers must escape untrusted text themselves.

    Returns:
        The heading line followed by ``error_msg``, wrapped in a leading and
        a trailing newline.
    """
    # The heading glyphs were mis-decoded ("π¨"); restored to the siren emoji.
    return f"""
🚨 Error 🚨
{error_msg}
"""
# --- NEW: Helper function to create HTML for the last action page ---
def _escape_action_field(action_data: Dict, key: str, default: str) -> str:
    """Return ``action_data[key]`` as HTML-escaped text, or ``default`` if missing or None."""
    value = action_data.get(key)
    # str() first: html.escape only accepts strings, and values such as the
    # turn number or an absent raw output are not strings.
    return html.escape(default if value is None else str(value))


def create_last_action_html(action_data: Optional[Dict]) -> str:
    """Formats the last action data into an HTML page.

    Args:
        action_data: Action mapping, or ``None``/empty when nothing has been
            recorded yet. The keys ``agent``, ``timestamp_utc``, ``turn``,
            ``action_type``, ``action_str`` and ``raw_llm_output`` are read;
            each falls back to a placeholder when missing or ``None``.

    Returns:
        The page text. Every value taken from ``action_data`` is
        HTML-escaped.
    """
    if not action_data:
        content = "\n".join([
            "",
            "No Action Recorded Yet",
            "Waiting for the agent to make its first move...",
            "",
        ])
    else:
        # Escape everything that originates from the agent, including the
        # turn and timestamp, which were previously interpolated raw.
        agent_name_escaped = _escape_action_field(action_data, "agent", "Unknown")
        timestamp_escaped = _escape_action_field(action_data, "timestamp_utc", "N/A")
        turn_escaped = _escape_action_field(action_data, "turn", "?")
        action_type_escaped = _escape_action_field(action_data, "action_type", "N/A")
        action_str_escaped = _escape_action_field(action_data, "action_str", "N/A")
        raw_output_escaped = _escape_action_field(action_data, "raw_llm_output", "N/A")
        content = "\n".join([
            "",
            f"Last Action by Agent: {agent_name_escaped}",
            f"Timestamp: {timestamp_escaped}",
            f"Battle Turn: {turn_escaped}",
            f"Action Type: {action_type_escaped}",
            f"Action Chosen: {action_str_escaped}",
            "Raw LLM Output (if available):",
            raw_output_escaped,
            "",
        ])
    return f"\nLast Agent Action\nAgent Action Log\n{content}\n"
# --------------------------------------------------------------------
async def update_display_html(new_html_fragment: str) -> None:
    """Updates the current display HTML fragment and broadcasts to all clients.

    Args:
        new_html_fragment: Fragment handed to ``manager.update_all``.
    """
    # update_all() silently skips a fragment equal to the stored one, so only
    # report an update when the fragment really differs. The lifecycle loop
    # re-sends the same idle fragment on every pass.
    changed = manager.current_html_fragment != new_html_fragment
    await manager.update_all(new_html_fragment)
    if changed:
        print("HTML Display FRAGMENT UPDATED and broadcasted.")
# --- Agent Lifecycle Management ---
async def select_and_activate_new_agent():
    """Pick a random configured agent, build it and start its challenge listener.

    Clears ``last_llm_action``, updates the display while working, and on
    success stores the chosen name, the agent object and its
    ``accept_challenges`` task in the module-level state.

    Returns:
        ``True`` when the agent was activated; ``False`` when no agent is
        available or activation raised (the active-agent globals are reset
        to ``None`` in the latter case).
    """
    global active_agent_name, active_agent_instance, active_agent_task, last_llm_action

    # A newly selected agent starts with an empty action log.
    last_llm_action = None

    if not AVAILABLE_AGENT_NAMES:
        print("Lifecycle: No available agents with passwords set.")
        await update_display_html(create_error_html("No agents available. Check server logs/environment variables."))
        return False

    selected_name = random.choice(AVAILABLE_AGENT_NAMES)
    agent_cfg = AGENT_CONFIGS[selected_name]
    agent_cls = agent_cfg["class"]
    agent_password = os.environ.get(agent_cfg["password_env_var"])

    print(f"Lifecycle: Activating agent '{selected_name}'...")
    await update_display_html(create_idle_html("Selecting Next Agent...", f"Preparing {selected_name} ..."))

    try:
        agent = agent_cls(
            account_configuration=AccountConfiguration(selected_name, agent_password),
            server_configuration=custom_config,
            battle_format=DEFAULT_BATTLE_FORMAT,
            log_level=logging.INFO,
            max_concurrent_battles=1,
            # Lets the agent report each chosen action back to this module.
            action_callback=update_last_action_callback,
        )
        listener = asyncio.create_task(
            agent.accept_challenges(None, 1),
            name=f"AcceptChallenge_{selected_name}",
        )
        listener.add_done_callback(log_task_exception)

        active_agent_name, active_agent_instance, active_agent_task = selected_name, agent, listener
        print(f"Lifecycle: Agent '{selected_name}' is active and listening for 1 challenge.")
        ready_html = create_idle_html(
            f"Agent Ready: {selected_name} ",
            f"Please challenge {selected_name} to a {DEFAULT_BATTLE_FORMAT} battle.",
        )
        await update_display_html(ready_html)
        return True
    except Exception as e:
        error_msg = f"Failed to activate agent '{selected_name}': {e}"
        print(error_msg)
        traceback.print_exc()
        await update_display_html(create_error_html(f"Error activating {selected_name}. Please wait or check logs."))
        active_agent_name = active_agent_instance = active_agent_task = None
        return False
async def check_for_new_battle():
    """Record the active agent's battle once one with a usable tag exists.

    When ``get_active_battle`` yields a battle for the active agent, it is
    stored in ``current_battle_instance`` and the still-running
    ``accept_challenges`` task is cancelled. Does nothing otherwise.
    """
    global active_agent_instance, current_battle_instance, active_agent_name, active_agent_task

    if not active_agent_instance:
        return

    battle = get_active_battle(active_agent_instance)
    if not (battle and battle.battle_tag):
        return

    current_battle_instance = battle
    print(f"Lifecycle: Agent '{active_agent_name}' started battle: {battle.battle_tag}")

    listener_running = active_agent_task and not active_agent_task.done()
    if listener_running:
        print(f"Lifecycle: Cancelling accept_challenges task for {active_agent_name} as battle started.")
        active_agent_task.cancel()
async def deactivate_current_agent(reason: str = "cycle"):
    """Tear down the active agent and reset the module-level agent state.

    Shows a reason-specific notice, clears the globals, cancels the agent's
    task (waiting up to two seconds for it) and disconnects the agent if its
    websocket is still open.

    Args:
        reason: Why the agent is being deactivated. ``"battle_end"``,
            ``"cycle"`` and ``"forfeited_private_battle"`` have dedicated
            notices; any other value is shown as a generic reset.
    """
    global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance

    agent_name_to_deactivate = active_agent_name
    print(f"Lifecycle: Deactivating agent '{agent_name_to_deactivate}' (Reason: {reason})...")

    # Choose the notice for this reason, then display it once.
    if reason == "battle_end":
        notice = ("Battle Finished!", f"Agent {agent_name_to_deactivate} completed the match.")
    elif reason == "cycle":
        notice = ("Cycling Agents", f"Switching from {agent_name_to_deactivate} ...")
    elif reason == "forfeited_private_battle":
        notice = ("Switching Agent", f"Agent {agent_name_to_deactivate} forfeited a private battle.")
    else:
        notice = (f"Resetting Agent ({reason})", f"Cleaning up {agent_name_to_deactivate} ...")
    await update_display_html(create_idle_html(*notice))

    await asyncio.sleep(3)
    await update_display_html(create_idle_html("Preparing Next Agent...", "Please wait..."))

    # Keep local handles, then clear the shared state before the slow cleanup.
    agent, task = active_agent_instance, active_agent_task
    active_agent_name = active_agent_instance = active_agent_task = None
    current_battle_instance = None
    print(f"Lifecycle: Global state cleared for '{agent_name_to_deactivate}'.")

    if task and not task.done():
        print(f"Lifecycle: Ensuring task cancellation for {agent_name_to_deactivate} ({task.get_name()})...")
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=2.0)
            print(f"Lifecycle: Task cancellation confirmed for {agent_name_to_deactivate}.")
        except asyncio.CancelledError:
            print(f"Lifecycle: Task cancellation confirmation (CancelledError) for {agent_name_to_deactivate}.")
        except asyncio.TimeoutError:
            print(f"Lifecycle: Task did not confirm cancellation within timeout for {agent_name_to_deactivate}.")
        except Exception as e:
            print(f"Lifecycle: Error during task cancellation wait for {agent_name_to_deactivate}: {e}")

    if agent:
        print(f"Lifecycle: Disconnecting player {agent.username}...")
        try:
            socket = getattr(agent, '_websocket', None)
            if socket and socket.open:
                await agent.disconnect()
                print(f"Lifecycle: Player {agent.username} disconnected successfully.")
            else:
                print(f"Lifecycle: Player {agent.username} already disconnected or websocket not available.")
        except Exception as e:
            print(f"ERROR during agent disconnect ({agent.username}): {e}")
            traceback.print_exc()

    await asyncio.sleep(2)
    print(f"Lifecycle: Agent '{agent_name_to_deactivate}' deactivation complete.")
async def manage_agent_lifecycle():
    """Runs the main loop selecting, running, and cleaning up agents sequentially.

    Each pass of the endless loop acts on the module-level state:

    * No active agent: try to activate one; on failure wait
      ``ERROR_RETRY_DELAY_SECONDS`` before the next pass.
    * Active agent without a battle: look for a new battle. A tag with more
      than three dash-separated parts and a numeric third part is forfeited
      and the agent deactivated; any other tag is displayed. With no battle
      the idle screen is refreshed.
    * Active agent with a battle: deactivate the agent when the battle is
      finished or its object is missing, otherwise keep polling.

    Any unexpected exception deactivates the current agent, shows an error
    screen and retries after ``ERROR_RETRY_DELAY_SECONDS``.

    Raises:
        asyncio.CancelledError: Re-raised when the task is cancelled.
    """
    global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance
    print("Background lifecycle manager started.")
    REFRESH_INTERVAL_SECONDS = 3
    LOOP_COOLDOWN_SECONDS = 1
    ERROR_RETRY_DELAY_SECONDS = 10
    POST_BATTLE_DELAY_SECONDS = 5
    loop_counter = 0
    while True:
        loop_counter += 1
        loop_start_time = time.monotonic()
        print(f"\n--- Lifecycle Check #{loop_counter} [{time.strftime('%H:%M:%S')}] ---")
        try:
            if active_agent_instance is None:
                print(f"[{loop_counter}] State 1: No active agent. Selecting...")
                activated = await select_and_activate_new_agent()
                if not activated:
                    print(f"[{loop_counter}] State 1: Activation failed. Waiting {ERROR_RETRY_DELAY_SECONDS}s before retry.")
                    await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
                else:
                    print(f"[{loop_counter}] State 1: Agent '{active_agent_name}' activated successfully.")
            else:  # Agent is active
                agent_name = active_agent_name
                print(f"[{loop_counter}] State 2: Agent '{agent_name}' is active.")
                if current_battle_instance is None:
                    print(f"[{loop_counter}] State 2a: Checking for new battle for '{agent_name}'...")
                    await check_for_new_battle()
                    if current_battle_instance:
                        battle_tag = current_battle_instance.battle_tag
                        print(f"[{loop_counter}] State 2a: *** NEW BATTLE DETECTED: {battle_tag} for '{agent_name}' ***")
                        # A numeric third part followed by a further part is
                        # treated as a non-public battle and is not streamed.
                        parts = battle_tag.split('-')
                        is_suffixed_format = len(parts) > 3 and parts[2].isdigit()
                        if is_suffixed_format:
                            print(f"[{loop_counter}] Detected potentially non-public battle format ({battle_tag}). Forfeiting.")
                            try:
                                if active_agent_instance:
                                    await active_agent_instance.forfeit(battle_tag)
                                    print(f"[{loop_counter}] Sent forfeit command for {battle_tag}.")
                                    await asyncio.sleep(1.5)
                            except Exception as forfeit_err:
                                # Best effort: the agent is deactivated either way.
                                print(f"[{loop_counter}] ERROR sending forfeit for {battle_tag}: {forfeit_err}")
                            await deactivate_current_agent(reason="forfeited_private_battle")
                            continue
                        else:
                            print(f"[{loop_counter}] Public battle format detected. Displaying battle {battle_tag}.")
                            await update_display_html(create_battle_iframe(battle_tag))
                    else:
                        print(f"[{loop_counter}] State 2a: No new battle found. Agent '{agent_name}' remains idle, waiting for challenge.")
                        idle_html = create_idle_html(f"Agent Ready: {agent_name} ",
                                                     f"Please challenge {agent_name} to a {DEFAULT_BATTLE_FORMAT} battle.")
                        await update_display_html(idle_html)
                        await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
                if current_battle_instance is not None:  # Check again in case a battle just started
                    battle_tag = current_battle_instance.battle_tag
                    print(f"[{loop_counter}] State 2b: Monitoring battle {battle_tag} for '{agent_name}'")
                    if not active_agent_instance:
                        print(f"[{loop_counter}] WARNING: Agent instance for '{agent_name}' disappeared while monitoring battle {battle_tag}! Deactivating.")
                        await deactivate_current_agent(reason="agent_disappeared_mid_battle")
                        continue
                    # Re-read the battle from the agent so its current state is checked.
                    battle_obj = active_agent_instance._battles.get(battle_tag)
                    if battle_obj and battle_obj.finished:
                        print(f"[{loop_counter}] Battle {battle_tag} is FINISHED. Deactivating agent '{agent_name}'.")
                        await deactivate_current_agent(reason="battle_end")
                        print(f"[{loop_counter}] Waiting {POST_BATTLE_DELAY_SECONDS}s post-battle before selecting next agent.")
                        await asyncio.sleep(POST_BATTLE_DELAY_SECONDS)
                        continue
                    elif not battle_obj:
                        print(f"[{loop_counter}] WARNING: Battle object for {battle_tag} not found in agent's list for '{agent_name}'. Battle might have ended abruptly. Deactivating.")
                        await deactivate_current_agent(reason="battle_object_missing")
                        continue
                    else:
                        print(f"[{loop_counter}] Battle {battle_tag} ongoing for '{agent_name}'.")
                        await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
        except asyncio.CancelledError:
            print("Lifecycle manager task cancelled.")
            raise
        except Exception as e:
            print(f"!!! ERROR in main lifecycle loop #{loop_counter}: {e} !!!")
            traceback.print_exc()
            current_agent_name_err = active_agent_name
            if active_agent_instance:
                print(f"Attempting to deactivate agent '{current_agent_name_err}' due to loop error...")
                try:
                    await deactivate_current_agent(reason="main_loop_error")
                except Exception as deactivation_err:
                    print(f"Error during error-handling deactivation: {deactivation_err}")
                    # Make sure the next pass starts clean even if the
                    # deactivation itself failed part-way.
                    active_agent_name = None
                    active_agent_instance = None
                    active_agent_task = None
                    current_battle_instance = None
            else:
                print("No active agent instance during loop error.")
            # The exception text ends up in markup broadcast to every viewer,
            # so escape it instead of interpolating it raw.
            safe_error = html.escape(str(e))
            await update_display_html(create_error_html(f"A server error occurred in the lifecycle manager. Please wait. ({safe_error})"))
            print(f"Waiting {ERROR_RETRY_DELAY_SECONDS}s after loop error.")
            await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
            continue
        # Keep every pass at least LOOP_COOLDOWN_SECONDS long.
        elapsed_time = time.monotonic() - loop_start_time
        if elapsed_time < LOOP_COOLDOWN_SECONDS:
            await asyncio.sleep(LOOP_COOLDOWN_SECONDS - elapsed_time)
def log_task_exception(task: asyncio.Task):
    """Done-callback that reports how a background task ended.

    Prints whether the task was cancelled, completed, or failed; for a
    failure the exception message is printed and the traceback is written
    via ``traceback.print_exc``.

    Args:
        task: The finished task.
    """
    try:
        if not task.cancelled():
            # result() re-raises whatever the task raised.
            task.result()
            print(f"Task '{task.get_name()}' completed successfully.")
        else:
            print(f"Task '{task.get_name()}' was cancelled.")
    except asyncio.CancelledError:
        print(f"Task '{task.get_name()}' confirmed cancelled (exception caught).")
    except Exception as e:
        print(f"!!! Exception in background task '{task.get_name()}': {e} !!!")
        traceback.print_exc()
# --- WebSocket connection manager ---
class ConnectionManager:
    """Tracks connected viewer WebSockets and the fragment they should show."""

    def __init__(self):
        # Sockets that have been accepted and not yet discarded.
        self.active_connections: Set[WebSocket] = set()
        # Latest fragment; sent to every client that connects.
        # ("Pokémon" was mis-decoded as "PokΓ©mon" in the literal.)
        self.current_html_fragment: str = create_idle_html("Initializing...", "Setting up Pokémon Battle Stream")

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket``, register it and send it the current fragment.

        A client whose initial send fails is unregistered again.
        """
        await websocket.accept()
        self.active_connections.add(websocket)
        print(f"Client connected. Sending current state. Total clients: {len(self.active_connections)}")
        try:
            await websocket.send_text(self.current_html_fragment)
        except Exception as e:
            print(f"Error sending initial state to new client: {e}")
            await self.disconnect(websocket)

    async def disconnect(self, websocket: WebSocket):
        """Unregister ``websocket``; a socket that is not registered is ignored."""
        self.active_connections.discard(websocket)
        print(f"Client disconnected. Total clients: {len(self.active_connections)}")

    async def update_all(self, html_fragment: str):
        """Update the current HTML fragment and broadcast to all clients.

        Nothing is stored or sent when ``html_fragment`` equals the current
        fragment. Clients whose send raised are unregistered afterwards.
        """
        if self.current_html_fragment == html_fragment:
            return
        self.current_html_fragment = html_fragment
        if not self.active_connections:
            return
        # Snapshot the recipients once: the set can change while the sends
        # are awaited, and gather() returns results in argument order, so
        # each result has to be matched against this same list.
        recipients = list(self.active_connections)
        print(f"Broadcasting update to {len(recipients)} clients...")
        results = await asyncio.gather(
            *(connection.send_text(html_fragment) for connection in recipients),
            return_exceptions=True,
        )
        failed = []
        for connection, result in zip(recipients, results):
            if isinstance(result, Exception):
                print(f"Error sending update to client: {result}. Marking for removal.")
                failed.append(connection)
        for connection in failed:
            # It may already have been removed by its own endpoint handler.
            if connection in self.active_connections:
                await self.disconnect(connection)
# Module-wide manager instance used by update_display_html, the /ws endpoint
# and the shutdown hook.
manager = ConnectionManager()
# --- API Routes ---
@app.get("/", response_class=HTMLResponse)
async def get_homepage():
    """Return the body served at the site root.

    NOTE(review): the literal below holds only the title text; no WebSocket
    client script or styling is visible in it. Confirm that the page markup
    was not lost.
    """
    homepage_markup = """
Pokemon Battle Livestream
"""
    return homepage_markup
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a viewer socket and hold it open until the client goes away.

    Messages from the client are not expected; any that arrive are only
    logged. The socket is unregistered from ``manager`` on every exit path,
    including cancellation.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(f"Received unexpected message from client: {data}")
    except WebSocketDisconnect as e:
        print(f"WebSocket disconnected: Code {e.code}, Reason: {getattr(e, 'reason', 'N/A')}")
    except Exception as e:
        print(f"WebSocket error: {e}")
        traceback.print_exc()
    finally:
        # One cleanup for all exits; the handlers above used to repeat it and
        # a cancelled handler skipped it altogether.
        await manager.disconnect(websocket)
# --- NEW: Route to display the last action ---
@app.get("/last_action", response_class=HTMLResponse)
async def get_last_action():
    """Render the most recently recorded agent action as an HTML response."""
    # The module-level value is only read here, so no `global` is required.
    page = create_last_action_html(last_llm_action)
    return page
# --------------------------------------------
# --- Lifecycle Events ---
@app.on_event("startup")
async def startup_event():
    """Start background tasks when the application starts.

    Ensures the ``static`` directory exists, mounts it at ``/static`` and
    launches the lifecycle manager task, keeping its handle in
    ``background_task_handle``.
    """
    global background_task_handle
    static_dir = "static"
    if not os.path.exists(static_dir):
        # exist_ok avoids a crash if the directory appears between the check
        # and the creation.
        os.makedirs(static_dir, exist_ok=True)
        print(f"Created static directory at: {os.path.abspath(static_dir)}")
        print("!!! Please add 'pokemon_huggingface.png' to this directory! !!!")
    app.mount("/static", StaticFiles(directory=static_dir), name="static")
    print(f"Mounted static directory '{static_dir}' at '/static'")
    # The leading glyph was mis-decoded ("π"); the rocket is a best guess.
    print("🚀 Starting background tasks")
    background_task_handle = asyncio.create_task(manage_agent_lifecycle(), name="LifecycleManager")
    background_task_handle.add_done_callback(log_task_exception)
    # This literal had been split over two lines by a mis-decoded glyph.
    print("✅ Background tasks started")
@app.on_event("shutdown")
async def shutdown_event():
    """Clean up tasks when shutting down.

    Cancels the lifecycle manager task (waiting up to five seconds),
    disconnects the active agent if its websocket is still open, and closes
    every viewer WebSocket with code 1000.
    """
    global background_task_handle, active_agent_instance
    # The leading glyph was mis-decoded ("π"); the stop sign is a best guess.
    print("\n🛑 Shutting down application. Cleaning up...")
    if background_task_handle and not background_task_handle.done():
        print("Cancelling background task...")
        background_task_handle.cancel()
        try:
            await asyncio.wait_for(background_task_handle, timeout=5.0)
            print("Background task cancelled successfully.")
        except asyncio.CancelledError:
            print("Background task cancellation confirmed (CancelledError).")
        except asyncio.TimeoutError:
            print("Background task did not finish cancelling within timeout.")
        except Exception as e:
            print(f"Error during background task cancellation: {e}")
    agent_to_disconnect = active_agent_instance
    if agent_to_disconnect:
        agent_name = getattr(agent_to_disconnect, 'username', 'Unknown Agent')
        print(f"Disconnecting active agent '{agent_name}'...")
        try:
            if hasattr(agent_to_disconnect, '_websocket') and agent_to_disconnect._websocket and agent_to_disconnect._websocket.open:
                await agent_to_disconnect.disconnect()
                print(f"Agent '{agent_name}' disconnected.")
            else:
                print(f"Agent '{agent_name}' already disconnected or websocket not available.")
        except Exception as e:
            # Best effort: shutdown continues even if the disconnect fails.
            print(f"Error during agent disconnect on shutdown for '{agent_name}': {e}")
    print(f"Closing {len(manager.active_connections)} client WebSocket connections...")
    close_tasks = [
        conn.close(code=1000, reason="Server shutting down")
        for conn in list(manager.active_connections)
    ]
    if close_tasks:
        # return_exceptions=True: one failing close must not abort the rest.
        await asyncio.gather(*close_tasks, return_exceptions=True)
    # This literal had been split over two lines by a mis-decoded glyph.
    print("✅ Cleanup complete. Application shutdown.")
# --- Main execution ---
if __name__ == "__main__":
    import uvicorn

    # Bind address and port, shared by the startup messages and uvicorn.run.
    host = "0.0.0.0"
    port = 7860

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    logging.getLogger('poke_env').setLevel(logging.WARNING)
    logging.getLogger('websockets.client').setLevel(logging.INFO)
    print("Starting Pokemon Battle Livestream Server...")
    print("="*60)
    if not AVAILABLE_AGENT_NAMES:
        # Defensive only: the module already exits at import time when no
        # agent password is configured.
        # The banner glyphs were mis-decoded ("β"); a rule character is a
        # best guess.
        print("═════════════════════ FATAL ERROR ═════════════════════")
        print(" No agents found with configured passwords!")
        print(" Please set the required environment variables:")
        for name, cfg in AGENT_CONFIGS.items():
            print(f" - {cfg.get('password_env_var', 'N/A')} (for agent: {name})")
        print("="*60)
        # SystemExit with a string prints it to stderr and exits with status
        # 1, like the `exit(...)` helper, without depending on `site`.
        raise SystemExit("Exiting due to missing agent passwords.")
    else:
        # The glyph was mis-decoded ("β¨"); the sparkles are a best guess.
        print("✨ Available Agents Found:")
        for name in AVAILABLE_AGENT_NAMES:
            print(f" - {name}")
    print("="*60)
    print(f"Server will run on http://{host}:{port}")
    print(f"Access the action log at http://{host}:{port}/last_action")
    print("="*60)
    uvicorn.run(
        "main:app",
        host=host,
        port=port,
        reload=False,
        log_level="info"
    )