File size: 6,721 Bytes
2d60aec
 
 
3b7f151
2d60aec
3b7f151
2d60aec
3b7f151
2d60aec
3b7f151
2d60aec
 
3b7f151
 
 
2d60aec
3b7f151
2d60aec
 
 
 
 
 
 
3b7f151
2d60aec
 
3b7f151
2d60aec
 
 
3b7f151
2d60aec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b7f151
2d60aec
 
3b7f151
2d60aec
 
3b7f151
2d60aec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b7f151
2d60aec
 
 
 
 
 
 
 
3b7f151
2d60aec
 
 
 
 
 
 
 
 
 
 
3b7f151
2d60aec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import os
import sys
import tempfile
import cv2
import requests
from ultralytics import YOLO
import streamlit as st

# Set page configuration
st.set_page_config(
    page_title="People Tracking with YOLO11-pose",
    page_icon="👥",
    layout="wide",
    initial_sidebar_state="expanded"
)
st.title("People Tracking with YOLO11-pose")

# Sidebar: Input method and settings
st.sidebar.header("Input Settings")
uploaded_file = st.sidebar.file_uploader("Upload Image/Video", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"])
youtube_link = st.sidebar.text_input("YouTube Link (optional)", "")
image_url = st.sidebar.text_input("Image URL (optional)", "")
sensitivity = st.sidebar.slider("Sensitivity (Confidence Threshold)", 0.0, 1.0, 0.2, step=0.01)
process_button = st.sidebar.button("Process Input")

# Define the video extensions for later use
video_exts = [".mp4", ".mov", ".avi", ".webm"]

def process_input(uploaded_file, youtube_link, image_url, sensitivity):
    input_path = None
    temp_files = []

    # Input priority: YouTube link > Image URL > Uploaded file.
    if youtube_link and youtube_link.strip():
        try:
            from pytubefix import YouTube
            yt = YouTube(youtube_link)
            stream = yt.streams.filter(file_extension='mp4', progressive=True).order_by("resolution").desc().first()
            if not stream:
                return None, None, None, "No suitable mp4 stream found."
            temp_path = os.path.join(tempfile.gettempdir(), f"yt_{os.urandom(8).hex()}.mp4")
            stream.download(output_path=tempfile.gettempdir(), filename=os.path.basename(temp_path))
            input_path = temp_path
            temp_files.append(input_path)
        except Exception as e:
            return None, None, None, f"Error downloading YouTube video: {str(e)}"
    elif image_url and image_url.strip():
        try:
            response = requests.get(image_url, stream=True, timeout=10)
            response.raise_for_status()
            temp_path = os.path.join(tempfile.gettempdir(), f"img_{os.urandom(8).hex()}.jpg")
            with open(temp_path, "wb") as f:
                f.write(response.content)
            input_path = temp_path
            temp_files.append(input_path)
        except Exception as e:
            return None, None, None, f"Error downloading image: {str(e)}"
    elif uploaded_file is not None:
        # Save the uploaded file to a temporary file
        ext = os.path.splitext(uploaded_file.name)[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
            tmp.write(uploaded_file.read())
            input_path = tmp.name
            temp_files.append(input_path)
    else:
        return None, None, None, "Please provide an input."

    ext = os.path.splitext(input_path)[1].lower()
    output_path = None

    # Load the YOLO model (ensure the model file is available in your repository)
    model = YOLO("yolo11n-pose.pt")

    try:
        if ext in video_exts:
            # Video processing
            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                return None, None, None, f"Cannot open video file: {input_path}"
            
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            
            if fps <= 0 or width <= 0 or height <= 0:
                return None, None, None, "Invalid video properties detected."
            
            output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.mp4")
            # Use 'mp4v' as codec
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            
            if not out.isOpened():
                return None, None, None, "Video processing failed: No suitable encoder available."
            
            processed_frames = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                    
                # Process frame: convert to RGB, run YOLO, then annotate and convert back to BGR.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = model.predict(source=frame_rgb, conf=sensitivity)[0]
                annotated_frame = results.plot()
                annotated_frame_bgr = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
                
                out.write(annotated_frame_bgr)
                processed_frames += 1
            
            cap.release()
            out.release()
            temp_files.append(output_path)
            
            if processed_frames == 0:
                return None, None, None, "No frames processed from video."
                
            if not os.path.exists(output_path) or os.path.getsize(output_path) < 1024:
                return None, None, None, f"Output video created but too small ({os.path.getsize(output_path)} bytes) - processing failed."
                
            return output_path, None, output_path, f"Video processed successfully! ({processed_frames}/{frame_count} frames)"

        else:
            # Image processing
            results = model.predict(source=input_path, conf=sensitivity)[0]
            annotated = results.plot()
            output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.jpg")
            cv2.imwrite(output_path, annotated)
            temp_files.append(output_path)
            return output_path, output_path, None, "Image processed successfully!"

    except Exception as e:
        return None, None, None, f"Processing error: {str(e)}"
    
    finally:
        # Clean up temporary files except the final output
        for f in temp_files[:-1]:
            if f and os.path.exists(f):
                try:
                    os.remove(f)
                except:
                    pass

# When the user clicks "Process Input"
if process_button:
    out_file, out_img, out_vid, status = process_input(uploaded_file, youtube_link, image_url, sensitivity)
    st.write(status)
    if out_img:
        st.image(out_img, caption="Annotated Output (Image)", use_column_width=True)
    if out_vid:
        st.video(out_vid)
    if out_file:
        with open(out_file, "rb") as f:
            st.download_button(
                label="Download Annotated Output",
                data=f,
                file_name=os.path.basename(out_file),
                mime="video/mp4" if os.path.splitext(out_file)[1].lower() in video_exts else "image/jpeg"
            )