# This script is designed for Hugging Face Spaces.
# Dependencies are specified in requirements.txt in the Space's repository root.
# Expected dependencies:
#   gradio>=4.0.0
#   opencv-python>=4.8.0
#   requests>=2.28.0
#   ultralytics>=8.2.0
#   pytubefix>=6.0.0
#   numpy>=1.23.0

import os
import sys
import tempfile

import cv2
import gradio as gr
import requests
from ultralytics import YOLO

# Remove extra CLI arguments that Spaces might pass
sys.argv = [arg for arg in sys.argv if arg != "--import"]
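
# Load the YOLO11-pose weights; ultralytics fetches the .pt file on first use
# if it is not already present in the working directory.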
model = YOLO("yolo11n-pose.pt")


def process_input(uploaded_file, youtube_link, image_url, sensitivity):
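    """Run YOLO11 pose estimation on an uploaded file, a YouTube video, or an image URL.

    Returns a (download_path, image_path, video_path, status_message) tuple that
    maps onto the Gradio output components.
    """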
    input_path = None
    temp_files = []

    # Input priority: YouTube link first, then image URL, then uploaded file
    if youtube_link and youtube_link.strip():
        try:
            from pytubefix import YouTube

            yt = YouTube(youtube_link)
            stream = (
                yt.streams.filter(file_extension="mp4", progressive=True)
                .order_by("resolution")
                .desc()
                .first()
            )
            if not stream:
                return None, None, None, "No suitable mp4 stream found."
            temp_path = os.path.join(tempfile.gettempdir(), f"yt_{os.urandom(8).hex()}.mp4")
            stream.download(output_path=tempfile.gettempdir(), filename=os.path.basename(temp_path))
            input_path = temp_path
            temp_files.append(input_path)
        except Exception as e:
            return None, None, None, f"Error downloading YouTube video: {str(e)}"
    elif image_url and image_url.strip():
        try:
            response = requests.get(image_url, stream=True, timeout=10)
            response.raise_for_status()
            temp_path = os.path.join(tempfile.gettempdir(), f"img_{os.urandom(8).hex()}.jpg")
            with open(temp_path, "wb") as f:
                f.write(response.content)
            input_path = temp_path
            temp_files.append(input_path)
        except Exception as e:
            return None, None, None, f"Error downloading image: {str(e)}"
    elif uploaded_file is not None:
        # gr.File may deliver either a filepath string or a tempfile-like object,
        # depending on the Gradio version and component configuration.
        input_path = uploaded_file if isinstance(uploaded_file, str) else uploaded_file.name
    else:
        return None, None, None, "Please provide an input."

    # Process the input based on its file extension
    ext = os.path.splitext(input_path)[1].lower()
    video_exts = [".mp4", ".mov", ".avi", ".webm"]
    output_path = None
    try:
        if ext in video_exts:
            # Video processing
            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                return None, None, None, f"Cannot open video file: {input_path}"
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if fps <= 0 or width <= 0 or height <= 0:
                return None, None, None, "Invalid video properties detected."
            output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.mp4")
            # Use 'mp4v' rather than 'avc1'; the OpenCV build on Spaces may not
            # ship an H.264 encoder, so 'mp4v' is the safer default.
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                # Fail early with a clear message if VideoWriter cannot find an encoder
                return None, None, None, (
                    "Video processing failed: no suitable encoder is available in this "
                    "environment. Try a different input format or contact support."
                )
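
            # Read the video frame by frame, run pose estimation on each frame,
            # and write the annotated result to the output file.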
            processed_frames = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Predict on the RGB frame, then convert the annotated result back
                # to BGR for VideoWriter.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = model.predict(source=frame_rgb, conf=sensitivity)[0]
                annotated_frame = results.plot()
                annotated_frame_bgr = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
                out.write(annotated_frame_bgr)
                processed_frames += 1
            cap.release()
            out.release()
            temp_files.append(output_path)
            if processed_frames == 0:
                return None, None, None, "No frames processed from video."
            if not os.path.exists(output_path) or os.path.getsize(output_path) < 1024:
                return None, None, None, (
                    f"Output video created but too small ({os.path.getsize(output_path)} bytes) - "
                    "processing failed. Codec support might be limited."
                )
            return output_path, None, output_path, f"Video processed successfully! ({processed_frames}/{frame_count} frames)"
        else:
            # Image processing
            results = model.predict(source=input_path, conf=sensitivity)[0]
            annotated = results.plot()
            output_path = os.path.join(tempfile.gettempdir(), f"out_{os.urandom(8).hex()}.jpg")
            cv2.imwrite(output_path, annotated)
            temp_files.append(output_path)
            return output_path, output_path, None, "Image processed successfully!"
    except Exception as e:
        return None, None, None, f"Processing error: {str(e)}"
    finally:
        # Remove downloaded inputs; the last entry in temp_files is the output,
        # which must remain on disk so Gradio can serve it.
        for f in temp_files[:-1]:
            if f and os.path.exists(f):
                try:
                    os.remove(f)
                except OSError:
                    pass


with gr.Blocks(css="""
.result_img > img {
    width: 100%;
    height: auto;
    object-fit: contain;
}
""") as demo:
    with gr.Row():
        with gr.Column(scale=1):
            gr.HTML(
                "<div style='text-align:center;'><img src='https://huggingface.co/spaces/tstone87/stance-detection/resolve/main/crowdresult.jpg' style='width:25%;'/></div>"
            )
            gr.Markdown("## Pose Detection with YOLO11-pose")
            with gr.Tabs():
                with gr.TabItem("Upload File"):
                    file_input = gr.File(label="Upload Image/Video")
                with gr.TabItem("YouTube Link"):
                    youtube_input = gr.Textbox(label="YouTube Link", placeholder="https://...")
                with gr.TabItem("Image URL"):
                    image_url_input = gr.Textbox(label="Image URL", placeholder="https://...")
            sensitivity_slider = gr.Slider(
                minimum=0.0,
                maximum=1.0,
                step=0.01,
                value=0.2,
                label="Sensitivity (Confidence Threshold)",
            )
        with gr.Column(scale=2):
            output_image = gr.Image(label="Annotated Output (Image)", elem_classes="result_img")
            output_video = gr.Video(label="Annotated Output (Video)")
            output_file = gr.File(label="Download Annotated Output")
            output_text = gr.Textbox(label="Status", interactive=False)
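
    # Each input source triggers the same handler; gr.State(...) supplies constant
    # placeholder values for the inputs that are not used by that trigger.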
    file_input.change(
        fn=process_input,
        inputs=[file_input, gr.State(""), gr.State(""), sensitivity_slider],
        outputs=[output_file, output_image, output_video, output_text],
    )
    youtube_input.change(
        fn=process_input,
        inputs=[gr.State(None), youtube_input, gr.State(""), sensitivity_slider],
        outputs=[output_file, output_image, output_video, output_text],
    )
    image_url_input.change(
        fn=process_input,
        inputs=[gr.State(None), gr.State(""), image_url_input, sensitivity_slider],
        outputs=[output_file, output_image, output_video, output_text],
    )

if __name__ == "__main__":
    demo.launch()