Spaces:
Running
Running
import streamlit as st | |
import cv2 | |
import PIL.Image | |
from ultralytics import YOLO | |
import tempfile | |
import requests | |
import numpy as np | |
import time | |
import streamlink # pip install streamlink | |
# ------------------------------- | |
# Page & Model Setup | |
st.set_page_config(
    page_title="WildfireWatch",
    page_icon="🔥",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Hugging Face serves the raw file from the "/resolve/" path; the "/blob/"
# path returns the HTML file-viewer page, which is not a valid weights file.
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'
try:
    model = YOLO(model_path)
except Exception as ex:
    st.error(f"Unable to load model from: {model_path}")
    st.error(ex)
    # Every detection mode below needs `model`; halt this script run here
    # instead of failing later with a NameError.
    st.stop()
# ------------------------------- | |
# App Title and Description | |
# Headline shown at the top of the page.
st.title("WildfireWatch: Detecting Wildfire using AI")

# Short introduction rendered as Markdown directly under the title.
app_description = """
Wildfires are a major environmental issue that cause substantial losses.
Use this app to detect wildfires in images, videos, updating image URLs, or live YouTube streams.
"""
st.markdown(app_description)

# -------------------------------
# Tabs for Detection Modes
# One tab per input source; the sections below index into `tabs` by position.
tab_labels = ["File Upload", "Image URL", "YouTube Live Stream"]
tabs = st.tabs(tab_labels)
# =============================== | |
# Tab 1: File Upload Mode | |
with tabs[0]:
    st.header("Detect Wildfire from Uploaded File")
    col_input, col_result = st.columns(2)
    with col_input:
        uploaded_file = st.file_uploader(
            "Choose an image or video...",
            type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"],
        )
        # Slider works in whole percent; the model expects a 0-1 fraction.
        file_confidence = st.slider("Select Detection Confidence", 25, 100, 40) / 100

    if uploaded_file:
        # The MIME type looks like "image/png" or "video/mp4"; branch on the
        # part before the slash.
        file_type = uploaded_file.type.split('/')[0]

        if file_type == "image":
            image = PIL.Image.open(uploaded_file)
            col_input.image(image, caption="Uploaded Image", use_column_width=True)
            if st.button("Detect Wildfire", key="detect_file_image"):
                results = model.predict(image, conf=file_confidence)
                # plot() yields a BGR array; reversing the channel axis gives
                # RGB, which is what st.image expects by default.
                annotated = results[0].plot()[:, :, ::-1]
                col_result.image(annotated, caption="Detection Result", use_column_width=True)
                with col_result.expander("Detection Details"):
                    for box in results[0].boxes:
                        # st.write targets the enclosing expander; writing to
                        # col_result would place the text outside it.
                        st.write("Box (xywh):", box.xywh)

        elif file_type == "video":
            # Only materialise the upload and open a capture once the user
            # actually asks for detection, so reruns do not leak temp files
            # or unreleased captures.
            if st.button("Detect Wildfire in Video", key="detect_file_video"):
                with tempfile.TemporaryDirectory() as tmp_dir:
                    video_path = f"{tmp_dir}/upload.mp4"
                    # Close the file before OpenCV opens it so all bytes are
                    # flushed to disk.
                    with open(video_path, "wb") as video_file:
                        video_file.write(uploaded_file.read())

                    cap = cv2.VideoCapture(video_path)
                    # A single placeholder is overwritten per frame instead of
                    # appending a new image element for every frame.
                    frame_slot = col_result.empty()
                    try:
                        while cap.isOpened():
                            ret, frame = cap.read()
                            if not ret:
                                break
                            results = model.predict(frame, conf=file_confidence)
                            # Already converted to RGB here, so no
                            # channels="BGR" override (that would swap red and
                            # blue back).
                            annotated = results[0].plot()[:, :, ::-1]
                            frame_slot.image(
                                annotated,
                                caption="Detection Result",
                                use_column_width=True,
                            )
                            time.sleep(0.05)
                    finally:
                        # Release before the temporary directory is removed.
                        cap.release()
# =============================== | |
# Tab 2: Updating Image URL Mode | |
with tabs[1]:
    st.header("Detect Wildfire from Updating Image URL")
    col_input, col_result = st.columns(2)
    with col_input:
        image_url = st.text_input(
            "Enter the URL of the updating image",
            value="http://<your_updating_image_url>/current.jpg",
        )
        url_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="url_conf") / 100
        start_detection = st.button("Start Detection", key="start_url")
        # Always rendered, so it is still available while polling is active.
        stop_requested = st.button("Stop Detection", key="url_stop")

    # `stop_url` lives in session state so the polling state survives reruns.
    # True means "do not poll"; it starts True so nothing runs until the user
    # presses Start.
    if "stop_url" not in st.session_state:
        st.session_state.stop_url = True
    if start_detection:
        st.session_state.stop_url = False
    if stop_requested:
        st.session_state.stop_url = True

    if not st.session_state.stop_url:
        placeholder = col_result.empty()
        # Poll inside this script run rather than calling st.experimental_rerun():
        # a rerun resets the Start button to False, so the original loop showed
        # only one frame. Clicking Stop triggers a new run in which the flag is
        # set before this point is reached.
        # NOTE: while this loop runs, code below this tab is not executed for
        # the current script run.
        while not st.session_state.stop_url:
            try:
                response = requests.get(image_url, timeout=5)
                # Surface HTTP errors (404, 500, ...) instead of trying to
                # decode an error page as an image.
                response.raise_for_status()
                image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
                frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                if frame is None:
                    col_result.error("Failed to decode image from URL.")
                    st.session_state.stop_url = True
                    break
                results = model.predict(frame, conf=url_confidence)
                # plot() yields BGR; reverse to RGB and display with the
                # default channel order (no channels="BGR" override).
                annotated = results[0].plot()[:, :, ::-1]
                placeholder.image(annotated, use_column_width=True)
            except requests.RequestException as e:
                col_result.error(f"Error fetching image: {e}")
                st.session_state.stop_url = True
                break
            except Exception as e:
                # Decoding or inference failure: report it and stop polling
                # rather than looping on a persistent error.
                col_result.error(f"Error processing image: {e}")
                st.session_state.stop_url = True
                break
            time.sleep(1)  # update interval in seconds
