import streamlit as st import cv2 import time import threading import os from streamlit_autorefresh import st_autorefresh # Initialize session state for broadcasting if not already set if "broadcasting" not in st.session_state: st.session_state.broadcasting = False if "broadcaster_thread" not in st.session_state: st.session_state.broadcaster_thread = None def broadcast_loop(camera_index): """ Capture 3-second segments from the selected camera and write them to 'latest.mp4'. This loop runs in a background thread while st.session_state.broadcasting is True. """ cap = cv2.VideoCapture(int(camera_index)) if not cap.isOpened(): st.error("Could not open camera.") return # Try to determine FPS; default to 20 if not available. fps = cap.get(cv2.CAP_PROP_FPS) if fps is None or fps < 1: fps = 20 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) while st.session_state.broadcasting: # Open a VideoWriter to record a 3-second segment. fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter("latest.mp4", fourcc, fps, (width, height)) segment_start = time.time() while time.time() - segment_start < 3 and st.session_state.broadcasting: ret, frame = cap.read() if not ret: break out.write(frame) time.sleep(1.0 / fps) out.release() # Close the file so that it can be served. cap.release() def start_broadcast(password, camera_index): """ Start broadcasting if the correct password is entered. """ if password != "test123": return "Incorrect password! Broadcast not started." if not st.session_state.broadcasting: st.session_state.broadcasting = True thread = threading.Thread(target=broadcast_loop, args=(camera_index,), daemon=True) st.session_state.broadcaster_thread = thread thread.start() return "Broadcasting started." else: return "Already broadcasting." def stop_broadcast(): """ Stop broadcasting by setting the broadcasting flag to False. """ if st.session_state.broadcasting: st.session_state.broadcasting = False return "Broadcast stopped." else: return "Not broadcasting." def get_latest_video_path(): """ Return the path to the latest video if it exists. """ return "latest.mp4" if os.path.exists("latest.mp4") else None # Set page configuration st.set_page_config(page_title="Temporary File‑Based Live Stream", layout="wide") st.title("Temporary File‑Based Live Stream with Streamlit") # Create two tabs: one for broadcasting and one for viewing. tabs = st.tabs(["Broadcast", "View"]) with tabs[0]: st.header("Broadcasting Controls") password_input = st.text_input("Enter broadcast password:", type="password") camera_index_input = st.number_input("Camera Index", min_value=0, value=0, step=1) col1, col2 = st.columns(2) with col1: if st.button("Start Broadcasting"): status = start_broadcast(password_input, camera_index_input) st.info(status) with col2: if st.button("Stop Broadcasting"): status = stop_broadcast() st.info(status) with tabs[1]: st.header("Live View") video_path = get_latest_video_path() if video_path: # Automatically refresh the app every 3 seconds to load the latest file. st_autorefresh(interval=3000, key="video_autorefresh") st.video(video_path) else: st.write("No broadcast available yet.")