Spaces:
Running
Running
File size: 7,648 Bytes
f0f9dff f98a043 0ba77f1 f0f9dff debd205 0ba77f1 24fa59e f0f9dff 24fa59e 36fbec5 cac62cc 36fbec5 cac62cc 36fbec5 cac62cc 36fbec5 cac62cc 36fbec5 9d79b23 24fa59e debd205 0ba77f1 24fa59e 0ba77f1 cac62cc 36fbec5 f0f9dff 24fa59e f0f9dff 36fbec5 0ba77f1 24fa59e 36fbec5 24fa59e 36fbec5 24fa59e 36fbec5 24fa59e f98a043 24fa59e debd205 24fa59e 0ba77f1 f98a043 f0f9dff 36fbec5 0ba77f1 24fa59e 36fbec5 cac62cc 24fa59e cac62cc 36fbec5 cac62cc 36fbec5 debd205 36fbec5 debd205 24fa59e debd205 36fbec5 cac62cc 36fbec5 0ba77f1 36fbec5 0ba77f1 24fa59e 36fbec5 24fa59e 36fbec5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
import streamlit as st
import cv2
import PIL.Image
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import streamlink
# Page Config
st.set_page_config(page_title="AI Fire Watch", page_icon="🌍", layout="wide")
# Lighter Background CSS with Darker Text
st.markdown(
"""
<style>
.stApp {
background-color: #f5f5f5;
color: #1a1a1a; /* Dark text for general content */
}
h1 {
color: #1a1a1a; /* Darker title text */
}
.stTabs > div > button {
background-color: #e0e0e0;
color: #1a1a1a; /* Darker tab text */
font-weight: bold;
}
.stTabs > div > button:hover {
background-color: #d0d0d0;
color: #1a1a1a;
}
.stButton > button {
background-color: #e0e0e0;
color: #1a1a1a; /* Darker button text */
font-weight: bold;
}
.stButton > button:hover {
background-color: #d0d0d0;
color: #1a1a1a;
}
</style>
""",
unsafe_allow_html=True
)
# Load Model
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'
try:
model = YOLO(model_path)
except Exception as ex:
st.error(f"Model loading failed: {ex}")
st.stop()
# Initialize Session State
if 'monitoring' not in st.session_state:
st.session_state.monitoring = False
if 'current_webcam_url' not in st.session_state:
st.session_state.current_webcam_url = None
# Header
st.title("AI Fire Watch")
st.markdown("Monitor fire and smoke in real-time with AI precision.")
# Tabs
tabs = st.tabs(["Upload", "Webcam", "YouTube"])
# Tab 1: Upload
with tabs[0]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**Add Your File**")
st.write("Upload an image or video to scan for fire or smoke.")
uploaded_file = st.file_uploader("", type=["jpg", "jpeg", "png", "mp4"], label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf")
with col2:
if uploaded_file:
file_type = uploaded_file.type.split('/')[0]
if file_type == 'image':
image = PIL.Image.open(uploaded_file)
results = model.predict(image, conf=confidence)
detected_image = results[0].plot()[:, :, ::-1]
st.image(detected_image, use_column_width=True)
st.write(f"Objects detected: {len(results[0].boxes)}")
elif file_type == 'video':
tfile = tempfile.NamedTemporaryFile(delete=False)
tfile.write(uploaded_file.read())
cap = cv2.VideoCapture(tfile.name)
frame_placeholder = st.empty()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
time.sleep(0.05)
cap.release()
# Tab 2: Webcam
with tabs[1]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**Webcam Feed**")
st.write("Provide a webcam URL to check snapshots for hazards every 30 seconds.")
webcam_url = st.text_input("Webcam URL", "http://<your_webcam_ip>/current.jpg", label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf")
start = st.button("Begin Monitoring", key="webcam_start")
stop = st.button("Stop Monitoring", key="webcam_stop")
# Handle monitoring state
if start:
st.session_state.monitoring = True
st.session_state.current_webcam_url = webcam_url
if stop or (st.session_state.monitoring and webcam_url != st.session_state.current_webcam_url):
st.session_state.monitoring = False
st.session_state.current_webcam_url = None
with col2:
if st.session_state.monitoring and st.session_state.current_webcam_url:
image_placeholder = st.empty()
timer_placeholder = st.empty()
refresh_interval = 30 # Refresh every 30 seconds
while True:
start_time = time.time()
try:
response = requests.get(st.session_state.current_webcam_url, timeout=5)
if response.status_code != 200:
st.error(f"Fetch failed: HTTP {response.status_code}")
break
image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
st.error("Image decoding failed.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
image_placeholder.image(detected_frame, use_column_width=True)
elapsed = time.time() - start_time
remaining = max(0, refresh_interval - elapsed)
timer_placeholder.write(f"Next scan: {int(remaining)}s")
while remaining > 0:
time.sleep(1)
elapsed = time.time() - start_time
remaining = max(0, refresh_interval - elapsed)
timer_placeholder.write(f"Next scan: {int(remaining)}s")
st.experimental_rerun()
except Exception as e:
st.error(f"Error: {e}")
st.session_state.monitoring = False
break
# Tab 3: YouTube
with tabs[2]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**YouTube Live**")
st.write("Enter a live YouTube URL to auto-analyze the stream.")
youtube_url = st.text_input("YouTube URL", "https://www.youtube.com/watch?v=<id>", label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="yt_conf")
with col2:
if youtube_url and youtube_url != "https://www.youtube.com/watch?v=<id>":
st.write("Analyzing live stream...")
try:
streams = streamlink.streams(youtube_url)
if not streams:
st.error("No streams found. Check the URL.")
else:
stream_url = streams["best"].to_url()
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
st.error("Unable to open stream.")
else:
frame_placeholder = st.empty()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
st.error("Stream interrupted.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
st.write(f"Objects detected: {len(results[0].boxes)}")
time.sleep(1) # Check every second
cap.release()
except Exception as e:
st.error(f"Error: {e}") |