import gradio as gr import plotly.graph_objs as go import numpy as np import time from openai import OpenAI import os from hardCodedData import * from Helper import * import cv2 from moviepy.editor import VideoFileClip import time import base64 import whisperx import gc from moviepy.editor import VideoFileClip from dotenv import load_dotenv load_dotenv() ''' Model Information GPT4o ''' import openai api_key = os.getenv("OPENAI_API_KEY") client = openai.OpenAI( api_key=api_key, base_url="https://openai.gateway.salt-lab.org/v1", ) MODEL="gpt-4o" # Whisperx config device = "cpu" batch_size = 16 # reduce if low on GPU mem compute_type = "int8" # change to "int8" if low on GPU mem (may reduce accuracy) from faster_whisper.transcribe import TranscriptionOptions # Initialize TranscriptionOptions with the required arguments default_asr_options = TranscriptionOptions( beam_size=5, best_of=5, patience=0.0, length_penalty=1.0, repetition_penalty=1.0, no_repeat_ngram_size=0, log_prob_threshold=-1.0, no_speech_threshold=0.6, compression_ratio_threshold=2.4, condition_on_previous_text=True, prompt_reset_on_temperature=True, temperatures=[0.0], initial_prompt=None, prefix=None, suppress_blank=True, suppress_tokens=[], without_timestamps=False, max_initial_timestamp=1.0, word_timestamps=False, prepend_punctuations="\"'“¿([{-", append_punctuations="\"'.。,,!!??::”)]}、", max_new_tokens=512, clip_timestamps=True, hallucination_silence_threshold=0.5 ) # Load the model using whisperx.load_model model = whisperx.load_model("large-v2", device, compute_type=compute_type) ''' Video ''' video_file = None audio_path=None base64Frames = [] transcript='''Dialogue: and let's say you say well first this big guy came and got us out of class to learn how to tell stories and we were sitting in the classroom. start: 0 end: 8 Dialogue: I was sitting in the classroom with Jared, Jared, and Jacob when all of a sudden, it's about those words, all of a sudden a grizzly bear walked through the door. start: 9 end: 20 Dialogue: Would that be a problem? start: 22 end: 23 Dialogue: Yeah. start: 23 end: 25 Dialogue: Okay. start: 25 end: 26 Dialogue: Would that be our start: 26 end: 27 Dialogue: Yeah, so what's our takeoff in that story? start: 28 end: 30 ''' def process_video(video_path, seconds_per_frame=2): global base64Frames, audio_path base_video_path, _ = os.path.splitext(video_path) video = cv2.VideoCapture(video_path) total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) fps = video.get(cv2.CAP_PROP_FPS) frames_to_skip = int(fps * seconds_per_frame) curr_frame=0 while curr_frame < total_frames - 1: video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame) success, frame = video.read() if not success: break _, buffer = cv2.imencode(".jpg", frame) base64Frames.append(base64.b64encode(buffer).decode("utf-8")) curr_frame += frames_to_skip video.release() audio_path = "./TEST.mp3" clip = VideoFileClip(video_path) clip.audio.write_audiofile(audio_path, bitrate="32k") clip.audio.close() clip.close() # transcribe_video(audio_path) print(f"Extracted {len(base64Frames)} frames") print(f"Extracted audio to {audio_path}") return base64Frames, audio_path chat_history = [] # chat_history.append({ # "role": "system", # "content": ( # """ # You are an assistant chatbot for a Speech Language Pathologist (SLP). # Your task is to help analyze a provided video of a therapy session and answer questions accurately. # Provide timestamps for specific events or behaviors mentioned. Conclude each response with possible follow-up questions. # Follow these steps: # 1. Suggest to the user to ask, “To get started, you can try asking me how many people there are in the video.” # 2. Detect how many people are in the video. # 2. Suggest to the user to tell you the names of the people in the video, starting from left to right. # 3. After receiving the names, respond with, “Ok thank you! Now you can ask me any questions about this video.” # 4. If the user asks about a behavior, respond with, “My understanding of this behavior is [xxx - AI generated output]. Is this a behavior that you want to track? If it is, please define this behavior and tell me more about it so I can analyze it more accurately according to your practice.” # 5. If you receive names, confirm that these are the names of the people from left to right. # """ # ) # }) def transcribe_video(filename): global transcript if not audio_path: raise ValueError("Audio path is None") print(audio_path) audio = whisperx.load_audio(audio_path) result = model.transcribe(audio, batch_size=batch_size) model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device) result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False) hf_auth_token = os.getenv("HF_AUTH_TOKEN") diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_auth_token, device=device) diarize_segments = diarize_model(audio) dia_result = whisperx.assign_word_speakers(diarize_segments, result) for res in dia_result["segments"]: # transcript += "Speaker: " + str(res.get("speaker", None)) + "\n" transcript += "Dialogue: " + str(res["text"].lstrip()) + "\n" transcript += "start: " + str(int(res["start"])) + "\n" transcript += "end: " + str(int(res["end"])) + "\n" transcript += "\n" return transcript def handle_video(video=None): global video_file, base64Frames, audio_path, chat_history, transcript if video is None: # Load example video video = "./TEST.mp4" base64Frames, audio_path = process_video(video_path=video, seconds_per_frame=100) chat_history.append({ "role": "user", "content": [ {"type": "text", "text": "These are the frames from the video."}, *map(lambda x: {"type": "image_url", "image_url": {"url": f'data:image/jpg;base64,{x}', "detail": "low"}}, base64Frames) ] }) if transcript: chat_history[-1]['content'].append({ "type": "text", "text": "Also, below is the template of transcript from the video:\n" "Speaker: \n" "Dialogue: \n" "start: \n" "end: \n" f"Transcription: {transcript}" }) video_file = video return video_file ''' Chatbot ''' def new_prompt(prompt): global chat_history, video_file chat_history.append({"role": "user","content": prompt,}) MODEL="gpt-4o" print(chat_history) # print(transcript) try: if video_file: # Video exists and is processed response = client.chat.completions.create(model=MODEL,messages=chat_history,temperature=0,) else: # No video uploaded yet response = client.chat.completions.create(model=MODEL,messages=chat_history,temperature=0,) # Extract the text content from the response and append it to the chat history assistant_message = response.choices[0].message.content chat_history.append({'role': 'model', 'content': assistant_message}) print(assistant_message) except Exception as e: print("Error: ",e) assistant_message = "API rate limit has been reached. Please wait a moment and try again." chat_history.append({'role': 'model', 'content': assistant_message}) # except google.api_core.exceptions.ResourceExhausted: # assistant_message = "API rate limit has been reached. Please wait a moment and try again." # chat_history.append({'role': 'model', 'parts': [assistant_message]}) # except Exception as e: # assistant_message = f"An error occurred: {str(e)}" # chat_history.append({'role': 'model', 'parts': [assistant_message]}) return chat_history def user_input(user_message, history): return "", history + [[user_message, None]] def bot_response(history): user_message = history[-1][0] updated_history = new_prompt(user_message) assistant_message = updated_history[-1]['content'] history[-1][1] = assistant_message yield history ''' Behaivor box ''' initial_behaviors = [ ("Initiating Behavioral Request (IBR)", ("The child's skill in using behavior(s) to elicit aid in obtaining an object, or object related event", ["00:10", "00:45", "01:30"])), ("Initiating Joint Attention (IJA)", ("The child's skill in using behavior(s) to initiate shared attention to objects or events.", ["00:15", "00:50", "01:40"])), ("Responding to Joint Attention (RJA)", ("The child's skill in following the examiner’s line of regard and pointing gestures.", ["00:20", "01:00", "02:00"])), ("Initiating Social Interaction (ISI)", ("The child's skill at initiating turn-taking sequences and the tendency to tease the tester", ["00:20", "00:50", "02:00"])), ("Responding to Social Interaction (RSI)", ("The child’s skill in responding to turn-taking interactions initiated by the examiner.", ["00:20", "01:00", "02:00"])) ] behaviors = initial_behaviors behavior_bank = [] def add_or_update_behavior(name, definition, timestamps, selected_behavior): global behaviors, behavior_bank if selected_behavior: # Update existing behavior for i, (old_name, _) in enumerate(behaviors): if old_name == selected_behavior: behaviors[i] = (name, (definition, timestamps)) break # Update behavior in the bank if it exists behavior_bank = [name if b == selected_behavior else b for b in behavior_bank] else: # Add new behavior new_behavior = (name, (definition, timestamps)) behaviors.append(new_behavior) choices = [b[0] for b in behaviors] return gr.Dropdown(choices=choices, value=None, interactive=True), gr.CheckboxGroup(choices=behavior_bank, value=behavior_bank, interactive=True), "", "", "" def add_to_behaivor_bank(selected_behavior, checkbox_group_values): global behavior_bank if selected_behavior and selected_behavior not in checkbox_group_values: checkbox_group_values.append(selected_behavior) behavior_bank = checkbox_group_values return gr.CheckboxGroup(choices=checkbox_group_values, value=checkbox_group_values, interactive=True), gr.Dropdown(value=None,interactive=True) def delete_behavior(selected_behavior, checkbox_group_values): global behaviors, behavior_bank behaviors = [b for b in behaviors if b[0] != selected_behavior] behavior_bank = [b for b in behavior_bank if b != selected_behavior] updated_choices = [b[0] for b in behaviors] updated_checkbox_group = [cb for cb in checkbox_group_values if cb != selected_behavior] return gr.Dropdown(choices=updated_choices, value=None, interactive=True), gr.CheckboxGroup(choices=updated_checkbox_group, value=updated_checkbox_group, interactive=True) def edit_behavior(selected_behavior): for name, (definition, timestamps) in behaviors: if name == selected_behavior: # Return values to populate textboxes return name, definition, timestamps return "", "", "" welcome_message = """ Hello! I'm your AI assistant. I can help you analyze your video sessions following your instructions. To get started, please upload a video or add your behaviors to the Behavior Bank using the Behavior Manager. """ #If you want to tell me about the people in the video, please name them starting from left to right. css=""" body { background-color: #edf1fa; /* offwhite */ } .gradio-container { background-color: #edf1fa; /* offwhite */ } .column-form .wrap { flex-direction: column; } .sidebar { background: #ffffff; padding: 10px; border-right: 1px solid #dee2e6; } .content { padding: 10px; } """ ''' Gradio Demo ''' with gr.Blocks(theme='base', css=css, title="Soap.AI") as demo: gr.Markdown("# 🤖 AI-Supported SOAP Generation") with gr.Row(): with gr.Column(): video = gr.Video(label="Video", visible=True, height=360, container=True) with gr.Row(): with gr.Column(min_width=1, scale=1): video_upload_button = gr.Button("Analyze Video", variant="primary") with gr.Column(min_width=1, scale=1): example_video_button = gr.Button("Load Example Video") video_upload_button.click(handle_video, inputs=video, outputs=video) example_video_button.click(handle_video, None, outputs=video) with gr.Column(): chat_section = gr.Group(visible=True) with chat_section: chatbot = gr.Chatbot(elem_id="chatbot", container=True, likeable=True, value=[[None, welcome_message]], avatar_images=(None, "./avatar.webp")) with gr.Row(): txt = gr.Textbox(show_label=False, placeholder="Type here!") with gr.Row(): send_btn = gr.Button("Send Message", elem_id="send-btn", variant="primary") clear_btn = gr.Button("Clear Chat", elem_id="clear-btn") with gr.Row(): behaivor_bank = gr.CheckboxGroup(label="Behavior Bank", choices=[], interactive=True, info="A space to store all the behaviors you want to analyze.") open_sidebar_btn = gr.Button("Show Behavior Manager", scale=0) close_sidebar_btn = gr.Button("Hide Behavior Manager", visible=False, scale=0) txt.submit(user_input, [txt, chatbot], [txt, chatbot], queue=False).then( bot_response, chatbot, chatbot) send_btn.click(user_input, [txt, chatbot], [txt, chatbot], queue=False).then( bot_response, chatbot, chatbot) clear_btn.click(lambda: None, None, chatbot, queue=False) # Define a sidebar column that is initially hidden with gr.Column(visible=False, min_width=200, scale=0.5, elem_classes="sidebar") as sidebar: behavior_dropdown = gr.Dropdown(label="Behavior Collection", choices=behaviors, interactive=True, container=True, elem_classes="column-form", info="Choose a behavior to add to the bank, edit or remove.") with gr.Row(): add_toBank_button = gr.Button("Add Behavior to Bank", variant="primary") edit_button = gr.Button("Edit Behavior") delete_button = gr.Button("Remove Behavior") with gr.Row(): name_input = gr.Textbox(label="Behavior Name", placeholder="(e.g., IBR)", info="The name you give to the specific behavior you're tracking or analyzing.") timestamps_input = gr.Textbox(label="Timestamps MM:SS", placeholder="(e.g., (01:15,01:35) )", info="The exact times during a session when you saw the behavior. The first two digits represent minutes and the last two digits represent seconds.") definition_input = gr.Textbox(lines=3, label="Behavior Definition", placeholder="(e.g., the child's skill in using behavior(s) to elicit aid in obtaining an object, or object related event)", info="Provide a clear definition of the behavior.") with gr.Row(): submit_button = gr.Button("Save Behavior", variant="primary") submit_button.click(fn=add_or_update_behavior, inputs=[name_input, definition_input, timestamps_input, behavior_dropdown], outputs=[behavior_dropdown, behaivor_bank, name_input, definition_input, timestamps_input]) add_toBank_button.click(fn=add_to_behaivor_bank, inputs=[behavior_dropdown, behaivor_bank], outputs=[behaivor_bank, behavior_dropdown]) delete_button.click(fn=delete_behavior, inputs=[behavior_dropdown, behaivor_bank], outputs=[behavior_dropdown, behaivor_bank]) edit_button.click(fn=edit_behavior, inputs=[behavior_dropdown], outputs=[name_input, definition_input, timestamps_input]) # Function to open the sidebar open_sidebar_btn.click(lambda: { open_sidebar_btn: gr.Button(visible=False), close_sidebar_btn: gr.Button(visible=True), sidebar: gr.Column(visible=True) }, outputs=[open_sidebar_btn, close_sidebar_btn, sidebar]) # Function to close the sidebar close_sidebar_btn.click(lambda: { open_sidebar_btn: gr.Button(visible=True), close_sidebar_btn: gr.Button(visible=False), sidebar: gr.Column(visible=False) }, outputs=[open_sidebar_btn, close_sidebar_btn, sidebar]) # Launch the demo demo.launch(share=True)