AlexK-PL committed
Commit b58b814 · 1 Parent(s): 49e3642

Delete distributed.py

Files changed (1)
  1. distributed.py +0 -180
distributed.py DELETED
@@ -1,180 +0,0 @@
- import torch
- import torch.distributed as dist
- from torch.nn.modules import Module
- from torch.autograd import Variable
-
-
- def _flatten_dense_tensors(tensors):
-     """Flatten dense tensors into a contiguous 1D buffer. Assume tensors are of
-     same dense type.
-     Since inputs are dense, the resulting tensor will be a concatenated 1D
-     buffer. Element-wise operation on this buffer will be equivalent to
-     operating individually.
-     Arguments:
-         tensors (Iterable[Tensor]): dense tensors to flatten.
-     Returns:
-         A contiguous 1D buffer containing input tensors.
-     """
-     if len(tensors) == 1:
-         return tensors[0].contiguous().view(-1)
-     flat = torch.cat([t.contiguous().view(-1) for t in tensors], dim=0)
-     return flat
-
-
- def _unflatten_dense_tensors(flat, tensors):
-     """View a flat buffer using the sizes of tensors. Assume that tensors are of
-     same dense type, and that flat is given by _flatten_dense_tensors.
-     Arguments:
-         flat (Tensor): flattened dense tensors to unflatten.
-         tensors (Iterable[Tensor]): dense tensors whose sizes will be used to
-             unflatten flat.
-     Returns:
-         Unflattened dense tensors with sizes same as tensors and values from
-         flat.
-     """
-     outputs = []
-     offset = 0
-     for tensor in tensors:
-         numel = tensor.numel()
-         outputs.append(flat.narrow(0, offset, numel).view_as(tensor))
-         offset += numel
-     return tuple(outputs)
-
-
- '''
- This version of DistributedDataParallel is designed to be used in conjunction with the multiproc.py
- launcher included with this example. It assumes that your run is using multiprocessing with 1
- GPU/process, that the model is on the correct device, and that torch.cuda.set_device has been
- used to set the device.
- Parameters are broadcast to the other processes on initialization of DistributedDataParallel,
- and will be allreduced at the end of the backward pass.
- '''
-
-
- class DistributedDataParallel(Module):
-
-     def __init__(self, module):
-         super(DistributedDataParallel, self).__init__()
-         # fallback for PyTorch 0.3
-         if not hasattr(dist, '_backend'):
-             self.warn_on_half = True
-         else:
-             self.warn_on_half = True if dist._backend == dist.dist_backend.GLOO else False
-
-         self.module = module
-
-         for p in self.module.state_dict().values():
-             if not torch.is_tensor(p):
-                 continue
-             dist.broadcast(p, 0)
-
-         def allreduce_params():
-             if self.needs_reduction:
-                 self.needs_reduction = False
-                 buckets = {}
-                 for param in self.module.parameters():
-                     if param.requires_grad and param.grad is not None:
-                         tp = type(param.data)
-                         if tp not in buckets:
-                             buckets[tp] = []
-                         buckets[tp].append(param)
-                 if self.warn_on_half:
-                     if torch.cuda.HalfTensor in buckets:
-                         print("WARNING: gloo dist backend for half parameters may be extremely slow." +
-                               " It is recommended to use the NCCL backend in this case. This currently requires" +
-                               " PyTorch built from top of tree master.")
-                         self.warn_on_half = False
-
-                 for tp in buckets:
-                     bucket = buckets[tp]
-                     grads = [param.grad.data for param in bucket]
-                     coalesced = _flatten_dense_tensors(grads)
-                     dist.all_reduce(coalesced)
-                     coalesced /= dist.get_world_size()
-                     for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
-                         buf.copy_(synced)
-
-         for param in list(self.module.parameters()):
-             def allreduce_hook(*unused):
-                 param._execution_engine.queue_callback(allreduce_params)
-             if param.requires_grad:
-                 param.register_hook(allreduce_hook)
-
-     def forward(self, *inputs, **kwargs):
-         self.needs_reduction = True
-         return self.module(*inputs, **kwargs)
-
-     '''
-     def _sync_buffers(self):
-         buffers = list(self.module._all_buffers())
-         if len(buffers) > 0:
-             # cross-node buffer sync
-             flat_buffers = _flatten_dense_tensors(buffers)
-             dist.broadcast(flat_buffers, 0)
-             for buf, synced in zip(buffers, _unflatten_dense_tensors(flat_buffers, buffers)):
-                 buf.copy_(synced)
-     def train(self, mode=True):
-         # Clear NCCL communicator and CUDA event cache of the default group ID,
-         # These cache will be recreated at the later call. This is currently a
-         # work-around for a potential NCCL deadlock.
-         if dist._backend == dist.dist_backend.NCCL:
-             dist._clear_group_cache()
-         super(DistributedDataParallel, self).train(mode)
-         self.module.train(mode)
-     '''
-
-
- '''
- Modifies an existing model to do gradient allreduce, but doesn't change its class,
- so you don't need ".module"
- '''
-
-
- def apply_gradient_allreduce(module):
-     if not hasattr(dist, '_backend'):
-         module.warn_on_half = True
-     else:
-         module.warn_on_half = True if dist._backend == dist.dist_backend.GLOO else False
-
-     for p in module.state_dict().values():
-         if not torch.is_tensor(p):
-             continue
-         dist.broadcast(p, 0)
-
-     def allreduce_params():
-         if module.needs_reduction:
-             module.needs_reduction = False
-             buckets = {}
-             for param in module.parameters():
-                 if param.requires_grad and param.grad is not None:
-                     tp = type(param.data)
-                     if tp not in buckets:
-                         buckets[tp] = []
-                     buckets[tp].append(param)
-             if module.warn_on_half:
-                 if torch.cuda.HalfTensor in buckets:
-                     print("WARNING: gloo dist backend for half parameters may be extremely slow." +
-                           " It is recommended to use the NCCL backend in this case. This currently requires" +
-                           " PyTorch built from top of tree master.")
-                     module.warn_on_half = False
-
-             for tp in buckets:
-                 bucket = buckets[tp]
-                 grads = [param.grad.data for param in bucket]
-                 coalesced = _flatten_dense_tensors(grads)
-                 dist.all_reduce(coalesced)
-                 coalesced /= dist.get_world_size()
-                 for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
-                     buf.copy_(synced)
-
-     for param in list(module.parameters()):
-         def allreduce_hook(*unused):
-             Variable._execution_engine.queue_callback(allreduce_params)
-         if param.requires_grad:
-             param.register_hook(allreduce_hook)
-
-     def set_needs_reduction(self, input, output):
-         self.needs_reduction = True
-
-     module.register_forward_hook(set_needs_reduction)
-     return module
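
For reference, the two helpers at the top of the deleted file flatten a list of dense tensors into one contiguous 1D buffer so that a single collective call (plus one element-wise scale) can replace many per-tensor operations, and then view the buffer back with the original shapes. A minimal round-trip sketch of that idea, not part of this commit; the tensor shapes and the in-place halving are illustrative placeholders standing in for dist.all_reduce and the world-size average:

import torch

# Two stand-in "gradients" with different shapes.
grads = [torch.ones(2, 3), torch.full((4,), 2.0)]

# Flatten into one contiguous 1D buffer (what _flatten_dense_tensors returns).
flat = torch.cat([t.contiguous().view(-1) for t in grads], dim=0)  # shape (10,)

# One element-wise op on the buffer stands in for all_reduce + averaging.
flat /= 2

# View the buffer back with the original shapes (what _unflatten_dense_tensors does)
# and copy the results into the original tensors in place.
offset = 0
for g in grads:
    n = g.numel()
    g.copy_(flat.narrow(0, offset, n).view_as(g))
    offset += n

print(grads[0])  # 2x3 tensor of 0.5
print(grads[1])  # length-4 tensor of 1.0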
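Per the docstrings in the deleted file, the wrapper (or apply_gradient_allreduce) targeted one-process-per-GPU runs launched by multiproc.py: parameters are broadcast from rank 0 when the model is wrapped, and gradients are all-reduced and averaged at the end of each backward pass. A hedged usage sketch of the pre-deletion module, not part of this commit (assumptions: an env:// style launch that provides LOCAL_RANK, the NCCL backend, a placeholder nn.Linear model, and that distributed.py is still importable, i.e. the state before this commit):

import os

import torch
import torch.distributed as dist

from distributed import apply_gradient_allreduce  # the module removed by this commit


def main():
    # One process per GPU; RANK/WORLD_SIZE/LOCAL_RANK are expected in the environment.
    local_rank = int(os.environ["LOCAL_RANK"])
    dist.init_process_group(backend="nccl", init_method="env://")
    torch.cuda.set_device(local_rank)

    model = torch.nn.Linear(80, 80).cuda()   # placeholder model, already on its device
    model = apply_gradient_allreduce(model)  # broadcasts params from rank 0, registers grad hooks
    # Alternatively: model = DistributedDataParallel(model), which wraps instead of patching.

    opt = torch.optim.SGD(model.parameters(), lr=1e-3)
    x = torch.randn(16, 80, device="cuda")
    loss = model(x).pow(2).mean()
    opt.zero_grad()
    loss.backward()  # gradients are all-reduced and averaged across processes here
    opt.step()


if __name__ == "__main__":
    main()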