import os import time from threading import Thread import cv2 from PIL import Image from inference.core.logger import logger class WebcamStream: """Class to handle webcam streaming using a separate thread. Attributes: stream_id (int): The ID of the webcam stream. frame_id (int): A counter for the current frame. vcap (VideoCapture): OpenCV video capture object. width (int): The width of the video frame. height (int): The height of the video frame. fps_input_stream (int): Frames per second of the input stream. grabbed (bool): A flag indicating if a frame was successfully grabbed. frame (array): The current frame as a NumPy array. pil_image (Image): The current frame as a PIL image. stopped (bool): A flag indicating if the stream is stopped. t (Thread): The thread used to update the stream. """ def __init__(self, stream_id=0, enforce_fps=False): """Initialize the webcam stream. Args: stream_id (int, optional): The ID of the webcam stream. Defaults to 0. """ self.stream_id = stream_id self.enforce_fps = enforce_fps self.frame_id = 0 self.vcap = cv2.VideoCapture(self.stream_id) for key in os.environ: if key.startswith("CV2_CAP_PROP"): opencv_prop = key[4:] opencv_constant = getattr(cv2, opencv_prop, None) if opencv_constant is not None: value = int(os.getenv(key)) self.vcap.set(opencv_constant, value) logger.info(f"set {opencv_prop} to {value}") else: logger.warn(f"Property {opencv_prop} not found in cv2") self.width = int(self.vcap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.height = int(self.vcap.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.file_mode = self.vcap.get(cv2.CAP_PROP_FRAME_COUNT) > 0 if self.enforce_fps and not self.file_mode: logger.warn( "Ignoring enforce_fps flag for this stream. It is not compatible with streams and will cause the process to crash" ) self.enforce_fps = False self.max_fps = None if self.vcap.isOpened() is False: logger.debug("[Exiting]: Error accessing webcam stream.") exit(0) self.fps_input_stream = int(self.vcap.get(cv2.CAP_PROP_FPS)) logger.debug( "FPS of webcam hardware/input stream: {}".format(self.fps_input_stream) ) self.grabbed, self.frame = self.vcap.read() self.pil_image = Image.fromarray(cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB)) if self.grabbed is False: logger.debug("[Exiting] No more frames to read") exit(0) self.stopped = True self.t = Thread(target=self.update, args=()) self.t.daemon = True def start(self): """Start the thread for reading frames.""" self.stopped = False self.t.start() def update(self): """Update the frame by reading from the webcam.""" frame_id = 0 next_frame_time = 0 t0 = time.perf_counter() while True: t1 = time.perf_counter() if self.stopped is True: break self.grabbed = self.vcap.grab() if self.grabbed is False: logger.debug("[Exiting] No more frames to read") self.stopped = True break frame_id += 1 # We can't retrieve each frame on nano and other lower powered devices quickly enough to keep up with the stream. # By default, we will only retrieve frames when we'll be ready process them (determined by self.max_fps). if t1 > next_frame_time: ret, frame = self.vcap.retrieve() if frame is None: logger.debug("[Exiting] Frame not available for read") self.stopped = True break logger.debug( f"retrieved frame {frame_id}, effective FPS: {frame_id / (t1 - t0):.2f}" ) self.frame_id = frame_id self.frame = frame while self.file_mode and self.enforce_fps and self.max_fps is None: # sleep until we have processed the first frame and we know what our FPS should be time.sleep(0.01) if self.max_fps is None: self.max_fps = 30 next_frame_time = t1 + (1 / self.max_fps) + 0.02 if self.file_mode: t2 = time.perf_counter() if self.enforce_fps: # when enforce_fps is true, grab video frames 1:1 with inference speed time_to_sleep = next_frame_time - t2 else: # otherwise, grab at native FPS of the video file time_to_sleep = (1 / self.fps_input_stream) - (t2 - t1) if time_to_sleep > 0: time.sleep(time_to_sleep) self.vcap.release() def read_opencv(self): """Read the current frame using OpenCV. Returns: array, int: The current frame as a NumPy array, and the frame ID. """ return self.frame, self.frame_id def stop(self): """Stop the webcam stream.""" self.stopped = True