import os import tempfile import cv2 import streamlit as st import PIL import requests from ultralytics import YOLO import time import numpy as np import imageio_ffmpeg as ffmpeg import base64 # Page config first st.set_page_config( page_title="Fire Watch: Fire and Smoke Detection with an AI Vision Model", page_icon="🔥", layout="wide", initial_sidebar_state="expanded" ) # Model path model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt' # Session state initialization for key in ["processed_frames", "slider_value", "processed_video", "start_time"]: if key not in st.session_state: st.session_state[key] = [] if key == "processed_frames" else 0 if key == "slider_value" else None # Sidebar with st.sidebar: st.header("Upload & Settings") source_file = st.file_uploader("Upload image or video to be analyzed:", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"]) confidence = float(st.slider("Confidence Threshold", 10, 100, 20)) / 100 fps_options = { "Original FPS": None, "3 FPS": 3, "1 FPS": 1, "1 frame/4s": 0.25, "1 frame/10s": 0.1, "1 frame/15s": 0.0667, "1 frame/30s": 0.0333 } video_option = st.selectbox("Output Frame Rate", list(fps_options.keys())) process_button = st.button("Detect fire") progress_bar = st.progress(0) progress_text = st.empty() download_slot = st.empty() # Main page st.title("Fire Watch: AI-Powered Fire and Smoke Detection") # Display result images directly col1, col2 = st.columns(2) with col1: fire_4a_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Fire_4a.jpg" st.image(fire_4a_url, use_column_width=True) with col2: fire_3a_url = "https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/Fire_3a.jpg" st.image(fire_3a_url, use_column_width=True) st.markdown(""" Early wildfire detection using YOLOv8 AI vision model. See detected results above and video examples below, or upload your own content! Click on video frames to load and play examples. """) # Function to create simple video pair HTML if not source_file: st.info("Please upload a file to begin.") st.header("Your Results") result_cols = st.columns(2) viewer_slot = st.empty() # Example videos (LA before T) #st.header("Example Results") #examples = [ # ("LA Example", "LA1.mp4", "LA2.mp4"), # ("T Example", "T1.mp4", "T2.mp4") #] #for title, orig_file, proc_file in examples: # st.subheader(title) # orig_url = f"https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/{orig_file}" # proc_url = f"https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/{proc_file}" # video_html = create_video_pair(orig_url, proc_url) # st.markdown(video_html, unsafe_allow_html=True) # Load model try: model = YOLO(model_path) except Exception as ex: st.error(f"Model loading failed: {str(ex)}") model = None # Processing if process_button and source_file and model: st.session_state.processed_frames = [] if source_file.type.split('/')[0] == 'image': image = PIL.Image.open(source_file) res = model.predict(image, conf=confidence) result = res[0].plot()[:, :, ::-1] with result_cols[0]: st.image(image, caption="Original", use_column_width=True) with result_cols[1]: st.image(result, caption="Detected", use_column_width=True) else: # Video processing with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: tmp.write(source_file.read()) vidcap = cv2.VideoCapture(tmp.name) orig_fps = vidcap.get(cv2.CAP_PROP_FPS) total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT)) output_fps = fps_options[video_option] if fps_options[video_option] else orig_fps sample_interval = max(1, int(orig_fps / output_fps)) if output_fps else 1 # Set fixed output FPS to 2 (500ms per frame = 2 FPS) fixed_output_fps = 2 st.session_state.start_time = time.time() frame_count = 0 processed_count = 0 success, frame = vidcap.read() while success: if frame_count % sample_interval == 0: res = model.predict(frame, conf=confidence) processed_frame = res[0].plot()[:, :, ::-1] if not processed_frame.flags['C_CONTIGUOUS']: processed_frame = np.ascontiguousarray(processed_frame) st.session_state.processed_frames.append(processed_frame) processed_count += 1 elapsed = time.time() - st.session_state.start_time progress = frame_count / total_frames if elapsed > 0 and progress > 0: total_estimated_time = elapsed / progress eta = total_estimated_time - elapsed elapsed_str = f"{int(elapsed // 60)}m {int(elapsed % 60)}s" eta_str = f"{int(eta // 60)}m {int(eta % 60)}s" if eta > 0 else "Almost done" else: elapsed_str = "0s" eta_str = "Calculating..." progress_bar.progress(min(progress, 1.0)) progress_text.text(f"Progress: {progress:.1%}\nElapsed: {elapsed_str}\nETA: {eta_str}") frame_count += 1 success, frame = vidcap.read() vidcap.release() os.unlink(tmp.name) if st.session_state.processed_frames: out_path = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False).name writer = ffmpeg.write_frames( out_path, (width, height), fps=fixed_output_fps, # Fixed at 2 FPS (500ms per frame) codec='libx264', pix_fmt_in='bgr24', pix_fmt_out='yuv420p' ) writer.send(None) # Initialize writer for frame in st.session_state.processed_frames: writer.send(frame) writer.close() with open(out_path, 'rb') as f: st.session_state.processed_video = f.read() os.unlink(out_path) elapsed_final = time.time() - st.session_state.start_time elapsed_final_str = f"{int(elapsed_final // 60)}m {int(elapsed_final % 60)}s" progress_bar.progress(1.0) progress_text.text(f"Progress: 100%\nElapsed: {elapsed_final_str}\nETA: 0m 0s") with result_cols[0]: st.video(source_file) with result_cols[1]: st.video(st.session_state.processed_video) download_slot.download_button( label="Download Processed Video", data=st.session_state.processed_video, file_name="results_fire_analysis.mp4", mime="video/mp4" ) if not source_file: st.info("Please upload a file to begin.")