from operator import itemgetter
import os
from urllib import parse
from pprint import pformat
import socketio
import time
import logging

from starlette.applications import Starlette
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles

from dotenv import load_dotenv

# Load .env before importing src.* so those modules see the env vars at import time.
load_dotenv()

from src.auth import google_auth_check
from src.client import Client
from src.context import ContextManager
from src.transcriber import Transcriber
from src.simuleval_agent_directory import NoAvailableAgentException
from src.simuleval_agent_directory import SimulevalAgentDirectory
from src.simuleval_transcoder import SimulevalTranscoder
from src.transcoder_helpers import get_transcoder_output_events
from src.logging import (
    initialize_logger,
    catch_and_log_exceptions_for_sio_event_handlers,
)

logger = initialize_logger(__name__, level=logging.WARNING)

print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20)

# Async socket.io server mounted under an ASGI app; CORS is wide open (demo server).
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",
    logger=logger,
    # engineio_logger=logger,
)
socketio_app = socketio.ASGIApp(sio)

app_routes = [
    Mount("/ws", app=socketio_app),
]
app = Starlette(debug=True, routes=app_routes)

# Specify specific models to load (some environments have issues loading multiple models)
# See AgentWithInfo with JSON format details.
models_override = os.environ.get("MODELS_OVERRIDE")

available_agents = SimulevalAgentDirectory()
logger.info("Building and adding agents...")
if models_override is not None:
    logger.info(f"MODELS_OVERRIDE supplied from env vars: {models_override}")
available_agents.build_and_add_agents(models_override)

agents_capabilities_for_json = available_agents.get_agents_capabilities_list_for_json()

# sid -> Client. Populated on connect, cleared on disconnect.
clients = {}


@sio.on("connect")
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def connect(sid, environ):
    """Register a new socket connection, keyed by the clientID query param."""
    logger.info(f"📥 [event: connected] sid={sid}")

    # TODO: Sanitize/validate query param input
    query_params = dict(parse.parse_qsl(environ["QUERY_STRING"]))
    client_id = query_params.get("clientID")
    token = query_params.get("token")

    # NOTE(review): auth is currently disabled; re-enable the check below to
    # require a valid Google auth token on connect.
    # if google_auth_check(token) is None:
    #     await sio.emit("auth_error", "Not authenticated", to=sid)
    #     logger.info("Invalid auth token, Disconnecting...")
    #     await sio.disconnect(sid)
    #     return

    logger.debug(f"query_params:\n{pformat(query_params)}")

    if client_id is None:
        logger.info("No clientID provided. Disconnecting...")
        await sio.disconnect(sid)
        return

    clients[sid] = Client(client_id)


@sio.on("*")
async def catch_all(event, sid, data):
    """Log any event that has no dedicated handler."""
    logger.info(f"[unhandled event: {event}] sid={sid} data={data}")


@sio.event
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def configure_stream(sid, config):
    """Set up either a manual Transcriber or a Simuleval transcoder for this client.

    Returns a status dict; on error the client is disconnected first.
    """
    # FIX: was clients[sid], which raises KeyError before the None-guard can run.
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    debug = config.get("debug")
    async_processing = config.get("async_processing")
    manual_transcribe = config.get("manual_transcribe")
    client_obj.manual_transcribe = manual_transcribe

    if manual_transcribe:
        client_obj.transcriber = Transcriber()
        client_obj.transcriber.start()
    else:
        # Currently s2s, s2t or s2s&t
        model_type = config.get("model_type")
        client_obj.requested_output_type = model_type
        model_name = config.get("model_name")

        try:
            agent = available_agents.get_agent_or_throw(model_name)
        except NoAvailableAgentException as e:
            # FIX: logger.warn is deprecated in favor of logger.warning.
            logger.warning(f"Error while getting agent: {e}")
            await sio.disconnect(sid)
            return {"status": "error", "message": str(e)}

        if client_obj.transcoder:
            logger.warning(
                "Member already has a transcoder configured. Closing it, and overwriting with a new transcoder..."
            )
            client_obj.transcoder.close = True

        t0 = time.time()
        try:
            client_obj.transcoder = SimulevalTranscoder(
                agent,
                config["rate"],
                debug=debug,
                buffer_limit=int(config["buffer_limit"]),
            )
        except Exception as e:
            logger.warning(f"Got exception while initializing agents: {e}")
            await sio.disconnect(sid)
            return {"status": "error", "message": str(e)}
        t1 = time.time()
        logger.debug(f"Booting up VAD and transcoder took {t1-t0} sec")

        # TODO: if async_processing is false, then we need to run
        # transcoder.process_pipeline_once() whenever we receive audio, or at
        # some other sensible interval
        if async_processing:
            client_obj.transcoder.start()

    client_obj.context = ContextManager()
    return {"status": "ok", "message": "server_ready"}


