rcastriotta
publish
1a3fc6f
import asyncio
import logging
import os
import queue
import threading
import time

from deepgram import DeepgramClient, LiveTranscriptionEvents, LiveOptions

from src.logging import initialize_logger
# Module-wide logger, registered under the name "transcriber" at INFO level.
logger = initialize_logger("transcriber", level=logging.INFO)

# Settings passed to ``dg_connection.start(...)`` for each live connection.
options = LiveOptions(
    model="nova-2",
    language="en-US",
    # Presumably Hz, and expected to match the audio given to
    # ``Transcriber.send_audio`` -- TODO confirm against the audio source.
    sample_rate=48000,
    interim_results=True,
    punctuate=True,
    smart_format=True,
)
class Transcriber:
    """Forward queued audio chunks to a Deepgram live-transcription connection.

    ``start`` spawns a worker thread that opens the connection and then
    drains ``audio_queue`` into it, ``send_audio`` enqueues chunks for that
    worker, and ``stop`` signals the worker and closes the connection.
    """

    def __init__(
        self,
    ):
        """Read ``DEEPGRAM_API_KEY`` from the environment and set idle state."""
        self.deepgram_api_key = os.getenv("DEEPGRAM_API_KEY")
        self.deepgram = None
        self.dg_connection = None
        # The queue is filled by send_audio() and drained by the worker
        # thread, so it must be the thread-safe queue.Queue; asyncio.Queue is
        # not safe to share between threads.
        self.audio_queue = queue.Queue()
        self.stop_event = threading.Event()

    def process_audio(self):
        """Send queued chunks to the open connection until stopped.

        Returns early when the connection has been cleared (closed or
        errored), and leaves the loop when ``stop_event`` is set or a send
        raises.
        """
        while not self.stop_event.is_set():
            # Snapshot the attribute: event handlers and stop() may set it to
            # None from another thread between the check and the send.
            connection = self.dg_connection
            if connection is None:
                logger.info("returned from process")
                return
            try:
                # Block briefly instead of polling empty() + sleep(); the
                # timeout keeps the stop/closed checks above responsive.
                data = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                connection.send(data)
                logger.info("sent data to deepgram")
            except Exception as e:
                logger.warning(f"Error while sending data: {e}")
                break
            finally:
                # Balance every successful get(), even when the send failed.
                self.audio_queue.task_done()
        logger.info("Audio processing thread is stopping")

    def on_transcript(self, result, *args, **kwargs):
        """Log the first alternative's transcript from ``result``."""
        try:
            sentence = result.channel.alternatives[0].transcript
            logger.info(f"Transcription: {sentence}")
        except Exception as e:
            logger.warning(e)

    def close_connection(self):
        """Finish the current connection, if any, and clear the reference."""
        # Detach first so a concurrent handler clearing the attribute cannot
        # turn the finish() call into an AttributeError on None.
        connection, self.dg_connection = self.dg_connection, None
        if connection is None:
            return
        connection.finish()
        logger.info("finished deepgram connection")

    def stop(self):
        """Signal the worker loop to exit and close the connection."""
        self.stop_event.set()
        self.close_connection()
        logger.info("Requested to stop the audio processing thread")

    def on_close(self, *args, **kwargs):
        """Log the close and drop the connection reference."""
        logger.info("Deepgram connection closed")
        self.dg_connection = None

    def on_utterance_end(self, utterance_end, *args, **kwargs):
        """Log the utterance-end payload."""
        logger.info(f"\n\n{utterance_end}\n\n")

    def on_error(self, e, *args, **kwargs):
        """Log the error and drop the connection reference."""
        logger.warning(f"Deepgram error received {e}")
        self.dg_connection = None

    def start_deepgram(self):
        """Open a live connection, register handlers, and run the send loop.

        Intended as the worker-thread target used by ``start``; it blocks in
        ``process_audio`` until that loop ends. Returns early, after logging
        a warning, if the client or connection cannot be created or started.
        """
        try:
            self.deepgram = DeepgramClient(self.deepgram_api_key)
            dg_connection = self.deepgram.listen.live.v("1")
        except Exception as e:
            logger.warning(f"Could not open socket: {e}")
            return

        # The callbacks below receive the emitting client as their first
        # positional argument, so keep an explicit handle on this instance
        # rather than shadowing ``self``.
        transcriber = self

        def on_message(client, result, **kwargs):
            # Guard the attribute/index access so a malformed payload cannot
            # raise inside the SDK's listener.
            try:
                sentence = result.channel.alternatives[0].transcript
            except (AttributeError, IndexError) as e:
                logger.warning(e)
                return
            if not sentence:
                return
            logger.info(f"speaker: {sentence}")

        def on_metadata(client, metadata, **kwargs):
            logger.info(f"\n\n{metadata}\n\n")

        def on_utterance_end(client, utterance_end, **kwargs):
            transcriber.on_utterance_end(utterance_end)

        def on_error(client, error, **kwargs):
            # Delegate so dg_connection is cleared and the send loop stops
            # writing to a failed connection.
            transcriber.on_error(error)

        def on_close(client, **kwargs):
            # Delegate so dg_connection is cleared once the socket is closed.
            transcriber.on_close()

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
        dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)
        dg_connection.on(LiveTranscriptionEvents.Close, on_close)

        # NOTE(review): start() is expected to return False when the socket
        # could not be opened -- confirm against the installed SDK version.
        if dg_connection.start(options) is False:
            logger.warning("Could not start deepgram connection")
            return
        self.dg_connection = dg_connection
        logger.info("deepgram connection opened")
        self.process_audio()

    def start(self):
        """Run ``start_deepgram`` on a new background thread."""
        threading.Thread(target=self.start_deepgram).start()

    def send_audio(self, data):
        """Enqueue one audio chunk for the worker thread; never raises."""
        try:
            self.audio_queue.put_nowait(data)
        except Exception as e:
            logger.warning(e)