Spaces:
Build error
Build error
# Copyright (c) Meta Platforms, Inc. and affiliates. | |
# All rights reserved. | |
# | |
# This source code is licensed under the license found in the | |
# LICENSE file in the root directory of this source tree. | |
import json | |
import os.path as osp | |
import time | |
from typing import Any, Callable, Dict, List, Optional, Union | |
import torch | |
try: | |
import deepspeed | |
except ImportError: | |
deepspeed = None | |
import torch.nn as nn | |
import mmengine | |
from mmengine.dist import init_dist | |
from mmengine.optim import BaseOptimWrapper, _ParamScheduler | |
from mmengine.registry import (MODEL_WRAPPERS, OPTIM_WRAPPERS, OPTIMIZERS, | |
STRATEGIES) | |
from mmengine.utils import get_git_hash | |
from .base import BaseStrategy | |
def register_deepspeed_optimizers() -> List[str]:
    """Register optimizers in ``deepspeed`` to the ``OPTIMIZERS`` registry.

    Returns:
        List[str]: A list of registered optimizers' name.
    """
    registered_names: List[str] = []
    try:
        import deepspeed  # noqa: F401
    except ImportError:
        # DeepSpeed is an optional dependency; register nothing without it.
        return registered_names

    from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
    from deepspeed.ops.lamb import FusedLamb
    from deepspeed.runtime.fp16.onebit import (OnebitAdam, OnebitLamb,
                                               ZeroOneAdam)

    # Register each optimizer and record its class name.
    for optimizer_cls in (DeepSpeedCPUAdam, FusedAdam, FusedLamb, OnebitAdam,
                          OnebitLamb, ZeroOneAdam):
        OPTIMIZERS.register_module(module=optimizer_cls)
        registered_names.append(optimizer_cls.__name__)
    return registered_names
@OPTIM_WRAPPERS.register_module()
class DeepSpeedOptimWrapper(BaseOptimWrapper):
    """Optimizer wrapper that delegates ``backward`` and ``step`` to a
    DeepSpeed engine.

    DeepSpeed requires gradients to be computed and parameters to be updated
    through the engine (``engine.backward``/``engine.step``) rather than the
    raw optimizer, so this wrapper routes those calls to :attr:`model`.

    Args:
        optimizer: The optimizer to be wrapped; it is later handed to
            ``deepspeed.initialize``.
    """

    def __init__(self, optimizer):
        super().__init__(optimizer)
        # Set by the strategy after ``deepspeed.initialize`` creates the
        # engine; all backward/step calls are routed through it.
        self._model = None

    @property
    def model(self):
        """The DeepSpeed engine this wrapper delegates to.

        Raises:
            ValueError: If accessed before the engine has been assigned.
        """
        if self._model is None:
            raise ValueError('model attribute should be set before accessing.')
        return self._model

    @model.setter
    def model(self, value):
        self._model = value

    def update_params(self, loss) -> None:  # type: ignore
        """Update parameters in :attr:`optimizer`."""
        self.backward(loss)
        self.step()

    def backward(self, loss: torch.Tensor, **kwargs) -> None:
        """Perform gradient back propagation via the DeepSpeed engine."""
        self.model.backward(loss)

    def zero_grad(self, **kwargs) -> None:
        """Not supported: DeepSpeed manages gradient zeroing internally."""
        raise NotImplementedError(
            'DeepSpeedOptimWrapper does not support zero_grad method '
            'currently.')

    def step(self, **kwargs):
        """Update parameters via the DeepSpeed engine."""
        self.model.step()

    def state_dict(self) -> dict:
        """Return a state dict holding only the base parameter settings.

        The optimizer state itself is saved/restored by DeepSpeed's own
        checkpoint mechanism, so it is intentionally excluded here.
        """
        state_dict = {}
        if self.base_param_settings is not None:
            state_dict['base_param_settings'] = self.base_param_settings

        return state_dict

    def load_state_dict(self, state_dict: dict) -> None:
        """Restore the base parameter settings saved by :meth:`state_dict`."""
        base_param_settings = state_dict.pop('base_param_settings', None)

        if base_param_settings is not None:
            self.base_param_settings = base_param_settings
