|
import matplotlib.pyplot as plt |
|
from mpl_toolkits.mplot3d import Axes3D |
|
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas |
|
import matplotlib.colors as mcolors |
|
from matplotlib.colors import LinearSegmentedColormap |
|
import seaborn as sns |
|
import numpy as np |
|
import pandas as pd |
|
import cv2 |
|
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeVideoClip, ImageClip, VideoClip, concatenate_videoclips |
|
from moviepy.video.fx.all import resize |
|
from PIL import Image, ImageDraw, ImageFont |
|
from matplotlib.patches import Rectangle |
|
from utils import seconds_to_timecode |
|
from anomaly_detection import determine_anomalies |
|
from scipy import interpolate |
|
import gradio as gr |
|
import os |
|
|
|
def plot_mse(df, mse_values, title, color='navy', time_threshold=3, anomaly_threshold=4):
    """Plot MSE values over time and highlight anomalous stretches.

    The plot contains a scatter of the MSE values, a centred rolling mean
    with a +/- one standard deviation band for every run of samples whose
    timestamps are at most one second apart, the median baseline, the
    anomaly threshold line, red markers for the anomalies and a shaded red
    span for every group of anomalies.

    Args:
        df: DataFrame with a ``Frame`` column and a ``Timecode`` column of
            colon-separated time strings. When it has no ``Seconds`` column,
            one is derived from ``Timecode`` and added to ``df`` in place.
        mse_values: Sequence of MSE values aligned with the rows of ``df``.
            Both are truncated to their common length and NaN entries are
            dropped.
        title: Title of the plot.
        color: Colour used for the scatter, the rolling mean and its band.
        time_threshold: Largest gap, in seconds, between two consecutive
            anomalies that still puts them in the same shaded group.
        anomaly_threshold: Number of standard deviations above the mean at
            which the threshold line is drawn; also forwarded to
            ``determine_anomalies``.

    Returns:
        A tuple ``(fig, anomaly_frames)`` with the Matplotlib figure
        (already removed from pyplot's registry) and the list of ``Frame``
        values selected by ``determine_anomalies``.
    """
    # Create exactly one figure: an extra plt.figure() call would stay
    # registered in pyplot forever because only the current figure is closed.
    fig, ax = plt.subplots(figsize=(16, 8))

    if 'Seconds' not in df.columns:
        df['Seconds'] = df['Timecode'].apply(
            lambda x: sum(float(t) * 60 ** i for i, t in enumerate(reversed(x.split(':')))))

    # Boolean masking below needs an ndarray; this also accepts plain lists.
    mse_values = np.asarray(mse_values)

    # Align the frame table and the MSE series on their common length.
    min_length = min(len(df), len(mse_values))
    df = df.iloc[:min_length].copy()
    mse_values = mse_values[:min_length]

    # Drop samples without a valid MSE value.
    valid_mask = ~np.isnan(mse_values)
    df = df[valid_mask]
    mse_values = mse_values[valid_mask]

    def get_continuous_segments(seconds, values, max_gap=1):
        """Split the samples into runs whose time gaps are at most max_gap."""
        segments = []
        current_segment = []
        for sec, val in zip(seconds, values):
            if not current_segment or (sec - current_segment[-1][0] <= max_gap):
                current_segment.append((sec, val))
            else:
                segments.append(current_segment)
                current_segment = [(sec, val)]
        if current_segment:
            segments.append(current_segment)
        return segments

    segments = get_continuous_segments(df['Seconds'], mse_values)

    for segment in segments:
        segment_seconds, segment_mse = zip(*segment)
        ax.scatter(segment_seconds, segment_mse, color=color, alpha=0.3, s=5)

        # A trend line only makes sense with at least two samples.
        if len(segment) > 1:
            segment_df = pd.DataFrame({'Seconds': segment_seconds, 'MSE': segment_mse})
            segment_df = segment_df.sort_values('Seconds')
            rolling = segment_df['MSE'].rolling(
                window=min(10, len(segment)), min_periods=1, center=True)
            mean = rolling.mean()
            std = rolling.std()

            ax.plot(segment_df['Seconds'], mean, color=color, linewidth=0.5)
            ax.fill_between(segment_df['Seconds'], mean - std, mean + std, color=color, alpha=0.1)

    median = np.median(mse_values)
    ax.axhline(y=median, color='black', linestyle='--', label='Median Baseline')

    threshold = np.mean(mse_values) + anomaly_threshold * np.std(mse_values)
    ax.axhline(y=threshold, color='red', linestyle='--', label='Anomaly Threshold')
    ax.text(ax.get_xlim()[1], threshold, 'Anomaly Threshold',
            verticalalignment='center', horizontalalignment='left', color='red')

    # NOTE(review): the result of determine_anomalies is used as a positional
    # indexer for both df and mse_values - confirm its return type there.
    anomalies = determine_anomalies(mse_values, anomaly_threshold)
    anomaly_frames = df['Frame'].iloc[anomalies].tolist()

    ax.scatter(df['Seconds'].iloc[anomalies], mse_values[anomalies], color='red', s=20, zorder=5)

    anomaly_data = sorted(
        zip(df['Timecode'].iloc[anomalies], df['Seconds'].iloc[anomalies], mse_values[anomalies]),
        key=lambda item: item[1])

    # Merge anomalies that are at most time_threshold seconds apart.
    grouped_anomalies = []
    current_group = []
    for timecode, sec, mse in anomaly_data:
        if not current_group or sec - current_group[-1][1] <= time_threshold:
            current_group.append((timecode, sec, mse))
        else:
            grouped_anomalies.append(current_group)
            current_group = [(timecode, sec, mse)]
    if current_group:
        grouped_anomalies.append(current_group)

    # Shade every group over the full current height of the axes.
    y_low, y_high = ax.get_ylim()
    for group in grouped_anomalies:
        start_sec = group[0][1]
        end_sec = group[-1][1]
        rect = Rectangle((start_sec, y_low), end_sec - start_sec, y_high - y_low,
                         facecolor='red', alpha=0.2, zorder=1)
        ax.add_patch(rect)

    # Label only the highest-MSE anomaly of each group to limit clutter.
    for group in grouped_anomalies:
        timecode, sec, mse = max(group, key=lambda item: item[2])
        ax.annotate(timecode, (sec, mse), textcoords="offset points", xytext=(0, 10),
                    ha='center', fontsize=6, color='red')

    max_seconds = df['Seconds'].max()
    num_ticks = 100
    tick_locations = np.linspace(0, max_seconds, num_ticks)
    tick_labels = [seconds_to_timecode(int(s)) for s in tick_locations]

    ax.set_xticks(tick_locations)
    ax.set_xticklabels(tick_labels, rotation=90, ha='center', fontsize=6)

    ax.set_xlabel('Timecode')
    ax.set_ylabel('Mean Squared Error')
    ax.set_title(title)

    ax.grid(True, linestyle='--', alpha=0.7)
    ax.legend()
    fig.tight_layout()
    # Unregister the figure from pyplot; the object stays usable by the caller.
    plt.close(fig)
    return fig, anomaly_frames
