# app.py -- uploaded by reab5555 (commit 3f2cadc, "Update app.py"; 8.83 kB).
import cv2
import numpy as np
import matplotlib.pyplot as plt
import gradio as gr
from moviepy.editor import *
import os
import torch
import openpifpaf
# Fallback NumPy import: if the import fails, try installing NumPy with pip and
# import it again. NumPy is already imported unconditionally with the other
# imports at the top of this file, so this guard is only reached when that
# earlier import has succeeded.
try:
    import numpy as np
except ImportError:
    os.system('pip install numpy')
    import numpy as np

# OpenPifPaf configuration: one module-level pose predictor, built from the
# 'shufflenetv2k16' checkpoint and shared by every function below.
predictor = openpifpaf.Predictor(checkpoint='shufflenetv2k16')
def preprocess(image):
    """Convert a BGR image to RGB and resize it to a fixed size.

    Args:
        image: Image array in BGR channel order (it is passed through
            ``cv2.COLOR_BGR2RGB``).

    Returns:
        The RGB image resized with ``cv2.resize`` to the size ``(192, 256)``.
    """
    target_size = (192, 256)
    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return cv2.resize(rgb_image, target_size)
def total_body_movement(current_poses, prev_poses):
    """Return the average keypoint displacement between two sets of poses.

    Every pose in ``current_poses`` is compared with every pose in
    ``prev_poses``. For each pair, the Euclidean distance between
    corresponding keypoints is summed; the sums are then averaged over all
    pairs.

    Only the first two columns (x, y) of each keypoint array take part in the
    distance. Any further columns (the pose arrays used elsewhere in this file
    are sliced with ``[:, :2]`` before drawing, so they carry more than x/y)
    are ignored so that they cannot leak into the movement value.

    Args:
        current_poses: List of 2-D arrays, one per detected person, with one
            row per keypoint and x, y in the first two columns.
        prev_poses: Same structure, for the previously analysed frame.

    Returns:
        ``0`` when either argument is empty or ``None``; otherwise the mean,
        over all (current, previous) pose pairs, of the summed per-keypoint
        distances.
    """
    if not current_poses or not prev_poses:
        return 0

    # Slice the previous poses once instead of once per current pose.
    previous_xy = [prev_pose[:, :2] for prev_pose in prev_poses]

    total_movement = 0
    for current_pose in current_poses:
        current_xy = current_pose[:, :2]
        for prev_xy in previous_xy:
            per_keypoint = np.sqrt(np.sum((current_xy - prev_xy) ** 2, axis=1))
            total_movement += np.sum(per_keypoint)
    return total_movement / (len(current_poses) * len(prev_poses))
