|
const isBuffer = require("is-buffer"); |
|
const { Buffer } = require("buffer"); |
|
const Session = require("../utils/session.js"); |
|
const { cleanupConnection } = require("./cleangRPCconnections.js"); |
|
const { getgRPCConnection } = require("./makegRPCconnection.js"); |
|
const { updateChathistory } = require("../providers/updateChathistory.js"); |
|
|
|
/**
 * Serialize a `{ type, msg }` envelope and send it to the client socket.
 *
 * @param {{ send: (data: string) => void }} ws - Client WebSocket.
 * @param {string} type - Envelope type the client dispatches on.
 * @param {*} msg - JSON-serializable payload.
 */
const sendJson = (ws, type, msg) => {
  ws.send(JSON.stringify({ type, msg }));
};

/**
 * Best-effort `sendJson` for teardown paths, where the socket may already be
 * unusable: a failed send is logged instead of propagated.
 */
const trySendJson = (ws, type, msg) => {
  try {
    sendJson(ws, type, msg);
  } catch (err) {
    console.error(`Failed to send "${type}" to client: ${err}`);
  }
};

/**
 * Run `cleanupConnection(session)` without letting a rejection escape.
 * Callers are mostly event listeners whose returned promise nobody awaits,
 * so a rejection there would otherwise be unhandled (which recent Node
 * versions treat as fatal by default).
 *
 * @param {object} session - The per-socket Session instance.
 */
const safeCleanup = async (session) => {
  try {
    await cleanupConnection(session);
  } catch (err) {
    console.error(`Error cleaning up gRPC connection: ${err}`);
  }
};

/**
 * Build the base64 payload of a "media" envelope: the UTF-8 header string,
 * one NUL (0x00) separator byte, then the raw bytes from the gRPC response.
 *
 * @param {string} header - JSON string describing the chunk.
 * @param {Buffer|Uint8Array} audio - Bytes received from the gRPC stream.
 * @returns {string} Base64 encoding of `header + "\0" + audio`.
 */
const encodeMediaFrame = (header, audio) =>
  Buffer.concat([Buffer.from(header), Buffer.from([0]), audio]).toString(
    "base64"
  );

/**
 * Translate the client's "start" configuration into the gRPC metadata message.
 *
 * Fallbacks: silenceDuration (client value x 1000, truncated) -> 800,
 * threshold -> 100, maxTokens -> 500; for these three a parsed 0 also falls
 * back, as before. temperature falls back to 0.7 only when it is not a finite
 * number, so an explicit temperature of 0 is preserved.
 *
 * @param {object} [config] - Parsed client config.
 * @returns {{ metadata: object }} Message to write on the gRPC call.
 */
const buildMetadata = ({
  sessionId,
  silenceDuration,
  threshold,
  temperature,
  activeVoice,
  maxTokens,
} = {}) => {
  const parsedTemperature = Number.parseFloat(temperature);
  return {
    metadata: {
      session_id: String(sessionId),
      // Math.trunc avoids parseInt's string round-trip, which mis-reads tiny
      // values rendered in exponent form (e.g. "1e-7" -> 1).
      silenceDuration: Math.trunc(Number(silenceDuration) * 1000) || 800,
      threshold: Number.parseInt(threshold, 10) || 100,
      temperature: Number.isFinite(parsedTemperature) ? parsedTemperature : 0.7,
      activeVoice: String(activeVoice),
      maxTokens: Number.parseInt(maxTokens, 10) || 500,
    },
  };
};

/**
 * Read the "start" payload: a JSON string, or an already-parsed object.
 * Anything else is handed to JSON.parse and throws as it did before.
 */
const parseStartConfig = (msg) =>
  msg !== null && typeof msg === "object" ? msg : JSON.parse(msg);

/**
 * gRPC sequence ids that are relayed to the client as a bare status envelope
 * whose `type` and `msg` are both the mapped name. A Map is used so arbitrary
 * ids never hit Object.prototype keys.
 */
