import os import tempfile import base64 import cv2 import streamlit as st import PIL from ultralytics import YOLO import requests ############################################################################### # Helper function to embed an HTML5 video that autoplays (muted) with controls. ############################################################################### def show_autoplay_video(video_bytes: bytes, title: str = "Video"): if not video_bytes: st.warning(f"No {title} video available.") return video_base64 = base64.b64encode(video_bytes).decode() video_html = f"""

{title}

""" st.markdown(video_html, unsafe_allow_html=True) ############################################################################### # Session state initialization (for processed results) ############################################################################### if "processed_frames" not in st.session_state: st.session_state["processed_frames"] = [] if "shortened_video_data" not in st.session_state: st.session_state["shortened_video_data"] = None if "shortened_video_ready" not in st.session_state: st.session_state["shortened_video_ready"] = False ############################################################################### # Configure YOLO model path and page layout ############################################################################### model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt' st.set_page_config( page_title="Fire Detection: Original vs. Processed Video", page_icon="🔥", layout="wide", initial_sidebar_state="expanded" ) ############################################################################### # SIDEBAR: Video input options, confidence, sampling options, and example selection ############################################################################### with st.sidebar: st.header("Video Input Options") # Option to select an example pair; "None" means use an uploaded file. example_option = st.selectbox( "Select Example Pair (optional)", ["None", "T Example", "LA Example"] ) source_file = st.file_uploader( "Or upload your own file...", type=("mp4", "jpg", "jpeg", "png", "bmp", "webp") ) confidence = float(st.slider("Select Model Confidence", 25, 100, 40)) / 100 video_option = st.selectbox( "Select Video Shortening Option", ["Original FPS", "1 fps", "1 frame per 5 seconds", "1 frame per 10 seconds", "1 frame per 15 seconds"] ) progress_text = st.empty() progress_bar = st.progress(0) ############################################################################### # MAIN TITLE ############################################################################### st.title("Fire Detection: Original vs. Processed Video") ############################################################################### # Load YOLO model ############################################################################### try: model = YOLO(model_path) except Exception as ex: st.error(f"Unable to load model. Check model path: {model_path}") st.error(ex) ############################################################################### # Determine source video(s): Example pair or uploaded file. ############################################################################### original_video_data = None processed_video_data = None # For example pairs if example_option != "None": # Use example videos from remote URLs. if example_option == "T Example": orig_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/T1.mp4" proc_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/T2.mpg" elif example_option == "LA Example": orig_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/LA1.mp4" proc_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/LA2.mp4" try: original_video_data = requests.get(orig_url).content processed_video_data = requests.get(proc_url).content except Exception as ex: st.error("Error loading example videos. Check your URLs.") else: # No example selected. If a file is uploaded, use it. if source_file: file_type = source_file.type.split('/')[0] if file_type == 'image': original_image = PIL.Image.open(source_file) buf = tempfile.NamedTemporaryFile(suffix=".png", delete=False) original_image.save(buf.name, format="PNG") with open(buf.name, "rb") as f: original_video_data = f.read() else: tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") tfile.write(source_file.read()) tfile.flush() with open(tfile.name, "rb") as vf: original_video_data = vf.read() # Open with OpenCV for processing. vidcap = cv2.VideoCapture(tfile.name) else: st.info("Please select an example pair or upload a video file.") ############################################################################### # Layout: Two columns for Original and Processed videos ############################################################################### col1, col2 = st.columns(2) with col1: st.subheader("Original File") if original_video_data: show_autoplay_video(original_video_data, title="Original Video") else: st.info("No original video available.") with col2: st.subheader("Result File") if example_option != "None": if processed_video_data: show_autoplay_video(processed_video_data, title="Processed Video") else: st.info("No processed video available in example.") else: if st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]: show_autoplay_video(st.session_state["shortened_video_data"], title="Processed Video") else: st.info("Processed video will appear here once detection is run.") ############################################################################### # DETECTION: Process the uploaded video if no example is selected. ############################################################################### if example_option == "None" and source_file and source_file.type.split('/')[0] != 'image': if st.sidebar.button("Let's Detect Wildfire"): # Reset previous processed results. st.session_state["processed_frames"] = [] st.session_state["shortened_video_data"] = None st.session_state["shortened_video_ready"] = False processed_frames = st.session_state["processed_frames"] frame_count = 0 orig_fps = vidcap.get(cv2.CAP_PROP_FPS) total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Determine sampling interval. if video_option == "Original FPS": sample_interval = 1 output_fps = orig_fps elif video_option == "1 fps": sample_interval = int(orig_fps) if orig_fps > 0 else 1 output_fps = 1 elif video_option == "1 frame per 5 seconds": sample_interval = int(orig_fps * 5) if orig_fps > 0 else 5 output_fps = 1 elif video_option == "1 frame per 10 seconds": sample_interval = int(orig_fps * 10) if orig_fps > 0 else 10 output_fps = 1 elif video_option == "1 frame per 15 seconds": sample_interval = int(orig_fps * 15) if orig_fps > 0 else 15 output_fps = 1 else: sample_interval = 1 output_fps = orig_fps success, image = vidcap.read() while success: if frame_count % sample_interval == 0: res = model.predict(image, conf=confidence) res_plotted = res[0].plot()[:, :, ::-1] processed_frames.append(res_plotted) if total_frames > 0: progress_pct = int((frame_count / total_frames) * 100) progress_text.text(f"Processing frame {frame_count} / {total_frames} ({progress_pct}%)") progress_bar.progress(min(100, progress_pct)) else: progress_text.text(f"Processing frame {frame_count}") frame_count += 1 success, image = vidcap.read() progress_text.text("Video processing complete!") progress_bar.progress(100) # Create shortened video from processed frames. if processed_frames: temp_video_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') # Use the 'avc1' codec (H.264) for better compatibility fourcc = cv2.VideoWriter_fourcc(*'avc1') out = cv2.VideoWriter(temp_video_file.name, fourcc, output_fps, (width, height)) for frame in processed_frames: frame_out = cv2.convertScaleAbs(frame) out.write(frame_out) out.release() with open(temp_video_file.name, 'rb') as video_file: st.session_state["shortened_video_data"] = video_file.read() st.session_state["shortened_video_ready"] = True st.success("Processed video created successfully!") else: st.error("No frames were processed from the video.") ############################################################################### # Always display the download button if a processed video is ready. ############################################################################### if st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]: st.download_button( label="Download Processed Video", data=st.session_state["shortened_video_data"], file_name="processed_video.mp4", mime="video/mp4" )