import PIL
import cv2
import streamlit as st
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import os
# Page Config
st.set_page_config(
    page_title="WildfireWatch",
    page_icon="🔥",
    layout="wide",
)

# CSS for layout stability and dark tab text.
# NOTE(review): the payload below consists of a single newline, so no styling
# is actually injected right now -- confirm whether the <style> markup was lost.
_PAGE_CSS = """
"""
st.markdown(_PAGE_CSS, unsafe_allow_html=True)
# Load Model
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'

# Streamlit re-executes this script from the top on every widget interaction,
# so constructing YOLO unconditionally would reload the weights on each rerun.
# Keep a single instance per browser session in session state instead.
if 'yolo_model' not in st.session_state:
    try:
        st.session_state['yolo_model'] = YOLO(model_path)
    except Exception as ex:
        # Top-level UI boundary: report the problem and halt this script run.
        st.error(f"Unable to load model. Check the specified path: {model_path}")
        st.error(ex)
        st.stop()

# Module-level handle used by both tabs below.
model = st.session_state['yolo_model']
# Initialize Session State
# Seed each key only when it is absent so values set during earlier reruns
# of this session are left untouched.
_SESSION_DEFAULTS = {
    'monitoring': False,
    'current_webcam_url': None,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# Header
st.title("WildfireWatch: Detecting Wildfire using AI")

# Introductory blurb rendered under the title.
_INTRO_MARKDOWN = """
Wildfires are a major environmental issue, causing substantial losses to ecosystems, human livelihoods, and potentially leading to loss of life. Early detection of wildfires can prevent these losses. Our application uses state-of-the-art YOLOv8 model for real-time wildfire and smoke detection.
"""
st.markdown(_INTRO_MARKDOWN)
st.markdown("---")

# Tabs
# tabs[0] hosts the upload workflow, tabs[1] the webcam monitor.
_TAB_LABELS = ["Upload", "Webcam"]
tabs = st.tabs(_TAB_LABELS)
# Tab 1: Upload (Simplified with diagnostics)
# `import PIL` on its own does not load the Image submodule; import it
# explicitly so PIL.Image.open does not rely on some other library having
# imported it first.
import PIL.Image


def _remove_quietly(path):
    """Delete *path*, tolerating a file that is already gone or still locked."""
    try:
        os.unlink(path)
    except OSError:
        pass


def _analyze_video(video_bytes, conf, target_fps, frame_slot, status_slot,
                   progress_slot, download_slot):
    """Run detection over an uploaded video and offer the annotated copy.

    Args:
        video_bytes: Raw bytes of the uploaded video file.
        conf: Confidence threshold forwarded to ``model.predict``.
        target_fps: Frames per second to analyse; 0 means every frame.
        frame_slot: Placeholder that displays the latest annotated frame.
        status_slot: Placeholder for per-frame and final status text.
        progress_slot: Placeholder for the progress line.
        download_slot: Placeholder that receives the download button.

    Both temporary files are removed and the capture/writer are released on
    every exit path, including when an exception propagates to the caller.
    """
    # cv2.VideoCapture is opened from a path, so spill the upload to disk.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as input_tfile:
        input_tfile.write(video_bytes)
    # Only the path is needed for the writer; close our own handle right away
    # so it does not stay open while cv2.VideoWriter writes to the same file.
    with tempfile.NamedTemporaryFile(delete=False, suffix='_detected.mp4') as output_tfile:
        pass

    try:
        vidcap = cv2.VideoCapture(input_tfile.name)
        try:
            if not vidcap.isOpened():
                status_slot.error("Failed to open video file.")
                return

            total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
            fps = int(vidcap.get(cv2.CAP_PROP_FPS)) or 30  # fall back when FPS is reported as 0
            frame_width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Frame sampling: analyse one frame out of every `frame_skip`.
            frame_skip = 1 if target_fps == 0 else max(1, int(fps / target_fps))

            # Output video
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_tfile.name, fourcc, fps, (frame_width, frame_height))
            try:
                frame_count = 0
                processed_count = 0
                last_annotated_bgr = None
                success, frame = vidcap.read()
                while success:
                    if frame_count % frame_skip == 0:
                        res = model.predict(frame, conf=conf)
                        # plot() yields a BGR array; reverse channels only for display.
                        last_annotated_bgr = res[0].plot()
                        frame_slot.image(last_annotated_bgr[:, :, ::-1], use_column_width=True)
                        status_slot.write(f"Frame {frame_count}: Objects detected: {len(res[0].boxes)}")
                        processed_count += 1
                    # Skipped frames need no redraw: the placeholder already
                    # shows the most recent annotated frame.

                    # Every source frame gets an output frame (the latest
                    # annotated one) so the output keeps the source length.
                    if last_annotated_bgr is not None:
                        out.write(last_annotated_bgr)

                    # Progress
                    if total_frames > 0:
                        progress_percent = (frame_count + 1) / total_frames * 100
                        progress_slot.write(f"Progress: {progress_percent:.1f}% (Processed {processed_count} frames)")
                    else:
                        progress_slot.write(f"Progress: {frame_count} frames processed")

                    success, frame = vidcap.read()
                    frame_count += 1
                    time.sleep(0.05)  # short pause between frames (kept from the original pacing)
            finally:
                # Finalise the container before the file is read back.
                out.release()
        finally:
            vidcap.release()

        # Load the result into memory so the temp file can be deleted below.
        with open(output_tfile.name, 'rb') as f:
            annotated_bytes = f.read()
        download_slot.download_button(
            label="Download Analyzed Video",
            data=annotated_bytes,
            file_name="analyzed_video.mp4",
            mime="video/mp4"
        )
        status_slot.write(f"Video processing complete. Processed {processed_count} of {frame_count} frames.")
    finally:
        _remove_quietly(input_tfile.name)
        _remove_quietly(output_tfile.name)


