ccr-colorado / app.py
tstone87's picture
Update app.py
b5b4046 verified
import os
import tempfile
import cv2
import streamlit as st
import PIL
import requests
from ultralytics import YOLO
import time
import numpy as np
import imageio_ffmpeg as ffmpeg
import base64
# Page config first: this call is kept ahead of every other Streamlit command
# in the script.
page_settings = {
    "page_title": "Fire Watch: Fire and Smoke Detection with an AI Vision Model",
    "page_icon": "🔥",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**page_settings)
# Model path: weights file handed to YOLO() when the detector is loaded.
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'

# Session state initialization: seed each key once so values survive reruns.
session_defaults = {
    "processed_frames": [],   # annotated frames of the most recent video run
    "slider_value": 0,
    "processed_video": None,  # encoded MP4 bytes of the most recent video run
    "start_time": None,       # wall-clock start of the most recent video run
}
for state_key, initial_value in session_defaults.items():
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# Sidebar: upload control, detection settings and progress/download placeholders.
# Maps each frame-rate label to the number of frames analysed per second of
# source video; None means "analyse every frame".
fps_options = dict(
    [
        ("Original FPS", None),
        ("3 FPS", 3),
        ("1 FPS", 1),
        ("1 frame/4s", 0.25),
        ("1 frame/10s", 0.1),
        ("1 frame/15s", 0.0667),
        ("1 frame/30s", 0.0333),
    ]
)
accepted_types = ["jpg", "jpeg", "png", "bmp", "webp", "mp4"]

with st.sidebar:
    st.header("Upload & Settings")
    source_file = st.file_uploader("Upload image/video", type=accepted_types)

    # The slider works in whole percent; the detector expects a 0-1 fraction.
    confidence_percent = st.slider("Confidence Threshold", 10, 100, 20)
    confidence = float(confidence_percent) / 100

    video_option = st.selectbox("Output Frame Rate", list(fps_options))
    process_button = st.button("Detect fire")

    # Placeholders that the processing step fills in later.
    progress_bar = st.progress(0)
    progress_text = st.empty()
    download_slot = st.empty()
# Main page
st.title("Fire Watch: AI-Powered Fire and Smoke Detection")

# Display result images directly: two pre-rendered examples, side by side.
fire_4a_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Fire_4a.jpg"
fire_3a_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Fire_3a.jpg"
col1, col2 = st.columns(2)
for showcase_column, showcase_url in ((col1, fire_4a_url), (col2, fire_3a_url)):
    with showcase_column:
        st.image(showcase_url, use_column_width=True)

st.markdown("""
Early fire and smoke detection using YOLOv8 AI vision model. See detected results below, and upload more content for additional analysis!
""")

if not source_file:
    st.info("Please upload a file to begin.")

# Containers that the processing step renders the user's own results into.
st.header("Your Results")
result_cols = st.columns(2)
viewer_slot = st.empty()
# Load model
# Streamlit re-executes this script on every widget interaction, so the loaded
# detector is kept in session state and reused instead of being rebuilt from
# `model_path` on each rerun. A failed load is not stored, which means the
# next rerun retries it; `model` is None in that case and processing is skipped.
if "yolo_model" in st.session_state:
    model = st.session_state["yolo_model"]
else:
    try:
        model = YOLO(model_path)
    except Exception as ex:
        # Top-level boundary: surface any load failure in the UI rather than
        # crashing the page.
        st.error(f"Model loading failed: {str(ex)}")
        model = None
    else:
        st.session_state["yolo_model"] = model
# Processing
def _format_duration(seconds):
    """Format a duration given in seconds as ``"<M>m <S>s"``."""
    return f"{int(seconds // 60)}m {int(seconds % 60)}s"


def _sample_interval(source_fps, target_fps):
    """Return the stride, in source frames, between analysed frames.

    Falls back to 1 (analyse every frame) when either rate is missing or
    zero, e.g. "Original FPS" was selected or the container reports no FPS.
    """
    if not source_fps or not target_fps:
        return 1
    return max(1, int(source_fps / target_fps))


def _annotate(detector, image, threshold):
    """Run detection on ``image`` and return the annotated picture in RGB.

    ``Results.plot()`` returns a BGR array according to the Ultralytics
    documentation, so the channel axis is reversed here.
    """
    detections = detector.predict(image, conf=threshold)
    return detections[0].plot()[:, :, ::-1]


