Spaces:
Sleeping
Sleeping
File size: 7,782 Bytes
f0f9dff f98a043 0ba77f1 f0f9dff debd205 0ba77f1 f0f9dff 0ba77f1 f0f9dff f98a043 9d79b23 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 f0f9dff 0ba77f1 debd205 f0f9dff 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 debd205 f98a043 debd205 423354e debd205 f98a043 debd205 f98a043 0ba77f1 debd205 0ba77f1 f98a043 f0f9dff 0ba77f1 debd205 0ba77f1 debd205 f98a043 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 debd205 0ba77f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import streamlit as st
import cv2
import PIL.Image
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import streamlink # pip install streamlink
# -------------------------------
# Page & Model Setup
# Browser-tab metadata and overall layout, gathered in one mapping and
# handed to Streamlit in a single call.
page_settings = {
    "page_title": "WildfireWatch",
    "page_icon": "🔥",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**page_settings)
# IMPORTANT: Ensure the model URL returns the raw .pt file. Otherwise, use a local path.
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'


@st.cache_resource
def _load_model(path):
    """Build the YOLO detector for *path* once and reuse it across reruns.

    Streamlit re-executes this whole script on every widget interaction;
    without caching, the weights would be loaded again each time.
    """
    # NOTE(review): the cached instance is shared by all sessions of the app;
    # confirm concurrent predict() calls are acceptable for this deployment.
    return YOLO(path)


try:
    model = _load_model(model_path)
except Exception as ex:
    # Top-level boundary: report the failure in the UI instead of a traceback.
    st.error(f"Unable to load model from: {model_path}")
    st.error(ex)
    st.stop()  # Stop the app if the model cannot load
# Heading and introductory blurb shown above the tabs.
APP_TITLE = "WildfireWatch: Detecting Fire and Smoke using AI"
APP_INTRO = """
Wildfires and smoke are a major threat. This app uses a YOLOv8 model to detect fire/smoke in:
- Uploaded images or videos,
- Webcam snapshots or live streams,
- YouTube live streams.
"""

st.title(APP_TITLE)
st.markdown(APP_INTRO)
# -------------------------------
# One tab per input mode; the sections below index into `tabs` in this order.
tab_labels = ["File Upload", "Webcam / Image URL", "YouTube Live Stream"]
tabs = st.tabs(tab_labels)
# ===============================
# Tab 1: File Upload Mode
# Runs detection on an uploaded image (single prediction) or an uploaded
# video (frame-by-frame once the user presses the start button).
with tabs[0]:
    st.header("File Upload Mode")
    col_left, col_right = st.columns(2)
    with col_left:
        uploaded_file = st.file_uploader("Choose an image or video...", type=["jpg", "jpeg", "png", "bmp", "webp", "mp4"])
        # The slider works in percent; the model expects a 0-1 fraction.
        file_confidence = st.slider("Select Detection Confidence", 25, 100, 40) / 100
        display_mode = st.radio("Display Mode", options=["Detection", "Original"], index=0)

    if uploaded_file:
        # MIME type looks like "image/png" or "video/mp4"; keep the major part.
        file_type = uploaded_file.type.split('/')[0]
        if file_type == 'image':
            image = PIL.Image.open(uploaded_file)
            col_left.image(image, caption="Uploaded Image", use_column_width=True)
            results = model.predict(image, conf=file_confidence)
            if display_mode == "Detection":
                # plot() yields a BGR array, so tell Streamlit the channel
                # order instead of flipping the array by hand.
                col_right.image(results[0].plot(), caption="Detection Result", channels="BGR", use_column_width=True)
            else:
                col_right.image(image, caption="Original Image", use_column_width=True)
            with col_right.expander("Detection Details"):
                for box in results[0].boxes:
                    st.write("Box (xywh):", box.xywh)
        elif file_type == 'video':
            # Only touch the disk / open a capture once the user asks for it;
            # otherwise every rerun would leak a temp file and a capture.
            if st.button("Start Video Detection"):
                import os

                # OpenCV needs a filesystem path. Closing the file (via
                # `with`) flushes the bytes before the capture opens it.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tfile:
                    tfile.write(uploaded_file.getvalue())
                    video_path = tfile.name

                cap = cv2.VideoCapture(video_path)
                # A single placeholder is overwritten per frame so the page
                # shows a video rather than an ever-growing stack of images.
                frame_slot = col_right.empty()
                try:
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            break
                        results = model.predict(frame, conf=file_confidence)
                        # Annotated frame stays BGR to match channels="BGR".
                        frame_slot.image(results[0].plot(), caption="Detection Frame", channels="BGR", use_column_width=True)
                        time.sleep(0.05)
                finally:
                    cap.release()
                    try:
                        os.remove(video_path)
                    except OSError:
                        pass  # best-effort cleanup of the temp copy
