tstone87's picture
Update app.py
2d60aec verified
import os
import sys
import tempfile
import cv2
import requests
from ultralytics import YOLO
import streamlit as st
# Set page configuration
st.set_page_config(
page_title="People Tracking with YOLO11-pose",
page_icon="👥",
layout="wide",
initial_sidebar_state="expanded"
)
st.title("People Tracking with YOLO11-pose")
# Sidebar: Input method and settings
st.sidebar.header("Input Settings")
uploaded_file = st.sidebar.file_uploader("Upload Image/Video", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"])
youtube_link = st.sidebar.text_input("YouTube Link (optional)", "")
image_url = st.sidebar.text_input("Image URL (optional)", "")
sensitivity = st.sidebar.slider("Sensitivity (Confidence Threshold)", 0.0, 1.0, 0.2, step=0.01)
process_button = st.sidebar.button("Process Input")
# Define the video extensions for later use
video_exts = [".mp4", ".mov", ".avi", ".webm"]
def process_input(uploaded_file, youtube_link, image_url, sensitivity):
input_path = None
temp_files = []
# Input priority: YouTube link > Image URL > Uploaded file.
if youtube_link and youtube_link.strip():
try:
from pytubefix import YouTube
yt = YouTube(youtube_link)
stream = yt.streams.filter(file_extension='mp4', progressive=True).order_by("resolution").desc().first()
if not stream:
return None, None, None, "No suitable mp4 stream found."
temp_path = os.path.join(tempfile.gettempdir(), f"yt_{os.urandom(8).hex()}.mp4")
stream.download(output_path=tempfile.gettempdir(), filename=os.path.basename(temp_path))
input_path = temp_path
temp_files.append(input_path)
except Exception as e:
return None, None, None, f"Error downloading YouTube video: {str(e)}"
elif image_url and image_url.strip():
try:
response = requests.get(image_url, stream=True, timeout=10)
response.raise_for_status()
temp_path = os.path.join(tempfile.gettempdir(), f"img_{os.urandom(8).hex()}.jpg")
with open(temp_path, "wb") as f:
f.write(response.content)
input_path = temp_path
temp_files.append(input_path)
except Exception as e:
return None, None, None, f"Error downloading image: {str(e)}"
elif uploaded_file is not None:
# Save the uploaded file to a temporary file
ext = os.path.splitext(uploaded_file.name)[1]
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
tmp.write(uploaded_file.read())
input_path = tmp.name
temp_files.append(input_path)
else:
return None, None, None, "Please provide an input."
ext = os.path.splitext(input_path)[1].lower()
output_path = None
# Load the YOLO model (ensure the model file is available in your repository)
model = YOLO("yolo11n-pose.pt")
try:
if ext in video_exts:
# Video processing
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
return None, None, None, f"Cannot open video file: {input_path}"
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps <= 0 or width <= 0 or height <= 0:
return None, None, None, "Invalid video properties detected."
output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.mp4")
# Use 'mp4v' as codec
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
return None, None, None, "Video processing failed: No suitable encoder available."
processed_frames = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Process frame: convert to RGB, run YOLO, then annotate and convert back to BGR.
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = model.predict(source=frame_rgb, conf=sensitivity)[0]
annotated_frame = results.plot()
annotated_frame_bgr = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
out.write(annotated_frame_bgr)
processed_frames += 1
cap.release()
out.release()
temp_files.append(output_path)
if processed_frames == 0:
return None, None, None, "No frames processed from video."
if not os.path.exists(output_path) or os.path.getsize(output_path) < 1024:
return None, None, None, f"Output video created but too small ({os.path.getsize(output_path)} bytes) - processing failed."
return output_path, None, output_path, f"Video processed successfully! ({processed_frames}/{frame_count} frames)"
else:
# Image processing
results = model.predict(source=input_path, conf=sensitivity)[0]
annotated = results.plot()
output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.jpg")
cv2.imwrite(output_path, annotated)
temp_files.append(output_path)
return output_path, output_path, None, "Image processed successfully!"
except Exception as e:
return None, None, None, f"Processing error: {str(e)}"
finally:
# Clean up temporary files except the final output
for f in temp_files[:-1]:
if f and os.path.exists(f):
try:
os.remove(f)
except:
pass
# When the user clicks "Process Input"
if process_button:
out_file, out_img, out_vid, status = process_input(uploaded_file, youtube_link, image_url, sensitivity)
st.write(status)
if out_img:
st.image(out_img, caption="Annotated Output (Image)", use_column_width=True)
if out_vid:
st.video(out_vid)
if out_file:
with open(out_file, "rb") as f:
st.download_button(
label="Download Annotated Output",
data=f,
file_name=os.path.basename(out_file),
mime="video/mp4" if os.path.splitext(out_file)[1].lower() in video_exts else "image/jpeg"
)