Spaces:
Runtime error
Runtime error
| # Copyright (c) SenseTime Research. All rights reserved. | |
| # Copyright (c) 2019, NVIDIA Corporation. All rights reserved. | |
| # | |
| # This work is made available under the Nvidia Source Code License-NC. | |
| # To view a copy of this license, visit | |
| # https://nvlabs.github.io/stylegan2/license.html | |
| """Helper for managing networks.""" | |
| import types | |
| import inspect | |
| import re | |
| import uuid | |
| import sys | |
| import numpy as np | |
| import tensorflow as tf | |
| from collections import OrderedDict | |
| from typing import Any, List, Tuple, Union | |
| from . import tfutil | |
| from .. import util | |
| from .tfutil import TfExpression, TfExpressionEx | |
# Custom import handlers for dealing with legacy data in pickle import.
_import_handlers = []

# Source code for temporary modules created during pickle import.
_import_module_src = {}


def import_handler(handler_func):
    """Register a custom pickle-import handler; intended for use as a decorator.

    The handler is appended to the module-level `_import_handlers` list and
    handed back unchanged, so the decorated name keeps referring to it.
    """
    _import_handlers.append(handler_func)
    return handler_func
class Network:
    """Generic network abstraction.

    Convenience wrapper around a parameterized network construction function.
    It offers utility methods and easy access to the network's inputs,
    outputs, and weights.

    Network objects can be safely pickled and unpickled for long-term
    archival purposes. The pickling works reliably as long as the underlying
    network construction function is defined in a standalone Python module
    that has no side effects or application-specific imports.

    Args:
        name:          Network name. Used to select TensorFlow name and variable scopes.
        func_name:     Fully qualified name of the underlying network construction function,
                       or a top-level function object.
        static_kwargs: Keyword arguments to be passed in to the network construction function.

    Attributes:
        name:                User-specified name, defaults to build func name if None.
        scope:               Unique TensorFlow scope containing template graph and variables,
                             derived from the user-specified name.
        static_kwargs:       Arguments passed to the user-supplied build func.
        components:          Container for sub-networks. Passed to the build func, and retained between calls.
        num_inputs:          Number of input tensors.
        num_outputs:         Number of output tensors.
        input_shapes:        Input tensor shapes (NC or NCHW), including minibatch dimension.
        output_shapes:       Output tensor shapes (NC or NCHW), including minibatch dimension.
        input_shape:         Short-hand for input_shapes[0].
        output_shape:        Short-hand for output_shapes[0].
        input_templates:     Input placeholders in the template graph.
        output_templates:    Output tensors in the template graph.
        input_names:         Name string for each input.
        output_names:        Name string for each output.
        own_vars:            Variables defined by this network (local_name => var), excluding sub-networks.
        vars:                All variables (local_name => var).
        trainables:          All trainable variables (local_name => var).
        var_global_to_local: Mapping from variable global names to local names.
    """
def __init__(self, name: str = None, func_name: Any = None, **static_kwargs):
    """Create a network from a build function and initialize its own variables.

    Args:
        name:          Network name, or None.
        func_name:     Fully qualified name of the build function, or a top-level function object.
        static_kwargs: Pickleable keyword arguments stored on the network for the build function.
    """
    tfutil.assert_tf_initialized()
    assert name is None or isinstance(name, str)
    assert func_name is not None
    assert isinstance(func_name, str) or util.is_top_level_function(func_name)
    assert util.is_pickleable(static_kwargs)

    self._init_fields()
    self.name = name
    self.static_kwargs = util.EasyDict(static_kwargs)

    # Locate the build function; a function object is first converted to its qualified name.
    if util.is_top_level_function(func_name):
        func_name = util.get_top_level_function_name(func_name)
    build_module, self._build_func_name = util.get_module_from_obj_name(func_name)
    self._build_func = util.get_obj_from_module(build_module, self._build_func_name)
    assert callable(self._build_func)

    # Prefer source recorded during pickle import; otherwise read it from the module itself.
    recorded_src = _import_module_src.get(build_module, None)
    if recorded_src is None:
        recorded_src = inspect.getsource(build_module)
    self._build_module_src = recorded_src

    # Construct the template graph and give the freshly created variables their initial values.
    self._init_graph()
    self.reset_own_vars()
