Spaces:
Sleeping
Sleeping
import os | |
import sys | |
import tempfile | |
import cv2 | |
import requests | |
from ultralytics import YOLO | |
import streamlit as st | |
# Set page configuration | |
st.set_page_config( | |
page_title="People Tracking with YOLO11-pose", | |
page_icon="👥", | |
layout="wide", | |
initial_sidebar_state="expanded" | |
) | |
st.title("People Tracking with YOLO11-pose") | |
# Sidebar: Input method and settings | |
st.sidebar.header("Input Settings") | |
uploaded_file = st.sidebar.file_uploader("Upload Image/Video", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"]) | |
youtube_link = st.sidebar.text_input("YouTube Link (optional)", "") | |
image_url = st.sidebar.text_input("Image URL (optional)", "") | |
sensitivity = st.sidebar.slider("Sensitivity (Confidence Threshold)", 0.0, 1.0, 0.2, step=0.01) | |
process_button = st.sidebar.button("Process Input") | |
# Define the video extensions for later use | |
video_exts = [".mp4", ".mov", ".avi", ".webm"] | |
def process_input(uploaded_file, youtube_link, image_url, sensitivity): | |
input_path = None | |
temp_files = [] | |
# Input priority: YouTube link > Image URL > Uploaded file. | |
if youtube_link and youtube_link.strip(): | |
try: | |
from pytubefix import YouTube | |
yt = YouTube(youtube_link) | |
stream = yt.streams.filter(file_extension='mp4', progressive=True).order_by("resolution").desc().first() | |
if not stream: | |
return None, None, None, "No suitable mp4 stream found." | |
temp_path = os.path.join(tempfile.gettempdir(), f"yt_{os.urandom(8).hex()}.mp4") | |
stream.download(output_path=tempfile.gettempdir(), filename=os.path.basename(temp_path)) | |
input_path = temp_path | |
temp_files.append(input_path) | |
except Exception as e: | |
return None, None, None, f"Error downloading YouTube video: {str(e)}" | |
elif image_url and image_url.strip(): | |
try: | |
response = requests.get(image_url, stream=True, timeout=10) | |
response.raise_for_status() | |
temp_path = os.path.join(tempfile.gettempdir(), f"img_{os.urandom(8).hex()}.jpg") | |
with open(temp_path, "wb") as f: | |
f.write(response.content) | |
input_path = temp_path | |
temp_files.append(input_path) | |
except Exception as e: | |
return None, None, None, f"Error downloading image: {str(e)}" | |
elif uploaded_file is not None: | |
# Save the uploaded file to a temporary file | |
ext = os.path.splitext(uploaded_file.name)[1] | |
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp: | |
tmp.write(uploaded_file.read()) | |
input_path = tmp.name | |
temp_files.append(input_path) | |
else: | |
return None, None, None, "Please provide an input." | |
ext = os.path.splitext(input_path)[1].lower() | |
output_path = None | |
# Load the YOLO model (ensure the model file is available in your repository) | |
model = YOLO("yolo11n-pose.pt") | |
try: | |
if ext in video_exts: | |
# Video processing | |
cap = cv2.VideoCapture(input_path) | |
if not cap.isOpened(): | |
return None, None, None, f"Cannot open video file: {input_path}" | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
if fps <= 0 or width <= 0 or height <= 0: | |
return None, None, None, "Invalid video properties detected." | |
output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.mp4") | |
# Use 'mp4v' as codec | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
if not out.isOpened(): | |
return None, None, None, "Video processing failed: No suitable encoder available." | |
processed_frames = 0 | |
while True: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
# Process frame: convert to RGB, run YOLO, then annotate and convert back to BGR. | |
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
results = model.predict(source=frame_rgb, conf=sensitivity)[0] | |
annotated_frame = results.plot() | |
annotated_frame_bgr = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR) | |
out.write(annotated_frame_bgr) | |
processed_frames += 1 | |
cap.release() | |
out.release() | |
temp_files.append(output_path) | |
if processed_frames == 0: | |
return None, None, None, "No frames processed from video." | |
if not os.path.exists(output_path) or os.path.getsize(output_path) < 1024: | |
return None, None, None, f"Output video created but too small ({os.path.getsize(output_path)} bytes) - processing failed." | |
return output_path, None, output_path, f"Video processed successfully! ({processed_frames}/{frame_count} frames)" | |
else: | |
# Image processing | |
results = model.predict(source=input_path, conf=sensitivity)[0] | |
annotated = results.plot() | |
output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.jpg") | |
cv2.imwrite(output_path, annotated) | |
temp_files.append(output_path) | |
return output_path, output_path, None, "Image processed successfully!" | |
except Exception as e: | |
return None, None, None, f"Processing error: {str(e)}" | |
finally: | |
# Clean up temporary files except the final output | |
for f in temp_files[:-1]: | |
if f and os.path.exists(f): | |
try: | |
os.remove(f) | |
except: | |
pass | |
# When the user clicks "Process Input" | |
if process_button: | |
out_file, out_img, out_vid, status = process_input(uploaded_file, youtube_link, image_url, sensitivity) | |
st.write(status) | |
if out_img: | |
st.image(out_img, caption="Annotated Output (Image)", use_column_width=True) | |
if out_vid: | |
st.video(out_vid) | |
if out_file: | |
with open(out_file, "rb") as f: | |
st.download_button( | |
label="Download Annotated Output", | |
data=f, | |
file_name=os.path.basename(out_file), | |
mime="video/mp4" if os.path.splitext(out_file)[1].lower() in video_exts else "image/jpeg" | |
) | |