import gradio as gr
import plotly.graph_objs as go
import numpy as np
import time
from openai import OpenAI
import os
from hardCodedData import *
from Helper import *
import cv2
from moviepy.editor import VideoFileClip
import base64
import whisperx
import gc
from dotenv import load_dotenv

load_dotenv()

'''
Model Information

GPT-4o
'''

api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(
    api_key=api_key,
    base_url="https://openai.gateway.salt-lab.org/v1",
)
MODEL = "gpt-4o"

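# WhisperX transcription setup. Audio is transcribed on CPU with int8 weights to
# keep memory modest; raise batch_size or switch compute_type if a GPU is available.
# Note: default_asr_options below documents the faster-whisper decoding options this
# app started from, but it is not passed to whisperx.load_model, so the loaded model
# currently runs with whisperx's own defaults.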
device = "cpu"
batch_size = 16
compute_type = "int8"
from faster_whisper.transcribe import TranscriptionOptions

default_asr_options = TranscriptionOptions(
    beam_size=5,
    best_of=5,
    patience=0.0,
    length_penalty=1.0,
    repetition_penalty=1.0,
    no_repeat_ngram_size=0,
    log_prob_threshold=-1.0,
    no_speech_threshold=0.6,
    compression_ratio_threshold=2.4,
    condition_on_previous_text=True,
    prompt_reset_on_temperature=True,
    temperatures=[0.0],
    initial_prompt=None,
    prefix=None,
    suppress_blank=True,
    suppress_tokens=[],
    without_timestamps=False,
    max_initial_timestamp=1.0,
    word_timestamps=False,
    prepend_punctuations="\"'“¿([{-",
    append_punctuations="\"'.。,,!!??::”)]}、",
    max_new_tokens=512,
    clip_timestamps=True,
    hallucination_silence_threshold=0.5
)

model = whisperx.load_model("large-v2", device, compute_type=compute_type)

'''
Video
'''

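# Module-level state shared across the Gradio callbacks: the current video path,
# the extracted audio path, the base64-encoded frames sent to GPT-4o, and the
# diarized transcript text.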
video_file = None
audio_path = None
base64Frames = []
transcript = ""

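# process_video: sample one frame every `seconds_per_frame` seconds with OpenCV,
# downscale each frame and base64-encode it as a JPEG for the vision model, then
# extract the audio track with MoviePy and kick off transcription.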
def process_video(video_path, seconds_per_frame=2, target_width=320, target_height=180):
    global audio_path
    base64Frames = []
    base_video_path, _ = os.path.splitext(video_path)

    video = cv2.VideoCapture(video_path)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = video.get(cv2.CAP_PROP_FPS)
    frames_to_skip = int(fps * seconds_per_frame)
    curr_frame = 0

    original_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"Original width: {original_width}, Original height: {original_height}")

    while curr_frame < total_frames - 1:
        video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
        success, frame = video.read()
        if not success:
            break

        resized_frame = cv2.resize(frame, (target_width, target_height))
        _, buffer = cv2.imencode(".jpg", resized_frame)
        base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
        curr_frame += frames_to_skip

    video.release()

    audio_path = "./TEST.mp3"
    clip = VideoFileClip(video_path)
    clip.audio.write_audiofile(audio_path, bitrate="32k")
    clip.audio.close()
    clip.close()
    transcribe_video(audio_path)
    print(f"Extracted {len(base64Frames)} frames")
    print(f"Extracted audio to {audio_path}")
    return base64Frames, audio_path

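# Seed the conversation with a system prompt that frames the assistant as an SLP
# support tool and defines the IBR coding criteria it should apply to the video.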
chat_history = []
chat_history.append({
    "role": "system",
    "content": (
        """
        You are an assistant chatbot for a Speech Language Pathologist (SLP).
        Your task is to help analyze a provided video of a therapy session and answer questions accurately.
        Provide timestamps in MM:SS format, as frames are given at 1 fps, for specific events or behaviors mentioned.

        Analyze the video for IBR based on the information below: Initiating Behavioral Request (IBR): the child's skill in using behavior(s) to elicit aid in obtaining an object, or object-related event.

        Instances of IBR:
        - Language: Listen for intelligible single words or greater verbal expressions the child uses to request an object or assistance.
        - Reach: Observe if the child extends their arm with an open palm towards the object or the adult. Do not consider grabbing as a requesting gesture.
        - Point: Look for the child pointing at the object or direction where the object is located.
        - Give: Watch if the child hands a toy or object to the adult to request help.
        """
    )
})

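# transcribe_video: run WhisperX ASR on the extracted audio, word-align the
# segments, diarize speakers (the diarization pipeline needs an HF_AUTH_TOKEN),
# and accumulate the result into the global transcript string.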
def transcribe_video(audio_path):
    global transcript
    if not audio_path:
        raise ValueError("Audio path is None")
    print(audio_path)
    audio = whisperx.load_audio(audio_path)
    result = model.transcribe(audio, batch_size=batch_size)

    model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
    result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

    hf_auth_token = os.getenv("HF_AUTH_TOKEN")
    diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_auth_token, device=device)

    diarize_segments = diarize_model(audio)

    dia_result = whisperx.assign_word_speakers(diarize_segments, result)

    for res in dia_result["segments"]:
        # Include the diarized speaker so the transcript matches the template sent to the model.
        transcript += "Speaker: " + str(res.get("speaker", "UNKNOWN")) + "\n"
        transcript += "Dialogue: " + str(res["text"].lstrip()) + "\n"
        transcript += "start: " + str(int(res["start"])) + "\n"
        transcript += "end: " + str(int(res["end"])) + "\n"
        transcript += "\n"
    print(transcript)
    return transcript

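# handle_video: Gradio callback for the "Analyze Video" / "Load Example Video"
# buttons. Falls back to the bundled ./TEST.mp4, extracts frames and audio, and
# appends the frames (and the transcript, when available) to the chat history so
# follow-up questions can reference the video.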
def handle_video(video=None):
    global video_file, base64Frames, audio_path, chat_history, transcript

    if video is None:
        # Default to the bundled example video.
        video = "./TEST.mp4"

    base64Frames, audio_path = process_video(video, seconds_per_frame=1, target_width=320, target_height=180)
    chat_history.append({
        "role": "user",
        "content": [
            {"type": "text", "text": "These are the frames from the video."},
            *map(lambda x: {"type": "image_url", "image_url": {"url": f'data:image/jpeg;base64,{x}', "detail": "low"}}, base64Frames)
        ]
    })

    if transcript:
        chat_history[-1]['content'].append({
            "type": "text",
            "text": "Also, below is the template of the transcript from the video:\n"
                    "Speaker: <the speaker of the dialogue>\n"
                    "Dialogue: <the text of the dialogue>\n"
                    "start: <the starting timestamp of the dialogue in the video, in seconds>\n"
                    "end: <the ending timestamp of the dialogue in the video, in seconds>\n"
                    f"Transcription: {transcript}"
        })

    video_file = video
    return video_file

'''
Chatbot
'''

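# new_prompt: append the user's message to the shared chat history, send the
# whole history to the chat completions endpoint, and append the model's reply.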
def new_prompt(prompt):
    global chat_history, video_file
    chat_history.append({"role": "user", "content": prompt})
    print(chat_history)

    try:
        # The same request is sent whether or not a video has been uploaded;
        # any frames/transcript already live in chat_history.
        response = client.chat.completions.create(model=MODEL, messages=chat_history, temperature=0)

        assistant_message = response.choices[0].message.content
        chat_history.append({'role': 'assistant', 'content': assistant_message})
        print(assistant_message)
    except Exception as e:
        print("Error: ", e)
        assistant_message = "The request to the model failed (possibly a rate limit). Please wait a moment and try again."
        chat_history.append({'role': 'assistant', 'content': assistant_message})

    return chat_history

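# Gradio chat plumbing: user_input echoes the message into the Chatbot component
# immediately, then bot_response fills in the assistant's reply from new_prompt.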
def user_input(user_message, history):
    return "", history + [[user_message, None]]


def bot_response(history):
    user_message = history[-1][0]
    updated_history = new_prompt(user_message)
    assistant_message = updated_history[-1]['content']
    history[-1][1] = assistant_message
    yield history

'''
Behavior box
'''

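# Behaviors are stored as (name, (definition, timestamps)) tuples; the bank holds
# the names the user has selected for analysis.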
initial_behaviors = [
    ("Initiating Behavioral Request (IBR)",
     ("The child's skill in using behavior(s) to elicit aid in obtaining an object, or object related event",
      ["00:10", "00:45", "01:30"])),

    ("Initiating Joint Attention (IJA)",
     ("The child's skill in using behavior(s) to initiate shared attention to objects or events.",
      ["00:15", "00:50", "01:40"])),

    ("Responding to Joint Attention (RJA)",
     ("The child's skill in following the examiner's line of regard and pointing gestures.",
      ["00:20", "01:00", "02:00"])),

    ("Initiating Social Interaction (ISI)",
     ("The child's skill at initiating turn-taking sequences and the tendency to tease the tester.",
      ["00:20", "00:50", "02:00"])),

    ("Responding to Social Interaction (RSI)",
     ("The child's skill in responding to turn-taking interactions initiated by the examiner.",
      ["00:20", "01:00", "02:00"]))
]

behaviors = initial_behaviors
behavior_bank = []

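# add_or_update_behavior: save a behavior from the form; if one was selected in
# the dropdown it is edited in place (and renamed in the bank), otherwise a new
# entry is appended. Returns refreshed dropdown/bank components and clears the form.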
def add_or_update_behavior(name, definition, timestamps, selected_behavior):
    global behaviors, behavior_bank
    if selected_behavior:
        for i, (old_name, _) in enumerate(behaviors):
            if old_name == selected_behavior:
                behaviors[i] = (name, (definition, timestamps))
                break

        behavior_bank = [name if b == selected_behavior else b for b in behavior_bank]
    else:
        new_behavior = (name, (definition, timestamps))
        behaviors.append(new_behavior)
    choices = [b[0] for b in behaviors]
    return gr.Dropdown(choices=choices, value=None, interactive=True), gr.CheckboxGroup(choices=behavior_bank, value=behavior_bank, interactive=True), "", "", ""

def add_to_behaivor_bank(selected_behavior, checkbox_group_values):
    global behavior_bank
    if selected_behavior and selected_behavior not in checkbox_group_values:
        checkbox_group_values.append(selected_behavior)
        behavior_bank = checkbox_group_values
    return gr.CheckboxGroup(choices=checkbox_group_values, value=checkbox_group_values, interactive=True), gr.Dropdown(value=None, interactive=True)

def delete_behavior(selected_behavior, checkbox_group_values):
    global behaviors, behavior_bank
    behaviors = [b for b in behaviors if b[0] != selected_behavior]
    behavior_bank = [b for b in behavior_bank if b != selected_behavior]
    updated_choices = [b[0] for b in behaviors]
    updated_checkbox_group = [cb for cb in checkbox_group_values if cb != selected_behavior]
    return gr.Dropdown(choices=updated_choices, value=None, interactive=True), gr.CheckboxGroup(choices=updated_checkbox_group, value=updated_checkbox_group, interactive=True)

def edit_behavior(selected_behavior):
    # Populate the form fields with the selected behavior so it can be edited.
    for name, (definition, timestamps) in behaviors:
        if name == selected_behavior:
            return name, definition, timestamps
    return "", "", ""

welcome_message = """
Hello! I'm your AI assistant.
I can help you analyze your video sessions following your instructions.
To get started, please upload a video or add your behaviors to the Behavior Bank using the Behavior Manager.
"""

css = """
body {
    background-color: #edf1fa; /* offwhite */
}
.gradio-container {
    background-color: #edf1fa; /* offwhite */
}
.column-form .wrap {
    flex-direction: column;
}
.sidebar {
    background: #ffffff;
    padding: 10px;
    border-right: 1px solid #dee2e6;
}
.content {
    padding: 10px;
}
"""

'''
Gradio Demo
'''

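# UI layout: a video upload/analyze column and a chat column side by side, a
# Behavior Bank checkbox row underneath, and a hidden Behavior Manager sidebar
# for creating, editing, and deleting behaviors.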
with gr.Blocks(theme='base', css=css, title="Soap.AI") as demo:
    gr.Markdown("# 🤖 AI-Supported SOAP Generation")

    with gr.Row():
        with gr.Column():
            video = gr.Video(label="Video", visible=True, height=360, container=True)
            with gr.Row():
                with gr.Column(min_width=1, scale=1):
                    video_upload_button = gr.Button("Analyze Video", variant="primary")
                with gr.Column(min_width=1, scale=1):
                    example_video_button = gr.Button("Load Example Video")

            video_upload_button.click(handle_video, inputs=video, outputs=video)
            example_video_button.click(handle_video, None, outputs=video)

        with gr.Column():
            chat_section = gr.Group(visible=True)
            with chat_section:
                chatbot = gr.Chatbot(elem_id="chatbot",
                                     container=True,
                                     likeable=True,
                                     value=[[None, welcome_message]],
                                     avatar_images=(None, "./avatar.webp"))
                with gr.Row():
                    txt = gr.Textbox(show_label=False, placeholder="Type here!")
                with gr.Row():
                    send_btn = gr.Button("Send Message", elem_id="send-btn", variant="primary")
                    clear_btn = gr.Button("Clear Chat", elem_id="clear-btn")

    with gr.Row():
        behaivor_bank = gr.CheckboxGroup(label="Behavior Bank",
                                         choices=[],
                                         interactive=True,
                                         info="A space to store all the behaviors you want to analyze.")
        open_sidebar_btn = gr.Button("Show Behavior Manager", scale=0)
        close_sidebar_btn = gr.Button("Hide Behavior Manager", visible=False, scale=0)

    txt.submit(user_input, [txt, chatbot], [txt, chatbot], queue=False).then(
        bot_response, chatbot, chatbot)
    send_btn.click(user_input, [txt, chatbot], [txt, chatbot], queue=False).then(
        bot_response, chatbot, chatbot)
    clear_btn.click(lambda: None, None, chatbot, queue=False)

    with gr.Column(visible=False, min_width=200, scale=0.5, elem_classes="sidebar") as sidebar:
        behavior_dropdown = gr.Dropdown(label="Behavior Collection",
                                        choices=[b[0] for b in behaviors],  # dropdown values are the behavior names
                                        interactive=True,
                                        container=True,
                                        elem_classes="column-form",
                                        info="Choose a behavior to add to the bank, edit or remove.")
        with gr.Row():
            add_toBank_button = gr.Button("Add Behavior to Bank", variant="primary")
            edit_button = gr.Button("Edit Behavior")
            delete_button = gr.Button("Remove Behavior")

        with gr.Row():
            name_input = gr.Textbox(label="Behavior Name",
                                    placeholder="(e.g., IBR)",
                                    info="The name you give to the specific behavior you're tracking or analyzing.")
            timestamps_input = gr.Textbox(label="Timestamps MM:SS",
                                          placeholder="(e.g., (01:15,01:35) )",
                                          info="The exact times during a session when you saw the behavior. The first two digits represent minutes and the last two digits represent seconds.")
            definition_input = gr.Textbox(lines=3,
                                          label="Behavior Definition",
                                          placeholder="(e.g., the child's skill in using behavior(s) to elicit aid in obtaining an object, or object related event)",
                                          info="Provide a clear definition of the behavior.")

        with gr.Row():
            submit_button = gr.Button("Save Behavior", variant="primary")

        submit_button.click(fn=add_or_update_behavior,
                            inputs=[name_input, definition_input, timestamps_input, behavior_dropdown],
                            outputs=[behavior_dropdown, behaivor_bank, name_input, definition_input, timestamps_input])

        add_toBank_button.click(fn=add_to_behaivor_bank,
                                inputs=[behavior_dropdown, behaivor_bank],
                                outputs=[behaivor_bank, behavior_dropdown])

        delete_button.click(fn=delete_behavior,
                            inputs=[behavior_dropdown, behaivor_bank],
                            outputs=[behavior_dropdown, behaivor_bank])

        edit_button.click(fn=edit_behavior,
                          inputs=[behavior_dropdown],
                          outputs=[name_input, definition_input, timestamps_input])

    open_sidebar_btn.click(lambda: {
        open_sidebar_btn: gr.Button(visible=False),
        close_sidebar_btn: gr.Button(visible=True),
        sidebar: gr.Column(visible=True)
    }, outputs=[open_sidebar_btn, close_sidebar_btn, sidebar])

    close_sidebar_btn.click(lambda: {
        open_sidebar_btn: gr.Button(visible=True),
        close_sidebar_btn: gr.Button(visible=False),
        sidebar: gr.Column(visible=False)
    }, outputs=[open_sidebar_btn, close_sidebar_btn, sidebar])


demo.launch(share=True)