# Hugging Face Spaces page header (Space status: Sleeping) -- not part of the app code.
import os
import tempfile
import time

import cv2
import numpy as np
import PIL.Image
import requests
import streamlink  # pip install streamlink
import streamlit as st
from ultralytics import YOLO
# -------------------------------
# Page & Model Setup
# set_page_config is kept as the first Streamlit call of the script.
st.set_page_config(
    page_title="WildfireWatch",
    page_icon="🔥",
    layout="wide",
    initial_sidebar_state="expanded"
)

# IMPORTANT: Ensure the model URL returns the raw .pt file. Otherwise, use a local path.
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'

# Streamlit re-executes this script on every widget interaction. Keeping the
# model in the per-session state means the weights are loaded once per browser
# session instead of once per rerun.
try:
    if "model" not in st.session_state:
        st.session_state["model"] = YOLO(model_path)
    model = st.session_state["model"]
except Exception as ex:
    # Top-level boundary: surface the failure in the UI and halt the script.
    st.error(f"Unable to load model from: {model_path}")
    st.error(ex)
    st.stop()  # Stop the app if the model cannot load
# Page heading and a short description of the supported input sources.
APP_TITLE = "WildfireWatch: Detecting Fire and Smoke using AI"
APP_INTRO = """
Wildfires and smoke are a major threat. This app uses a YOLOv8 model to detect fire/smoke in:
- Uploaded images or videos,
- Webcam snapshots or live streams,
- YouTube live streams.
"""
st.title(APP_TITLE)
st.markdown(APP_INTRO)

# -------------------------------
# One tab per input mode; the sections below index into `tabs` by position.
TAB_LABELS = ["File Upload", "Webcam / Image URL", "YouTube Live Stream"]
tabs = st.tabs(TAB_LABELS)
# ===============================
# Tab 1: File Upload Mode
# Runs detection on an uploaded image (single prediction) or an uploaded video
# (frame-by-frame once the user presses "Start Video Detection").
with tabs[0]:
    st.header("File Upload Mode")
    col_left, col_right = st.columns(2)
    with col_left:
        uploaded_file = st.file_uploader("Choose an image or video...", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"])
        # Slider works in percent; the model expects a 0-1 confidence.
        file_confidence = st.slider("Select Detection Confidence", 25, 100, 40) / 100
        display_mode = st.radio("Display Mode", options=["Detection", "Original"], index=0)

    if uploaded_file:
        # MIME type looks like "image/png" or "video/mp4"; keep the major part.
        file_type = uploaded_file.type.split('/')[0]

        if file_type == 'image':
            image = PIL.Image.open(uploaded_file)
            col_left.image(image, caption="Uploaded Image", use_column_width=True)
            results = model.predict(image, conf=file_confidence)
            # Results.plot() yields a BGR array, so tell Streamlit the channel
            # order instead of flipping the array by hand.
            detected_image = results[0].plot()
            if display_mode == "Detection":
                col_right.image(detected_image, caption="Detection Result", channels="BGR", use_column_width=True)
            else:
                col_right.image(image, caption="Original Image", use_column_width=True)
            with col_right.expander("Detection Details"):
                for box in results[0].boxes:
                    st.write("Box (xywh):", box.xywh)

        elif file_type == 'video':
            # Only touch the disk / open the capture once detection is requested;
            # otherwise every rerun would leave another temp file behind.
            if st.button("Start Video Detection"):
                # cv2.VideoCapture needs a filesystem path. Close the file before
                # opening it so all bytes are flushed to disk.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tfile:
                    tfile.write(uploaded_file.getvalue())
                video_path = tfile.name
                cap = cv2.VideoCapture(video_path)
                # A single placeholder is overwritten per frame rather than
                # appending a new image element for every frame.
                frame_slot = col_right.empty()
                try:
                    if not cap.isOpened():
                        col_right.error("Unable to open the uploaded video.")
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        results = model.predict(frame, conf=file_confidence)
                        # plot() is already BGR; flipping it AND passing
                        # channels="BGR" would swap red and blue.
                        frame_slot.image(results[0].plot(), caption="Detection Frame", channels="BGR", use_column_width=True)
                        time.sleep(0.05)
                finally:
                    # Runs even if the script run is interrupted mid-video.
                    cap.release()
                    os.remove(video_path)
# ===============================
# Tab 2: Webcam / Image URL Mode
# Two input modes: repeatedly polling a snapshot URL (an updating JPEG), or
# reading frames from a live stream URL through OpenCV.
with tabs[1]:
    st.header("Webcam / Image URL Mode")
    col_left, col_right = st.columns(2)
    with col_left:
        # Enter a webcam snapshot URL (e.g. an updating JPEG) or a live stream URL
        webcam_url = st.text_input("Enter Webcam URL", value="http://<your_webcam_ip>/current.jpg")
        webcam_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="webcam_conf") / 100
        input_mode = st.radio("Input Mode", options=["Snapshot (Updating Image)", "Live Stream"], index=0)
        start_webcam = st.button("Start Detection", key="start_webcam")

    if start_webcam:
        if input_mode == "Snapshot (Updating Image)":
            placeholder = col_right.empty()
            # Poll the snapshot URL in-place. No st rerun here: a rerun resets
            # the Start button to False, which would end detection after a
            # single frame.
            while True:
                try:
                    response = requests.get(webcam_url, timeout=5)
                    response.raise_for_status()
                except requests.RequestException as e:
                    col_right.error(f"Error fetching image: {e}")
                    break
                payload = response.content
                # An empty body cannot be decoded; treat it like a bad image.
                frame = None
                if payload:
                    frame = cv2.imdecode(np.frombuffer(payload, dtype=np.uint8), cv2.IMREAD_COLOR)
                if frame is None:
                    col_right.error("Failed to decode image from URL.")
                    break
                results = model.predict(frame, conf=webcam_confidence)
                # plot() is BGR already; do not flip it before passing channels="BGR".
                placeholder.image(results[0].plot(), channels="BGR", use_column_width=True)
                time.sleep(1)  # Adjust refresh rate as needed
        else:
            cap = cv2.VideoCapture(webcam_url)
            try:
                if not cap.isOpened():
                    col_right.error("Unable to open webcam stream. Check the URL and network connectivity.")
                else:
                    # Created exactly once: building a button with the same key
                    # on every loop iteration raises a duplicate-widget error.
                    # Pressing it triggers a rerun in which the Start button is
                    # False, so this loop is not entered again.
                    st.button("Stop Live Detection", key="stop_webcam")
                    frame_slot = col_right.empty()
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            col_right.error("Failed to retrieve frame from stream.")
                            break
                        results = model.predict(frame, conf=webcam_confidence)
                        frame_slot.image(results[0].plot(), channels="BGR", use_column_width=True)
                        time.sleep(0.05)
            finally:
                # Release the stream even when the run is interrupted.
                cap.release()
