ccr-colorado / app.py
tstone87's picture
Update app.py
d44cea7 verified
raw
history blame
8.94 kB
import streamlit as st
import cv2
import PIL.Image
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import streamlink
# Page Config
st.set_page_config(
    page_title="AI Fire Watch",
    page_icon="🌍",
    layout="wide",
)

# Lighter Background CSS with Darker Text
# The stylesheet is kept in a named constant so the markup is separated from
# the call that injects it. It restyles the app container, the h1 heading,
# tab buttons and regular buttons (including their hover states).
_CUSTOM_CSS = """
<style>
.stApp {
background-color: #f5f5f5;
color: #1a1a1a;
}
h1 {
color: #1a1a1a;
}
.stTabs > div > button {
background-color: #e0e0e0;
color: #1a1a1a;
font-weight: bold;
}
.stTabs > div > button:hover {
background-color: #d0d0d0;
color: #1a1a1a;
}
.stButton > button {
background-color: #e0e0e0;
color: #1a1a1a;
font-weight: bold;
}
.stButton > button:hover {
background-color: #d0d0d0;
color: #1a1a1a;
}
</style>
"""
# unsafe_allow_html is required for the raw <style> element to be emitted.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# Load Model
# Streamlit re-executes this script on every widget interaction, so building
# the YOLO object unconditionally would reload the weights on each rerun.
# The instance is cached in session state instead: it is created once per
# browser session and reused afterwards. A per-session instance (rather than
# one shared by all sessions) avoids running predict() on the same object
# from several script threads at once.
model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt'
try:
    if "_yolo_model" not in st.session_state:
        st.session_state["_yolo_model"] = YOLO(model_path)
    model = st.session_state["_yolo_model"]
except Exception as ex:
    # Without a model nothing below can work: report and halt this run.
    st.error(f"Model loading failed: {ex}")
    st.stop()
