ccr-colorado / app.py
tstone87's picture
Update app.py
debd205 verified
raw
history blame
7.78 kB
import streamlit as st
import cv2
import PIL.Image
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import streamlink # pip install streamlink
# -------------------------------
# Page & Model Setup
st.set_page_config(
page_title="WildfireWatch",
page_icon="🔥",
layout="wide",
initial_sidebar_state="expanded"
)
# IMPORTANT: Ensure the model URL returns the raw .pt file. Otherwise, use a local path.
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'
try:
model = YOLO(model_path)
except Exception as ex:
st.error(f"Unable to load model from: {model_path}")
st.error(ex)
st.stop() # Stop the app if the model cannot load
st.title("WildfireWatch: Detecting Fire and Smoke using AI")
st.markdown("""
Wildfires and smoke are a major threat. This app uses a YOLOv8 model to detect fire/smoke in:
- Uploaded images or videos,
- Webcam snapshots or live streams,
- YouTube live streams.
""")
# -------------------------------
# Create three tabs for different modes
tabs = st.tabs(["File Upload", "Webcam / Image URL", "YouTube Live Stream"])
# ===============================
# Tab 1: File Upload Mode
with tabs[0]:
st.header("File Upload Mode")
col_left, col_right = st.columns(2)
with col_left:
uploaded_file = st.file_uploader("Choose an image or video...", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"])
file_confidence = st.slider("Select Detection Confidence", 25, 100, 40) / 100
display_mode = st.radio("Display Mode", options=["Detection", "Original"], index=0)
if uploaded_file:
file_type = uploaded_file.type.split('/')[0]
if file_type == 'image':
image = PIL.Image.open(uploaded_file)
col_left.image(image, caption="Uploaded Image", use_column_width=True)
results = model.predict(image, conf=file_confidence)
detected_image = results[0].plot()[:, :, ::-1]
if display_mode == "Detection":
col_right.image(detected_image, caption="Detection Result", use_column_width=True)
else:
col_right.image(image, caption="Original Image", use_column_width=True)
with col_right.expander("Detection Details"):
for box in results[0].boxes:
st.write("Box (xywh):", box.xywh)
elif file_type == 'video':
tfile = tempfile.NamedTemporaryFile(delete=False)
tfile.write(uploaded_file.read())
cap = cv2.VideoCapture(tfile.name)
if st.button("Start Video Detection"):
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
results = model.predict(frame, conf=file_confidence)
detected_frame = results[0].plot()[:, :, ::-1]
col_right.image(detected_frame, caption="Detection Frame", channels="BGR", use_column_width=True)
time.sleep(0.05)
cap.release()
# ===============================
# Tab 2: Webcam / Image URL Mode
with tabs[1]:
st.header("Webcam / Image URL Mode")
col_left, col_right = st.columns(2)
with col_left:
# Enter a webcam snapshot URL (e.g. an updating JPEG) or a live stream URL
webcam_url = st.text_input("Enter Webcam URL", value="http://<your_webcam_ip>/current.jpg")
webcam_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="webcam_conf") / 100
input_mode = st.radio("Input Mode", options=["Snapshot (Updating Image)", "Live Stream"], index=0)
start_webcam = st.button("Start Detection", key="start_webcam")
if start_webcam:
if input_mode == "Snapshot (Updating Image)":
placeholder = col_right.empty()
# Loop to periodically fetch the snapshot
while True:
try:
response = requests.get(webcam_url, timeout=5)
image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
col_right.error("Failed to decode image from URL.")
break
results = model.predict(frame, conf=webcam_confidence)
detected_frame = results[0].plot()[:, :, ::-1]
placeholder.image(detected_frame, channels="BGR", use_column_width=True)
time.sleep(1) # Adjust refresh rate as needed
st.experimental_rerun()
except Exception as e:
col_right.error(f"Error fetching image: {e}")
break
else:
cap = cv2.VideoCapture(webcam_url)
if not cap.isOpened():
col_right.error("Unable to open webcam stream. Check the URL and network connectivity.")
else:
stop_button = st.button("Stop Live Detection", key="stop_webcam")
while cap.isOpened() and not stop_button:
ret, frame = cap.read()
if not ret:
col_right.error("Failed to retrieve frame from stream.")
break
results = model.predict(frame, conf=webcam_confidence)
detected_frame = results[0].plot()[:, :, ::-1]
col_right.image(detected_frame, channels="BGR", use_column_width=True)
time.sleep(0.05)
stop_button = st.button("Stop Live Detection", key="stop_webcam_loop")
cap.release()
# ===============================
# Tab 3: YouTube Live Stream Mode
with tabs[2]:
st.header("YouTube Live Stream Mode")
col_left, col_right = st.columns(2)
with col_left:
youtube_url = st.text_input("Enter YouTube Live URL", value="https://www.youtube.com/watch?v=<live_stream_id>")
yt_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="yt_conf") / 100
start_yt = st.button("Start Live Detection", key="start_yt")
if start_yt:
try:
streams = streamlink.streams(youtube_url)
if not streams:
st.error("No streams found. Please check the URL or ensure the stream is live.")
else:
best_stream = streams.get("best")
if best_stream is None:
st.error("No suitable stream found.")
else:
stream_url = best_stream.to_url()
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
st.error("Unable to open YouTube live stream.")
else:
stop_yt = st.button("Stop Live Detection", key="stop_yt")
while cap.isOpened() and not stop_yt:
ret, frame = cap.read()
if not ret:
col_right.error("Failed to retrieve frame from YouTube stream.")
break
results = model.predict(frame, conf=yt_confidence)
detected_frame = results[0].plot()[:, :, ::-1]
col_right.image(detected_frame, channels="BGR", use_column_width=True)
time.sleep(0.05)
stop_yt = st.button("Stop Live Detection", key="stop_yt_loop")
cap.release()
except Exception as e:
st.error(f"Error processing YouTube stream: {e}")