File size: 9,577 Bytes
1a3fc6f ba78468 1a3fc6f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
from operator import itemgetter
import os
from urllib import parse
from pprint import pformat
import socketio
import time
import logging
from starlette.applications import Starlette
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from dotenv import load_dotenv
load_dotenv()
from src.auth import google_auth_check
from src.client import Client
from src.context import ContextManager
from src.transcriber import Transcriber
from src.simuleval_agent_directory import NoAvailableAgentException
from src.simuleval_agent_directory import SimulevalAgentDirectory
from src.simuleval_transcoder import SimulevalTranscoder
from src.transcoder_helpers import get_transcoder_output_events
from src.logging import (
initialize_logger,
catch_and_log_exceptions_for_sio_event_handlers,
)
# Module logger created at WARNING level. Assuming initialize_logger honours
# `level`, the logger.info / logger.debug calls in this module are filtered out.
logger = initialize_logger(__name__, level=logging.WARNING)

# Plain print rather than the logger, so the banner appears regardless of log level.
print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20)

# Socket.IO server running in ASGI mode; CORS is open to every origin ("*").
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",
    logger=logger,
    # engineio_logger=logger,
)
# Wrap the Socket.IO server as an ASGI app so Starlette can mount it.
socketio_app = socketio.ASGIApp(sio)

# The Socket.IO app is mounted under the "/ws" path prefix.
app_routes = [
    Mount("/ws", app=socketio_app),
]

# NOTE(review): debug=True makes Starlette return tracebacks to clients on errors;
# confirm this is intended outside of development.
app = Starlette(debug=True, routes=app_routes)
# Optionally restrict which models are loaded (some environments have trouble
# loading several models at once). See AgentWithInfo for the JSON format.
models_override = os.environ.get("MODELS_OVERRIDE")

# Registry of SimulEval agents; configure_stream looks agents up here by model name.
available_agents = SimulevalAgentDirectory()
logger.info("Building and adding agents...")
if models_override is not None:
    logger.info(f"MODELS_OVERRIDE supplied from env vars: {models_override}")
# Receives None when MODELS_OVERRIDE is unset; presumably the directory then
# builds its default agent set — TODO confirm in SimulevalAgentDirectory.
available_agents.build_and_add_agents(models_override)

# JSON-friendly list of agent capabilities.
# NOTE(review): not referenced anywhere else in this module; check whether another
# module imports it before removing.
agents_capabilities_for_json = available_agents.get_agents_capabilities_list_for_json()

# Per-session state: Socket.IO session id (sid) -> Client, populated in `connect`.
clients = {}
@sio.on("connect")
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def connect(sid, environ):
    """Register a newly connected Socket.IO session.

    The connection URL must carry a ``clientID`` query parameter. Sessions
    without one are disconnected and never get an entry in ``clients``. On
    success, a fresh ``Client`` is stored in ``clients`` under ``sid``.

    Args:
        sid: Socket.IO session id assigned by the server.
        environ: Request environ dict; its ``QUERY_STRING`` entry carries the
            connection's query parameters.
    """
    logger.info(f"📥 [event: connected] sid={sid}")

    # TODO: Sanitize/validate query param input
    # .get(): a missing QUERY_STRING is treated as "no params" rather than a KeyError.
    query_params = dict(parse.parse_qsl(environ.get("QUERY_STRING", "")))
    client_id = query_params.get("clientID")

    # TODO: authentication is currently disabled. To re-enable it, call
    # google_auth_check(query_params.get("token")). When it returns None, emit
    # "auth_error" ("Not authenticated") to `sid`, disconnect, and return.

    # Never write the auth token to the logs, even at debug level.
    loggable_params = {
        key: ("<redacted>" if key == "token" else value)
        for key, value in query_params.items()
    }
    logger.debug(f"query_params:\n{pformat(loggable_params)}")

    if client_id is None:
        logger.info("No clientID provided. Disconnecting...")
        await sio.disconnect(sid)
        return

    clients[sid] = Client(client_id)
@sio.on("*")
async def catch_all(event, sid, data):
    """Log any Socket.IO event that has no dedicated handler.

    Args:
        event: Name of the unhandled event.
        sid: Socket.IO session id of the sender.
        data: Event payload, logged as-is.
    """
    # Lazy %-style arguments: the payload is only stringified if INFO records are
    # actually emitted. The logger is created at WARNING level, so this normally
    # skips formatting a possibly large payload entirely.
    logger.info("[unhandled event: %s] sid=%s data=%s", event, sid, data)