|
|
|
def plot_combined_mse(df, mse_embeddings, mse_posture, mse_voice, title, anomaly_threshold=4, time_threshold=3):
    """Plot the min-max scaled MSE series of all three modalities together.

    Every series is scaled to the range [0, 1] and drawn as a scatter plus a
    centred rolling mean with a +/- one standard deviation band per run of
    samples at most one second apart. A median line, an anomaly threshold
    line and red markers for the values above the threshold are added for
    each series.

    Args:
        df: DataFrame with a ``Timecode`` column of colon-separated time
            strings. When it has no ``Seconds`` column, one is derived from
            ``Timecode`` and added to ``df`` in place.
        mse_embeddings: MSE series plotted as "Facial Features".
        mse_posture: MSE series plotted as "Body Posture".
        mse_voice: MSE series plotted as "Voice".
        title: Title of the plot.
        anomaly_threshold: Number of standard deviations above the mean of
            the scaled series at which its threshold line is drawn.
        time_threshold: Not used by this function; kept so the signature
            stays unchanged.

    Returns:
        The Matplotlib figure (already removed from pyplot's registry).
    """
    # Create exactly one figure: an extra plt.figure() call would stay
    # registered in pyplot forever because only the current figure is closed.
    fig, ax = plt.subplots(figsize=(16, 8))

    if 'Seconds' not in df.columns:
        df['Seconds'] = df['Timecode'].apply(
            lambda x: sum(float(t) * 60 ** i for i, t in enumerate(reversed(x.split(':')))))

    def get_continuous_segments(seconds, values, max_gap=1):
        """Split the samples into runs whose time gaps are at most max_gap."""
        segments = []
        current_segment = []
        for sec, val in zip(seconds, values):
            if not current_segment or (sec - current_segment[-1][0] <= max_gap):
                current_segment.append((sec, val))
            else:
                segments.append(current_segment)
                current_segment = [(sec, val)]
        if current_segment:
            segments.append(current_segment)
        return segments

    def scale_mse(mse_values):
        """Min-max scale a series to [0, 1]; a constant series maps to zeros."""
        mse_values = np.asarray(mse_values)
        min_val = np.min(mse_values)
        value_range = np.max(mse_values) - min_val
        if value_range == 0:
            # Avoid 0/0: a flat series carries no relative information.
            return np.zeros_like(mse_values, dtype=float)
        return (mse_values - min_val) / value_range

    mse_embeddings_scaled = scale_mse(mse_embeddings)
    mse_posture_scaled = scale_mse(mse_posture)
    mse_voice_scaled = scale_mse(mse_voice)

    for mse_values, color, label in zip([mse_embeddings_scaled, mse_posture_scaled, mse_voice_scaled],
                                        ['navy', 'purple', 'green'],
                                        ['Facial Features', 'Body Posture', 'Voice']):
        segments = get_continuous_segments(df['Seconds'], mse_values)

        for segment_index, segment in enumerate(segments):
            segment_seconds, segment_mse = zip(*segment)
            # Only the first segment carries the legend label, so each
            # modality appears once in the legend.
            ax.scatter(segment_seconds, segment_mse, color=color, alpha=0.3, s=5,
                       label=label if segment_index == 0 else "")

            # A trend line only makes sense with at least two samples.
            if len(segment) > 1:
                segment_df = pd.DataFrame({'Seconds': segment_seconds, 'MSE': segment_mse})
                segment_df = segment_df.sort_values('Seconds')
                rolling = segment_df['MSE'].rolling(
                    window=min(10, len(segment)), min_periods=1, center=True)
                mean = rolling.mean()
                std = rolling.std()

                ax.plot(segment_df['Seconds'], mean, color=color, linewidth=0.5)
                ax.fill_between(segment_df['Seconds'], mean - std, mean + std, color=color, alpha=0.1)

        median = np.median(mse_values)
        ax.axhline(y=median, color=color, linestyle=':', alpha=0.5, label=f'{label} Baseline Median')

        threshold = np.mean(mse_values) + anomaly_threshold * np.std(mse_values)
        ax.axhline(y=threshold, color=color, linestyle='--', alpha=1, label=f'{label} Anomaly Threshold')

        # Mark every scaled value strictly above the threshold.
        anomalies = mse_values > threshold
        ax.scatter(df['Seconds'][anomalies], mse_values[anomalies], color='red', s=20, zorder=5)

    max_seconds = df['Seconds'].max()
    num_ticks = 100
    tick_locations = np.linspace(0, max_seconds, num_ticks)
    tick_labels = [seconds_to_timecode(int(s)) for s in tick_locations]

    ax.set_xticks(tick_locations)
    ax.set_xticklabels(tick_labels, rotation=90, ha='center', fontsize=6)

    ax.set_xlabel('Timecode')
    ax.set_ylabel('Scaled Mean Squared Error')
    ax.set_title(title)

    ax.grid(True, linestyle='--', alpha=0.7)
    ax.legend()
    fig.tight_layout()
    # Unregister the figure from pyplot; the object stays usable by the caller.
    plt.close(fig)
    return fig