def _init_fields(self) -> None:
    """Reset every attribute of the network to its empty default."""
    # Identity and configuration.
    self.name = None
    self.scope = None
    self.static_kwargs = util.EasyDict()
    self.components = util.EasyDict()

    # Input/output metadata.
    self.num_inputs = 0
    self.num_outputs = 0
    self.input_shapes = [[]]
    self.output_shapes = [[]]
    self.input_shape = []
    self.output_shape = []
    self.input_templates = []
    self.output_templates = []
    self.input_names = []
    self.output_names = []

    # Variable bookkeeping.
    self.own_vars = OrderedDict()
    self.vars = OrderedDict()
    self.trainables = OrderedDict()
    self.var_global_to_local = OrderedDict()

    # Private state.
    self._build_func = None        # User-supplied build function that constructs the network.
    self._build_func_name = None   # Name of the build function.
    self._build_module_src = None  # Full source code of the module containing the build function.
    self._run_cache = {}           # Cached graph data for Network.run().
def _init_graph(self) -> None:
    """Build the template graph and derive input, output, and variable metadata from it."""
    # Network inputs are the build function's positional parameters that have no default value.
    build_params = inspect.signature(self._build_func).parameters.values()
    self.input_names = [
        p.name for p in build_params
        if p.kind == p.POSITIONAL_OR_KEYWORD and p.default is p.empty
    ]
    self.num_inputs = len(self.input_names)
    assert self.num_inputs >= 1

    # Fall back to the build function name, then derive a unique scope name from it.
    if self.name is None:
        self.name = self._build_func_name
    assert re.match("^[A-Za-z0-9_.\\-]*$", self.name)
    with tf.name_scope(None):
        self.scope = tf.get_default_graph().unique_name(self.name, mark_as_used=True)

    # Keyword arguments for the template pass of the build function.
    template_kwargs = dict(self.static_kwargs)
    template_kwargs["is_template_graph"] = True
    template_kwargs["components"] = self.components

    # Build the template graph while ignoring surrounding scopes.
    with tfutil.absolute_variable_scope(self.scope, reuse=False), tfutil.absolute_name_scope(self.scope):
        assert tf.get_variable_scope().name == self.scope
        assert tf.get_default_graph().get_name_scope() == self.scope
        with tf.control_dependencies(None):  # ignore surrounding control dependencies
            self.input_templates = [tf.placeholder(tf.float32, name=input_name) for input_name in self.input_names]
            out_expr = self._build_func(*self.input_templates, **template_kwargs)

    # Normalize the build function's result (single expression or tuple) into a list.
    assert tfutil.is_tf_expression(out_expr) or isinstance(out_expr, tuple)
    if tfutil.is_tf_expression(out_expr):
        self.output_templates = [out_expr]
    else:
        self.output_templates = list(out_expr)
    self.num_outputs = len(self.output_templates)
    assert self.num_outputs >= 1
    assert all(tfutil.is_tf_expression(t) for t in self.output_templates)

    # Sanity checks on shapes and components.
    if any(t.shape.ndims is None for t in self.input_templates):
        raise ValueError("Network input shapes not defined. Please call x.set_shape() for each input.")
    if any(t.shape.ndims is None for t in self.output_templates):
        raise ValueError("Network output shapes not defined. Please call x.set_shape() where applicable.")
    if any(not isinstance(comp, Network) for comp in self.components.values()):
        raise ValueError("Components of a Network must be Networks themselves.")
    component_names = {comp.name for comp in self.components.values()}
    if len(self.components) != len(component_names):
        raise ValueError("Components of a Network must have unique names.")

    # Record input/output shapes and names.
    self.input_shapes = [t.shape.as_list() for t in self.input_templates]
    self.output_shapes = [t.shape.as_list() for t in self.output_templates]
    self.input_shape = self.input_shapes[0]
    self.output_shape = self.output_shapes[0]
    self.output_names = [t.name.split("/")[-1].split(":")[0] for t in self.output_templates]

    # Own variables: global variables under this scope, keyed by their name relative to the scope.
    prefix_len = len(self.scope) + 1
    self.own_vars = OrderedDict()
    for var in tf.global_variables(self.scope + "/"):
        self.own_vars[var.name[prefix_len:].split(":")[0]] = var

    # All variables: own variables plus those of every component, prefixed by the component name.
    self.vars = OrderedDict(self.own_vars)
    for comp in self.components.values():
        for comp_local_name, var in comp.vars.items():
            self.vars[comp.name + "/" + comp_local_name] = var

    self.trainables = OrderedDict((local_name, var) for local_name, var in self.vars.items() if var.trainable)
    self.var_global_to_local = OrderedDict((var.name.split(":")[0], local_name) for local_name, var in self.vars.items())
