# main.py - FastAPI application for Pokemon Livestream
import asyncio
import os
import random
import time
import traceback
import logging
# --- Additions for last_action route ---
import datetime
import html
from typing import List, Dict, Optional, Set, Callable  # Added Callable
# ---------------------------------------

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles

# --- Imports for poke_env and agents ---
from poke_env.player import (  # Import action types
    Player,
    ActionType,
    ForfeitAction,
    MoveOrder,
    SwitchOrder,
    DefaultOrder,
)
from poke_env import AccountConfiguration, ServerConfiguration
from poke_env.environment.battle import Battle

# Import the actual agent classes
from agents import OpenAIAgent, GeminiAgent, MistralAgent

# --- Configuration ---
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket"
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?'
CUSTOM_BATTLE_VIEW_URL_TEMPLATE = "https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}"
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL)
DEFAULT_BATTLE_FORMAT = "gen9randombattle"

# Define available agents with their corresponding classes
AGENT_CONFIGS = {
    "OpenAIAgent": {"class": OpenAIAgent, "password_env_var": "OPENAI_AGENT_PASSWORD"},
    "GeminiAgent": {"class": GeminiAgent, "password_env_var": "GEMINI_AGENT_PASSWORD"},
    "MistralAgent": {"class": MistralAgent, "password_env_var": "MISTRAL_AGENT_PASSWORD"},
}

# Filter out agents with missing passwords
AVAILABLE_AGENT_NAMES = [
    name
    for name, cfg in AGENT_CONFIGS.items()
    if os.environ.get(cfg.get("password_env_var", ""))
]

if not AVAILABLE_AGENT_NAMES:
    print("FATAL ERROR: No agent configurations have their required password environment variables set. Exiting.")
    # `exit()` is injected by the `site` module and is not guaranteed to
    # exist (e.g. under `python -S`); SystemExit is always available.
    raise SystemExit(1)

# --- Global State Variables ---
active_agent_name: Optional[str] = None
active_agent_instance: Optional[Player] = None
active_agent_task: Optional[asyncio.Task] = None
current_battle_instance: Optional[Battle] = None
background_task_handle: Optional[asyncio.Task] = None
# --- NEW: Global variable for last action ---
last_llm_action: Optional[Dict] = None
# --------------------------------------------

# --- Create FastAPI app ---
app = FastAPI(title="Pokemon Battle Livestream")


# --- NEW: Callback function for agents ---
def update_last_action_callback(action_info: Dict):
    """Callback for agents to report their chosen action.

    Stamps ``action_info`` with a ``timestamp_utc`` entry, records a snapshot
    of it in the module-level ``last_llm_action`` and logs a one-line summary.

    Args:
        action_info: Mapping describing the action. The keys read here are
            ``agent``, ``action_str`` and ``turn`` (all optional).
    """
    global last_llm_action
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted string is identical.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    action_info["timestamp_utc"] = now_utc.strftime('%Y-%m-%d %H:%M:%S UTC')
    # Keep a shallow copy so later mutation of the caller's dict cannot
    # silently change what is stored as the last action.
    last_llm_action = dict(action_info)
    # Optional: Log the update for debugging
    print(f"ACTION_LOG: Agent '{action_info.get('agent', 'Unknown')}' chose action: {action_info.get('action_str', 'N/A')} (Turn: {action_info.get('turn', '?')})")
# ---------------------------------------


# --- Helper Functions ---
def get_active_battle(agent: Player) -> Optional[Battle]:
    """Returns the first non-finished battle for an agent.

    Only the first unfinished battle found in ``agent._battles`` is examined.
    It is returned when it carries a non-empty ``battle_tag`` that starts with
    ``"battle-"``; in every other case the result is ``None``.
    """
    if not agent or not agent._battles:
        return None

    candidate = next((b for b in agent._battles.values() if not b.finished), None)
    if candidate is None:
        return None

    # The battle object may exist before its tag is populated; treat a
    # missing or malformed tag as "not ready yet".
    tag = getattr(candidate, "battle_tag", None)
    if tag and tag.startswith("battle-"):
        return candidate
    return None


