Spaces:
Sleeping
Sleeping
import streamlit as st | |
import asyncio | |
import websockets | |
import uuid | |
import argparse | |
from datetime import datetime | |
import os | |
import random | |
import time | |
import hashlib | |
from PIL import Image | |
import glob | |
from urllib.parse import quote | |
import base64 | |
import io | |
import streamlit.components.v1 as components | |
import edge_tts | |
from audio_recorder_streamlit import audio_recorder | |
import nest_asyncio | |
import re | |
# Patch for nested async - sneaky fix! πβ¨ | |
nest_asyncio.apply() | |
# Static config - constants rule! ππ | |
icons = 'π€π§ π¬π' | |
START_ROOM = "Sector π" | |
# Page setup - dressing up the window! πΌοΈπ | |
st.set_page_config( | |
page_title="π€π§ MMO Chat Brainππ¬", | |
page_icon=icons, | |
layout="wide", | |
initial_sidebar_state="auto" | |
) | |
# Funky usernames - whoβs who in the zoo with unique voices! ππΎποΈ | |
FUN_USERNAMES = { | |
"CosmicJester π": "en-US-AriaNeural", # US Female, bright & clear | |
"PixelPanda πΌ": "en-US-JennyNeural", # US Female, warm & friendly | |
"QuantumQuack π¦": "en-GB-SoniaNeural", # UK Female, posh & crisp | |
"StellarSquirrel πΏοΈ": "en-AU-NatashaNeural", # AU Female, lively & upbeat | |
"GizmoGuru βοΈ": "en-CA-ClaraNeural", # CA Female, calm & soothing | |
"NebulaNinja π ": "en-US-GuyNeural", # US Male, deep & cool | |
"ByteBuster πΎ": "en-GB-RyanNeural", # UK Male, smooth & refined | |
"GalacticGopher π": "en-AU-WilliamNeural", # AU Male, bold & rugged | |
"RocketRaccoon π": "en-CA-LiamNeural", # CA Male, steady & warm | |
"EchoElf π§": "en-US-AnaNeural", # US Female, soft & gentle (child-like) | |
"PhantomFox π¦": "en-US-BrandonNeural", # US Male, confident & rich | |
"WittyWizard π§": "en-GB-ThomasNeural", # UK Male, authoritative & clear | |
"LunarLlama π": "en-AU-FreyaNeural", # AU Female, sweet & melodic | |
"SolarSloth βοΈ": "en-CA-LindaNeural", # CA Female, neutral & pleasant | |
"AstroAlpaca π¦": "en-US-ChristopherNeural",# US Male, strong & resonant | |
"CyberCoyote πΊ": "en-GB-ElliotNeural", # UK Male, youthful & energetic | |
"MysticMoose π¦": "en-AU-JamesNeural", # AU Male, deep & grounded | |
"GlitchGnome π§": "en-CA-EthanNeural", # CA Male, bright & lively | |
"VortexViper π": "en-US-AmberNeural", # US Female, expressive & vibrant | |
"ChronoChimp π": "en-GB-LibbyNeural" # UK Female, cheerful & distinct | |
} | |
# Folders galore - organizing chaos! ππ | |
CHAT_DIR = "chat_logs" | |
VOTE_DIR = "vote_logs" | |
STATE_FILE = "user_state.txt" | |
AUDIO_DIR = "audio_logs" | |
HISTORY_DIR = "history_logs" | |
os.makedirs(CHAT_DIR, exist_ok=True) | |
os.makedirs(VOTE_DIR, exist_ok=True) | |
os.makedirs(AUDIO_DIR, exist_ok=True) | |
os.makedirs(HISTORY_DIR, exist_ok=True) | |
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md") | |
QUOTE_VOTES_FILE = os.path.join(VOTE_DIR, "quote_votes.md") | |
MEDIA_VOTES_FILE = os.path.join(VOTE_DIR, "media_votes.md") | |
HISTORY_FILE = os.path.join(HISTORY_DIR, "chat_history.md") | |
# Fancy digits - numbers got style! π’π | |
UNICODE_DIGITS = {i: f"{i}\uFE0Fβ£" for i in range(10)} | |
# Massive font collection - typography bonanza! ποΈπ¨ | |
UNICODE_FONTS = [ | |
("Normal", lambda x: x), | |
("Bold", lambda x: "".join(chr(ord(c) + 0x1D400 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D41A - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Italic", lambda x: "".join(chr(ord(c) + 0x1D434 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D44E - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Bold Italic", lambda x: "".join(chr(ord(c) + 0x1D468 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D482 - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Script", lambda x: "".join(chr(ord(c) + 0x1D49C - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D4B6 - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Bold Script", lambda x: "".join(chr(ord(c) + 0x1D4D0 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D4EA - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Fraktur", lambda x: "".join(chr(ord(c) + 0x1D504 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D51E - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Bold Fraktur", lambda x: "".join(chr(ord(c) + 0x1D56C - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D586 - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Double Struck", lambda x: "".join(chr(ord(c) + 0x1D538 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D552 - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Sans Serif", lambda x: "".join(chr(ord(c) + 0x1D5A0 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D5BA - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Sans Serif Bold", lambda x: "".join(chr(ord(c) + 0x1D5D4 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D5EE - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Sans Serif Italic", lambda x: "".join(chr(ord(c) + 0x1D608 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D622 - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Sans Serif Bold Italic", lambda x: "".join(chr(ord(c) + 0x1D63C - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D656 - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Monospace", lambda x: "".join(chr(ord(c) + 0x1D670 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D68A - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
("Circled", lambda x: "".join(chr(ord(c) - 0x41 + 0x24B6) if 'A' <= c <= 'Z' else chr(ord(c) - 0x61 + 0x24D0) if 'a' <= c <= 'z' else c for c in x)), | |
("Squared", lambda x: "".join(chr(ord(c) - 0x41 + 0x1F130) if 'A' <= c <= 'Z' else c for c in x)), | |
("Negative Circled", lambda x: "".join(chr(ord(c) - 0x41 + 0x1F150) if 'A' <= c <= 'Z' else c for c in x)), | |
("Negative Squared", lambda x: "".join(chr(ord(c) - 0x41 + 0x1F170) if 'A' <= c <= 'Z' else c for c in x)), | |
("Regional Indicator", lambda x: "".join(chr(ord(c) - 0x41 + 0x1F1E6) if 'A' <= c <= 'Z' else c for c in x)), | |
] | |
# Global state - keeping tabs! ππ | |
if 'server_running' not in st.session_state: | |
st.session_state.server_running = False | |
if 'server_task' not in st.session_state: | |
st.session_state.server_task = None | |
if 'active_connections' not in st.session_state: | |
st.session_state.active_connections = {} | |
# Timestamp wizardry - clock ticks with flair! β°π© | |
def format_timestamp_prefix(): | |
return datetime.now().strftime("%Y%m%d_%H%M%S") | |
# Node naming - christening the beast! ππΌ | |
def get_node_name(): | |
parser = argparse.ArgumentParser(description='Start a chat node with a specific name') | |
parser.add_argument('--node-name', type=str, default=None) | |
parser.add_argument('--port', type=int, default=8501) | |
args = parser.parse_args() | |
username = st.session_state.get('username', 'System π') | |
log_action(username, "ππΌ - Node naming - christening the beast!") | |
return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port | |
# Action logger - spying on deeds! π΅οΈπ | |
def log_action(username, action): | |
if 'action_log' not in st.session_state: | |
st.session_state.action_log = {} | |
user_log = st.session_state.action_log.setdefault(username, {}) | |
current_time = time.time() | |
user_log = {k: v for k, v in user_log.items() if current_time - v < 10} | |
st.session_state.action_log[username] = user_log | |
if action not in user_log: | |
with open(HISTORY_FILE, 'a') as f: | |
f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {username}: {action}\n") | |
user_log[action] = current_time | |
# Clean text - strip the fancy stuff! π§Ήπ | |
def clean_text_for_tts(text): | |
# Remove Markdown formatting (e.g., #, *, [], ![]) | |
cleaned = re.sub(r'[#*!\[\]]+', '', text) | |
# Replace newlines with spaces and strip extra whitespace | |
cleaned = ' '.join(cleaned.split()) | |
# Ensure some text exists, max 200 chars to avoid edgeTTS limits | |
return cleaned[:200] if cleaned else "No text to speak" | |
# Chat saver - words locked tight! π¬π | |
async def save_chat_entry(username, message): | |
await asyncio.to_thread(log_action, username, "π¬π - Chat saver - words locked tight!") | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
entry = f"[{timestamp}] {username}: {message}" | |
await asyncio.to_thread(lambda: open(CHAT_FILE, 'a').write(f"{entry}\n")) | |
voice = FUN_USERNAMES.get(username, "en-US-AriaNeural") | |
cleaned_message = clean_text_for_tts(message) | |
audio_file = await async_edge_tts_generate(cleaned_message, voice) | |
if audio_file: | |
with open(HISTORY_FILE, 'a') as f: | |
f.write(f"[{timestamp}] {username}: Audio generated - {audio_file}\n") | |
# Chat loader - history unleashed! ππ | |
async def load_chat(): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "ππ - Chat loader - history unleashed!") | |
if not os.path.exists(CHAT_FILE): | |
await asyncio.to_thread(lambda: open(CHAT_FILE, 'a').write(f"# {START_ROOM} Chat\n\nWelcome to the cosmic hub - start chatting! π€\n")) | |
with open(CHAT_FILE, 'r') as f: | |
content = await asyncio.to_thread(f.read) | |
return content | |
# User lister - whoβs in the gang! π₯π | |
async def get_user_list(chat_content): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "π₯π - User lister - whoβs in the gang!") | |
users = set() | |
for line in chat_content.split('\n'): | |
if line.strip() and ': ' in line: | |
user = line.split(': ')[1].split(' ')[0] | |
users.add(user) | |
return sorted(list(users)) | |
# Join checker - been here before? πͺπ | |
async def has_joined_before(client_id, chat_content): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "πͺπ - Join checker - been here before?") | |
return any(f"Client-{client_id} has joined" in line for line in chat_content.split('\n')) | |
# Suggestion maker - old quips resurface! π‘π | |
async def get_message_suggestions(chat_content, prefix): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "π‘π - Suggestion maker - old quips resurface!") | |
lines = chat_content.split('\n') | |
messages = [line.split(': ', 1)[1] for line in lines if ': ' in line and line.strip()] | |
return [msg for msg in messages if msg.lower().startswith(prefix.lower())][:5] | |
# Vote saver - cheers recorded! ππ | |
async def save_vote(file, item, user_hash, username, comment=""): | |
await asyncio.to_thread(log_action, username, "ππ - Vote saver - cheers recorded!") | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
entry = f"[{timestamp}] {user_hash} voted for {item}" | |
await asyncio.to_thread(lambda: open(file, 'a').write(f"{entry}\n")) | |
await asyncio.to_thread(lambda: open(HISTORY_FILE, "a").write(f"- {timestamp} - User {user_hash} voted for {item}\n")) | |
chat_message = f"{username} upvoted: \"{item}\"" | |
if comment: | |
chat_message += f" - {comment}" | |
await save_chat_entry(username, chat_message) | |
# Vote counter - tallying the love! ππ | |
async def load_votes(file): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "ππ - Vote counter - tallying the love!") | |
if not os.path.exists(file): | |
await asyncio.to_thread(lambda: open(file, 'w').write("# Vote Tally\n\nNo votes yet - get clicking! π±οΈ\n")) | |
with open(file, 'r') as f: | |
content = await asyncio.to_thread(f.read) | |
lines = content.strip().split('\n')[2:] | |
votes = {} | |
user_votes = set() | |
for line in lines: | |
if line.strip() and 'voted for' in line: | |
user_hash = line.split('] ')[1].split(' voted for ')[0] | |
item = line.split('voted for ')[1] | |
vote_key = f"{user_hash}-{item}" | |
if vote_key not in user_votes: | |
votes[item] = votes.get(item, 0) + 1 | |
user_votes.add(vote_key) | |
return votes | |
# Hash generator - secret codes ahoy! ππ΅οΈ | |
async def generate_user_hash(): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "ππ΅οΈ - Hash generator - secret codes ahoy!") | |
if 'user_hash' not in st.session_state: | |
st.session_state.user_hash = hashlib.md5(str(random.getrandbits(128)).encode()).hexdigest()[:8] | |
return st.session_state.user_hash | |
# Audio maker - voices come alive! πΆπ | |
async def async_edge_tts_generate(text, voice, rate=0, pitch=0, file_format="mp3"): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "πΆπ - Audio maker - voices come alive!") | |
timestamp = format_timestamp_prefix() | |
filename = os.path.join(AUDIO_DIR, f"audio_{timestamp}_{random.randint(1000, 9999)}.mp3") | |
communicate = edge_tts.Communicate(text, voice, rate=f"{rate:+d}%", pitch=f"{pitch:+d}Hz") | |
try: | |
await communicate.save(filename) | |
return filename if os.path.exists(filename) else None | |
except edge_tts.exceptions.NoAudioReceived: | |
with open(HISTORY_FILE, 'a') as f: | |
f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {username}: Audio failed - No audio received for '{text}'\n") | |
return None | |
# Audio player - tunes blast off! ππ | |
def play_and_download_audio(file_path): | |
if file_path and os.path.exists(file_path): | |
st.audio(file_path) | |
with open(file_path, "rb") as f: | |
b64 = base64.b64encode(f.read()).decode() | |
dl_link = f'<a href="data:audio/mpeg;base64,{b64}" download="{os.path.basename(file_path)}">π΅ Download {os.path.basename(file_path)}</a>' | |
st.markdown(dl_link, unsafe_allow_html=True) | |
# Image saver - pics preserved! πΈπΎ | |
async def save_pasted_image(image_data): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "πΈπΎ - Image saver - pics preserved!") | |
timestamp = format_timestamp_prefix() | |
filename = f"paste_{timestamp}.png" | |
filepath = os.path.join('./', filename) | |
if ',' in image_data: | |
image_data = image_data.split(',')[1] | |
img_bytes = base64.b64decode(image_data) | |
img = Image.open(io.BytesIO(img_bytes)) | |
await asyncio.to_thread(img.save, filepath, "PNG") | |
return filename | |
# Video renderer - movies roll! π₯π¬ | |
async def get_video_html(video_path, width="100%"): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "π₯π¬ - Video renderer - movies roll!") | |
video_url = f"data:video/mp4;base64,{base64.b64encode(await asyncio.to_thread(open, video_path, 'rb').read()).decode()}" | |
return f'<video width="{width}" controls autoplay muted loop><source src="{video_url}" type="video/mp4">Your browser does not support the video tag.</video>' | |
# Audio renderer - sounds soar! πΆβοΈ | |
async def get_audio_html(audio_path, width="100%"): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "πΆβοΈ - Audio renderer - sounds soar!") | |
audio_url = f"data:audio/mpeg;base64,{base64.b64encode(await asyncio.to_thread(open, audio_path, 'rb').read()).decode()}" | |
return f'<audio controls style="width: {width};"><source src="{audio_url}" type="audio/mpeg">Your browser does not support the audio element.</audio>' | |
# Websocket handler - chat links up! ππ | |
async def websocket_handler(websocket, path): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "ππ - Websocket handler - chat links up!") | |
try: | |
client_id = str(uuid.uuid4()) | |
room_id = "chat" | |
st.session_state.active_connections.setdefault(room_id, {})[client_id] = websocket | |
chat_content = await load_chat() | |
username = st.session_state.get('username', random.choice(list(FUN_USERNAMES.keys()))) | |
if not await has_joined_before(client_id, chat_content): | |
await save_chat_entry(f"Client-{client_id}", f"{username} has joined {START_ROOM}!") | |
async for message in websocket: | |
parts = message.split('|', 1) | |
if len(parts) == 2: | |
username, content = parts | |
await save_chat_entry(username, content) | |
await broadcast_message(f"{username}|{content}", room_id) | |
except websockets.ConnectionClosed: | |
pass | |
finally: | |
if room_id in st.session_state.active_connections and client_id in st.session_state.active_connections[room_id]: | |
del st.session_state.active_connections[room_id][client_id] | |
# Message broadcaster - words fly far! π’βοΈ | |
async def broadcast_message(message, room_id): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "π’βοΈ - Message broadcaster - words fly far!") | |
if room_id in st.session_state.active_connections: | |
disconnected = [] | |
for client_id, ws in st.session_state.active_connections[room_id].items(): | |
try: | |
await ws.send(message) | |
except websockets.ConnectionClosed: | |
disconnected.append(client_id) | |
for client_id in disconnected: | |
del st.session_state.active_connections[room_id][client_id] | |
# Server starter - web spins up! π₯οΈπ | |
async def run_websocket_server(): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "π₯οΈπ - Server starter - web spins up!") | |
if not st.session_state.server_running: | |
server = await websockets.serve(websocket_handler, '0.0.0.0', 8765) | |
st.session_state.server_running = True | |
await server.wait_closed() | |
# Voice processor - speech to text! π€π | |
async def process_voice_input(audio_bytes): | |
username = st.session_state.get('username', 'System π') | |
await asyncio.to_thread(log_action, username, "π€π - Voice processor - speech to text!") | |
if audio_bytes: | |
text = "Voice input simulation" # Replace with actual speech-to-text logic | |
await save_chat_entry(username, text) | |
# Interface builder - UI takes shape! π¨ποΈ | |
def create_streamlit_interface(): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
async def async_interface(): | |
if 'username' not in st.session_state: | |
chat_content = await load_chat() | |
available_names = [name for name in FUN_USERNAMES if not any(f"{name} has joined" in line for line in chat_content.split('\n'))] | |
st.session_state.username = random.choice(available_names) if available_names else random.choice(list(FUN_USERNAMES.keys())) | |
if 'refresh_rate' not in st.session_state: | |
st.session_state.refresh_rate = 5 | |
if 'timer_start' not in st.session_state: | |
st.session_state.timer_start = time.time() | |
if 'quote_line' not in st.session_state: | |
st.session_state.quote_line = None | |
if 'pasted_image_data' not in st.session_state: | |
st.session_state.pasted_image_data = None | |
if 'message_text' not in st.session_state: | |
st.session_state.message_text = "" | |
if 'audio_cache' not in st.session_state: | |
st.session_state.audio_cache = {} | |
if 'chat_history' not in st.session_state: | |
st.session_state.chat_history = [] | |
st.markdown(""" | |
<style> | |
.chat-box {font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; height: 300px; overflow-y: auto;} | |
.timer {font-size: 24px; color: #ffcc00; text-align: center; animation: pulse 1s infinite;} | |
@keyframes pulse {0% {transform: scale(1);} 50% {transform: scale(1.1);} 100% {transform: scale(1);}} | |
#paste-target {border: 2px dashed #ccc; padding: 20px; text-align: center; cursor: pointer;} | |
</style> | |
""", unsafe_allow_html=True) | |
st.title(f"π€π§ MMO {st.session_state.username}ππ¬") | |
st.markdown(f"Welcome to {START_ROOM} - chat, vote, upload, paste images, and enjoy quoting! π") | |
if not st.session_state.server_task: | |
st.session_state.server_task = loop.create_task(run_websocket_server()) | |
audio_bytes = audio_recorder() | |
if audio_bytes: | |
await process_voice_input(audio_bytes) | |
st.rerun() | |
st.subheader(f"{START_ROOM} Chat π¬") | |
chat_content = await load_chat() | |
chat_lines = chat_content.split('\n') | |
chat_votes = await load_votes(QUOTE_VOTES_FILE) | |
for i, line in enumerate(chat_lines): | |
if line.strip() and ': ' in line: | |
col1, col2, col3 = st.columns([4, 1, 1]) | |
with col1: | |
st.markdown(line) | |
username = line.split(': ')[1].split(' ')[0] | |
audio_file = None | |
cache_key = f"{line}_{FUN_USERNAMES.get(username, 'en-US-AriaNeural')}" | |
if cache_key in st.session_state.audio_cache: | |
audio_file = st.session_state.audio_cache[cache_key] | |
else: | |
cleaned_text = clean_text_for_tts(line.split(': ', 1)[1]) | |
audio_file = await async_edge_tts_generate(cleaned_text, FUN_USERNAMES.get(username, "en-US-AriaNeural")) | |
st.session_state.audio_cache[cache_key] = audio_file | |
if audio_file: | |
play_and_download_audio(audio_file) | |
with col2: | |
vote_count = chat_votes.get(line.split('. ')[1] if '. ' in line else line, 0) | |
if st.button(f"π {vote_count}", key=f"chat_vote_{i}"): | |
comment = st.session_state.message_text | |
await save_vote(QUOTE_VOTES_FILE, line.split('. ')[1] if '. ' in line else line, await generate_user_hash(), st.session_state.username, comment) | |
if st.session_state.pasted_image_data: | |
filename = await save_pasted_image(st.session_state.pasted_image_data) | |
if filename: | |
await save_chat_entry(st.session_state.username, f"Pasted image: {filename}") | |
st.session_state.pasted_image_data = None | |
st.session_state.message_text = '' | |
st.rerun() | |
with col3: | |
if st.button("π’ Quote", key=f"quote_{i}"): | |
st.session_state.quote_line = line | |
st.rerun() | |
if 'quote_line' in st.session_state: | |
st.markdown(f"### Quoting: {st.session_state.quote_line}") | |
quote_response = st.text_area("Add your response", key="quote_response") | |
if st.button("Send Quote π", key="send_quote"): | |
async def process_quote(): | |
await asyncio.to_thread(log_action, st.session_state.username, "π’π¬ - Quote processor - echoes resound!") | |
markdown_response = f"### Quote Response\n- **Original**: {st.session_state.quote_line}\n- **{st.session_state.username} Replies**: {quote_response}" | |
if st.session_state.pasted_image_data: | |
filename = await save_pasted_image(st.session_state.pasted_image_data) | |
if filename: | |
markdown_response += f"\n- **Image**: " | |
st.session_state.pasted_image_data = None | |
try: | |
await save_chat_entry(st.session_state.username, markdown_response) | |
except edge_tts.exceptions.NoAudioReceived: | |
# Log failure but continue without audio | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
with open(HISTORY_FILE, 'a') as f: | |
f.write(f"[{timestamp}] {st.session_state.username}: Quote saved without audio - No audio received\n") | |
await asyncio.to_thread(lambda: open(CHAT_FILE, 'a').write(f"[{timestamp}] {st.session_state.username}: {markdown_response}\n")) | |
loop.run_until_complete(process_quote()) | |
del st.session_state.quote_line | |
st.session_state.message_text = '' | |
st.rerun() | |
new_username = st.selectbox("Change Name", [""] + list(FUN_USERNAMES.keys()), index=0) | |
if new_username and new_username != st.session_state.username: | |
loop.run_until_complete(save_chat_entry("System π", f"{st.session_state.username} changed name to {new_username}")) | |
st.session_state.username = new_username | |
st.rerun() | |
message = st.text_input(f"Message as {st.session_state.username}", key="message_input", value=st.session_state.message_text, on_change=lambda: st.session_state.update(message_text=st.session_state.message_input)) | |
if st.button("Send π", key="send_button") and message.strip(): | |
loop.run_until_complete(save_chat_entry(st.session_state.username, message)) | |
if st.session_state.pasted_image_data: | |
filename = loop.run_until_complete(save_pasted_image(st.session_state.pasted_image_data)) | |
if filename: | |
loop.run_until_complete(save_chat_entry(st.session_state.username, f"Pasted image: {filename}")) | |
st.session_state.pasted_image_data = None | |
st.session_state.message_text = '' | |
st.rerun() | |
components.html( | |
""" | |
<div id="paste-target">Paste an image here (Ctrl+V)</div> | |
<script> | |
const pasteTarget = document.getElementById('paste-target'); | |
pasteTarget.addEventListener('paste', (event) => { | |
const items = (event.clipboardData || window.clipboardData).items; | |
for (let i = 0; i < items.length; i++) { | |
if (items[i].type.indexOf('image') !== -1) { | |
const blob = items[i].getAsFile(); | |
const reader = new FileReader(); | |
reader.onload = (e) => { | |
window.parent.postMessage({ | |
type: 'streamlit:setComponentValue', | |
value: e.target.result | |
}, '*'); | |
pasteTarget.innerHTML = '<p>Image pasted! Processing...</p>'; | |
}; | |
reader.readAsDataURL(blob); | |
} | |
} | |
event.preventDefault(); | |
}); | |
</script> | |
""", | |
height=100 | |
) | |
st.subheader("Media Gallery π¨πΆπ₯") | |
uploaded_file = st.file_uploader("Upload Media", type=['png', 'jpg', 'mp3', 'mp4']) | |
if uploaded_file: | |
file_path = os.path.join('./', uploaded_file.name) | |
await asyncio.to_thread(lambda: open(file_path, 'wb').write(uploaded_file.getbuffer())) | |
st.success(f"Uploaded {uploaded_file.name}") | |
media_files = glob.glob("./*.png") + glob.glob("./*.jpg") + glob.glob("./*.mp3") + glob.glob("./*.mp4") | |
if media_files: | |
cols = st.columns(3) | |
media_votes = loop.run_until_complete(load_votes(MEDIA_VOTES_FILE)) | |
for idx, media_file in enumerate(media_files): | |
vote_count = media_votes.get(media_file, 0) | |
if vote_count > 0: | |
with cols[idx % 3]: | |
if media_file.endswith(('.png', '.jpg')): | |
st.image(media_file, use_container_width=True) | |
elif media_file.endswith('.mp3'): | |
st.markdown(loop.run_until_complete(get_audio_html(media_file)), unsafe_allow_html=True) | |
elif media_file.endswith('.mp4'): | |
st.markdown(loop.run_until_complete(get_video_html(media_file)), unsafe_allow_html=True) | |
col1, col2 = st.columns(2) | |
with col1: | |
if st.button(f"π {vote_count}", key=f"media_vote_{idx}"): | |
comment = st.session_state.message_text | |
loop.run_until_complete(save_vote(MEDIA_VOTES_FILE, media_file, await generate_user_hash(), st.session_state.username, comment)) | |
if st.session_state.pasted_image_data: | |
filename = loop.run_until_complete(save_pasted_image(st.session_state.pasted_image_data)) | |
if filename: | |
loop.run_until_complete(save_chat_entry(st.session_state.username, f"Pasted image: {filename}")) | |
st.session_state.pasted_image_data = None | |
st.session_state.message_text = '' | |
st.rerun() | |
with col2: | |
if st.button("ποΈ", key=f"media_delete_{idx}"): | |
await asyncio.to_thread(os.remove, media_file) | |
st.rerun() | |
st.subheader("Refresh β³") | |
refresh_rate = st.slider("Refresh Rate", 1, 300, st.session_state.refresh_rate) | |
st.session_state.refresh_rate = refresh_rate | |
timer_placeholder = st.empty() | |
for i in range(st.session_state.refresh_rate, -1, -1): | |
font_name, font_func = random.choice(UNICODE_FONTS) | |
countdown_str = "".join(UNICODE_DIGITS[int(d)] for d in str(i)) if i < 10 else font_func(str(i)) | |
timer_placeholder.markdown(f"<p class='timer'>β³ {font_func('Refresh in:')} {countdown_str}</p>", unsafe_allow_html=True) | |
loop.run_until_complete(asyncio.sleep(1)) | |
st.rerun() | |
st.sidebar.subheader("Chat History π") | |
with open(HISTORY_FILE, 'r') as f: | |
history_content = f.read() | |
st.sidebar.markdown(history_content) | |
loop.run_until_complete(async_interface()) | |
# Main execution - letβs roll! π²π | |
def main(): | |
NODE_NAME, port = get_node_name() | |
create_streamlit_interface() | |
if __name__ == "__main__": | |
main() |