Spaces:
Sleeping
Sleeping
File size: 11,218 Bytes
c61ccee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import sys
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import torch.distributed.elastic.rendezvous.registry as rdzv_registry
from torch.distributed.elastic import events, metrics
from torch.distributed.elastic.agent.server.api import WorkerSpec
from torch.distributed.elastic.agent.server.local_elastic_agent import LocalElasticAgent
from torch.distributed.elastic.multiprocessing import DefaultLogsSpecs, LogsSpecs, SignalException
from torch.distributed.elastic.multiprocessing.errors import ChildFailedError
from torch.distributed.elastic.rendezvous import RendezvousParameters
from torch.distributed.elastic.rendezvous.utils import parse_rendezvous_endpoint
from torch.distributed.elastic.utils.logging import get_logger
__all__ = ['LaunchConfig', 'elastic_launch', 'launch_agent']
logger = get_logger(__name__)
@dataclass
class LaunchConfig:
"""
Creates a rendezvous config.
Args:
min_nodes: Minimum amount of nodes that the user function will
be launched on. Elastic agent ensures that the user
function start only when the min_nodes amount enters
the rendezvous.
max_nodes: Maximum amount of nodes that the user function
will be launched on.
nproc_per_node: On each node the elastic agent will launch
this amount of workers that will execute user
defined function.
rdzv_backend: rdzv_backend to use in the rendezvous (zeus-adapter, etcd).
rdzv_endpoint: The endpoint of the rdzv sync. storage.
rdzv_configs: Key, value pair that specifies rendezvous specific configuration.
rdzv_timeout: Legacy argument that specifies timeout for the rendezvous. It is going
to be removed in future versions, see the note below. The default timeout is 900 seconds.
run_id: The unique run id of the job (if not passed a unique one will be
deduced from run environment - flow workflow id in flow - or auto generated).
role: User defined role of the worker (defaults to "trainer").
max_restarts: The maximum amount of restarts that elastic agent will conduct
on workers before failure.
monitor_interval: The interval in seconds that is used by the elastic_agent
as a period of monitoring workers.
start_method: The method is used by the elastic agent to start the
workers (spawn, fork, forkserver).
metrics_cfg: configuration to initialize metrics.
local_addr: address of the local node if any. If not set, a lookup on the local
machine's FQDN will be performed.
local_ranks_filter: ranks for which to show logs in console. If not set, show from all.
..note:
`rdzv_timeout` is a legacy argument that will be removed in future.
Set the timeout via `rdzv_configs['timeout']`
"""
min_nodes: int
max_nodes: int
nproc_per_node: int
logs_specs: Optional[LogsSpecs] = None
run_id: str = ""
role: str = "default_role"
rdzv_endpoint: str = ""
rdzv_backend: str = "etcd"
rdzv_configs: Dict[str, Any] = field(default_factory=dict)
rdzv_timeout: int = -1
max_restarts: int = 3
monitor_interval: float = 30
start_method: str = "spawn"
log_line_prefix_template: Optional[str] = None
metrics_cfg: Dict[str, str] = field(default_factory=dict)
local_addr: Optional[str] = None
def __post_init__(self):
default_timeout = 900
if self.rdzv_timeout != -1:
self.rdzv_configs["timeout"] = self.rdzv_timeout
elif "timeout" not in self.rdzv_configs:
self.rdzv_configs["timeout"] = default_timeout
# Post-processing to enable refactoring to introduce logs_specs due to non-torchrun API usage
if self.logs_specs is None:
self.logs_specs = DefaultLogsSpecs()
class elastic_launch:
"""
Launches an torchelastic agent on the container that invoked the entrypoint.
1. Pass the ``entrypoint`` arguments as non ``kwargs`` (e.g. no named parameters)/
``entrypoint`` can be a function or a command.
2. The return value is a map of each worker's output mapped
by their respective global rank.
Usage
::
def worker_fn(foo):
# ...
def main():
# entrypoint is a function.
outputs = elastic_launch(LaunchConfig, worker_fn)(foo)
# return rank 0's output
return outputs[0]
# entrypoint is a command and ``script.py`` is the python module.
outputs = elastic_launch(LaunchConfig, "script.py")(args)
outputs = elastic_launch(LaunchConfig, "python")("script.py")
"""
def __init__(
self,
config: LaunchConfig,
entrypoint: Union[Callable, str, None],
):
self._config = config
self._entrypoint = entrypoint
def __call__(self, *args):
return launch_agent(self._config, self._entrypoint, list(args))
def _get_entrypoint_name(
entrypoint: Union[Callable, str, None], args: List[Any]
) -> str:
"""Retrieve entrypoint name with the rule:
1. If entrypoint is a function, use ``entrypoint.__qualname__``.
2. If entrypoint is a string, check its value:
2.1 if entrypoint equals to ``sys.executable`` (like "python"), use the first element from ``args``
which does not start with hifen letter (for example, "-u" will be skipped).
2.2 otherwise, use ``entrypoint`` value.
3. Otherwise, return empty string.
"""
if isinstance(entrypoint, Callable): # type: ignore[arg-type]
return entrypoint.__name__ # type: ignore[union-attr]
elif isinstance(entrypoint, str):
if entrypoint == sys.executable:
return next((arg for arg in args if arg[0] != "-"), "")
else:
return entrypoint
else:
return ""
def _get_addr_and_port(
rdzv_parameters: RendezvousParameters,
) -> Tuple[Optional[str], Optional[int]]:
if rdzv_parameters.backend != "static":
return (None, None)
endpoint = rdzv_parameters.endpoint
endpoint = endpoint.strip()
if not endpoint:
raise ValueError(
"Endpoint is missing in endpoint. Try to add --master-addr and --master-port"
)
master_addr, master_port = parse_rendezvous_endpoint(endpoint, default_port=-1)
if master_port == -1:
raise ValueError(
f"port is missing in endpoint: {endpoint}. Try to specify --master-port"
)
return (master_addr, master_port)
def launch_agent(
config: LaunchConfig,
entrypoint: Union[Callable, str, None],
args: List[Any],
) -> Dict[int, Any]:
if not config.run_id:
run_id = str(uuid.uuid4().int)
logger.warning("config has no run_id, generated a random run_id: %s", run_id)
config.run_id = run_id
entrypoint_name = _get_entrypoint_name(entrypoint, args)
logger.info(
"Starting elastic_operator with launch configs:\n"
" entrypoint : %(entrypoint)s\n"
" min_nodes : %(min_nodes)s\n"
" max_nodes : %(max_nodes)s\n"
" nproc_per_node : %(nproc_per_node)s\n"
" run_id : %(run_id)s\n"
" rdzv_backend : %(rdzv_backend)s\n"
" rdzv_endpoint : %(rdzv_endpoint)s\n"
" rdzv_configs : %(rdzv_configs)s\n"
" max_restarts : %(max_restarts)s\n"
" monitor_interval : %(monitor_interval)s\n"
" log_dir : %(log_dir)s\n"
" metrics_cfg : %(metrics_cfg)s\n",
{
"entrypoint": entrypoint_name,
"min_nodes": config.min_nodes,
"max_nodes": config.max_nodes,
"nproc_per_node": config.nproc_per_node,
"run_id": config.run_id,
"rdzv_backend": config.rdzv_backend,
"rdzv_endpoint": config.rdzv_endpoint,
"rdzv_configs": config.rdzv_configs,
"max_restarts": config.max_restarts,
"monitor_interval": config.monitor_interval,
"log_dir": config.logs_specs.root_log_dir, # type: ignore[union-attr]
"metrics_cfg": config.metrics_cfg
}
)
rdzv_parameters = RendezvousParameters(
backend=config.rdzv_backend,
endpoint=config.rdzv_endpoint,
run_id=config.run_id,
min_nodes=config.min_nodes,
max_nodes=config.max_nodes,
local_addr=config.local_addr,
**config.rdzv_configs,
)
master_addr, master_port = _get_addr_and_port(rdzv_parameters)
spec = WorkerSpec(
role=config.role,
local_world_size=config.nproc_per_node,
entrypoint=entrypoint,
args=tuple(args),
rdzv_handler=rdzv_registry.get_rendezvous_handler(rdzv_parameters),
max_restarts=config.max_restarts,
monitor_interval=config.monitor_interval,
master_addr=master_addr,
master_port=master_port,
local_addr=config.local_addr,
)
agent = LocalElasticAgent(
spec=spec,
logs_specs=config.logs_specs, # type: ignore[arg-type]
start_method=config.start_method,
log_line_prefix_template=config.log_line_prefix_template,
)
shutdown_rdzv = True
try:
metrics.initialize_metrics(metrics.MetricsConfig(config.metrics_cfg))
result = agent.run()
# records that agent.run() has succeeded NOT that workers have succeeded
events.record(agent.get_event_succeeded())
if result.is_failed():
# ChildFailedError is treated specially by @record
# if the error files for the failed children exist
# @record will copy the first error (root cause)
# to the error file of the launcher process.
raise ChildFailedError(
name=entrypoint_name,
failures=result.failures,
)
return result.return_values
except ChildFailedError:
raise
except SignalException:
# when the agent dies with a signal do NOT shutdown the rdzv_handler
# since this closes the rendezvous on this rdzv_id permanently and
# prevents any additional scaling events
shutdown_rdzv = False
events.record(agent.get_event_failed())
raise
except Exception:
events.record(agent.get_event_failed())
raise
finally:
if shutdown_rdzv:
spec.rdzv_handler.shutdown()
|