Spaces:
Sleeping
Sleeping
File size: 10,882 Bytes
c61ccee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
import warnings
import torch
from torch.cuda import nccl
from torch._utils import _take_tensors, _flatten_dense_tensors, \
_unflatten_dense_tensors, _reorder_tensors_as, _get_device_index, _handle_complex
from typing import List
def broadcast(tensor, devices=None, *, out=None):
    r"""Broadcasts a tensor to specified GPU devices.

    Args:
        tensor (Tensor): tensor to broadcast. Can be on CPU or GPU. Complex
            tensors are first converted to a real view via ``_handle_complex``.
        devices (Iterable[torch.device, str or int], optional): an iterable of
            GPU devices, among which to broadcast.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
            store output results.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified.

    Raises:
        RuntimeError: if both or neither of :attr:`devices` and :attr:`out`
            are given.

    Returns:
        - If :attr:`devices` is specified,
            a tuple containing copies of :attr:`tensor`, placed on
            :attr:`devices`.
        - If :attr:`out` is specified,
            a tuple containing :attr:`out` tensors, each containing a copy of
            :attr:`tensor`.
    """
    tensor = _handle_complex(tensor)
    has_devices = devices is not None
    has_out = out is not None
    # Equal flags mean "both given" or "neither given"; both are invalid.
    if has_devices == has_out:
        raise RuntimeError(
            f"Exactly one of 'devices' and 'out' must be specified, but got devices={devices} and out={out}")
    if has_out:
        return torch._C._broadcast_out(tensor, out)
    device_ids = [_get_device_index(d) for d in devices]
    return torch._C._broadcast(tensor, device_ids)
def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcast a sequence of tensors to the specified GPUs.

    Small tensors are first coalesced into a buffer to reduce the number of
    synchronizations.

    Args:
        tensors (sequence): tensors to broadcast. Must be on the same device,
            either CPU or GPU. Complex tensors are converted to a real view via
            ``_handle_complex`` before being handed to the backend.
        devices (Iterable[torch.device, str or int]): an iterable of GPU
            devices, among which to broadcast. Each entry is normalized with
            ``_get_device_index``, so CPU devices are rejected.
        buffer_size (int): maximum size of the buffer used for coalescing.

    Returns:
        Copies of :attr:`tensors`, placed on :attr:`devices`, as produced by
        ``torch._C._broadcast_coalesced``.
    """
    # Device indices are resolved before touching the tensors, so an invalid
    # device is reported first.
    device_ids = list(map(_get_device_index, devices))
    real_tensors = list(map(_handle_complex, tensors))
    return torch._C._broadcast_coalesced(real_tensors, device_ids, buffer_size)
def reduce_add(inputs, destination=None):
    """Sum tensors from multiple GPUs.

    All inputs should have matching shapes, dtype, and layout. The output tensor
    will be of the same shape, dtype, and layout.

    Args:
        inputs (Iterable[Tensor]): an indexable collection of tensors to add.
        destination (int, optional): a device on which the output will be
            placed (default: current device).

    Raises:
        AssertionError: if any input lives on the CPU.
        ValueError: if an input's size differs from that of ``inputs[0]``.
        RuntimeError: if no input already lives on :attr:`destination`.

    Returns:
        A tensor containing an elementwise sum of all inputs, placed on the
        :attr:`destination` device. With a single input, that same tensor
        object is returned unchanged.
    """
    destination = _get_device_index(destination, optional=True)
    expected_size = inputs[0].size()
    root = None  # position of an input that already lives on `destination`
    for idx, tensor in enumerate(inputs):
        assert tensor.device.type != "cpu", "reduce_add expects all inputs to be on GPUs"
        if tensor.get_device() == destination:
            root = idx
        actual_size = tensor.size()
        if actual_size != expected_size:
            got = 'x'.join(map(str, actual_size))
            expected = 'x'.join(map(str, expected_size))
            raise ValueError(f"input {idx} has invalid size: got {got}, but expected {expected}")
    if root is None:
        raise RuntimeError("reduce_add expects destination to be on the same GPU with one of the tensors")
    if len(inputs) == 1:
        return inputs[0]

    if not nccl.is_available(inputs):
        # Fallback: copy every non-root input to the root device and sum there.
        target = torch.device(inputs[root].device.type, destination)
        others = [t for idx, t in enumerate(inputs) if idx != root]
        # The first out-of-place addition allocates a fresh result tensor, so
        # no explicit clone of the root input is needed.
        result = inputs[root] + others[0].to(device=target, non_blocking=True)
        for extra in others[1:]:
            result.add_(extra.to(device=target, non_blocking=True))
        return result

    result = torch.empty_like(inputs[root])
    nccl.reduce(inputs, output=result, root=root)
    return result
def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sum tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Args:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device. The i-th tensor of every
            inner iterable is summed with the i-th tensor of the others.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing.

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device, in the same order as
        the tensors of the first input group.
    """
    # TODO: When `len(inputs) == 1` and all inputs are on `destination`, just
    # return `inputs`.
    per_gpu_dense: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_dense_tensors)
    reduced = []
    ordering = []

    # Sparse groups are reduced individually first, since their sizes may
    # differ between GPUs and they therefore cannot be flattened together.
    # Groups that are not entirely sparse are densified and deferred.
    for replicas in zip(*inputs):
        if all(r.is_sparse for r in replicas):
            reduced.append(reduce_add(replicas, destination))  # result stays sparse
            ordering.append(replicas[0])
            continue
        for bucket, r in zip(per_gpu_dense, replicas):
            bucket.append(r.to_dense() if r.is_sparse else r)
        ordering.append(per_gpu_dense[0][-1])

    # Dense tensors have consistent sizes across GPUs: chunk them, flatten
    # each chunk and reduce each flat buffer in one call.
    chunk_iters = [_take_tensors(bucket, buffer_size) for bucket in per_gpu_dense]
    for chunks in zip(*chunk_iters):
        flat_per_gpu = [_flatten_dense_tensors(chunk) for chunk in chunks]  # (num_gpus,)
        flat_sum = reduce_add(flat_per_gpu, destination)
        # The unflattened tensors do not share storage, and we don't expose
        # base flat tensor anyways, so give them different version counters.
        # See NOTE [ Version Counter in comm.*_coalesced ]
        reduced.extend(piece.data for piece in _unflatten_dense_tensors(flat_sum, chunks[0]))
    return tuple(_reorder_tensors_as(reduced, ordering))