def reset_own_vars(self) -> None:
    """Run the initializers of this network's own variables (sub-networks are left untouched)."""
    initializers = [var.initializer for var in self.own_vars.values()]
    tfutil.run(initializers)
def reset_vars(self) -> None:
    """Run the initializers of all variables of this network, sub-networks included."""
    initializers = [var.initializer for var in self.vars.values()]
    tfutil.run(initializers)
def reset_trainables(self) -> None:
    """Run the initializers of all trainable variables of this network, sub-networks included."""
    initializers = [var.initializer for var in self.trainables.values()]
    tfutil.run(initializers)
def get_output_for(self, *in_expr: TfExpression, return_as_list: bool = False, **dynamic_kwargs) -> Union[TfExpression, List[TfExpression]]:
    """Construct TensorFlow expression(s) for the output(s) of this network.

    Args:
        in_expr:        One expression per network input. Individual entries may be None,
                        in which case zeros are substituted, but at least one must be given.
        return_as_list: If True, always return a list of output expressions.
        dynamic_kwargs: Extra keyword arguments for the build function; they override static_kwargs.

    Returns:
        Whatever the build function returned (a single expression or a tuple), or a list
        of expressions when `return_as_list` is set.
    """
    assert len(in_expr) == self.num_inputs
    assert any(expr is not None for expr in in_expr)

    # Keyword arguments for this evaluation pass of the build function.
    call_kwargs = dict(self.static_kwargs)
    call_kwargs.update(dynamic_kwargs)
    call_kwargs["is_template_graph"] = False
    call_kwargs["components"] = self.components

    # Re-run the build function with variable reuse enabled inside the network's variable scope.
    with tfutil.absolute_variable_scope(self.scope, reuse=True), tf.name_scope(self.name):
        assert tf.get_variable_scope().name == self.scope
        provided = [expr for expr in in_expr if expr is not None]
        final_inputs = []
        for expr, input_name, shape in zip(in_expr, self.input_names, self.input_shapes):
            if expr is None:
                # Missing input: zeros whose leading dimension follows the first provided input.
                batch = tf.shape(provided[0])[0]
                final_inputs.append(tf.zeros([batch] + shape[1:], name=input_name))
            else:
                final_inputs.append(tf.identity(expr, name=input_name))
        out_expr = self._build_func(*final_inputs, **call_kwargs)

    # Propagate input shapes back to the user-specified expressions.
    for user_expr, final_expr in zip(in_expr, final_inputs):
        if isinstance(user_expr, tf.Tensor):
            user_expr.set_shape(final_expr.shape)

    # Express outputs in the desired format.
    assert tfutil.is_tf_expression(out_expr) or isinstance(out_expr, tuple)
    if not return_as_list:
        return out_expr
    if tfutil.is_tf_expression(out_expr):
        return [out_expr]
    return list(out_expr)
def get_var_local_name(self, var_or_global_name: Union[TfExpression, str]) -> str:
    """Get the local name of a given variable, without any surrounding name scopes.

    Args:
        var_or_global_name: A variable of this network, or its global name. A trailing
            output-index suffix such as ":0" is accepted and ignored.

    Returns:
        The local name of the variable, i.e. its key in `self.vars`.

    Raises:
        KeyError: If the name is not found in `self.var_global_to_local`.
    """
    assert tfutil.is_tf_expression(var_or_global_name) or isinstance(var_or_global_name, str)
    global_name = var_or_global_name if isinstance(var_or_global_name, str) else var_or_global_name.name
    # The lookup table is keyed by names with the ":<index>" suffix removed, so strip it here
    # as well; otherwise passing a variable object (whose .name carries the suffix) never matches.
    return self.var_global_to_local[global_name.split(":")[0]]
