Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import base64 | |
| import cv2 | |
| import streamlit as st | |
| import PIL | |
| import requests | |
| import imageio | |
| from ultralytics import YOLO | |
| from huggingface_hub import hf_hub_download | |
###############################################################################
# Helper: Embed an HTML5 video that autoplays (muted) with controls.
###############################################################################
def show_autoplay_video(video_bytes: bytes, title: str = "Video"):
    """Render an inline HTML5 ``<video>`` player that autoplays muted.

    The clip is base64-encoded into a ``data:`` URI and emitted through
    ``st.markdown`` with ``unsafe_allow_html=True``.

    Args:
        video_bytes: Raw video content. A filesystem path (``str`` or
            ``os.PathLike``) is accepted as well and is read from disk,
            because the upload flow in this script keeps an uploaded video
            as a temp-file path rather than as bytes.
        title: Heading shown above the player; also used in the message
            displayed when there is nothing to play. It is inserted into the
            HTML verbatim, so pass trusted text only.

    Returns:
        None. A warning (no data) or an error (unreadable path) is shown in
        the page instead of raising.
    """
    # base64.b64encode only takes bytes-like input, so resolve a path first.
    if isinstance(video_bytes, (str, os.PathLike)) and video_bytes:
        try:
            with open(video_bytes, "rb") as fh:
                video_bytes = fh.read()
        except OSError as err:
            st.error(f"Unable to read {title} video: {err}")
            return

    if not video_bytes:
        st.warning(f"No {title} video available.")
        return

    video_base64 = base64.b64encode(video_bytes).decode()
    video_html = f"""
    <h4>{title}</h4>
    <video width="100%" controls autoplay muted>
    <source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
    Your browser does not support the video tag.
    </video>
    """
    st.markdown(video_html, unsafe_allow_html=True)
