File size: 9,577 Bytes
1a3fc6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba78468
 
 
 
 
1a3fc6f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from operator import itemgetter
import os
from urllib import parse
from pprint import pformat
import socketio
import time
import logging
from starlette.applications import Starlette
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles
from dotenv import load_dotenv

# Load variables from a .env file before the src.* imports below; presumably
# some of those modules read configuration from the environment at import
# time — TODO confirm.
load_dotenv()

from src.auth import google_auth_check
from src.client import Client
from src.context import ContextManager
from src.transcriber import Transcriber

from src.simuleval_agent_directory import NoAvailableAgentException
from src.simuleval_agent_directory import SimulevalAgentDirectory
from src.simuleval_transcoder import SimulevalTranscoder
from src.transcoder_helpers import get_transcoder_output_events
from src.logging import (
    initialize_logger,
    catch_and_log_exceptions_for_sio_event_handlers,
)

# Module-level logger. It is created at WARNING level, so the logger.info /
# logger.debug calls in this file are presumably not emitted unless
# initialize_logger configures it otherwise — TODO confirm.
logger = initialize_logger(__name__, level=logging.WARNING)
print("=" * 20 + " ⭐️ Starting Server... ⭐️ " + "=" * 20)

# Socket.IO server in ASGI mode; CORS accepts any origin.
sio = socketio.AsyncServer(
    async_mode="asgi",
    cors_allowed_origins="*",
    logger=logger,
    # engineio_logger=logger,
)
socketio_app = socketio.ASGIApp(sio)

# The Socket.IO ASGI app is mounted under the /ws path of the Starlette app.
app_routes = [
    Mount("/ws", app=socketio_app),
]
# NOTE(review): debug=True makes Starlette return debug tracebacks on errors;
# confirm this is intended outside development.
app = Starlette(debug=True, routes=app_routes)

# Optionally restrict which models are loaded (some environments have issues
# loading multiple models). See AgentWithInfo for the JSON format details.
models_override = os.environ.get("MODELS_OVERRIDE")

# Agent directory, built once at import time; configure_stream looks agents up
# from it by model name.
available_agents = SimulevalAgentDirectory()
logger.info("Building and adding agents...")
if models_override is not None:
    logger.info(f"MODELS_OVERRIDE supplied from env vars: {models_override}")
# models_override is passed through even when None; presumably the directory
# then builds its default set of agents — TODO confirm.
available_agents.build_and_add_agents(models_override)

# JSON-friendly capabilities list of the loaded agents (not referenced
# elsewhere in the visible part of this file).
agents_capabilities_for_json = available_agents.get_agents_capabilities_list_for_json()


# Socket.IO sid -> Client; entries are added in connect() and removed in
# disconnect().
clients = {}


@sio.on("connect")
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def connect(sid, environ):
    """Register a freshly connected Socket.IO session.

    The caller identifies itself via the ``clientID`` query parameter of the
    connection URL. Without it the session is disconnected straight away;
    with it, a ``Client`` is stored in ``clients`` under ``sid``.
    """
    logger.info(f"📥 [event: connected] sid={sid}")

    # TODO: Sanitize/validate query param input
    raw_query = environ["QUERY_STRING"]
    params = dict(parse.parse_qsl(raw_query))
    requested_client_id = params.get("clientID")
    # Authentication is currently disabled; the token is still read so the
    # check below can be re-enabled without touching the parsing code.
    token = params.get("token")

    # if google_auth_check(token) is None:
    #     await sio.emit("auth_error", "Not authenticated", to=sid)
    #     logger.info("Invalid auth token, Disconnecting...")
    #     await sio.disconnect(sid)
    #     return

    logger.debug(f"query_params:\n{pformat(params)}")

    if requested_client_id is not None:
        clients[sid] = Client(requested_client_id)
        return

    logger.info("No clientID provided. Disconnecting...")
    await sio.disconnect(sid)


@sio.on("*")
async def catch_all(event, sid, *args):
    """Log any Socket.IO event that has no dedicated handler.

    python-socketio calls the catch-all handler with the event name, the
    sender's sid, and then every argument the client emitted. There may be
    zero, one, or several of them. Accepting ``*args`` keeps events without a
    payload, or with several payloads, from raising ``TypeError`` here.
    """
    # Keep the original log shape for the common single-payload case; use None
    # for no payload and the tuple when there are several.
    data = args[0] if len(args) == 1 else (args or None)
    logger.info(f"[unhandled event: {event}] sid={sid} data={data}")