class MMDeepSpeedEngineWrapper:
    """Wrap a ``deepspeed.DeepSpeedEngine`` with MMEngine's model interface.

    Exposes ``train_step``/``val_step``/``test_step`` so a runner can drive
    the DeepSpeed engine like an ordinary MMEngine model wrapper. Any other
    attribute access falls through to the wrapped engine.

    Args:
        model (deepspeed.DeepSpeedEngine): The initialized DeepSpeed engine.
        inputs_to_half (list[int or str], optional): Which inputs are to be
            converted to half precision before the forward pass. Positional
            inputs are selected by index, keyword inputs by name.
            Defaults to None (no casting).
    """

    def __init__(
        self,
        *,
        model: 'deepspeed.DeepSpeedEngine',
        inputs_to_half: Optional[List[Union[int, str]]] = None,
    ):
        self.model = model
        self._inputs_to_half = inputs_to_half

    def __getattr__(self, name):
        # Only invoked for attributes not found on the wrapper itself, so
        # everything else is delegated to the DeepSpeed engine.
        return getattr(self.model, name)

    def train_step(
        self,
        data: Union[dict, tuple, list],
        optim_wrapper: 'DeepSpeedOptimWrapper',
    ) -> Dict[str, torch.Tensor]:
        """Run one training iteration: preprocess, forward, backward, step.

        Args:
            data (dict or tuple or list): Data sampled from dataset.
            optim_wrapper (DeepSpeedOptimWrapper): Wrapper routing
                ``backward``/``step`` through the DeepSpeed engine.

        Returns:
            Dict[str, torch.Tensor]: The parsed log variables.
        """
        data = self.model.module.data_preprocessor(data, training=True)
        data = self._cast_inputs_half(data)
        losses = self._run_forward(data, mode='loss')
        parsed_loss, log_vars = self.model.module.parse_losses(losses)
        optim_wrapper.update_params(parsed_loss)

        return log_vars

    def val_step(self, data: Union[dict, tuple, list]) -> list:
        """Gets the prediction of module during validation process.

        Args:
            data (dict or tuple or list): Data sampled from dataset.

        Returns:
            list: The predictions of given data.
        """
        data = self.model.module.data_preprocessor(data, False)
        data = self._cast_inputs_half(data)
        return self._run_forward(data, mode='predict')

    def test_step(self, data: Union[dict, tuple, list]) -> list:
        """Gets the predictions of module during testing process.

        Args:
            data (dict or tuple or list): Data sampled from dataset.

        Returns:
            list: The predictions of given data.
        """
        data = self.model.module.data_preprocessor(data, False)
        data = self._cast_inputs_half(data)
        return self._run_forward(data, mode='predict')

    def _run_forward(self, data: Union[dict, tuple, list], mode: str) -> Any:
        """Unpacks data for :meth:`forward`

        Args:
            data (dict or tuple or list): Data sampled from dataset.
            mode (str): Mode of forward.

        Returns:
            dict or list: Results of training or testing mode.
        """
        if isinstance(data, dict):
            results = self.model(**data, mode=mode)
        elif isinstance(data, (list, tuple)):
            results = self.model(*data, mode=mode)
        else:
            raise TypeError('Output of `data_preprocessor` should be '
                            f'list, tuple or dict, but got {type(data)}')
        return results

    def _cast_inputs_half(self, inputs: Union[list, tuple, dict, None]):
        """Cast the selected inputs to half precision if needed.

        Args:
            inputs (list or tuple or dict or None): Inputs to be casted.

        Returns:
            list or tuple or dict or None: Casted inputs.
        """
        # Nothing to cast: either casting is disabled or there is no input.
        # (The signature allows ``None`` inputs, so they must pass through
        # instead of hitting the TypeError below.)
        if self._inputs_to_half is None or inputs is None:
            return inputs

        if isinstance(inputs, (list, tuple)):
            new_inputs = []
            for i, v in enumerate(inputs):
                if i in self._inputs_to_half:
                    new_inputs.append(v.half())
                else:
                    new_inputs.append(v)
            # Preserve the container type (list stays list, tuple stays
            # tuple).
            return inputs.__class__(new_inputs)
        elif isinstance(inputs, dict):
            for k, v in inputs.items():
                if k in self._inputs_to_half:
                    inputs[k] = v.half()
            return inputs
        else:
            raise TypeError('inputs should be list, tuple or dict, '
                            f'but got {type(inputs)}')
