Spaces:
Configuration error
Configuration error
| import time | |
| from enum import Enum | |
| from typing import Generator, Iterable, Optional, Tuple, Union | |
| import numpy as np | |
| from inference.core.interfaces.camera.entities import ( | |
| FrameID, | |
| FrameTimestamp, | |
| VideoFrame, | |
| ) | |
| from inference.core.interfaces.camera.video_source import SourceProperties, VideoSource | |
# Lower bound for the FPS value accepted by `RateLimiter` - requested rates below
# this value are clamped to it before the inter-frame interval (1 / fps) is computed.
MINIMAL_FPS = 0.01
class FPSLimiterStrategy(Enum):
    """
    Policy applied by the FPS limiter to a frame that arrives before the
    next emission slot allowed by `max_fps`.
    """

    # Discard the frame that arrived too early.
    DROP = "drop"
    # Sleep for the remaining delay, then emit the frame.
    WAIT = "wait"
def get_video_frames_generator(
    video: Union[VideoSource, str, int],
    max_fps: Optional[Union[float, int]] = None,
    limiter_strategy: Optional[FPSLimiterStrategy] = None,
) -> Generator[VideoFrame, None, None]:
    """
    Util function to create a frames generator from `VideoSource` with possibility to
    limit FPS of consumed frames and dictate what to do if frames are produced too fast.

    The function is a generator, so nothing happens until the first frame is requested;
    at that point the source is initialised (if a reference was given) and `start()`
    is called on it.

    Args:
        video (Union[VideoSource, str, int]): Either instance of VideoSource or video reference accepted
            by VideoSource.init(...)
        max_fps (Optional[Union[float, int]]): value of maximum FPS rate of generated frames - can be used to limit
            generation frequency. When `None`, frames are passed through without any limiting.
        limiter_strategy (Optional[FPSLimiterStrategy]): strategy used to deal with frames decoding exceeding
            limit of `max_fps`. By default - for files, in the interest of processing all frames -
            generation will be awaited, for streams - frames will be dropped on the floor.

    Returns: generator of `VideoFrame`

    Example:
        ```python
        for frame in get_video_frames_generator(
            video="./some.mp4",
            max_fps=50,
        ):
            pass
        ```
    """
    if isinstance(video, (str, int)):
        # A bare str / int reference was given - build the VideoSource out of it.
        video = VideoSource.init(
            video_reference=video,
        )
    video.start()
    if max_fps is None:
        # No limit requested - hand the frames over exactly as the source produces them.
        yield from video
        return
    # Explicit strategy wins; otherwise it is derived from the source properties.
    limiter_strategy = resolve_limiter_strategy(
        explicitly_defined_strategy=limiter_strategy,
        source_properties=video.describe_source().source_properties,
    )
    yield from limit_frame_rate(
        frames_generator=video, max_fps=max_fps, strategy=limiter_strategy
    )
def resolve_limiter_strategy(
    explicitly_defined_strategy: Optional[FPSLimiterStrategy],
    source_properties: Optional[SourceProperties],
) -> FPSLimiterStrategy:
    """
    Pick the FPS limiter strategy to be used for a video source.

    Args:
        explicitly_defined_strategy (Optional[FPSLimiterStrategy]): strategy requested by the caller -
            returned unchanged whenever it is not `None`.
        source_properties (Optional[SourceProperties]): properties of the source, used only when
            no explicit strategy was given.

    Returns: the explicit strategy if provided; otherwise `FPSLimiterStrategy.WAIT` when
        `source_properties.is_file` is truthy and `FPSLimiterStrategy.DROP` in any other case
        (including missing source properties).
    """
    if explicitly_defined_strategy is not None:
        return explicitly_defined_strategy
    source_is_file = source_properties is not None and source_properties.is_file
    return FPSLimiterStrategy.WAIT if source_is_file else FPSLimiterStrategy.DROP
def limit_frame_rate(
    frames_generator: Iterable[Tuple[FrameTimestamp, FrameID, np.ndarray]],
    max_fps: Union[float, int],
    strategy: FPSLimiterStrategy,
) -> Generator[Tuple[FrameTimestamp, FrameID, np.ndarray], None, None]:
    """
    Re-yield items of `frames_generator` no more often than `max_fps` allows.

    Args:
        frames_generator: iterable of frames to be throttled.
        max_fps (Union[float, int]): upper bound of the emission rate, handed to `RateLimiter`.
        strategy (FPSLimiterStrategy): what to do with a frame that arrives too early -
            `WAIT` sleeps for the remaining delay and then emits the frame, any other
            value causes the frame to be skipped.

    Returns: generator yielding the frames that passed the limiter, in the original order.
    """
    limiter = RateLimiter(desired_fps=max_fps)
    for frame in frames_generator:
        remaining = limiter.estimate_next_action_delay()
        on_schedule = remaining <= 0.0
        if not on_schedule:
            if strategy is not FPSLimiterStrategy.WAIT:
                # Frame came too early and we are not allowed to wait - skip it
                # without registering a tick.
                continue
            time.sleep(remaining)
        # The tick is registered right before emission, so the next delay is
        # measured from the moment the frame is handed to the consumer.
        limiter.tick()
        yield frame
class RateLimiter:
    """
    Upper-bound rate limiter: `estimate_next_action_delay()` reports how long the
    client still has to wait so that consecutive `tick()` calls are at least
    1 / desired_fps seconds apart. A client that obeys the reported delay will not
    exceed the assumed rate.
    """

    def __init__(self, desired_fps: Union[float, int]):
        """
        Args:
            desired_fps (Union[float, int]): maximum rate of actions per second;
                values below `MINIMAL_FPS` are replaced with `MINIMAL_FPS`.
        """
        self._desired_fps = max(desired_fps, MINIMAL_FPS)
        # Monotonic timestamp of the latest tick; None until the first tick() call.
        self._last_tick: Optional[float] = None

    def tick(self) -> None:
        """Register that the rate-limited action is happening right now."""
        self._last_tick = time.monotonic()

    def estimate_next_action_delay(self) -> float:
        """
        Returns: number of seconds to wait before the next action is allowed -
            0.0 when no tick was registered yet or the minimal interval has
            already elapsed.
        """
        previous_tick = self._last_tick
        if previous_tick is None:
            return 0.0
        min_interval = 1 / self._desired_fps
        elapsed = time.monotonic() - previous_tick
        remaining = min_interval - elapsed
        return max(remaining, 0.0)