# =============================== | |
# Tab 3: YouTube Live Stream Mode | |
with tabs[2]:
    st.header("Detect Wildfire from YouTube Live Stream")
    col_input, col_result = st.columns(2)
    with col_input:
        youtube_url = st.text_input(
            "Enter YouTube Live URL",
            value="https://www.youtube.com/watch?v=<live_stream_id>",
        )
        yt_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="yt_conf") / 100
        start_yt = st.button("Start Live Detection", key="start_yt")

    if start_yt:
        cap = None
        try:
            streams = streamlink.streams(youtube_url)
            best_stream = streams.get("best") if streams else None
            if not streams:
                st.error("No streams found. Please check the URL or ensure the stream is live.")
            elif best_stream is None:
                st.error("No suitable stream found.")
            else:
                cap = cv2.VideoCapture(best_stream.to_url())
                if not cap.isOpened():
                    st.error("Unable to open YouTube live stream.")
                else:
                    # Render the stop button exactly once. Creating a button
                    # inside the loop reuses the same widget key on every
                    # iteration, which Streamlit rejects. Clicking this button
                    # starts a new script run in which `start_yt` is False, so
                    # the loop is not entered again.
                    st.button("Stop Live Detection", key="yt_stop")
                    # A single placeholder is overwritten per frame instead of
                    # appending a new image element for every frame.
                    frame_slot = col_result.empty()
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            col_result.error("Failed to retrieve frame from YouTube stream.")
                            break
                        results = model.predict(frame, conf=yt_confidence)
                        # plot() yields BGR; reverse to RGB and display with
                        # the default channel order (no channels="BGR").
                        annotated = results[0].plot()[:, :, ::-1]
                        frame_slot.image(annotated, use_column_width=True)
                        time.sleep(0.05)
        except Exception as e:
            st.error(f"Error processing YouTube stream: {e}")
        finally:
            # Release the capture on every exit path, including errors.
            if cap is not None:
                cap.release()