def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
    """Scatters tensor across multiple GPUs.

    Args:
        tensor (Tensor): tensor to scatter. Can be on CPU or GPU. Complex
            tensors are first converted to a real view via ``_handle_complex``.
        devices (Iterable[torch.device, str or int], optional): an iterable of
            GPU devices, among which to scatter.
        chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
            each device. It should match :attr:`devices` in length and sum to
            ``tensor.size(dim)``. If not specified, :attr:`tensor` will be divided
            into equal chunks.
        dim (int, optional): A dimension along which to chunk :attr:`tensor`.
            Default: ``0``.
        streams (Iterable[torch.cuda.Stream], optional): an iterable of Streams, among
            which to execute the scatter. If not specified, the default stream will
            be utilized.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
            store output results. Sizes of these tensors must match that of
            :attr:`tensor`, except for :attr:`dim`, where the total size must
            sum to ``tensor.size(dim)``.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified. When
        :attr:`out` is specified, :attr:`chunk_sizes` must not be specified and
        will be inferred from sizes of :attr:`out`.

    Raises:
        RuntimeError: if neither :attr:`devices` nor :attr:`out` is given, or
            if :attr:`out` is given together with :attr:`devices` or
            :attr:`chunk_sizes`.

    Returns:
        - If :attr:`devices` is specified,
            a tuple containing chunks of :attr:`tensor`, placed on
            :attr:`devices`.
        - If :attr:`out` is specified,
            a tuple containing :attr:`out` tensors, each containing a chunk of
            :attr:`tensor`.
    """
    tensor = _handle_complex(tensor)
    if out is None:
        # Without this check, iterating `devices=None` below fails with an
        # opaque "'NoneType' object is not iterable" TypeError.
        if devices is None:
            raise RuntimeError(
                f"Exactly one of 'devices' and 'out' must be specified, but got devices={devices} and out={out}")
        devices = [_get_device_index(d) for d in devices]
        return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    if devices is not None:
        raise RuntimeError(
            f"'devices' must not be specified when 'out' is specified, but got devices={devices}")
    if chunk_sizes is not None:
        raise RuntimeError(
            f"'chunk_sizes' must not be specified when 'out' is specified, but got chunk_sizes={chunk_sizes}")
    return tuple(torch._C._scatter_out(tensor, out, dim, streams))
def gather(tensors, dim=0, destination=None, *, out=None):
    r"""Gathers tensors from multiple GPU devices.

    Args:
        tensors (Iterable[Tensor]): an iterable of tensors to gather.
            Tensor sizes in all dimensions other than :attr:`dim` have to match.
            Complex tensors are first converted to a real view via
            ``_handle_complex``.
        dim (int, optional): a dimension along which the tensors will be
            concatenated. Default: ``0``.
        destination (torch.device, str, or int, optional): the output device.
            Can be CPU or CUDA. Default: the current CUDA device. Passing ``-1``
            to mean CPU is deprecated and emits a :class:`FutureWarning`.
        out (Tensor, optional, keyword-only): the tensor to store gather result.
            Its sizes must match those of :attr:`tensors`, except for :attr:`dim`,
            where the size must equal ``sum(tensor.size(dim) for tensor in tensors)``.
            Can be on CPU or CUDA.

    .. note::
        :attr:`destination` must not be specified when :attr:`out` is specified.

    Raises:
        RuntimeError: if both :attr:`destination` and :attr:`out` are given.

    Returns:
        - If :attr:`destination` is specified,
            a tensor located on :attr:`destination` device, that is a result of
            concatenating :attr:`tensors` along :attr:`dim`.
        - If :attr:`out` is specified,
            the :attr:`out` tensor, now containing results of concatenating
            :attr:`tensors` along :attr:`dim`.
    """
    tensors = [_handle_complex(t) for t in tensors]
    if out is None:
        if destination == -1:
            # FutureWarning is the category meant for deprecations aimed at end
            # users; stacklevel=2 attributes the warning to the caller's line.
            warnings.warn(
                'Using -1 to represent CPU tensor is deprecated. Please use a '
                'device object or string instead, e.g., "cpu".',
                FutureWarning,
                stacklevel=2,
            )
        destination = _get_device_index(destination, allow_cpu=True, optional=True)
        return torch._C._gather(tensors, dim, destination)
    if destination is not None:
        raise RuntimeError(
            f"'destination' must not be specified when 'out' is specified, but got destination={destination}")
    return torch._C._gather_out(tensors, out, dim)
|