with tabs[0]:
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("**Add Your File**")
        st.write("Upload an image or video to scan for fire or smoke.")
        # A non-empty label is supplied (still hidden via label_visibility)
        # because Streamlit discourages empty widget labels.
        source_file = st.file_uploader(
            "Upload an image or video",
            type=["jpg", "jpeg", "png", "mp4"],
            label_visibility="collapsed",
        )
        confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf")
        # Maps the UI label to the number of frames analysed per second (0 = all).
        sampling_options = {"Every Frame": 0, "1 FPS": 1, "2 FPS": 2, "5 FPS": 5}
        sampling_rate = st.selectbox("Analysis Rate", list(sampling_options.keys()), index=1, key="sampling_rate")
    with col2:
        # Placeholders are created up front so results update in place.
        frame_placeholder = st.empty()
        status_placeholder = st.empty()
        progress_placeholder = st.empty()
        download_placeholder = st.empty()

    if source_file:
        st.write(f"File size: {source_file.size / 1024 / 1024:.2f} MB")  # Diagnostic
        if st.button("Detect Wildfire", key="upload_detect"):
            # The MIME major type ("image" / "video") selects the pipeline.
            file_type = source_file.type.split('/')[0]
            if file_type == 'image':
                uploaded_image = PIL.Image.open(source_file)
                res = model.predict(uploaded_image, conf=confidence)
                detected_image = res[0].plot()[:, :, ::-1]  # BGR -> RGB for display
                frame_placeholder.image(detected_image, use_column_width=True)
                status_placeholder.write(f"Objects detected: {len(res[0].boxes)}")
            elif file_type == 'video':
                try:
                    _analyze_video(
                        source_file.read(),
                        confidence,
                        sampling_options[sampling_rate],
                        frame_placeholder,
                        status_placeholder,
                        progress_placeholder,
                        download_placeholder,
                    )
                except Exception as e:
                    # UI boundary: surface the failure instead of crashing the page.
                    status_placeholder.error(f"Error processing video: {str(e)}")