@sio.event
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def configure_stream(sid, config):
    """Set up the processing pipeline for a client's audio stream.

    In manual-transcription mode a ``Transcriber`` is started. Otherwise a
    ``SimulevalTranscoder`` is built for the requested model, replacing (and
    closing) any transcoder already attached to the client. In both modes the
    client gets a fresh ``ContextManager``.

    Args:
        sid: Socket.IO session id of the calling client.
        config: Client-supplied settings. Keys read here: ``debug``,
            ``async_processing``, ``manual_transcribe``, ``model_type``
            (currently ``"s2s"``, ``"s2t"`` or ``"s2s&t"``), ``model_name``,
            ``rate`` and ``buffer_limit``.

    Returns:
        An acknowledgement dict ``{"status": ..., "message": ...}``. On every
        error path the client is also disconnected.
    """
    # .get(): an unknown sid must reach the error branch below instead of raising
    # KeyError (plain indexing made the `is None` check unreachable).
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}
    logger.debug(f"[configure_stream] sid={sid}")

    debug = config.get("debug")
    async_processing = config.get("async_processing")
    manual_transcribe = config.get("manual_transcribe")
    client_obj.manual_transcribe = manual_transcribe

    if manual_transcribe:
        if client_obj.transcriber:
            # Re-configuration: stop the previous transcriber rather than leaking it.
            client_obj.transcriber.stop()
        client_obj.transcriber = Transcriber()
        client_obj.transcriber.start()
    else:
        # Currently s2s, s2t or s2s&t
        model_type = config.get("model_type")
        client_obj.requested_output_type = model_type
        model_name = config.get("model_name")
        try:
            agent = available_agents.get_agent_or_throw(model_name)
        except NoAvailableAgentException as e:
            logger.warning(f"Error while getting agent: {e}")
            await sio.disconnect(sid)
            return {"status": "error", "message": str(e)}

        if client_obj.transcoder:
            logger.warning(
                "Member already has a transcoder configured. Closing it, and overwriting with a new transcoder..."
            )
            client_obj.transcoder.close = True

        t0 = time.time()
        try:
            client_obj.transcoder = SimulevalTranscoder(
                agent,
                config["rate"],
                debug=debug,
                buffer_limit=int(config["buffer_limit"]),
            )
        except Exception as e:
            # Missing or non-numeric "rate"/"buffer_limit" entries also land here.
            logger.warning(f"Got exception while initializing agents: {e}")
            await sio.disconnect(sid)
            return {"status": "error", "message": str(e)}
        t1 = time.time()
        logger.debug(f"Booting up VAD and transcoder took {t1-t0} sec")

        # TODO: if async_processing is false, then we need to run transcoder.process_pipeline_once() whenever we receive audio, or at some other sensible interval
        if async_processing:
            client_obj.transcoder.start()

    client_obj.context = ContextManager()
    return {"status": "ok", "message": "server_ready"}
@sio.on("set_dynamic_config")
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def set_dynamic_config(
    sid,
    partial_config,
):
    """Merge a partial update into the client's dynamic transcoder config.

    Keys in ``partial_config`` override the stored values; all other keys are
    kept. When the update carries ``targetLanguage`` and the client has a
    context manager, the context's language is updated as well.

    Args:
        sid: Socket.IO session id of the calling client.
        partial_config: Dict with the config keys to change.

    Returns:
        An acknowledgement dict ``{"status": ..., "message": ...}``.
    """
    # .get(): an unknown sid must reach the error branch below instead of raising KeyError.
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    new_dynamic_config = {
        **(client_obj.transcoder_dynamic_config or {}),
        **partial_config,
    }
    logger.info(
        f"[set_dynamic_config] Setting new dynamic config:\n\n{pformat(new_dynamic_config)}\n"
    )
    client_obj.transcoder_dynamic_config = new_dynamic_config

    # A *partial* update need not contain targetLanguage; indexing it directly
    # raised KeyError for every update that omitted it.
    target_language = partial_config.get("targetLanguage")
    if client_obj.context and target_language is not None:
        client_obj.context.set_language(target_language)
    # TODO set transcriber language
    return {"status": "ok", "message": "dynamic_config_set"}