@STRATEGIES.register_module()
class DeepSpeedStrategy(BaseStrategy):
    """Support training models with DeepSpeed.

    Note:
        The detailed usage of parameters can be found at
        https://www.deepspeed.ai/docs/config-json/.

    Args:
        config (str or dict, optional): If it is a string, it is a path to load
            config for deepspeed. Defaults to None.
        zero_optimization (dict, optional): Enabling and configuring ZeRO
            memory optimizations. Defaults to None.
        gradient_clipping (float, optional): Enable gradient clipping with
            value. Defaults to None.
        fp16 (dict, optional): Configuration for using mixed precision/FP16
            training that leverages NVIDIA's Apex package. Defaults to None.
        inputs_to_half (list[int or str], optional): Which inputs are to
            converted to half precision. Defaults to None.
            If ``fp16`` is enabled, it also should be set.
        bf16 (dict, optional): Configuration for using bfloat16 floating-point
            format as an alternative to FP16. Defaults to None.
        amp (dict, optional): Configuration for using automatic mixed
            precision (AMP) training that leverages NVIDIA's Apex AMP package.
            Defaults to None.
        activation_checkpointing (dict, optional): Reduce memory usage by
            clearing activations of certain layers and recomputing them
            during a backward pass.
            Defaults to None.
        aio (dict, optional): Configuring the asynchronous I/O module for
            offloading parameter and optimizer states to persistent (NVMe)
            storage. This module uses Linux native asynchronous I/O (libaio).
            Defaults to None.
        train_micro_batch_size_per_gpu (int, optional): Batch size to be
            processed by one GPU in one step (without gradient accumulation).
            Defaults to None.
        gradient_accumulation_steps (int, optional): Number of training steps
            to accumulate gradients before averaging and applying them.
            Defaults to None.
    """

    def __init__(
        self,
        *,
        # the following args are for deepspeed
        config: Union[str, dict, None] = None,
        zero_optimization: Optional[dict] = None,
        gradient_clipping: Optional[float] = None,
        fp16: Optional[dict] = None,
        inputs_to_half: Optional[List[Union[int, str]]] = None,
        bf16: Optional[dict] = None,
        amp: Optional[dict] = None,
        activation_checkpointing: Optional[dict] = None,
        aio: Optional[dict] = None,
        train_micro_batch_size_per_gpu: Optional[int] = None,
        gradient_accumulation_steps: Optional[int] = None,
        # disable the log printed by deepspeed
        steps_per_print: int = 10000000000000,
        # the following args are for BaseStrategy
        **kwargs,
    ):
        assert deepspeed is not None, \
            'DeepSpeed is not installed. Please check ' \
            'https://github.com/microsoft/DeepSpeed#installation.'

        super().__init__(**kwargs)

        self.config = self._parse_config(config)
        # Explicit keyword arguments take precedence over the loaded config.
        if zero_optimization is not None:
            self.config['zero_optimization'] = zero_optimization
        if gradient_clipping is not None:
            self.config['gradient_clipping'] = gradient_clipping
        if fp16 is not None:
            self.config['fp16'] = fp16
        if bf16 is not None:
            self.config['bf16'] = bf16
        if amp is not None:
            self.config['amp'] = amp
        if activation_checkpointing is not None:
            self.config['activation_checkpointing'] = activation_checkpointing
        if aio is not None:
            self.config['aio'] = aio
        if train_micro_batch_size_per_gpu is not None:
            self.config['train_micro_batch_size_per_gpu'] = \
                train_micro_batch_size_per_gpu
        if gradient_accumulation_steps is not None:
            self.config['gradient_accumulation_steps'] = \
                gradient_accumulation_steps
        else:
            self.config.setdefault('gradient_accumulation_steps', 1)
        self.config['steps_per_print'] = steps_per_print
        self._inputs_to_half = inputs_to_half

        register_deepspeed_optimizers()

    def _parse_config(self, config):
        """Load the deepspeed config from a JSON file path, or default to an
        empty dict when no config is given."""
        if config is None:
            config = dict()
        elif isinstance(config, str):
            with open(config) as f:
                config = json.load(f)
        return config

    def _setup_distributed(  # type: ignore
        self,
        launcher: Optional[str] = None,
        backend: str = 'nccl',
        **kwargs,
    ):
        """Setup distributed environment.

        Args:
            launcher (str, optional): Way to launch multi processes.
                DeepSpeedStrategy does not support the launcher argument.
            backend (str): Communication Backends. Supported backends are
                'nccl', 'gloo' and 'mpi'. Defaults to 'nccl'.
            **kwargs: Other arguments for :func:`deepspeed.init_distributed`.
        """
        init_dist(launcher, backend, init_backend='deepspeed', **kwargs)

    def prepare(
        self,
        model: Union[nn.Module, dict],
        *,
        optim_wrapper: Union[BaseOptimWrapper, dict, None] = None,
        param_scheduler: Union[_ParamScheduler, Dict, List, None] = None,
        compile: Union[dict, bool] = False,
        dispatch_kwargs: Optional[dict] = None,
    ):
        """Prepare model and some components.

        Args:
            model (:obj:`torch.nn.Module` or dict): The model to be run. It
                can be a dict used for build a model.

        Keyword Args:
            optim_wrapper (BaseOptimWrapper or dict, optional): Computing the
                gradient of model parameters and updating them.
                Defaults to None.
                See :meth:`build_optim_wrapper` for examples.
            param_scheduler (_ParamScheduler or dict or list, optional):
                Parameter scheduler for updating optimizer parameters. If
                specified, :attr:`optim_wrapper` should also be specified.
                Defaults to None.
                See :meth:`build_param_scheduler` for examples.
            compile (dict, optional): Config to compile model.
                Defaults to False. Requires PyTorch>=2.0.
            dispatch_kwargs (dict, optional): Kwargs to be passed to other
                methods of Strategy. Defaults to None.
        """
        if self._prepared:
            return self._prepared_components()
        assert dispatch_kwargs is not None
        self.dispatch_kwargs.update(dispatch_kwargs)

        model = self.build_model(model)
        model = self._init_model_weights(model)

        if optim_wrapper is not None:
            self.optim_wrapper = self.build_optim_wrapper(optim_wrapper, model)

            self.model = self._wrap_model(model)

            # The DeepSpeed engine must be attached after wrapping, since the
            # optim wrapper delegates backward/step calls to it.
            self.optim_wrapper.model = self.model  # type: ignore

        else:
            self.model = self._wrap_model(model)

        if param_scheduler is not None:
            self.param_schedulers = self.build_param_scheduler(
                param_scheduler, self.optim_wrapper)

        self._prepared = True
        return self._prepared_components()

    def _wrap_model(self, model: nn.Module) -> nn.Module:
        """Initialize the DeepSpeed engine for ``model`` and wrap it with
        :class:`MMDeepSpeedEngineWrapper`.

        When an optim wrapper exists, ``deepspeed.initialize`` may replace
        the optimizer (e.g. for ZeRO), so the returned optimizer is stored
        back into the wrapper.
        """
        if hasattr(self, 'optim_wrapper'):
            engine, self.optim_wrapper.optimizer, *_ = deepspeed.initialize(
                model=model,
                optimizer=self.optim_wrapper.optimizer,
                config=self.config)
        else:
            engine, *_ = deepspeed.initialize(model=model, config=self.config)

        wrapper = MMDeepSpeedEngineWrapper(
            model=engine, inputs_to_half=self._inputs_to_half)
        return wrapper

    def load_checkpoint(
        self,
        filename: str,
        *,
        map_location: Union[str, Callable] = 'cpu',
        strict: bool = False,
        revise_keys: list = [(r'^module.', '')],
        callback: Optional[Callable] = None,
    ) -> dict:
        """Load checkpoint from given ``filename``.

        Warning:
            `map_localtion` and `callback` parameters are not supported yet.

        Args:
            filename (str): Accept local filepath, URL, ``torchvision://xxx``,
                ``open-mmlab://xxx``.
        """
        self.logger.info(f'Load checkpoint from {filename}')

        # DeepSpeed treats a checkpoint as a directory plus a tag, so split
        # the user-facing path accordingly.
        dirname, basename = osp.split(filename)
        _, extra_ckpt = self.model.load_checkpoint(
            dirname, tag=basename, load_optimizer_states=False)

        return extra_ckpt

    def resume(
        self,
        filename: str,
        *,
        resume_optimizer: bool = True,
        resume_param_scheduler: bool = True,
        map_location: Union[str, Callable] = 'default',
        callback: Optional[Callable] = None,
    ) -> dict:
        """Resume training from given ``filename``.

        Warning:
            `map_location` and `callback` parameters are not supported yet.

        Args:
            filename (str): Accept local filepath.

        Keyword Args:
            resume_optimizer (bool): Whether to resume optimizer state.
                Defaults to True.
            resume_param_scheduler (bool): Whether to resume param scheduler
                state. Defaults to True.
        """
        self.logger.info(f'Resume checkpoint from {filename}')

        dirname, basename = osp.split(filename)
        _, extra_ckpt = self.model.load_checkpoint(
            dirname, tag=basename, load_optimizer_states=resume_optimizer)

        if resume_optimizer:
            self.load_optim_state_dict(extra_ckpt.pop('optim_wrapper'))

        if resume_param_scheduler and hasattr(self, 'param_schedulers'):
            param_schedulers = extra_ckpt.pop('param_schedulers')
            self.load_scheduler_state_dict(param_schedulers)

        # resume random seed
        resumed_seed = extra_ckpt['meta'].get('seed', None)
        current_seed = self._randomness.get('seed')
        if resumed_seed is not None and resumed_seed != current_seed:
            if current_seed is not None:
                self.logger.warning(f'The value of random seed in the '
                                    f'checkpoint "{resumed_seed}" is '
                                    f'different from the value in '
                                    f'`randomness` config "{current_seed}"')
            self._randomness.update(seed=resumed_seed)
            self._set_randomness(**self._randomness)

        return extra_ckpt

    def save_checkpoint(
        self,
        filename: str,
        *,
        save_optimizer: bool = True,
        save_param_scheduler: bool = True,
        extra_ckpt: Optional[dict] = None,
        callback: Optional[Callable] = None,
    ) -> None:
        """Save checkpoint to given ``filename``.

        Warning:
            `save_optimizer` and `callback` parameters are not supported yet.

        Args:
            filename (str): Filename to save checkpoint.

        Keyword Args:
            save_param_scheduler (bool): Whether to save the param_scheduler
                to the checkpoint. Defaults to True.
            extra_ckpt (dict, optional): Extra checkpoint to save.
                Defaults to None.
        """
        if extra_ckpt is None:
            extra_ckpt = dict()
        if 'meta' not in extra_ckpt:
            extra_ckpt['meta'] = dict()
        extra_ckpt['meta'].update(
            seed=self.seed,
            time=time.strftime('%Y%m%d_%H%M%S', time.localtime()),
            mmengine=mmengine.__version__ + get_git_hash(),
        )

        if save_optimizer and hasattr(self, 'optim_wrapper'):
            # The key can not be 'optimizer', otherwise error will be thrown
            # when loading or resuming checkpoint.
            extra_ckpt['optim_wrapper'] = self.optim_state_dict()

        if save_param_scheduler and hasattr(self, 'param_schedulers'):
            extra_ckpt['param_schedulers'] = self.scheduler_state_dict()

        dirname, basename = osp.split(filename)
        self.model.save_checkpoint(
            dirname, tag=basename, client_state=extra_ckpt, save_latest=False)