Spaces:
Sleeping
Sleeping
File size: 8,561 Bytes
c61ccee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
import ctypes
import torch
from torch._streambase import _EventBase, _StreamBase
from .._utils import _dummy_type
if not hasattr(torch._C, "_CudaStreamBase"):
# Define dummy base classes
torch._C.__dict__["_CudaStreamBase"] = _dummy_type("_CudaStreamBase")
torch._C.__dict__["_CudaEventBase"] = _dummy_type("_CudaEventBase")
class Stream(torch._C._CudaStreamBase, _StreamBase):
r"""Wrapper around a CUDA stream.
A CUDA stream is a linear sequence of execution that belongs to a specific
device, independent from other streams. See :ref:`cuda-semantics` for
details.
Args:
device(torch.device or int, optional): a device on which to allocate
the stream. If :attr:`device` is ``None`` (default) or a negative
integer, this will use the current device.
priority(int, optional): priority of the stream, should be 0 or
negative, where negative numbers indicate higher priority. By default,
streams have priority 0.
"""
def __new__(cls, device=None, priority=0, **kwargs):
# setting device manager is expensive, so we avoid it unless necessary
if device is None or ("stream_id" in kwargs and "device_index" in kwargs):
return super().__new__(cls, priority=priority, **kwargs)
else:
with torch.cuda.device(device):
return super().__new__(cls, priority=priority, **kwargs)
def wait_event(self, event):
r"""Make all future work submitted to the stream wait for an event.
Args:
event (torch.cuda.Event): an event to wait for.
.. note:: This is a wrapper around ``cudaStreamWaitEvent()``: see
`CUDA Stream documentation`_ for more info.
This function returns without waiting for :attr:`event`: only future
operations are affected.
.. _CUDA Stream documentation:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html
"""
event.wait(self)
def wait_stream(self, stream):
r"""Synchronize with another stream.
All future work submitted to this stream will wait until all kernels
submitted to a given stream at the time of call complete.
Args:
stream (Stream): a stream to synchronize.
.. note:: This function returns without waiting for currently enqueued
kernels in :attr:`stream`: only future operations are affected.
"""
self.wait_event(stream.record_event())
def record_event(self, event=None):
r"""Record an event.
Args:
event (torch.cuda.Event, optional): event to record. If not given, a new one
will be allocated.
Returns:
Recorded event.
"""
if event is None:
event = Event()
event.record(self)
return event
def query(self):
r"""Check if all the work submitted has been completed.
Returns:
A boolean indicating if all kernels in this stream are completed.
"""
return super().query()
def synchronize(self):
r"""Wait for all the kernels in this stream to complete.
.. note:: This is a wrapper around ``cudaStreamSynchronize()``: see
`CUDA Stream documentation`_ for more info.
"""
super().synchronize()
@property
def _as_parameter_(self):
return ctypes.c_void_p(self.cuda_stream)
def __eq__(self, o):
if isinstance(o, Stream):
return super().__eq__(o)
return False
def __hash__(self):
return hash((self.cuda_stream, self.device))
def __repr__(self):
return f"<torch.cuda.Stream device={self.device} cuda_stream={self.cuda_stream:#x}>"
class ExternalStream(Stream):
    r"""Python handle for a CUDA stream that was allocated externally.

    Use this class to wrap streams allocated in other libraries, which
    facilitates data exchange and multi-library interactions.

    .. note:: The stream life-cycle is not managed by this class; it is the
       user's responsibility to keep the referenced stream alive for as long
       as this class is being used.

    Args:
        stream_ptr(int): integer representation of the externally allocated
            `cudaStream_t` value.
        device(torch.device or int, optional): the device where the stream
            was originally allocated. If device is specified incorrectly,
            subsequent launches using this stream may fail.
    """

    def __new__(cls, stream_ptr, device=None, **kwargs):
        # The device context is entered unconditionally here, including when
        # ``device`` is None.
        device_scope = torch.cuda.device(device)
        with device_scope:
            return super().__new__(cls, stream_ptr=stream_ptr, **kwargs)
class Event(torch._C._CudaEventBase, _EventBase):
r"""Wrapper around a CUDA event.
CUDA events are synchronization markers that can be used to monitor the
device's progress, to accurately measure timing, and to synchronize CUDA
streams.
The underlying CUDA events are lazily initialized when the event is first
recorded or exported to another process. After creation, only streams on the
same device may record the event. However, streams on any device can wait on
the event.
Args:
enable_timing (bool, optional): indicates if the event should measure time
(default: ``False``)
blocking (bool, optional): if ``True``, :meth:`wait` will be blocking (default: ``False``)
interprocess (bool): if ``True``, the event can be shared between processes
(default: ``False``)
.. _CUDA Event Documentation:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EVENT.html
"""
def __new__(cls, enable_timing=False, blocking=False, interprocess=False):
return super().__new__(
cls,
enable_timing=enable_timing,
blocking=blocking,
interprocess=interprocess,
)
@classmethod
def from_ipc_handle(cls, device, handle):
r"""Reconstruct an event from an IPC handle on the given device."""
return super().from_ipc_handle(device, handle)
def record(self, stream=None):
r"""Record the event in a given stream.
Uses ``torch.cuda.current_stream()`` if no stream is specified. The
stream's device must match the event's device.
"""
if stream is None:
stream = torch.cuda.current_stream()
super().record(stream)
def wait(self, stream=None):
r"""Make all future work submitted to the given stream wait for this event.
Use ``torch.cuda.current_stream()`` if no stream is specified.
.. note:: This is a wrapper around ``cudaStreamWaitEvent()``: see
`CUDA Event documentation`_ for more info.
"""
if stream is None:
stream = torch.cuda.current_stream()
super().wait(stream)
def query(self):
r"""Check if all work currently captured by event has completed.
Returns:
A boolean indicating if all work currently captured by event has
completed.
"""
return super().query()
def elapsed_time(self, end_event):
r"""Return the time elapsed.
Time reported in milliseconds after the event was recorded and
before the end_event was recorded.
"""
return super().elapsed_time(end_event)
def synchronize(self):
r"""Wait for the event to complete.
Waits until the completion of all work currently captured in this event.
This prevents the CPU thread from proceeding until the event completes.
.. note:: This is a wrapper around ``cudaEventSynchronize()``: see
`CUDA Event documentation`_ for more info.
"""
super().synchronize()
def ipc_handle(self):
r"""Return an IPC handle of this event.
If not recorded yet, the event will use the current device.
"""
return super().ipc_handle()
@property
def _as_parameter_(self):
return ctypes.c_void_p(self.cuda_event)
def __repr__(self):
if self.cuda_event:
return f"<torch.cuda.Event {self._as_parameter_.value:#x}>"
else:
return "<torch.cuda.Event uninitialized>"
|