Spaces:
Running
Running
File size: 4,091 Bytes
c61ccee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
from typing import Any, Dict, List, NamedTuple, Optional
import torch
from torch.fx._compatibility import compatibility
from torch.fx.graph import Graph
from torch.fx.graph_module import GraphModule
from torch.fx.node import (
map_arg,
Node,
Target,
)
from torch.fx.passes.shape_prop import ShapeProp
__all__ = ['replace_target_nodes_with', 'size_bytes', 'get_size_of_all_nodes', 'get_tensor_meta',
'get_size_of_node']
@compatibility(is_backward_compatible=False)
def replace_target_nodes_with(
fx_module: GraphModule,
old_op: str,
old_target: Target,
new_op: str,
new_target: Target,
):
"""Modifies all nodes in fx_module.graph.nodes which match the specified op code and target,
and updates them to match the new op code and target"""
new_graph = Graph()
val_map: Dict[Node, Node] = {}
for node in fx_module.graph.nodes:
if node.op == old_op and node.target == old_target:
args = map_arg(node.args, lambda n: val_map[n])
kwargs = map_arg(node.kwargs, lambda n: val_map[n])
assert isinstance(args, tuple)
assert isinstance(kwargs, dict)
val_map[node] = new_graph.create_node(
new_op, new_target, args, kwargs, node.name
)
else:
val_map[node] = new_graph.node_copy(node, lambda n: val_map[n])
fx_module.graph = new_graph
@compatibility(is_backward_compatible=False)
class size_bytes(NamedTuple):
output_size: int
total_size: int
@compatibility(is_backward_compatible=False)
def get_size_of_all_nodes(
fx_module: GraphModule, args: Optional[List[torch.Tensor]] = None
) -> None:
"""Given a fx graph module, update each node with its total size (weights + bias + output)
and its output_size(output). For a non-module node, the total size is the output size.
return total size"""
if args is not None:
# Mark shape and dtype for each node (node.shape and node.dtype)
ShapeProp(fx_module).propagate(*args)
# Calculate the total size of the whole fx graph
total_size_of_graph = 0.0
for node in fx_module.graph.nodes:
if node.op == "output":
break
node.size_bytes = get_size_of_node(fx_module, node)
return
@compatibility(is_backward_compatible=False)
def get_tensor_meta(node: Node) -> Any:
tensor_meta = node.meta.get("tensor_meta")
if not tensor_meta:
raise RuntimeError(
f"Node {node} has no tensor metadata associated with it! "
f"Check that shape propagation has run."
)
return tensor_meta
@compatibility(is_backward_compatible=False)
def get_size_of_node(fx_module: GraphModule, node: Node) -> size_bytes:
"""Given a node with node.dtype and node.shape, return its total size and its output size.
total_size = weights + bias + output_size
"""
# Total num of elements
total_num_of_elems = 0
# For a module, conside all parameters
if node.op == "call_module":
submodule_dict = dict(fx_module.named_modules())
submodule = submodule_dict[node.target]
parameters = submodule.named_parameters()
# Parameters are named tuples
for name, p in parameters:
total_num_of_elems += p.numel()
# Don't forget the output size
# node.shape is the shape of this node's output
tensor_meta = get_tensor_meta(node)
output_elem = tensor_meta.shape.numel()
total_num_of_elems += output_elem
# Assume for now if it's quantized then it's qint8 or quint8
if tensor_meta.is_quantized:
size_per_elem_bytes = torch._empty_affine_quantized(
[], dtype=tensor_meta.dtype
).element_size()
else:
size_per_elem_bytes = torch.tensor([], dtype=tensor_meta.dtype).element_size()
total_size = size_per_elem_bytes * total_num_of_elems
output_size = size_per_elem_bytes * output_elem
return size_bytes(output_size, total_size)
|