Spaces:
Build error
Build error
File size: 14,820 Bytes
28c256d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
import os
import os.path as osp
import sys
from typing import Callable, Optional, Union
import torch
from mmengine.dist import master_only
from mmengine.hooks import Hook
from mmengine.logging import print_log
from mmengine.registry import HOOKS
def check_kineto() -> bool:  # noqa
    """Check whether the Kineto profiler backend is usable.

    Returns:
        bool: True when ``torch.autograd.kineto_available()`` exists and
        reports availability, False otherwise.
    """
    try:
        return bool(torch.autograd.kineto_available())
    except AttributeError:
        # Very old PyTorch builds do not expose ``kineto_available`` at all.
        print_log('NO KINETO', logger='current', level=logging.WARNING)
        return False
@HOOKS.register_module()
class ProfilerHook(Hook):
    """A hook to analyze performance during training and inference.

    PyTorch Profiler is a tool that allows the collection of the performance
    metrics during the training. More details on Profiler can be found at
    `official docs <https://pytorch.org/docs/stable/profiler.html
    #torch.profiler.profile>`_

    Args:
        by_epoch (bool): Profile performance by epoch or by iteration.
            Defaults to True.
        profile_times (int): The period (epoch/iter) recorded by the profiler.
            Defaults to 1. For example, profile_iters=10 and by_epoch=False,
            indicate that 0-10 iterations are recorded.
        activity_with_cpu (bool): Activities to be used in the analysis (CPU)
        activity_with_cuda (bool): Activities to be used in the analysis (CUDA)
        schedule (dict, optional): Key-word arguments passed to
            `torch.profile.schedule <https://pytorch.org/docs/stable/
            profiler.html#torch.profiler.schedule>`_.
            Defaults to None, which means profiling without a schedule
        on_trace_ready (callable, dict, optional): Either a handler or a dict
            of generating handler. Defaults to None, which means profiling
            without an on_trace_ready.The Callable type needs to construct its
            own function that can handle 'torch.autograd.profiler.profile'.
            Two officially recommended ways are provided:

            - ``schedule=dict(type='log_trace')``: Print the profiling result
              in the terminal. See more details in the
              `PyTorch official tutorial`_. The configurable arguments are the
              same as ``prof.key_averages().table``
            - ``scheduler=dict(type='tb_trace')``: Profile the performance
              with tensorboard. See more details in the tutorial
              `profile with tensorboard`_.
        record_shapes (bool): Save information about operator's input shapes.
            Defaults to False.
        profile_memory (bool): Track tensor memory allocation/deallocation.
            Defaults to False.
        with_stack (bool): Record source information (file and line number)
            for the ops. Defaults to False.
        with_flops (bool): Use formula to estimate the FLOPS of specific
            operators (matrix multiplication and 2D convolution).
            Defaults to False.
        json_trace_path (str, optional): Exports the collected trace in Chrome
            JSON format. Chrome use 'chrome://tracing' view json file.
            Defaults to None, which means profiling does not store json files.

    Warnings:
        The profiler will be closed after ``profile_times`` iterations
        automatically. Please make sure the configuration of your scheduler
        will not close the profiler before the iteration reaches the value of
        ``profile_times``

    Examples:
        >>> # tensorboard trace
        >>> trace_config = dict(type='tb_trace')
        >>> profiler_hook_cfg = dict(on_trace_ready=trace_config)

    .. _PyTorch official tutorial: https://pytorch.org/tutorials/recipes/recipes/profiler_recipe.html#using-profiler-to-analyze-execution-time
    .. _profile with tensorboard: https://pytorch.org/tutorials/intermediate/tensorboard_profiler_tutorial.html#pytorch-profiler-with-tensorboard
    """  # noqa: E501

    priority = 'VERY_LOW'

    def __init__(self,
                 *,
                 by_epoch: bool = True,
                 profile_times: int = 1,
                 activity_with_cpu: bool = True,
                 activity_with_cuda: bool = False,
                 schedule: Optional[dict] = None,
                 on_trace_ready: Union[Callable, dict, None] = None,
                 record_shapes: bool = False,
                 profile_memory: bool = False,
                 with_stack: bool = False,
                 with_flops: bool = False,
                 json_trace_path: Optional[str] = None) -> None:
        # ``torch.profiler`` only exists in torch >= 1.8.1; chain the original
        # error so the real import failure is not hidden.
        try:
            from torch import profiler
        except ImportError as e:
            raise ImportError('please upgrade torch above 1.8.1') from e
        if not check_kineto():
            raise ImportError('Due to Kineto support issues, please upgrade '
                              'pytorch above 1.8.1(windows users above 1.9.1)')

        assert isinstance(by_epoch, bool), '``by_epoch`` should be a boolean.'
        self.by_epoch = by_epoch

        if profile_times < 1:
            raise ValueError('profile_iters should be greater than 0, '
                             f'but got {profile_times}')
        if by_epoch and profile_times > 1:
            # Profiling slows training down considerably, so epoch-based
            # profiling is restricted to a single epoch.
            raise ValueError(
                f'Profiler will profile 0-{profile_times} epochs.\n'
                'Since profiler will slow down the training, it is recommended'
                ' to train 1 epoch with ProfilerHook and adjust your setting '
                'according to the profiler summary.\n'
                'During normal training(epoch > 1), '
                'you may disable the ProfilerHook.')
        self.profile_times = profile_times

        assert isinstance(activity_with_cpu, bool), \
            '``activity_with_cpu`` should be a boolean.'
        assert isinstance(activity_with_cuda, bool), \
            '``activity_with_cuda`` should be a boolean.'
        self.activities = []
        if activity_with_cpu:
            self.activities.append(profiler.ProfilerActivity.CPU)
        if activity_with_cuda:
            self.activities.append(profiler.ProfilerActivity.CUDA)

        if schedule is not None:
            assert isinstance(schedule, dict), '``schedule`` should be a dict.'
            self.schedule = profiler.schedule(**schedule)
        else:
            self.schedule = None

        self.on_trace_ready = on_trace_ready
        self.record_shapes = record_shapes
        self.profile_memory = profile_memory
        self.with_stack = with_stack
        self.with_flops = with_flops
        self.json_trace_path = json_trace_path
        # Guards against closing/stepping the profiler after it has been
        # exported once (see ``after_train_epoch``/``after_train_iter``).
        self._closed = False

    def before_run(self, runner):
        """Initialize the profiler.

        Through the runner parameter, the validity of the parameter is further
        determined.
        """
        max_times = runner.max_epochs if self.by_epoch else runner.max_iters
        if max_times < self.profile_times:
            raise ValueError(
                f'``profile_times`` should not be greater than {max_times}')

        on_trace_ready = self._parse_trace_config(runner)

        self.profiler = torch.profiler.profile(  # noqa
            activities=self.activities,
            schedule=self.schedule,
            on_trace_ready=on_trace_ready,
            record_shapes=self.record_shapes,
            profile_memory=self.profile_memory,
            with_stack=self.with_stack,
            with_flops=self.with_flops)

        self.profiler.__enter__()
        runner.logger.info('profiler is profiling...')

    def _parse_trace_config(self, runner):
        """Used to parse the parameter 'on_trace_ready'."""
        if self.on_trace_ready is None:
            _on_trace_ready = None
        elif callable(self.on_trace_ready):
            _on_trace_ready = self.on_trace_ready
        elif isinstance(self.on_trace_ready, dict):
            trace_cfg = self.on_trace_ready.copy()
            trace_type = trace_cfg.pop('type')

            # Build a log printing handle
            if trace_type == 'log_trace':

                def _log_handler(_profile):
                    print(_profile.key_averages().table(**trace_cfg))

                _on_trace_ready = _log_handler

            elif trace_type == 'tb_trace':  # tensorboard_trace handler
                try:
                    import torch_tb_profiler  # noqa: F401
                except ImportError as e:
                    raise ImportError(
                        'please run ``pip install torch-tb-profiler``') from e

                # Relative (or missing) dir_name is resolved against the
                # runner's log directory.
                if 'dir_name' not in trace_cfg:
                    trace_cfg['dir_name'] = osp.join(runner.log_dir,
                                                     'tf_tracing_logs')
                elif not osp.isabs(trace_cfg['dir_name']):
                    trace_cfg['dir_name'] = osp.join(runner.log_dir,
                                                     trace_cfg['dir_name'])
                runner.logger.info('trace_files of ProfilerHook will be '
                                   f'saved to {trace_cfg["dir_name"]}.')

                if self.json_trace_path is not None:
                    # ``Logger.warn`` is a deprecated alias of ``warning``.
                    runner.logger.warning(
                        'When using tensorboard_trace, it is recommended to '
                        'save json files by setting ``worker_name`` instead of'
                        ' setting ``json_trace_path``')
                _on_trace_ready = torch.profiler.tensorboard_trace_handler(
                    **trace_cfg)
            else:
                raise ValueError('trace_type should be "log_trace" or '
                                 f'"tb_trace", but got {trace_type}')
        else:
            raise ValueError(
                '``on_trace_ready`` should be a handler, or dict, or None, '
                f'but got {self.on_trace_ready}')
        return _on_trace_ready

    def after_train_epoch(self, runner):
        """Determine if the content is exported."""
        # `after_train_epoch` will also be called in IterBasedTrainLoop.
        # Here we check `self._closed` to avoid exiting twice.
        if not self._closed:
            self._export_chrome_trace(runner)

    def after_train_iter(self, runner, batch_idx, data_batch=None,
                         outputs=None):
        """profiler will call `step` method if it is not closed."""
        if not self._closed:
            self.profiler.step()
        if runner.iter == self.profile_times - 1 and not self.by_epoch:
            self._export_chrome_trace(runner)

    def _export_chrome_trace(self, runner):
        """Exporting content."""
        self._closed = True
        runner.logger.info('profiler may take a few minutes...')
        self.profiler.__exit__(None, None, None)
        if self.json_trace_path is not None:
            self.profiler.export_chrome_trace(self.json_trace_path)
