Spaces:
Runtime error
Runtime error
# Copyright 2021 The HuggingFace Team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import os | |
import sys | |
import tempfile | |
import torch | |
from .state import AcceleratorState, PartialState | |
from .utils import ( | |
PrecisionType, | |
PrepareForLaunch, | |
are_libraries_initialized, | |
check_cuda_p2p_ib_support, | |
get_gpu_info, | |
is_mps_available, | |
is_torch_version, | |
patch_environment, | |
) | |
from .utils.constants import ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION | |
def test_launch(): | |
"Verify a `PartialState` can be initialized." | |
_ = PartialState() | |
def notebook_launcher( | |
function, | |
args=(), | |
num_processes=None, | |
mixed_precision="no", | |
use_port="29500", | |
master_addr="127.0.0.1", | |
node_rank=0, | |
num_nodes=1, | |
rdzv_backend="static", | |
rdzv_endpoint="", | |
rdzv_conf=None, | |
rdzv_id="none", | |
max_restarts=0, | |
monitor_interval=0.1, | |
log_line_prefix_template=None, | |
): | |
""" | |
Launches a training function, using several processes or multiple nodes if it's possible in the current environment | |
(TPU with multiple cores for instance). | |
<Tip warning={true}> | |
To use this function absolutely zero calls to a CUDA device must be made in the notebook session before calling. If | |
any have been made, you will need to restart the notebook and make sure no cells use any CUDA capability. | |
Setting `ACCELERATE_DEBUG_MODE="1"` in your environment will run a test before truly launching to ensure that none | |
of those calls have been made. | |
</Tip> | |
Args: | |
function (`Callable`): | |
The training function to execute. If it accepts arguments, the first argument should be the index of the | |
process run. | |
args (`Tuple`): | |
Tuple of arguments to pass to the function (it will receive `*args`). | |
num_processes (`int`, *optional*): | |
The number of processes to use for training. Will default to 8 in Colab/Kaggle if a TPU is available, to | |
the number of GPUs available otherwise. | |
mixed_precision (`str`, *optional*, defaults to `"no"`): | |
If `fp16` or `bf16`, will use mixed precision training on multi-GPU. | |
use_port (`str`, *optional*, defaults to `"29500"`): | |
The port to use to communicate between processes when launching a multi-GPU training. | |
master_addr (`str`, *optional*, defaults to `"127.0.0.1"`): | |
The address to use for communication between processes. | |
node_rank (`int`, *optional*, defaults to 0): | |
The rank of the current node. | |
num_nodes (`int`, *optional*, defaults to 1): | |
The number of nodes to use for training. | |
rdzv_backend (`str`, *optional*, defaults to `"static"`): | |
The rendezvous method to use, such as 'static' (the default) or 'c10d' | |
rdzv_endpoint (`str`, *optional*, defaults to `""`): | |
The endpoint of the rdzv sync. storage. | |
rdzv_conf (`Dict`, *optional*, defaults to `None`): | |
Additional rendezvous configuration. | |
rdzv_id (`str`, *optional*, defaults to `"none"`): | |
The unique run id of the job. | |
max_restarts (`int`, *optional*, defaults to 0): | |
The maximum amount of restarts that elastic agent will conduct on workers before failure. | |
monitor_interval (`float`, *optional*, defaults to 0.1): | |
The interval in seconds that is used by the elastic_agent as a period of monitoring workers. | |
log_line_prefix_template (`str`, *optional*, defaults to `None`): | |
The prefix template for elastic launch logging. Available from PyTorch 2.2.0. | |
Example: | |
```python | |
# Assume this is defined in a Jupyter Notebook on an instance with two GPUs | |
from accelerate import notebook_launcher | |
def train(*args): | |
# Your training function here | |
... | |
notebook_launcher(train, args=(arg1, arg2), num_processes=2, mixed_precision="fp16") | |
``` | |
""" | |
# Are we in a google colab or a Kaggle Kernel? | |
in_colab = False | |
in_kaggle = False | |
if any(key.startswith("KAGGLE") for key in os.environ.keys()): | |
in_kaggle = True | |
elif "IPython" in sys.modules: | |
in_colab = "google.colab" in str(sys.modules["IPython"].get_ipython()) | |
try: | |
mixed_precision = PrecisionType(mixed_precision.lower()) | |
except ValueError: | |
raise ValueError( | |
f"Unknown mixed_precision mode: {args.mixed_precision.lower()}. Choose between {PrecisionType.list()}." | |
) | |
if (in_colab or in_kaggle) and (os.environ.get("TPU_NAME", None) is not None): | |
# TPU launch | |
import torch_xla.distributed.xla_multiprocessing as xmp | |
if len(AcceleratorState._shared_state) > 0: | |
raise ValueError( | |
"To train on TPU in Colab or Kaggle Kernel, the `Accelerator` should only be initialized inside " | |
"your training function. Restart your notebook and make sure no cells initializes an " | |
"`Accelerator`." | |
) | |
if num_processes is None: | |
num_processes = 8 | |
launcher = PrepareForLaunch(function, distributed_type="TPU") | |
print(f"Launching a training on {num_processes} TPU cores.") | |
xmp.spawn(launcher, args=args, nprocs=num_processes, start_method="fork") | |
elif in_colab and get_gpu_info()[1] < 2: | |
# No need for a distributed launch otherwise as it's either CPU or one GPU. | |
if torch.cuda.is_available(): | |
print("Launching training on one GPU.") | |
else: | |
print("Launching training on one CPU.") | |
function(*args) | |
else: | |
if num_processes is None: | |
raise ValueError( | |
"You have to specify the number of GPUs you would like to use, add `num_processes=...` to your call." | |
) | |
if node_rank >= num_nodes: | |
raise ValueError("The node_rank must be less than the number of nodes.") | |
if num_processes > 1: | |
# Multi-GPU launch | |
from torch.distributed.launcher.api import LaunchConfig, elastic_launch | |
from torch.multiprocessing import start_processes | |
from torch.multiprocessing.spawn import ProcessRaisedException | |
if len(AcceleratorState._shared_state) > 0: | |
raise ValueError( | |
"To launch a multi-GPU training from your notebook, the `Accelerator` should only be initialized " | |
"inside your training function. Restart your notebook and make sure no cells initializes an " | |
"`Accelerator`." | |
) | |
# Check for specific libraries known to initialize CUDA that users constantly use | |
problematic_imports = are_libraries_initialized("bitsandbytes") | |
if len(problematic_imports) > 0: | |
err = ( | |
"Could not start distributed process. Libraries known to initialize CUDA upon import have been " | |
"imported already. Please keep these imports inside your training function to try and help with this:" | |
) | |
for lib_name in problematic_imports: | |
err += f"\n\t* `{lib_name}`" | |
raise RuntimeError(err) | |
patched_env = dict( | |
nproc=num_processes, | |
node_rank=node_rank, | |
world_size=num_nodes * num_processes, | |
master_addr=master_addr, | |
master_port=use_port, | |
mixed_precision=mixed_precision, | |
) | |
# Check for CUDA P2P and IB issues | |
if not check_cuda_p2p_ib_support(): | |
patched_env["nccl_p2p_disable"] = "1" | |
patched_env["nccl_ib_disable"] = "1" | |
# torch.distributed will expect a few environment variable to be here. We set the ones common to each | |
# process here (the other ones will be set be the launcher). | |
with patch_environment(**patched_env): | |
# First dummy launch | |
if os.environ.get("ACCELERATE_DEBUG_MODE", "false").lower() == "true": | |
launcher = PrepareForLaunch(test_launch, distributed_type="MULTI_GPU") | |
try: | |
start_processes(launcher, args=(), nprocs=num_processes, start_method="fork") | |
except ProcessRaisedException as e: | |
err = "An issue was found when verifying a stable environment for the notebook launcher." | |
if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]: | |
raise RuntimeError( | |
f"{err}" | |
"This likely stems from an outside import causing issues once the `notebook_launcher()` is called. " | |
"Please review your imports and test them when running the `notebook_launcher()` to identify " | |
"which one is problematic and causing CUDA to be initialized." | |
) from e | |
else: | |
raise RuntimeError(f"{err} The following error was raised: {e}") from e | |
# Now the actual launch | |
launcher = PrepareForLaunch(function, distributed_type="MULTI_GPU") | |
print(f"Launching training on {num_processes} GPUs.") | |
try: | |
if rdzv_conf is None: | |
rdzv_conf = {} | |
if rdzv_backend == "static": | |
rdzv_conf["rank"] = node_rank | |
if not rdzv_endpoint: | |
rdzv_endpoint = f"{master_addr}:{use_port}" | |
launch_config_kwargs = dict( | |
min_nodes=num_nodes, | |
max_nodes=num_nodes, | |
nproc_per_node=num_processes, | |
run_id=rdzv_id, | |
rdzv_endpoint=rdzv_endpoint, | |
rdzv_backend=rdzv_backend, | |
rdzv_configs=rdzv_conf, | |
max_restarts=max_restarts, | |
monitor_interval=monitor_interval, | |
start_method="fork", | |
) | |
if is_torch_version(">=", ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION): | |
launch_config_kwargs["log_line_prefix_template"] = log_line_prefix_template | |
elastic_launch(config=LaunchConfig(**launch_config_kwargs), entrypoint=function)(*args) | |
except ProcessRaisedException as e: | |
if "Cannot re-initialize CUDA in forked subprocess" in e.args[0]: | |
raise RuntimeError( | |
"CUDA has been initialized before the `notebook_launcher` could create a forked subprocess. " | |
"This likely stems from an outside import causing issues once the `notebook_launcher()` is called. " | |
"Please review your imports and test them when running the `notebook_launcher()` to identify " | |
"which one is problematic and causing CUDA to be initialized." | |
) from e | |
else: | |
raise RuntimeError(f"An issue was found when launching the training: {e}") from e | |
else: | |
# No need for a distributed launch otherwise as it's either CPU, GPU or MPS. | |
if is_mps_available(): | |
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1" | |
print("Launching training on MPS.") | |
elif torch.cuda.is_available(): | |
print("Launching training on one GPU.") | |
else: | |
print("Launching training on CPU.") | |
function(*args) | |
def debug_launcher(function, args=(), num_processes=2): | |
""" | |
Launches a training function using several processes on CPU for debugging purposes. | |
<Tip warning={true}> | |
This function is provided for internal testing and debugging, but it's not intended for real trainings. It will | |
only use the CPU. | |
</Tip> | |
Args: | |
function (`Callable`): | |
The training function to execute. | |
args (`Tuple`): | |
Tuple of arguments to pass to the function (it will receive `*args`). | |
num_processes (`int`, *optional*, defaults to 2): | |
The number of processes to use for training. | |
""" | |
from torch.multiprocessing import start_processes | |
with tempfile.NamedTemporaryFile() as tmp_file: | |
# torch.distributed will expect a few environment variable to be here. We set the ones common to each | |
# process here (the other ones will be set be the launcher). | |
with patch_environment( | |
world_size=num_processes, | |
master_addr="127.0.0.1", | |
master_port="29500", | |
accelerate_mixed_precision="no", | |
accelerate_debug_rdv_file=tmp_file.name, | |
accelerate_use_cpu="yes", | |
): | |
launcher = PrepareForLaunch(function, debug=True) | |
start_processes(launcher, args=args, nprocs=num_processes, start_method="fork") | |