import streamlit as st import cv2 import PIL.Image from ultralytics import YOLO import tempfile import time import requests import numpy as np import streamlink # Page Config st.set_page_config(page_title="AI Fire Watch", page_icon="🌍", layout="wide") # Lighter Background CSS with Darker Text st.markdown( """ """, unsafe_allow_html=True ) # Load Model model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt' try: model = YOLO(model_path) except Exception as ex: st.error(f"Model loading failed: {ex}") st.stop() # Initialize Session State if 'monitoring' not in st.session_state: st.session_state.monitoring = False if 'current_webcam_url' not in st.session_state: st.session_state.current_webcam_url = None # Header st.title("AI Fire Watch") st.markdown("Monitor fire and smoke in real-time with AI precision.") # Tabs tabs = st.tabs(["Upload", "Webcam", "YouTube"]) # Tab 1: Upload with tabs[0]: col1, col2 = st.columns([1, 1]) with col1: st.markdown("**Add Your File**") st.write("Upload an image or video to scan for fire or smoke.") uploaded_file = st.file_uploader("", type=["jpg", "jpeg", "png", "mp4"], label_visibility="collapsed") confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf") with col2: if uploaded_file: file_type = uploaded_file.type.split('/')[0] if file_type == 'image': image = PIL.Image.open(uploaded_file) results = model.predict(image, conf=confidence) detected_image = results[0].plot()[:, :, ::-1] st.image(detected_image, use_column_width=True) st.write(f"Objects detected: {len(results[0].boxes)}") elif file_type == 'video': tfile = tempfile.NamedTemporaryFile(delete=False) tfile.write(uploaded_file.read()) cap = cv2.VideoCapture(tfile.name) frame_placeholder = st.empty() while cap.isOpened(): ret, frame = cap.read() if not ret: break results = model.predict(frame, conf=confidence) detected_frame = results[0].plot()[:, :, ::-1] frame_placeholder.image(detected_frame, use_column_width=True) time.sleep(0.05) cap.release() # Tab 2: Webcam with tabs[1]: col1, col2 = st.columns([1, 1]) with col1: st.markdown("**Webcam Feed**") st.write("Provide a webcam URL to check snapshots for hazards every 30 seconds.") webcam_url = st.text_input("Webcam URL", "http:///current.jpg", label_visibility="collapsed") confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf") start = st.button("Begin Monitoring", key="webcam_start") stop = st.button("Stop Monitoring", key="webcam_stop") # Handle monitoring state if start: st.session_state.monitoring = True st.session_state.current_webcam_url = webcam_url if stop or (st.session_state.monitoring and webcam_url != st.session_state.current_webcam_url): st.session_state.monitoring = False st.session_state.current_webcam_url = None with col2: if st.session_state.monitoring and st.session_state.current_webcam_url: image_placeholder = st.empty() timer_placeholder = st.empty() refresh_interval = 30 # Refresh every 30 seconds while True: start_time = time.time() try: response = requests.get(st.session_state.current_webcam_url, timeout=5) if response.status_code != 200: st.error(f"Fetch failed: HTTP {response.status_code}") break image_array = np.asarray(bytearray(response.content), dtype=np.uint8) frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) if frame is None: st.error("Image decoding failed.") break results = model.predict(frame, conf=confidence) detected_frame = results[0].plot()[:, :, ::-1] image_placeholder.image(detected_frame, use_column_width=True) elapsed = time.time() - start_time remaining = max(0, refresh_interval - elapsed) timer_placeholder.write(f"Next scan: {int(remaining)}s") while remaining > 0: time.sleep(1) elapsed = time.time() - start_time remaining = max(0, refresh_interval - elapsed) timer_placeholder.write(f"Next scan: {int(remaining)}s") st.experimental_rerun() except Exception as e: st.error(f"Error: {e}") st.session_state.monitoring = False break # Tab 3: YouTube with tabs[2]: col1, col2 = st.columns([1, 1]) with col1: st.markdown("**YouTube Live**") st.write("Enter a live YouTube URL to auto-analyze the stream.") youtube_url = st.text_input("YouTube URL", "https://www.youtube.com/watch?v=", label_visibility="collapsed") confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="yt_conf") with col2: if youtube_url and youtube_url != "https://www.youtube.com/watch?v=": st.write("Analyzing live stream...") try: streams = streamlink.streams(youtube_url) if not streams: st.error("No streams found. Check the URL.") else: stream_url = streams["best"].to_url() cap = cv2.VideoCapture(stream_url) if not cap.isOpened(): st.error("Unable to open stream.") else: frame_placeholder = st.empty() while cap.isOpened(): ret, frame = cap.read() if not ret: st.error("Stream interrupted.") break results = model.predict(frame, conf=confidence) detected_frame = results[0].plot()[:, :, ::-1] frame_placeholder.image(detected_frame, use_column_width=True) st.write(f"Objects detected: {len(results[0].boxes)}") time.sleep(1) # Check every second cap.release() except Exception as e: st.error(f"Error: {e}")