import asyncio
import logging
import os
import queue
import threading
import time

from deepgram import DeepgramClient, LiveOptions, LiveTranscriptionEvents

from src.logging import initialize_logger

logger = initialize_logger("transcriber", level=logging.INFO)

# Options passed to the Deepgram live connection when it is started.
options = LiveOptions(
    model="nova-2",
    language="en-US",
    smart_format=True,
    punctuate=True,
    sample_rate=48000,
    interim_results=True,
)

# How long the worker waits for a queued chunk before re-checking stop_event
# and the connection state.
_QUEUE_POLL_SECONDS = 0.1


class Transcriber:
    """Forward queued audio chunks to a Deepgram live-transcription connection.

    ``start()`` spawns a worker thread that opens the connection and then
    drains ``audio_queue`` into it; ``send_audio()`` enqueues chunks from any
    thread; ``stop()`` signals the worker and closes the connection.

    NOTE: ``stop_event`` is only ever set, never cleared, so an instance is
    not restartable after ``stop()``.
    """

    def __init__(
        self,
    ):
        self.deepgram_api_key = os.getenv("DEEPGRAM_API_KEY")
        self.deepgram = None
        self.dg_connection = None
        # queue.Queue rather than asyncio.Queue: the producer (send_audio) and
        # the consumer (process_audio) run on different threads, and
        # asyncio.Queue is not thread-safe.
        self.audio_queue = queue.Queue()
        self.stop_event = threading.Event()

    def process_audio(self):
        """Drain ``audio_queue`` into the Deepgram connection until stopped.

        Returns when ``stop_event`` is set, when the connection has been
        cleared (``dg_connection is None``), or after the first send error.
        """
        while not self.stop_event.is_set():
            # Snapshot the connection so a concurrent close_connection()
            # cannot turn it into None between the check and the send.
            connection = self.dg_connection
            if connection is None:
                logger.info("returned from process")
                return

            try:
                data = self.audio_queue.get(timeout=_QUEUE_POLL_SECONDS)
            except queue.Empty:
                continue

            try:
                connection.send(data)
                logger.info("sent data to deepgram")
            except Exception as e:
                logger.warning(f"Error while sending data: {e}")
                break
            finally:
                # The chunk was dequeued whether or not the send succeeded;
                # keep the queue's unfinished-task count accurate.
                self.audio_queue.task_done()

        logger.info("Audio processing thread is stopping")

    def on_transcript(self, result, *args, **kwargs):
        """Log the first alternative's transcript from ``result``.

        Not registered by ``start_deepgram``; kept for external use.
        """
        try:
            sentence = result.channel.alternatives[0].transcript
            logger.info(f"Transcription: {sentence}")
        except Exception as e:
            logger.warning(e)

    def close_connection(self):
        """Finish the Deepgram connection, if any, and forget it."""
        if self.dg_connection:
            self.dg_connection.finish()
            self.dg_connection = None
            logger.info("finished deepgram connection")

    def stop(self):
        """Signal the worker thread to stop and close the connection."""
        self.stop_event.set()
        self.close_connection()
        logger.info("Requested to stop the audio processing thread")

    def on_close(self, *args, **kwargs):
        """Log the close and clear the stored connection.

        Not registered by ``start_deepgram``; kept for external use.
        """
        logger.info("Deepgram connection closed")
        self.dg_connection = None

    def on_utterance_end(self, utterance_end, *args, **kwargs):
        """Log an utterance-end payload.

        Not registered by ``start_deepgram``; kept for external use.
        """
        logger.info(f"\n\n{utterance_end}\n\n")

    def on_error(self, e, *args, **kwargs):
        """Log a Deepgram error and clear the stored connection.

        Not registered by ``start_deepgram``; kept for external use.
        """
        logger.warning(f"Deepgram error received {e}")
        self.dg_connection = None

    def start_deepgram(self):
        """Open a Deepgram live connection, then run ``process_audio``.

        Blocks in the send loop until it exits, so this is meant to run on
        its own thread (see ``start``). Returns early, after logging a
        warning, if the client cannot be created or the connection reports
        that it failed to start.
        """
        try:
            self.deepgram = DeepgramClient(self.deepgram_api_key)
            dg_connection = self.deepgram.listen.live.v("1")
        except Exception as e:
            logger.warning(f"Could not open socket: {e}")
            return

        # The handlers below name their first parameter `self`, which shadows
        # this Transcriber inside them (presumably the SDK passes its own
        # client object there - confirm against the SDK version in use).
        # Keep an explicit reference for the handler that needs it.
        transcriber = self

        def on_message(self, result, **kwargs):
            sentence = result.channel.alternatives[0].transcript
            if not sentence:
                return
            logger.info(f"speaker: {sentence}")

        def on_metadata(self, metadata, **kwargs):
            logger.info(f"\n\n{metadata}\n\n")

        def on_utterance_end(self, utterance_end, **kwargs):
            logger.info(f"\n\n{utterance_end}\n\n")

        def on_error(self, error, **kwargs):
            logger.info(f"\n\n{error}\n\n")

        def on_close(self, **kwargs):
            logger.info("\n\nclosed\n\n")
            # Let process_audio stop instead of sending to a closed
            # connection. The identity check avoids clobbering a newer
            # connection if this event arrives late.
            if transcriber.dg_connection is dg_connection:
                transcriber.dg_connection = None

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
        dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)
        dg_connection.on(LiveTranscriptionEvents.Close, on_close)

        # `is False` rather than falsiness: only an explicit failure result
        # aborts, so a start() that returns None still proceeds as before.
        if dg_connection.start(options) is False:
            logger.warning("Could not start deepgram connection")
            return

        self.dg_connection = dg_connection
        logger.info("deepgram connection opened")
        self.process_audio()

    def start(self):
        """Run ``start_deepgram`` on a new background thread."""
        threading.Thread(target=self.start_deepgram).start()

    def send_audio(self, data):
        """Enqueue one audio chunk for the worker thread; never blocks."""
        try:
            self.audio_queue.put_nowait(data)
        except Exception as e:
            logger.warning(e)