|
|
|
def plot_mse_histogram(mse_values, title, anomaly_threshold, color='blue'):
    """Plot a 100-bin histogram of MSE values with the anomaly threshold.

    Args:
        mse_values: MSE values to bin. NaN entries, if any, are ignored.
        title: Title of the plot.
        anomaly_threshold: Number of standard deviations above the mean at
            which the vertical threshold line is drawn.
        color: Fill colour of the histogram bars.

    Returns:
        The Matplotlib figure (already removed from pyplot's registry).
    """
    # Create exactly one figure: an extra plt.figure() call would stay
    # registered in pyplot forever because only the current figure is closed.
    fig, ax = plt.subplots(figsize=(16, 3))

    values = np.asarray(mse_values)
    nan_mask = np.isnan(values)
    if nan_mask.any():
        # The histogram range cannot be derived from NaN, and NaN would also
        # turn the mean/std based threshold into NaN.
        values = values[~nan_mask]

    ax.hist(values, bins=100, edgecolor='black', color=color, alpha=0.7)
    ax.set_xlabel('Mean Squared Error')
    ax.set_ylabel('Number of Frames')
    ax.set_title(title)

    threshold = np.mean(values) + anomaly_threshold * np.std(values)
    ax.axvline(x=threshold, color='red', linestyle='--', linewidth=2)

    fig.tight_layout()
    # Unregister the figure from pyplot; the object stays usable by the caller.
    plt.close(fig)
    return fig
