File size: 3,674 Bytes
f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 f30bc3d 3a24bc8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
import streamlit as st
import cv2
import time
import threading
import os
from streamlit_autorefresh import st_autorefresh
# Seed the per-session broadcasting state the first time the script runs;
# keys that already exist in st.session_state are left untouched.
for _state_key, _initial_value in (
    ("broadcasting", False),        # flag polled by the capture loop
    ("broadcaster_thread", None),   # handle of the background capture thread
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
def broadcast_loop(camera_index):
    """
    Capture 3-second segments from the selected camera and publish them as 'latest.mp4'.

    The loop keeps recording while ``st.session_state.broadcasting`` is True.
    Each segment is recorded into a scratch file and then moved over
    'latest.mp4' with ``os.replace``, so a reader never opens a segment that
    is still being written.

    Args:
        camera_index: Index of the capture device; converted with ``int()``
            before being handed to ``cv2.VideoCapture``.

    Returns:
        None. Returns early if the camera cannot be opened, and stops
        recording as soon as a frame cannot be read.
    """
    # NOTE(review): start_broadcast runs this function in a plain
    # threading.Thread. Streamlit calls made here (st.error,
    # st.session_state) may not be attached to the user's session from such a
    # thread -- confirm against Streamlit's multithreading documentation.
    # NOTE(review): 'mp4v' output may not be playable in every browser's
    # video element -- verify with the target browsers.
    final_path = "latest.mp4"
    # Scratch file for the segment in progress; it keeps the '.mp4' suffix of
    # the published file.
    scratch_path = "latest.tmp.mp4"

    cap = cv2.VideoCapture(int(camera_index))
    try:
        if not cap.isOpened():
            st.error("Could not open camera.")
            return

        # Try to determine FPS; default to 20 if not available.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps is None or fps < 1:
            fps = 20
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # The codec tag does not change between segments, so build it once.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')

        camera_ok = True
        while camera_ok and st.session_state.broadcasting:
            # Open a VideoWriter to record a 3-second segment.
            out = cv2.VideoWriter(scratch_path, fourcc, fps, (width, height))
            frames_written = 0
            try:
                segment_start = time.time()
                while time.time() - segment_start < 3 and st.session_state.broadcasting:
                    ret, frame = cap.read()
                    if not ret:
                        # A failed read ends the whole broadcast loop instead
                        # of spinning and re-creating empty segments forever.
                        camera_ok = False
                        break
                    out.write(frame)
                    frames_written += 1
                    time.sleep(1.0 / fps)
            finally:
                # Always close the writer, even if capture raised.
                out.release()

            if frames_written:
                try:
                    # Publish the finished segment in one step.
                    os.replace(scratch_path, final_path)
                except OSError:
                    # Best effort: if the scratch file is missing or the
                    # destination cannot be replaced right now, keep serving
                    # the previous segment; the scratch file is overwritten
                    # by the next segment.
                    pass
    finally:
        # Release the camera on every exit path, including errors.
        cap.release()
def start_broadcast(password, camera_index):
    """
    Start broadcasting if the correct password is entered.

    Args:
        password: Password typed by the user. Anything that is not a string
            equal to the expected password is rejected.
        camera_index: Camera index forwarded to ``broadcast_loop``.

    Returns:
        A status message: "Incorrect password! Broadcast not started.",
        "Broadcasting started." or "Already broadcasting.".
    """
    # Local import keeps this block self-contained.
    import hmac

    expected_password = "test123"
    # Constant-time comparison on bytes; encoding first lets compare_digest
    # accept non-ASCII input instead of raising TypeError.
    password_ok = isinstance(password, str) and hmac.compare_digest(
        password.encode("utf-8"), expected_password.encode("utf-8")
    )
    if not password_ok:
        return "Incorrect password! Broadcast not started."

    # The flag alone is not enough: broadcast_loop can return (for example
    # when the camera cannot be opened) while the flag is still True. Only
    # report "already broadcasting" when the worker thread is really alive.
    worker = st.session_state.broadcaster_thread
    worker_alive = worker is not None and worker.is_alive()
    if st.session_state.broadcasting and worker_alive:
        return "Already broadcasting."

    st.session_state.broadcasting = True
    thread = threading.Thread(target=broadcast_loop, args=(camera_index,), daemon=True)
    st.session_state.broadcaster_thread = thread
    thread.start()
    return "Broadcasting started."
def stop_broadcast():
    """
    Clear the broadcasting flag in the session state.

    Returns:
        "Broadcast stopped." when the flag was set and has now been cleared,
        "Not broadcasting." when it was already cleared.
    """
    # Guard clause: nothing to do when no broadcast is flagged as running.
    if not st.session_state.broadcasting:
        return "Not broadcasting."

    st.session_state.broadcasting = False
    return "Broadcast stopped."
def get_latest_video_path():
    """
    Look for 'latest.mp4' relative to the current working directory.

    Returns:
        The string "latest.mp4" when that path exists, otherwise None.
    """
    candidate = "latest.mp4"
    if not os.path.exists(candidate):
        return None
    return candidate
# Page-level configuration (wide layout) followed by the visible title.
st.set_page_config(
    layout="wide",
    page_title="Temporary File‑Based Live Stream",
)
st.title("Temporary File‑Based Live Stream with Streamlit")

# Two tabs: index 0 holds the broadcasting controls, index 1 the viewer.
tabs = st.tabs(
    [
        "Broadcast",
        "View",
    ]
)
with tabs[0]:
    st.header("Broadcasting Controls")

    # Inputs forwarded to start_broadcast when the start button is pressed.
    password_input = st.text_input("Enter broadcast password:", type="password")
    camera_index_input = st.number_input("Camera Index", min_value=0, value=0, step=1)

    # Start and stop buttons sit side by side; each shows the status message
    # returned by its handler.
    start_col, stop_col = st.columns(2)
    with start_col:
        start_clicked = st.button("Start Broadcasting")
        if start_clicked:
            st.info(start_broadcast(password_input, camera_index_input))
    with stop_col:
        stop_clicked = st.button("Stop Broadcasting")
        if stop_clicked:
            st.info(stop_broadcast())
with tabs[1]:
    st.header("Live View")
    # Refresh the app every 3 seconds whether or not a file exists yet.
    # Registering the refresh only when a video is present would leave a
    # viewer who opens the page before the first segment stuck on the
    # "No broadcast available yet." message.
    st_autorefresh(interval=3000, key="video_autorefresh")
    video_path = get_latest_video_path()
    if video_path:
        st.video(video_path)
    else:
        st.write("No broadcast available yet.")
|