const SIGNAL_EVENTS = new Map([
  ["-3", "transcribing"],
  ["-5", "stop_transcribing"],
  ["-10", "connected"],
  ["-4", "thinking"],
  ["-1", "continue"],
]);

/**
 * React to one message received on the gRPC call.
 *
 * - "-2": send "clear", reset `session.chathistory` to a copy of
 *   `session.chathistorybackup`, send the audio (when present) and the
 *   transcript as a user "chat", and record a USER turn in both histories.
 * - "0": send "pause" and stop forwarding audio (`session.cansend = false`).
 * - ids in SIGNAL_EVENTS: relay the status and stop.
 * - "1": reset `session.latency` and resume forwarding (`cansend = true`).
 * - otherwise, when a buffer is present and forwarding is enabled: send the
 *   audio, update chat history via updateChathistory, send an "ai" chat.
 *
 * @param {object} ws - Client WebSocket.
 * @param {object} session - The per-socket Session instance.
 * @param {object} response - gRPC message with session_id, sequence_id,
 *   transcript and buffer fields.
 */
const handleGrpcResponse = (ws, session, response) => {
  const { session_id, sequence_id, transcript, buffer } = response;
  const header = JSON.stringify({ session_id, sequence_id, transcript });

  if (sequence_id === "-2") {
    session.latency = Date.now();
    sendJson(ws, "clear", "clear");
    // NOTE(review): presumably discards AI text not yet confirmed through the
    // "status" path (updateChathistory) — confirm against that helper.
    session.chathistory = [...session.chathistorybackup];
    if (buffer) {
      sendJson(ws, "media", encodeMediaFrame(header, buffer));
    }
    session.chathistory.push({ speaker: "USER", content: transcript });
    sendJson(ws, "chat", { role: "user", content: transcript });
    session.chathistorybackup.push({ speaker: "USER", content: transcript });
    return;
  }

  if (sequence_id === "0") {
    sendJson(ws, "pause", "pause");
    session.cansend = false;
    return;
  }

  const signal = SIGNAL_EVENTS.get(sequence_id);
  if (signal !== undefined) {
    sendJson(ws, signal, signal);
    return;
  }

  if (sequence_id === "1") {
    session.latency = 0;
    session.cansend = true;
  }

  if (!buffer || !session.cansend) {
    return;
  }

  sendJson(ws, "media", encodeMediaFrame(header, buffer));
  updateChathistory(transcript, false, session);
  sendJson(ws, "chat", { role: "ai", content: transcript });
};

/**
 * Register the data/end/error listeners on a freshly opened gRPC call.
 *
 * @param {object} ws - Client WebSocket.
 * @param {object} session - The per-socket Session instance.
 * @param {object} call - The gRPC call (an EventEmitter with `on`).
 */
const attachCallHandlers = (ws, session, call) => {
  call.on("data", (response) => {
    // Keep listener exceptions from propagating into the gRPC stream internals.
    try {
      handleGrpcResponse(ws, session, response);
    } catch (err) {
      console.error(`Error handling gRPC response: ${err}`);
    }
  });

  call.on("end", async () => {
    console.log("Ended");
    await safeCleanup(session);
    trySendJson(ws, "end", "end");
    console.log("Stream ended");
  });

  call.on("error", async (error) => {
    console.error(`Stream error: ${error}`);
    trySendJson(ws, "end", "end");
    await safeCleanup(session);
  });
};

/**
 * Handle a "start" request: reset the session, open the gRPC call, attach its
 * listeners, tell the client it is ready and send the metadata message. On any
 * failure the session is cleaned up and the client receives an "end" envelope.
 *
 * @param {object} ws - Client WebSocket.
 * @param {object} session - The per-socket Session instance.
 * @param {string|object} msg - Start config (JSON string or object).
 */
