File size: 7,022 Bytes
cc0dd3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# Copyright (c) OpenMMLab. All rights reserved.
from functools import wraps
from queue import Queue
from typing import Any, Dict, List, Optional

from mmengine import is_seq_of

__all__ = ['BufferManager']


def check_buffer_registered(exist=True):
    """A function wrapper to check the buffer existence before it is being used
    by the wrapped function.

    Args:
        exist (bool): If set to ``True``, assert the buffer exists; if set to
            ``False``, assert the buffer does not exist. Default: ``True``
    """

    def wrapper(func):

        @wraps(func)
        def wrapped(manager, name, *args, **kwargs):
            if exist:
                # Assert buffer exist
                if name not in manager:
                    raise ValueError(f'Fail to call {func.__name__}: '
                                     f'buffer "{name}" is not registered.')
            else:
                # Assert buffer not exist
                if name in manager:
                    raise ValueError(f'Fail to call {func.__name__}: '
                                     f'buffer "{name}" is already registered.')
            return func(manager, name, *args, **kwargs)

        return wrapped

    return wrapper


class Buffer(Queue):

    def put_force(self, item: Any):
        """Force to put an item into the buffer.

        If the buffer is already full, the earliest item in the buffer will be
        remove to make room for the incoming item.

        Args:
            item (any): The item to put into the buffer
        """
        with self.mutex:
            if self.maxsize > 0:
                while self._qsize() >= self.maxsize:
                    _ = self._get()
                    self.unfinished_tasks -= 1

            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()


class BufferManager():
    """A helper class to manage multiple buffers.

    Parameters:
        buffer_type (type): The class to build buffer instances. Default:
            :class:`mmpose.apis.webcam.utils.buffer.Buffer`.
        buffers (dict, optional): Create :class:`BufferManager` from existing
            buffers. Each item should a buffer name and the buffer. If not
            given, an empty buffer manager will be create. Default: ``None``
    """

    def __init__(self,
                 buffer_type: type = Buffer,
                 buffers: Optional[Dict] = None):
        self.buffer_type = buffer_type
        if buffers is None:
            self._buffers = {}
        else:
            if is_seq_of(list(buffers.values()), buffer_type):
                self._buffers = buffers.copy()
            else:
                raise ValueError('The values of buffers should be instance '
                                 f'of {buffer_type}')

    def __contains__(self, name):
        return name in self._buffers

    @check_buffer_registered(False)
    def register_buffer(self, name, maxsize: int = 0):
        """Register a buffer.

        If the buffer already exists, an ValueError will be raised.

        Args:
            name (any): The buffer name
            maxsize (int): The capacity of the buffer. If set to 0, the
                capacity is unlimited. Default: 0
        """
        self._buffers[name] = self.buffer_type(maxsize)

    @check_buffer_registered()
    def put(self, name, item, block: bool = True, timeout: float = None):
        """Put an item into specified buffer.

        Args:
            name (any): The buffer name
            item (any): The item to put into the buffer
            block (bool): If set to ``True``, block if necessary util a free
                slot is available in the target buffer. It blocks at most
                ``timeout`` seconds and raises the ``Full`` exception.
                Otherwise, put an item on the queue if a free slot is
                immediately available, else raise the ``Full`` exception.
                Default: ``True``
            timeout (float, optional): The most waiting time in seconds if
                ``block`` is ``True``. Default: ``None``
        """
        self._buffers[name].put(item, block, timeout)

    @check_buffer_registered()
    def put_force(self, name, item):
        """Force to put an item into specified buffer. If the buffer was full,
        the earliest item within the buffer will be popped out to make a free
        slot.

        Args:
            name (any): The buffer name
            item (any): The item to put into the buffer
        """
        self._buffers[name].put_force(item)

    @check_buffer_registered()
    def get(self, name, block: bool = True, timeout: float = None) -> Any:
        """Remove an return an item from the specified buffer.

        Args:
            name (any): The buffer name
            block (bool): If set to ``True``, block if necessary until an item
                is available in the target buffer. It blocks at most
                ``timeout`` seconds and raises the ``Empty`` exception.
                Otherwise, return an item if one is immediately available,
                else raise the ``Empty`` exception. Default: ``True``
            timeout (float, optional): The most waiting time in seconds if
                ``block`` is ``True``. Default: ``None``

        Returns:
            any: The returned item.
        """
        return self._buffers[name].get(block, timeout)

    @check_buffer_registered()
    def is_empty(self, name) -> bool:
        """Check if a buffer is empty.

        Args:
            name (any): The buffer name

        Returns:
            bool: Weather the buffer is empty.
        """
        return self._buffers[name].empty()

    @check_buffer_registered()
    def is_full(self, name):
        """Check if a buffer is full.

        Args:
            name (any): The buffer name

        Returns:
            bool: Weather the buffer is full.
        """
        return self._buffers[name].full()

    def get_sub_manager(self, buffer_names: List[str]) -> 'BufferManager':
        """Return a :class:`BufferManager` instance that covers a subset of the
        buffers in the parent. The is usually used to partially share the
        buffers of the executor to the node.

        Args:
            buffer_names (list): The list of buffers to create the sub manager

        Returns:
            BufferManager: The created sub buffer manager.
        """
        buffers = {name: self._buffers[name] for name in buffer_names}
        return BufferManager(self.buffer_type, buffers)

    def get_info(self):
        """Returns the information of all buffers in the manager.

        Returns:
            dict[any, dict]: Each item is a buffer name and the information
            dict of that buffer.
        """
        buffer_info = {}
        for name, buffer in self._buffers.items():
            buffer_info[name] = {
                'size': buffer.qsize(),
                'maxsize': buffer.maxsize
            }
        return buffer_info