import json
import math
import os
import re
from typing import Dict, List, Optional, Set

import torch
import torch.utils.benchmark as benchmark
from torch._C._profiler import (
    _EventType,
    _ExtraFields_PyCall,
    _ExtraFields_PyCCall,
    _ExtraFields_TorchOp,
    _ProfilerEvent,
)
from torch.profiler import profile
from torch.profiler._utils import index_of_first_match, traverse_bfs, traverse_dfs


class Pattern:
    """
    Base class for all patterns. Subclass this class and implement match()
    to define a custom pattern.
    In a subclass, define the name and description attributes, and override
    the skip property if the pattern has preconditions.
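
    For example, a minimal custom pattern that matches every aten::add op
    could look like this (a sketch; the op name is illustrative):

        class AddPattern(Pattern):
            def __init__(self, prof: profile, should_benchmark: bool = False):
                super().__init__(prof, should_benchmark)
                self.name = "Add Pattern"
                self.description = "aten::add detected."

            def match(self, event: _ProfilerEvent):
                return event.name == "aten::add"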
""" | |

    def __init__(self, prof: profile, should_benchmark: bool = False):
        self.prof = prof
        self.should_benchmark = should_benchmark
        self.name = "Please specify a name for pattern"
        self.description = "Please specify a description for pattern"
        self.url = ""
        assert prof.profiler is not None and prof.profiler.kineto_results is not None
        self.event_tree = prof.profiler.kineto_results.experimental_event_tree()
        self.tid_root: Dict[int, List[_ProfilerEvent]] = {}
        for event in self.event_tree:
            self.tid_root.setdefault(event.start_tid, []).append(event)

    # Accessed as an attribute (see matched_events and super().skip in
    # subclasses), so this must be a property.
    @property
    def skip(self):
        return False

    def report(self, event: _ProfilerEvent):
        msg = (
            f"{self.description}\n[Source Code Location] {source_code_location(event)}"
        )
        return msg

    def eventTreeTraversal(self):
        """
        Traverse the event tree and yield all events.
        Override this method in a subclass to customize the traversal.
        """
        yield from traverse_dfs(self.event_tree)

    def summary(self, events: List[_ProfilerEvent]):
        default_summary = f"{self.name}: {len(events)} events matched."
        if self.should_benchmark:
            # Use the benchmark summary if the pattern implements benchmark().
            return (
                self.benchmark_summary(events)
                if hasattr(self, "benchmark")  # type: ignore[attr-defined]
                else default_summary
            )
        return default_summary

    def benchmark_summary(self, events: List[_ProfilerEvent]):
        def format_time(time_ns: int):
            unit_lst = ["ns", "us", "ms"]
            for unit in unit_lst:
                if time_ns < 1000:
                    return f"{time_ns:.2f} {unit}"
                # Use true division so sub-unit precision survives conversion.
                time_ns /= 1000
            return f"{time_ns:.2f} s"

        assert hasattr(self, "benchmark"), "Please implement benchmark()"
        shapes_factor_map = self.benchmark(events)  # type: ignore[attr-defined]
        original_time = sum(event.duration_time_ns for event in events)
        new_time = sum(
            shapes_factor_map[input_shapes(event)] * event.duration_time_ns
            for event in events
        )
        return (
            f"{self.name}: {len(events)} events matched. "
            f"Total Estimated Speedup: {format_time(original_time - new_time)} ({round(original_time/new_time, 2)}X)"
        )

    def match(self, event: _ProfilerEvent):
        """
        Return True if the event matches the pattern.
        This method should be overridden in a subclass.
        """
        raise NotImplementedError

    def matched_events(self):
        if self.skip:
            return []
        matched_events = []
        for event in self.eventTreeTraversal():
            if self.match(event):
                matched_events.append(event)
        return matched_events

    def root_of(self, event: _ProfilerEvent):
        while event.parent:
            event = event.parent
        return event

    def siblings_of(self, event: _ProfilerEvent):
        if event.parent:
            children = event.parent.children
        else:
            children = self.tid_root[event.start_tid]
        index = children.index(event)
        return children[:index], children[index + 1 :]

    def next_of(self, event: _ProfilerEvent):
        _, next_events = self.siblings_of(event)
        return next_events[0] if next_events else None

    def prev_of(self, event: _ProfilerEvent):
        prev_events, _ = self.siblings_of(event)
        return prev_events[-1] if prev_events else None

    def go_up_until(self, event: _ProfilerEvent, predicate):
        if not event:
            return None
        while event.parent and not predicate(event):
            event = event.parent
        return event


# Patterns
class NamePattern(Pattern):
    def __init__(self, prof: profile, name: str, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.description = f"Matched Name Event: {name}"
        self.name = name

    def match(self, event: _ProfilerEvent):
        return re.search(self.name, event.name) is not None


class ExtraCUDACopyPattern(Pattern):
    """
    This pattern identifies cases where we create a constant tensor on the CPU
    and immediately move it to the GPU.
    example: torch.zeros((100, 100)).to("cuda")
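    The fix is to construct the tensor directly on the target device:
        torch.zeros((100, 100), device="cuda")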
    Pattern:
        built-in method          | built-in method
            ...                  |     aten::to
        aten::fill_/aten::zero_  |         aten::_to_copy
    Algorithm:
        We start at the aten::to node, go to its parent's previous sibling,
        and check whether we reach an aten::fill_/aten::zero_ as we walk down
        that subtree, always selecting the last child at each level.
        If any step fails, it is not a match.
    """

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "Extra CUDA Copy Pattern"
        self.description = "Filled a CPU tensor and immediately moved it to GPU. Please initialize it on GPU."
        self.url = "https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html#create-tensors-directly-on-the-target-device"
        self.init_ops = {
            "aten::fill_",
            "aten::zero_",
            "aten::normal_",
            "aten::uniform_",
        }

    @property
    def skip(self):
        # Requires both stack traces and input shapes to be recorded.
        return not self.prof.with_stack or not self.prof.record_shapes

    def match(self, event):
        # TODO: We should also check tensor identities
        if event.name != "aten::to":
            return False
        to_event = event
        if not event.children:
            return False
        event = event.children[-1]
        if event.name != "aten::_to_copy":
            return False
        if not event.children:
            return False
        event = event.children[-1]
        if event.name != "aten::copy_":
            return False
        # The first two arguments of aten::copy_ should have the same dtype.
        dtypes = input_dtypes(event)
        if len(dtypes) < 2:
            return False
        if dtypes[0] is None or dtypes[0] != dtypes[1]:
            return False
        event = to_event
        # Up one level
        event = event.parent
        if event is None:
            return False
        # Check if we have an aten::fill_ in the previous leaf
        event = self.prev_of(event)
        if event is None:
            return False
        while event.children:
            event = event.children[-1]
            # aten::zero_ is a special optimization case where fill_ is not called
            if event.name in self.init_ops:
                return True
        return event.name in self.init_ops
        # TODO: Check if tensor is reused

    def benchmark(self, events: List[_ProfilerEvent]):
        shapes_factor_map = {input_shapes(event): 0.0 for event in events}
        for shape in shapes_factor_map:
            size = shape[0]
            to_timer = benchmark.Timer(
                stmt='torch.ones(size).to("cuda")', globals={"size": size}
            )
            de_timer = benchmark.Timer(
                stmt='torch.ones(size, device="cuda")', globals={"size": size}
            )
            to_time = to_timer.timeit(10).mean
            de_time = de_timer.timeit(10).mean
            shapes_factor_map[shape] = de_time / to_time
        return shapes_factor_map


class ForLoopIndexingPattern(Pattern):
    """
    This pattern identifies cases where a for loop indexes into a tensor in a
    way that could be vectorized.
    example:
        tensor = torch.empty((100, 100))
        for i in range(100):
            tensor[i] = i
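    One possible vectorized rewrite (fills each row i with the value i via
    broadcasting):
        tensor = torch.empty((100, 100))
        tensor[:] = torch.arange(100.0).unsqueeze(1)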
    Pattern:
        aten::select | ... | aten::select | ... (repeat)
    Algorithm:
        We start at an aten::select node and check whether we can find this
        alternating pattern among its next siblings.
        We also keep a set of visited ids to avoid matching the same loop twice.
    """

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "For Loop Indexing Pattern"
        self.description = "For loop indexing detected. Vectorization recommended."
        self.visited: Set[int] = set()

    def eventTreeTraversal(self):
        """
        We need to use BFS traversal order to avoid duplicate matches.
        """
        yield from traverse_bfs(self.event_tree)

    def match(self, event: _ProfilerEvent):
        if event.name != "aten::select":
            return False
        if event.id in self.visited:
            return False
        repeat_count = 1
        # Renamed from `next` to avoid shadowing the builtin.
        _, next_events = self.siblings_of(event)
        if len(next_events) <= 1:
            return False

        # Custom event list matching
        def same_ops(list1, list2):
            if len(list1) != len(list2):
                return False
            for op1, op2 in zip(list1, list2):
                if op1.name != op2.name:
                    return False
            return True

        # Record the ops between two aten::select
        next_select_idx = index_of_first_match(
            next_events, lambda e: e.name == "aten::select"
        )
        if next_select_idx is None:
            return False
        indexing_ops = [event] + next_events[:next_select_idx]
        next_events = next_events[len(indexing_ops) - 1 :]
        for i in range(0, len(next_events), len(indexing_ops)):
            if same_ops(indexing_ops, next_events[i : i + len(indexing_ops)]):
                repeat_count += 1
                self.visited.add(next_events[i].id)
            else:
                break
        return repeat_count >= 10


class FP32MatMulPattern(Pattern):
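    """
    This pattern identifies float32 aten::mm calls that run with TF32 disabled
    on a GPU that supports TF32 (sm_80 / Ampere or newer; see skip below).
    The suggested one-line fix, as in the description:
        torch.backends.cuda.matmul.allow_tf32 = True
    Note that TF32 trades some precision for speed, so enabling it is a
    per-application decision.
    """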

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "FP32 MatMul Pattern"
        self.description = (
            "You are currently using a GPU that supports TF32. "
            "Please enable TF32 by setting 'torch.backends.cuda.matmul.allow_tf32 = True'"
        )
        self.url = "https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices"

    @property
    def skip(self):
        if torch.version.hip is not None:
            has_tf32 = False
        else:
            # Anything below sm_80 predates Ampere and does not support TF32.
            has_tf32 = all(int(arch[3:]) >= 80 for arch in torch.cuda.get_arch_list())
        return has_tf32 is False or super().skip or not self.prof.record_shapes

    def match(self, event: _ProfilerEvent):
        # If we saw this pattern once, we don't need to match it again
        if event.tag != _EventType.TorchOp:
            return False
        assert isinstance(event.extra_fields, _ExtraFields_TorchOp)
        if event.name == "aten::mm":
            if event.extra_fields.allow_tf32_cublas is False:
                return True
        return False

    def report(self, event: _ProfilerEvent):
        return self.description

    def benchmark(self, events: List[_ProfilerEvent]):
        shapes_factor_map = {input_shapes(event): 0.0 for event in events}
        for shape in shapes_factor_map:
            matrixA = torch.randn(shape[0], device="cuda", dtype=torch.float32)
            matrixB = torch.randn(shape[1], device="cuda", dtype=torch.float32)
            fp32_timer = benchmark.Timer(
                stmt="torch.mm(matrixA, matrixB)",
                globals={"matrixA": matrixA, "matrixB": matrixB},
            )
            tf32_timer = benchmark.Timer(
                stmt="torch.mm(matrixA, matrixB)",
                setup="torch.backends.cuda.matmul.allow_tf32 = True",
                globals={"matrixA": matrixA, "matrixB": matrixB},
            )
            torch.backends.cuda.matmul.allow_tf32 = False
            fp32_time = fp32_timer.timeit(10).mean
            tf32_time = tf32_timer.timeit(10).mean
            shapes_factor_map[shape] = tf32_time / fp32_time
        return shapes_factor_map


class OptimizerSingleTensorPattern(Pattern):
    """
    This pattern identifies if we are using the single-tensor version of an optimizer.
    example:
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    Passing foreach=True enables the multi-tensor implementation, which can
    give a speedup when the kernels are relatively small.
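    example (fix):
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1, foreach=True)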
    Pattern:
        XXXXX: _single_tensor_<OPTIMIZER_NAME>
    Algorithm:
        String match
    """

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "Optimizer Single Tensor Pattern"
        self.optimizers_with_foreach = ["adam", "sgd", "adamw"]
        self.description = (
            "Detected optimizer running with single tensor implementation. "
            "Please enable multi tensor implementation by passing 'foreach=True' into optimizer."
        )
        self.url = ""

    def match(self, event: _ProfilerEvent):
        for optimizer in self.optimizers_with_foreach:
            if event.name.endswith(f"_single_tensor_{optimizer}"):
                return True
        return False


class SynchronizedDataLoaderPattern(Pattern):
    """
    This pattern identifies if we are using num_workers=0 in DataLoader.
    example:
        torch.utils.data.DataLoader(dataset, batch_size=batch_size)
    Add num_workers=N to the arguments, where N depends on the system configuration.
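    example (fix; the worker count here is illustrative):
        torch.utils.data.DataLoader(dataset, batch_size=batch_size, num_workers=4)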
    Pattern:
        dataloader.py(...): __iter__
            dataloader.py(...): _get_iterator
                NOT dataloader.py(...): check_worker_number_rationality
    Algorithm:
        If we don't see a check_worker_number_rationality call inside the
        dataloader's __iter__, the dataloader is not asynchronous.
    """

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "Synchronized DataLoader Pattern"
        self.description = (
            "Detected DataLoader running with synchronized implementation. "
            "Please enable asynchronous dataloading by setting num_workers > 0 when initializing DataLoader."
        )
        self.url = (
            "https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html"
            "#enable-async-data-loading-and-augmentation"
        )

    def match(self, event: _ProfilerEvent):
        def is_dataloader_function(name: str, function_name: str):
            return name.startswith(
                os.path.join("torch", "utils", "data", "dataloader.py")
            ) and name.endswith(function_name)

        # TODO: fixme! Due to lifetime issues of the function name, this field might
        # actually point to an already freed string when the event is a PyCall.
        # Just silently skip this to unblock testing.
        try:
            event.name
        except UnicodeDecodeError:
            return False

        if not is_dataloader_function(event.name, "__iter__"):
            return False
        if not event.children:
            return False
        event = event.children[0]
        if not is_dataloader_function(event.name, "_get_iterator"):
            return False
        if not event.children:
            return False
        event = event.children[0]
        return not is_dataloader_function(event.name, "check_worker_number_rationality")
        # TODO: We should also check if the loader is bottleneck.


class GradNotSetToNonePattern(Pattern):
    """
    This pattern identifies if we are not setting grad to None in zero_grad.
    example:
        optimizer.zero_grad()
    By setting set_to_none=True, we can gain a speedup.
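    example (fix):
        optimizer.zero_grad(set_to_none=True)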
    Pattern:
        XXXXX: _zero_grad
            NOT aten::zeros
                aten::zero_

    aten::zero_ is called on each parameter in the model.
    We also want to make sure it is not called by aten::zeros.
    Algorithm:
        String match
    """

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "Gradient Set To Zero Instead of None Pattern"
        self.description = (
            "Detected gradient set to zero instead of None. "
            "Please add 'set_to_none=True' when calling zero_grad()."
        )
        self.url = (
            "https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html"
            "#disable-gradient-calculation-for-validation-or-inference"
        )

    def match(self, event: _ProfilerEvent):
        if not event.name.endswith(": zero_grad"):
            return False
        if not event.children:
            return False

        for sub_event in traverse_dfs(event.children):
            if (
                sub_event.name == "aten::zero_"
                and sub_event.parent.name != "aten::zeros"
            ):
                return True
        # TODO: We should also check if the optimizer's numerical behavior will change.
        return False


class Conv2dBiasFollowedByBatchNorm2dPattern(Pattern):
    """
    This pattern identifies if we are enabling bias in Conv2d which is followed by BatchNorm2d.
    The bias has no effect when followed by batch norm, because the
    normalization subtracts the per-channel mean and cancels any constant offset.
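    example (fix; layer sizes are illustrative):
        nn.Sequential(nn.Conv2d(3, 64, 3, bias=False), nn.BatchNorm2d(64))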
    Pattern:
        nn.Module: Conv2d | nn.Module: BatchNorm2d
            ...
                aten::conv2d AND dtype of third argument is not null
    The third argument is the bias.
    Algorithm:
        String match
    """

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "Enabling Bias in Conv2d Followed By BatchNorm Pattern"
        self.description = "Detected bias enabled in Conv2d that is followed by BatchNorm2d. Please set 'bias=False' in Conv2d."
        self.url = (
            "https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html"
            "#disable-bias-for-convolutions-directly-followed-by-a-batch-norm"
        )

    @property
    def skip(self):
        return self.prof.record_shapes is False or super().skip

    def match(self, event: _ProfilerEvent):
        if event.name != "aten::conv2d":
            return False
        if len(input_dtypes(event)) < 3 or input_dtypes(event)[2] is None:
            return False
        # This means bias=True
        event = self.go_up_until(
            event, lambda e: e.name.startswith("nn.Module: Conv2d")
        )
        if not event:
            return False
        event = self.next_of(event)
        if not event:
            return False
        return event.name.startswith("nn.Module: BatchNorm2d")


class MatMulDimInFP16Pattern(Pattern):
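    """
    This pattern identifies FP16/BF16 matmuls (aten::mm/bmm/addmm) whose last
    two dimensions are not all multiples of 8. Half-precision matmuls run
    fastest on Tensor Core GPUs when the dimensions are aligned to multiples
    of 8; benchmark() below estimates the speedup from padding the inputs to
    the closest multiple.
    """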

    def __init__(self, prof: profile, should_benchmark: bool = False):
        super().__init__(prof, should_benchmark)
        self.name = "Matrix Multiplication Dimension Not Aligned Pattern"
        self.description = "Detected matmul with dimensions not aligned. Please use matmul with aligned dimensions."
        self.url = "https://pytorch.org/tutorials/recipes/recipes/tuning_guide.html#use-mixed-precision-and-amp"

    @property
    def skip(self):
        return not self.prof.with_stack or not self.prof.record_shapes

    def match(self, event: _ProfilerEvent):
        def multiple_of(shapes, multiple):
            return all(dim % multiple == 0 for shape in shapes for dim in shape[-2:])

        if event.name not in ("aten::mm", "aten::bmm", "aten::addmm"):
            return False
        if not input_dtypes(event):
            return False
        arg_dtype = input_dtypes(event)[0]
        if arg_dtype in (torch.bfloat16, torch.half) and not multiple_of(
            input_shapes(event), 8
        ):
            return True
        return False

    def benchmark(self, events: List[_ProfilerEvent]):
        def closest_multiple(dims, multiple):
            # Round each dimension up to the nearest multiple.
            return [multiple * math.ceil(dim / multiple) for dim in dims]

        shapes_factor_map = {input_shapes(event): 0.0 for event in events}
        for shape in shapes_factor_map:
            matrixA = torch.randn(shape[0], device="cuda", dtype=torch.float16)
            matrixB = torch.randn(shape[1], device="cuda", dtype=torch.float16)
            not_aligned_dim_timer = benchmark.Timer(
                stmt="torch.mm(matrixA, matrixB)",
                globals={"matrixA": matrixA, "matrixB": matrixB},
            )
            matrixA = torch.randn(
                closest_multiple(shape[0], 8), device="cuda", dtype=torch.float16
            )
            matrixB = torch.randn(
                closest_multiple(shape[1], 8), device="cuda", dtype=torch.float16
            )
            aligned_dim_timer = benchmark.Timer(
                stmt="torch.mm(matrixA, matrixB)",
                globals={"matrixA": matrixA, "matrixB": matrixB},
            )
            not_aligned_dim_time = not_aligned_dim_timer.timeit(10).mean
            aligned_dim_time = aligned_dim_timer.timeit(10).mean
            shapes_factor_map[shape] = aligned_dim_time / not_aligned_dim_time
        return shapes_factor_map


def source_code_location(event: Optional[_ProfilerEvent]):
    while event:
        if event.tag == _EventType.PyCall or event.tag == _EventType.PyCCall:
            assert isinstance(
                event.extra_fields, (_ExtraFields_PyCall, _ExtraFields_PyCCall)
            )
            if not event.extra_fields.caller.file_name.startswith("torch" + os.sep):
                return f"{event.extra_fields.caller.file_name}:{event.extra_fields.caller.line_number}"
        event = event.parent
    return "No source code location found"


def input_shapes(event: _ProfilerEvent):
    assert isinstance(event.extra_fields, _ExtraFields_TorchOp)
    return tuple(tuple(getattr(i, "sizes", ())) for i in event.extra_fields.inputs)


def input_dtypes(event: _ProfilerEvent):
    assert isinstance(event.extra_fields, _ExtraFields_TorchOp)
    return tuple(getattr(i, "dtype", None) for i in event.extra_fields.inputs)


def report_all_anti_patterns(
    prof,
    should_benchmark: bool = False,
    print_enable: bool = True,
    json_report_dir: Optional[str] = None,
):
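    """
    Run all anti-pattern matchers over a profile and report the matches.

    Example (a sketch; several patterns require profiling with
    with_stack=True and record_shapes=True, otherwise they skip themselves):

        with torch.profiler.profile(with_stack=True, record_shapes=True) as prof:
            model(inputs)
        report_all_anti_patterns(prof, json_report_dir=".")
    """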
    report_dict: Dict = {}
    anti_patterns = [
        ExtraCUDACopyPattern(prof, should_benchmark),
        # ForLoopIndexingPattern(prof, should_benchmark),
        FP32MatMulPattern(prof, should_benchmark),
        OptimizerSingleTensorPattern(prof, should_benchmark),
        SynchronizedDataLoaderPattern(prof, should_benchmark),
        GradNotSetToNonePattern(prof, should_benchmark),
        Conv2dBiasFollowedByBatchNorm2dPattern(prof, should_benchmark),
        MatMulDimInFP16Pattern(prof, should_benchmark),
    ]
    reported = set()
    summaries = []
    message_list = [f"{'-'*40}TorchTidy Report{'-'*40}"]
    message_list.append("Matched Events:")

    for anti_pattern in anti_patterns:
        matched_events = anti_pattern.matched_events()
        if not matched_events:
            continue
        summaries.append(anti_pattern.summary(matched_events))
        for event in matched_events:
            report_msg = anti_pattern.report(event)
            if report_msg not in reported:
                message_list.append(report_msg)
                reported.add(report_msg)
                # rsplit so that a ":" inside the file path (e.g. a Windows
                # drive letter) does not break the location/line split.
                src_location, line_no = source_code_location(event).rsplit(":", 1)
                report_dict.setdefault(src_location, []).append(
                    {
                        "line_number": int(line_no),
                        "name": anti_pattern.name,
                        "url": anti_pattern.url,
                        "message": anti_pattern.description,
                    }
                )

    if json_report_dir is not None:
        json_report_path = os.path.join(json_report_dir, "torchtidy_report.json")
        if os.path.exists(json_report_path):
            with open(json_report_path) as f:
                existing_report = json.load(f)
            existing_report.update(report_dict)
            report_dict = existing_report
        with open(json_report_path, "w") as f:
            json.dump(report_dict, f, indent=4)
message_list.append("Summary:") | |
message_list += summaries | |
message_list.append(f"{'-'*40}TorchTidy Report{'-'*40}") | |
if print_enable: | |
print("\n".join(message_list)) | |