Spaces:
Running
Running
File size: 6,437 Bytes
f0f9dff f98a043 0ba77f1 f0f9dff debd205 0ba77f1 24fa59e f0f9dff 24fa59e 36fbec5 9d79b23 24fa59e debd205 0ba77f1 24fa59e 0ba77f1 36fbec5 f0f9dff 24fa59e f0f9dff 36fbec5 0ba77f1 24fa59e 36fbec5 24fa59e 36fbec5 24fa59e 36fbec5 24fa59e f98a043 24fa59e debd205 24fa59e 0ba77f1 f98a043 f0f9dff 36fbec5 0ba77f1 24fa59e 36fbec5 24fa59e 36fbec5 debd205 36fbec5 debd205 24fa59e debd205 36fbec5 0ba77f1 36fbec5 0ba77f1 24fa59e 36fbec5 24fa59e 36fbec5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import streamlit as st
import cv2
import PIL.Image
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import streamlink
# Page Config
st.set_page_config(page_title="AI Fire Watch", page_icon="🌍", layout="wide")
# Lighter Background CSS
st.markdown(
"""
<style>
.stApp {
background-color: #f5f5f5;
color: #333333;
}
.stTabs > div > button {
background-color: #e0e0e0;
color: #333333;
}
.stTabs > div > button:hover {
background-color: #d0d0d0;
}
</style>
""",
unsafe_allow_html=True
)
# Load Model
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'
try:
model = YOLO(model_path)
except Exception as ex:
st.error(f"Model loading failed: {ex}")
st.stop()
# Header
st.title("AI Fire Watch")
st.markdown("Monitor fire and smoke in real-time with AI precision.")
# Tabs
tabs = st.tabs(["Upload", "Webcam", "YouTube"])
# Tab 1: Upload
with tabs[0]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**Add Your File**")
st.write("Upload an image or video to scan for fire or smoke.")
uploaded_file = st.file_uploader("", type=["jpg", "jpeg", "png", "mp4"], label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf")
with col2:
if uploaded_file:
file_type = uploaded_file.type.split('/')[0]
if file_type == 'image':
image = PIL.Image.open(uploaded_file)
results = model.predict(image, conf=confidence)
detected_image = results[0].plot()[:, :, ::-1]
st.image(detected_image, use_column_width=True)
st.write(f"Objects detected: {len(results[0].boxes)}")
elif file_type == 'video':
tfile = tempfile.NamedTemporaryFile(delete=False)
tfile.write(uploaded_file.read())
cap = cv2.VideoCapture(tfile.name)
frame_placeholder = st.empty()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
time.sleep(0.05)
cap.release()
# Tab 2: Webcam
with tabs[1]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**Webcam Feed**")
st.write("Provide a webcam URL to check snapshots for hazards every 30 seconds.")
webcam_url = st.text_input("Webcam URL", "http://<your_webcam_ip>/current.jpg", label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf")
start = st.button("Begin Monitoring", key="webcam_start")
with col2:
if start:
image_placeholder = st.empty()
timer_placeholder = st.empty()
refresh_interval = 30 # Refresh every 30 seconds
while True:
start_time = time.time()
try:
response = requests.get(webcam_url, timeout=5)
if response.status_code != 200:
st.error(f"Fetch failed: HTTP {response.status_code}")
break
image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
st.error("Image decoding failed.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
image_placeholder.image(detected_frame, use_column_width=True)
elapsed = time.time() - start_time
remaining = max(0, refresh_interval - elapsed)
timer_placeholder.write(f"Next scan: {int(remaining)}s")
while remaining > 0:
time.sleep(1)
elapsed = time.time() - start_time
remaining = max(0, refresh_interval - elapsed)
timer_placeholder.write(f"Next scan: {int(remaining)}s")
st.experimental_rerun()
except Exception as e:
st.error(f"Error: {e}")
break
# Tab 3: YouTube
with tabs[2]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**YouTube Live**")
st.write("Enter a live YouTube URL to auto-analyze the stream.")
youtube_url = st.text_input("YouTube URL", "https://www.youtube.com/watch?v=<id>", label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="yt_conf")
with col2:
if youtube_url and youtube_url != "https://www.youtube.com/watch?v=<id>":
st.write("Analyzing live stream...")
try:
streams = streamlink.streams(youtube_url)
if not streams:
st.error("No streams found. Check the URL.")
else:
stream_url = streams["best"].to_url()
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
st.error("Unable to open stream.")
else:
frame_placeholder = st.empty()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
st.error("Stream interrupted.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
st.write(f"Objects detected: {len(results[0].boxes)}")
time.sleep(1) # Check every second
cap.release()
except Exception as e:
st.error(f"Error: {e}") |