@sio.event
async def incoming_audio(sid, blob):
    """Feed an audio chunk into the client's pipeline and emit any ready output.

    Manual-transcription clients have the chunk forwarded to their
    ``Transcriber``. Other clients have it pushed into their
    ``SimulevalTranscoder``. The ready output events are then emitted back to
    the client, filtered by the output type requested in ``configure_stream``.
    Translated text is also added to the client's context, and the current
    context is emitted when there is one.

    Args:
        sid: Socket.IO session id of the sending client.
        blob: Raw audio chunk; must be ``bytes`` on the transcoder path.
    """
    # .get(): an unknown sid must reach the error branch below instead of raising KeyError.
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    if client_obj.manual_transcribe:
        client_obj.transcriber.send_audio(blob)
        return

    # NOTE: bytes and bytearray are very similar, but bytes is immutable, and is what is returned by socketio
    if not isinstance(blob, bytes):
        logger.error(
            f"[incoming_audio] Received audio from {sid}, but it was not of type `bytes`. type(blob) = {type(blob)}"
        )
        return
    if client_obj.transcoder is None:
        logger.error(
            f"[incoming_audio] Received audio from {sid}, but no transcoder configured to process it (member.transcoder is None). This should not happen."
        )
        return

    client_obj.transcoder.process_incoming_bytes(
        blob, dynamic_config=client_obj.transcoder_dynamic_config
    )

    # Send back any available model output
    # NOTE: In theory it would make sense remove this from the incoming_audio handler and
    # handle this in a dedicated thread that checks for output and sends it right away,
    # but in practice for our limited demo use cases this approach didn't add noticeable
    # latency, so we're keeping it simple for now.
    events = get_transcoder_output_events(client_obj.transcoder)
    logger.debug(f"[incoming_audio] transcoder output events: {len(events)}")
    if not events:
        logger.debug("[incoming_audio] No transcoder output to send")

    output_type = client_obj.requested_output_type
    for event in events:
        event_type = event["event"]
        # Dispatch on the event type first, then filter by requested output, so
        # that known events the client did not ask for are dropped silently
        # instead of being logged as "unexpected".
        if event_type == "translation_speech":
            if output_type in ("s2s", "s2s&t"):
                logger.debug("[incoming_audio] Sending translation_speech event")
                await sio.emit("translation_speech", event, room=sid)
        elif event_type == "translation_text":
            if output_type in ("s2t", "s2s&t"):
                logger.debug("[incoming_audio] Sending translation_text event")
                await sio.emit("translation_text", event, room=sid)
                client_obj.context.add_text_chunk(event["payload"])
        else:
            logger.error(f"[incoming_audio] Unexpected event type: {event['event']}")

    new_context = client_obj.context.get_current_context()
    if new_context:
        await sio.emit(
            "context",
            {"event": "context", "payload": new_context},
            room=sid,
        )
    return
@sio.event
async def stop_stream(sid):
    """Stop processing the client's stream while keeping the session connected.

    Flags the transcoder (if any) to close and detaches it from the client.
    Closes the transcriber's connection (if any); the transcriber object itself
    stays attached to the client.

    Args:
        sid: Socket.IO session id of the calling client.

    Returns:
        An error dict (after disconnecting) when ``sid`` has no client;
        otherwise None.
    """
    # .get(): an unknown sid must reach the error branch below instead of raising
    # KeyError (plain indexing made the `is None` check unreachable).
    client_obj = clients.get(sid)
    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    if client_obj.transcoder:
        client_obj.transcoder.close = True
        client_obj.transcoder = None
    if client_obj.transcriber:
        client_obj.transcriber.close_connection()
@sio.event
async def disconnect(sid):
    """Release all per-client resources when a session disconnects.

    Tolerates sessions that never got a ``Client`` entry: ``connect`` skips
    registration when no ``clientID`` is supplied. Previously such sessions
    made this handler raise KeyError.

    Args:
        sid: Socket.IO session id of the disconnecting client.
    """
    # pop() up front: the registry entry is removed even if the cleanup below
    # raises, and unknown sids are simply ignored.
    client_obj = clients.pop(sid, None)
    if client_obj is None:
        return
    if client_obj.transcriber:
        client_obj.transcriber.stop()
    if client_obj.transcoder:
        client_obj.transcoder.close = True
        client_obj.transcoder = None
|