from collections import deque
import os
import threading
import time

import av
import numpy as np
import streamlit as st
from streamlit_webrtc import WebRtcMode, webrtc_streamer
import pydub
import torch

from sample_utils.turn import get_ice_servers
import json
from typing import List

from vosk import SetLogLevel, Model, KaldiRecognizer

SetLogLevel(-1)  # mute vosk's console verbosity

from dotenv import load_dotenv

load_dotenv()

system_one = {
    "audio_bit_rate": 16000,  # sample rate (Hz) expected by Vosk; audio is resampled to this below
    # "audio_bit_rate": 32000,
    # "audio_bit_rate": 48000,

    # "vision_embeddings_fps": 5,
    "vision_embeddings_fps": 2,  # maximum CLIP image-embedding passes per second
}

system_one["video_detection_emotions"] = [
    "Happiness", "Sadness", "Fear", "Disgust", "Anger", "Surprise",
    "Boredom", "Interest", "Excitement", "Guilt", "Shame", "Relief",
    "Love", "Embarrassment", "Pride", "Envy", "Jealousy", "Anxiety",
    "Hope", "Despair", "Frustration", "Confusion", "Curiosity",
    "Contentment", "Indifference", "Anticipation", "Gratitude", "Bitterness",
]

system_one["video_detection_engagement"] = [
    "Facial_Expressions", "Open_Body_Language", "Closed_Body_Language",
    "Eye_Contact", "Interest", "Boredom", "Confusion", "Frustration",
    "Question_Asking", "Engaged_Language", "Short_Responses", "Distraction_Signs",
]

system_one["video_detection_present"] = [
    "a person",
    "no one",
    " ",
    "multiple people",
    "a group of people",
]

system_one_audio_status = st.empty()

playing = st.checkbox("Playing", value=True)


def load_vosk(model='small'):
    # load the vosk model stored next to this file under models/vosk/<model>
    current_file_path = os.path.abspath(__file__)
    current_directory = os.path.dirname(current_file_path)
    _path = os.path.join(current_directory, 'models', 'vosk', model)
    model_voice = Model(_path)
    recognizer = KaldiRecognizer(model_voice, system_one['audio_bit_rate'])
    return recognizer


vosk_recognizer = load_vosk()


def handle_audio_frame(frame):
    # unused stub
    # if self.vosk.AcceptWaveform(data):
    pass


def do_work(data: bytes) -> tuple[str, bool]:
    text = ''
    speaker_finished = False
    if vosk_recognizer.AcceptWaveform(data):
        result = vosk_recognizer.Result()
        result_json = json.loads(result)
        text = result_json['text']
        speaker_finished = True
    else:
        result = vosk_recognizer.PartialResult()
        result_json = json.loads(result)
        text = result_json['partial']
    return text, speaker_finished


audio_frames_deque_lock = threading.Lock()
audio_frames_deque: deque = deque([])

video_frames_deque_lock = threading.Lock()
video_frames_deque: deque = deque([])


async def queued_video_frames_callback(
    frames: List[av.VideoFrame],
) -> List[av.VideoFrame]:
    # queue incoming video frames for the main loop and pass them through unchanged
    with video_frames_deque_lock:
        video_frames_deque.extend(frames)
    return frames


async def queued_audio_frames_callback(
    frames: List[av.AudioFrame],
) -> List[av.AudioFrame]:
    # queue incoming audio frames for the main loop
    with audio_frames_deque_lock:
        audio_frames_deque.extend(frames)

    # create silent frames to be returned to the other side
    new_frames = []
    for frame in frames:
        input_array = frame.to_ndarray()
        new_frame = av.AudioFrame.from_ndarray(
            np.zeros(input_array.shape, dtype=input_array.dtype),
            layout=frame.layout.name,
        )
        new_frame.sample_rate = frame.sample_rate
        new_frames.append(new_frame)

    # TODO: replace with the audio we want to send to the other side.
    return new_frames
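
# The callbacks above run on streamlit_webrtc's worker thread and only enqueue
# frames under their locks; all heavy work (CLIP embeddings, Vosk speech
# recognition) happens on the Streamlit script thread in the loop further below.
# Next, the CLIP model is loaded and the text prompts in `system_one` are
# embedded once up front, so each incoming video frame only needs a single
# image-to-embedding pass before being ranked against them.
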
system_one_audio_status.write("Initializing CLIP model")
from clip_transform import CLIPTransform

clip_transform = CLIPTransform()

system_one_audio_status.write("Initializing CLIP templates")

embeddings = clip_transform.text_to_embeddings(system_one["video_detection_emotions"])
system_one["video_detection_emotions_embeddings"] = embeddings

embeddings = clip_transform.text_to_embeddings(system_one["video_detection_engagement"])
system_one["video_detection_engagement_embeddings"] = embeddings

embeddings = clip_transform.text_to_embeddings(system_one["video_detection_present"])
system_one["video_detection_present_embeddings"] = embeddings

system_one_audio_status.write("Initializing webrtc_streamer")
webrtc_ctx = webrtc_streamer(
    key="charles",
    desired_playing_state=playing,
    # audio_receiver_size=4096,
    queued_audio_frames_callback=queued_audio_frames_callback,
    queued_video_frames_callback=queued_video_frames_callback,
    mode=WebRtcMode.SENDRECV,
    rtc_configuration={"iceServers": get_ice_servers()},
    async_processing=True,
)

if not webrtc_ctx.state.playing:
    # halt this script run until the WebRTC stream is actually playing
    st.stop()

system_one_audio_status.write("Initializing streaming")

system_one_audio_output = st.empty()
system_one_video_output = st.empty()

system_one_audio_history = []
system_one_audio_history_output = st.empty()

sound_chunk = pydub.AudioSegment.empty()
current_video_embedding = None
current_video_embedding_timestamp = time.monotonic()


def get_dot_similarities(video_embedding, embeddings, embeddings_labels):
    # rank each text label by its dot product with the image embedding, best first
    dot_product = torch.mm(embeddings, video_embedding.T)
    similarity_image_label = [
        (float("{:.4f}".format(dot_product[i][0])), embeddings_labels[i])
        for i in range(len(embeddings_labels))
    ]
    similarity_image_label.sort(reverse=True)
    return similarity_image_label
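
# Illustrative sketch only, never called by this app: CLIP embeddings are
# conventionally compared with cosine similarity, i.e. the same dot product
# taken on L2-normalized vectors. `get_cosine_similarities` is a hypothetical
# variant of `get_dot_similarities` above, assuming the same tensor shapes
# (num_labels x dim text embeddings, 1 x dim image embedding).
def get_cosine_similarities(video_embedding, embeddings, embeddings_labels):
    normalized_image = torch.nn.functional.normalize(video_embedding, dim=-1)
    normalized_text = torch.nn.functional.normalize(embeddings, dim=-1)
    scores = torch.mm(normalized_text, normalized_image.T)
    ranked = [
        (float("{:.4f}".format(scores[i][0])), embeddings_labels[i])
        for i in range(len(embeddings_labels))
    ]
    ranked.sort(reverse=True)
    return ranked
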
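
# Main processing loop: each pass drains both frame deques. Video: at most
# `vision_embeddings_fps` times per second, the newest frame is embedded with
# CLIP and ranked against each prompt set. Audio: frames are downmixed and
# resampled to 16 kHz mono, accumulated into `sound_chunk`, and streamed into
# Vosk; partial text is shown live, and the chunk is reset once Vosk reports a
# final result.
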
while True:
    if webrtc_ctx.state.playing:
        # handle video
        video_frames = []
        with video_frames_deque_lock:
            while len(video_frames_deque) > 0:
                frame = video_frames_deque.popleft()
                video_frames.append(frame)

        get_embeddings = False
        get_embeddings |= current_video_embedding is None
        current_time = time.monotonic()
        elapsed_time = current_time - current_video_embedding_timestamp
        get_embeddings |= elapsed_time > 1.0 / system_one['vision_embeddings_fps']

        if get_embeddings and len(video_frames) > 0:
            current_video_embedding_timestamp = current_time
            current_video_embedding = clip_transform.image_to_embeddings(video_frames[-1].to_ndarray())

            similarities = get_dot_similarities(current_video_embedding, system_one["video_detection_emotions_embeddings"], system_one["video_detection_emotions"])
            emotions_top_3 = ""
            for i in range(3):
                emotions_top_3 += f"{similarities[i][1]} ({similarities[i][0]}) "

            similarities = get_dot_similarities(current_video_embedding, system_one["video_detection_engagement_embeddings"], system_one["video_detection_engagement"])
            engagement_top_3 = ""
            for i in range(3):
                engagement_top_3 += f"{similarities[i][1]} ({similarities[i][0]}) "

            similarities = get_dot_similarities(current_video_embedding, system_one["video_detection_present_embeddings"], system_one["video_detection_present"])
            present_top_3 = ""
            for i in range(3):
                present_top_3 += f"'{similarities[i][1]}' ({similarities[i][0]}), "

            # table_content = "**System 1 Video:**\n\n"
            table_content = "| System 1 Video | |\n| --- | --- |\n"
            table_content += f"| Present | {present_top_3} |\n"
            table_content += f"| Emotion | {emotions_top_3} |\n"
            table_content += f"| Engagement | {engagement_top_3} |\n"
            system_one_video_output.markdown(table_content)
            # system_one_video_output.markdown(f"**System 1 Video:** \n [Emotion: {emotions_top_3}], \n [Engagement: {engagement_top_3}], \n [Present: {present_top_3}] ")
            # for similarity, image_label in similarity_image_label:
            #     print (f"{similarity} {image_label}")

        # handle audio
        audio_frames = []
        with audio_frames_deque_lock:
            while len(audio_frames_deque) > 0:
                frame = audio_frames_deque.popleft()
                audio_frames.append(frame)

        if len(audio_frames) == 0:
            time.sleep(0.1)
            system_one_audio_status.write("No frame arrived.")
            continue

        system_one_audio_status.write("Running. Say something!")

        for audio_frame in audio_frames:
            sound = pydub.AudioSegment(
                data=audio_frame.to_ndarray().tobytes(),
                sample_width=audio_frame.format.bytes,
                frame_rate=audio_frame.sample_rate,
                channels=len(audio_frame.layout.channels),
            )
            # Vosk expects 16 kHz mono, so downmix and resample before accumulating
            sound = sound.set_channels(1)
            sound = sound.set_frame_rate(system_one['audio_bit_rate'])
            sound_chunk += sound

        if len(sound_chunk) > 0:
            buffer = np.array(sound_chunk.get_array_of_samples())
            text, speaker_finished = do_work(buffer.tobytes())
            system_one_audio_output.markdown(f"**System 1 Audio:** {text}")
            if speaker_finished and len(text) > 0:
                system_one_audio_history.append(text)
                if len(system_one_audio_history) > 10:
                    system_one_audio_history = system_one_audio_history[-10:]
                table_content = "| System 1 Audio History |\n| --- |\n"
                table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
                system_one_audio_history_output.markdown(table_content)
                sound_chunk = pydub.AudioSegment.empty()
    else:
        system_one_audio_status.write("Stopped.")
        break