def create_battle_iframe(battle_id: str) -> str:
    """Creates JUST the HTML for the battle iframe tag.

    Args:
        battle_id: Battle tag appended as the URL fragment of the viewer page.

    Returns:
        An HTML fragment containing one ``<iframe>`` pointing at the battle.
    """
    print("Creating iframe content for battle ID: ", battle_id)
    battle_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}"  # Using your custom URL
    # Escape for use inside a double-quoted attribute value.
    src = html.escape(battle_url, quote=True)
    return f"""
<iframe id="battle-iframe" class="battle-iframe" src="{src}" allowfullscreen></iframe>
"""


def create_idle_html(status_message: str, instruction: str) -> str:
    """Creates a visually appealing idle screen HTML fragment.

    Both texts are HTML-escaped before being placed in the fragment.

    Args:
        status_message: Headline shown on the idle screen.
        instruction: Secondary line shown under the headline.
    """
    status = html.escape(status_message, quote=False)
    detail = html.escape(instruction, quote=False)
    return f"""
<div class="content-container idle-container">
    <p class="status-message">{status}</p>
    <p class="instruction">{detail}</p>
</div>
"""


def create_error_html(error_msg: str) -> str:
    """Creates HTML fragment to display an error message.

    ``error_msg`` is HTML-escaped because callers pass text that includes
    exception messages.
    """
    message = html.escape(error_msg, quote=False)
    return f"""
<div class="content-container error-container">
    <p class="error-title">🚨 Error 🚨</p>
    <p class="error-message">{message}</p>
</div>
"""


# --- NEW: Helper function to create HTML for the last action page ---
def create_last_action_html(action_data: Optional[Dict]) -> str:
    """Formats the last action data into an HTML page.

    Args:
        action_data: Mapping with the optional keys ``agent``,
            ``timestamp_utc``, ``turn``, ``action_type``, ``action_str`` and
            ``raw_llm_output``; ``None`` or an empty mapping renders a
            "no action yet" placeholder.

    Returns:
        A complete, auto-refreshing HTML document.
    """
    # Period of the page's <meta refresh>, in seconds.
    refresh_seconds = 5

    def _field(key: str, default: str) -> str:
        # Values come from agents/LLM output: coerce to str (they may be
        # None or numbers) and escape before embedding in markup.
        value = action_data.get(key)
        if value is None:
            value = default
        return html.escape(str(value))

    if not action_data:
        content = """
<div class="action-card empty">
    <h2>No Action Recorded Yet</h2>
    <p>Waiting for the agent to make its first move...</p>
</div>
"""
    else:
        # Escape HTML characters in potentially user-generated content
        agent_name_escaped = _field('agent', 'Unknown')
        timestamp_escaped = _field('timestamp_utc', 'N/A')
        turn_escaped = _field('turn', '?')
        action_type_escaped = _field('action_type', 'N/A')
        action_str_escaped = _field('action_str', 'N/A')
        raw_output_escaped = _field('raw_llm_output', 'N/A')
        content = f"""
<div class="action-card">
    <h2>Last Action by Agent: {agent_name_escaped}</h2>
    <p><strong>Timestamp:</strong> {timestamp_escaped}</p>
    <p><strong>Battle Turn:</strong> {turn_escaped}</p>
    <p><strong>Action Type:</strong> {action_type_escaped}</p>
    <p><strong>Action Chosen:</strong> {action_str_escaped}</p>
    <h3>Raw LLM Output (if available):</h3>
    <pre>{raw_output_escaped}</pre>
</div>
"""

    # Basic HTML structure with auto-refresh and styling
    return f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="refresh" content="{refresh_seconds}">
    <title>Last Agent Action</title>
    <style>
        body {{ font-family: sans-serif; margin: 2em; }}
        pre {{ white-space: pre-wrap; word-wrap: break-word; }}
    </style>
</head>
<body>
    <h1>Agent Action Log</h1>
    {content}