###############################################################################
# Session state initialization (for processed results)
###############################################################################
# Seed each key only when it is absent so values written during an earlier
# rerun of the script are left in place.
for _state_key, _state_default in (
    ("processed_frames", []),
    ("shortened_video_data", None),
    ("shortened_video_ready", False),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
###############################################################################
# Configure Streamlit Page Layout
###############################################################################
# Streamlit documents set_page_config as the first Streamlit command of a
# page. The model-loading error path below calls st.error, so the page is
# configured before the model is fetched.
st.set_page_config(
    page_title="Fire Detection: Original vs. Processed Video",
    page_icon="🔥",
    layout="wide",
    initial_sidebar_state="expanded"
)

###############################################################################
# Download YOLO Model from Hugging Face
###############################################################################
repo_id = "tstone87/ccr-colorado"
model_filename = "best.pt"
# Bind the name up front: if the download or load fails, `model` is None
# instead of being undefined.
model = None
try:
    local_model_path = hf_hub_download(repo_id=repo_id, filename=model_filename)
    model = YOLO(local_model_path)
except Exception as ex:
    # Top-level boundary: report the failure in the page and keep the rest of
    # the app (e.g. the example viewer) usable.
    st.error(f"Unable to load model. Check model path: {model_filename}")
    st.error(ex)
###############################################################################
# SIDEBAR: Video input options
###############################################################################
# Each widget is attached to the sidebar explicitly, in display order.
_example_choices = ["None", "T Example", "LA Example"]
_upload_types = ("mp4", "jpg", "jpeg", "png", "bmp", "webp")
_shortening_choices = [
    "Original FPS",
    "1 fps",
    "1 frame per 5 seconds",
    "1 frame per 10 seconds",
    "1 frame per 15 seconds",
]

st.sidebar.header("Video Input Options")
example_option = st.sidebar.selectbox(
    "Select Example Pair (optional)",
    _example_choices
)
source_file = st.sidebar.file_uploader(
    "Or upload your own file...",
    type=_upload_types
)
# The slider works in whole percent; the rest of the script uses a 0-1 value.
_confidence_pct = st.sidebar.slider("Select Model Confidence", 25, 100, 40)
confidence = float(_confidence_pct) / 100
video_option = st.sidebar.selectbox(
    "Select Video Shortening Option",
    _shortening_choices
)
progress_text = st.sidebar.empty()
progress_bar = st.sidebar.progress(0)
download_placeholder = st.sidebar.empty()  # Placeholder for download button

###############################################################################
# MAIN TITLE
###############################################################################
st.title("Fire Detection: Original vs. Processed Video")
###############################################################################
# Load Example Video Data
###############################################################################
# After this section:
#   original_video_data  - bytes (example clip or PNG-encoded uploaded image),
#                          a temp-file path string (uploaded video), or None.
#   processed_video_data - bytes of the example's processed clip, or None.
original_video_data = None
processed_video_data = None
if example_option != "None":
    # Use example videos from remote URLs.
    # NOTE(review): "T2.mpg" has an .mpg extension; confirm it plays where the
    # clip is embedded as video/mp4.
    example_videos = {
        "T Example": ("T1.mp4", "T2.mpg"),
        "LA Example": ("LA1.mp4", "LA2.mp4")
    }
    orig_filename, proc_filename = example_videos.get(example_option, (None, None))
    if orig_filename and proc_filename:
        orig_url = f"https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/{orig_filename}"
        proc_url = f"https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/{proc_filename}"
        try:
            # A timeout stops a stalled download from hanging the script, and
            # raise_for_status keeps an HTTP error body from being treated as
            # video data.
            response = requests.get(orig_url, timeout=30)
            response.raise_for_status()
            original_video_data = response.content
            response = requests.get(proc_url, timeout=30)
            response.raise_for_status()
            processed_video_data = response.content
        except requests.RequestException as ex:
            st.error("Error loading example videos. Check your URLs.")
            st.error(ex)
elif source_file:
    # No example selected. If a file is uploaded, use it.
    file_type = source_file.type.split('/')[0]
    if file_type == 'image':
        # NOTE(review): this relies on the PIL.Image submodule already being
        # loaded (the file only does `import PIL`); presumably another import
        # pulls it in - confirm.
        original_image = PIL.Image.open(source_file)
        # Re-encode as PNG through a self-deleting temp file so no handle or
        # file is left behind.
        with tempfile.NamedTemporaryFile(suffix=".png") as buf:
            original_image.save(buf, format="PNG")
            buf.seek(0)
            original_video_data = buf.read()
    else:
        # The video is persisted to disk and referenced by path.
        # getvalue() returns the whole upload regardless of the current read
        # position.
        # NOTE(review): a new temp file is created on every rerun and is not
        # removed in the code shown here.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tfile:
            tfile.write(source_file.getvalue())
        original_video_data = tfile.name
###############################################################################
# Layout: Two Columns for Original and Processed Videos
###############################################################################
col1, col2 = st.columns(2)
with col1:
    st.subheader("Original File")
    # An uploaded image reaches this point as PNG bytes; it must not be pushed
    # through the video/mp4 player.
    _is_uploaded_image = (
        example_option == "None"
        and source_file is not None
        and source_file.type.split('/')[0] == 'image'
    )
    if not original_video_data:
        st.info("No original video available.")
    elif _is_uploaded_image:
        st.image(original_video_data, caption="Original Image")
    else:
        _preview = original_video_data
        if isinstance(_preview, str):
            # Uploaded videos are held as a temp-file path; the embedded
            # player needs the raw bytes.
            with open(_preview, "rb") as _fh:
                _preview = _fh.read()
        show_autoplay_video(_preview, title="Original Video")
with col2:
    st.subheader("Result File")
    # Placeholder that later code can fill with the processed result.
    viewer_slot = st.empty()
    if example_option != "None":
        if processed_video_data:
            show_autoplay_video(processed_video_data, title="Processed Video")
        else:
            st.info("No processed video available in example.")
    else:
        viewer_slot.info("Processed video will appear here once detection is run.")
###############################################################################
# Process Video if No Example is Selected
###############################################################################
def _annotate_video(detector, video_path, conf, frames_out, status_text, status_bar):
    """Run ``detector`` on every frame of ``video_path`` and collect the results.

    One annotated frame is appended to ``frames_out`` per input frame, and the
    two progress widgets are updated after each frame.

    Returns:
        ``(fps, (width, height))`` taken from the reader's metadata.
    """
    with imageio.get_reader(video_path) as reader:
        meta = reader.get_meta_data()
        total = reader.get_length()
        # A reader may report an unknown length (0 or infinity); in that case
        # only the running frame counter is shown.
        has_total = 0 < total < float("inf")
        for index, rgb_frame in enumerate(reader, start=1):
            # imageio yields RGB frames, while Ultralytics documents numpy
            # input (and the array returned by plot()) as BGR.
            bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
            result = detector.predict(bgr_frame, conf=conf)
            frames_out.append(result[0].plot())
            if has_total:
                pct = min(100, int((index / total) * 100))
                status_text.text(f"Processing frame {index} / {total} ({pct}%)")
                status_bar.progress(pct)
            else:
                status_text.text(f"Processing frame {index}")
    return meta['fps'], tuple(meta['size'])


def _encode_mp4(frames, fps, frame_size):
    """Write BGR ``frames`` to a temporary MP4 with OpenCV and return its bytes.

    The temporary file is removed before returning. An empty ``bytes`` object
    means the writer produced no output.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
    tmp.close()  # only the name is needed; OpenCV opens the path itself
    try:
        # NOTE(review): mp4v output may not play in every browser's HTML5
        # player; an H.264 encoder may be needed - confirm on target browsers.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(tmp.name, fourcc, fps, frame_size)
        try:
            for frame in frames:
                writer.write(frame)
        finally:
            writer.release()
        with open(tmp.name, 'rb') as video_file:
            return video_file.read()
    finally:
        os.remove(tmp.name)


# NOTE(review): `video_option` from the sidebar is not applied here; every
# frame is processed and written at the source frame rate.
if example_option == "None" and source_file and source_file.type.split('/')[0] != 'image':
    if st.sidebar.button("Let's Detect Wildfire"):
        # Look the model up defensively so a failed load produces a message
        # rather than an exception.
        _detector = globals().get("model")
        if _detector is None:
            st.error("Detection model is not loaded; cannot process the video.")
        else:
            # NOTE(review): every annotated frame is kept in session state,
            # so memory grows with clip length.
            st.session_state["processed_frames"] = []
            processed_frames = st.session_state["processed_frames"]
            fps, (width, height) = _annotate_video(
                _detector,
                original_video_data,
                confidence,
                processed_frames,
                progress_text,
                progress_bar,
            )
            progress_text.text("Video processing complete!")
            progress_bar.progress(100)
            if processed_frames:
                _encoded = _encode_mp4(processed_frames, fps, (width, height))
                if _encoded:
                    st.session_state["shortened_video_data"] = _encoded
                    st.session_state["shortened_video_ready"] = True
                    st.success("Processed video created successfully!")
                    # Render into the result column's placeholder, replacing
                    # its "will appear here" notice.
                    with viewer_slot.container():
                        show_autoplay_video(
                            st.session_state["shortened_video_data"],
                            title="Processed Video",
                        )
                else:
                    st.error("Failed to encode the processed video.")
###############################################################################
# Show Download Button if Ready
###############################################################################
# The button is drawn on every rerun for as long as a processed clip is
# flagged as ready in session state.
if st.session_state["shortened_video_ready"]:
    _download_kwargs = {
        "label": "Download Processed Video",
        "data": st.session_state["shortened_video_data"],
        "file_name": "processed_video.mp4",
        "mime": "video/mp4",
    }
    download_placeholder.download_button(**_download_kwargs)