|
|
|
|
|
def plot_mse_heatmap(mse_values, title, df):
    """Render the MSE series as a single-row heatmap with timecode ticks.

    Args:
        mse_values: Sequence of MSE values; it becomes the single row of the
            heatmap.
        title: Title of the plot.
        df: DataFrame whose ``Timecode`` column provides the tick labels,
            looked up by position.

    Returns:
        The Matplotlib figure (already removed from pyplot's registry).
    """
    # Create exactly one figure: an extra plt.figure() call would stay
    # registered in pyplot forever because only the current figure is closed.
    fig, ax = plt.subplots(figsize=(20, 3))

    # One row, one column per value; asarray also accepts plain lists.
    mse_2d = np.asarray(mse_values).reshape(1, -1)
    sns.heatmap(mse_2d, cmap='YlOrRd', cbar=False, ax=ax)

    # At most 60 evenly spaced ticks over the value positions.
    num_ticks = min(60, len(mse_values))
    tick_locations = np.linspace(0, len(mse_values) - 1, num_ticks).astype(int)

    # Keep only positions that have a row in df, so every tick has a label.
    tick_locations = tick_locations[tick_locations < len(df)]
    tick_labels = [df['Timecode'].iloc[i] for i in tick_locations]

    ax.set_xticks(tick_locations)
    ax.set_xticklabels(tick_labels, rotation=90, ha='center', va='top')

    ax.set_title(title)
    # The single row has no meaningful y axis.
    ax.set_yticks([])

    fig.tight_layout()
    # Unregister the figure from pyplot; the object stays usable by the caller.
    plt.close(fig)
    return fig
