import os

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import openpifpaf

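# Shared pose estimator. shufflenetv2k16 is a lightweight COCO-keypoint
# checkpoint that trades a little accuracy for speed.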
predictor = openpifpaf.Predictor(checkpoint='shufflenetv2k16')

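# Currently unused by the pipeline below; kept as a frame-preparation utility.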
def preprocess(image):
    input_size = (192, 256)  # (width, height), the order cv2.resize expects
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, input_size)
    return image

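# Movement metric: mean pairwise keypoint displacement between every pose in
# the current frame and every pose in the previous frame. A rough proxy, since
# people are not tracked across frames and identities can swap.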
def total_body_movement(current_poses, prev_poses):
    if not current_poses or not prev_poses:
        return 0
    total_movement = 0
    for current_pose in current_poses:
        for prev_pose in prev_poses:
            # Compare only the (x, y) columns; the third column of pred.data
            # is the keypoint confidence and should not count as movement.
            diff = current_pose[:, :2] - prev_pose[:, :2]
            total_movement += np.sum(np.sqrt(np.sum(diff**2, axis=1)))
    return total_movement / (len(current_poses) * len(prev_poses))

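# Full-video analysis: sample frames at roughly 10 fps, run pose estimation in
# batches, accumulate the movement time series, and build the overview figure.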
def process_video(video_path, progress=gr.Progress(), batch_size=64):
    if video_path is None:
        return None, None, None, None, None, None, "Error: No video uploaded"

    if not os.path.exists(video_path):
        return None, None, None, None, None, None, f"Error: Video file not found at {video_path}"

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, None, None, None, None, None, f"Error: Unable to open video file at {video_path}"

    # Keep the frame rate as a float; truncating e.g. 29.97 to 29 skews timestamps.
    original_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    original_duration = frame_count / original_fps

    frame_interval = max(1, round(original_fps / 10))

    body_movements = []
    time_points = []

    prev_poses = None
    frames = []
    frame_indices = []

    for frame in progress.tqdm(range(0, frame_count, frame_interval)):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame)
        ret, img = cap.read()
        if not ret:
            break
        frames.append(img)
        frame_indices.append(frame)

        if len(frames) == batch_size:
            # process_batch returns the last frame's poses so they carry into
            # the next batch; rebinding the parameter inside the callee would
            # not propagate back here.
            prev_poses = process_batch(frames, frame_indices, prev_poses,
                                       body_movements, time_points, original_fps)
            frames = []
            frame_indices = []  # reset together with frames to keep zip() aligned

    if frames:
        process_batch(frames, frame_indices, prev_poses,
                      body_movements, time_points, original_fps)

    cap.release()

    fig, ax = plt.subplots(figsize=(10, 6), dpi=500)
    ax.plot(time_points, body_movements, "-", linewidth=0.5)
    ax.set_xlim(0, original_duration)
    ax.set_xlabel("Time")
    ax.set_ylabel("Body Movement")
    ax.set_title("Body Movement Analysis")

    num_labels = 50
    label_positions = np.linspace(0, original_duration, num_labels)
    label_texts = [f"{int(t//60):02d}:{int(t%60):02d}" for t in label_positions]
    ax.set_xticks(label_positions)
    ax.set_xticklabels(label_texts, rotation=90, ha='right')
    plt.tight_layout()

    return fig, ax, time_points, body_movements, video_path, original_duration, None

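# Run the predictor over a batch of frames and append one movement value and
# timestamp per frame. Returns the final frame's poses for the caller.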
def process_batch(frames, frame_indices, prev_poses, body_movements, time_points, original_fps):
    # Predictor.numpy_images yields (predictions, gt_anns, image_meta) per
    # frame, mirroring the triple that Predictor.numpy_image returns.
    batch_preds = predictor.numpy_images(frames)

    for (predictions, _, _), frame_index in zip(batch_preds, frame_indices):
        pose_coords = [pred.data for pred in predictions]

        if prev_poses is not None:
            body_movements.append(total_body_movement(pose_coords, prev_poses))
        else:
            body_movements.append(0)

        prev_poses = pose_coords
        time_points.append(frame_index / original_fps)

    return prev_poses

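# Render the frame nearest to `time` with detected keypoints and skeleton
# edges drawn on top.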
def update_video(video_path, time):
    if video_path is None:
        return None

    if not os.path.exists(video_path):
        return None

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None

    original_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_number = int(time * original_fps)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    ret, img = cap.read()
    cap.release()

    if not ret:
        return None

    # OpenCV decodes frames as BGR; convert once so the predictor and Gradio's
    # Image component both see RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    predictions, _, _ = predictor.numpy_image(img)
    pose_coords = [pred.data for pred in predictions]

    for coords in pose_coords:
        # Each row of pred.data is (x, y, confidence); undetected keypoints
        # come back as (0, 0, 0).
        for x, y, _ in coords:
            if x > 0 and y > 0:
                cv2.circle(img, (int(x), int(y)), 3, (0, 255, 0), -1)

    for pred in predictions:
        skeleton = pred.data[:, :2]
        # Skeleton pairs use 1-based COCO joint indices; shift to 0-based.
        for i, j in pred.skeleton:
            i, j = i - 1, j - 1
            if skeleton[i, 0] > 0 and skeleton[i, 1] > 0 and skeleton[j, 0] > 0 and skeleton[j, 1] > 0:
                cv2.line(img, (int(skeleton[i, 0]), int(skeleton[i, 1])),
                         (int(skeleton[j, 0]), int(skeleton[j, 1])), (255, 0, 0), 2)

    return img

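# Redraw the movement curve with a cursor line and timecode label at the
# slider's current position.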
def update_graph(fig, ax, time_points, body_movements, current_time, video_duration):
    ax.clear()
    ax.plot(time_points, body_movements, "-", linewidth=0.5)
    ax.axvline(x=current_time, color='r', linestyle='--')

    minutes, seconds = divmod(int(current_time), 60)
    timecode = f"{minutes:02d}:{seconds:02d}"
    ax.text(current_time, ax.get_ylim()[1], timecode,
            verticalalignment='top', horizontalalignment='right',
            color='r', fontweight='bold',
            bbox=dict(facecolor='white', edgecolor='none', alpha=0.7))

    ax.set_xlabel("Time")
    ax.set_ylabel("Body Movement")
    ax.set_title("Body Movement Analysis")

    num_labels = 80
    label_positions = np.linspace(0, video_duration, num_labels)
    label_texts = [f"{int(t//60):02d}:{int(t%60):02d}" for t in label_positions]
    ax.set_xticks(label_positions)
    ax.set_xticklabels(label_texts, rotation=90, ha='right')
    ax.set_xlim(0, video_duration)
    plt.tight_layout()
    return fig

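# Read the first frame of a video for use as a preview thumbnail (RGB).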
def load_sample_frame(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None
    ret, frame = cap.read()
    cap.release()
    if not ret:
        return None
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

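# Assemble the Gradio UI: upload a video (or use the bundled sample), run the
# analysis, then scrub the slider to inspect individual annotated frames.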
def gradio_app():
    with gr.Blocks() as app:
        gr.Markdown("# Multi-Person Body Movement Analysis")

        video_input = gr.Video(label="Upload Video")
        graph_output = gr.Plot()
        time_slider = gr.Slider(label="Time (seconds)", minimum=0, maximum=100, step=0.1)
        video_output = gr.Image(label="Body Posture")

        with gr.Row():
            sample_video_frame = gr.Image(value=load_sample_frame("IL_Dancing_Sample.mp4"),
                                          label="Sample Video Frame")
            use_sample_button = gr.Button("Use Sample Video")

        error_output = gr.Textbox(label="Error Messages", visible=False)

        video_path = gr.State(None)
        fig_state = gr.State(None)
        ax_state = gr.State(None)
        time_points_state = gr.State(None)
        body_movements_state = gr.State(None)
        video_duration_state = gr.State(None)

        def process_and_update(video):
            fig, ax, time_points, body_movements, video_path_value, video_duration, error = process_video(video)
            # Mutating component attributes (e.g. time_slider.maximum) inside a
            # handler has no effect at runtime; configuration changes must be
            # returned as gr.update() objects routed to the component.
            if fig is not None:
                slider_update = gr.update(maximum=video_duration, value=0)
                error_update = gr.update(value=None, visible=False)
                first_frame = update_video(video_path_value, 0)
            else:
                slider_update = gr.update()
                error_update = gr.update(value=error, visible=True)
                first_frame = None
            return (fig, first_frame, error_update, video_path_value,
                    fig, ax, time_points, body_movements, video_duration, slider_update)

        video_input.upload(process_and_update,
                           inputs=video_input,
                           outputs=[graph_output, video_output, error_output, video_path,
                                    fig_state, ax_state, time_points_state, body_movements_state,
                                    video_duration_state, time_slider])

        def update_video_and_graph(video_path_value, current_time, fig, ax, time_points, body_movements, video_duration):
            updated_frame = update_video(video_path_value, current_time)
            updated_fig = update_graph(fig, ax, time_points, body_movements, current_time, video_duration)
            return updated_frame, updated_fig

        time_slider.change(update_video_and_graph,
                           inputs=[video_path, time_slider, fig_state, ax_state,
                                   time_points_state, body_movements_state, video_duration_state],
                           outputs=[video_output, graph_output])

        def use_sample_video():
            return process_and_update("IL_Dancing_Sample.mp4")

        use_sample_button.click(use_sample_video,
                                inputs=None,
                                outputs=[graph_output, video_output, error_output, video_path,
                                         fig_state, ax_state, time_points_state, body_movements_state,
                                         video_duration_state, time_slider])

    return app

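# share=True also serves the app through a temporary public Gradio link.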
if __name__ == "__main__":
    app = gradio_app()
    app.launch(share=True)