|
import streamlit as st |
|
import cv2 |
|
import time |
|
import threading |
|
import os |
|
from streamlit_autorefresh import st_autorefresh |
|
|
|
|
|
if "broadcasting" not in st.session_state: |
|
st.session_state.broadcasting = False |
|
if "broadcaster_thread" not in st.session_state: |
|
st.session_state.broadcaster_thread = None |
|
|
|
def broadcast_loop(camera_index): |
|
""" |
|
Capture 3-second segments from the selected camera and write them to 'latest.mp4'. |
|
This loop runs in a background thread while st.session_state.broadcasting is True. |
|
""" |
|
cap = cv2.VideoCapture(int(camera_index)) |
|
if not cap.isOpened(): |
|
st.error("Could not open camera.") |
|
return |
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
if fps is None or fps < 1: |
|
fps = 20 |
|
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
while st.session_state.broadcasting: |
|
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
out = cv2.VideoWriter("latest.mp4", fourcc, fps, (width, height)) |
|
segment_start = time.time() |
|
|
|
while time.time() - segment_start < 3 and st.session_state.broadcasting: |
|
ret, frame = cap.read() |
|
if not ret: |
|
break |
|
out.write(frame) |
|
time.sleep(1.0 / fps) |
|
out.release() |
|
cap.release() |
|
|
|
def start_broadcast(password, camera_index): |
|
""" |
|
Start broadcasting if the correct password is entered. |
|
""" |
|
if password != "test123": |
|
return "Incorrect password! Broadcast not started." |
|
if not st.session_state.broadcasting: |
|
st.session_state.broadcasting = True |
|
thread = threading.Thread(target=broadcast_loop, args=(camera_index,), daemon=True) |
|
st.session_state.broadcaster_thread = thread |
|
thread.start() |
|
return "Broadcasting started." |
|
else: |
|
return "Already broadcasting." |
|
|
|
def stop_broadcast(): |
|
""" |
|
Stop broadcasting by setting the broadcasting flag to False. |
|
""" |
|
if st.session_state.broadcasting: |
|
st.session_state.broadcasting = False |
|
return "Broadcast stopped." |
|
else: |
|
return "Not broadcasting." |
|
|
|
def get_latest_video_path(): |
|
""" |
|
Return the path to the latest video if it exists. |
|
""" |
|
return "latest.mp4" if os.path.exists("latest.mp4") else None |
|
|
|
|
|
st.set_page_config(page_title="Temporary File‑Based Live Stream", layout="wide") |
|
st.title("Temporary File‑Based Live Stream with Streamlit") |
|
|
|
|
|
tabs = st.tabs(["Broadcast", "View"]) |
|
|
|
with tabs[0]: |
|
st.header("Broadcasting Controls") |
|
password_input = st.text_input("Enter broadcast password:", type="password") |
|
camera_index_input = st.number_input("Camera Index", min_value=0, value=0, step=1) |
|
col1, col2 = st.columns(2) |
|
with col1: |
|
if st.button("Start Broadcasting"): |
|
status = start_broadcast(password_input, camera_index_input) |
|
st.info(status) |
|
with col2: |
|
if st.button("Stop Broadcasting"): |
|
status = stop_broadcast() |
|
st.info(status) |
|
|
|
with tabs[1]: |
|
st.header("Live View") |
|
video_path = get_latest_video_path() |
|
if video_path: |
|
|
|
st_autorefresh(interval=3000, key="video_autorefresh") |
|
st.video(video_path) |
|
else: |
|
st.write("No broadcast available yet.") |
|
|