const isBuffer = require("is-buffer");
const { Buffer } = require("buffer");
const { deepgram_key } = require("../config");
const Session = require("../utils/session.js");
const { cleanupConnection } = require("./cleangRPCconnections.js");
const { getgRPCConnection } = require("./makegRPCconnection.js");
const { updateChathistory } = require("../providers/updateChathistory.js");
const { createClient, LiveTranscriptionEvents } = require("@deepgram/sdk");

const deepgram = createClient(deepgram_key);

// Value compared against connection.getReadyState() before forwarding audio.
// NOTE(review): presumably the standard WebSocket OPEN state — confirm against the SDK.
const SOCKET_OPEN = 1;

/**
 * Send a `{ type, msg }` envelope to the client as a JSON text frame.
 *
 * @param {object} wss - Client WebSocket (must expose `send`).
 * @param {string} type - Envelope type understood by the client.
 * @param {*} msg - JSON-serialisable payload.
 */
const sendJson = (wss, type, msg) => {
  wss.send(JSON.stringify({ type, msg }));
};

/**
 * Best-effort "end" notification; the socket may already be gone, so a
 * failure is logged rather than propagated.
 *
 * @param {object} wss - Client WebSocket.
 */
const notifyEnd = (wss) => {
  try {
    sendJson(wss, "end", "end");
  } catch (err) {
    console.error("Failed to notify client of stream end : ", err);
  }
};

/**
 * Build the base64 media payload: JSON metadata, a single 0x00 separator
 * byte, then the raw audio bytes.
 *
 * @param {string} metadata - JSON string describing the chunk.
 * @param {Uint8Array} buffer - Audio bytes received from the gRPC stream.
 * @returns {string} Base64 encoding of the combined frame.
 */
const encodeMediaFrame = (metadata, buffer) =>
  Buffer.concat([Buffer.from(metadata), Buffer.from([0]), buffer]).toString(
    "base64"
  );

/**
 * Ask Deepgram to close the live transcription socket.
 * Uses `requestClose` when the SDK provides it and falls back to `finish`.
 *
 * @param {object} connection - Live transcription client from `deepgram.listen.live`.
 */
const closeTranscriber = (connection) => {
  try {
    if (typeof connection.requestClose === "function") {
      connection.requestClose();
    } else if (typeof connection.finish === "function") {
      connection.finish();
    }
  } catch (err) {
    console.error("Error closing deepgram connection : ", err);
  }
};

/**
 * Handle one message from the gRPC response stream and relay it to the client.
 * Branching is driven by `sequence_id` exactly as received (string compare).
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-connection session state.
 * @param {object} response - Stream message with `session_id`, `sequence_id`,
 *   `transcript` and `buffer` fields.
 */
const handleStreamData = (wss, session, response) => {
  console.log("Data : ", response);
  const { session_id, sequence_id, transcript, buffer } = response;
  const metadata = JSON.stringify({ session_id, sequence_id, transcript });

  if (sequence_id === "-2") {
    // Start the latency clock, reset the client and roll the chat history
    // back to the backup before recording the user's utterance.
    session.latency = Date.now();
    sendJson(wss, "clear", "clear");
    session.chathistory = [...session.chathistorybackup];
    sendJson(wss, "chathistory", session.chathistorybackup);
    if (buffer) {
      sendJson(wss, "media", encodeMediaFrame(metadata, buffer));
    }
    session.chathistory.push({ speaker: "USER", content: transcript });
    sendJson(wss, "chathistory", session.chathistory);
    session.chathistorybackup.push({ speaker: "USER", content: transcript });
    return;
  }

  if (sequence_id === "0") {
    sendJson(wss, "pause", "pause");
    session.cansend = false;
    return;
  }

  if (sequence_id === "-1") {
    sendJson(wss, "continue", "continue");
    return;
  }

  if (sequence_id === "1") {
    const latency = Date.now() - session.latency;
    console.log("First Response Latency: ", latency, "ms");
    session.latency = 0;
    session.cansend = true;
  }

  // Nothing to play, or playback is paused until the next "1" sequence.
  if (!buffer || !session.cansend) {
    return;
  }

  sendJson(wss, "media", encodeMediaFrame(metadata, buffer));
  updateChathistory(transcript, false, session);
  sendJson(wss, "chathistory", session.chathistory);
};

/**
 * Wire the gRPC stream's data / end / error events to the client socket.
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Session whose `call` stream is already open.
 */
const attachStreamHandlers = (wss, session) => {
  session.call.on("data", (response) => {
    // An exception thrown from an emitter callback would otherwise escape
    // as an uncaught exception.
    try {
      handleStreamData(wss, session, response);
    } catch (err) {
      console.error("Error handling gRPC data : ", err);
    }
  });

  session.call.on("end", async () => {
    console.log("Ended");
    await cleanupConnection(session);
    notifyEnd(wss);
    console.log("Stream ended");
  });

  session.call.on("error", async (error) => {
    console.error(`Stream error: ${error}`);
    notifyEnd(wss);
    await cleanupConnection(session);
  });
};

/**
 * Handle the "start" control message: reset the session, open the gRPC
 * stream, attach its handlers, tell the client we are ready and send the
 * session metadata.
 *
 * On any failure the session is cleaned up and the client receives "end",
 * so it does not wait indefinitely for "ready".
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-connection session state.
 * @param {string} msg - JSON string containing `sessionId`.
 */
