Spaces:
Runtime error
Runtime error
import time | |
from enum import Enum | |
from typing import Generator, Iterable, Optional, Tuple, Union | |
import numpy as np | |
from inference.core.interfaces.camera.entities import ( | |
FrameID, | |
FrameTimestamp, | |
VideoFrame, | |
) | |
from inference.core.interfaces.camera.video_source import SourceProperties, VideoSource | |
MINIMAL_FPS = 0.01 | |
class FPSLimiterStrategy(Enum): | |
DROP = "drop" | |
WAIT = "wait" | |
def get_video_frames_generator( | |
video: Union[VideoSource, str, int], | |
max_fps: Optional[Union[float, int]] = None, | |
limiter_strategy: Optional[FPSLimiterStrategy] = None, | |
) -> Generator[VideoFrame, None, None]: | |
""" | |
Util function to create a frames generator from `VideoSource` with possibility to | |
limit FPS of consumed frames and dictate what to do if frames are produced to fast. | |
Args: | |
video (Union[VideoSource, str, int]): Either instance of VideoSource or video reference accepted | |
by VideoSource.init(...) | |
max_fps (Optional[Union[float, int]]): value of maximum FPS rate of generated frames - can be used to limit | |
generation frequency | |
limiter_strategy (Optional[FPSLimiterStrategy]): strategy used to deal with frames decoding exceeding | |
limit of `max_fps`. By default - for files, in the interest of processing all frames - | |
generation will be awaited, for streams - frames will be dropped on the floor. | |
Returns: generator of `VideoFrame` | |
Example: | |
```python | |
for frame in get_video_frames_generator( | |
video="./some.mp4", | |
max_fps=50, | |
): | |
pass | |
``` | |
""" | |
if issubclass(type(video), str) or issubclass(type(video), int): | |
video = VideoSource.init( | |
video_reference=video, | |
) | |
video.start() | |
if max_fps is None: | |
yield from video | |
return None | |
limiter_strategy = resolve_limiter_strategy( | |
explicitly_defined_strategy=limiter_strategy, | |
source_properties=video.describe_source().source_properties, | |
) | |
yield from limit_frame_rate( | |
frames_generator=video, max_fps=max_fps, strategy=limiter_strategy | |
) | |
def resolve_limiter_strategy( | |
explicitly_defined_strategy: Optional[FPSLimiterStrategy], | |
source_properties: Optional[SourceProperties], | |
) -> FPSLimiterStrategy: | |
if explicitly_defined_strategy is not None: | |
return explicitly_defined_strategy | |
limiter_strategy = FPSLimiterStrategy.DROP | |
if source_properties is not None and source_properties.is_file: | |
limiter_strategy = FPSLimiterStrategy.WAIT | |
return limiter_strategy | |
def limit_frame_rate( | |
frames_generator: Iterable[Tuple[FrameTimestamp, FrameID, np.ndarray]], | |
max_fps: Union[float, int], | |
strategy: FPSLimiterStrategy, | |
) -> Generator[Tuple[FrameTimestamp, FrameID, np.ndarray], None, None]: | |
rate_limiter = RateLimiter(desired_fps=max_fps) | |
for frame_data in frames_generator: | |
delay = rate_limiter.estimate_next_action_delay() | |
if delay <= 0.0: | |
rate_limiter.tick() | |
yield frame_data | |
continue | |
if strategy is FPSLimiterStrategy.WAIT: | |
time.sleep(delay) | |
rate_limiter.tick() | |
yield frame_data | |
class RateLimiter: | |
""" | |
Implements rate upper-bound rate limiting by ensuring estimate_next_tick_delay() | |
to be at min 1 / desired_fps, not letting the client obeying outcomes to exceed | |
assumed rate. | |
""" | |
def __init__(self, desired_fps: Union[float, int]): | |
self._desired_fps = max(desired_fps, MINIMAL_FPS) | |
self._last_tick: Optional[float] = None | |
def tick(self) -> None: | |
self._last_tick = time.monotonic() | |
def estimate_next_action_delay(self) -> float: | |
if self._last_tick is None: | |
return 0.0 | |
desired_delay = 1 / self._desired_fps | |
time_since_last_tick = time.monotonic() - self._last_tick | |
return max(desired_delay - time_since_last_tick, 0.0) | |