const handleStart = async (ws, session, msg) => {
  session.starttime = Date.now();
  session.chathistory = [];
  session.chathistorybackup = [];
  // NOTE(review): a repeated "start" replaces session.call without cleaning
  // up the previous call — confirm whether clients can send it twice.
  console.log("Making Connection with gRPC...");

  try {
    // Parse first so a malformed config fails before a connection is opened.
    const metadata = buildMetadata(parseStartConfig(msg));

    console.time("grpcconnection");
    try {
      session.call = await getgRPCConnection(session);
    } finally {
      // Always close the timer so a later "start" can reuse the label.
      console.timeEnd("grpcconnection");
    }
    if (!session.call) {
      throw new Error("getgRPCConnection did not return a call");
    }

    // Attach listeners before any further work so the call is never left
    // without an "error" listener.
    attachCallHandlers(ws, session, session.call);

    const state = session.channel.getConnectivityState(false);
    console.log(`Client : ${state}`);
    session.saved = false;
    sendJson(ws, "ready", "connected");
    console.log("Connected to gRPC.");

    console.log(metadata);
    console.log("Sending metadata.");
    session.call.write(metadata);
  } catch (err) {
    await safeCleanup(session);
    console.error("Error in making gRPC Connection. : ", err);
    trySendJson(ws, "end", "end");
  }
};

/**
 * Forward a client "status" report to the gRPC call (when one is open) and
 * record it in the chat history.
 *
 * @param {object} session - The per-socket Session instance.
 * @param {{ sequence_id: *, transcript: string }} msg - Status payload.
 */
const handleStatus = (session, msg) => {
  const { sequence_id, transcript } = msg;
  if (session.call) {
    session.call.write({
      status: {
        transcript,
        played_seq: sequence_id,
        interrupt_seq: sequence_id,
      },
    });
  }
  updateChathistory(transcript, true, session);
};

/**
 * WebSocket handler bridging one client socket to a gRPC call that is both
 * written to and read from.
 *
 * Client -> server:
 *   - binary frames: audio, written to the call as `{ audio_data: { buffer } }`
 *     once a call exists (dropped otherwise);
 *   - text frames: JSON `{ type, msg }` with type "start", "status" or "stop".
 * Server -> client: JSON `{ type, msg }` envelopes ("initial", "ready",
 *   "media", "chat", "end", ...).
 *
 * @param {object} wss - Client WebSocket (uses `send` and `on`).
 * @param {object} req - Presumably the request that opened the socket; unused.
 * @returns {Promise<void>} Resolves once the socket listeners are registered.
 */
const audio_stream = async (wss, req) => {
  try {
    const session = new Session();

    sendJson(wss, "initial", "connected");

    wss.on("message", async (rawMessage, isBinary) => {
      try {
        // ws >= 8 delivers text frames as Buffers flagged `isBinary === false`;
        // older versions pass strings and no flag, so nothing changes there.
        const message =
          isBinary === false && isBuffer(rawMessage)
            ? rawMessage.toString("utf8")
            : rawMessage;

        if (isBuffer(message)) {
          if (session.call) {
            try {
              session.call.write({ audio_data: { buffer: message } });
            } catch (error) {
              console.log("Error sending buffer to deepgram : ", error);
            }
          }
          return;
        }

        if (typeof message !== "string") {
          return;
        }

        let data;
        try {
          data = JSON.parse(message);
        } catch (err) {
          console.log(`Not a valid json : ${err}`);
          return;
        }

        const { type, msg } = data ?? {};
        switch (type) {
          case "start":
            await handleStart(wss, session, msg);
            break;
          case "status":
            handleStatus(session, msg);
            break;
          case "stop":
            console.log("Client Stoped the stream.");
            await safeCleanup(session);
            break;
          default:
            console.log("Type not handled.");
        }
      } catch (err) {
        console.error(`Error in wss.onmessage : ${err}`);
      }
    });

    wss.on("close", async () => {
      await safeCleanup(session);
      console.log("WebSocket connection closed.");
    });

    wss.on("error", async (err) => {
      console.error(`WebSocket error: ${err}`);
      await safeCleanup(session);
    });
  } catch (err) {
    console.log(err);
    trySendJson(wss, "end", "end");
  }
};
|
|
|
// Public API of this module: the WebSocket route handler.
module.exports = { audio_stream: audio_stream };
|
|