const startSession = async (wss, session, msg) => {
  session.starttime = Date.now();
  session.chathistory = [];
  session.chathistorybackup = [];
  console.log("Making Connection with gRPC...");

  try {
    // Validate the payload before opening a stream we would only tear down.
    const { sessionId } = JSON.parse(msg);

    console.time("grpcconnection");
    try {
      session.call = await getgRPCConnection(session);
    } finally {
      console.timeEnd("grpcconnection");
    }
    if (!session.call) {
      throw new Error("getgRPCConnection returned no stream");
    }

    const state = session.channel.getConnectivityState(false);
    console.log(`Client : ${state}`);
    session.saved = false;

    // Attach listeners before anything else can fail, so the stream always
    // has an "error" listener.
    attachStreamHandlers(wss, session);

    sendJson(wss, "ready", "connected");
    console.log("Connected to gRPC.");

    console.log("Sending metadata.");
    session.call.write({ metadata: { session_id: sessionId } });
  } catch (err) {
    console.error("Error in making gRPC Connection. : ", err);
    await cleanupConnection(session);
    notifyEnd(wss);
  }
};

/**
 * Parse and dispatch a JSON control message ("start", "status", "stop").
 *
 * @param {object} wss - Client WebSocket.
 * @param {object} session - Per-connection session state.
 * @param {string} raw - Raw text frame received from the client.
 */
const handleControlMessage = async (wss, session, raw) => {
  let data;
  try {
    data = JSON.parse(raw);
  } catch (err) {
    console.log(`Not a valid json : ${err}`);
    return;
  }

  const { type, msg } = data ?? {};
  try {
    switch (type) {
      case "start":
        await startSession(wss, session, msg);
        break;
      case "status": {
        // NOTE(review): unlike "start", `msg` is read as an object here,
        // not a JSON string — confirm against the client.
        const { sequence_id, transcript } = msg;
        const status = {
          status: {
            transcript,
            played_seq: sequence_id,
            interrupt_seq: sequence_id,
          },
        };
        if (session.call) {
          session.call.write(status);
        }
        updateChathistory(transcript, true, session);
        break;
      }
      case "stop":
        console.log("Client Stoped the stream.");
        await cleanupConnection(session);
        break;
      default:
        console.log("Type not handled.");
    }
  } catch (err) {
    console.error(`Error handling "${type}" message : ${err}`);
  }
};

/**
 * WebSocket handler that bridges one client to Deepgram live transcription
 * and to a gRPC stream.
 *
 * Binary frames are forwarded to Deepgram once a gRPC stream exists; final
 * transcripts are written to that stream as `{ text }`. Text frames are JSON
 * control messages. Stream responses are relayed to the client as
 * `{ type, msg }` envelopes.
 *
 * @param {object} wss - The client WebSocket connection (a single socket,
 *   despite the name).
 * @param {object} req - Upgrade request; not used here.
 * @returns {Promise<void>}
 */
const audio_stream = async (wss, req) => {
  try {
    const session = new Session();
    sendJson(wss, "initial", "connected");

    const connection = deepgram.listen.live({
      punctuate: true,
      interim_results: true,
      speech_final: true,
      encoding: "linear16",
      sample_rate: 16000,
      model: "nova-2",
      version: "latest",
    });

    const callMLServer = (text) => {
      if (!session.call) {
        console.error(
          "Error in calling ml server : ",
          new Error("gRPC stream is not open")
        );
        return;
      }
      try {
        session.call.write({ text });
      } catch (error) {
        console.error("Error in calling ml server : ", error);
      }
    };

    // Listeners are registered up front, not inside the Open callback, so a
    // failure that happens before the socket opens still has an error listener.
    connection.on(LiveTranscriptionEvents.Open, () => {
      console.log(LiveTranscriptionEvents.Open);
    });
    connection.on(LiveTranscriptionEvents.Close, () => {
      console.log("Connection closed.");
    });
    connection.on(LiveTranscriptionEvents.Transcript, (data) => {
      const text = data?.channel?.alternatives?.[0]?.transcript;
      if (data?.is_final && data.speech_final && text) {
        console.log("Response : ", text);
        callMLServer(text);
      }
    });
    connection.on(LiveTranscriptionEvents.Metadata, (data) => {
      console.log(data);
    });
    connection.on(LiveTranscriptionEvents.Error, (err) => {
      console.error(err);
    });

    let transcriberClosed = false;
    // Release the Deepgram socket (once) and the gRPC resources.
    const releaseResources = async () => {
      if (!transcriberClosed) {
        transcriberClosed = true;
        closeTranscriber(connection);
      }
      await cleanupConnection(session);
    };

    // `isBinary` is supplied by ws >= 8, where text frames also arrive as
    // Buffers; on older versions it is undefined and text arrives as a string.
    wss.on("message", async (message, isBinary) => {
      try {
        let text = null;
        if (typeof message === "string") {
          text = message;
        } else if (isBinary === false && isBuffer(message)) {
          text = message.toString("utf8");
        }

        if (text !== null) {
          await handleControlMessage(wss, session, text);
          return;
        }

        if (isBuffer(message) && session.call) {
          try {
            if (connection.getReadyState() === SOCKET_OPEN) {
              connection.send(message);
            }
          } catch (error) {
            console.log("Error sending buffer to deepgram : ", error);
          }
        }
      } catch (err) {
        console.error(`Error in wss.onmessage : ${err}`);
      }
    });

    wss.on("close", async () => {
      await releaseResources();
      console.log("WebSocket connection closed.");
    });

    wss.on("error", async (err) => {
      console.error(`WebSocket error: ${err}`);
      await releaseResources();
    });
  } catch (err) {
    console.log(err);
    notifyEnd(wss);
  }
};

module.exports = { audio_stream };