# Initialize Session State
# Defaults for the webcam tab: whether monitoring is active and which URL
# it was started with. Existing values are never overwritten.
_SESSION_DEFAULTS = {
    "monitoring": False,
    "current_webcam_url": None,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Header
st.title("AI Fire Watch")
st.markdown("Monitor fire and smoke in real-time with AI precision.")

# Tabs
# `tabs` is indexed positionally by the sections below:
# 0 = Upload, 1 = Webcam, 2 = YouTube.
_TAB_LABELS = ["Upload", "Webcam", "YouTube"]
tabs = st.tabs(_TAB_LABELS)
# Tab 1: Upload
# Left column: file picker and confidence threshold.
# Right column: annotated image, or annotated video frames played in place.
with tabs[0]:
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**Add Your File**")
        st.write("Upload an image or video to scan for fire or smoke.")
        # A non-empty label hidden via label_visibility, consistent with the
        # text inputs on the other tabs (Streamlit discourages label="").
        uploaded_file = st.file_uploader(
            "Upload file",
            type=["jpg", "jpeg", "png", "mp4"],
            label_visibility="collapsed",
        )
        confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf")
    with col2:
        if uploaded_file:
            # MIME type looks like "image/png" or "video/mp4"; keep the major part.
            file_type = uploaded_file.type.split('/')[0]
            if file_type == 'image':
                image = PIL.Image.open(uploaded_file)
                results = model.predict(image, conf=confidence)
                # Reverse the channel axis of the annotated array before display
                # (presumably BGR -> RGB; confirm against the plot() docs).
                detected_image = results[0].plot()[:, :, ::-1]
                st.image(detected_image, use_column_width=True)
                st.write(f"Objects detected: {len(results[0].boxes)}")
            elif file_type == 'video':
                frame_placeholder = st.empty()
                # OpenCV needs a real file path. The temporary directory is
                # removed on exit, so the uploaded copy never outlives this run.
                with tempfile.TemporaryDirectory() as tmp_dir:
                    with tempfile.NamedTemporaryFile(
                        dir=tmp_dir, suffix=".mp4", delete=False
                    ) as video_file:
                        # getvalue() returns the whole upload regardless of the
                        # current stream position.
                        video_file.write(uploaded_file.getvalue())
                    # The file is closed (and therefore fully flushed to disk)
                    # before OpenCV opens it.
                    cap = cv2.VideoCapture(video_file.name)
                    try:
                        while cap.isOpened():
                            ret, frame = cap.read()
                            if not ret:
                                break
                            results = model.predict(frame, conf=confidence)
                            detected_frame = results[0].plot()[:, :, ::-1]
                            frame_placeholder.image(detected_frame, use_column_width=True)
                            time.sleep(0.05)
                    finally:
                        # Release even if the loop is interrupted, and before
                        # the temporary directory is deleted.
                        cap.release()
# Tab 2: Webcam
# Left column: URL, thresholds and start/stop controls that drive
# st.session_state.monitoring / current_webcam_url.
# Right column: the monitoring loop. The URL is first tried as an OpenCV
# video stream; if that does not open, it is polled as a still image.
with tabs[1]:
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**Webcam Feed**")
        st.write("Provide a webcam URL (image or video stream) to monitor for hazards.")
        webcam_url = st.text_input("Webcam URL", "http://<your_webcam_ip>/current.jpg", label_visibility="collapsed")
        confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf")
        refresh_rate = st.slider("Refresh Rate (seconds)", 1, 60, 30, key="webcam_rate")
        start = st.button("Begin Monitoring", key="webcam_start")
        stop = st.button("Stop Monitoring", key="webcam_stop")
        if start:
            st.session_state.monitoring = True
            st.session_state.current_webcam_url = webcam_url
        # Stop explicitly, or implicitly when the URL was edited while monitoring.
        if stop or (st.session_state.monitoring and webcam_url != st.session_state.current_webcam_url):
            st.session_state.monitoring = False
            st.session_state.current_webcam_url = None
    with col2:
        if st.session_state.monitoring and st.session_state.current_webcam_url:
            frame_placeholder = st.empty()
            status_placeholder = st.empty()
            timer_placeholder = st.empty()
            # Try video stream first
            cap = cv2.VideoCapture(webcam_url)
            is_video_stream = cap.isOpened()
            try:
                while st.session_state.monitoring:
                    start_time = time.time()
                    if is_video_stream:
                        ret, frame = cap.read()
                        if not ret:
                            status_placeholder.error("Video stream interrupted.")
                            break
                    else:
                        # Fallback to image-based webcam
                        response = requests.get(webcam_url, timeout=5)
                        if response.status_code != 200:
                            status_placeholder.error(f"Fetch failed: HTTP {response.status_code}")
                            break
                        image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
                        frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                        if frame is None:
                            status_placeholder.error("Image decoding failed.")
                            break
                    results = model.predict(frame, conf=confidence)
                    detected_frame = results[0].plot()[:, :, ::-1]
                    frame_placeholder.image(detected_frame, use_column_width=True)
                    status_placeholder.write(f"Objects detected: {len(results[0].boxes)}")
                    # Sleep only for what is left of the refresh interval after
                    # fetching and inference.
                    elapsed = time.time() - start_time
                    remaining = max(0, refresh_rate - elapsed)
                    timer_placeholder.write(f"Next scan: {int(remaining)}s")
                    if not is_video_stream:
                        time.sleep(remaining)
                    else:
                        time.sleep(0.1)  # Faster update for video streams
            except Exception as e:
                # Any failure ends monitoring until the user starts it again.
                status_placeholder.error(f"Error: {e}")
                st.session_state.monitoring = False
            finally:
                # Always release the capture: on normal exit, on error, when the
                # loop is interrupted, and also when the URL never opened as a
                # video stream.
                cap.release()
# Tab 3: YouTube
# Left column: URL, threshold and start/stop controls that drive
# st.session_state.yt_monitoring.
# Right column: resolves the URL with streamlink, opens the "best" stream
# with OpenCV and annotates frames until stopped or the stream ends.
with tabs[2]:
    # Default text of the URL box; analysis is skipped while it is unchanged.
    yt_placeholder_url = "https://www.youtube.com/watch?v=<id>"
    col1, col2 = st.columns([1, 1])
    with col1:
        st.markdown("**YouTube Live**")
        st.write("Enter a live YouTube URL to auto-analyze the stream.")
        youtube_url = st.text_input("YouTube URL", yt_placeholder_url, label_visibility="collapsed")
        confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="yt_conf")
        start_yt = st.button("Start Analysis", key="yt_start")
        stop_yt = st.button("Stop Analysis", key="yt_stop")
        if 'yt_monitoring' not in st.session_state:
            st.session_state.yt_monitoring = False
        if start_yt:
            st.session_state.yt_monitoring = True
        if stop_yt:
            st.session_state.yt_monitoring = False
    with col2:
        if st.session_state.yt_monitoring and youtube_url and youtube_url != yt_placeholder_url:
            status_placeholder = st.empty()
            frame_placeholder = st.empty()
            cap = None  # set once a capture exists so `finally` can release it
            try:
                status_placeholder.write("Initializing stream...")
                streams = streamlink.streams(youtube_url)
                if not streams:
                    status_placeholder.error("No streams found. Check if the URL is a live stream.")
                else:
                    stream_url = streams["best"].url
                    cap = cv2.VideoCapture(stream_url)
                    if not cap.isOpened():
                        status_placeholder.error("Unable to open stream.")
                    else:
                        status_placeholder.write("Analyzing live stream...")
                        while st.session_state.yt_monitoring and cap.isOpened():
                            ret, frame = cap.read()
                            if not ret:
                                status_placeholder.error("Stream interrupted.")
                                break
                            results = model.predict(frame, conf=confidence)
                            detected_frame = results[0].plot()[:, :, ::-1]
                            frame_placeholder.image(detected_frame, use_column_width=True)
                            status_placeholder.write(f"Objects detected: {len(results[0].boxes)}")
                            time.sleep(0.1)  # Control frame rate
            except Exception as e:
                # Any failure ends the analysis until the user starts it again.
                status_placeholder.error(f"Error: {e}")
                st.session_state.yt_monitoring = False
            finally:
                # Release the capture on every exit path, including errors and
                # interruption of the loop.
                if cap is not None:
                    cap.release()