Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import cv2 | |
| import PIL.Image | |
| from ultralytics import YOLO | |
| import tempfile | |
| import time | |
| import requests | |
| import numpy as np | |
| import streamlink | |
| # Page Config | |
| st.set_page_config(page_title="AI Fire Watch", page_icon="🌍", layout="wide") | |
| # Lighter Background CSS with Darker Text | |
| st.markdown( | |
| """ | |
| <style> | |
| .stApp { | |
| background-color: #f5f5f5; | |
| color: #1a1a1a; | |
| } | |
| h1 { | |
| color: #1a1a1a; | |
| } | |
| .stTabs > div > button { | |
| background-color: #e0e0e0; | |
| color: #1a1a1a; | |
| font-weight: bold; | |
| } | |
| .stTabs > div > button:hover { | |
| background-color: #d0d0d0; | |
| color: #1a1a1a; | |
| } | |
| .stButton > button { | |
| background-color: #e0e0e0; | |
| color: #1a1a1a; | |
| font-weight: bold; | |
| } | |
| .stButton > button:hover { | |
| background-color: #d0d0d0; | |
| color: #1a1a1a; | |
| } | |
| </style> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| # Load Model | |
| model_path = 'https://huggingface.co/spaces/ankitkupadhyay/fire_and_smoke/resolve/main/best.pt' | |
| try: | |
| model = YOLO(model_path) | |
| except Exception as ex: | |
| st.error(f"Model loading failed: {ex}") | |
| st.stop() | |
| # Initialize Session State | |
| if 'monitoring' not in st.session_state: | |
| st.session_state.monitoring = False | |
| if 'current_webcam_url' not in st.session_state: | |
| st.session_state.current_webcam_url = None | |
| # Header | |
| st.title("AI Fire Watch") | |
| st.markdown("Monitor fire and smoke in real-time with AI precision.") | |
| # Tabs | |
| tabs = st.tabs(["Upload", "Webcam", "YouTube"]) | |
| # Tab 1: Upload | |
| with tabs[0]: | |
| col1, col2 = st.columns([1, 1]) | |
| with col1: | |
| st.markdown("**Add Your File**") | |
| st.write("Upload an image or video to scan for fire or smoke.") | |
| uploaded_file = st.file_uploader("", type=["jpg", "jpeg", "png", "mp4"], label_visibility="collapsed") | |
| confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="upload_conf") | |
| with col2: | |
| if uploaded_file: | |
| file_type = uploaded_file.type.split('/')[0] | |
| if file_type == 'image': | |
| image = PIL.Image.open(uploaded_file) | |
| results = model.predict(image, conf=confidence) | |
| detected_image = results[0].plot()[:, :, ::-1] | |
| st.image(detected_image, use_column_width=True) | |
| st.write(f"Objects detected: {len(results[0].boxes)}") | |
| elif file_type == 'video': | |
| tfile = tempfile.NamedTemporaryFile(delete=False) | |
| tfile.write(uploaded_file.read()) | |
| cap = cv2.VideoCapture(tfile.name) | |
| frame_placeholder = st.empty() | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| results = model.predict(frame, conf=confidence) | |
| detected_frame = results[0].plot()[:, :, ::-1] | |
| frame_placeholder.image(detected_frame, use_column_width=True) | |
| time.sleep(0.05) | |
| cap.release() | |
| # Tab 2: Webcam | |
| with tabs[1]: | |
| col1, col2 = st.columns([1, 1]) | |
| with col1: | |
| st.markdown("**Webcam Feed**") | |
| st.write("Provide a webcam URL (image or video stream) to monitor for hazards.") | |
| webcam_url = st.text_input("Webcam URL", "http://<your_webcam_ip>/current.jpg", label_visibility="collapsed") | |
| confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="webcam_conf") | |
| refresh_rate = st.slider("Refresh Rate (seconds)", 1, 60, 30, key="webcam_rate") | |
| start = st.button("Begin Monitoring", key="webcam_start") | |
| stop = st.button("Stop Monitoring", key="webcam_stop") | |
| if start: | |
| st.session_state.monitoring = True | |
| st.session_state.current_webcam_url = webcam_url | |
| if stop or (st.session_state.monitoring and webcam_url != st.session_state.current_webcam_url): | |
| st.session_state.monitoring = False | |
| st.session_state.current_webcam_url = None | |
| with col2: | |
| if st.session_state.monitoring and st.session_state.current_webcam_url: | |
| frame_placeholder = st.empty() | |
| status_placeholder = st.empty() | |
| timer_placeholder = st.empty() | |
| # Try video stream first | |
| cap = cv2.VideoCapture(webcam_url) | |
| is_video_stream = cap.isOpened() | |
| while st.session_state.monitoring: | |
| try: | |
| start_time = time.time() | |
| if is_video_stream: | |
| ret, frame = cap.read() | |
| if not ret: | |
| status_placeholder.error("Video stream interrupted.") | |
| break | |
| else: | |
| # Fallback to image-based webcam | |
| response = requests.get(webcam_url, timeout=5) | |
| if response.status_code != 200: | |
| status_placeholder.error(f"Fetch failed: HTTP {response.status_code}") | |
| break | |
| image_array = np.asarray(bytearray(response.content), dtype=np.uint8) | |
| frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR) | |
| if frame is None: | |
| status_placeholder.error("Image decoding failed.") | |
| break | |
| results = model.predict(frame, conf=confidence) | |
| detected_frame = results[0].plot()[:, :, ::-1] | |
| frame_placeholder.image(detected_frame, use_column_width=True) | |
| status_placeholder.write(f"Objects detected: {len(results[0].boxes)}") | |
| elapsed = time.time() - start_time | |
| remaining = max(0, refresh_rate - elapsed) | |
| timer_placeholder.write(f"Next scan: {int(remaining)}s") | |
| if not is_video_stream: | |
| time.sleep(remaining) | |
| else: | |
| time.sleep(0.1) # Faster update for video streams | |
| except Exception as e: | |
| status_placeholder.error(f"Error: {e}") | |
| st.session_state.monitoring = False | |
| break | |
| if is_video_stream: | |
| cap.release() | |
| # Tab 3: YouTube | |
| with tabs[2]: | |
| col1, col2 = st.columns([1, 1]) | |
| with col1: | |
| st.markdown("**YouTube Live**") | |
| st.write("Enter a live YouTube URL to auto-analyze the stream.") | |
| youtube_url = st.text_input("YouTube URL", "https://www.youtube.com/watch?v=<id>", label_visibility="collapsed") | |
| confidence = st.slider("Detection Threshold", 0.25, 1.0, 0.4, key="yt_conf") | |
| start_yt = st.button("Start Analysis", key="yt_start") | |
| stop_yt = st.button("Stop Analysis", key="yt_stop") | |
| if 'yt_monitoring' not in st.session_state: | |
| st.session_state.yt_monitoring = False | |
| if start_yt: | |
| st.session_state.yt_monitoring = True | |
| if stop_yt: | |
| st.session_state.yt_monitoring = False | |
| with col2: | |
| if st.session_state.yt_monitoring and youtube_url and youtube_url != "https://www.youtube.com/watch?v=<id>": | |
| status_placeholder = st.empty() | |
| frame_placeholder = st.empty() | |
| try: | |
| status_placeholder.write("Initializing stream...") | |
| streams = streamlink.streams(youtube_url) | |
| if not streams: | |
| status_placeholder.error("No streams found. Check if the URL is a live stream.") | |
| else: | |
| stream_url = streams["best"].url | |
| cap = cv2.VideoCapture(stream_url) | |
| if not cap.isOpened(): | |
| status_placeholder.error("Unable to open stream.") | |
| else: | |
| status_placeholder.write("Analyzing live stream...") | |
| while st.session_state.yt_monitoring and cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| status_placeholder.error("Stream interrupted.") | |
| break | |
| results = model.predict(frame, conf=confidence) | |
| detected_frame = results[0].plot()[:, :, ::-1] | |
| frame_placeholder.image(detected_frame, use_column_width=True) | |
| status_placeholder.write(f"Objects detected: {len(results[0].boxes)}") | |
| time.sleep(0.1) # Control frame rate | |
| cap.release() | |
| except Exception as e: | |
| status_placeholder.error(f"Error: {e}") | |
| st.session_state.yt_monitoring = False |