@sio.event
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def configure_stream(sid, config):
    """Configure how a client's incoming audio will be processed.

    In manual-transcribe mode a ``Transcriber`` is created and started.
    Otherwise a ``SimulevalTranscoder`` is built for the requested agent. Any
    transcoder the client already had is closed and replaced. In both modes a
    fresh ``ContextManager`` is attached to the client.

    Args:
        sid: Socket.IO session id of the caller.
        config: client-supplied settings. Keys read: ``debug``,
            ``async_processing``, ``manual_transcribe`` and, outside
            manual-transcribe mode, ``model_type``, ``model_name``, ``rate``
            and ``buffer_limit``.

    Returns:
        An acknowledgement dict ``{"status": ..., "message": ...}``. On any
        error the client is disconnected and ``status`` is ``"error"``.
    """
    # .get() so an unknown sid reaches the None guard below; indexing the dict
    # would raise KeyError and the guard could never trigger.
    client_obj = clients.get(sid)
    logger.warning(sid)

    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    debug = config.get("debug")
    async_processing = config.get("async_processing")
    manual_transcribe = config.get("manual_transcribe")
    client_obj.manual_transcribe = manual_transcribe

    if manual_transcribe:
        # NOTE(review): a transcriber from a previous configure_stream call is
        # not stopped before being replaced — confirm whether it needs cleanup.
        client_obj.transcriber = Transcriber()
        client_obj.transcriber.start()
    else:
        # Currently s2s, s2t or s2s&t
        model_type = config.get("model_type")
        client_obj.requested_output_type = model_type

        model_name = config.get("model_name")

        try:
            agent = available_agents.get_agent_or_throw(model_name)
        except NoAvailableAgentException as e:
            logger.warning(f"Error while getting agent: {e}")
            await sio.disconnect(sid)
            return {"status": "error", "message": str(e)}

        if client_obj.transcoder:
            logger.warning(
                "Member already has a transcoder configured. Closing it, and overwriting with a new transcoder..."
            )
            client_obj.transcoder.close = True
            # Detach the closed transcoder right away. If building the new one
            # fails below, incoming_audio must not keep feeding the closed one.
            client_obj.transcoder = None

        t0 = time.time()
        try:
            client_obj.transcoder = SimulevalTranscoder(
                agent,
                config["rate"],
                debug=debug,
                buffer_limit=int(config["buffer_limit"]),
            )
        except Exception as e:
            # Broad on purpose: covers missing/invalid "rate"/"buffer_limit"
            # values as well as failures inside the transcoder constructor.
            logger.warning(f"Got exception while initializing agents: {e}")
            await sio.disconnect(sid)
            return {"status": "error", "message": str(e)}

        t1 = time.time()
        logger.debug(f"Booting up VAD and transcoder took {t1-t0} sec")

        # TODO: if async_processing is false, then we need to run transcoder.process_pipeline_once() whenever we receive audio, or at some other sensible interval
        if async_processing:
            client_obj.transcoder.start()

    client_obj.context = ContextManager()
    return {"status": "ok", "message": "server_ready"}


@sio.on("set_dynamic_config")
@catch_and_log_exceptions_for_sio_event_handlers(logger, sio)
async def set_dynamic_config(
    sid,
    partial_config,
):
    """Merge a partial update into the client's dynamic transcoder config.

    Keys in ``partial_config`` override the existing ones. The merged dict is
    stored on the client, and ``incoming_audio`` passes it to the transcoder
    with every audio chunk. When ``targetLanguage`` is present and the client
    has a context manager, the context language is updated as well.

    Returns:
        An acknowledgement dict ``{"status": ..., "message": ...}``.
    """
    # .get() so an unknown sid reaches the None guard instead of raising KeyError.
    client_obj = clients.get(sid)

    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    new_dynamic_config = {
        **(client_obj.transcoder_dynamic_config or {}),
        **partial_config,
    }
    logger.info(
        f"[set_dynamic_config] Setting new dynamic config:\n\n{pformat(new_dynamic_config)}\n"
    )

    client_obj.transcoder_dynamic_config = new_dynamic_config

    # The update is partial, so targetLanguage may legitimately be absent.
    # Indexing it directly raised KeyError after the config was already stored.
    target_language = partial_config.get("targetLanguage")
    if client_obj.context and target_language is not None:
        client_obj.context.set_language(target_language)

    # TODO set transcriber language

    return {"status": "ok", "message": "dynamic_config_set"}