|
|
|
def plot_posture(df, posture_scores, color='blue', anomaly_threshold=3):
    """Plot posture scores over time with a 10-sample rolling mean.

    Args:
        df: DataFrame with a ``Frame`` column and a ``Timecode`` column of
            colon-separated time strings. A ``Seconds`` column derived from
            ``Timecode`` is written to ``df`` in place.
        posture_scores: Mapping of frame identifier to posture score;
            entries whose score is ``None`` are skipped.
        color: Colour used for the scatter and the rolling mean line.
        anomaly_threshold: Not used by this function; kept so the signature
            stays unchanged.

    Returns:
        The Matplotlib figure (already removed from pyplot's registry). When
        no score is available the axes are returned without data.
    """
    # Create exactly one figure: an extra plt.figure() call would stay
    # registered in pyplot forever because only the current figure is closed.
    fig, ax = plt.subplots(figsize=(16, 8))

    df['Seconds'] = df['Timecode'].apply(
        lambda x: sum(float(t) * 60 ** i for i, t in enumerate(reversed(x.split(':')))))

    posture_data = [(frame, score) for frame, score in posture_scores.items() if score is not None]

    # Unpacking zip(*[]) would raise, so only plot when there is data.
    if posture_data:
        frames, scores = zip(*posture_data)
        posture_df = pd.DataFrame({'Frame': frames, 'Score': scores})

        # Attach the timestamp of every scored frame that also exists in df.
        posture_df = posture_df.merge(df[['Frame', 'Seconds']], on='Frame', how='inner')

        # The rolling mean and the line need chronological order; the stable
        # sort leaves an already ordered frame untouched.
        posture_df = posture_df.sort_values('Seconds', kind='mergesort')

        ax.scatter(posture_df['Seconds'], posture_df['Score'], color=color, alpha=0.3, s=5)
        mean = posture_df['Score'].rolling(window=10).mean()
        ax.plot(posture_df['Seconds'], mean, color=color, linewidth=0.5)

    ax.set_xlabel('Timecode')
    ax.set_ylabel('Posture Score')
    ax.set_title("Body Posture Over Time")

    ax.grid(True, linestyle='--', alpha=0.7)

    max_seconds = df['Seconds'].max()
    num_ticks = 80
    tick_locations = np.linspace(0, max_seconds, num_ticks)
    tick_labels = [seconds_to_timecode(int(s)) for s in tick_locations]

    ax.set_xticks(tick_locations)
    ax.set_xticklabels(tick_labels, rotation=90, ha='center', fontsize=6)

    fig.tight_layout()
    # Unregister the figure from pyplot; the object stays usable by the caller.
    plt.close(fig)
    return fig
|
|
|
def create_heatmap(t, mse_embeddings, mse_posture, mse_voice, video_fps, total_frames, video_width):
    """Render the three-row MSE heatmap strip with a marker at time ``t``.

    Each series is min-max normalised to [0, 1] and drawn as one row of an
    image (face on top, posture in the middle, voice at the bottom). A black
    vertical line marks the frame that corresponds to ``t``.

    Args:
        t: Time in seconds of the frame to mark.
        mse_embeddings: Facial MSE series with ``total_frames`` values.
        mse_posture: Posture MSE series with ``total_frames`` values.
        mse_voice: Voice MSE series with ``total_frames`` values.
        video_fps: Frames per second used to convert ``t`` to a frame index.
        total_frames: Number of columns of the heatmap.
        video_width: Video width in pixels; the figure width in inches is
            ``video_width / 250``.

    Returns:
        The rendered strip as a contiguous ``uint8`` RGB array of shape
        ``(height, width, 3)``.
    """
    frame_count = int(t * video_fps)

    def normalize(values):
        """Min-max scale a series to [0, 1]; a constant series maps to zeros."""
        values = np.asarray(values)
        low = np.min(values)
        span = np.max(values) - low
        if span == 0:
            # Avoid 0/0: a flat series carries no relative information.
            return np.zeros_like(values, dtype=float)
        return (values - low) / span

    combined_mse = np.zeros((3, total_frames))
    combined_mse[0] = normalize(mse_embeddings)
    combined_mse[1] = normalize(mse_posture)
    combined_mse[2] = normalize(mse_voice)

    fig, ax = plt.subplots(figsize=(video_width / 250, 0.6))
    try:
        ax.imshow(combined_mse, aspect='auto', cmap='Reds', vmin=0, vmax=1,
                  extent=[0, total_frames, 0, 3])
        # Row 0 of the image is drawn at the top, hence the reversed labels.
        ax.set_yticks([0.5, 1.5, 2.5])
        ax.set_yticklabels(['Voice', 'Posture', 'Face'], fontsize=7)
        ax.set_xticks([])

        ax.axvline(x=frame_count, color='black', linewidth=3)

        fig.tight_layout(pad=0.5)

        canvas = FigureCanvas(fig)
        canvas.draw()
        # buffer_rgba() replaces the deprecated tostring_rgb(); drop the alpha
        # channel and copy so the result does not alias the canvas buffer.
        rgba = np.asarray(canvas.buffer_rgba())
        heatmap_img = np.ascontiguousarray(rgba[..., :3])
    finally:
        # Always release the figure, even when rendering fails.
        plt.close(fig)
    return heatmap_img