def process_video(video_path, progress=gr.Progress(), batch_size=64):
    """Analyse body movement over a video and plot it against time.

    Frames are sampled at roughly 10 per second, passed to the pose predictor
    in batches of ``batch_size``, and the movement between consecutive sampled
    frames is accumulated into a time series that is drawn on a Matplotlib
    figure.

    Args:
        video_path: Path of the video file, or ``None``.
        progress: Gradio progress tracker; its ``tqdm`` wraps the frame loop.
        batch_size: Number of sampled frames handed to ``process_batch`` at
            once.

    Returns:
        A 7-tuple ``(fig, ax, time_points, body_movements, video_path,
        original_duration, error)``. On success ``error`` is ``None``; on
        failure the first six items are ``None`` and ``error`` is a message.
    """
    if video_path is None:
        return None, None, None, None, None, None, "Error: No video uploaded"
    if not os.path.exists(video_path):
        return None, None, None, None, None, None, f"Error: Video file not found at {video_path}"

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, None, None, None, None, None, f"Error: Unable to open video file at {video_path}"

    body_movements = []
    time_points = []
    try:
        # The frame rate is truncated to an int on purpose: update_video maps
        # slider time back to a frame number with the same truncated value, so
        # the graph and the displayed frame stay in step.
        reported_fps = cap.get(cv2.CAP_PROP_FPS)
        original_fps = int(reported_fps) if reported_fps and reported_fps > 0 else 0
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if original_fps <= 0 or frame_count <= 0:
            # Without this guard a missing frame rate is a ZeroDivisionError.
            return (None, None, None, None, None, None,
                    f"Error: Unable to read frame rate or frame count from {video_path}")

        original_duration = frame_count / original_fps
        frame_interval = max(1, round(original_fps / 10))  # Process 10 frames per second

        prev_poses = None
        frames = []
        frame_indices = []
        for frame in progress.tqdm(range(0, frame_count, frame_interval)):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame)
            ret, img = cap.read()
            if not ret:
                break
            # OpenCV decodes to BGR; hand the predictor RGB, as preprocess()
            # in this file does.
            frames.append(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            frame_indices.append(frame)
            if len(frames) == batch_size:
                # Keep the last poses so the first frame of the next batch is
                # compared with the final frame of this one.
                prev_poses = process_batch(frames, frame_indices, prev_poses,
                                           body_movements, time_points, original_fps)
                # Reset BOTH lists; leaving frame_indices populated would pair
                # the next batch's frames with stale indices.
                frames = []
                frame_indices = []

        # Process any remaining frames
        if frames:
            process_batch(frames, frame_indices, prev_poses,
                          body_movements, time_points, original_fps)
    finally:
        # Release the capture even if prediction raises.
        cap.release()

    fig, ax = plt.subplots(figsize=(10, 6), dpi=500)
    ax.plot(time_points, body_movements, "-", linewidth=0.5)
    ax.set_xlim(0, original_duration)
    ax.set_xlabel("Time")
    ax.set_ylabel("Body Movement")
    ax.set_title("Body Movement Analysis")

    # Evenly spaced mm:ss tick labels across the whole duration.
    num_labels = 50
    label_positions = np.linspace(0, original_duration, num_labels)
    label_texts = [f"{int(t//60):02d}:{int(t%60):02d}" for t in label_positions]
    ax.set_xticks(label_positions)
    ax.set_xticklabels(label_texts, rotation=90, ha='right')

    # Lay out this figure explicitly rather than pyplot's "current" figure.
    fig.tight_layout()
    return fig, ax, time_points, body_movements, video_path, original_duration, None


def process_batch(frames, frame_indices, prev_poses, body_movements, time_points, original_fps):
    """Run pose prediction on a batch of frames and record their movement.

    For each frame one value is appended to ``body_movements`` (0 when there
    is no previous frame to compare with) and one timestamp, computed as
    ``frame_index / original_fps``, is appended to ``time_points``.

    Args:
        frames: Images passed to ``predictor.numpy_images``.
        frame_indices: Frame number of each entry in ``frames``.
        prev_poses: Poses of the frame preceding this batch, or ``None``.
        body_movements: List extended in place with movement values.
        time_points: List extended in place with timestamps in seconds.
        original_fps: Frame rate used to convert frame numbers to seconds.

    Returns:
        The poses of the last processed frame (or ``prev_poses`` unchanged if
        the batch was empty), so the caller can pass them to the next batch.
    """
    batch_preds = predictor.numpy_images(frames)
    # Each item is unpacked as (predictions, _, _), the same triple that
    # update_video unpacks from predictor.numpy_image.
    for (predictions, _, _), frame_index in zip(batch_preds, frame_indices):
        pose_coords = [pred.data for pred in predictions]
        if prev_poses is not None:
            body_movements.append(total_body_movement(pose_coords, prev_poses))
        else:
            body_movements.append(0)
        prev_poses = pose_coords
        time_points.append(frame_index / original_fps)
    return prev_poses
def update_video(video_path, time):
    """Return the video frame at ``time`` seconds with detected poses drawn on it.

    Args:
        video_path: Path of the video file, or ``None``.
        time: Position in seconds; converted to a frame number with the
            video's frame rate truncated to an int.

    Returns:
        The annotated frame as an RGB array (the channel order that
        ``load_sample_frame`` also returns for display), or ``None`` when the
        path is missing, the video cannot be opened, or the frame cannot be
        read.
    """
    if video_path is None or not os.path.exists(video_path):
        return None

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        original_fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_number = int(time * original_fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, img = cap.read()
    finally:
        # Always release the capture, including on the early return above.
        cap.release()
    if not ret:
        return None

    # OpenCV decodes to BGR; predict on and return RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    predictions, _, _ = predictor.numpy_image(img)

    # Keypoints first. Only the first two columns are x, y; unpacking a whole
    # row into two names fails when the array has extra columns.
    for pred in predictions:
        for x, y in pred.data[:, :2]:
            if x > 0 and y > 0:
                cv2.circle(img, (int(x), int(y)), 3, (0, 255, 0), -1)

    # Then the limbs, drawn over the keypoints.
    for pred in predictions:
        skeleton = pred.data[:, :2]
        # NOTE(review): OpenPifPaf skeleton definitions index keypoints from 1,
        # so shift to 0-based rows here -- confirm against the installed version.
        for joint_a, joint_b in pred.skeleton:
            i, j = joint_a - 1, joint_b - 1
            if skeleton[i, 0] > 0 and skeleton[i, 1] > 0 and skeleton[j, 0] > 0 and skeleton[j, 1] > 0:
                cv2.line(img,
                         (int(skeleton[i, 0]), int(skeleton[i, 1])),
                         (int(skeleton[j, 0]), int(skeleton[j, 1])),
                         (255, 0, 0), 2)
    return img
def update_graph(fig, ax, time_points, body_movements, current_time, video_duration):
    """Redraw the movement graph with a marker at ``current_time``.

    The axes are cleared and redrawn: the movement curve, a dashed red
    vertical line at ``current_time`` labelled with its mm:ss timecode, axis
    labels, title and 80 evenly spaced mm:ss tick labels over
    ``[0, video_duration]``.

    Args:
        fig: Matplotlib figure to update, or ``None`` if none exists yet.
        ax: Axes of ``fig`` to draw on, or ``None``.
        time_points: X values of the movement curve (seconds).
        body_movements: Y values of the movement curve.
        current_time: Position of the marker in seconds.
        video_duration: Upper x-axis limit in seconds.

    Returns:
        ``fig`` after redrawing; ``fig`` unchanged (possibly ``None``) when
        there is no figure or axes to draw on.
    """
    # Nothing to redraw until a video has been processed.
    if fig is None or ax is None:
        return fig

    ax.clear()
    ax.plot(time_points, body_movements, "-", linewidth=0.5)
    ax.axvline(x=current_time, color='r', linestyle='--')

    minutes, seconds = divmod(int(current_time), 60)
    timecode = f"{minutes:02d}:{seconds:02d}"
    # Anchor the timecode at the top of the current y-range, next to the marker.
    ax.text(current_time, ax.get_ylim()[1], timecode,
            verticalalignment='top', horizontalalignment='right',
            color='r', fontweight='bold',
            bbox=dict(facecolor='white', edgecolor='none', alpha=0.7))

    ax.set_xlabel("Time")
    ax.set_ylabel("Body Movement")
    ax.set_title("Body Movement Analysis")

    num_labels = 80
    label_positions = np.linspace(0, video_duration, num_labels)
    label_texts = [f"{int(t//60):02d}:{int(t%60):02d}" for t in label_positions]
    ax.set_xticks(label_positions)
    ax.set_xticklabels(label_texts, rotation=90, ha='right')
    ax.set_xlim(0, video_duration)

    # Lay out this figure, not whichever figure pyplot currently treats as
    # active (which may be another figure or a freshly created empty one).
    fig.tight_layout()
    return fig
def load_sample_frame(video_path):
    """Read the first frame of a video and return it in RGB channel order.

    Args:
        video_path: Path of the video file to open.

    Returns:
        The first frame converted from BGR to RGB, or ``None`` if the video
        cannot be opened or no frame can be read.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        return None

    success, first_frame = capture.read()
    capture.release()
    return cv2.cvtColor(first_frame, cv2.COLOR_BGR2RGB) if success else None
def gradio_app():
    """Build the Gradio Blocks interface for the movement analysis app.

    The UI has a video upload, the movement graph, a time slider, an image
    showing the frame at the slider position, and a button that runs the
    analysis on the bundled sample video.

    Returns:
        The ``gr.Blocks`` app, ready to be launched.
    """
    with gr.Blocks() as app:
        gr.Markdown("# Multi-Person Body Movement Analysis")

        video_input = gr.Video(label="Upload Video")
        graph_output = gr.Plot()
        time_slider = gr.Slider(label="Time (seconds)", minimum=0, maximum=100, step=0.1)
        video_output = gr.Image(label="Body Posture")

        with gr.Row():
            gr.Image(value=load_sample_frame("IL_Dancing_Sample.mp4"), label="Sample Video Frame")
            use_sample_button = gr.Button("Use Sample Video")

        error_output = gr.Textbox(label="Error Messages", visible=False)

        # Per-session values shared between the event handlers.
        video_path = gr.State(None)
        fig_state = gr.State(None)
        ax_state = gr.State(None)
        time_points_state = gr.State(None)
        body_movements_state = gr.State(None)
        video_duration_state = gr.State(None)

        # Components written by both the upload and the sample-button handlers.
        analysis_outputs = [graph_output, video_output, error_output, video_path,
                            fig_state, ax_state, time_points_state, body_movements_state,
                            video_duration_state, time_slider]

        def process_and_update(video):
            """Analyse ``video`` and return one value per ``analysis_outputs`` entry."""
            fig, ax, time_points, body_movements, video_path_value, video_duration, error = process_video(video)
            succeeded = fig is not None

            # The image output needs a frame, not a video path: show the first frame.
            first_frame = load_sample_frame(video_path_value) if succeeded else None

            # Component properties must be changed through returned updates;
            # assigning attributes on the component objects inside a handler
            # does not reach the browser.
            slider_update = gr.update(maximum=video_duration) if succeeded else gr.update()
            error_update = gr.update(value=error, visible=not succeeded)

            return (fig, first_frame, error_update, video_path_value,
                    fig, ax, time_points, body_movements, video_duration, slider_update)

        video_input.upload(process_and_update,
                           inputs=video_input,
                           outputs=analysis_outputs)

        def update_video_and_graph(video_path_value, current_time, fig, ax, time_points, body_movements, video_duration):
            """Refresh the frame image and the graph marker for the slider position."""
            # The slider can be moved before any video has been analysed.
            if fig is None or ax is None:
                return None, None
            updated_frame = update_video(video_path_value, current_time)
            updated_fig = update_graph(fig, ax, time_points, body_movements, current_time, video_duration)
            return updated_frame, updated_fig

        time_slider.change(update_video_and_graph,
                           inputs=[video_path, time_slider, fig_state, ax_state,
                                   time_points_state, body_movements_state, video_duration_state],
                           outputs=[video_output, graph_output])

        def use_sample_video():
            """Run the analysis on the bundled sample video."""
            sample_video_path = "IL_Dancing_Sample.mp4"
            return process_and_update(sample_video_path)

        use_sample_button.click(use_sample_video,
                                inputs=None,
                                outputs=analysis_outputs)

    return app
if __name__ == "__main__":
    # Script entry point: build the interface, then launch it with share=True.
    app = gradio_app()
    app.launch(share=True)