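"""Video processing pipeline.

Extracts frames from a video, detects and clusters faces, scores body posture,
analyzes the audio track, and runs anomaly detection over the resulting
facial-embedding, posture, and voice time series, producing plots and
annotated frames.
"""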
import os
import cv2
import numpy as np
from moviepy.editor import VideoFileClip
import tempfile
import time
from PIL import Image, ImageDraw, ImageFont
import math
from face_analysis import get_face_embedding, cluster_faces, organize_faces_by_person, draw_facial_landmarks
from pose_analysis import pose, calculate_posture_score, draw_pose_landmarks
from anomaly_detection import anomaly_detection
from visualization import plot_mse, plot_mse_histogram, plot_mse_heatmap
from utils import frame_to_timecode
import pandas as pd
from facenet_pytorch import MTCNN
import torch
import mediapipe as mp
from voice_analysis import process_audio
from pydub import AudioSegment
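# Shared models: run on GPU when available. MTCNN uses high detection
# thresholds and a 50 px minimum face size to favor precision over recall;
# FaceMesh tracks a single face per frame.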
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mtcnn = MTCNN(keep_all=False, device=device, thresholds=[0.9, 0.9, 0.9], min_face_size=50)
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.8)
def extract_frames(video_path, output_folder, desired_fps, progress_callback=None):
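    """Sample frames from the video at approximately `desired_fps` and write
    them to `output_folder` as frame_0000.jpg, frame_0001.jpg, ...

    Returns (number of frames written, original video FPS).
    """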
os.makedirs(output_folder, exist_ok=True)
clip = VideoFileClip(video_path)
original_fps = clip.fps
duration = clip.duration
total_frames = int(duration * original_fps)
    # 'step' is the number of source frames per extracted frame, so sampling
    # every step / original_fps seconds yields roughly desired_fps frames/sec.
    step = max(1, original_fps / desired_fps)
    total_frames_to_extract = int(total_frames / step)
    frame_count = 0
    for t in np.arange(0, duration, step / original_fps):
frame = clip.get_frame(t)
cv2.imwrite(os.path.join(output_folder, f"frame_{frame_count:04d}.jpg"), cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
frame_count += 1
if progress_callback:
progress = min(100, (frame_count / total_frames_to_extract) * 100)
            progress_callback(progress, f"Extracting frame {frame_count} of {total_frames_to_extract}")
if frame_count >= total_frames_to_extract:
break
clip.close()
return frame_count, original_fps
def process_frames(frames_folder, aligned_faces_folder, frame_count, progress):
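    """For each extracted frame, score body posture, detect the face with
    MTCNN, and, for confident frontal detections, save an aligned 160x160
    crop and compute its face embedding.

    Returns (embeddings_by_frame, posture_scores_by_frame,
    posture_landmarks_by_frame, aligned_face_paths, facial_landmarks_by_frame).
    """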
embeddings_by_frame = {}
posture_scores_by_frame = {}
posture_landmarks_by_frame = {}
facial_landmarks_by_frame = {}
aligned_face_paths = []
frame_files = sorted([f for f in os.listdir(frames_folder) if f.endswith('.jpg')])
for i, frame_file in enumerate(frame_files):
frame_num = int(frame_file.split('_')[1].split('.')[0])
frame_path = os.path.join(frames_folder, frame_file)
frame = cv2.imread(frame_path)
if frame is not None:
posture_score, posture_landmarks = calculate_posture_score(frame)
posture_scores_by_frame[frame_num] = posture_score
posture_landmarks_by_frame[frame_num] = posture_landmarks
            # facenet-pytorch's MTCNN expects RGB input, while cv2.imread returns BGR.
            boxes, probs = mtcnn.detect(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if boxes is not None and len(boxes) > 0 and probs[0] >= 0.99:
                x1, y1, x2, y2 = [int(b) for b in boxes[0]]
                # Clamp to the frame so a slightly out-of-bounds box doesn't
                # produce a negative index, which would crop the wrong region.
                x1, y1 = max(0, x1), max(0, y1)
                face = frame[y1:y2, x1:x2]
if face.size > 0:
face_rgb = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
results = face_mesh.process(face_rgb)
if results.multi_face_landmarks:
facial_landmarks_by_frame[frame_num] = results.multi_face_landmarks[0]
                        if is_frontal_face(results.multi_face_landmarks[0].landmark):
                            # Use the MTCNN crop directly as the aligned face.
                            aligned_face_resized = cv2.resize(face, (160, 160))
                            output_path = os.path.join(aligned_faces_folder, f"frame_{frame_num}_face.jpg")
                            cv2.imwrite(output_path, aligned_face_resized)
                            aligned_face_paths.append(output_path)
                            embeddings_by_frame[frame_num] = get_face_embedding(aligned_face_resized)
progress((i + 1) / len(frame_files), f"Processing frame {i + 1} of {len(frame_files)}")
return embeddings_by_frame, posture_scores_by_frame, posture_landmarks_by_frame, aligned_face_paths, facial_landmarks_by_frame
def process_video(video_path, anomaly_threshold, desired_fps, progress=None):
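    """Run the full pipeline: frame extraction, face clustering, posture
    scoring, voice analysis, anomaly detection, and plot generation.

    Returns a 21-element tuple of results; on failure, the first element is
    an error message and the remaining 20 are None.
    """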
    start_time = time.time()
    if progress is None:
        # No-op fallback so the function also works without a progress callback.
        progress = lambda fraction, message: None
    output_folder = "output"
    os.makedirs(output_folder, exist_ok=True)
GRAPH_COLORS = {
'facial_embeddings': 'navy',
'body_posture': 'purple'
}
with tempfile.TemporaryDirectory() as temp_dir:
aligned_faces_folder = os.path.join(temp_dir, 'aligned_faces')
organized_faces_folder = os.path.join(temp_dir, 'organized_faces')
os.makedirs(aligned_faces_folder, exist_ok=True)
os.makedirs(organized_faces_folder, exist_ok=True)
clip = VideoFileClip(video_path)
video_duration = clip.duration
clip.close()
progress(0, "Starting frame extraction")
frames_folder = os.path.join(temp_dir, 'extracted_frames')
def extraction_progress(percent, message):
progress(percent / 100, f"Extracting frames")
frame_count, original_fps = extract_frames(video_path, frames_folder, desired_fps, extraction_progress)
progress(1, "Frame extraction complete")
progress(0.3, "Processing frames")
embeddings_by_frame, posture_scores_by_frame, posture_landmarks_by_frame, aligned_face_paths, facial_landmarks_by_frame = process_frames(
frames_folder, aligned_faces_folder,
frame_count,
progress)
if not aligned_face_paths:
raise ValueError("No faces were extracted from the video.")
progress(0.6, "Clustering faces")
        embeddings = list(embeddings_by_frame.values())
clusters = cluster_faces(embeddings)
num_clusters = len(set(clusters))
progress(0.7, "Organizing faces")
organize_faces_by_person(embeddings_by_frame, clusters, aligned_faces_folder, organized_faces_folder)
progress(0.8, "Saving person data")
df, largest_cluster = save_person_data_to_csv(embeddings_by_frame, clusters, desired_fps,
original_fps, temp_dir, video_duration)
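        # Convert colon-separated timecodes (e.g. "HH:MM:SS") to absolute seconds for plotting.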
df['Seconds'] = df['Timecode'].apply(
lambda x: sum(float(t) * 60 ** i for i, t in enumerate(reversed(x.split(':')))))
progress(0.85, "Getting face samples")
face_samples = get_all_face_samples(organized_faces_folder, output_folder, largest_cluster)
progress(0.9, "Performing anomaly detection")
embedding_columns = [col for col in df.columns if col.startswith('Raw_Embedding_')]
X_embeddings = df[embedding_columns].values
        try:
            # Keep only frames with a posture score; None means no pose was detected.
            posture_values = [posture_scores_by_frame.get(frame) for frame in df['Frame']]
            X_posture = np.array([v for v in posture_values if v is not None], dtype=float).reshape(-1, 1)
            if len(X_posture) == 0:
                raise ValueError("No valid posture data found")
mse_embeddings, mse_posture = anomaly_detection(X_embeddings, X_posture)
            # Extract the audio track; letting pydub/ffmpeg infer the container
            # format avoids failures when the input is not an .mp4 file.
            audio_track = AudioSegment.from_file(video_path)
            audio_path = os.path.join(temp_dir, "audio.wav")
            audio_track.export(audio_path, format="wav")
# Process audio
most_frequent_voice, voice_features, voice_clusters = process_audio(audio_path)
            if len(voice_features) < 2:
                # Too few voice segments to analyze; this aborts the whole run
                # via the except block below rather than skipping only the voice step.
                raise ValueError("Not enough voice segments for voice analysis")
# Perform anomaly detection on voice
            X_voice = np.array(most_frequent_voice)
            # anomaly_detection returns a pair (see the embeddings/posture call
            # above); unpack it so mse_voice is an array rather than a tuple.
            mse_voice, _ = anomaly_detection(X_voice, X_voice)
progress(0.95, "Generating plots")
# Generate plots for facial features
mse_plot_embeddings, anomaly_frames_embeddings = plot_mse(df, mse_embeddings, "Facial Features",
color=GRAPH_COLORS['facial_embeddings'],
anomaly_threshold=anomaly_threshold)
mse_histogram_embeddings = plot_mse_histogram(mse_embeddings, "MSE Distribution: Facial Features",
anomaly_threshold, color=GRAPH_COLORS['facial_embeddings'])
mse_heatmap_embeddings = plot_mse_heatmap(mse_embeddings, "Facial Features MSE Heatmap", df)
# Generate plots for body posture
mse_plot_posture, anomaly_frames_posture = plot_mse(df, mse_posture, "Body Posture",
color=GRAPH_COLORS['body_posture'],
anomaly_threshold=anomaly_threshold)
mse_histogram_posture = plot_mse_histogram(mse_posture, "MSE Distribution: Body Posture",
anomaly_threshold, color=GRAPH_COLORS['body_posture'])
mse_heatmap_posture = plot_mse_heatmap(mse_posture, "Body Posture MSE Heatmap", df)
# Generate plots for voice
mse_plot_voice, anomaly_segments_voice = plot_mse(df, mse_voice, "Voice",
color='green',
anomaly_threshold=anomaly_threshold)
mse_histogram_voice = plot_mse_histogram(mse_voice, "MSE Distribution: Voice",
anomaly_threshold, color='green')
mse_heatmap_voice = plot_mse_heatmap(mse_voice, "Voice MSE Heatmap", df)
except Exception as e:
print(f"Error details: {str(e)}")
import traceback
traceback.print_exc()
return (f"Error in video processing: {str(e)}",) + (None,) * 20
progress(1.0, "Preparing results")
results = f"Number of persons detected: {num_clusters}\n\n"
results += "Breakdown:\n"
for cluster_id in range(num_clusters):
face_count = len([c for c in clusters if c == cluster_id])
results += f"Person {cluster_id + 1}: {face_count} face frames\n"
end_time = time.time()
execution_time = end_time - start_time
def add_timecode_to_image(image, timecode):
img_pil = Image.fromarray(image)
draw = ImageDraw.Draw(img_pil)
font = ImageFont.load_default()
draw.text((10, 10), timecode, (255, 0, 0), font=font)
return np.array(img_pil)
anomaly_faces_embeddings = []
for frame in anomaly_frames_embeddings:
face_path = os.path.join(aligned_faces_folder, f"frame_{frame}_face.jpg")
if os.path.exists(face_path):
face_img = cv2.imread(face_path)
if face_img is not None:
face_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
if frame in facial_landmarks_by_frame:
face_img = draw_facial_landmarks(face_img, facial_landmarks_by_frame[frame])
timecode = df[df['Frame'] == frame]['Timecode'].iloc[0]
face_img_with_timecode = add_timecode_to_image(face_img, timecode)
anomaly_faces_embeddings.append(face_img_with_timecode)
anomaly_frames_posture_images = []
for frame in anomaly_frames_posture:
frame_path = os.path.join(frames_folder, f"frame_{frame:04d}.jpg")
if os.path.exists(frame_path):
frame_img = cv2.imread(frame_path)
if frame_img is not None:
frame_img = cv2.cvtColor(frame_img, cv2.COLOR_BGR2RGB)
pose_results = pose.process(frame_img)
if pose_results.pose_landmarks:
frame_img = draw_pose_landmarks(frame_img, pose_results.pose_landmarks)
timecode = df[df['Frame'] == frame]['Timecode'].iloc[0]
frame_img_with_timecode = add_timecode_to_image(frame_img, timecode)
anomaly_frames_posture_images.append(frame_img_with_timecode)
return (
execution_time,
results,
df,
mse_embeddings,
mse_posture,
mse_plot_embeddings,
mse_histogram_embeddings,
mse_plot_posture,
mse_histogram_posture,
mse_heatmap_embeddings,
mse_heatmap_posture,
mse_voice,
mse_plot_voice,
mse_histogram_voice,
mse_heatmap_voice,
anomaly_segments_voice,
face_samples["most_frequent"],
anomaly_faces_embeddings,
anomaly_frames_posture_images,
aligned_faces_folder,
frames_folder
)
def is_frontal_face(landmarks, threshold=60):
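    """Heuristic frontality test: for a frontal face, the vectors from the
    nose tip (FaceMesh landmark 4) to the left and right chin landmarks
    (234, 454) point in nearly opposite directions, i.e. the angle between
    them approaches 180 degrees; `threshold` is the allowed deviation in degrees.
    """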
nose_tip = landmarks[4]
left_chin = landmarks[234]
right_chin = landmarks[454]
nose_to_left = [left_chin.x - nose_tip.x, left_chin.y - nose_tip.y]
nose_to_right = [right_chin.x - nose_tip.x, right_chin.y - nose_tip.y]
dot_product = nose_to_left[0] * nose_to_right[0] + nose_to_left[1] * nose_to_right[1]
magnitude_left = math.sqrt(nose_to_left[0] ** 2 + nose_to_left[1] ** 2)
magnitude_right = math.sqrt(nose_to_right[0] ** 2 + nose_to_right[1] ** 2)
    # Clamp so floating-point error cannot push acos outside its [-1, 1] domain.
    cos_angle = max(-1.0, min(1.0, dot_product / (magnitude_left * magnitude_right)))
    angle_degrees = math.degrees(math.acos(cos_angle))
    return abs(180 - angle_degrees) < threshold
def save_person_data_to_csv(embeddings_by_frame, clusters, desired_fps, original_fps, output_folder, video_duration):
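    """Keep the largest face cluster (the most frequently seen person), save
    its embeddings to face_embeddings.npy, and return a DataFrame with one row
    per frame (timecode plus raw embedding columns) and the winning cluster id.
    """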
person_data = {}
for (frame_num, embedding), cluster in zip(embeddings_by_frame.items(), clusters):
if cluster not in person_data:
person_data[cluster] = []
person_data[cluster].append((frame_num, embedding))
largest_cluster = max(person_data, key=lambda k: len(person_data[k]))
data = person_data[largest_cluster]
data.sort(key=lambda x: x[0])
frames, embeddings = zip(*data)
embeddings_array = np.array(embeddings)
np.save(os.path.join(output_folder, 'face_embeddings.npy'), embeddings_array)
total_frames = max(frames)
timecodes = [frame_to_timecode(frame, total_frames, video_duration) for frame in frames]
df_data = {
'Frame': frames,
'Timecode': timecodes,
'Embedding_Index': range(len(embeddings))
}
for i in range(len(embeddings[0])):
df_data[f'Raw_Embedding_{i}'] = [embedding[i] for embedding in embeddings]
df = pd.DataFrame(df_data)
return df, largest_cluster
def get_all_face_samples(organized_faces_folder, output_folder, largest_cluster, max_samples=100):
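    """Collect up to `max_samples` resized face crops for the most frequent
    person, and up to `max_samples` in total for all other people, writing
    them to `output_folder` and returning their paths keyed by group.
    """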
face_samples = {"most_frequent": [], "others": []}
for cluster_folder in sorted(os.listdir(organized_faces_folder)):
if cluster_folder.startswith("person_"):
person_folder = os.path.join(organized_faces_folder, cluster_folder)
face_files = sorted([f for f in os.listdir(person_folder) if f.endswith('.jpg')])
if face_files:
cluster_id = int(cluster_folder.split('_')[1])
if cluster_id == largest_cluster:
for i, sample in enumerate(face_files[:max_samples]):
face_path = os.path.join(person_folder, sample)
output_path = os.path.join(output_folder, f"face_sample_most_frequent_{i:04d}.jpg")
face_img = cv2.imread(face_path)
if face_img is not None:
small_face = cv2.resize(face_img, (160, 160))
cv2.imwrite(output_path, small_face)
face_samples["most_frequent"].append(output_path)
if len(face_samples["most_frequent"]) >= max_samples:
break
else:
remaining_samples = max_samples - len(face_samples["others"])
if remaining_samples > 0:
for i, sample in enumerate(face_files[:remaining_samples]):
face_path = os.path.join(person_folder, sample)
output_path = os.path.join(output_folder, f"face_sample_other_{cluster_id:02d}_{i:04d}.jpg")
face_img = cv2.imread(face_path)
if face_img is not None:
small_face = cv2.resize(face_img, (160, 160))
cv2.imwrite(output_path, small_face)
face_samples["others"].append(output_path)
if len(face_samples["others"]) >= max_samples:
break
return face_samples
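

if __name__ == "__main__":
    # Minimal usage sketch (not part of the pipeline itself): the file name,
    # threshold, and FPS below are placeholder values, not values from this repo.
    def print_progress(fraction, message):
        print(f"[{fraction * 100:5.1f}%] {message}")

    outputs = process_video("input.mp4", anomaly_threshold=3, desired_fps=10,
                            progress=print_progress)
    if isinstance(outputs[0], str):
        # On failure the first element is the error message.
        print(outputs[0])
    else:
        execution_time, results = outputs[0], outputs[1]
        print(results)
        print(f"Finished in {execution_time:.1f} s")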