def find_var(self, var_or_local_name: Union[TfExpression, str]) -> TfExpression:
    """Resolve a variable given either its local name or the variable itself.

    A string is looked up in `self.vars` (raising KeyError if absent); any other
    accepted argument is returned unchanged.
    """
    assert tfutil.is_tf_expression(var_or_local_name) or isinstance(var_or_local_name, str)
    if isinstance(var_or_local_name, str):
        return self.vars[var_or_local_name]
    return var_or_local_name
def get_var(self, var_or_local_name: Union[TfExpression, str]) -> np.ndarray:
    """Get the value of a given variable as NumPy array.

    Note: This method is very inefficient -- prefer to use tflib.run(list_of_vars) whenever possible.
    """
    variable = self.find_var(var_or_local_name)
    return variable.eval()
def set_var(self, var_or_local_name: Union[TfExpression, str], new_value: Union[int, float, np.ndarray]) -> None:
    """Set the value of a given variable based on the given NumPy array.

    Note: This method is very inefficient -- prefer to use tflib.set_vars() whenever possible.
    """
    variable = self.find_var(var_or_local_name)
    tfutil.set_vars({variable: new_value})
def __getstate__(self) -> dict:
    """Pickle export: capture configuration, build-module source, and own variable values."""
    own_names = list(self.own_vars.keys())
    own_values = tfutil.run(list(self.own_vars.values()))
    return {
        "version": 4,
        "name": self.name,
        "static_kwargs": dict(self.static_kwargs),
        "components": dict(self.components),
        "build_module_src": self._build_module_src,
        "build_func_name": self._build_func_name,
        "variables": list(zip(own_names, own_values)),
    }
def __setstate__(self, state: dict) -> None:
    """Pickle import: rebuild the network from a state dict produced by `__getstate__`.

    NOTE(security): the state carries Python source code that is executed below via
    `exec`. Only unpickle networks that come from a trusted source.
    """
    # pylint: disable=attribute-defined-outside-init
    tfutil.assert_tf_initialized()
    self._init_fields()

    # Give every registered import handler a chance to rewrite the state, in registration order.
    for handler in _import_handlers:
        state = handler(state)

    # Restore the basic fields.
    assert state["version"] in [2, 3, 4]
    self.name = state["name"]
    self.static_kwargs = util.EasyDict(state["static_kwargs"])
    self.components = util.EasyDict(state.get("components", {}))
    self._build_module_src = state["build_module_src"]
    self._build_func_name = state["build_func_name"]

    # Materialize the stored source as a uniquely named temporary module. The source is
    # recorded in `_import_module_src` before execution, keyed by the module object.
    temp_module_name = "_tflib_network_import_" + uuid.uuid4().hex
    temp_module = types.ModuleType(temp_module_name)
    sys.modules[temp_module_name] = temp_module
    _import_module_src[temp_module] = self._build_module_src
    exec(self._build_module_src, temp_module.__dict__)  # pylint: disable=exec-used

    # Locate the build function inside the temporary module.
    self._build_func = util.get_obj_from_module(temp_module, self._build_func_name)
    assert callable(self._build_func)

    # Rebuild the graph, initialize the variables, then overwrite them with the stored values.
    self._init_graph()
    self.reset_own_vars()
    stored_values = {self.find_var(local_name): value for local_name, value in state["variables"]}
    tfutil.set_vars(stored_values)
def clone(self, name: str = None, **new_static_kwargs) -> "Network":
    """Create a clone of this network with its own copy of the variables.

    Args:
        name:              Name for the clone; None keeps this network's name.
        new_static_kwargs: Static keyword arguments that override those of this network.
    """
    # pylint: disable=protected-access
    twin = object.__new__(Network)  # bypass __init__; the fields are filled in by hand below
    twin._init_fields()
    twin.name = self.name if name is None else name
    twin.static_kwargs = util.EasyDict(self.static_kwargs)
    twin.static_kwargs.update(new_static_kwargs)
    twin._build_module_src = self._build_module_src
    twin._build_func_name = self._build_func_name
    twin._build_func = self._build_func
    twin._init_graph()
    twin.copy_vars_from(self)
    return twin
