import time from enum import Enum from typing import Generator, Iterable, Optional, Tuple, Union import numpy as np from inference.core.interfaces.camera.entities import ( FrameID, FrameTimestamp, VideoFrame, ) from inference.core.interfaces.camera.video_source import SourceProperties, VideoSource MINIMAL_FPS = 0.01 class FPSLimiterStrategy(Enum): DROP = "drop" WAIT = "wait" def get_video_frames_generator( video: Union[VideoSource, str, int], max_fps: Optional[Union[float, int]] = None, limiter_strategy: Optional[FPSLimiterStrategy] = None, ) -> Generator[VideoFrame, None, None]: """ Util function to create a frames generator from `VideoSource` with possibility to limit FPS of consumed frames and dictate what to do if frames are produced to fast. Args: video (Union[VideoSource, str, int]): Either instance of VideoSource or video reference accepted by VideoSource.init(...) max_fps (Optional[Union[float, int]]): value of maximum FPS rate of generated frames - can be used to limit generation frequency limiter_strategy (Optional[FPSLimiterStrategy]): strategy used to deal with frames decoding exceeding limit of `max_fps`. By default - for files, in the interest of processing all frames - generation will be awaited, for streams - frames will be dropped on the floor. Returns: generator of `VideoFrame` Example: ```python for frame in get_video_frames_generator( video="./some.mp4", max_fps=50, ): pass ``` """ if issubclass(type(video), str) or issubclass(type(video), int): video = VideoSource.init( video_reference=video, ) video.start() if max_fps is None: yield from video return None limiter_strategy = resolve_limiter_strategy( explicitly_defined_strategy=limiter_strategy, source_properties=video.describe_source().source_properties, ) yield from limit_frame_rate( frames_generator=video, max_fps=max_fps, strategy=limiter_strategy ) def resolve_limiter_strategy( explicitly_defined_strategy: Optional[FPSLimiterStrategy], source_properties: Optional[SourceProperties], ) -> FPSLimiterStrategy: if explicitly_defined_strategy is not None: return explicitly_defined_strategy limiter_strategy = FPSLimiterStrategy.DROP if source_properties is not None and source_properties.is_file: limiter_strategy = FPSLimiterStrategy.WAIT return limiter_strategy def limit_frame_rate( frames_generator: Iterable[Tuple[FrameTimestamp, FrameID, np.ndarray]], max_fps: Union[float, int], strategy: FPSLimiterStrategy, ) -> Generator[Tuple[FrameTimestamp, FrameID, np.ndarray], None, None]: rate_limiter = RateLimiter(desired_fps=max_fps) for frame_data in frames_generator: delay = rate_limiter.estimate_next_action_delay() if delay <= 0.0: rate_limiter.tick() yield frame_data continue if strategy is FPSLimiterStrategy.WAIT: time.sleep(delay) rate_limiter.tick() yield frame_data class RateLimiter: """ Implements rate upper-bound rate limiting by ensuring estimate_next_tick_delay() to be at min 1 / desired_fps, not letting the client obeying outcomes to exceed assumed rate. """ def __init__(self, desired_fps: Union[float, int]): self._desired_fps = max(desired_fps, MINIMAL_FPS) self._last_tick: Optional[float] = None def tick(self) -> None: self._last_tick = time.monotonic() def estimate_next_action_delay(self) -> float: if self._last_tick is None: return 0.0 desired_delay = 1 / self._desired_fps time_since_last_tick = time.monotonic() - self._last_tick return max(desired_delay - time_since_last_tick, 0.0)