ccr-colorado / app.py
tstone87's picture
Update app.py
13a0ff7 verified
raw
history blame
11.5 kB
import streamlit as st
import cv2
import PIL.Image
from ultralytics import YOLO
import tempfile
import time
import requests
import numpy as np
import streamlink
# Page Config
st.set_page_config(page_title="AI Fire Watch", page_icon="🌍", layout="wide")
# Lighter Background CSS with Darker Text
st.markdown(
"""
<style>
.stApp {
background-color: #cdc0b0;
color: #1a1a1a;
}
h1 {
color: #1a1a1a;
}
.stTabs > div > button {
background-color: #e0e0e0;
color: #333333; /* Darker tab text */
font-weight: bold;
}
.stTabs > div > button:hover {
background-color: #d0d0d0;
color: #333333;
}
.stTabs > div > button[aria-selected="true"] {
background-color: #ffffff;
color: #333333;
}
.stButton > button {
background-color: #e0e0e0;
color: #1a1a1a;
font-weight: bold;
}
.stButton > button:hover {
background-color: #d0d0d0;
color: #1a1a1a;
}
/* Fix container height to prevent scrolling */
.main .block-container {
max-height: 100vh;
overflow-y: auto;
}
.stImage > img {
max-height: 50vh; /* Limit image height */
object-fit: contain;
}
</style>
""",
unsafe_allow_html=True
)
# Load Model
model_path = 'https://huggingface.co/spaces/tstone87/ccr-colorado/resolve/main/best.pt'
try:
model = YOLO(model_path)
except Exception as ex:
st.error(f"Model loading failed: {ex}")
st.stop()
# Initialize Session State
if 'monitoring' not in st.session_state:
st.session_state.monitoring = False
if 'current_webcam_url' not in st.session_state:
st.session_state.current_webcam_url = None
if 'yt_monitoring' not in st.session_state:
st.session_state.yt_monitoring = False
# Header
st.title("AI Fire Watch")
st.markdown("Monitor fire and smoke in real-time with AI vision models.")
# Tabs
tabs = st.tabs(["Upload", "Webcam", "YouTube"])
# Tab 1: Upload
with tabs[0]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**Add Your File**")
st.write("Upload an image or video to scan for fire or smoke.")
uploaded_file = st.file_uploader("", type=["jpg", "jpeg", "png", "mp4"], label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf")
with col2:
frame_placeholder = st.empty()
status_placeholder = st.empty()
if uploaded_file:
try:
file_type = uploaded_file.type.split('/')[0]
status_placeholder.write(f"Processing {file_type} file...")
if file_type == 'image':
image = PIL.Image.open(uploaded_file)
results = model.predict(image, conf=confidence)
detected_image = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_image, use_column_width=True)
status_placeholder.write(f"Objects detected: {len(results[0].boxes)}")
elif file_type == 'video':
# Save uploaded file to temporary location
tfile = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
tfile.write(uploaded_file.read())
tfile.close()
# Open video with OpenCV
cap = cv2.VideoCapture(tfile.name)
if not cap.isOpened():
status_placeholder.error("Failed to open video file.")
else:
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
status_placeholder.write(f"Finished processing video. Processed {frame_count} frames.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
status_placeholder.write(f"Frame {frame_count}: Objects detected: {len(results[0].boxes)}")
frame_count += 1
time.sleep(0.05) # Control playback speed
cap.release()
# Clean up temporary file
import os
os.unlink(tfile.name)
except Exception as e:
status_placeholder.error(f"Error processing file: {str(e)}")
# Tab 2: Webcam
with tabs[1]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**Webcam Feed**")
st.write("Provide a webcam URL (image or video stream) to monitor for hazards.")
webcam_url = st.text_input("Webcam URL", "http://<your_webcam_ip>/current.jpg", label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf")
refresh_rate = st.slider("Refresh Rate (seconds)", 1, 60, 30, key="webcam_rate")
start = st.button("Begin Monitoring", key="webcam_start")
stop = st.button("Stop Monitoring", key="webcam_stop")
if start:
st.session_state.monitoring = True
st.session_state.current_webcam_url = webcam_url
if stop or (st.session_state.monitoring and webcam_url != st.session_state.current_webcam_url):
st.session_state.monitoring = False
st.session_state.current_webcam_url = None
with col2:
frame_placeholder = st.empty()
status_placeholder = st.empty()
timer_placeholder = st.empty()
if st.session_state.monitoring and st.session_state.current_webcam_url:
# Try as video stream first
cap = cv2.VideoCapture(webcam_url)
is_video_stream = cap.isOpened()
if is_video_stream:
status_placeholder.write("Connected to video stream...")
while st.session_state.monitoring:
try:
ret, frame = cap.read()
if not ret:
status_placeholder.error("Video stream interrupted.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
status_placeholder.write(f"Objects detected: {len(results[0].boxes)}")
time.sleep(0.1) # Fast update for video
except Exception as e:
status_placeholder.error(f"Video error: {e}")
st.session_state.monitoring = False
break
cap.release()
else:
# Image-based webcam
status_placeholder.write("Monitoring image-based webcam...")
while st.session_state.monitoring:
try:
start_time = time.time()
response = requests.get(webcam_url, timeout=5)
if response.status_code != 200:
status_placeholder.error(f"Fetch failed: HTTP {response.status_code}")
break
image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
if frame is None:
status_placeholder.error("Image decoding failed.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
status_placeholder.write(f"Objects detected: {len(results[0].boxes)}")
# Proper refresh timing for images
elapsed = time.time() - start_time
remaining = max(0, refresh_rate - elapsed)
for i in range(int(remaining), -1, -1):
if not st.session_state.monitoring:
break
timer_placeholder.write(f"Next scan: {i}s")
time.sleep(1)
except Exception as e:
status_placeholder.error(f"Image fetch error: {e}")
st.session_state.monitoring = False
break
if not st.session_state.monitoring:
timer_placeholder.write("Monitoring stopped.")
# Tab 3: YouTube
with tabs[2]:
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("**YouTube Live**")
st.write("Enter a live YouTube URL to auto-analyze the stream.")
youtube_url = st.text_input("YouTube URL", "https://www.youtube.com/watch?v=<id>", label_visibility="collapsed")
confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="yt_conf")
start_yt = st.button("Start Analysis", key="yt_start")
stop_yt = st.button("Stop Analysis", key="yt_stop")
if start_yt:
st.session_state.yt_monitoring = True
if stop_yt:
st.session_state.yt_monitoring = False
with col2:
frame_placeholder = st.empty()
status_placeholder = st.empty()
if st.session_state.yt_monitoring and youtube_url and youtube_url != "https://www.youtube.com/watch?v=<id>":
try:
status_placeholder.write("Initializing stream...")
streams = streamlink.streams(youtube_url)
if not streams:
status_placeholder.error("No streams found. Check if the URL is a live stream.")
else:
stream_url = streams["best"].url
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
status_placeholder.error("Unable to open stream.")
else:
status_placeholder.write("Analyzing live stream...")
while st.session_state.yt_monitoring and cap.isOpened():
ret, frame = cap.read()
if not ret:
status_placeholder.error("Stream interrupted.")
break
results = model.predict(frame, conf=confidence)
detected_frame = results[0].plot()[:, :, ::-1]
frame_placeholder.image(detected_frame, use_column_width=True)
status_placeholder.write(f"Objects detected: {len(results[0].boxes)}")
time.sleep(0.1)
cap.release()
except Exception as e:
status_placeholder.error(f"Error: {e}")
st.session_state.yt_monitoring = False