// VocRT/backend/handle-realtime-tts/sttModelSocket_whisper.js
// Author: Anurag — version-2 initial version (commit 5306da4)
const isBuffer = require("is-buffer");
const { Buffer } = require("buffer");
const Session = require("../utils/session.js");
const { cleanupConnection } = require("./cleangRPCconnections.js");
const { getgRPCConnection } = require("./makegRPCconnection.js");
const { updateChathistory } = require("../providers/updateChathistory.js");
/**
 * Bridges one client WebSocket to the gRPC STT/TTS (Whisper) backend.
 *
 * Binary WebSocket frames are treated as raw audio chunks and forwarded to
 * the gRPC duplex stream; string frames are JSON control messages
 * ("start" / "status" / "stop"). Backend responses are relayed to the client
 * as JSON messages; audio payloads are framed as
 * `JSON-metadata + NUL byte + PCM`, base64-encoded.
 *
 * @param {WebSocket} wss - per-connection client WebSocket.
 * @param {import("http").IncomingMessage} req - upgrade request (unused here).
 */
const audio_stream = async (wss, req) => {
  try {
    const session = new Session();
    wss.send(JSON.stringify({ type: "initial", msg: "connected" }));

    wss.on("message", async (message) => {
      try {
        // Binary frames are raw audio: forward to the gRPC stream when open.
        if (isBuffer(message) && session.call) {
          try {
            // Whisper expects audio wrapped in an audio_data message.
            session.call.write({ audio_data: { buffer: message } });
          } catch (error) {
            // BUGFIX: this stream talks to the gRPC Whisper backend, not Deepgram.
            console.log("Error sending buffer to gRPC stream : ", error);
          }
        }

        // String frames are JSON control messages.
        if (typeof message === "string") {
          try {
            const data = JSON.parse(message);
            const { type, msg } = data;
            switch (type) {
              case "start": {
                session.starttime = Date.now();
                session.chathistory = [];
                session.chathistorybackup = [];
                console.log("Making Connection with gRPC...");
                try {
                  console.time("grpcconnection");
                  session.call = await getgRPCConnection(session);
                  console.timeEnd("grpcconnection");
                  const state = session.channel.getConnectivityState(false);
                  console.log(`Client : ${state}`);
                  session.saved = false;
                  wss.send(JSON.stringify({ type: "ready", msg: "connected" }));
                  console.log("Connected to gRPC.");
                  const {
                    sessionId,
                    silenceDuration,
                    threshold,
                    temperature,
                    activeVoice,
                    maxTokens,
                  } = JSON.parse(msg);
                  console.log({
                    sessionId,
                    silenceDuration,
                    threshold,
                    temperature,
                    activeVoice,
                    maxTokens,
                  });
                  console.log(silenceDuration);
                  const metadata = {
                    metadata: {
                      session_id: String(sessionId),
                      // silenceDuration arrives in seconds; convert to ms.
                      // BUGFIX: parseInt on a float stringifies it first and
                      // mis-parses exponential notation; round numerically.
                      silenceDuration:
                        Math.round(Number(silenceDuration) * 1000) || 800,
                      threshold: parseInt(threshold, 10) || 100,
                      // BUGFIX: parseFloat takes no radix argument.
                      temperature: parseFloat(temperature) || 0.7,
                      activeVoice: String(activeVoice),
                      maxTokens: parseInt(maxTokens, 10) || 500,
                    },
                  };
                  console.log(metadata);
                  if (session.call) {
                    console.log("Sending metadata.");
                    session.call.write(metadata);
                  }
                } catch (err) {
                  await cleanupConnection(session);
                  console.error("Error in making gRPC Connection. : ", err);
                  // BUGFIX: abort this case — otherwise the handlers below
                  // would be attached to an undefined session.call and throw.
                  break;
                }

                session.call.on("data", (response) => {
                  const { session_id, sequence_id, transcript, buffer } =
                    response;
                  // Frame header prepended to each audio chunk; a single NUL
                  // byte separates it from the PCM payload.
                  const metadata = JSON.stringify({
                    session_id: session_id,
                    sequence_id: sequence_id,
                    transcript: transcript,
                  });
                  // Send one framed, base64-encoded audio chunk to the client.
                  const sendMedia = (pcm) => {
                    const framed = Buffer.concat([
                      Buffer.from(metadata),
                      Buffer.from([0]),
                      pcm,
                    ]);
                    wss.send(
                      JSON.stringify({
                        type: "media",
                        msg: framed.toString("base64"),
                      })
                    );
                  };

                  // "-2": finalized user utterance — clear client audio queue,
                  // echo the utterance audio, and record it in chat history.
                  if (sequence_id === "-2") {
                    session.latency = Date.now();
                    wss.send(JSON.stringify({ type: "clear", msg: "clear" }));
                    session.chathistory = [...session.chathistorybackup];
                    sendMedia(buffer);
                    session.chathistory.push({
                      speaker: "USER",
                      content: transcript,
                    });
                    wss.send(
                      JSON.stringify({
                        type: "chat",
                        msg: {
                          role: "user",
                          content: transcript,
                        },
                      })
                    );
                    session.chathistorybackup.push({
                      speaker: "USER",
                      content: transcript,
                    });
                    return;
                  }

                  // "0": barge-in — pause playback and gate outgoing audio.
                  if (sequence_id === "0") {
                    wss.send(JSON.stringify({ type: "pause", msg: "pause" }));
                    session.cansend = false;
                    return;
                  }

                  // Pure status signals: forward the matching type and stop.
                  const signal = {
                    "-3": "transcribing",
                    "-5": "stop_transcribing",
                    "-10": "connected",
                    "-4": "thinking",
                    "-1": "continue",
                  }[sequence_id];
                  if (signal) {
                    wss.send(JSON.stringify({ type: signal, msg: signal }));
                    return;
                  }

                  // "1": first AI audio chunk — reopen the send gate.
                  // (The elapsed latency was previously computed here but
                  // never used; only the reset is kept.)
                  if (sequence_id === "1") {
                    session.latency = 0;
                    session.cansend = true;
                  }
                  if (!buffer) {
                    return;
                  }
                  if (!session.cansend && sequence_id !== "0") {
                    return;
                  }
                  sendMedia(buffer);
                  updateChathistory(transcript, false, session);
                  wss.send(
                    JSON.stringify({
                      type: "chat",
                      msg: {
                        role: "ai",
                        content: transcript,
                      },
                    })
                  );
                });

                session.call.on("end", async () => {
                  console.log("Ended");
                  await cleanupConnection(session);
                  try {
                    wss.send(JSON.stringify({ type: "end", msg: "end" }));
                  } catch (err) {
                    // Best-effort: the socket may already be closed.
                  }
                  console.log("Stream ended");
                });

                session.call.on("error", async (error) => {
                  console.error(`Stream error: ${error}`);
                  try {
                    wss.send(JSON.stringify({ type: "end", msg: "end" }));
                  } catch (err) {
                    // Best-effort: the socket may already be closed.
                  }
                  await cleanupConnection(session);
                });
                break;
              }
              case "status": {
                // Client playback progress: echo it to the backend so it can
                // track played/interrupted sequence ids.
                const { sequence_id, transcript } = msg;
                const status = {
                  status: {
                    transcript: transcript,
                    played_seq: sequence_id,
                    interrupt_seq: sequence_id,
                  },
                };
                if (session.call) {
                  session.call.write(status);
                }
                updateChathistory(transcript, true, session);
                break;
              }
              case "stop":
                console.log("Client Stoped the stream.");
                await cleanupConnection(session);
                break;
              default:
                console.log("Type not handled.");
            }
          } catch (err) {
            console.log(`Not a valid json : ${err}`);
          }
        }
      } catch (err) {
        console.error(`Error in wss.onmessage : ${err}`);
      }
    });

    wss.on("close", async () => {
      await cleanupConnection(session);
      console.log("WebSocket connection closed.");
    });

    wss.on("error", async (err) => {
      console.error(`WebSocket error: ${err}`);
      await cleanupConnection(session);
    });
  } catch (err) {
    try {
      console.log(err);
      wss.send(JSON.stringify({ type: "end", msg: "end" }));
    } catch (err) {
      // Best-effort: the socket may already be closed.
    }
  }
};
module.exports = { audio_stream };