def copy_own_vars_from(self, src_net: "Network") -> None:
    """Copy the values of all variables from the given network, excluding sub-networks.

    Only names present among the own variables of both networks are copied.
    """
    shared = [local_name for local_name in self.own_vars if local_name in src_net.own_vars]
    # Evaluate the source variables, keyed by the matching destination variable, then assign.
    fetched = tfutil.run({self.vars[local_name]: src_net.vars[local_name] for local_name in shared})
    tfutil.set_vars(fetched)
def copy_vars_from(self, src_net: "Network") -> None:
    """Copy the values of all variables from the given network, including sub-networks.

    Only names present in the variables of both networks are copied.
    """
    shared = [local_name for local_name in self.vars if local_name in src_net.vars]
    # Evaluate the source variables, keyed by the matching destination variable, then assign.
    fetched = tfutil.run({self.vars[local_name]: src_net.vars[local_name] for local_name in shared})
    tfutil.set_vars(fetched)
def copy_trainables_from(self, src_net: "Network") -> None:
    """Copy the values of all trainable variables from the given network, including sub-networks.

    Only names that are trainable in both networks are copied.
    """
    shared = [local_name for local_name in self.trainables if local_name in src_net.trainables]
    # Evaluate the source variables, keyed by the matching destination variable, then assign.
    fetched = tfutil.run({self.vars[local_name]: src_net.vars[local_name] for local_name in shared})
    tfutil.set_vars(fetched)
def convert(self, new_func_name: str, new_name: str = None, **new_static_kwargs) -> "Network":
    """Create new network with the given parameters, and copy all variables from this network.

    Args:
        new_func_name:     Build function for the new network.
        new_name:          Name of the new network; None reuses this network's name.
        new_static_kwargs: Static keyword arguments that override those of this network.
    """
    target_name = self.name if new_name is None else new_name
    merged_kwargs = dict(self.static_kwargs)
    merged_kwargs.update(new_static_kwargs)
    converted = Network(name=target_name, func_name=new_func_name, **merged_kwargs)
    converted.copy_vars_from(self)
    return converted
def setup_as_moving_average_of(self, src_net: "Network", beta: TfExpressionEx = 0.99, beta_nontrainable: TfExpressionEx = 0.0) -> tf.Operation:
    """Construct a TensorFlow op that updates the variables of this network
    to be slightly closer to those of the given network.

    Args:
        src_net:           Network whose variables are tracked.
        beta:              Interpolation factor passed to `tfutil.lerp` for trainable variables.
        beta_nontrainable: Interpolation factor passed to `tfutil.lerp` for all other variables.

    Returns:
        A grouped op containing one assignment per variable name shared with `src_net`.
    """
    with tfutil.absolute_name_scope(self.scope + "/_MovingAvg"):
        assign_ops = []
        for local_name, var in self.vars.items():
            if local_name not in src_net.vars:
                continue  # variables without a counterpart in src_net are left alone
            var_beta = beta if local_name in self.trainables else beta_nontrainable
            blended = tfutil.lerp(src_net.vars[local_name], var, var_beta)
            assign_ops.append(var.assign(blended))
        return tf.group(*assign_ops)