# ===============================
# Tab 2: Webcam / Image URL Mode
# Either polls a snapshot URL once per second or reads a live stream through
# OpenCV, running detection on every frame.
with tabs[1]:
    st.header("Webcam / Image URL Mode")
    col_left, col_right = st.columns(2)
    with col_left:
        # Enter a webcam snapshot URL (e.g. an updating JPEG) or a live stream URL
        webcam_url = st.text_input("Enter Webcam URL", value="http://<your_webcam_ip>/current.jpg")
        # The slider works in percent; the model expects a 0-1 fraction.
        webcam_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="webcam_conf") / 100
        input_mode = st.radio("Input Mode", options=["Snapshot (Updating Image)", "Live Stream"], index=0)
        start_webcam = st.button("Start Detection", key="start_webcam")

    if start_webcam:
        # One placeholder, overwritten per frame, instead of appending a new
        # image element on every iteration.
        frame_slot = col_right.empty()

        if input_mode == "Snapshot (Updating Image)":
            # The stop button is created exactly once, before the loop:
            # creating a widget with the same key repeatedly in one run is an
            # error. Clicking it triggers a rerun in which `start_webcam` is
            # False, so the loop is not re-entered.
            st.button("Stop Live Detection", key="stop_webcam")
            # Loop to periodically fetch the snapshot
            while True:
                try:
                    response = requests.get(webcam_url, timeout=5)
                    response.raise_for_status()
                except requests.RequestException as e:
                    col_right.error(f"Error fetching image: {e}")
                    break
                image_array = np.frombuffer(response.content, dtype=np.uint8)
                # imdecode rejects an empty buffer, so treat it as undecodable.
                frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if image_array.size else None
                if frame is None:
                    col_right.error("Failed to decode image from URL.")
                    break
                results = model.predict(frame, conf=webcam_confidence)
                # plot() yields a BGR array, matching channels="BGR".
                frame_slot.image(results[0].plot(), channels="BGR", use_column_width=True)
                time.sleep(1)  # Adjust refresh rate as needed
        else:
            cap = cv2.VideoCapture(webcam_url)
            try:
                if not cap.isOpened():
                    col_right.error("Unable to open webcam stream. Check the URL and network connectivity.")
                else:
                    # Created once, outside the loop (see snapshot branch).
                    st.button("Stop Live Detection", key="stop_webcam")
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            col_right.error("Failed to retrieve frame from stream.")
                            break
                        results = model.predict(frame, conf=webcam_confidence)
                        frame_slot.image(results[0].plot(), channels="BGR", use_column_width=True)
                        time.sleep(0.05)
            finally:
                # Release the capture even when the run is interrupted.
                cap.release()
# ===============================
# Tab 3: YouTube Live Stream Mode
# Resolves a YouTube URL to a direct stream URL with streamlink, then runs
# detection on each frame read through OpenCV.
with tabs[2]:
    st.header("YouTube Live Stream Mode")
    col_left, col_right = st.columns(2)
    with col_left:
        youtube_url = st.text_input("Enter YouTube Live URL", value="https://www.youtube.com/watch?v=<live_stream_id>")
        # The slider works in percent; the model expects a 0-1 fraction.
        yt_confidence = st.slider("Select Detection Confidence", 25, 100, 40, key="yt_conf") / 100
        start_yt = st.button("Start Live Detection", key="start_yt")

    if start_yt:
        cap = None
        try:
            streams = streamlink.streams(youtube_url)
            best_stream = streams.get("best") if streams else None
            if not streams:
                st.error("No streams found. Please check the URL or ensure the stream is live.")
            elif best_stream is None:
                st.error("No suitable stream found.")
            else:
                cap = cv2.VideoCapture(best_stream.to_url())
                if not cap.isOpened():
                    st.error("Unable to open YouTube live stream.")
                else:
                    # The stop button is created exactly once, before the
                    # loop: creating a widget with the same key repeatedly in
                    # one run is an error. Clicking it triggers a rerun in
                    # which `start_yt` is False, so the loop is not re-entered.
                    st.button("Stop Live Detection", key="stop_yt")
                    # One placeholder, overwritten per frame.
                    frame_slot = col_right.empty()
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            col_right.error("Failed to retrieve frame from YouTube stream.")
                            break
                        results = model.predict(frame, conf=yt_confidence)
                        # plot() yields a BGR array, matching channels="BGR".
                        frame_slot.image(results[0].plot(), channels="BGR", use_column_width=True)
                        time.sleep(0.05)
        except Exception as e:
            # UI boundary: surface resolver/stream failures to the user.
            st.error(f"Error processing YouTube stream: {e}")
        finally:
            # Release the capture on every exit path, including errors.
            if cap is not None:
                cap.release()
|