Spaces:
Sleeping
Sleeping
import gradio as gr | |
import asyncio | |
import websockets | |
import uuid | |
import argparse | |
from datetime import datetime | |
import os | |
import random | |
import time | |
# Fun usernames with emojis - the VIP list of quirky characters! ππ | |
FUN_USERNAMES = [ | |
"CosmicJester π", "PixelPanda πΌ", "QuantumQuack π¦", "StellarSquirrel πΏοΈ", | |
"GizmoGuru βοΈ", "NebulaNinja π ", "ByteBuster πΎ", "GalacticGopher π", | |
"RocketRaccoon π", "EchoElf π§", "PhantomFox π¦", "WittyWizard π§", | |
"LunarLlama π", "SolarSloth βοΈ", "AstroAlpaca π¦", "CyberCoyote πΊ", | |
"MysticMoose π¦", "GlitchGnome π§", "VortexViper π", "ChronoChimp π" | |
] | |
# Directory for chat logs - the secret vault where tales are stashed! ποΈπ | |
CHAT_DIR = "chat_logs" | |
os.makedirs(CHAT_DIR, exist_ok=True) | |
# Persistent chat file - the grand tome of all chatter! πβ¨ | |
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md") | |
# Node name - the appβs codename generator, sneaky and slick! π΅οΈββοΈπΎ | |
def get_node_name(): | |
"""π² Spins the wheel of fate to name our node - a random alias or user pick! π·οΈ""" | |
parser = argparse.ArgumentParser(description='Start a chat node with a specific name') | |
parser.add_argument('--node-name', type=str, default=None, help='Name for this chat node') | |
parser.add_argument('--port', type=int, default=7860, help='Port to run the Gradio interface on') | |
args = parser.parse_args() | |
return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port | |
# Chat saver - the scribe etching epic messages into the eternal scroll! ποΈπ | |
def save_chat_entry(username, message): | |
"""ποΈ Carves a chat line into the grand Markdown tome - history in the making! ποΈ""" | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
entry = f"[{timestamp}] {username}: {message}" | |
try: | |
with open(CHAT_FILE, 'a') as f: | |
f.write(f"{entry}\n") | |
return True | |
except Exception as e: | |
print(f"Oops! Failed to save chat: {e}") | |
return False | |
# Chat loader - the archaeologist unearthing the chat saga! βοΈπ | |
def load_chat(): | |
"""π Digs up the chat treasure from the filesystem - tales of old and new! π°""" | |
if not os.path.exists(CHAT_FILE): | |
with open(CHAT_FILE, 'w') as f: | |
f.write("# Global Chat\n\nNo messages yet - start chatting! π€\n") | |
try: | |
with open(CHAT_FILE, 'r') as f: | |
content = f.read() | |
lines = content.strip().split('\n') | |
numbered_content = "\n".join(f"{i+1}. {line}" for i, line in enumerate(lines) if line.strip()) | |
return numbered_content | |
except Exception as e: | |
print(f"Chat load hiccup: {e}") | |
return "# Error loading chat\nSomething went wonky! π΅" | |
# User list grabber - the social butterfly spotting all the chatters! π¦π₯ | |
def get_user_list(chat_content): | |
"""π Peeks at the chat to spot all the cool cats talking - whoβs in the club? πΈ""" | |
users = set() | |
for line in chat_content.split('\n'): | |
if line.strip() and ': ' in line: | |
user = line.split(': ')[1].split(' ')[0] | |
users.add(user) | |
return sorted(list(users)) | |
active_connections = {} | |
# WebSocket handler - the bouncer at the chat rave, keeping it hopping! ππͺ | |
async def websocket_handler(websocket, path): | |
"""π§ Guards the chat gate, letting messages fly and booting crashers! π¨""" | |
try: | |
client_id = str(uuid.uuid4()) | |
room_id = "chat" | |
active_connections.setdefault(room_id, {})[client_id] = websocket | |
print(f"Client {client_id} joined the chat party!") | |
async for message in websocket: | |
try: | |
# Parse message as simple text (no JSON) | |
parts = message.split('|', 1) | |
if len(parts) == 2: | |
username, content = parts | |
save_chat_entry(username, content) | |
await broadcast_message(f"{username}|{content}", room_id) | |
except Exception as e: | |
print(f"Message mishap: {e}") | |
await websocket.send(f"ERROR|Oops, bad message format! π¬") | |
except websockets.ConnectionClosed: | |
print(f"Client {client_id} bailed from the chat!") | |
finally: | |
if room_id in active_connections and client_id in active_connections[room_id]: | |
del active_connections[room_id][client_id] | |
if not active_connections[room_id]: | |
del active_connections[room_id] | |
# Broadcaster - the megaphone blasting chat vibes to all! π£πΆ | |
async def broadcast_message(message, room_id): | |
"""π’ Shouts the latest chat beat to every dancer in the room - hear it loud! π΅""" | |
if room_id in active_connections: | |
disconnected = [] | |
for client_id, ws in active_connections[room_id].items(): | |
try: | |
await ws.send(message) | |
except websockets.ConnectionClosed: | |
disconnected.append(client_id) | |
for client_id in disconnected: | |
del active_connections[room_id][client_id] | |
# WebSocket starter - the DJ spinning up the chat tunes! π§π₯ | |
async def start_websocket_server(host='0.0.0.0', port=8765): | |
"""π Cranks up the WebSocket jukebox, ready to rock the chat scene! πΈ""" | |
server = await websockets.serve(websocket_handler, host, port) | |
print(f"WebSocket server jamming on ws://{host}:{port}") | |
return server | |
# Chat interface maker - the stage builder for our chat extravaganza! πποΈ | |
def create_gradio_interface(initial_username): | |
"""ποΈ Sets up the chat stage with live updates and name-switching flair! π""" | |
with gr.Blocks(title=f"Chat Node: {NODE_NAME}", css=".chat-box { font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; }") as demo: | |
# Shared state - the magic memory box all clients peek into! πͺπ¦ | |
chat_state = gr.State(value=load_chat()) | |
gr.Markdown(f"# Chat Node: {NODE_NAME}") | |
gr.Markdown("Chat live, switch names, and keep the party going! π") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
chat_display = gr.Code(label="Chat History", language="markdown", lines=15, elem_classes=["chat-box"]) | |
with gr.Column(scale=1): | |
user_list = gr.Dropdown(label="Switch User", choices=get_user_list(chat_state.value), value=initial_username) | |
with gr.Row(): | |
username_input = gr.Textbox(label="Your Name", value=initial_username) | |
message_input = gr.Textbox(label="Message", placeholder="Type your epic line here! βοΈ") | |
send_button = gr.Button("Send π") | |
# Chat updater - the wizard keeping the chat spell alive! π§ββοΈβ¨ | |
def update_chat(username, message, current_chat): | |
"""π© Conjures a fresh chat view with your latest quip - abracadabra! π""" | |
if message.strip(): | |
save_chat_entry(username, message) | |
new_chat = load_chat() | |
return new_chat, get_user_list(new_chat) | |
# Name switcher - the shapeshifter swapping your chat persona! π¦ΈββοΈπ | |
def switch_user(new_username, current_chat): | |
"""π¦ Transforms you into a new chat hero - new name, same game! π""" | |
return new_username, current_chat, get_user_list(current_chat) | |
# Live chat persistence - the timekeeper syncing the chat clock! β°π | |
async def persist_chat(): | |
"""β³ Keeps the chat tome in sync, scribbling updates every few ticks! π""" | |
while True: | |
await asyncio.sleep(5) # Persist every 5 seconds | |
with open(CHAT_FILE, 'r') as f: | |
current = f.read() | |
chat_state.value = current | |
# Hook up the magic! π£ | |
send_button.click( | |
fn=update_chat, | |
inputs=[username_input, message_input, chat_state], | |
outputs=[chat_display, user_list], | |
_js="() => ['', document.getElementById('message_input').value, document.getElementById('chat_display').value]" | |
) | |
message_input.submit( | |
fn=update_chat, | |
inputs=[username_input, message_input, chat_state], | |
outputs=[chat_display, user_list] | |
) | |
user_list.change( | |
fn=switch_user, | |
inputs=[user_list, chat_state], | |
outputs=[username_input, chat_display, user_list] | |
) | |
# Start with the latest chat vibes! π΅ | |
demo.load(lambda: (load_chat(), get_user_list(load_chat())), None, [chat_display, user_list]) | |
# Kick off the persistence dance! π | |
asyncio.create_task(persist_chat()) | |
return demo | |
# Main event - the ringmaster kicking off the chat circus! πͺπ€‘ | |
async def main(): | |
"""π€ Drops the mic and starts the chat party - itβs showtime, folks! π""" | |
global NODE_NAME | |
NODE_NAME, port = get_node_name() | |
await start_websocket_server() | |
# Grab username from URL or roll the dice! π² | |
initial_username = os.environ.get("QUERY_STRING", "").split("username=")[-1] if "username=" in os.environ.get("QUERY_STRING", "") else random.choice(FUN_USERNAMES) | |
print(f"Welcoming {initial_username} to the chat bash!") | |
interface = create_gradio_interface(initial_username) | |
# URL param magic - the gatekeeper checking your VIP pass! ποΈ | |
from starlette.middleware.base import BaseHTTPMiddleware | |
class UsernameMiddleware(BaseHTTPMiddleware): | |
async def dispatch(self, request, call_next): | |
"""π Snags your name from the URL VIP list - exclusive entry! π©""" | |
query_params = dict(request.query_params) | |
if "username" in query_params and query_params["username"] in FUN_USERNAMES: | |
global initial_username | |
initial_username = query_params["username"] | |
return await call_next(request) | |
app = gr.routes.App.create_app(interface) | |
app.add_middleware(UsernameMiddleware) | |
import uvicorn | |
await uvicorn.Server(uvicorn.Config(app, host="0.0.0.0", port=port)).serve() | |
if __name__ == "__main__": | |
asyncio.run(main()) |