@sio.on("set_dynamic_config")
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def set_dynamic_config(
    sid,
    partial_config,
):
    """Merge partial_config into the client's dynamic transcoder config."""
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    # Shallow-merge: new keys win over any previously-set dynamic config.
    new_dynamic_config = {
        **(client_obj.transcoder_dynamic_config or {}),
        **partial_config,
    }
    logger.info(
        f"[set_dynamic_config] Setting new dynamic config:\n\n{pformat(new_dynamic_config)}\n"
    )
    client_obj.transcoder_dynamic_config = new_dynamic_config

    if client_obj.context:
        client_obj.context.set_language(partial_config["targetLanguage"])
    # TODO set transcriber language

    return {"status": "ok", "message": "dynamic_config_set"}


@sio.event
async def incoming_audio(sid, blob):
    """Feed an incoming audio chunk to the client's pipeline and relay any output."""
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    if client_obj.manual_transcribe:
        client_obj.transcriber.send_audio(blob)
    else:
        # NOTE: bytes and bytearray are very similar, but bytes is immutable,
        # and is what is returned by socketio
        if not isinstance(blob, bytes):
            logger.error(
                f"[incoming_audio] Received audio from {sid}, but it was not of type `bytes`. type(blob) = {type(blob)}"
            )
            return
        if client_obj.transcoder is None:
            logger.error(
                f"[incoming_audio] Received audio from {sid}, but no transcoder configured to process it (member.transcoder is None). This should not happen."
            )
            return

        client_obj.transcoder.process_incoming_bytes(
            blob, dynamic_config=client_obj.transcoder_dynamic_config
        )

        # Send back any available model output
        # NOTE: In theory it would make sense remove this from the incoming_audio
        # handler and handle this in a dedicated thread that checks for output and
        # sends it right away, but in practice for our limited demo use cases this
        # approach didn't add noticeable latency, so we're keeping it simple for now.
        events = get_transcoder_output_events(client_obj.transcoder)
        logger.debug(f"[incoming_audio] transcoder output events: {len(events)}")

        if len(events) == 0:
            logger.debug("[incoming_audio] No transcoder output to send")
        else:
            for e in events:
                if e[
                    "event"
                ] == "translation_speech" and client_obj.requested_output_type in [
                    "s2s",
                    "s2s&t",
                ]:
                    logger.debug("[incoming_audio] Sending translation_speech event")
                    await sio.emit("translation_speech", e, room=sid)
                elif e[
                    "event"
                ] == "translation_text" and client_obj.requested_output_type in [
                    "s2t",
                    "s2s&t",
                ]:
                    logger.debug("[incoming_audio] Sending translation_text event")
                    await sio.emit("translation_text", e, room=sid)
                    # Guard added for consistency with set_dynamic_config's
                    # `if client_obj.context:` check.
                    if client_obj.context:
                        client_obj.context.add_text_chunk(e["payload"])
                else:
                    logger.error(
                        f"[incoming_audio] Unexpected event type: {e['event']}"
                    )

            if client_obj.context:
                new_context = client_obj.context.get_current_context()
                if new_context:
                    await sio.emit(
                        "context",
                        {"event": "context", "payload": new_context},
                        room=sid,
                    )
    return


@sio.event
async def stop_stream(sid):
    """Tear down the client's transcoder/transcriber without dropping the socket."""
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    if client_obj.transcoder:
        # Setting close=True signals the transcoder's worker to shut down.
        client_obj.transcoder.close = True
        client_obj.transcoder = None
    if client_obj.transcriber:
        client_obj.transcriber.close_connection()


@sio.event
async def disconnect(sid):
    """Clean up all per-client state when the socket goes away."""
    client_obj = clients.get(sid)
    if client_obj is None:
        return
    if client_obj.transcriber:
        client_obj.transcriber.stop()
    if client_obj.transcoder:
        client_obj.transcoder.close = True
        client_obj.transcoder = None
    # pop() instead of del so a duplicate disconnect can't raise KeyError.
    clients.pop(sid, None)