import ctypes

import torch
from torch._streambase import _EventBase, _StreamBase
from .._utils import _dummy_type


if not hasattr(torch._C, "_CudaStreamBase"):
    # Define dummy base classes
    torch._C.__dict__["_CudaStreamBase"] = _dummy_type("_CudaStreamBase")
    torch._C.__dict__["_CudaEventBase"] = _dummy_type("_CudaEventBase")


class Stream(torch._C._CudaStreamBase, _StreamBase):
    r"""Wrapper around a CUDA stream.

    A CUDA stream is a linear sequence of execution that belongs to a specific
    device, independent from other streams.  See :ref:`cuda-semantics` for
    details.

    Args:
        device (torch.device or int, optional): a device on which to allocate
            the stream. If :attr:`device` is ``None`` (default) or a negative
            integer, this will use the current device.
        priority (int, optional): priority of the stream, which should be 0 or
            negative, where negative numbers indicate higher priority. By default,
            streams have priority 0.

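    A minimal usage sketch, assuming a CUDA-capable device is available (the
    tensor size and variable names are arbitrary). Work issued inside
    ``torch.cuda.stream(s)`` is enqueued on ``s``, and :meth:`synchronize` then
    blocks the host until that work is done::

        import torch

        # A side stream with higher-than-default priority on the current device.
        s = torch.cuda.Stream(priority=-1)
        x = torch.ones(1024, device="cuda")

        # `x` was filled on the current stream; make `s` wait for it before reading.
        s.wait_stream(torch.cuda.current_stream())
        with torch.cuda.stream(s):
            y = x * 2  # this kernel is enqueued on `s`

        # Block the CPU until everything queued on `s` has finished.
        s.synchronize()
        total = y.sum().item()  # 2048.0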
    """

    def __new__(cls, device=None, priority=0, **kwargs):
        # entering the device context manager is expensive, so we avoid it unless necessary
        if device is None or ("stream_id" in kwargs and "device_index" in kwargs):
            return super().__new__(cls, priority=priority, **kwargs)
        else:
            with torch.cuda.device(device):
                return super().__new__(cls, priority=priority, **kwargs)

    def wait_event(self, event):
        r"""Make all future work submitted to the stream wait for an event.

        Args:
            event (torch.cuda.Event): an event to wait for.

        .. note:: This is a wrapper around ``cudaStreamWaitEvent()``: see
           `CUDA Stream documentation`_ for more info.

           This function returns without waiting for :attr:`event`: only future
           operations are affected.

        .. _CUDA Stream documentation:
           https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html

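        A minimal sketch, assuming a CUDA-capable device is available (the
        stream names ``producer`` and ``consumer`` are illustrative). An event
        recorded on one stream lets another stream wait for exactly that point
        in the first stream's queue, without blocking the CPU::

            import torch

            producer = torch.cuda.Stream()
            consumer = torch.cuda.Stream()

            with torch.cuda.stream(producer):
                a = torch.full((256,), 3.0, device="cuda")
            # Mark the point in `producer`'s queue right after `a` is written.
            ready = producer.record_event()

            # Work queued on `consumer` from now on starts only after `ready` completes.
            consumer.wait_event(ready)
            with torch.cuda.stream(consumer):
                b = a + 1

            consumer.synchronize()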
        """
        event.wait(self)

    def wait_stream(self, stream):
        r"""Synchronize with another stream.

        All future work submitted to this stream will wait until all kernels
        submitted to the given stream at the time of the call complete.

        Args:
            stream (Stream): a stream to synchronize.

        .. note:: This function returns without waiting for currently enqueued
           kernels in :attr:`stream`: only future operations are affected.

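        A minimal sketch, assuming a CUDA-capable device is available (variable
        names are illustrative). The current stream waits for everything already
        queued on a side stream before consuming its result; the CPU is not
        blocked by :meth:`wait_stream` itself::

            import torch

            side = torch.cuda.Stream()
            main = torch.cuda.current_stream()

            x = torch.arange(8, device="cuda", dtype=torch.float32)
            side.wait_stream(main)  # `side` must not read `x` before it is written
            with torch.cuda.stream(side):
                y = x.pow(2)

            main.wait_stream(side)  # later work on `main` sees the finished `y`
            # `y` was allocated on `side` but is used on `main`; tell the caching
            # allocator so its memory is not reused while `main` may still read it.
            y.record_stream(main)
            z = y.sum()             # enqueued on `main` after `y` is ready
            result = z.item()       # 140.0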
        """
        self.wait_event(stream.record_event())

    def record_event(self, event=None):
        r"""Record an event.

        Args:
            event (torch.cuda.Event, optional): event to record. If not given, a new one
                will be allocated.

        Returns:
            Recorded event.
        """
        if event is None:
            event = Event()
        event.record(self)
        return event

    def query(self):
        r"""Check if all the work submitted has been completed.

        Returns:
            A boolean indicating if all kernels in this stream are completed.

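        A minimal sketch, assuming a CUDA-capable device is available (the
        matrix size is arbitrary and the loop body is a placeholder). Unlike
        :meth:`synchronize`, :meth:`query` returns immediately, so the CPU can
        poll the stream and keep doing other work until it reports completion::

            import torch

            s = torch.cuda.Stream()
            with torch.cuda.stream(s):
                m = torch.randn(2048, 2048, device="cuda")
                p = m @ m  # enqueued asynchronously on `s`

            polls = 0
            while not s.query():  # non-blocking; True once all work on `s` is done
                polls += 1        # placeholder for useful CPU-side work
            done = s.query()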
        """
        return super().query()

    def synchronize(self):
        r"""Wait for all the kernels in this stream to complete.

        .. note:: This is a wrapper around ``cudaStreamSynchronize()``: see
           `CUDA Stream documentation`_ for more info.
        """
        super().synchronize()

    @property
    def _as_parameter_(self):
        return ctypes.c_void_p(self.cuda_stream)

    def __eq__(self, o):
        if isinstance(o, Stream):
            return super().__eq__(o)
        return False

    def __hash__(self):
        return hash((self.cuda_stream, self.device))

    def __repr__(self):
        return f"<torch.cuda.Stream device={self.device} cuda_stream={self.cuda_stream:#x}>"


class ExternalStream(Stream):
    r"""Wrapper around an externally allocated CUDA stream.

    This class is used to wrap streams allocated in other libraries in order
    to facilitate data exchange and multi-library interactions.

    .. note:: This class doesn't manage the stream life-cycle; it is the user's
       responsibility to keep the referenced stream alive while this class is
       being used.

    Args:
        stream_ptr (int): Integer representation of the ``cudaStream_t`` value
            allocated externally.
        device (torch.device or int, optional): the device where the stream
            was originally allocated. If the device is specified incorrectly,
            subsequent launches using this stream may fail.

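    A minimal sketch, assuming a CUDA-capable device is available. As a
    stand-in for a pointer handed over by another library, the raw
    ``cudaStream_t`` here is taken from an ordinary :class:`Stream` through its
    ``cuda_stream`` attribute; that owning stream (``owner``, an illustrative
    name) must stay alive while the wrapper is used::

        import torch

        owner = torch.cuda.Stream()   # stands in for a foreign library's stream
        ptr = owner.cuda_stream       # integer value of the cudaStream_t
        ext = torch.cuda.ExternalStream(ptr)

        with torch.cuda.stream(ext):  # PyTorch kernels are now launched on `ptr`
            v = torch.zeros(16, device="cuda") + 5

        ext.synchronize()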
    """

    def __new__(cls, stream_ptr, device=None, **kwargs):
        with torch.cuda.device(device):
            return super().__new__(cls, stream_ptr=stream_ptr, **kwargs)


class Event(torch._C._CudaEventBase, _EventBase):
    r"""Wrapper around a CUDA event.

    CUDA events are synchronization markers that can be used to monitor the
    device's progress, to accurately measure timing, and to synchronize CUDA
    streams.

    The underlying CUDA events are lazily initialized when the event is first
    recorded or exported to another process. After creation, only streams on the
    same device may record the event. However, streams on any device can wait on
    the event.

    Args:
        enable_timing (bool, optional): indicates if the event should measure time
            (default: ``False``)
        blocking (bool, optional): if ``True``, the event is created with
            ``cudaEventBlockingSync``, so a CPU thread waiting in :meth:`synchronize`
            blocks until the event completes (default: ``False``)
        interprocess (bool, optional): if ``True``, the event can be shared between
            processes; this requires ``enable_timing=False`` (default: ``False``)

    .. _CUDA Event documentation:
       https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html

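    A minimal sketch, assuming a CUDA-capable device is available. It makes the
    lazy initialization described above visible: until the first :meth:`record`
    no CUDA event exists, so ``cuda_event`` is 0 and the ``repr`` reports the
    event as uninitialized::

        import torch

        e = torch.cuda.Event()
        before = repr(e)  # "<torch.cuda.Event uninitialized>"
        e.record()        # creates the CUDA event and records it on the current stream
        after = repr(e)   # now shows the event handle in hexadecimal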
    """

    def __new__(cls, enable_timing=False, blocking=False, interprocess=False):
        return super().__new__(
            cls,
            enable_timing=enable_timing,
            blocking=blocking,
            interprocess=interprocess,
        )

    @classmethod
    def from_ipc_handle(cls, device, handle):
        r"""Reconstruct an event from an IPC handle on the given device."""
        return super().from_ipc_handle(device, handle)

    def record(self, stream=None):
        r"""Record the event in a given stream.

        Uses ``torch.cuda.current_stream()`` if no stream is specified. The
        stream's device must match the event's device.
        """
        if stream is None:
            stream = torch.cuda.current_stream()
        super().record(stream)

    def wait(self, stream=None):
        r"""Make all future work submitted to the given stream wait for this event.

        Uses ``torch.cuda.current_stream()`` if no stream is specified.

        .. note:: This is a wrapper around ``cudaStreamWaitEvent()``: see
           `CUDA Event documentation`_ for more info.
        """
        if stream is None:
            stream = torch.cuda.current_stream()
        super().wait(stream)

    def query(self):
        r"""Check if all work currently captured by the event has completed.

        Returns:
            A boolean indicating if all work currently captured by the event has
            completed.
        """
        return super().query()

    def elapsed_time(self, end_event):
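        r"""Return the time elapsed between this event and ``end_event``.

        The time is reported in milliseconds, measured from when this event was
        recorded to when ``end_event`` was recorded. Both events must have been
        created with ``enable_timing=True`` and must have completed.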

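        A minimal timing sketch, assuming a CUDA-capable device is available
        (the matrix size is arbitrary). Both events are created with
        ``enable_timing=True`` and recorded on the same stream; ``end`` is
        synchronized first so that both events have completed before the time
        is read::

            import torch

            a = torch.randn(1024, 1024, device="cuda")
            start = torch.cuda.Event(enable_timing=True)
            end = torch.cuda.Event(enable_timing=True)

            start.record()  # both events go on the current stream
            b = a @ a
            end.record()

            end.synchronize()             # block the CPU until `end` has completed
            ms = start.elapsed_time(end)  # float, in milliseconds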
        """
        return super().elapsed_time(end_event)

    def synchronize(self):
        r"""Wait for the event to complete.

        Waits until the completion of all work currently captured in this event.
        This prevents the CPU thread from proceeding until the event completes.

        .. note:: This is a wrapper around ``cudaEventSynchronize()``: see
           `CUDA Event documentation`_ for more info.
        """
        super().synchronize()

    def ipc_handle(self):
        r"""Return an IPC handle of this event.

        If not recorded yet, the event will use the current device.
        """
        return super().ipc_handle()

    @property
    def _as_parameter_(self):
        return ctypes.c_void_p(self.cuda_event)

    def __repr__(self):
        if self.cuda_event:
            return f"<torch.cuda.Event {self._as_parameter_.value:#x}>"
        else:
            return "<torch.cuda.Event uninitialized>"