|
import asyncio
import logging
import os
import queue
import threading
import time

from deepgram import DeepgramClient, LiveTranscriptionEvents, LiveOptions

from src.logging import initialize_logger
|
|
|
logger = initialize_logger("transcriber", level=logging.INFO)

# Settings passed to every live-transcription socket opened by
# Transcriber.start_deepgram().
# NOTE(review): `encoding` is not set. If the audio pushed through
# Transcriber.send_audio() is raw, headerless PCM, Deepgram expects `encoding`
# alongside `sample_rate` — confirm what callers actually send.
options = LiveOptions(
    # Recognition model and spoken language.
    model="nova-2",
    language="en-US",
    # Formatting applied to the returned transcript text.
    punctuate=True,
    smart_format=True,
    # Ask for preliminary results while audio is still streaming.
    interim_results=True,
    # Sample rate in Hz of the incoming audio.
    sample_rate=48000,
)
|
|
|
|
|
class Transcriber:
    """Forward audio chunks to a Deepgram live-transcription connection.

    Callers push audio with ``send_audio``. ``start`` launches a background
    thread that opens the Deepgram connection and then drains the queue into
    it until ``stop`` is called or the connection reference is dropped.
    """

    def __init__(self):
        self.deepgram_api_key = os.getenv("DEEPGRAM_API_KEY")
        self.deepgram = None
        self.dg_connection = None
        # Thread-safe queue: send_audio() runs on the caller's thread while
        # process_audio() drains it on the worker thread started by start();
        # asyncio.Queue is not safe to share between threads.
        self.audio_queue = queue.Queue()
        self.stop_event = threading.Event()

    def process_audio(self, poll_interval=0.1):
        """Send queued audio chunks to Deepgram until told to stop.

        Runs on the worker thread. Returns when ``stop_event`` is set, when
        ``dg_connection`` has been cleared (by stop() or the close/error
        handlers), or after the first failed send.

        Args:
            poll_interval: Seconds to wait for a chunk before re-checking the
                stop flag and the connection.
        """
        while not self.stop_event.is_set():
            # Take a local reference: stop() and the SDK callbacks may reset
            # self.dg_connection from other threads at any moment.
            connection = self.dg_connection
            if connection is None:
                logger.info("returned from process")
                return

            try:
                data = self.audio_queue.get(timeout=poll_interval)
            except queue.Empty:
                continue

            try:
                connection.send(data)
            except Exception as e:
                logger.warning(f"Error while sending data: {e}")
                break
            finally:
                # Mark the chunk handled even on failure so audio_queue.join()
                # cannot hang on it.
                self.audio_queue.task_done()
            # Logged once per chunk, so keep it out of the INFO stream.
            logger.debug("sent data to deepgram")

        logger.info("Audio processing thread is stopping")

    def on_transcript(self, result, *args, **kwargs):
        """Log the first transcript alternative of a Deepgram result.

        start_deepgram() does not register this method. Errors while reading
        the result are logged as warnings, not raised.
        """
        try:
            sentence = result.channel.alternatives[0].transcript
            logger.info(f"Transcription: {sentence}")
        except Exception as e:
            logger.warning(e)

    def close_connection(self):
        """Finish the current Deepgram connection, if any, and forget it."""
        # Detach before finishing so other threads stop using the connection
        # immediately and a failing finish() cannot leave a stale reference.
        connection, self.dg_connection = self.dg_connection, None
        if connection:
            connection.finish()
            logger.info("finished deepgram connection")

    def stop(self):
        """Signal the worker thread to stop and close the Deepgram connection."""
        self.stop_event.set()
        self.close_connection()
        logger.info("Requested to stop the audio processing thread")

    def on_close(self, *args, **kwargs):
        """Handle the socket closing by dropping the connection reference.

        With the reference cleared, process_audio() returns on its next check.
        """
        logger.info("Deepgram connection closed")
        self.dg_connection = None

    def on_utterance_end(self, utterance_end, *args, **kwargs):
        """Log an utterance-end event payload."""
        logger.info(f"\n\n{utterance_end}\n\n")

    def on_error(self, e, *args, **kwargs):
        """Log a Deepgram error and drop the connection reference.

        The connection is not finish()ed here; clearing the reference makes
        process_audio() return on its next check.
        """
        logger.warning(f"Deepgram error received {e}")
        self.dg_connection = None

    def start_deepgram(self):
        """Open a Deepgram live connection, then forward audio until stopped.

        Intended to run on the thread created by start(). If the client or
        connection cannot be created or started, logs a warning and returns
        without forwarding any audio.
        """
        try:
            self.deepgram = DeepgramClient(self.deepgram_api_key)
            dg_connection = self.deepgram.listen.live.v("1")
        except Exception as e:
            logger.warning(f"Could not open socket: {e}")
            return

        # The SDK passes the connection object as the first positional argument
        # to every handler. It is named `_client` here so the enclosing
        # Transcriber `self` stays reachable.
        def on_message(_client, result, **kwargs):
            sentence = result.channel.alternatives[0].transcript
            if not sentence:
                return
            logger.info(f"speaker: {sentence}")

        def on_metadata(_client, metadata, **kwargs):
            logger.info(f"\n\n{metadata}\n\n")

        def on_utterance_end(_client, utterance_end, **kwargs):
            self.on_utterance_end(utterance_end)

        def on_error(_client, error, **kwargs):
            # Route through the instance handler so the connection reference is
            # cleared and process_audio() stops sending into a dead socket.
            self.on_error(error)

        def on_close(_client, **kwargs):
            self.on_close()

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)
        dg_connection.on(LiveTranscriptionEvents.Metadata, on_metadata)
        dg_connection.on(LiveTranscriptionEvents.UtteranceEnd, on_utterance_end)
        dg_connection.on(LiveTranscriptionEvents.Error, on_error)
        dg_connection.on(LiveTranscriptionEvents.Close, on_close)

        try:
            started = dg_connection.start(options)
        except Exception as e:
            logger.warning(f"Could not open socket: {e}")
            return
        # NOTE: deepgram-sdk v3's start() returns False when the socket cannot
        # be opened. Compare with `is False` so a None return is not treated
        # as a failure.
        if started is False:
            logger.warning("Could not open socket: connection did not start")
            return

        self.dg_connection = dg_connection
        logger.info("deepgram connection opened")

        try:
            self.process_audio()
        finally:
            # stop() has already finished the connection on the normal path.
            # This covers a failed send, or stop() running before the
            # connection was published above. The identity check avoids closing
            # a connection opened by another start() call.
            if self.dg_connection is dg_connection:
                self.close_connection()

    def start(self):
        """Run start_deepgram() on a new background thread."""
        threading.Thread(target=self.start_deepgram).start()

    def send_audio(self, data):
        """Queue an audio chunk for the worker thread to send to Deepgram.

        Safe to call from any thread. A failure to enqueue is logged, not
        raised.
        """
        try:
            self.audio_queue.put_nowait(data)
        except queue.Full as e:
            logger.warning(e)
|
|