twitch_streaming / main.py
Jofthomas's picture
Update main.py
cb75854 verified
raw
history blame
39.7 kB
# main.py - FastAPI application for Pokemon Livestream
import asyncio
import os
import random
import time
import traceback
import logging
# --- Additions for last_action route ---
import datetime
import html
from typing import List, Dict, Optional, Set, Callable # Added Callable
# ---------------------------------------
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
# --- Imports for poke_env and agents ---
from poke_env.player import Player, ActionType, ForfeitAction, MoveOrder, SwitchOrder, DefaultOrder # Import action types
from poke_env import AccountConfiguration, ServerConfiguration
from poke_env.environment.battle import Battle
# Import the actual agent classes
from agents import OpenAIAgent, GeminiAgent, MistralAgent
# --- Configuration ---
CUSTOM_SERVER_URL = "wss://jofthomas.com/showdown/websocket"
CUSTOM_ACTION_URL = 'https://play.pokemonshowdown.com/action.php?'
CUSTOM_BATTLE_VIEW_URL_TEMPLATE = "https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}"
custom_config = ServerConfiguration(CUSTOM_SERVER_URL, CUSTOM_ACTION_URL)
DEFAULT_BATTLE_FORMAT = "gen9randombattle"
# Define available agents with their corresponding classes
AGENT_CONFIGS = {
"OpenAIAgent": {"class": OpenAIAgent, "password_env_var": "OPENAI_AGENT_PASSWORD"},
"GeminiAgent": {"class": GeminiAgent, "password_env_var": "GEMINI_AGENT_PASSWORD"},
"MistralAgent": {"class": MistralAgent, "password_env_var": "MISTRAL_AGENT_PASSWORD"},
}
# Filter out agents with missing passwords
AVAILABLE_AGENT_NAMES = [
name for name, cfg in AGENT_CONFIGS.items()
if os.environ.get(cfg.get("password_env_var", ""))
]
if not AVAILABLE_AGENT_NAMES:
print("FATAL ERROR: No agent configurations have their required password environment variables set. Exiting.")
exit(1)
# --- Global State Variables ---
active_agent_name: Optional[str] = None
active_agent_instance: Optional[Player] = None
active_agent_task: Optional[asyncio.Task] = None
current_battle_instance: Optional[Battle] = None
background_task_handle: Optional[asyncio.Task] = None
# --- NEW: Global variable for last action ---
last_llm_action: Optional[Dict] = None
# --------------------------------------------
# --- Create FastAPI app ---
app = FastAPI(title="Pokemon Battle Livestream")
# --- NEW: Callback function for agents ---
def update_last_action_callback(action_info: Dict):
"""Callback for agents to report their chosen action."""
global last_llm_action
# Add a timestamp for context
action_info["timestamp_utc"] = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
last_llm_action = action_info
# Optional: Log the update for debugging
print(f"ACTION_LOG: Agent '{action_info.get('agent', 'Unknown')}' chose action: {action_info.get('action_str', 'N/A')} (Turn: {action_info.get('turn', '?')})")
# ---------------------------------------
# --- Helper Functions ---
def get_active_battle(agent: Player) -> Optional[Battle]:
"""Returns the first non-finished battle for an agent."""
if agent and agent._battles:
active_battles = [b for b in agent._battles.values() if not b.finished]
if active_battles:
# Ensure the battle object has a battle_tag before returning
if hasattr(active_battles[0], 'battle_tag') and active_battles[0].battle_tag:
# Check if the battle_tag has the expected format (starts with 'battle-')
if active_battles[0].battle_tag.startswith("battle-"):
return active_battles[0]
else:
# This handles cases where the battle object might exist but tag isn't ready
# print(f"DEBUG: Found active battle for {agent.username} but tag '{active_battles[0].battle_tag}' not ready.")
return None
else:
# print(f"DEBUG: Found active battle for {agent.username} but it has no battle_tag attribute yet.")
return None
return None
def create_battle_iframe(battle_id: str) -> str:
"""Creates JUST the HTML for the battle iframe tag."""
print("Creating iframe content for battle ID: ", battle_id)
battle_url = f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#{battle_id}" # Using your custom URL
return f"""
<iframe
id="battle-iframe"
class="battle-iframe"
src="{battle_url}"
allowfullscreen
></iframe>
"""
def create_idle_html(status_message: str, instruction: str) -> str:
"""Creates a visually appealing idle screen HTML fragment."""
return f"""
<div class="content-container idle-container">
<div class="message-box">
<p class="status">{status_message}</p>
<p class="instruction">{instruction}</p>
</div>
</div>
"""
def create_error_html(error_msg: str) -> str:
"""Creates HTML fragment to display an error message."""
return f"""
<div class="content-container error-container">
<div class="message-box">
<p class="status">🚨 Error 🚨</p>
<p class="instruction">{error_msg}</p>
</div>
</div>
"""
# --- NEW: Helper function to create HTML for the last action page ---
def create_last_action_html(action_data: Optional[Dict]) -> str:
"""Formats the last action data into an HTML page."""
if not action_data:
content = """
<div class="message-box">
<p class="status">No Action Recorded Yet</p>
<p class="instruction">Waiting for the agent to make its first move...</p>
</div>
"""
else:
# Escape HTML characters in potentially user-generated content
raw_output_escaped = html.escape(action_data.get("raw_llm_output", "N/A"))
agent_name_escaped = html.escape(action_data.get('agent', 'Unknown'))
action_type_escaped = html.escape(action_data.get('action_type', 'N/A'))
action_str_escaped = html.escape(action_data.get('action_str', 'N/A'))
content = f"""
<div class="action-details">
<h2>Last Action by Agent: <span class="agent-name">{agent_name_escaped}</span></h2>
<p><strong>Timestamp:</strong> {action_data.get('timestamp_utc', 'N/A')}</p>
<p><strong>Battle Turn:</strong> {action_data.get('turn', '?')}</p>
<p><strong>Action Type:</strong> {action_type_escaped}</p>
<p><strong>Action Chosen:</strong> <span class="action-chosen">{action_str_escaped}</span></p>
<div class="raw-output">
<h3>Raw LLM Output (if available):</h3>
<pre>{raw_output_escaped}</pre>
</div>
</div>
"""
# Basic HTML structure with auto-refresh and styling
return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="refresh" content="5">
<title>Last Agent Action</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Poppins:wght@400;700&family=Press+Start+2P&display=swap" rel="stylesheet">
<style>
body {{
font-family: 'Poppins', sans-serif;
background-color: #2c2f33;
color: #ffffff;
margin: 0;
padding: 20px;
line-height: 1.6;
}}
.container {{
max-width: 900px;
margin: 20px auto;
background-color: #3e4147;
padding: 25px;
border-radius: 10px;
box-shadow: 0 4px 15px rgba(0, 0, 0, 0.3);
}}
h1, h2 {{
font-family: 'Press Start 2P', cursive;
color: #ffcb05; /* Pokemon Yellow */
text-shadow: 2px 2px 0px #3b4cca; /* Pokemon Blue shadow */
margin-bottom: 20px;
text-align: center;
}}
.action-details p {{
margin: 10px 0;
font-size: 1.1em;
}}
.action-details strong {{
color: #ffcb05; /* Pokemon Yellow */
}}
.agent-name {{
color: #ff7f0f; /* Orange */
font-weight: bold;
}}
.action-chosen {{
font-weight: bold;
color: #76d7c4; /* Teal */
font-size: 1.2em;
}}
.raw-output {{
margin-top: 25px;
border-top: 1px solid #555;
padding-top: 15px;
}}
.raw-output h3 {{
margin-bottom: 10px;
color: #f0f0f0;
}}
pre {{
background-color: #23272a;
color: #dcdcdc;
padding: 15px;
border-radius: 5px;
white-space: pre-wrap; /* Wrap long lines */
word-wrap: break-word; /* Break words if necessary */
font-family: monospace;
font-size: 0.95em;
max-height: 300px; /* Limit height */
overflow-y: auto; /* Add scrollbar if needed */
}}
/* Styling for the 'No Action' message box */
.message-box {{
text-align: center;
padding: 30px;
}}
.message-box .status {{
font-family: 'Press Start 2P', cursive;
font-size: 1.8em;
color: #ff7f0f; /* Orange */
margin-bottom: 15px;
}}
.message-box .instruction {{
font-size: 1.1em;
color: #cccccc;
}}
</style>
</head>
<body>
<div class="container">
<h1>Agent Action Log</h1>
{content}
</div>
</body>
</html>
"""
# --------------------------------------------------------------------
async def update_display_html(new_html_fragment: str) -> None:
"""Updates the current display HTML fragment and broadcasts to all clients."""
await manager.update_all(new_html_fragment)
print("HTML Display FRAGMENT UPDATED and broadcasted.")
# --- Agent Lifecycle Management ---
async def select_and_activate_new_agent():
"""Selects a random available agent, instantiates it, and starts its listening task."""
# --- MODIFIED: Make sure globals are declared ---
global active_agent_name, active_agent_instance, active_agent_task, last_llm_action
# -----------------------------------------------
# --- MODIFIED: Reset last action when selecting a new agent ---
last_llm_action = None
# -----------------------------------------------------------
if not AVAILABLE_AGENT_NAMES:
print("Lifecycle: No available agents with passwords set.")
await update_display_html(create_error_html("No agents available. Check server logs/environment variables."))
return False
selected_name = random.choice(AVAILABLE_AGENT_NAMES)
config = AGENT_CONFIGS[selected_name]
AgentClass = config["class"]
password_env_var = config["password_env_var"]
agent_password = os.environ.get(password_env_var)
print(f"Lifecycle: Activating agent '{selected_name}'...")
await update_display_html(create_idle_html("Selecting Next Agent...", f"Preparing <strong>{selected_name}</strong>..."))
try:
account_config = AccountConfiguration(selected_name, agent_password)
# --- MODIFIED: Pass the action callback to the agent constructor ---
agent = AgentClass(
account_configuration=account_config,
server_configuration=custom_config,
battle_format=DEFAULT_BATTLE_FORMAT,
log_level=logging.INFO,
max_concurrent_battles=1,
action_callback=update_last_action_callback # Pass the callback function
)
# -------------------------------------------------------------------
task = asyncio.create_task(agent.accept_challenges(None, 1), name=f"AcceptChallenge_{selected_name}")
task.add_done_callback(log_task_exception)
active_agent_name = selected_name
active_agent_instance = agent
active_agent_task = task
print(f"Lifecycle: Agent '{selected_name}' is active and listening for 1 challenge.")
await update_display_html(create_idle_html(f"Agent Ready: <strong>{selected_name}</strong>",
f"Please challenge <strong>{selected_name}</strong> to a <strong>{DEFAULT_BATTLE_FORMAT}</strong> battle."))
return True
except Exception as e:
error_msg = f"Failed to activate agent '{selected_name}': {e}"
print(error_msg)
traceback.print_exc()
await update_display_html(create_error_html(f"Error activating {selected_name}. Please wait or check logs."))
active_agent_name = None
active_agent_instance = None
active_agent_task = None
return False
async def check_for_new_battle():
"""Checks if the active agent has started a battle with a valid tag."""
global active_agent_instance, current_battle_instance, active_agent_name, active_agent_task
if active_agent_instance:
battle = get_active_battle(active_agent_instance)
if battle and battle.battle_tag:
current_battle_instance = battle
print(f"Lifecycle: Agent '{active_agent_name}' started battle: {battle.battle_tag}")
if active_agent_task and not active_agent_task.done():
print(f"Lifecycle: Cancelling accept_challenges task for {active_agent_name} as battle started.")
active_agent_task.cancel()
async def deactivate_current_agent(reason: str = "cycle"):
"""Cleans up the currently active agent and resets state."""
global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance
agent_name_to_deactivate = active_agent_name
print(f"Lifecycle: Deactivating agent '{agent_name_to_deactivate}' (Reason: {reason})...")
if reason == "battle_end":
await update_display_html(create_idle_html("Battle Finished!", f"Agent <strong>{agent_name_to_deactivate}</strong> completed the match."))
elif reason == "cycle":
await update_display_html(create_idle_html("Cycling Agents", f"Switching from <strong>{agent_name_to_deactivate}</strong>..."))
elif reason == "forfeited_private_battle":
await update_display_html(create_idle_html("Switching Agent", f"Agent <strong>{agent_name_to_deactivate}</strong> forfeited a private battle."))
else: # Generic reason or error
await update_display_html(create_idle_html(f"Resetting Agent ({reason})", f"Cleaning up <strong>{agent_name_to_deactivate}</strong>..."))
await asyncio.sleep(3)
await update_display_html(create_idle_html("Preparing Next Agent...", "Please wait..."))
agent = active_agent_instance
task = active_agent_task
active_agent_name = None
active_agent_instance = None
active_agent_task = None
current_battle_instance = None
print(f"Lifecycle: Global state cleared for '{agent_name_to_deactivate}'.")
if task and not task.done():
print(f"Lifecycle: Ensuring task cancellation for {agent_name_to_deactivate} ({task.get_name()})...")
task.cancel()
try:
await asyncio.wait_for(task, timeout=2.0)
print(f"Lifecycle: Task cancellation confirmed for {agent_name_to_deactivate}.")
except asyncio.CancelledError:
print(f"Lifecycle: Task cancellation confirmation (CancelledError) for {agent_name_to_deactivate}.")
except asyncio.TimeoutError:
print(f"Lifecycle: Task did not confirm cancellation within timeout for {agent_name_to_deactivate}.")
except Exception as e:
print(f"Lifecycle: Error during task cancellation wait for {agent_name_to_deactivate}: {e}")
if agent:
print(f"Lifecycle: Disconnecting player {agent.username}...")
try:
if hasattr(agent, '_websocket') and agent._websocket and agent._websocket.open:
await agent.disconnect()
print(f"Lifecycle: Player {agent.username} disconnected successfully.")
else:
print(f"Lifecycle: Player {agent.username} already disconnected or websocket not available.")
except Exception as e:
print(f"ERROR during agent disconnect ({agent.username}): {e}")
traceback.print_exc()
await asyncio.sleep(2)
print(f"Lifecycle: Agent '{agent_name_to_deactivate}' deactivation complete.")
async def manage_agent_lifecycle():
"""Runs the main loop selecting, running, and cleaning up agents sequentially."""
global active_agent_name, active_agent_instance, active_agent_task, current_battle_instance
print("Background lifecycle manager started.")
REFRESH_INTERVAL_SECONDS = 3
LOOP_COOLDOWN_SECONDS = 1
ERROR_RETRY_DELAY_SECONDS = 10
POST_BATTLE_DELAY_SECONDS = 5
loop_counter = 0
while True:
loop_counter += 1
loop_start_time = time.monotonic()
print(f"\n--- Lifecycle Check #{loop_counter} [{time.strftime('%H:%M:%S')}] ---")
try:
if active_agent_instance is None:
print(f"[{loop_counter}] State 1: No active agent. Selecting...")
activated = await select_and_activate_new_agent()
if not activated:
print(f"[{loop_counter}] State 1: Activation failed. Waiting {ERROR_RETRY_DELAY_SECONDS}s before retry.")
await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
else:
print(f"[{loop_counter}] State 1: Agent '{active_agent_name}' activated successfully.")
else: # Agent is active
agent_name = active_agent_name
print(f"[{loop_counter}] State 2: Agent '{agent_name}' is active.")
if current_battle_instance is None:
print(f"[{loop_counter}] State 2a: Checking for new battle for '{agent_name}'...")
await check_for_new_battle()
if current_battle_instance:
battle_tag = current_battle_instance.battle_tag
print(f"[{loop_counter}] State 2a: *** NEW BATTLE DETECTED: {battle_tag} for '{agent_name}' ***")
parts = battle_tag.split('-')
is_suffixed_format = len(parts) > 3 and parts[2].isdigit()
if is_suffixed_format:
print(f"[{loop_counter}] Detected potentially non-public battle format ({battle_tag}). Forfeiting.")
try:
if active_agent_instance:
await active_agent_instance.forfeit(battle_tag)
print(f"[{loop_counter}] Sent forfeit command for {battle_tag}.")
await asyncio.sleep(1.5)
except Exception as forfeit_err:
print(f"[{loop_counter}] ERROR sending forfeit for {battle_tag}: {forfeit_err}")
await deactivate_current_agent(reason="forfeited_private_battle")
continue
else:
print(f"[{loop_counter}] Public battle format detected. Displaying battle {battle_tag}.")
await update_display_html(create_battle_iframe(battle_tag))
else:
print(f"[{loop_counter}] State 2a: No new battle found. Agent '{agent_name}' remains idle, waiting for challenge.")
idle_html = create_idle_html(f"Agent Ready: <strong>{agent_name}</strong>",
f"Please challenge <strong>{agent_name}</strong> to a <strong>{DEFAULT_BATTLE_FORMAT}</strong> battle.")
await update_display_html(idle_html)
await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
if current_battle_instance is not None: # Check again in case a battle just started
battle_tag = current_battle_instance.battle_tag
print(f"[{loop_counter}] State 2b: Monitoring battle {battle_tag} for '{agent_name}'")
if not active_agent_instance:
print(f"[{loop_counter}] WARNING: Agent instance for '{agent_name}' disappeared while monitoring battle {battle_tag}! Deactivating.")
await deactivate_current_agent(reason="agent_disappeared_mid_battle")
continue
battle_obj = active_agent_instance._battles.get(battle_tag)
if battle_obj and battle_obj.finished:
print(f"[{loop_counter}] Battle {battle_tag} is FINISHED. Deactivating agent '{agent_name}'.")
await deactivate_current_agent(reason="battle_end")
print(f"[{loop_counter}] Waiting {POST_BATTLE_DELAY_SECONDS}s post-battle before selecting next agent.")
await asyncio.sleep(POST_BATTLE_DELAY_SECONDS)
continue
elif not battle_obj:
print(f"[{loop_counter}] WARNING: Battle object for {battle_tag} not found in agent's list for '{agent_name}'. Battle might have ended abruptly. Deactivating.")
await deactivate_current_agent(reason="battle_object_missing")
continue
else:
print(f"[{loop_counter}] Battle {battle_tag} ongoing for '{agent_name}'.")
await asyncio.sleep(REFRESH_INTERVAL_SECONDS)
except asyncio.CancelledError:
print("Lifecycle manager task cancelled.")
raise
except Exception as e:
print(f"!!! ERROR in main lifecycle loop #{loop_counter}: {e} !!!")
traceback.print_exc()
current_agent_name_err = active_agent_name # Use different var name to avoid conflict
if active_agent_instance:
print(f"Attempting to deactivate agent '{current_agent_name_err}' due to loop error...")
try:
await deactivate_current_agent(reason="main_loop_error")
except Exception as deactivation_err:
print(f"Error during error-handling deactivation: {deactivation_err}")
active_agent_name = None
active_agent_instance = None
active_agent_task = None
current_battle_instance = None
else:
print("No active agent instance during loop error.")
await update_display_html(create_error_html(f"A server error occurred in the lifecycle manager. Please wait. ({e})"))
print(f"Waiting {ERROR_RETRY_DELAY_SECONDS}s after loop error.")
await asyncio.sleep(ERROR_RETRY_DELAY_SECONDS)
continue
elapsed_time = time.monotonic() - loop_start_time
if elapsed_time < LOOP_COOLDOWN_SECONDS:
await asyncio.sleep(LOOP_COOLDOWN_SECONDS - elapsed_time)
def log_task_exception(task: asyncio.Task):
"""Callback to log exceptions from background tasks (like accept_challenges)."""
try:
if task.cancelled():
print(f"Task '{task.get_name()}' was cancelled.")
return
task.result()
print(f"Task '{task.get_name()}' completed successfully.")
except asyncio.CancelledError:
print(f"Task '{task.get_name()}' confirmed cancelled (exception caught).")
pass
except Exception as e:
print(f"!!! Exception in background task '{task.get_name()}': {e} !!!")
traceback.print_exc()
# --- WebSocket connection manager ---
class ConnectionManager:
def __init__(self):
self.active_connections: Set[WebSocket] = set()
self.current_html_fragment: str = create_idle_html("Initializing...", "Setting up PokΓ©mon Battle Stream")
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.add(websocket)
print(f"Client connected. Sending current state. Total clients: {len(self.active_connections)}")
try:
await websocket.send_text(self.current_html_fragment)
except Exception as e:
print(f"Error sending initial state to new client: {e}")
await self.disconnect(websocket)
async def disconnect(self, websocket: WebSocket):
self.active_connections.discard(websocket)
print(f"Client disconnected. Total clients: {len(self.active_connections)}")
async def update_all(self, html_fragment: str):
"""Update the current HTML fragment and broadcast to all clients."""
if self.current_html_fragment == html_fragment:
return
self.current_html_fragment = html_fragment
if not self.active_connections:
return
print(f"Broadcasting update to {len(self.active_connections)} clients...")
send_tasks = [
connection.send_text(html_fragment)
for connection in list(self.active_connections)
]
results = await asyncio.gather(*send_tasks, return_exceptions=True)
connections_to_remove = set()
# Need to iterate carefully if connections can change during gather
conn_list = list(self.active_connections)
for i, result in enumerate(results):
# Ensure index is valid if connections changed mid-gather
if i < len(conn_list):
connection = conn_list[i]
if isinstance(result, Exception):
print(f"Error sending update to client: {result}. Marking for removal.")
connections_to_remove.add(connection)
for connection in connections_to_remove:
# Check if connection still exists before disconnecting
if connection in self.active_connections:
await self.disconnect(connection)
manager = ConnectionManager()
# --- API Routes ---
@app.get("/", response_class=HTMLResponse)
async def get_homepage():
"""Serves the main HTML page with WebSocket connection and improved styling."""
# ... (HTML remains the same as before)
return """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Pokemon Battle Livestream</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Poppins:wght@400;700&family=Press+Start+2P&display=swap" rel="stylesheet">
<style>
/* Basic Reset */
* {
box-sizing: border-box;
}
html, body {
margin: 0;
padding: 0;
height: 100%;
width: 100%;
overflow: hidden; /* Prevent scrollbars on body */
font-family: 'Poppins', sans-serif; /* Default font */
color: #ffffff; /* Default text color */
background-color: #1a1a1a; /* Dark background */
}
/* Container for dynamic content */
#stream-container {
position: fixed; /* Use fixed to ensure it covers viewport */
top: 0;
left: 0;
width: 100%;
height: 100%;
display: flex; /* Use flexbox for centering content */
justify-content: center;
align-items: center;
}
/* Iframe Styling */
.battle-iframe {
width: 100%;
height: 100%;
border: none; /* Remove default border */
display: block; /* Prevents potential extra space below iframe */
}
/* Base Content Container Styling (used by idle/error) */
.content-container {
width: 100%;
height: 100%;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
padding: 20px;
text-align: center;
}
/* Idle Screen Specific Styling */
.idle-container {
/* Ensure the background covers the entire container */
background-image: url('/static/pokemon_huggingface.png');
background-size: cover;
background-position: center;
background-repeat: no-repeat;
}
/* Error Screen Specific Styling */
.error-container {
background: linear-gradient(135deg, #4d0000, #1a0000); /* Dark red gradient */
}
/* Message Box Styling (shared by idle/error) */
.message-box {
background-color: rgba(0, 0, 0, 0.75); /* Darker, more opaque */
padding: 40px 50px; /* More padding */
border-radius: 20px; /* More rounded */
max-width: 70%; /* Max width */
box-shadow: 0 8px 25px rgba(0, 0, 0, 0.5); /* Softer shadow */
border: 1px solid rgba(255, 255, 255, 0.1); /* Subtle border */
}
.status {
font-family: 'Press Start 2P', cursive; /* Pixel font for status */
font-size: clamp(1.5em, 4vw, 2.5em); /* Responsive font size */
margin-bottom: 25px;
color: #ffcb05; /* Pokemon Yellow */
text-shadow: 3px 3px 0px #3b4cca; /* Pokemon Blue shadow */
/* Subtle pulse animation for idle status */
animation: pulse 2s infinite ease-in-out;
}
.instruction {
font-size: clamp(1em, 2.5vw, 1.4em); /* Responsive font size */
color: #f0f0f0; /* Light grey for readability */
line-height: 1.6;
text-shadow: 1px 1px 3px rgba(0, 0, 0, 0.7);
}
.instruction strong {
color: #ff7f0f; /* A contrasting color like orange */
font-weight: 700; /* Ensure Poppins bold is used */
}
/* Error Screen Specific Text Styling */
.error-container .status {
color: #ff4d4d; /* Bright Red for error status */
text-shadow: 2px 2px 0px #800000; /* Darker red shadow */
animation: none; /* No pulse on error */
}
.error-container .instruction {
color: #ffdddd; /* Lighter red for error details */
}
/* Pulse Animation */
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.03); }
100% { transform: scale(1); }
}
</style>
</head>
<body>
<div id="stream-container">
</div>
<script>
const streamContainer = document.getElementById('stream-container');
let ws = null; // WebSocket instance
function connectWebSocket() {
const wsProtocol = location.protocol === 'https:' ? 'wss' : 'ws';
const wsUrl = `${wsProtocol}://${location.host}/ws`;
ws = new WebSocket(wsUrl);
console.log('Attempting to connect to WebSocket server...');
ws.onopen = (event) => {
console.log('WebSocket connection established.');
};
ws.onmessage = (event) => {
streamContainer.innerHTML = event.data;
};
ws.onclose = (event) => {
console.log(`WebSocket connection closed. Code: ${event.code}, Reason: ${event.reason}. Attempting to reconnect in 5 seconds...`);
ws = null;
streamContainer.innerHTML = createReconnectMessage();
setTimeout(connectWebSocket, 5000);
};
ws.onerror = (event) => {
console.error('WebSocket error:', event);
streamContainer.innerHTML = createErrorMessage("WebSocket connection error. Attempting to reconnect...");
if (ws && ws.readyState !== WebSocket.CLOSED) {
ws.close();
}
};
}
function createReconnectMessage() {
return `
<div class="content-container error-container" style="background: #333;">
<div class="message-box" style="background-color: rgba(0,0,0,0.6);">
<p class="status" style="color: #ffcb05; text-shadow: none; animation: none;">πŸ”Œ Disconnected πŸ”Œ</p>
<p class="instruction" style="color: #eee;">Connection lost. Attempting to reconnect automatically...</p>
</div>
</div>`;
}
function createErrorMessage(message) {
return `
<div class="content-container error-container">
<div class="message-box">
<p class="status">🚨 Error 🚨</p>
<p class="instruction">${message}</p>
</div>
</div>`;
}
connectWebSocket();
</script>
</body>
</html>
"""
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(f"Received unexpected message from client: {data}")
except WebSocketDisconnect as e:
print(f"WebSocket disconnected: Code {e.code}, Reason: {getattr(e, 'reason', 'N/A')}")
await manager.disconnect(websocket)
except Exception as e:
print(f"WebSocket error: {e}")
traceback.print_exc()
await manager.disconnect(websocket)
# --- NEW: Route to display the last action ---
@app.get("/last_action", response_class=HTMLResponse)
async def get_last_action():
"""Serves an HTML page displaying the last recorded agent action."""
global last_llm_action
# Return the formatted HTML page using the current state
return create_last_action_html(last_llm_action)
# --------------------------------------------
# --- Lifecyle Events ---
@app.on_event("startup")
async def startup_event():
"""Start background tasks when the application starts."""
global background_task_handle
static_dir = "static"
if not os.path.exists(static_dir):
os.makedirs(static_dir)
print(f"Created static directory at: {os.path.abspath(static_dir)}")
print("!!! Please add 'pokemon_huggingface.png' to this directory! !!!")
app.mount("/static", StaticFiles(directory=static_dir), name="static")
print(f"Mounted static directory '{static_dir}' at '/static'")
print("πŸš€ Starting background tasks")
background_task_handle = asyncio.create_task(manage_agent_lifecycle(), name="LifecycleManager")
background_task_handle.add_done_callback(log_task_exception)
print("βœ… Background tasks started")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up tasks when shutting down."""
global background_task_handle, active_agent_instance
print("\nπŸ”Œ Shutting down application. Cleaning up...")
if background_task_handle and not background_task_handle.done():
print("Cancelling background task...")
background_task_handle.cancel()
try:
await asyncio.wait_for(background_task_handle, timeout=5.0)
print("Background task cancelled successfully.")
except asyncio.CancelledError:
print("Background task cancellation confirmed (CancelledError).")
except asyncio.TimeoutError:
print("Background task did not finish cancelling within timeout.")
except Exception as e:
print(f"Error during background task cancellation: {e}")
agent_to_disconnect = active_agent_instance
if agent_to_disconnect:
agent_name = agent_to_disconnect.username if hasattr(agent_to_disconnect, 'username') else 'Unknown Agent'
print(f"Disconnecting active agent '{agent_name}'...")
try:
if hasattr(agent_to_disconnect, '_websocket') and agent_to_disconnect._websocket and agent_to_disconnect._websocket.open:
await agent_to_disconnect.disconnect()
print(f"Agent '{agent_name}' disconnected.")
else:
print(f"Agent '{agent_name}' already disconnected or websocket not available.")
except Exception as e:
print(f"Error during agent disconnect on shutdown for '{agent_name}': {e}")
print(f"Closing {len(manager.active_connections)} client WebSocket connections...")
close_tasks = [
conn.close(code=1000, reason="Server shutting down")
for conn in list(manager.active_connections)
]
if close_tasks:
await asyncio.gather(*close_tasks, return_exceptions=True)
print("βœ… Cleanup complete. Application shutdown.")
# --- Main execution ---
if __name__ == "__main__":
import uvicorn
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.getLogger('poke_env').setLevel(logging.WARNING)
logging.getLogger('websockets.client').setLevel(logging.INFO)
print("Starting Pokemon Battle Livestream Server...")
print("="*60)
if not AVAILABLE_AGENT_NAMES:
print("β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ FATAL ERROR β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ")
print(" No agents found with configured passwords!")
print(" Please set the required environment variables:")
for name, cfg in AGENT_CONFIGS.items():
print(f" - {cfg.get('password_env_var', 'N/A')} (for agent: {name})")
print("="*60)
exit("Exiting due to missing agent passwords.")
else:
print("✨ Available Agents Found:")
for name in AVAILABLE_AGENT_NAMES:
print(f" - {name}")
print("="*60)
print(f"Server will run on http://0.0.0.0:7860")
print("Access the action log at http://0.0.0.0:7860/last_action") # Added info
print("="*60)
uvicorn.run(
"main:app",
host="0.0.0.0",
port=7860,
reload=False,
log_level="info"
)