@HOOKS.register_module()
class NPUProfilerHook(Hook):
    """NPUProfiler to analyze performance during training.

    NPU Profiling is used to count the device execution time of all operators.
    The torch_npu.npu.profile interface is used to complete the profiling data
    collection at each stage of the project, and the data is analyzed by the
    msprof tool and the data can be dumped to further manually analyze the
    key performance bottlenecks. For more details on the torch_npu.npu.profile
    interface, please visit
    https://gitee.com/ascend/pytorch/blob/master/torch_npu/npu/profiler.py#profile

    Args:
        begin (int): Number of start iterations for profiling. Defaults to 0.
        end (int): Number of end iterations for profiling. Defaults to 1.
        result_path (str): The path to save the profiling results file.
            Defaults to 'cann_profiling'.
        exit_after_profiling (bool): Whether to exit the program after
            profiling. Defaults to True.
        use_e2e_profiler (bool): Turn on E2E profiling, E2E profiling combines
            performance data at the Pytorch level and the NPU level to analyze
            the bottlenecks of model performance end-to-end, and cannot show
            detailed content, and only as an auxiliary analysis.
            Defaults to False.
        ge_profiling_to_std_out (bool): Turn on GE profiling, GE uses to
            collect the profiling data of the host side scheduling of the
            Ascend device. Defaults to False.

    Examples:
        >>> cfg = ...
        >>> profiler_config = dict(type='NPUProfilerHook', end=2)
        >>> cfg.merge_from_dict({'custom_hooks': [profiler_config]})
        >>> runner = Runner.from_cfg(cfg)
        >>> runner.train()
    """

    priority = 'VERY_LOW'

    def __init__(self,
                 *,
                 begin: int = 0,
                 end: int = 1,
                 result_path: str = 'cann_profiling',
                 exit_after_profiling: bool = True,
                 use_e2e_profiler: bool = False,
                 ge_profiling_to_std_out: bool = False):
        # torch_npu is an optional Ascend-only dependency; chain the original
        # error so the real import failure is not hidden.
        try:
            import torch_npu
        except ImportError as e:
            raise ImportError('Failed to import torch_npu module') from e

        if begin >= end:
            # NOTE: a space is needed at the end of the first literal,
            # otherwise the message renders as "greaterthan".
            raise ValueError(
                'The iteration to start profiling should not be greater '
                'than or equal to profile end')

        self.begin = begin
        self.end = end
        self.result_path = result_path
        self.exit_after_profiling = exit_after_profiling

        if ge_profiling_to_std_out:
            os.environ['GE_PROFILING_TO_STD_OUT'] = '1'

        # ``exist_ok=True`` already tolerates a pre-existing directory, so no
        # separate existence check is needed (and it avoids a TOCTOU race).
        os.makedirs(self.result_path, exist_ok=True)

        self.profiler = torch_npu.npu.profile(
            self.result_path, use_e2e_profiler=use_e2e_profiler)

    @master_only
    def before_run(self, runner):
        """Validate that the profiling window fits into the training run."""
        if self.end > runner.max_iters:
            raise ValueError(
                'The profiling end iteration should not be greater '
                'than the max iteration')

    @master_only
    def before_train_iter(self, runner, batch_idx, data_batch=None):
        """Enter the profiler context once the start iteration is reached."""
        if runner.iter == self.begin:
            self.profiler.__enter__()
            runner.logger.info('NPUProfiler starts profiling...')

    @master_only
    def after_train_iter(self,
                         runner,
                         batch_idx,
                         data_batch=None,
                         outputs=None):
        """Close the profiler at the end iteration and optionally exit."""
        if runner.iter == self.end - 1:
            runner.logger.info('profiler may take a few minutes to'
                               ' save the profiling result.')
            self.profiler.__exit__(None, None, None)
            if self.exit_after_profiling:
                sys.exit()
|