def run(self,
        *in_arrays: Tuple[Union[np.ndarray, None], ...],
        input_transform: dict = None,
        output_transform: dict = None,
        return_as_list: bool = False,
        print_progress: bool = False,
        minibatch_size: int = None,
        num_gpus: int = 1,
        assume_frozen: bool = False,
        **dynamic_kwargs) -> Union[np.ndarray, Tuple[np.ndarray, ...], List[np.ndarray]]:
    """Run this network for the given NumPy array(s), and return the output(s) as NumPy array(s).

    Args:
        in_arrays:        One array per network input. Individual entries may be None, in which case
                          zeros are fed for that input, but at least one array must be given.
        input_transform:  A dict specifying a custom transformation to be applied to the input tensor(s) before evaluating the network.
                          The dict must contain a 'func' field that points to a top-level function. The function is called with the input
                          TensorFlow expression(s) as positional arguments. Any remaining fields of the dict will be passed in as kwargs.
        output_transform: A dict specifying a custom transformation to be applied to the output tensor(s) after evaluating the network.
                          The dict must contain a 'func' field that points to a top-level function. The function is called with the output
                          TensorFlow expression(s) as positional arguments. Any remaining fields of the dict will be passed in as kwargs.
        return_as_list:   True = return a list of NumPy arrays, False = return a single NumPy array, or a tuple if there are multiple outputs.
        print_progress:   Print progress to the console? Useful for very large input arrays.
        minibatch_size:   Maximum minibatch size to use, None = disable batching.
        num_gpus:         Number of GPUs to use.
        assume_frozen:    Improve multi-GPU performance by assuming that the trainable parameters will remain unchanged between calls.
        dynamic_kwargs:   Additional keyword arguments to be passed into the network build function.
    """
    assert len(in_arrays) == self.num_inputs
    assert not all(arr is None for arr in in_arrays)
    assert input_transform is None or util.is_top_level_function(input_transform["func"])
    assert output_transform is None or util.is_top_level_function(output_transform["func"])
    output_transform, dynamic_kwargs = _handle_legacy_output_transforms(output_transform, dynamic_kwargs)

    # The item count comes from the first array that was actually supplied: None is a legal
    # placeholder for any input, including the first one.
    num_items = next(arr for arr in in_arrays if arr is not None).shape[0]
    if minibatch_size is None:
        # Keep the range() step below non-zero even when the input is empty.
        minibatch_size = max(num_items, 1)

    # Construct unique hash key from all arguments that affect the TensorFlow graph.
    def unwind_key(obj):
        if isinstance(obj, dict):
            return [(name, unwind_key(value)) for name, value in sorted(obj.items())]
        if callable(obj):
            return util.get_top_level_function_name(obj)
        return obj

    cache_key = repr(unwind_key(dict(
        input_transform=input_transform,
        output_transform=output_transform,
        num_gpus=num_gpus,
        assume_frozen=assume_frozen,
        dynamic_kwargs=dynamic_kwargs)))

    # Build graph (once per distinct key).
    if cache_key not in self._run_cache:
        with tfutil.absolute_name_scope(self.scope + "/_Run"), tf.control_dependencies(None):
            # Placeholders live on the CPU and are split evenly across the GPUs.
            with tf.device("/cpu:0"):
                in_expr = [tf.placeholder(tf.float32, name=name) for name in self.input_names]
                in_split = list(zip(*[tf.split(x, num_gpus) for x in in_expr]))

            out_split = []
            for gpu in range(num_gpus):
                with tf.device("/gpu:%d" % gpu):
                    net_gpu = self.clone() if assume_frozen else self
                    in_gpu = in_split[gpu]

                    if input_transform is not None:
                        in_kwargs = dict(input_transform)
                        in_gpu = in_kwargs.pop("func")(*in_gpu, **in_kwargs)
                        in_gpu = [in_gpu] if tfutil.is_tf_expression(in_gpu) else list(in_gpu)

                    assert len(in_gpu) == self.num_inputs
                    out_gpu = net_gpu.get_output_for(*in_gpu, return_as_list=True, **dynamic_kwargs)

                    if output_transform is not None:
                        out_kwargs = dict(output_transform)
                        out_gpu = out_kwargs.pop("func")(*out_gpu, **out_kwargs)
                        out_gpu = [out_gpu] if tfutil.is_tf_expression(out_gpu) else list(out_gpu)

                    assert len(out_gpu) == self.num_outputs
                    out_split.append(out_gpu)

            # Gather the per-GPU results back on the CPU.
            with tf.device("/cpu:0"):
                out_expr = [tf.concat(outputs, axis=0) for outputs in zip(*out_split)]
                self._run_cache[cache_key] = in_expr, out_expr

    # Run minibatches.
    in_expr, out_expr = self._run_cache[cache_key]
    out_arrays = [np.empty([num_items] + expr.shape.as_list()[1:], expr.dtype.name) for expr in out_expr]

    for mb_begin in range(0, num_items, minibatch_size):
        if print_progress:
            print("\r%d / %d" % (mb_begin, num_items), end="")

        mb_end = min(mb_begin + minibatch_size, num_items)
        mb_num = mb_end - mb_begin
        # Inputs given as None are fed as zeros of the template shape.
        mb_in = [src[mb_begin : mb_end] if src is not None else np.zeros([mb_num] + shape[1:]) for src, shape in zip(in_arrays, self.input_shapes)]
        mb_out = tf.get_default_session().run(out_expr, dict(zip(in_expr, mb_in)))

        for dst, src in zip(out_arrays, mb_out):
            dst[mb_begin: mb_end] = src

    # Done.
    if print_progress:
        print("\r%d / %d" % (num_items, num_items))

    if not return_as_list:
        out_arrays = out_arrays[0] if len(out_arrays) == 1 else tuple(out_arrays)
    return out_arrays
