File size: 7,715 Bytes
bafab47 064088f bafab47 064088f bafab47 18d311d afe7a1d 18d311d afe7a1d 18d311d afe7a1d bafab47 3e6c98a 18d311d bafab47 95703f6 18d311d 95703f6 162ac6d 064088f 2ae7b80 162ac6d 93212ca 162ac6d 064088f 99345db 18d311d 064088f 95703f6 3e6c98a 5de1bb7 3e6c98a 5de1bb7 3e6c98a 5de1bb7 3e6c98a 5de1bb7 3e6c98a 95703f6 5de1bb7 064088f c500bb3 162ac6d 18d311d 95703f6 18d311d 95703f6 064088f 162ac6d 064088f aeb4947 162ac6d 064088f 162ac6d 064088f 18d311d 162ac6d 064088f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import gradio as gr
import time
from video_processing import process_video
from PIL import Image
import matplotlib
matplotlib.rcParams['figure.dpi'] = 300
matplotlib.rcParams['savefig.dpi'] = 300
def process_and_show_completion(video_input_path, anomaly_threshold_input, fps, progress=gr.Progress()):
start_time = time.time()
try:
print("Starting video processing...")
results = process_video(video_input_path, anomaly_threshold_input, fps, progress=progress)
print("Video processing completed.")
if isinstance(results[0], str) and results[0].startswith("Error"):
print(f"Error occurred: {results[0]}")
return [results[0]] + [None] * 27
exec_time, results_summary, df, mse_embeddings, mse_posture, mse_voice, \
mse_plot_embeddings, mse_plot_posture, mse_plot_voice, \
mse_histogram_embeddings, mse_histogram_posture, mse_histogram_voice, \
mse_heatmap_embeddings, mse_heatmap_posture, mse_heatmap_voice, \
face_samples_frequent, \
anomaly_faces_embeddings, anomaly_frames_posture_images, \
aligned_faces_folder, frames_folder, \
heatmap_video_path, combined_mse_plot, correlation_heatmap = results
anomaly_faces_embeddings_pil = [Image.fromarray(face) for face in anomaly_faces_embeddings] if anomaly_faces_embeddings is not None else []
anomaly_frames_posture_pil = [Image.fromarray(frame) for frame in anomaly_frames_posture_images] if anomaly_frames_posture_images is not None else []
face_samples_frequent = [Image.open(path) for path in face_samples_frequent] if face_samples_frequent is not None else []
end_time = time.time()
total_exec_time = end_time - start_time
output = [
total_exec_time, results_summary,
df, mse_embeddings, mse_posture, mse_voice,
mse_plot_embeddings, mse_plot_posture, mse_plot_voice,
mse_histogram_embeddings, mse_histogram_posture, mse_histogram_voice,
mse_heatmap_embeddings, mse_heatmap_posture, mse_heatmap_voice,
anomaly_faces_embeddings_pil, anomaly_frames_posture_pil,
face_samples_frequent,
aligned_faces_folder, frames_folder,
mse_embeddings, mse_posture, mse_voice,
heatmap_video_path, combined_mse_plot, correlation_heatmap
]
return output
except Exception as e:
error_message = f"An error occurred: {str(e)}"
print(error_message)
import traceback
traceback.print_exc()
return [error_message] + [None] * 27
def show_results():
return [gr.Tab.update(visible=True) for _ in range(4)] + [gr.Tab.update(visible=False)]
def hide_description_show_results():
return [gr.Tab.update(visible=False)] + [gr.Tab.update(visible=True) for _ in range(4)], gr.Group.update(visible=True)
def start_execution_timer():
return gr.Number.update(value=0, visible=True), gr.Markdown.update(visible=True)
def update_execution_time():
return gr.Number.update()
with gr.Blocks() as iface:
with gr.Row():
video_input = gr.Video(label="Input Video")
anomaly_threshold = gr.Slider(minimum=1, maximum=5, step=0.1, value=3, label="Anomaly Detection Threshold (Standard deviation)")
fps_slider = gr.Slider(minimum=5, maximum=20, step=1, value=10, label="Frames Per Second (FPS)")
process_btn = gr.Button("Detect Anomalies")
progress_bar = gr.Progress()
execution_time_group = gr.Group(visible=False)
with execution_time_group:
execution_time = gr.Number(label="Execution Time", visible=False)
execution_time_text = gr.Markdown("Execution time will be displayed here", visible=False)
with gr.Tabs() as all_tabs:
description_tab = gr.Tab("Description")
with description_tab:
gr.Markdown("""
# Multimodal Behavioral Anomalies Detection
This tool detects anomalies in facial expressions, body language, and voice over the timeline of a video.
It extracts faces, postures, and voice from video frames, and analyzes them to identify anomalies using time series analysis and a variational autoencoder (VAE) approach.
""")
facial_features_tab = gr.Tab("Facial Features", visible=False)
with facial_features_tab:
results_text = gr.TextArea(label="Faces Breakdown", lines=5)
mse_features_plot = gr.Plot(label="MSE: Facial Features")
mse_features_hist = gr.Plot(label="MSE Distribution: Facial Features")
mse_features_heatmap = gr.Plot(label="MSE Heatmap: Facial Features")
anomaly_frames_features = gr.Gallery(label="Anomaly Frames (Facial Features)", columns=6, rows=2, height="auto")
face_samples_most_frequent = gr.Gallery(label="Most Frequent Person Samples", columns=10, rows=2, height="auto")
body_posture_tab = gr.Tab("Body Posture", visible=False)
with body_posture_tab:
mse_posture_plot = gr.Plot(label="MSE: Body Posture")
mse_posture_hist = gr.Plot(label="MSE Distribution: Body Posture")
mse_posture_heatmap = gr.Plot(label="MSE Heatmap: Body Posture")
anomaly_frames_posture = gr.Gallery(label="Anomaly Frames (Body Posture)", columns=6, rows=2, height="auto")
voice_tab = gr.Tab("Voice", visible=False)
with voice_tab:
mse_voice_plot = gr.Plot(label="MSE: Voice")
mse_voice_hist = gr.Plot(label="MSE Distribution: Voice")
mse_voice_heatmap = gr.Plot(label="MSE Heatmap: Voice")
combined_tab = gr.Tab("Combined", visible=False)
with combined_tab:
heatmap_video = gr.Video(label="Video with Anomaly Heatmap")
combined_mse_plot = gr.Plot(label="Combined MSE Plot")
correlation_heatmap_plot = gr.Plot(label="Correlation Heatmap")
df_store = gr.State()
mse_features_store = gr.State()
mse_posture_store = gr.State()
mse_voice_store = gr.State()
aligned_faces_folder_store = gr.State()
frames_folder_store = gr.State()
mse_heatmap_embeddings_store = gr.State()
mse_heatmap_posture_store = gr.State()
mse_heatmap_voice_store = gr.State()
process_btn.click(
start_execution_timer,
inputs=None,
outputs=[execution_time, execution_time_text]
).then(
hide_description_show_results,
inputs=None,
outputs=[description_tab, facial_features_tab, body_posture_tab, voice_tab, combined_tab, execution_time_group]
).then(
process_and_show_completion,
inputs=[video_input, anomaly_threshold, fps_slider],
outputs=[
execution_time, results_text, df_store,
mse_features_store, mse_posture_store, mse_voice_store,
mse_features_plot, mse_posture_plot, mse_voice_plot,
mse_features_hist, mse_posture_hist, mse_voice_hist,
mse_features_heatmap, mse_posture_heatmap, mse_voice_heatmap,
anomaly_frames_features, anomaly_frames_posture,
face_samples_most_frequent,
aligned_faces_folder_store, frames_folder_store,
mse_heatmap_embeddings_store, mse_heatmap_posture_store, mse_heatmap_voice_store,
heatmap_video, combined_mse_plot, correlation_heatmap_plot
]
).then(
show_results,
inputs=None,
outputs=all_tabs.children
)
execution_time.change(
update_execution_time,
inputs=None,
outputs=execution_time_text,
every=1
)
if __name__ == "__main__":
iface.launch() |