# Transcoder output event name -> requested_output_type values that want it.
_OUTPUT_TYPES_BY_EVENT = {
    "translation_speech": ("s2s", "s2s&t"),
    "translation_text": ("s2t", "s2s&t"),
}


@sio.event
async def incoming_audio(sid, blob):
    """Feed one chunk of client audio into its pipeline and relay the output.

    In manual-transcribe mode the chunk goes to the client's ``Transcriber``.
    Otherwise it must be ``bytes`` and is pushed through the client's
    transcoder. Resulting ``translation_speech`` / ``translation_text`` events
    that match the client's requested output type are emitted back to the
    caller. Finally, the client's current context, if any, is emitted as a
    ``context`` event.

    Returns:
        An error acknowledgement dict when the sid is unknown, else ``None``.
    """
    # .get() so an unknown sid reaches the None guard instead of raising KeyError.
    client_obj = clients.get(sid)

    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    if client_obj.manual_transcribe:
        client_obj.transcriber.send_audio(blob)
    else:
        # NOTE: bytes and bytearray are very similar, but bytes is immutable, and is what is returned by socketio
        if not isinstance(blob, bytes):
            logger.error(
                f"[incoming_audio] Received audio from {sid}, but it was not of type `bytes`. type(blob) = {type(blob)}"
            )
            return

        if client_obj.transcoder is None:
            logger.error(
                f"[incoming_audio] Received audio from {sid}, but no transcoder configured to process it (member.transcoder is None). This should not happen."
            )
            return

        client_obj.transcoder.process_incoming_bytes(
            blob, dynamic_config=client_obj.transcoder_dynamic_config
        )

        # Send back any available model output
        # NOTE: In theory it would make sense to move this out of the incoming_audio handler and
        # into a dedicated thread that checks for output and sends it right away,
        # but in practice for our limited demo use cases this approach didn't add noticeable
        # latency, so we're keeping it simple for now.
        events = get_transcoder_output_events(client_obj.transcoder)
        logger.debug(f"[incoming_audio] transcoder output events: {len(events)}")

        if not events:
            logger.debug("[incoming_audio] No transcoder output to send")

        for e in events:
            event_name = e["event"]
            wanted_by = _OUTPUT_TYPES_BY_EVENT.get(event_name)
            if wanted_by is None:
                logger.error(
                    f"[incoming_audio] Unexpected event type: {e['event']}"
                )
                continue
            if client_obj.requested_output_type not in wanted_by:
                # A known event the client did not ask for. Drop it quietly
                # instead of reporting it as an unexpected event type.
                logger.debug(
                    f"[incoming_audio] Skipping {event_name} event (not requested)"
                )
                continue

            logger.debug(f"[incoming_audio] Sending {event_name} event")
            await sio.emit(event_name, e, room=sid)
            if event_name == "translation_text" and client_obj.context:
                client_obj.context.add_text_chunk(e["payload"])

    # The context is attached by configure_stream; guard in case it is unset.
    if client_obj.context:
        new_context = client_obj.context.get_current_context()
        if new_context:
            await sio.emit(
                "context",
                {"event": "context", "payload": new_context},
                room=sid,
            )
    return


@sio.event
async def stop_stream(sid):
    """Stop processing a client's stream without disconnecting the socket.

    Marks the client's transcoder as closed and detaches it. If the client
    has a transcriber, its connection is closed too.

    Returns:
        An error acknowledgement dict when the sid is unknown, else ``None``.
    """
    # .get() so an unknown sid reaches the None guard instead of raising KeyError.
    client_obj = clients.get(sid)

    if client_obj is None:
        logger.error(f"No client object for {sid}")
        await sio.disconnect(sid)
        return {"status": "error", "message": "member_or_room_is_none"}

    if client_obj.transcoder:
        client_obj.transcoder.close = True
        client_obj.transcoder = None

    if client_obj.transcriber:
        client_obj.transcriber.close_connection()


@sio.event
async def disconnect(sid):
    """Release a client's processing resources when its socket disconnects.

    Sessions that were never registered in ``clients`` are ignored. This can
    happen for connections ``connect`` rejected for lacking a ``clientID``,
    since it calls ``sio.disconnect`` on them.
    """
    # pop() both looks up and removes the entry, without raising KeyError for
    # sids that were never registered (indexing made the None check dead code).
    client_obj = clients.pop(sid, None)
    if client_obj is None:
        return

    if client_obj.transcriber:
        client_obj.transcriber.stop()

    if client_obj.transcoder:
        client_obj.transcoder.close = True
        client_obj.transcoder = None