|
|
|
def create_video_with_heatmap(video_path, df, mse_embeddings, mse_posture, mse_voice, output_folder, desired_fps, largest_cluster):
    """Write a copy of the video with the MSE heatmap strip stacked below it.

    The three MSE series are linearly resampled to one value per video
    frame. For every frame the heatmap strip produced by ``create_heatmap``
    is resized to the video width and appended under the frame. The original
    audio track is kept.

    Args:
        video_path: Path of the source video.
        df: Not used by this function; kept so the signature stays unchanged.
        mse_embeddings: Facial MSE series.
        mse_posture: Posture MSE series.
        mse_voice: Voice MSE series.
        output_folder: Directory for the result; created when missing.
        desired_fps: Not used by this function; the source video's fps is
            used for the output.
        largest_cluster: Not used by this function.

    Returns:
        The path of the written ``*_heatmap.mp4`` file, or ``None`` when the
        file does not exist after encoding.
    """
    print(f"Creating heatmap video. Output folder: {output_folder}")

    os.makedirs(output_folder, exist_ok=True)

    output_filename = os.path.basename(video_path).rsplit('.', 1)[0] + '_heatmap.mp4'
    heatmap_video_path = os.path.join(output_folder, output_filename)

    print(f"Heatmap video will be saved at: {heatmap_video_path}")

    video = VideoFileClip(video_path)
    final_clip = None
    try:
        width = video.w
        total_frames = int(video.duration * video.fps)

        def resample(values):
            """Linearly interpolate a series to one value per video frame."""
            return np.interp(np.linspace(0, len(values) - 1, total_frames),
                             np.arange(len(values)), values)

        embeddings_per_frame = resample(mse_embeddings)
        posture_per_frame = resample(mse_posture)
        voice_per_frame = resample(mse_voice)

        def combine_video_and_heatmap(t):
            """Return the video frame at time t with the heatmap strip below."""
            video_frame = video.get_frame(t)
            heatmap_frame = create_heatmap(t, embeddings_per_frame, posture_per_frame,
                                           voice_per_frame, video.fps, total_frames, width)
            # Match the strip to the video width; its height is kept as rendered.
            heatmap_frame_resized = cv2.resize(heatmap_frame, (width, heatmap_frame.shape[0]))
            return np.vstack((video_frame, heatmap_frame_resized))

        final_clip = VideoClip(combine_video_and_heatmap, duration=video.duration)
        final_clip = final_clip.set_audio(video.audio)

        final_clip.write_videofile(heatmap_video_path, codec='libx264', audio_codec='aac', fps=video.fps)
    finally:
        # Release the reader (and the composed clip) even when encoding fails.
        video.close()
        if final_clip is not None:
            final_clip.close()

    if os.path.exists(heatmap_video_path):
        print(f"Heatmap video created at: {heatmap_video_path}")
        print(f"Heatmap video size: {os.path.getsize(heatmap_video_path)} bytes")
        return heatmap_video_path

    print(f"Failed to create heatmap video at: {heatmap_video_path}")
    return None
|
|
|
|
|
|
|
def plot_correlation_heatmap(mse_embeddings, mse_posture, mse_voice):
    """Plot the pairwise correlation of the three MSE series as a heatmap.

    Args:
        mse_embeddings: Facial MSE series.
        mse_posture: Posture MSE series.
        mse_voice: Voice MSE series.
            All three series must have the same length because they are
            stacked into the columns of one table.

    Returns:
        The Matplotlib figure (already removed from pyplot's registry) with
        the annotated correlation matrix.
    """
    # One column per modality, one row per sample.
    data = np.vstack((mse_embeddings, mse_posture, mse_voice)).T
    df = pd.DataFrame(data, columns=["Facial Features", "Body Posture", "Voice"])
    corr = df.corr()

    # Draw on an explicit figure/axes pair instead of pyplot's implicit
    # current figure, so the result does not depend on global state.
    fig, ax = plt.subplots(figsize=(10, 8), dpi=300)
    sns.heatmap(corr, annot=True, cmap='coolwarm', vmin=-1, vmax=1, ax=ax)
    ax.set_title('Correlation Heatmap of MSEs')
    fig.tight_layout()
    # Unregister the figure from pyplot so repeated calls do not accumulate
    # open figures; the object stays usable by the caller.
    plt.close(fig)
    return fig
|
|