def list_ops(self) -> List[TfExpression]:
    """List the default graph's operations under this network's scope.

    Operations whose name continues with an underscore right after the scope prefix
    (for example those created under "<scope>/_Run") are excluded.
    """
    include_prefix = self.scope + "/"
    exclude_prefix = include_prefix + "_"
    return [
        op for op in tf.get_default_graph().get_operations()
        if op.name.startswith(include_prefix) and not op.name.startswith(exclude_prefix)
    ]
def list_layers(self) -> List[Tuple[str, TfExpression, List[TfExpression]]]:
    """Returns a list of (layer_name, output_expr, trainable_vars) tuples corresponding to
    individual layers of the network. Mainly intended to be used for reporting."""
    layers = []
    ignored_patterns = ["/Shape", "/strided_slice", "/Cast", "/concat", "/Assign"]
    passthrough_types = ["Identity", "Cast", "Transpose"]
    scope_offset = len(self.scope) + 1

    def recurse(scope, parent_ops, parent_vars, level):
        # Scopes matching one of the ignored patterns never become layers.
        if any(pattern in scope for pattern in ignored_patterns):
            return

        # Narrow the parent's ops and vars down to the ones belonging to this scope.
        global_prefix = scope + "/"
        local_prefix = global_prefix[scope_offset:]
        cur_ops = [op for op in parent_ops if op.name.startswith(global_prefix) or op.name == global_prefix[:-1]]
        cur_vars = [(name, var) for name, var in parent_vars if name.startswith(local_prefix) or name == local_prefix[:-1]]
        if not (cur_ops or cur_vars):
            return

        # Drop every op that lives underneath a variable op.
        var_prefixes = tuple(op.name + "/" for op in cur_ops if op.type.startswith("Variable"))
        cur_ops = [op for op in cur_ops if not op.name.startswith(var_prefixes)]

        # Without meaningful ops as immediate children (or at the root), descend into sub-scopes.
        has_direct_ops = any(
            "/" not in op.name[len(global_prefix):] and op.type not in passthrough_types
            for op in cur_ops)
        if (level == 0 or not has_direct_ops) and (len(cur_ops) + len(cur_vars)) > 1:
            rel_names = [op.name[len(global_prefix):] for op in cur_ops]
            rel_names += [name[len(local_prefix):] for name, _var in cur_vars]
            visited = set()
            for rel_name in rel_names:
                token = rel_name.split("/")[0]
                if token in visited:
                    continue
                recurse(global_prefix + token, cur_ops, cur_vars, level + 1)
                visited.add(token)
            return

        # Otherwise this scope is reported as one layer; its output is the last op's first
        # output, or the last variable when the scope holds no ops.
        layer_name = scope[scope_offset:]
        layer_output = cur_ops[-1].outputs[0] if cur_ops else cur_vars[-1][1]
        layer_trainables = [var for _name, var in cur_vars if var.trainable]
        layers.append((layer_name, layer_output, layer_trainables))

    recurse(self.scope, self.list_ops(), list(self.vars.items()), 0)
    return layers
def print_layers(self, title: str = None, hide_layers_with_no_params: bool = False) -> None:
    """Print a summary table of the network structure.

    Args:
        title:                      Heading of the first column; None uses the network name.
        hide_layers_with_no_params: Skip layers that have no trainable parameters.
    """
    heading = self.name if title is None else title
    separator = ["---"] * 4
    rows = [[heading, "Params", "OutputShape", "WeightShape"], list(separator)]
    total_params = 0

    for layer_name, layer_output, layer_trainables in self.list_layers():
        num_params = sum(int(np.prod(var.shape.as_list())) for var in layer_trainables)
        total_params += num_params
        if hide_layers_with_no_params and num_params == 0:
            continue

        # Prefer the variable named ".../weight" with the shortest name; a layer with exactly
        # one trainable variable reports that variable instead.
        weights = sorted(
            (var for var in layer_trainables if var.name.endswith("/weight:0")),
            key=lambda var: len(var.name))
        if not weights and len(layer_trainables) == 1:
            weights = layer_trainables

        rows.append([
            layer_name,
            str(num_params) if num_params > 0 else "-",
            str(layer_output.shape),
            str(weights[0].shape) if weights else "-",
        ])

    rows.append(list(separator))
    rows.append(["Total", str(total_params), "", ""])

    # Pad every cell to the width of its column.
    widths = [max(len(cell) for cell in column) for column in zip(*rows)]
    print()
    for row in rows:
        print(" ".join(cell.ljust(width) for cell, width in zip(row, widths)))
    print()
