tstone87's picture
Update app.py
4eb9029 verified
# This script is designed for Hugging Face Spaces
# Dependencies are specified in requirements.txt in the Space's repository root
# Expected dependencies:
# gradio>=4.0.0
# opencv-python>=4.8.0
# requests>=2.28.0
# ultralytics>=8.2.0
# pytubefix>=6.0.0
# numpy>=1.23.0
import sys
import gradio as gr
import os
import tempfile
import cv2
import requests
from ultralytics import YOLO
# Remove extra CLI arguments that Spaces might pass
sys.argv = [arg for arg in sys.argv if arg != "--import"]
model = YOLO("yolo11n-pose.pt")
def process_input(uploaded_file, youtube_link, image_url, sensitivity):
input_path = None
temp_files = []
# Input priority handling
if youtube_link and youtube_link.strip():
try:
from pytubefix import YouTube
yt = YouTube(youtube_link)
stream = yt.streams.filter(file_extension='mp4', progressive=True).order_by("resolution").desc().first()
if not stream:
return None, None, None, "No suitable mp4 stream found."
temp_path = os.path.join(tempfile.gettempdir(), f"yt_{os.urandom(8).hex()}.mp4")
stream.download(output_path=tempfile.gettempdir(), filename=os.path.basename(temp_path))
input_path = temp_path
temp_files.append(input_path)
except Exception as e:
return None, None, None, f"Error downloading YouTube video: {str(e)}"
elif image_url and image_url.strip():
try:
response = requests.get(image_url, stream=True, timeout=10)
response.raise_for_status()
temp_path = os.path.join(tempfile.gettempdir(), f"img_{os.urandom(8).hex()}.jpg")
with open(temp_path, "wb") as f:
f.write(response.content)
input_path = temp_path
temp_files.append(input_path)
except Exception as e:
return None, None, None, f"Error downloading image: {str(e)}"
elif uploaded_file is not None:
input_path = uploaded_file.name
else:
return None, None, None, "Please provide an input."
# Process file
ext = os.path.splitext(input_path)[1].lower()
video_exts = [".mp4", ".mov", ".avi", ".webm"]
output_path = None
try:
if ext in video_exts:
# Video processing
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
return None, None, None, f"Cannot open video file: {input_path}"
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps <= 0 or width <= 0 or height <= 0:
return None, None, None, "Invalid video properties detected."
output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.mp4")
# Use 'mp4v' instead of 'avc1' as it might work better with Spaces' OpenCV
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if not out.isOpened():
# Fallback message if VideoWriter fails
return None, None, None, "Video processing failed: No suitable encoder available in this environment. Try a different input format or contact support."
processed_frames = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Process frame
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = model.predict(source=frame_rgb, conf=sensitivity)[0]
annotated_frame = results.plot()
annotated_frame_bgr = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
out.write(annotated_frame_bgr)
processed_frames += 1
cap.release()
out.release()
temp_files.append(output_path)
if processed_frames == 0:
return None, None, None, "No frames processed from video."
if not os.path.exists(output_path) or os.path.getsize(output_path) < 1024:
return None, None, None, f"Output video created but too small ({os.path.getsize(output_path)} bytes) - processing failed. Codec support might be limited."
return output_path, None, output_path, f"Video processed successfully! ({processed_frames}/{frame_count} frames)"
else:
# Image processing
results = model.predict(source=input_path, conf=sensitivity)[0]
annotated = results.plot()
output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.jpg")
cv2.imwrite(output_path, annotated)
temp_files.append(output_path)
return output_path, output_path, None, "Image processed successfully!"
except Exception as e:
return None, None, None, f"Processing error: {str(e)}"
finally:
for f in temp_files[:-1]:
if f and os.path.exists(f):
try:
os.remove(f)
except:
pass
with gr.Blocks(css="""
.result_img > img {
width: 100%;
height: auto;
object-fit: contain;
}
""") as demo:
with gr.Row():
with gr.Column(scale=1):
gr.HTML("<div style='text-align:center;'><img src='https://huggingface.co/spaces/tstone87/stance-detection/resolve/main/crowdresult.jpg' style='width:25%;'/></div>")
gr.Markdown("## Pose Detection with YOLO11-pose")
with gr.Tabs():
with gr.TabItem("Upload File"):
file_input = gr.File(label="Upload Image/Video")
with gr.TabItem("YouTube Link"):
youtube_input = gr.Textbox(label="YouTube Link", placeholder="https://...")
with gr.TabItem("Image URL"):
image_url_input = gr.Textbox(label="Image URL", placeholder="https://...")
sensitivity_slider = gr.Slider(minimum=0.0, maximum=1.0, step=0.01, value=0.2,
label="Sensitivity (Confidence Threshold)")
with gr.Column(scale=2):
output_image = gr.Image(label="Annotated Output (Image)", elem_classes="result_img")
output_video = gr.Video(label="Annotated Output (Video)")
output_file = gr.File(label="Download Annotated Output")
output_text = gr.Textbox(label="Status", interactive=False)
file_input.change(
fn=process_input,
inputs=[file_input, gr.State(""), gr.State(""), sensitivity_slider],
outputs=[output_file, output_image, output_video, output_text]
)
youtube_input.change(
fn=process_input,
inputs=[gr.State(None), youtube_input, gr.State(""), sensitivity_slider],
outputs=[output_file, output_image, output_video, output_text]
)
image_url_input.change(
fn=process_input,
inputs=[gr.State(None), gr.State(""), image_url_input, sensitivity_slider],
outputs=[output_file, output_image, output_video, output_text]
)
if __name__ == "__main__":
demo.launch()