# Spaces: Running
import streamlit as st | |
import cv2 | |
import PIL.Image | |
from ultralytics import YOLO | |
import tempfile | |
import time | |
import requests | |
import numpy as np | |
import streamlink | |
# Page Config
st.set_page_config(page_title="WildfireWatch", page_icon="🔥", layout="wide")

# Load Model
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'

# Streamlit re-executes this script from the top on every widget interaction,
# so constructing YOLO unconditionally would rebuild the model on each rerun.
# Keep the instance in session state instead: it is created once per browser
# session and is not shared between sessions.
if "wildfire_model" not in st.session_state:
    try:
        st.session_state["wildfire_model"] = YOLO(model_path)
    except Exception as ex:
        # UI boundary: report the failure to the user and halt this run.
        st.error(f"Model loading failed: {ex}")
        st.stop()
model = st.session_state["wildfire_model"]
# Page header: application name followed by a one-line description.
st.title("WildfireWatch")
st.markdown("AI-powered detection of fire and smoke.")

# One tab per input source; the sections below select a tab by position.
tab_labels = ["Upload", "Webcam", "YouTube"]
tabs = st.tabs(tab_labels)
# Tab 1: File Upload
# Left column: uploader and confidence slider. Right column: detection output
# for the uploaded image, or frame-by-frame output for an uploaded video.
with tabs[0]:
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**Upload an image or video**")
        uploaded_file = st.file_uploader(
            "",
            type=["jpg", "jpeg", "png", "mp4"],
            label_visibility="collapsed",
        )
        confidence = st.slider("Confidence", 0.25, 1.0, 0.4, key="upload_conf")
    with col2:
        if uploaded_file is not None:
            # The MIME type looks like "image/png" / "video/mp4"; branch on
            # the part before the slash.
            file_type = uploaded_file.type.split('/')[0]
            if file_type == 'image':
                image = PIL.Image.open(uploaded_file)
                results = model.predict(image, conf=confidence)
                # Reverse the channel axis of the annotated array before
                # display (plot() output is presumably BGR -- confirm against
                # the Ultralytics docs).
                detected_image = results[0].plot()[:, :, ::-1]
                st.image(detected_image, use_column_width=True)
                st.write(f"Detections: {len(results[0].boxes)}")
            elif file_type == 'video':
                # OpenCV reads videos from a path, so spill the upload to
                # disk. The temporary directory guarantees the copy is removed
                # afterwards, and the file is fully written and closed before
                # OpenCV opens it.
                with tempfile.TemporaryDirectory() as tmp_dir:
                    video_path = f"{tmp_dir}/upload.mp4"
                    with open(video_path, "wb") as video_file:
                        # getvalue() returns the whole upload regardless of
                        # the buffer's current read position.
                        video_file.write(uploaded_file.getvalue())
                    cap = cv2.VideoCapture(video_path)
                    try:
                        if not cap.isOpened():
                            st.error("Could not open the uploaded video.")
                        frame_placeholder = st.empty()
                        while cap.isOpened():
                            ret, frame = cap.read()
                            if not ret:
                                # End of video (or a read failure).
                                break
                            results = model.predict(frame, conf=confidence)
                            detected_frame = results[0].plot()[:, :, ::-1]
                            frame_placeholder.image(detected_frame, use_column_width=True)
                            # Short pause so frames are displayed at a
                            # watchable pace.
                            time.sleep(0.05)
                    finally:
                        # Release the capture even if the loop is interrupted,
                        # so the temporary file can be deleted.
                        cap.release()
