import os
import tempfile
import base64
import cv2
import streamlit as st
import PIL.Image
from ultralytics import YOLO
import requests
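
# Runtime dependencies (assumed from the imports above, not pinned here): streamlit,
# ultralytics, opencv-python (or opencv-python-headless), pillow, requests.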
###############################################################################
# Helper: Embed an HTML5 video that autoplays (muted) with controls.
###############################################################################
def show_autoplay_video(video_bytes: bytes, title: str = "Video"):
    if not video_bytes:
        st.warning(f"No {title} video available.")
        return
    video_base64 = base64.b64encode(video_bytes).decode()
    video_html = f"""
    <h4>{title}</h4>
    <video width="100%" controls autoplay muted>
        <source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    """
    st.markdown(video_html, unsafe_allow_html=True)
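
# Implementation note: show_autoplay_video base64-encodes the clip into a data: URI so
# it can be embedded inline and autoplay without serving a separate file; very large
# videos will inflate the rendered page size accordingly.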
###############################################################################
# Session state initialization (for processed results)
###############################################################################
if "processed_frames" not in st.session_state:
    st.session_state["processed_frames"] = []
if "shortened_video_data" not in st.session_state:
    st.session_state["shortened_video_data"] = None
if "shortened_video_ready" not in st.session_state:
    st.session_state["shortened_video_ready"] = False
###############################################################################
# Configure YOLO model path and page layout
###############################################################################
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'
st.set_page_config(
    page_title="Fire Detection: Original vs. Processed Video",
    page_icon="🔥",
    layout="wide",
    initial_sidebar_state="expanded"
)
###############################################################################
# SIDEBAR: Video input options, confidence, sampling options, and example selection
###############################################################################
with st.sidebar:
    st.header("Video Input Options")
    example_option = st.selectbox(
        "Select Example Pair (optional)",
        ["None", "T Example", "LA Example"]
    )
    source_file = st.file_uploader(
        "Or upload your own file...",
        type=("mp4", "jpg", "jpeg", "png", "bmp", "webp")
    )
    confidence = float(st.slider("Select Model Confidence", 25, 100, 40)) / 100
    video_option = st.selectbox(
        "Select Video Shortening Option",
        ["Original FPS", "1 fps", "1 frame per 5 seconds", "1 frame per 10 seconds", "1 frame per 15 seconds"]
    )
    progress_text = st.empty()
    progress_bar = st.progress(0)
    download_placeholder = st.empty()  # This placeholder will hold the download button.
###############################################################################
# MAIN TITLE
###############################################################################
st.title("Fire Detection: Original vs. Processed Video")
###############################################################################
# Load YOLO model
###############################################################################
try:
    model = YOLO(model_path)
except Exception as ex:
    st.error(f"Unable to load model. Check model path: {model_path}")
    st.error(ex)
    st.stop()  # Everything below depends on a loaded model, so stop the script here.
###############################################################################
# Determine source video(s): Example pair or uploaded file.
###############################################################################
original_video_data = None
processed_video_data = None  # For example pairs
original_image = None        # For uploaded still images
if example_option != "None":
    # Use example videos from remote URLs.
    if example_option == "T Example":
        orig_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/T1.mp4"
        proc_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/T2.mpg"
    elif example_option == "LA Example":
        orig_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/LA1.mp4"
        proc_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/LA2.mp4"
    try:
        original_video_data = requests.get(orig_url).content
        processed_video_data = requests.get(proc_url).content
    except Exception as ex:
        st.error(f"Error loading example videos. Check your URLs. ({ex})")
else:
    # No example selected. If a file is uploaded, use it.
    if source_file:
        file_type = source_file.type.split('/')[0]
        if file_type == 'image':
            # Keep the uploaded image in memory; it is shown with st.image below
            # instead of being embedded in a <video> tag.
            original_image = PIL.Image.open(source_file)
        else:
            # delete=False keeps the temp file on disk so OpenCV can read it after
            # this block.
            tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tfile.write(source_file.read())
            tfile.flush()
            with open(tfile.name, "rb") as vf:
                original_video_data = vf.read()
            # Open with OpenCV for processing.
            vidcap = cv2.VideoCapture(tfile.name)
    else:
        st.info("Please select an example pair or upload a video file.")
###############################################################################
# Layout: Two columns for Original and Processed videos.
###############################################################################
col1, col2 = st.columns(2)
with col1:
    st.subheader("Original File")
    if original_image is not None:
        st.image(original_image, caption="Original Image", use_column_width=True)
    elif original_video_data:
        show_autoplay_video(original_video_data, title="Original Video")
    else:
        st.info("No original file available.")
with col2:
    st.subheader("Result File")
    # Create a dedicated placeholder for the processed video.
    viewer_slot = st.empty()
    if example_option != "None":
        if processed_video_data:
            show_autoplay_video(processed_video_data, title="Processed Video")
        else:
            st.info("No processed video available in example.")
    else:
        viewer_slot.info("Processed video will appear here once detection is run.")
###############################################################################
# DETECTION: Process the uploaded video if no example is selected.
###############################################################################
if example_option == "None" and source_file and source_file.type.split('/')[0] != 'image':
    if st.sidebar.button("Let's Detect Wildfire"):
        # Reset previous processed results.
        st.session_state["processed_frames"] = []
        st.session_state["shortened_video_data"] = None
        st.session_state["shortened_video_ready"] = False
        processed_frames = st.session_state["processed_frames"]
        frame_count = 0
        orig_fps = vidcap.get(cv2.CAP_PROP_FPS)
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Determine sampling interval.
        if video_option == "Original FPS":
            sample_interval = 1
            output_fps = orig_fps
        elif video_option == "1 fps":
            sample_interval = int(orig_fps) if orig_fps > 0 else 1
            output_fps = 1
        elif video_option == "1 frame per 5 seconds":
            sample_interval = int(orig_fps * 5) if orig_fps > 0 else 5
            output_fps = 1
        elif video_option == "1 frame per 10 seconds":
            sample_interval = int(orig_fps * 10) if orig_fps > 0 else 10
            output_fps = 1
        elif video_option == "1 frame per 15 seconds":
            sample_interval = int(orig_fps * 15) if orig_fps > 0 else 15
            output_fps = 1
        else:
            sample_interval = 1
            output_fps = orig_fps
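        # Worked example of the sampling math (illustrative values, not from the
        # original code): for a 30 fps source, "1 frame per 5 seconds" gives
        # sample_interval = 150, so a 60-second clip yields about 12 processed
        # frames, played back at 1 fps for a ~12-second shortened video.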
        success, image = vidcap.read()
        while success:
            if frame_count % sample_interval == 0:
                res = model.predict(image, conf=confidence)
                # plot() returns an annotated BGR array; reverse the channels to RGB
                # so Streamlit displays the colors correctly.
                res_plotted = res[0].plot()[:, :, ::-1]
                processed_frames.append(res_plotted)
                # Update progress.
                if total_frames > 0:
                    progress_pct = int((frame_count / total_frames) * 100)
                    progress_text.text(f"Processing frame {frame_count} / {total_frames} ({progress_pct}%)")
                    progress_bar.progress(min(100, progress_pct))
                else:
                    progress_text.text(f"Processing frame {frame_count}")
                # Update the viewer with the most recent processed frame.
                viewer_slot.image(res_plotted, caption=f"Frame {frame_count}", use_column_width=True)
            frame_count += 1
            success, image = vidcap.read()
        progress_text.text("Video processing complete!")
        progress_bar.progress(100)
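        # Cleanup (added, not in the original flow): release the OpenCV capture
        # handle now that every frame has been read.
        vidcap.release()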
        # Create shortened video from processed frames.
        if processed_frames:
            temp_video_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
            # Use 'avc1' codec (H.264) for better browser compatibility.
            fourcc = cv2.VideoWriter_fourcc(*'avc1')
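            # Note (assumption, not verified for this Space): some OpenCV builds ship
            # without an H.264 encoder, in which case 'avc1' can fail quietly and leave
            # the file empty; 'mp4v' is a common fallback fourcc in that situation.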
            out = cv2.VideoWriter(temp_video_file.name, fourcc, output_fps, (width, height))
            for frame in processed_frames:
                # Frames were stored as RGB for display; flip back to BGR before
                # handing them to VideoWriter, which expects BGR.
                frame_out = cv2.convertScaleAbs(frame[:, :, ::-1])
                out.write(frame_out)
            out.release()
            with open(temp_video_file.name, 'rb') as video_file:
                st.session_state["shortened_video_data"] = video_file.read()
            st.session_state["shortened_video_ready"] = True
            st.success("Processed video created successfully!")
            # Update the viewer with the final processed video.
            viewer_slot.empty()
            show_autoplay_video(st.session_state["shortened_video_data"], title="Processed Video")
        else:
            st.error("No frames were processed from the video.")
###############################################################################
# ALWAYS display the download button if a processed video is ready.
###############################################################################
if st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]:
    download_placeholder.download_button(
        label="Download Processed Video",
        data=st.session_state["shortened_video_data"],
        file_name="processed_video.mp4",
        mime="video/mp4"
    )
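
# To run locally (assuming this file is saved as app.py and the dependencies noted at
# the top are installed):
#   streamlit run app.py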