ccr-colorado / app.py
tstone87's picture
Update app.py
550e871 verified
raw
history blame
10.6 kB
import os
import tempfile
import base64
import cv2
import streamlit as st
import PIL
from ultralytics import YOLO
import requests
###############################################################################
# Helper: Embed an HTML5 video that autoplays (muted) with controls.
###############################################################################
def show_autoplay_video(video_bytes: bytes, title: str = "Video"):
if not video_bytes:
st.warning(f"No {title} video available.")
return
video_base64 = base64.b64encode(video_bytes).decode()
video_html = f"""
<h4>{title}</h4>
<video width="100%" controls autoplay muted>
<source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
Your browser does not support the video tag.
</video>
"""
st.markdown(video_html, unsafe_allow_html=True)
###############################################################################
# Session state initialization (for processed results)
###############################################################################
if "processed_frames" not in st.session_state:
st.session_state["processed_frames"] = []
if "shortened_video_data" not in st.session_state:
st.session_state["shortened_video_data"] = None
if "shortened_video_ready" not in st.session_state:
st.session_state["shortened_video_ready"] = False
###############################################################################
# Configure YOLO model path and page layout
###############################################################################
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'
st.set_page_config(
page_title="Fire Detection: Original vs. Processed Video",
page_icon="🔥",
layout="wide",
initial_sidebar_state="expanded"
)
###############################################################################
# SIDEBAR: Video input options, confidence, sampling options, and example selection
###############################################################################
with st.sidebar:
st.header("Video Input Options")
example_option = st.selectbox(
"Select Example Pair (optional)",
["None", "T Example", "LA Example"]
)
source_file = st.file_uploader(
"Or upload your own file...",
type=("mp4", "jpg", "jpeg", "png", "bmp", "webp")
)
confidence = float(st.slider("Select Model Confidence", 25, 100, 40)) / 100
video_option = st.selectbox(
"Select Video Shortening Option",
["Original FPS", "1 fps", "1 frame per 5 seconds", "1 frame per 10 seconds", "1 frame per 15 seconds"]
)
progress_text = st.empty()
progress_bar = st.progress(0)
download_placeholder = st.empty() # This placeholder will hold the download button
###############################################################################
# MAIN TITLE
###############################################################################
st.title("Fire Detection: Original vs. Processed Video")
###############################################################################
# Load YOLO model
###############################################################################
try:
model = YOLO(model_path)
except Exception as ex:
st.error(f"Unable to load model. Check model path: {model_path}")
st.error(ex)
###############################################################################
# Determine source video(s): Example pair or uploaded file.
###############################################################################
original_video_data = None
processed_video_data = None # For example pairs
if example_option != "None":
# Use example videos from remote URLs.
if example_option == "T Example":
orig_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/T1.mp4"
proc_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/T2.mpg"
elif example_option == "LA Example":
orig_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/LA1.mp4"
proc_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/LA2.mp4"
try:
original_video_data = requests.get(orig_url).content
processed_video_data = requests.get(proc_url).content
except Exception as ex:
st.error("Error loading example videos. Check your URLs.")
else:
# No example selected. If a file is uploaded, use it.
if source_file:
file_type = source_file.type.split('/')[0]
if file_type == 'image':
original_image = PIL.Image.open(source_file)
buf = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
original_image.save(buf.name, format="PNG")
with open(buf.name, "rb") as f:
original_video_data = f.read()
else:
tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
tfile.write(source_file.read())
tfile.flush()
with open(tfile.name, "rb") as vf:
original_video_data = vf.read()
# Open with OpenCV for processing.
vidcap = cv2.VideoCapture(tfile.name)
else:
st.info("Please select an example pair or upload a video file.")
###############################################################################
# Layout: Two columns for Original and Processed videos.
###############################################################################
col1, col2 = st.columns(2)
with col1:
st.subheader("Original File")
if original_video_data:
show_autoplay_video(original_video_data, title="Original Video")
else:
st.info("No original video available.")
with col2:
st.subheader("Result File")
# Create a dedicated placeholder for the processed video.
viewer_slot = st.empty()
if example_option != "None":
if processed_video_data:
show_autoplay_video(processed_video_data, title="Processed Video")
else:
st.info("No processed video available in example.")
else:
viewer_slot.info("Processed video will appear here once detection is run.")
###############################################################################
# DETECTION: Process the uploaded video if no example is selected.
###############################################################################
if example_option == "None" and source_file and source_file.type.split('/')[0] != 'image':
if st.sidebar.button("Let's Detect Wildfire"):
# Reset previous processed results.
st.session_state["processed_frames"] = []
st.session_state["shortened_video_data"] = None
st.session_state["shortened_video_ready"] = False
processed_frames = st.session_state["processed_frames"]
frame_count = 0
orig_fps = vidcap.get(cv2.CAP_PROP_FPS)
total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Determine sampling interval.
if video_option == "Original FPS":
sample_interval = 1
output_fps = orig_fps
elif video_option == "1 fps":
sample_interval = int(orig_fps) if orig_fps > 0 else 1
output_fps = 1
elif video_option == "1 frame per 5 seconds":
sample_interval = int(orig_fps * 5) if orig_fps > 0 else 5
output_fps = 1
elif video_option == "1 frame per 10 seconds":
sample_interval = int(orig_fps * 10) if orig_fps > 0 else 10
output_fps = 1
elif video_option == "1 frame per 15 seconds":
sample_interval = int(orig_fps * 15) if orig_fps > 0 else 15
output_fps = 1
else:
sample_interval = 1
output_fps = orig_fps
success, image = vidcap.read()
while success:
if frame_count % sample_interval == 0:
res = model.predict(image, conf=confidence)
res_plotted = res[0].plot()[:, :, ::-1]
processed_frames.append(res_plotted)
# Update progress.
if total_frames > 0:
progress_pct = int((frame_count / total_frames) * 100)
progress_text.text(f"Processing frame {frame_count} / {total_frames} ({progress_pct}%)")
progress_bar.progress(min(100, progress_pct))
else:
progress_text.text(f"Processing frame {frame_count}")
# Update the viewer with the most recent processed frame.
viewer_slot.image(res_plotted, caption=f"Frame {frame_count}", use_column_width=True)
frame_count += 1
success, image = vidcap.read()
progress_text.text("Video processing complete!")
progress_bar.progress(100)
# Create shortened video from processed frames.
if processed_frames:
temp_video_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
# Use 'avc1' codec (H.264) for better compatibility.
fourcc = cv2.VideoWriter_fourcc(*'avc1')
out = cv2.VideoWriter(temp_video_file.name, fourcc, output_fps, (width, height))
for frame in processed_frames:
frame_out = cv2.convertScaleAbs(frame)
out.write(frame_out)
out.release()
with open(temp_video_file.name, 'rb') as video_file:
st.session_state["shortened_video_data"] = video_file.read()
st.session_state["shortened_video_ready"] = True
st.success("Processed video created successfully!")
# Update the viewer with the final processed video.
viewer_slot.empty()
show_autoplay_video(st.session_state["shortened_video_data"], title="Processed Video")
else:
st.error("No frames were processed from the video.")
###############################################################################
# ALWAYS display the download button if a processed video is ready.
###############################################################################
if st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]:
download_placeholder.download_button(
label="Download Processed Video",
data=st.session_state["shortened_video_data"],
file_name="processed_video.mp4",
mime="video/mp4"
)