# Tab 2: Webcam / Image URL
# "Snapshot" polls a still-image URL every `refresh_interval` seconds;
# "Stream" opens the URL with OpenCV and processes frames continuously.
with tabs[1]:
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**Webcam snapshot or stream**")
        webcam_url = st.text_input("URL", "http://<your_webcam_ip>/current.jpg", label_visibility="collapsed")
        confidence = st.slider("Confidence", 0.25, 1.0, 0.4, key="webcam_conf")
        mode = st.radio("", ["Snapshot", "Stream"], label_visibility="collapsed")
        start = st.button("Start", key="webcam_start")
    with col2:
        if start:
            if mode == "Snapshot":
                placeholder = st.empty()
                timer_placeholder = st.empty()
                refresh_interval = 5  # seconds
                while True:
                    # The next fetch is due `refresh_interval` seconds after
                    # this cycle started, however long the fetch and the
                    # inference take.
                    cycle_deadline = time.monotonic() + refresh_interval
                    try:
                        response = requests.get(webcam_url, timeout=5)
                        # Treat HTTP error statuses as failures instead of
                        # trying to decode an error page as an image.
                        response.raise_for_status()
                        image_array = np.frombuffer(response.content, dtype=np.uint8)
                        frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                        if frame is None:
                            raise ValueError("response is not a decodable image")
                        results = model.predict(frame, conf=confidence)
                        detected_frame = results[0].plot()[:, :, ::-1]
                        placeholder.image(detected_frame, use_column_width=True)
                    except Exception as e:
                        # UI boundary: show the problem and stop polling.
                        st.error(f"Error: {e}")
                        break
                    # Count down in place until the next fetch is due. The
                    # loop itself performs the refresh, so no script rerun is
                    # needed (a rerun would reset the Start button and end
                    # the display).
                    while True:
                        remaining = cycle_deadline - time.monotonic()
                        if remaining <= 0:
                            break
                        timer_placeholder.write(f"Next refresh in: {int(remaining)}s")
                        time.sleep(min(1.0, remaining))
            else:
                cap = cv2.VideoCapture(webcam_url)
                try:
                    if not cap.isOpened():
                        # Without this, an unreachable URL would silently
                        # show nothing.
                        st.error("Stream failed.")
                    frame_placeholder = st.empty()
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            st.error("Stream failed.")
                            break
                        results = model.predict(frame, conf=confidence)
                        detected_frame = results[0].plot()[:, :, ::-1]
                        frame_placeholder.image(detected_frame, use_column_width=True)
                        time.sleep(0.05)
                finally:
                    # Always give the capture handle back, including when
                    # the loop is interrupted.
                    cap.release()
# Tab 3: YouTube Live Stream
# Resolves the page URL to a direct media URL with streamlink, then runs
# detection on the frames OpenCV reads from it.
with tabs[2]:
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**YouTube live stream**")
        youtube_url = st.text_input("URL", "https://www.youtube.com/watch?v=<id>", label_visibility="collapsed")
        confidence = st.slider("Confidence", 0.25, 1.0, 0.4, key="yt_conf")
        start = st.button("Start", key="yt_start")
    with col2:
        if start:
            stream_url = None
            try:
                streams = streamlink.streams(youtube_url)
                best_stream = streams.get("best")
                if best_stream is None:
                    st.error("No stream found.")
                else:
                    stream_url = best_stream.to_url()
            except (streamlink.StreamlinkError, TypeError) as e:
                # StreamlinkError: no plugin for the URL or the plugin failed.
                # TypeError: the selected stream cannot be expressed as a URL.
                st.error(f"Could not resolve stream: {e}")
            if stream_url is not None:
                cap = cv2.VideoCapture(stream_url)
                try:
                    if not cap.isOpened():
                        # Without this, a stream OpenCV cannot open would
                        # silently show nothing.
                        st.error("Stream failed.")
                    frame_placeholder = st.empty()
                    while cap.isOpened():
                        ret, frame = cap.read()
                        if not ret:
                            st.error("Stream failed.")
                            break
                        results = model.predict(frame, conf=confidence)
                        # Reverse the channel axis of the annotated array
                        # before display.
                        detected_frame = results[0].plot()[:, :, ::-1]
                        frame_placeholder.image(detected_frame, use_column_width=True)
                        time.sleep(0.05)
                finally:
                    # Always give the capture handle back, including when
                    # the loop is interrupted.
                    cap.release()