def _encode_video(frames, fps):
    """Encode a non-empty list of RGB frames to H.264 MP4 and return the bytes.

    The frame size is taken from the first frame so it always matches the
    data that is actually sent to the encoder.
    """
    frame_height, frame_width = frames[0].shape[:2]
    # Reserve a path for ffmpeg to write to; the handle is closed right away.
    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as out_file:
        out_path = out_file.name
    try:
        writer = ffmpeg.write_frames(
            out_path,
            (frame_width, frame_height),
            fps=fps,
            codec='libx264',
            # Frames were converted to RGB by _annotate, so they must be
            # declared as rgb24; declaring bgr24 swaps red and blue.
            pix_fmt_in='rgb24',
            pix_fmt_out='yuv420p',
        )
        writer.send(None)  # Prime the generator before sending frames.
        try:
            for frame in frames:
                writer.send(frame)
        finally:
            writer.close()
        with open(out_path, 'rb') as encoded:
            return encoded.read()
    finally:
        os.unlink(out_path)


if process_button and source_file and model:
    st.session_state.processed_frames = []
    if source_file.type.split('/')[0] == 'image':
        image = PIL.Image.open(source_file)
        result = _annotate(model, image, confidence)
        with result_cols[0]:
            st.image(image, caption="Original", use_column_width=True)
        with result_cols[1]:
            st.image(result, caption="Detected", use_column_width=True)
    else:
        # Video processing
        # OpenCV opens videos by path, so persist the upload to disk first.
        # getvalue() returns the whole buffer regardless of the current
        # stream position; the handle is closed before OpenCV reads the file.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            tmp.write(source_file.getvalue())
        try:
            vidcap = cv2.VideoCapture(tmp.name)
            try:
                orig_fps = vidcap.get(cv2.CAP_PROP_FPS)
                total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
                sample_interval = _sample_interval(orig_fps, fps_options[video_option])

                st.session_state.start_time = time.time()
                frame_count = 0
                success, frame = vidcap.read()
                while success:
                    if frame_count % sample_interval == 0:
                        # The channel-reversed view has negative strides; the
                        # encoder needs a contiguous buffer.
                        st.session_state.processed_frames.append(
                            np.ascontiguousarray(_annotate(model, frame, confidence))
                        )

                        # Refresh progress once per analysed frame.
                        elapsed = time.time() - st.session_state.start_time
                        # Some containers report no (or a too small) frame
                        # count: avoid dividing by zero and cap at 100%.
                        if total_frames > 0:
                            progress = min(frame_count / total_frames, 1.0)
                        else:
                            progress = 0.0
                        if elapsed > 0 and progress > 0:
                            eta = elapsed / progress - elapsed
                            elapsed_str = _format_duration(elapsed)
                            eta_str = _format_duration(eta) if eta > 0 else "Almost done"
                        else:
                            elapsed_str = "0s"
                            eta_str = "Calculating..."
                        progress_bar.progress(progress)
                        progress_text.text(f"Progress: {progress:.1%}\nElapsed: {elapsed_str}\nETA: {eta_str}")
                    frame_count += 1
                    success, frame = vidcap.read()
            finally:
                vidcap.release()
        finally:
            os.unlink(tmp.name)

        # The result video always plays back at 1 FPS (1000 ms per analysed
        # frame), independent of the sampling rate chosen in the sidebar.
        fixed_output_fps = 1

        if st.session_state.processed_frames:
            st.session_state.processed_video = _encode_video(
                st.session_state.processed_frames, fixed_output_fps
            )
            elapsed_final = time.time() - st.session_state.start_time
            elapsed_final_str = _format_duration(elapsed_final)
            progress_bar.progress(1.0)
            progress_text.text(f"Progress: 100%\nElapsed: {elapsed_final_str}\nETA: 0m 0s")
            with result_cols[0]:
                st.video(source_file)
            with result_cols[1]:
                st.video(st.session_state.processed_video)
            download_slot.download_button(
                label="Download Processed Video",
                data=st.session_state.processed_video,
                file_name="results_fire_analysis.mp4",
                mime="video/mp4"
            )
        else:
            # Nothing was decoded (unreadable or empty video): say so instead
            # of leaving the results area silently empty.
            st.warning("No frames could be read from the uploaded video.")