ccr-colorado / app.py
tstone87's picture
Update app.py
d79abfc verified
raw
history blame
10.5 kB
import os
import tempfile
import base64
import cv2
import streamlit as st
import PIL
from ultralytics import YOLO
###############################################################################
# Helper function: Display an HTML5 video with autoplay, controls, and muted
###############################################################################
def show_autoplay_video(video_data: bytes, title: str = "Video"):
if not video_data:
st.warning(f"No {title} video available.")
return
video_base64 = base64.b64encode(video_data).decode()
video_html = f"""
<h4>{title}</h4>
<video width="100%" height="auto" controls autoplay muted>
<source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
Your browser does not support the video tag.
</video>
"""
st.markdown(video_html, unsafe_allow_html=True)
###############################################################################
# Session state initialization for processed results (for uploaded files)
###############################################################################
if "processed_frames" not in st.session_state:
st.session_state["processed_frames"] = []
if "shortened_video_data" not in st.session_state:
st.session_state["shortened_video_data"] = None
if "shortened_video_ready" not in st.session_state:
st.session_state["shortened_video_ready"] = False
###############################################################################
# Configure YOLO model path and Streamlit page
###############################################################################
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'
st.set_page_config(
page_title="Fire Detection: Original vs. Processed Video",
page_icon="🔥",
layout="wide",
initial_sidebar_state="expanded"
)
###############################################################################
# SIDEBAR: Upload file, set confidence, video option, and select an example pair
###############################################################################
with st.sidebar:
st.header("Video Input Options")
example_option = st.selectbox(
"Select Example Pair (optional)",
["None", "T Example", "LA Example"]
)
source_file = st.file_uploader(
"Or upload your own file...",
type=("jpg", "jpeg", "png", "bmp", "webp", "mp4")
)
confidence = float(st.slider("Select Model Confidence", 25, 100, 40)) / 100
video_option = st.selectbox(
"Select Video Shortening Option",
["Original FPS", "1 fps", "1 frame per 5 seconds", "1 frame per 10 seconds", "1 frame per 15 seconds"]
)
progress_text = st.empty()
progress_bar = st.progress(0)
###############################################################################
# MAIN PAGE TITLE
###############################################################################
st.title("Fire Detection: Original vs. Processed Video")
###############################################################################
# Load YOLO model
###############################################################################
try:
model = YOLO(model_path)
except Exception as ex:
st.error(f"Unable to load model. Check model path: {model_path}")
st.error(ex)
###############################################################################
# Determine source: Example or Uploaded File
###############################################################################
original_video_data = None
processed_video_data = None # For example pairs, these are loaded directly
if example_option != "None":
# An example pair was chosen. Load the videos from disk.
if example_option == "T Example":
# T1.mp4: original, T2.mpg: processed (analysis completed video)
try:
with open("T1.mp4", "rb") as f:
original_video_data = f.read()
with open("T2.mpg", "rb") as f:
processed_video_data = f.read()
except Exception as ex:
st.error("Error loading T Example videos. Ensure T1.mp4 and T2.mpg are in your repo.")
elif example_option == "LA Example":
# LA1.mp4: original, LA2.mp4: processed
try:
with open("LA1.mp4", "rb") as f:
original_video_data = f.read()
with open("LA2.mp4", "rb") as f:
processed_video_data = f.read()
except Exception as ex:
st.error("Error loading LA Example videos. Ensure LA1.mp4 and LA2.mp4 are in your repo.")
else:
# No example selected. Use uploaded file if available.
if source_file:
file_type = source_file.type.split('/')[0]
if file_type == 'image':
# For images, simply show the uploaded image (and detection result below)
original_image = PIL.Image.open(source_file)
# Convert image to bytes for display if needed
buf = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
original_image.save(buf.name, format="PNG")
with open(buf.name, "rb") as f:
original_video_data = f.read() # Actually, this is just an image preview.
else:
# For video, save to a temporary file and load its bytes.
tfile = tempfile.NamedTemporaryFile(delete=False)
tfile.write(source_file.read())
tfile.flush()
with open(tfile.name, "rb") as vf:
original_video_data = vf.read()
# Also open video with OpenCV for processing below.
vidcap = cv2.VideoCapture(tfile.name)
else:
st.info("Please select an example pair or upload a file.")
###############################################################################
# Display the Original and Result columns side-by-side
###############################################################################
col1, col2 = st.columns(2)
# Left column: Original video
with col1:
st.subheader("Original File")
if original_video_data:
show_autoplay_video(original_video_data, title="Original")
else:
st.info("No original video available.")
###############################################################################
# DETECTION: For uploaded video files (not example pairs) run YOLO analysis
###############################################################################
# We only run detection if no example pair is selected and if an upload is provided.
if example_option == "None" and source_file and source_file.type.split('/')[0] != 'image':
# Reset processed frames for a new analysis
st.session_state["processed_frames"] = []
frame_count = 0
orig_fps = vidcap.get(cv2.CAP_PROP_FPS)
total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Determine sampling interval
if video_option == "Original FPS":
sample_interval = 1
output_fps = orig_fps
elif video_option == "1 fps":
sample_interval = int(orig_fps) if orig_fps > 0 else 1
output_fps = 1
elif video_option == "1 frame per 5 seconds":
sample_interval = int(orig_fps * 5) if orig_fps > 0 else 5
output_fps = 1
elif video_option == "1 frame per 10 seconds":
sample_interval = int(orig_fps * 10) if orig_fps > 0 else 10
output_fps = 1
elif video_option == "1 frame per 15 seconds":
sample_interval = int(orig_fps * 15) if orig_fps > 0 else 15
output_fps = 1
else:
sample_interval = 1
output_fps = orig_fps
success, image = vidcap.read()
while success:
if frame_count % sample_interval == 0:
res = model.predict(image, conf=confidence)
res_plotted = res[0].plot()[:, :, ::-1]
st.session_state["processed_frames"].append(res_plotted)
# Update progress
if total_frames > 0:
progress_pct = int((frame_count / total_frames) * 100)
progress_text.text(f"Processing frame {frame_count} / {total_frames} ({progress_pct}%)")
progress_bar.progress(min(100, progress_pct))
else:
progress_text.text(f"Processing frame {frame_count}")
frame_count += 1
success, image = vidcap.read()
progress_text.text("Video processing complete!")
progress_bar.progress(100)
# Create shortened video from processed frames
processed_frames = st.session_state["processed_frames"]
if processed_frames:
temp_video_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(temp_video_file.name, fourcc, output_fps, (width, height))
for frame in processed_frames:
out.write(frame)
out.release()
with open(temp_video_file.name, 'rb') as video_file:
st.session_state["shortened_video_data"] = video_file.read()
st.session_state["shortened_video_ready"] = True
st.success("Processed video created successfully!")
###############################################################################
# Right column: Display the Processed (Result) video
###############################################################################
with col2:
st.subheader("Result File")
# For example pairs, use the preloaded processed_video_data
if processed_video_data:
show_autoplay_video(processed_video_data, title="Processed")
# Otherwise, if a processed video has been generated from an upload, show it
elif st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]:
show_autoplay_video(st.session_state["shortened_video_data"], title="Processed")
else:
st.info("No processed video available yet. Run detection if you uploaded a file.")
###############################################################################
# Always display the download button if a processed video is ready
###############################################################################
if st.session_state["shortened_video_ready"] and st.session_state["shortened_video_data"]:
st.download_button(
label="Download Processed Video",
data=st.session_state["shortened_video_data"],
file_name="processed_video.mp4",
mime="video/mp4"
)