# Tab 2: Webcam
with tabs[1]:
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**Webcam Feed**")
        st.write("Provide a webcam URL (image or video stream) to monitor for hazards.")
        webcam_url = st.text_input("Webcam URL", "http:///current.jpg", label_visibility="collapsed")
        confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf")
        refresh_rate = st.slider("Refresh Rate (seconds)", 1, 60, 30, key="webcam_rate")
        start = st.button("Begin Monitoring", key="webcam_start")
        stop = st.button("Stop Monitoring", key="webcam_stop")
        if start:
            st.session_state.monitoring = True
            st.session_state.current_webcam_url = webcam_url
        # Editing the URL while monitoring counts as a stop request.
        if stop or (st.session_state.monitoring and webcam_url != st.session_state.current_webcam_url):
            st.session_state.monitoring = False
            st.session_state.current_webcam_url = None
    with col2:
        frame_placeholder = st.empty()
        status_placeholder = st.empty()
        timer_placeholder = st.empty()

    if st.session_state.monitoring and st.session_state.current_webcam_url:
        # NOTE(review): webcam_url is user-supplied and is fetched from the
        # server side (OpenCV and requests); consider restricting allowed
        # hosts/schemes if this app is reachable by untrusted users.
        cap = cv2.VideoCapture(webcam_url)
        try:
            # If OpenCV can open the URL it is treated as a video stream;
            # otherwise it is polled as a still image further below.
            is_video_stream = cap.isOpened()
            if is_video_stream:
                status_placeholder.write("Connected to video stream...")
                while st.session_state.monitoring and cap.isOpened():
                    try:
                        ret, frame = cap.read()
                        if not ret:
                            status_placeholder.error("Video stream interrupted.")
                            # Keep the flag in step with the loop having ended.
                            st.session_state.monitoring = False
                            break
                        if webcam_url != st.session_state.current_webcam_url:
                            status_placeholder.write("URL changed. Stopping video monitoring.")
                            break
                        res = model.predict(frame, conf=confidence)
                        detected_frame = res[0].plot()[:, :, ::-1]  # BGR -> RGB for display
                        frame_placeholder.image(detected_frame, use_column_width=True)
                        status_placeholder.write(f"Objects detected: {len(res[0].boxes)}")
                        time.sleep(0.1)
                    except Exception as e:
                        status_placeholder.error(f"Video error: {e}")
                        st.session_state.monitoring = False
                        break
        finally:
            # Always free the capture: also when the URL was not a stream and
            # when Streamlit interrupts this run (e.g. Stop pressed mid-loop).
            cap.release()

        if not is_video_stream:
            status_placeholder.write("Monitoring image-based webcam...")
            while st.session_state.monitoring:
                try:
                    start_time = time.time()
                    if webcam_url != st.session_state.current_webcam_url:
                        status_placeholder.write("URL changed. Stopping image monitoring.")
                        break
                    response = requests.get(webcam_url, timeout=5)
                    if response.status_code != 200:
                        status_placeholder.error(f"Fetch failed: HTTP {response.status_code}")
                        st.session_state.monitoring = False
                        break
                    image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
                    frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                    if frame is None:
                        status_placeholder.error("Image decoding failed.")
                        st.session_state.monitoring = False
                        break
                    res = model.predict(frame, conf=confidence)
                    detected_frame = res[0].plot()[:, :, ::-1]  # BGR -> RGB for display
                    frame_placeholder.image(detected_frame, use_column_width=True)
                    status_placeholder.write(f"Objects detected: {len(res[0].boxes)}")

                    # Count down whatever is left of the refresh interval
                    # after the time spent fetching and analysing.
                    elapsed = time.time() - start_time
                    remaining = max(0, refresh_rate - elapsed)
                    for i in range(int(remaining), -1, -1):
                        if not st.session_state.monitoring or webcam_url != st.session_state.current_webcam_url:
                            status_placeholder.write("Monitoring interrupted or URL changed.")
                            break
                        timer_placeholder.write(f"Next scan: {i}s")
                        time.sleep(1)
                except Exception as e:
                    status_placeholder.error(f"Image fetch error: {e}")
                    st.session_state.monitoring = False
                    break

    # Shown whenever monitoring is off, including after a failure above.
    if not st.session_state.monitoring:
        timer_placeholder.write("Monitoring stopped.")