# ===============================
# Tab 3: YouTube Live Stream Mode
# Resolves the YouTube URL to a direct stream URL with streamlink, then reads
# and annotates frames through OpenCV.
with tabs[2]:
    st.header("YouTube Live Stream Mode")
    col_left, col_right = st.columns(2)
    with col_left:
        youtube_url = st.text_input("Enter YouTube Live URL", value="https://www.youtube.com/watch?v=<live_stream_id>")
        yt_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="yt_conf") / 100
        start_yt = st.button("Start Live Detection", key="start_yt")

    if start_yt:
        cap = None
        try:
            streams = streamlink.streams(youtube_url)
            best_stream = streams.get("best") if streams else None
            if not streams:
                st.error("No streams found. Please check the URL or ensure the stream is live.")
            elif best_stream is None:
                st.error("No suitable stream found.")
            else:
                cap = cv2.VideoCapture(best_stream.to_url())
                if not cap.isOpened():
                    st.error("Unable to open YouTube live stream.")
                else:
                    # Created exactly once: re-creating a button with the same
                    # key inside the loop raises a duplicate-widget error.
                    # Pressing it triggers a rerun in which the Start button is
                    # False, so the loop is not entered again.
                    st.button("Stop Live Detection", key="stop_yt")
                    # One placeholder, overwritten per frame.
                    frame_slot = col_right.empty()
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            col_right.error("Failed to retrieve frame from YouTube stream.")
                            break
                        results = model.predict(frame, conf=yt_confidence)
                        # plot() is BGR already; flipping it and also passing
                        # channels="BGR" would swap red and blue.
                        frame_slot.image(results[0].plot(), channels="BGR", use_column_width=True)
                        time.sleep(0.05)
        except Exception as e:
            # UI boundary: report any resolution/decoding failure to the user.
            st.error(f"Error processing YouTube stream: {e}")
        finally:
            # Release the capture on normal exit, error, or interruption.
            if cap is not None:
                cap.release()