# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
from copy import deepcopy

from ..optimizer import AcceleratedOptimizer
from ..scheduler import AcceleratedScheduler


class HfDeepSpeedConfig:
    """
    This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage.

    A `weakref` of this object is stored in the module's globals to be able to access the config from areas where
    things like the Trainer object are not available (e.g. `from_pretrained` and `_get_resized_embeddings`).
    Therefore it's important that this object remains alive while the program is still running.

    [`Trainer`] uses the `HfTrainerDeepSpeedConfig` subclass instead. That subclass has logic to sync the
    configuration with the values of [`TrainingArguments`] by replacing the special placeholder value `"auto"`.
    Without that special logic the DeepSpeed configuration is not modified in any way.

    Args:
        config_file_or_dict (`Union[str, Dict]`): path to DeepSpeed config file or dict.
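
    Example (a minimal sketch; the config values below are illustrative rather than a complete DeepSpeed setup):

    ```python
    ds_config = {
        "zero_optimization": {
            "stage": 3,
            "offload_param": {"device": "cpu"},
        }
    }
    hf_ds_config = HfDeepSpeedConfig(ds_config)
    hf_ds_config.is_zero3()  # True
    hf_ds_config.is_offload()  # True, because params are offloaded to "cpu"
    hf_ds_config.get_value("zero_optimization.stage")  # 3
    ```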
""" | |
def __init__(self, config_file_or_dict): | |
if isinstance(config_file_or_dict, dict): | |
# Don't modify user's data should they want to reuse it (e.g. in tests), because once we | |
# modified it, it will not be accepted here again, since `auto` values would have been overridden | |
config = deepcopy(config_file_or_dict) | |
elif os.path.exists(config_file_or_dict): | |
with open(config_file_or_dict, encoding="utf-8") as f: | |
config = json.load(f) | |
else: | |
try: | |
config_decoded = base64.urlsafe_b64decode(config_file_or_dict).decode("utf-8") | |
config = json.loads(config_decoded) | |
except (UnicodeDecodeError, AttributeError, ValueError): | |
raise ValueError( | |
f"Expected a string path to an existing deepspeed config, or a dictionary, or a base64 encoded string. Received: {config_file_or_dict}" | |
) | |
self.config = config | |
self.set_stage_and_offload() | |

    def set_stage_and_offload(self):
        # zero stage - this is done as early as possible, before the model is created, to allow the
        # ``is_deepspeed_zero3_enabled`` query and access to the early deepspeed config object during
        # ``zero.Init()``, which needs to know the dtype and some other hparams.
        self._stage = self.get_value("zero_optimization.stage", -1)

        # offload
        self._offload = False
        if self.is_zero2() or self.is_zero3():
            offload_devices_valid = {"cpu", "nvme"}
            offload_devices = {
                self.get_value("zero_optimization.offload_optimizer.device"),
                self.get_value("zero_optimization.offload_param.device"),
            }
            if len(offload_devices & offload_devices_valid) > 0:
                self._offload = True

    def find_config_node(self, ds_key_long):
        config = self.config

        # find the config node of interest if it exists
        nodes = ds_key_long.split(".")
        ds_key = nodes.pop()
        for node in nodes:
            config = config.get(node)
            if config is None:
                return None, ds_key

        return config, ds_key

    def get_value(self, ds_key_long, default=None):
        """
        Returns the set value or `default` if no value is set.
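
        Example (illustrative; assumes `hf_ds_config` is an `HfDeepSpeedConfig` built from a config containing
        `{"zero_optimization": {"stage": 3}}`):

        ```python
        hf_ds_config.get_value("zero_optimization.stage", -1)  # 3
        hf_ds_config.get_value("optimizer.params.lr")  # None - the key is absent and no default was given
        ```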
""" | |
config, ds_key = self.find_config_node(ds_key_long) | |
if config is None: | |
return default | |
return config.get(ds_key, default) | |
def del_config_sub_tree(self, ds_key_long, must_exist=False): | |
""" | |
Deletes a sub-section of the config file if it's found. | |
Unless `must_exist` is `True` the section doesn't have to exist. | |
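
        Example (a small sketch with a hypothetical `hf_ds_config` instance):

        ```python
        # removes the whole `offload_param` block from `zero_optimization`, if it is present
        hf_ds_config.del_config_sub_tree("zero_optimization.offload_param")
        # raises ValueError if there is no `optimizer` section in the config
        hf_ds_config.del_config_sub_tree("optimizer", must_exist=True)
        ```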
""" | |
config = self.config | |
# find the config node of interest if it exists | |
nodes = ds_key_long.split(".") | |
for node in nodes: | |
parent_config = config | |
config = config.get(node) | |
if config is None: | |
if must_exist: | |
raise ValueError(f"Can't find {ds_key_long} entry in the config: {self.config}") | |
else: | |
return | |
# if found remove it | |
if parent_config is not None: | |
parent_config.pop(node) | |
def is_true(self, ds_key_long): | |
""" | |
Returns `True`/``False` only if the value is set, always `False` otherwise. So use this method to ask the very | |
specific question of whether the value is set to `True` (and it's not set to `False`` or isn't set). | |
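
        Example (illustrative values):

        ```python
        cfg = HfDeepSpeedConfig({"fp16": {"enabled": True}})
        cfg.is_true("fp16.enabled")  # True - the key is set to a truthy value
        cfg.is_true("bf16.enabled")  # False - the key is not set at all
        cfg.is_false("bf16.enabled")  # also False, because the key is not set
        ```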
""" | |
value = self.get_value(ds_key_long) | |
return False if value is None else bool(value) | |
def is_false(self, ds_key_long): | |
""" | |
Returns `True`/``False` only if the value is set, always `False` otherwise. So use this method to ask the very | |
specific question of whether the value is set to `False` (and it's not set to `True`` or isn't set). | |
""" | |
value = self.get_value(ds_key_long) | |
return False if value is None else not bool(value) | |

    def is_zero2(self):
        return self._stage == 2

    def is_zero3(self):
        return self._stage == 3

    def is_offload(self):
        return self._offload


class DeepSpeedEngineWrapper:
    """
    Internal wrapper for deepspeed.runtime.engine.DeepSpeedEngine. This is used to follow the conventional training
    loop.

    Args:
        engine (deepspeed.runtime.engine.DeepSpeedEngine): deepspeed engine to wrap
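
    Example (a minimal sketch; `engine` is assumed to come from `deepspeed.initialize` and `dataloader` from the
    user's data pipeline, with a model whose forward pass returns an output object carrying a `loss`):

    ```python
    wrapper = DeepSpeedEngineWrapper(engine)
    for batch in dataloader:
        loss = engine(**batch).loss
        # a single call runs backward and, via `engine.step`, gradient accumulation,
        # clipping, the optimizer step, zero_grad and the lr scheduler step (if any)
        wrapper.backward(loss)
    ```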
""" | |
def __init__(self, engine): | |
self.engine = engine | |
def backward(self, loss, **kwargs): | |
# runs backpropagation and handles mixed precision | |
self.engine.backward(loss, **kwargs) | |
# Deepspeed's `engine.step` performs the following operations: | |
# - gradient accumulation check | |
# - gradient clipping | |
# - optimizer step | |
# - zero grad | |
# - checking overflow | |
# - lr_scheduler step (only if engine.lr_scheduler is not None) | |
self.engine.step() | |
# and this plugin overrides the above calls with no-ops when Accelerate runs under | |
# Deepspeed, but allows normal functionality for non-Deepspeed cases thus enabling a simple | |
# training loop that works transparently under many training regimes. | |


class DeepSpeedOptimizerWrapper(AcceleratedOptimizer):
    """
    Internal wrapper around a deepspeed optimizer.

    Args:
        optimizer (`torch.optim.optimizer.Optimizer`):
            The optimizer to wrap.
    """

    def __init__(self, optimizer):
        super().__init__(optimizer, device_placement=False, scaler=None)
        self.__has_overflow__ = hasattr(self.optimizer, "overflow")

    def zero_grad(self, set_to_none=None):
        pass  # `accelerator.backward(loss)` is doing that automatically. Therefore, its implementation is not needed

    def step(self):
        pass  # `accelerator.backward(loss)` is doing that automatically. Therefore, its implementation is not needed

    @property
    def step_was_skipped(self):
        """Whether or not the optimizer step was done, or skipped because of gradient overflow."""
        if self.__has_overflow__:
            return self.optimizer.overflow
        return False


class DeepSpeedSchedulerWrapper(AcceleratedScheduler):
    """
    Internal wrapper around a deepspeed scheduler.

    Args:
        scheduler (`torch.optim.lr_scheduler.LambdaLR`):
            The scheduler to wrap.
        optimizers (one or a list of `torch.optim.Optimizer`):
            The optimizer(s) the scheduler is tied to.
    """

    def __init__(self, scheduler, optimizers):
        super().__init__(scheduler, optimizers)

    def step(self):
        pass  # `accelerator.backward(loss)` is doing that automatically. Therefore, its implementation is not needed


class DummyOptim:
    """
    Dummy optimizer that holds model parameters or param groups; it is primarily used to follow the conventional
    training loop when the optimizer config is specified in the DeepSpeed config file.

    Args:
        params (iterable):
            Iterable of parameters to optimize or dicts defining parameter groups.
        lr (float):
            Learning rate.
        weight_decay (float):
            Weight decay.
        **kwargs (additional keyword arguments, *optional*):
            Other arguments.
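
    Example (a sketch, assuming the optimizer settings live in the DeepSpeed config file; `model` is a hypothetical
    `torch.nn.Module`, and the resulting object would typically be handed to `Accelerator.prepare` in place of a real
    optimizer):

    ```python
    optimizer = DummyOptim(params=model.parameters(), lr=3e-4)
    ```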
""" | |
def __init__(self, params, lr=0.001, weight_decay=0, **kwargs): | |
self.params = params | |
self.lr = lr | |
self.weight_decay = weight_decay | |
self.kwargs = kwargs | |
class DummyScheduler: | |
""" | |
Dummy scheduler presents model parameters or param groups, this is primarily used to follow conventional training | |
loop when scheduler config is specified in the deepspeed config file. | |
Args: | |
optimizer (`torch.optim.optimizer.Optimizer`): | |
The optimizer to wrap. | |
total_num_steps (int, *optional*): | |
Total number of steps. | |
warmup_num_steps (int, *optional*): | |
Number of steps for warmup. | |
lr_scheduler_callable (callable, *optional*): | |
A callable function that creates an LR Scheduler. It accepts only one argument `optimizer`. | |
**kwargs (additional keyword arguments, *optional*): | |
Other arguments. | |
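
    Example (a sketch; `optimizer` would usually be a `DummyOptim` or an optimizer prepared alongside the model, with
    the actual schedule coming from the DeepSpeed config):

    ```python
    lr_scheduler = DummyScheduler(optimizer, total_num_steps=1000, warmup_num_steps=100)
    ```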
""" | |
def __init__(self, optimizer, total_num_steps=None, warmup_num_steps=0, lr_scheduler_callable=None, **kwargs): | |
self.optimizer = optimizer | |
self.total_num_steps = total_num_steps | |
self.warmup_num_steps = warmup_num_steps | |
self.lr_scheduler_callable = lr_scheduler_callable | |
self.kwargs = kwargs | |