def setup_weight_histograms(self, title: str = None) -> None:
    """Construct summary ops to include histograms of all trainable parameters in TensorBoard.

    Args:
        title: Prefix for the summary names; None uses the network name.
    """
    prefix = self.name if title is None else title
    with tf.name_scope(None), tf.device(None), tf.control_dependencies(None):
        for local_name, var in self.trainables.items():
            parts = local_name.split("/")
            if len(parts) > 1:
                # Nested variable: name the summary after the last path component, then the joined parent path.
                summary_name = prefix + "_" + parts[-1] + "/" + "_".join(parts[:-1])
            else:
                summary_name = prefix + "_toplevel/" + local_name
            tf.summary.histogram(summary_name, var)
| #---------------------------------------------------------------------------- | |
| # Backwards-compatible emulation of legacy output transformation in Network.run(). | |
_print_legacy_warning = True


def _handle_legacy_output_transforms(output_transform, dynamic_kwargs):
    """Translate legacy `out_*` keyword arguments into an output transform dict.

    When none of the legacy keywords is present, both arguments are returned untouched.
    Otherwise a deprecation warning is printed (only on the first such call), the legacy
    keywords are moved out of a copy of `dynamic_kwargs`, and a transform dict that points
    at `_legacy_output_transform_func` is returned together with the remaining kwargs.
    """
    global _print_legacy_warning
    legacy_names = ["out_mul", "out_add", "out_shrink", "out_dtype"]
    present = [legacy_name for legacy_name in legacy_names if legacy_name in dynamic_kwargs]
    if not present:
        return output_transform, dynamic_kwargs

    if _print_legacy_warning:
        _print_legacy_warning = False
        print()
        print("WARNING: Old-style output transformations in Network.run() are deprecated.")
        print("Consider using 'output_transform=dict(func=tflib.convert_images_to_uint8)'")
        print("instead of 'out_mul=127.5, out_add=127.5, out_dtype=np.uint8'.")
        print()

    # Legacy keywords cannot be combined with an explicit output transform.
    assert output_transform is None
    remaining_kwargs = dict(dynamic_kwargs)
    legacy_transform = {legacy_name: remaining_kwargs.pop(legacy_name) for legacy_name in present}
    legacy_transform["func"] = _legacy_output_transform_func
    return legacy_transform, remaining_kwargs
def _legacy_output_transform_func(*expr, out_mul=1.0, out_add=0.0, out_shrink=1, out_dtype=None):
    """Apply the legacy output transformation steps to each expression in `expr`.

    Steps, each applied only when its argument differs from the default:
    multiply by `out_mul`, add `out_add`, average-pool with an `out_shrink` x `out_shrink`
    window (NCHW data format), and cast to `out_dtype` (rounding first for integer types).

    Returns:
        A list of transformed expressions, or the unchanged `expr` tuple when no step applies.
    """
    outputs = expr
    if out_mul != 1.0:
        outputs = [x * out_mul for x in outputs]
    if out_add != 0.0:
        outputs = [x + out_add for x in outputs]
    if out_shrink > 1:
        window = [1, 1, out_shrink, out_shrink]
        outputs = [
            tf.nn.avg_pool(x, ksize=window, strides=window, padding="VALID", data_format="NCHW")
            for x in outputs
        ]
    if out_dtype is not None:
        if tf.as_dtype(out_dtype).is_integer:
            outputs = [tf.round(x) for x in outputs]
        outputs = [tf.saturate_cast(x, out_dtype) for x in outputs]
    return outputs