import os
import tempfile
import base64

import cv2
import streamlit as st
import PIL.Image
from ultralytics import YOLO
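
# To try this locally (assuming the script is saved as app.py and the example
# videos sit next to it; the weights are fetched from the URL below):
#   streamlit run app.py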
###############################################################################
# Helper function: Display an HTML5 video with autoplay, controls, and muted
###############################################################################
def show_autoplay_video(video_data: bytes, title: str = "Video"):
    if not video_data:
        st.warning(f"No {title} video available.")
        return
    video_base64 = base64.b64encode(video_data).decode()
    video_html = f"""
    <h4>{title}</h4>
    <video width="100%" height="auto" controls autoplay muted>
        <source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    """
    st.markdown(video_html, unsafe_allow_html=True)
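
# Note: base64-embedding the whole file in the page keeps full control over the
# autoplay/muted attributes, but it inflates the payload by roughly a third for
# large videos; st.video is the lighter-weight alternative when that level of
# control over the <video> tag is not needed.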
###############################################################################
# Session state initialization for processed results (for uploaded files)
###############################################################################
if "processed_frames" not in st.session_state:
    st.session_state["processed_frames"] = []
if "shortened_video_data" not in st.session_state:
    st.session_state["shortened_video_data"] = None
if "shortened_video_ready" not in st.session_state:
    st.session_state["shortened_video_ready"] = False

###############################################################################
# Configure YOLO model path and Streamlit page
###############################################################################
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'

st.set_page_config(
    page_title="Fire Detection: Original vs. Processed Video",
    page_icon="🔥",
    layout="wide",
    initial_sidebar_state="expanded"
)

###############################################################################
# SIDEBAR: Upload file, set confidence, video option, and select an example pair
###############################################################################
with st.sidebar:
    st.header("Video Input Options")
    example_option = st.selectbox(
        "Select Example Pair (optional)",
        ["None", "T Example", "LA Example"]
    )
    source_file = st.file_uploader(
        "Or upload your own file...",
        type=("jpg", "jpeg", "png", "bmp", "webp", "mp4")
    )
    confidence = float(st.slider("Select Model Confidence", 25, 100, 40)) / 100
    video_option = st.selectbox(
        "Select Video Shortening Option",
        ["Original FPS", "1 fps", "1 frame per 5 seconds", "1 frame per 10 seconds", "1 frame per 15 seconds"]
    )
    progress_text = st.empty()
    progress_bar = st.progress(0)

###############################################################################
# MAIN PAGE TITLE
###############################################################################
st.title("Fire Detection: Original vs. Processed Video")

###############################################################################
# Load YOLO model
###############################################################################
try:
    model = YOLO(model_path)
except Exception as ex:
    st.error(f"Unable to load model. Check model path: {model_path}")
    st.error(ex)
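    # If loading fails, `model` stays undefined: the example pairs below can
    # still be viewed, but running detection on an upload will raise a
    # NameError. Calling st.stop() here would be the stricter alternative.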

###############################################################################
# Determine source: Example or Uploaded File
###############################################################################
original_video_data = None
processed_video_data = None  # For example pairs, these are loaded directly
original_image = None        # Set only when the upload is a still image
if example_option != "None":
    # An example pair was chosen. Load the videos from disk.
    if example_option == "T Example":
        # T1.mp4: original, T2.mpg: processed (analysis completed video)
        try:
            with open("T1.mp4", "rb") as f:
                original_video_data = f.read()
            with open("T2.mpg", "rb") as f:
                processed_video_data = f.read()
        except Exception as ex:
            st.error("Error loading T Example videos. Ensure T1.mp4 and T2.mpg are in your repo.")
    elif example_option == "LA Example":
        # LA1.mp4: original, LA2.mp4: processed
        try:
            with open("LA1.mp4", "rb") as f:
                original_video_data = f.read()
            with open("LA2.mp4", "rb") as f:
                processed_video_data = f.read()
        except Exception as ex:
            st.error("Error loading LA Example videos. Ensure LA1.mp4 and LA2.mp4 are in your repo.")
else:
    # No example selected. Use the uploaded file if available.
    if source_file:
        file_type = source_file.type.split('/')[0]
        if file_type == 'image':
            # For images, keep the PIL image; it is displayed directly with
            # st.image below rather than being wrapped in a <video> tag.
            original_image = PIL.Image.open(source_file)
        else:
            # For video, save the upload to a temporary file and load its bytes.
            tfile = tempfile.NamedTemporaryFile(delete=False)
            tfile.write(source_file.read())
            tfile.flush()
            with open(tfile.name, "rb") as vf:
                original_video_data = vf.read()
            # Also open the video with OpenCV for the detection pass below.
            vidcap = cv2.VideoCapture(tfile.name)
    else:
        st.info("Please select an example pair or upload a file.")

###############################################################################
# Display the Original and Result columns side-by-side
###############################################################################
col1, col2 = st.columns(2)

# Left column: Original file
with col1:
    st.subheader("Original File")
    if original_image is not None:
        st.image(original_image, caption="Original Image")
    elif original_video_data:
        show_autoplay_video(original_video_data, title="Original")
    else:
        st.info("No original file available.")

###############################################################################
# DETECTION: For uploaded video files (not example pairs) run YOLO analysis
###############################################################################
# Video detection only runs when no example pair is selected and the upload is
# a video; uploaded still images are handled separately below.
if example_option == "None" and source_file and source_file.type.split('/')[0] != 'image':
    # Reset processed frames for a new analysis
    st.session_state["processed_frames"] = []
    frame_count = 0
    orig_fps = vidcap.get(cv2.CAP_PROP_FPS)
    total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Determine sampling interval
    if video_option == "Original FPS":
        sample_interval = 1
        output_fps = orig_fps
    elif video_option == "1 fps":
        sample_interval = int(orig_fps) if orig_fps > 0 else 1
        output_fps = 1
    elif video_option == "1 frame per 5 seconds":
        sample_interval = int(orig_fps * 5) if orig_fps > 0 else 5
        output_fps = 1
    elif video_option == "1 frame per 10 seconds":
        sample_interval = int(orig_fps * 10) if orig_fps > 0 else 10
        output_fps = 1
    elif video_option == "1 frame per 15 seconds":
        sample_interval = int(orig_fps * 15) if orig_fps > 0 else 15
        output_fps = 1
    else:
        sample_interval = 1
        output_fps = orig_fps
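    # Worked example: a 30 fps source with "1 frame per 5 seconds" selected
    # gives sample_interval = 150, so one frame in every 150 is analysed and
    # the shortened result is written back at output_fps = 1.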
    success, image = vidcap.read()
    while success:
        if frame_count % sample_interval == 0:
            res = model.predict(image, conf=confidence)
            # Results.plot() returns an annotated BGR array, which is what
            # cv2.VideoWriter expects below, so no channel flip is needed here.
            res_plotted = res[0].plot()
            st.session_state["processed_frames"].append(res_plotted)
            # Update progress
            if total_frames > 0:
                progress_pct = int((frame_count / total_frames) * 100)
                progress_text.text(f"Processing frame {frame_count} / {total_frames} ({progress_pct}%)")
                progress_bar.progress(min(100, progress_pct))
            else:
                progress_text.text(f"Processing frame {frame_count}")
        frame_count += 1
        success, image = vidcap.read()
    vidcap.release()
    progress_text.text("Video processing complete!")
    progress_bar.progress(100)
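    # Note: the 'mp4v' fourcc below writes MPEG-4 Part 2 video, which some
    # browsers cannot play inside the base64 <video> tag; if playback fails,
    # 'avc1' (H.264) is the usual alternative when that codec is available to
    # OpenCV.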
    # Create shortened video from processed frames
    processed_frames = st.session_state["processed_frames"]
    if processed_frames:
        temp_video_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video_file.name, fourcc, output_fps, (width, height))
        for frame in processed_frames:
            out.write(frame)
        out.release()
        with open(temp_video_file.name, 'rb') as video_file:
            st.session_state["shortened_video_data"] = video_file.read()
        st.session_state["shortened_video_ready"] = True
        st.success("Processed video created successfully!")
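
###############################################################################
# DETECTION for uploaded still images
# (A minimal addition sketched here because the uploader accepts images but the
#  original flow only previews them; "image_result" is a new session-state key
#  introduced for this purpose.)
###############################################################################
st.session_state["image_result"] = None
if example_option == "None" and source_file and source_file.type.split('/')[0] == 'image':
    try:
        res = model.predict(original_image, conf=confidence)
        # plot() returns BGR; flip to RGB for st.image
        st.session_state["image_result"] = res[0].plot()[:, :, ::-1]
    except Exception as ex:
        st.error("Error running detection on the uploaded image.")
        st.error(ex)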

###############################################################################
# Right column: Display the Processed (Result) output
###############################################################################
with col2:
    st.subheader("Result File")
    # For example pairs, use the preloaded processed_video_data
    if processed_video_data:
        show_autoplay_video(processed_video_data, title="Processed")
    # Otherwise, if a processed video has been generated from an upload, show it
    elif st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]:
        show_autoplay_video(st.session_state["shortened_video_data"], title="Processed")
    # For an uploaded still image, show the annotated detection result
    elif st.session_state.get("image_result") is not None:
        st.image(st.session_state["image_result"], caption="Detected Image")
    else:
        st.info("No processed output available yet. Upload a file or pick an example pair.")

###############################################################################
# Always display the download button if a processed video is ready
###############################################################################
if st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]:
    st.download_button(
        label="Download Processed Video",
        data=st.session_state["shortened_video_data"],
        file_name="processed_video.mp4",
        mime="video/mp4"
    )