</body>
</html>
"""
# --------------------------------------------------------------------


async def update_display_html(new_html_fragment: str) -> None:
    """Updates the current display HTML fragment and broadcasts to all clients."""
    await manager.update_all(new_html_fragment)
    print("HTML Display FRAGMENT UPDATED and broadcasted.")


# --- Agent Lifecycle Management ---
async def select_and_activate_new_agent():
    """Selects a random available agent, instantiates it, and starts its listening task.

    Resets ``last_llm_action``, picks a name from ``AVAILABLE_AGENT_NAMES``,
    builds the agent with the action callback attached and schedules
    ``accept_challenges(None, 1)`` as a background task.

    Returns:
        ``True`` when the agent is active and the globals are populated,
        ``False`` when no agent is available or activation failed.
    """
    # --- MODIFIED: Make sure globals are declared ---
    global active_agent_name, active_agent_instance, active_agent_task, last_llm_action
    # --- MODIFIED: Reset last action when selecting a new agent ---
    last_llm_action = None

    if not AVAILABLE_AGENT_NAMES:
        print("Lifecycle: No available agents with passwords set.")
        await update_display_html(create_error_html("No agents available. Check server logs/environment variables."))
        return False

    selected_name = random.choice(AVAILABLE_AGENT_NAMES)
    config = AGENT_CONFIGS[selected_name]
    AgentClass = config["class"]
    password_env_var = config["password_env_var"]
    agent_password = os.environ.get(password_env_var)

    print(f"Lifecycle: Activating agent '{selected_name}'...")
    await update_display_html(create_idle_html("Selecting Next Agent...", f"Preparing {selected_name}..."))

    # Tracked outside the try so the failure path can cancel it.
    task: Optional[asyncio.Task] = None
    try:
        account_config = AccountConfiguration(selected_name, agent_password)
        # --- MODIFIED: Pass the action callback to the agent constructor ---
        agent = AgentClass(
            account_configuration=account_config,
            server_configuration=custom_config,
            battle_format=DEFAULT_BATTLE_FORMAT,
            log_level=logging.INFO,
            max_concurrent_battles=1,
            action_callback=update_last_action_callback,  # Pass the callback function
        )
        task = asyncio.create_task(agent.accept_challenges(None, 1), name=f"AcceptChallenge_{selected_name}")
        task.add_done_callback(log_task_exception)

        active_agent_name = selected_name
        active_agent_instance = agent
        active_agent_task = task
        print(f"Lifecycle: Agent '{selected_name}' is active and listening for 1 challenge.")
        await update_display_html(create_idle_html(f"Agent Ready: {selected_name}", f"Please challenge {selected_name} to a {DEFAULT_BATTLE_FORMAT} battle."))
        return True
    except Exception as e:
        error_msg = f"Failed to activate agent '{selected_name}': {e}"
        print(error_msg)
        traceback.print_exc()
        # If the failure happened after the listener was scheduled, stop it;
        # otherwise it would keep running with no reference left to it.
        if task is not None and not task.done():
            task.cancel()
        # Reset state before the next await so a failing display update
        # cannot leave a half-activated agent behind.
        active_agent_name = None
        active_agent_instance = None
        active_agent_task = None
        await update_display_html(create_error_html(f"Error activating {selected_name}. Please wait or check logs."))
        return False


async def check_for_new_battle():
    """Checks if the active agent has started a battle with a valid tag.

    When one is found it is stored in ``current_battle_instance`` and any
    still-running ``accept_challenges`` task is cancelled.
    """
    global active_agent_instance, current_battle_instance, active_agent_name, active_agent_task

    if not active_agent_instance:
        return

    battle = get_active_battle(active_agent_instance)
    if not (battle and battle.battle_tag):
        return

    current_battle_instance = battle
    print(f"Lifecycle: Agent '{active_agent_name}' started battle: {battle.battle_tag}")

    listener = active_agent_task
    if listener and not listener.done():
        print(f"Lifecycle: Cancelling accept_challenges task for {active_agent_name} as battle started.")
        listener.cancel()


async def deactivate_current_agent(reason: str = "cycle"):
    """Cleans up the currently active agent and resets state.

    Shows a reason-specific idle screen, clears the module-level agent and
    battle globals, cancels the agent's listener task (waiting up to two
    seconds for it) and disconnects the agent if its websocket is open.

    Args:
        reason: Why the agent is being deactivated; selects the message shown.
    """
    global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance

    agent_name_to_deactivate = active_agent_name
    print(f"Lifecycle: Deactivating agent '{agent_name_to_deactivate}' (Reason: {reason})...")

    # Known reasons map to fixed screens; anything else gets a generic one.
    known_screens = {
        "battle_end": ("Battle Finished!", f"Agent {agent_name_to_deactivate} completed the match."),
        "cycle": ("Cycling Agents", f"Switching from {agent_name_to_deactivate}..."),
        "forfeited_private_battle": ("Switching Agent", f"Agent {agent_name_to_deactivate} forfeited a private battle."),
    }
    generic_screen = (f"Resetting Agent ({reason})", f"Cleaning up {agent_name_to_deactivate}...")
    headline, detail = known_screens.get(reason, generic_screen)
    await update_display_html(create_idle_html(headline, detail))

    await asyncio.sleep(3)
    await update_display_html(create_idle_html("Preparing Next Agent...", "Please wait..."))

    # Take local references, then clear the globals before the slow cleanup.
    agent, task = active_agent_instance, active_agent_task
    active_agent_name = None
    active_agent_instance = None
    active_agent_task = None
    current_battle_instance = None
    print(f"Lifecycle: Global state cleared for '{agent_name_to_deactivate}'.")

    if task and not task.done():
        print(f"Lifecycle: Ensuring task cancellation for {agent_name_to_deactivate} ({task.get_name()})...")
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=2.0)
            print(f"Lifecycle: Task cancellation confirmed for {agent_name_to_deactivate}.")
        except asyncio.CancelledError:
            print(f"Lifecycle: Task cancellation confirmation (CancelledError) for {agent_name_to_deactivate}.")
        except asyncio.TimeoutError:
            print(f"Lifecycle: Task did not confirm cancellation within timeout for {agent_name_to_deactivate}.")
        except Exception as e:
            print(f"Lifecycle: Error during task cancellation wait for {agent_name_to_deactivate}: {e}")

    if agent:
        print(f"Lifecycle: Disconnecting player {agent.username}...")
        try:
            socket = getattr(agent, '_websocket', None)
            if socket and socket.open:
                await agent.disconnect()
                print(f"Lifecycle: Player {agent.username} disconnected successfully.")
            else:
                print(f"Lifecycle: Player {agent.username} already disconnected or websocket not available.")
        except Exception as e:
            print(f"ERROR during agent disconnect ({agent.username}): {e}")
            traceback.print_exc()

    await asyncio.sleep(2)
    print(f"Lifecycle: Agent '{agent_name_to_deactivate}' deactivation complete.")


async def manage_agent_lifecycle():
    """Runs the main loop selecting, running, and cleaning up agents sequentially.

    Each iteration is in one of these states:

    * State 1 - no active agent: select and activate one.
    * State 2a - agent active, no battle tracked: poll for a new battle;
      forfeit battles whose tag has an extra suffix, otherwise display it.
    * State 2b - battle tracked: deactivate the agent once the battle is
      finished or has vanished from the agent's battle table.

    The loop never returns normally; ``asyncio.CancelledError`` is re-raised
    so the task can be cancelled. Any other exception is logged, the agent is
    deactivated and the loop retries after ``ERROR_RETRY_DELAY_SECONDS``.
    """
    global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance
    print("Background lifecycle manager started.")
    REFRESH_INTERVAL_SECONDS = 3  # poll period while idle or mid-battle
    LOOP_COOLDOWN_SECONDS = 1  # minimum duration of one loop iteration
    ERROR_RETRY_DELAY_SECONDS = 10  # back-off after a failure
    POST_BATTLE_DELAY_SECONDS = 5  # pause after a finished battle
    loop_counter = 0
    while True:
        loop_counter += 1
        loop_start_time = time.monotonic()
        print(f"\n--- Lifecycle Check #{loop_counter} [{time.strftime('%H:%M:%S')}] ---")
        try:
            if active_agent_instance is None:
                print(f"[{loop_counter}] State 1: No active agent. Selecting...")
                activated = await select_and_activate_new_agent()
                if not activated:
                    print(f"[{loop_counter}] State 1: Activation failed. Waiting {ERROR_RETRY_DELAY_SECONDS}s before retry.")
                    await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
                else:
                    print(f"[{loop_counter}] State 1: Agent '{active_agent_name}' activated successfully.")
            else:  # Agent is active
                # Snapshot the name: the global is cleared by deactivation.
                agent_name = active_agent_name
                print(f"[{loop_counter}] State 2: Agent '{agent_name}' is active.")
                if current_battle_instance is None:
                    print(f"[{loop_counter}] State 2a: Checking for new battle for '{agent_name}'...")
                    await check_for_new_battle()
                    if current_battle_instance:
                        battle_tag = current_battle_instance.battle_tag
                        print(f"[{loop_counter}] State 2a: *** NEW BATTLE DETECTED: {battle_tag} for '{agent_name}' ***")
                        # A tag with more than three '-'-separated parts and a
                        # numeric third part is treated as non-public.
                        parts = battle_tag.split('-')
                        is_suffixed_format = len(parts) > 3 and parts[2].isdigit()
                        if is_suffixed_format:
                            print(f"[{loop_counter}] Detected potentially non-public battle format ({battle_tag}). Forfeiting.")
                            try:
                                if active_agent_instance:
                                    await active_agent_instance.forfeit(battle_tag)
                                    print(f"[{loop_counter}] Sent forfeit command for {battle_tag}.")
                                    await asyncio.sleep(1.5)
                            except Exception as forfeit_err:
                                # Best effort: the agent is deactivated either way.
                                print(f"[{loop_counter}] ERROR sending forfeit for {battle_tag}: {forfeit_err}")
                            await deactivate_current_agent(reason="forfeited_private_battle")
                            continue
                        else:
                            print(f"[{loop_counter}] Public battle format detected. Displaying battle {battle_tag}.")
                            await update_display_html(create_battle_iframe(battle_tag))
                    else:
                        print(f"[{loop_counter}] State 2a: No new battle found. Agent '{agent_name}' remains idle, waiting for challenge.")
                        idle_html = create_idle_html(f"Agent Ready: {agent_name}", f"Please challenge {agent_name} to a {DEFAULT_BATTLE_FORMAT} battle.")
                        await update_display_html(idle_html)
                        await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
                if current_battle_instance is not None:  # Check again in case a battle just started
                    battle_tag = current_battle_instance.battle_tag
                    print(f"[{loop_counter}] State 2b: Monitoring battle {battle_tag} for '{agent_name}'")
                    if not active_agent_instance:
                        print(f"[{loop_counter}] WARNING: Agent instance for '{agent_name}' disappeared while monitoring battle {battle_tag}! Deactivating.")
                        await deactivate_current_agent(reason="agent_disappeared_mid_battle")
                        continue
                    # Look the battle up again through the agent's own table
                    # instead of reusing the stored object.
                    battle_obj = active_agent_instance._battles.get(battle_tag)
                    if battle_obj and battle_obj.finished:
                        print(f"[{loop_counter}] Battle {battle_tag} is FINISHED. Deactivating agent '{agent_name}'.")
                        await deactivate_current_agent(reason="battle_end")
                        print(f"[{loop_counter}] Waiting {POST_BATTLE_DELAY_SECONDS}s post-battle before selecting next agent.")
                        await asyncio.sleep(POST_BATTLE_DELAY_SECONDS)
                        continue
                    elif not battle_obj:
                        print(f"[{loop_counter}] WARNING: Battle object for {battle_tag} not found in agent's list for '{agent_name}'. Battle might have ended abruptly. Deactivating.")
                        await deactivate_current_agent(reason="battle_object_missing")
                        continue
                    else:
                        print(f"[{loop_counter}] Battle {battle_tag} ongoing for '{agent_name}'.")
                        await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
        except asyncio.CancelledError:
            # Propagate cancellation so whoever cancelled the task sees it end.
            print("Lifecycle manager task cancelled.")
            raise
        except Exception as e:
            print(f"!!! ERROR in main lifecycle loop #{loop_counter}: {e} !!!")
            traceback.print_exc()
            current_agent_name_err = active_agent_name  # Use different var name to avoid conflict
            if active_agent_instance:
                print(f"Attempting to deactivate agent '{current_agent_name_err}' due to loop error...")
                try:
                    await deactivate_current_agent(reason="main_loop_error")
                except Exception as deactivation_err:
                    print(f"Error during error-handling deactivation: {deactivation_err}")
                    # Deactivation itself failed: force-reset the globals so
                    # the next iteration starts from State 1.
                    active_agent_name = None
                    active_agent_instance = None
                    active_agent_task = None
                    current_battle_instance = None
            else:
                print("No active agent instance during loop error.")
            await update_display_html(create_error_html(f"A server error occurred in the lifecycle manager. Please wait. ({e})"))
            print(f"Waiting {ERROR_RETRY_DELAY_SECONDS}s after loop error.")
            await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
            continue
        # Pad short iterations up to LOOP_COOLDOWN_SECONDS.
        elapsed_time = time.monotonic() - loop_start_time
        if elapsed_time < LOOP_COOLDOWN_SECONDS:
            await asyncio.sleep(LOOP_COOLDOWN_SECONDS - elapsed_time)


def log_task_exception(task: asyncio.Task):
    """Callback to log exceptions from background tasks (like accept_challenges).

    Intended for ``Task.add_done_callback``: reports whether the task was
    cancelled, finished cleanly or raised, printing the traceback on failure.
    """
    try:
        if task.cancelled():
            print(f"Task '{task.get_name()}' was cancelled.")
            return
        # result() re-raises whatever the task raised.
        task.result()
        print(f"Task '{task.get_name()}' completed successfully.")
    except asyncio.CancelledError:
        print(f"Task '{task.get_name()}' confirmed cancelled (exception caught).")
    except Exception as e:
        print(f"!!! Exception in background task '{task.get_name()}': {e} !!!")
        traceback.print_exc()


# --- WebSocket connection manager ---
class ConnectionManager:
    """Tracks connected WebSocket clients and the HTML fragment they display."""

    def __init__(self):
        # Sockets that have been accepted and not yet dropped.
        self.active_connections: Set["WebSocket"] = set()
        # Last fragment broadcast; sent to every newly connected client.
        self.current_html_fragment: str = create_idle_html("Initializing...", "Setting up Pokémon Battle Stream")

    async def connect(self, websocket: "WebSocket"):
        """Accept ``websocket``, register it and send it the current fragment."""
        await websocket.accept()
        self.active_connections.add(websocket)
        print(f"Client connected. Sending current state. Total clients: {len(self.active_connections)}")
        try:
            await websocket.send_text(self.current_html_fragment)
        except Exception as e:
            print(f"Error sending initial state to new client: {e}")
            await self.disconnect(websocket)

    async def disconnect(self, websocket: "WebSocket"):
        """Forget ``websocket``; safe to call for an unknown socket."""
        self.active_connections.discard(websocket)
        print(f"Client disconnected. Total clients: {len(self.active_connections)}")

    async def update_all(self, html_fragment: str):
        """Update the current HTML fragment and broadcast to all clients.

        Does nothing when the fragment is unchanged. Clients whose send
        raises are removed from the active set.
        """
        if self.current_html_fragment == html_fragment:
            return
        self.current_html_fragment = html_fragment
        if not self.active_connections:
            return

        # Snapshot once: the set can change while the sends are awaited, and
        # each result must stay paired with the socket that produced it.
        targets = list(self.active_connections)
        print(f"Broadcasting update to {len(targets)} clients...")
        results = await asyncio.gather(
            *(connection.send_text(html_fragment) for connection in targets),
            return_exceptions=True,
        )

        for connection, result in zip(targets, results):
            if not isinstance(result, Exception):
                continue
            print(f"Error sending update to client: {result}. Marking for removal.")
            # It may already have been dropped by a concurrent disconnect.
            if connection in self.active_connections:
                await self.disconnect(connection)


manager = ConnectionManager()


# --- API Routes ---
@app.get("/", response_class=HTMLResponse)
async def get_homepage():
    """Serves the main HTML page with WebSocket connection and improved styling.

    The page opens a WebSocket to ``/ws`` and replaces the contents of its
    ``#stream-container`` element with every text message it receives,
    reconnecting after a short delay when the socket closes.
    """
    return """<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Pokemon Battle Livestream</title>
    <style>
        html, body { margin: 0; height: 100%; background: #1a1a1a; color: #ffffff; font-family: sans-serif; }
        #stream-container { width: 100%; height: 100%; display: flex; flex-direction: column; align-items: center; justify-content: center; text-align: center; }
        #stream-container iframe { width: 100%; height: 100%; border: 0; }
    </style>
</head>
<body>
    <div id="stream-container"></div>
    <script>
        const container = document.getElementById('stream-container');

        function connect() {
            const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
            const socket = new WebSocket(`${scheme}://${window.location.host}/ws`);
            socket.onmessage = (event) => { container.innerHTML = event.data; };
            socket.onerror = () => { socket.close(); };
            socket.onclose = () => { setTimeout(connect, 2000); };
        }

        connect();
    </script>
</body>
</html>
"""


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a display client and keep its socket open until it drops.

    Clients are not expected to send anything; received text is only logged.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(f"Received unexpected message from client: {data}")
    except WebSocketDisconnect as e:
        print(f"WebSocket disconnected: Code {e.code}, Reason: {getattr(e, 'reason', 'N/A')}")
        await manager.disconnect(websocket)
    except Exception as e:
        print(f"WebSocket error: {e}")
        traceback.print_exc()
        await manager.disconnect(websocket)


# --- NEW: Route to display the last action ---
@app.get("/last_action", response_class=HTMLResponse)
async def get_last_action():
    """Serves an HTML page displaying the last recorded agent action."""
    global last_llm_action
    # Return the formatted HTML page using the current state
    return create_last_action_html(last_llm_action)
# --------------------------------------------


# --- Lifecyle Events ---
@app.on_event("startup")
async def startup_event():
    """Start background tasks when the application starts.

    Ensures the ``static`` directory exists, mounts it at ``/static`` and
    launches the lifecycle manager as a background task.
    """
    global background_task_handle

    static_dir = "static"
    if not os.path.exists(static_dir):
        os.makedirs(static_dir)
        print(f"Created static directory at: {os.path.abspath(static_dir)}")
        print("!!! Please add 'pokemon_huggingface.png' to this directory! !!!")
    app.mount("/static", StaticFiles(directory=static_dir), name="static")
    print(f"Mounted static directory '{static_dir}' at '/static'")

    print("🚀 Starting background tasks")
    background_task_handle = asyncio.create_task(manage_agent_lifecycle(), name="LifecycleManager")
    background_task_handle.add_done_callback(log_task_exception)
    print("✅ Background tasks started")


@app.on_event("shutdown")
async def shutdown_event():
    """Clean up tasks when shutting down.

    Cancels the lifecycle manager (waiting up to five seconds), disconnects
    the active agent if its websocket is open and closes every client
    WebSocket with code 1000.
    """
    global background_task_handle, active_agent_instance
    print("\n🔌 Shutting down application. Cleaning up...")

    if background_task_handle and not background_task_handle.done():
        print("Cancelling background task...")
        background_task_handle.cancel()
        try:
            await asyncio.wait_for(background_task_handle, timeout=5.0)
            print("Background task cancelled successfully.")
        except asyncio.CancelledError:
            print("Background task cancellation confirmed (CancelledError).")
        except asyncio.TimeoutError:
            print("Background task did not finish cancelling within timeout.")
        except Exception as e:
            print(f"Error during background task cancellation: {e}")

    agent_to_disconnect = active_agent_instance
    if agent_to_disconnect:
        agent_name = agent_to_disconnect.username if hasattr(agent_to_disconnect, 'username') else 'Unknown Agent'
        print(f"Disconnecting active agent '{agent_name}'...")
        try:
            if hasattr(agent_to_disconnect, '_websocket') and agent_to_disconnect._websocket and agent_to_disconnect._websocket.open:
                await agent_to_disconnect.disconnect()
                print(f"Agent '{agent_name}' disconnected.")
            else:
                print(f"Agent '{agent_name}' already disconnected or websocket not available.")
        except Exception as e:
            print(f"Error during agent disconnect on shutdown for '{agent_name}': {e}")

    print(f"Closing {len(manager.active_connections)} client WebSocket connections...")
    close_tasks = [
        conn.close(code=1000, reason="Server shutting down")
        for conn in list(manager.active_connections)
    ]
    if close_tasks:
        # Best effort: a socket that fails to close must not block shutdown.
        await asyncio.gather(*close_tasks, return_exceptions=True)
    print("✅ Cleanup complete. Application shutdown.")


# --- Main execution ---
if __name__ == "__main__":
    import uvicorn

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    logging.getLogger('poke_env').setLevel(logging.WARNING)
    logging.getLogger('websockets.client').setLevel(logging.INFO)

    print("Starting Pokemon Battle Livestream Server...")
    print("="*60)

    if not AVAILABLE_AGENT_NAMES:
        banner = "█" * 21
        print(f"{banner} FATAL ERROR {banner}")
        print(" No agents found with configured passwords!")
        print(" Please set the required environment variables:")
        for name, cfg in AGENT_CONFIGS.items():
            print(f" - {cfg.get('password_env_var', 'N/A')} (for agent: {name})")
        print("="*60)
        # A string argument is written to stderr and yields exit status 1,
        # matching what exit("...") did.
        raise SystemExit("Exiting due to missing agent passwords.")
    else:
        print("✨ Available Agents Found:")
        for name in AVAILABLE_AGENT_NAMES:
            print(f" - {name}")
        print("="*60)

    print("Server will run on http://0.0.0.0:7860")
    print("Access the action log at http://0.0.0.0:7860/last_action")  # Added info
    print("="*60)

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
        log_level="info",
    )