| # coding=utf-8 | |
| # Copyright 2021 The Fairseq Authors and the HuggingFace Inc. team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """ TensorFlow Hubert model. """ | |
| import inspect | |
| import warnings | |
| from typing import Any, Dict, Optional, Tuple, Union | |
| import numpy as np | |
| import tensorflow as tf | |
| from ...activations_tf import get_tf_activation | |
| from ...file_utils import ( | |
| ModelOutput, | |
| add_start_docstrings, | |
| add_start_docstrings_to_model_forward, | |
| replace_return_docstrings, | |
| ) | |
| from ...modeling_tf_outputs import TFBaseModelOutput, TFCausalLMOutput | |
| from ...modeling_tf_utils import ( | |
| TFPreTrainedModel, | |
| booleans_processing, | |
| get_initializer, | |
| keras_serializable, | |
| shape_list, | |
| ) | |
| from ...tokenization_utils_base import BatchEncoding | |
| from ...utils import logging | |
| from .configuration_hubert import HubertConfig | |
| logger = logging.get_logger(__name__) | |
| _CONFIG_FOR_DOC = "HubertConfig" | |
| TF_HUBERT_PRETRAINED_MODEL_ARCHIVE_LIST = [ | |
| "facebook/hubert-base-ls960", | |
| # See all Hubert models at https://huggingface.co/models?filter=hubert | |
| ] | |
| LARGE_NEGATIVE = -1e8 | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.input_values_processing | |
| def input_values_processing(func, config, input_values, **kwargs): | |
| """ | |
| Process the input of each TensorFlow model, including the booleans. In case of a list of symbolic inputs, each input | |
| has to be named according to its parameter name, i.e. :obj:`input_values = tf.keras.Input(shape=(128,), | |
| dtype='float32', name="input_values")`, otherwise the order of the tensors will not be guaranteed during | |
| training. | |
| Args: | |
| func (:obj:`callable`): | |
| The callable function of the TensorFlow model. | |
| config (:class:`~transformers.PretrainedConfig`): | |
| The config of the running model. | |
| **kwargs: | |
| The inputs of the model. | |
| Returns: | |
| A dictionary mapping each parameter name of :obj:`func` to its processed value. | |
| """ | |
| signature = dict(inspect.signature(func).parameters) | |
| signature.pop("kwargs", None) | |
| signature.pop("self", None) | |
| parameter_names = list(signature.keys()) | |
| output = {} | |
| allowed_types = (tf.Tensor, bool, int, ModelOutput, tuple, list, dict, np.ndarray) | |
| for k, v in kwargs.items(): | |
| if isinstance(v, allowed_types) or v is None: | |
| output[k] = v | |
| else: | |
| raise ValueError(f"Data of type {type(v)} is not allowed only {allowed_types} is accepted for {k}.") | |
| if isinstance(input_values, (tuple, list)): | |
| for i, input in enumerate(input_values): | |
| # EagerTensors don't expose the .name property, so we check for a symbolic tf.Tensor | |
| if type(input) == tf.Tensor: | |
| # Tensor names always follow the pattern `name:id`, so we check only the | |
| # `name` part | |
| tensor_name = input.name.split(":")[0] | |
| if tensor_name in parameter_names: | |
| output[tensor_name] = input | |
| else: | |
| output[parameter_names[i]] = input | |
| elif isinstance(input, allowed_types) or input is None: | |
| output[parameter_names[i]] = input | |
| else: | |
| raise ValueError( | |
| f"Data of type {type(input)} is not allowed only {allowed_types} is accepted for {parameter_names[i]}." | |
| ) | |
| elif isinstance(input_values, (dict, BatchEncoding)): | |
| if "inputs" in input_values: | |
| warnings.warn( | |
| "The `inputs` argument is deprecated and will be removed in a future version, use `input_values` instead.", | |
| FutureWarning, | |
| ) | |
| output["input_values"] = input_values.pop("inputs") | |
| if "decoder_cached_states" in input_values: | |
| warnings.warn( | |
| "The `decoder_cached_states` argument is deprecated and will be removed in a future version, use `past_key_values` instead.", | |
| FutureWarning, | |
| ) | |
| output["past_key_values"] = input_values.pop("decoder_cached_states") | |
| for k, v in dict(input_values).items(): | |
| if isinstance(v, allowed_types) or v is None: | |
| output[k] = v | |
| elif k not in parameter_names and "args" not in parameter_names: | |
| logger.warning( | |
| f"The parameter {k} does not belongs to the parameter list {parameter_names} and will be ignored." | |
| ) | |
| continue | |
| else: | |
| raise ValueError(f"Data of type {type(v)} is not allowed only {allowed_types} is accepted for {k}.") | |
| else: | |
| if isinstance(input_values, tf.Tensor) or input_values is None: | |
| output[parameter_names[0]] = input_values | |
| else: | |
| raise ValueError( | |
| f"Data of type {type(input_values)} is not allowed only {allowed_types} is accepted for {parameter_names[0]}." | |
| ) | |
| for name in parameter_names: | |
| if name not in list(output.keys()) and name != "args": | |
| output[name] = kwargs.pop(name, signature[name].default) | |
| # When creating a SavedModel, TF calls the method with LayerCall.__call__(args, **kwargs), | |
| # so we need this special case to recover the proper input name | |
| if "args" in output: | |
| if output["args"] is not None and type(output["args"]) == tf.Tensor: | |
| tensor_name = output["args"].name.split(":")[0] | |
| output[tensor_name] = output["args"] | |
| else: | |
| # `args` in this case is always the first parameter, then `input_values` | |
| output["input_values"] = output["args"] | |
| del output["args"] | |
| if "kwargs" in output: | |
| del output["kwargs"] | |
| boolean_dict = { | |
| k: v | |
| for k, v in output.items() | |
| if k in ["return_dict", "output_attentions", "output_hidden_states", "use_cache"] | |
| } | |
| output.update(booleans_processing(config=config, **boolean_dict)) | |
| return output | |
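# Illustrative sketch, not part of the original module: how `input_values_processing`
# normalizes a call into a single dict keyed by the wrapped `call`'s parameter names.
# The helper name `_example_input_values_processing` and the toy shapes are made up here;
# the function is never called at import time.
def _example_input_values_processing(config: HubertConfig):
    waveform = tf.random.uniform((1, 16000))  # one second of fake 16 kHz audio
    processed = input_values_processing(
        func=TFHubertMainLayer.call,          # defined further down in this module
        config=config,
        input_values=waveform,
        attention_mask=tf.ones((1, 16000)),
    )
    # Every parameter of `TFHubertMainLayer.call` is now a key in `processed`; booleans
    # such as `return_dict` have been resolved against `config` by `booleans_processing`.
    return processed["input_values"], processed["attention_mask"]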
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2._sample_without_replacement | |
| def _sample_without_replacement(distribution, num_samples): | |
| """ | |
| Categorical sampling without replacement is currently not implemented in TensorFlow. The Gumbel-max trick will do | |
| for now - see https://github.com/tensorflow/tensorflow/issues/9260 for more info. | |
| """ | |
| z = -tf.math.log(tf.random.uniform(shape_list(distribution), 0, 1)) | |
| _, indices = tf.nn.top_k(distribution + z, num_samples) | |
| return indices | |
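# Illustrative sketch, not part of the original module: drawing `num_samples` distinct
# indices per batch row from a score tensor, as used below to pick SpecAugment span starts.
# The helper name is made up and the function is never called at import time.
def _example_sample_without_replacement():
    scores = tf.ones((2, 10))  # batch of 2, 10 candidate positions each
    starts = _sample_without_replacement(scores, num_samples=3)
    # `starts` has shape (2, 3); within a row the indices are distinct because top-k over
    # the noise-perturbed scores can select each position at most once.
    return starts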
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2._scatter_values_on_batch_indices | |
| def _scatter_values_on_batch_indices(values, batch_indices, output_shape): | |
| """ | |
| Scatter function as in PyTorch with indices in format (batch_dim, indices) | |
| """ | |
| indices_shape = shape_list(batch_indices) | |
| # broadcast batch dim to indices_shape | |
| broad_casted_batch_dims = tf.reshape( | |
| tf.broadcast_to(tf.expand_dims(tf.range(indices_shape[0]), axis=-1), indices_shape), [1, -1] | |
| ) | |
| # transform batch_indices to pair_indices | |
| pair_indices = tf.transpose(tf.concat([broad_casted_batch_dims, tf.reshape(batch_indices, [1, -1])], 0)) | |
| # scatter values to pair indices | |
| return tf.scatter_nd(pair_indices, tf.reshape(values, [-1]), output_shape) | |
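# Illustrative sketch, not part of the original module: scattering per-row values to
# per-row column indices, mirroring PyTorch's `Tensor.scatter_`. Toy sizes only.
def _example_scatter_values_on_batch_indices():
    values = tf.constant([[1.0, 1.0], [1.0, 1.0]])  # (batch=2, num_indices=2)
    batch_indices = tf.constant([[0, 3], [1, 2]])   # target column in each row
    out = _scatter_values_on_batch_indices(values, batch_indices, output_shape=(2, 5))
    # out == [[1., 0., 0., 1., 0.],
    #         [0., 1., 1., 0., 0.]]
    return out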
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2._compute_mask_indices | |
| def _compute_mask_indices( | |
| shape: Tuple[int, int], | |
| mask_prob: float, | |
| mask_length: int, | |
| min_masks: int = 0, | |
| ) -> tf.Tensor: | |
| """ | |
| Computes random mask spans for a given shape | |
| Args: | |
| shape: the shape for which to compute masks. | |
| Should be of size 2, where the first element is the batch size and the second is the number of timesteps. | |
| mask_prob: probability for each token to be chosen as the start of a span to be masked. This will be multiplied by | |
| the number of timesteps divided by the length of the mask span to mask approximately this percentage of all | |
| elements. However, due to overlaps, the actual number will be smaller. | |
| mask_length: size of each masked span | |
| min_masks: minimum number of masked spans | |
| Adapted from `fairseq's data_utils.py | |
| <https://github.com/pytorch/fairseq/blob/e0788f7007a8473a76db573985031f3c94201e79/fairseq/data/data_utils.py#L376>`__. | |
| """ | |
| batch_size, sequence_length = shape | |
| if mask_length < 1: | |
| raise ValueError("`mask_length` has to be bigger than 0.") | |
| if mask_length > sequence_length: | |
| raise ValueError( | |
| f"`mask_length` has to be smaller than `sequence_length`, but got `mask_length`: {mask_length} and `sequence_length`: {sequence_length}`" | |
| ) | |
| # compute number of masked spans in batch | |
| num_masked_spans = int(mask_prob * sequence_length / mask_length + tf.random.uniform((1,))) | |
| num_masked_spans = max(num_masked_spans, min_masks) | |
| # make sure num masked indices <= sequence_length | |
| if num_masked_spans * mask_length > sequence_length: | |
| num_masked_spans = sequence_length // mask_length | |
| # SpecAugment mask to fill | |
| spec_aug_mask = tf.zeros((batch_size, sequence_length), dtype=tf.int32) | |
| # uniform distribution to sample from, make sure that offset samples are < sequence_length | |
| uniform_dist = tf.ones((batch_size, sequence_length - (mask_length - 1))) | |
| # get random indices to mask | |
| spec_aug_mask_idxs = _sample_without_replacement(uniform_dist, num_masked_spans) | |
| # expand masked indices to masked spans | |
| spec_aug_mask_idxs = tf.expand_dims(spec_aug_mask_idxs, -1) | |
| spec_aug_mask_idxs = tf.tile(spec_aug_mask_idxs, (1, 1, mask_length)) | |
| spec_aug_mask_idxs = tf.reshape(spec_aug_mask_idxs, (batch_size, num_masked_spans * mask_length)) | |
| offsets = tf.range(mask_length)[tf.newaxis, tf.newaxis, :] | |
| offsets = tf.tile(offsets, (batch_size, num_masked_spans, 1)) | |
| offsets = tf.reshape(offsets, (batch_size, num_masked_spans * mask_length)) | |
| spec_aug_mask_idxs = spec_aug_mask_idxs + offsets | |
| # scatter indices to mask | |
| spec_aug_mask = _scatter_values_on_batch_indices( | |
| tf.ones_like(spec_aug_mask_idxs), spec_aug_mask_idxs, spec_aug_mask.shape | |
| ) | |
| return tf.cast(spec_aug_mask, tf.float32) | |
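# Illustrative sketch, not part of the original module: computing a SpecAugment-style
# time mask for a toy batch. With mask_prob=0.5 and mask_length=2 over 10 frames, two or
# three spans are drawn per row (overlaps can reduce the number of distinct masked frames).
def _example_compute_mask_indices():
    mask = _compute_mask_indices(shape=(2, 10), mask_prob=0.5, mask_length=2)
    # `mask` is a (2, 10) float tensor of 0./1.; positions equal to 1. are masked and are
    # later replaced by `masked_spec_embed` in `TFHubertMainLayer._mask_hidden_states`.
    return mask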
| # Copied from transformers.models.bart.modeling_tf_bart._expand_mask | |
| def _expand_mask(mask: tf.Tensor, tgt_len: Optional[int] = None, past_key_values_length: int = 0): | |
| """ | |
| Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`. | |
| """ | |
| src_len = shape_list(mask)[1] | |
| tgt_len = tgt_len if tgt_len is not None else src_len | |
| one_cst = tf.constant(1.0) | |
| mask = tf.cast(mask, dtype=one_cst.dtype) | |
| expanded_mask = tf.tile(mask[:, None, None, :], (1, 1, tgt_len, 1)) | |
| return (one_cst - expanded_mask) * LARGE_NEGATIVE | |
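# Illustrative sketch, not part of the original module: expanding a `[bsz, seq_len]`
# padding mask into the additive `[bsz, 1, tgt_len, src_len]` bias consumed by the
# attention layer; kept positions become 0 and padded positions become LARGE_NEGATIVE.
def _example_expand_mask():
    padding_mask = tf.constant([[1.0, 1.0, 0.0]])  # last position is padding
    bias = _expand_mask(padding_mask)              # shape (1, 1, 3, 3)
    # bias[..., :2] == 0. and bias[..., 2] == LARGE_NEGATIVE, so padded keys are
    # effectively removed from the softmax in TFHubertAttention.
    return bias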
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2GroupNorm with Wav2Vec2->Hubert | |
| class TFHubertGroupNorm(tf.keras.layers.Layer): | |
| """ | |
| From tensorflow-addons https://www.tensorflow.org/addons/api_docs/python/tfa/layers/GroupNormalization | |
| """ | |
| def __init__( | |
| self, | |
| groups: int = 32, | |
| axis: int = -1, | |
| epsilon: float = 1e-3, | |
| center: bool = True, | |
| scale: bool = True, | |
| beta_initializer: tf.keras.initializers.Initializer = "zeros", | |
| gamma_initializer: tf.keras.initializers.Initializer = "ones", | |
| beta_regularizer: tf.keras.regularizers.Regularizer = None, | |
| gamma_regularizer: tf.keras.regularizers.Regularizer = None, | |
| beta_constraint: tf.keras.constraints.Constraint = None, | |
| gamma_constraint: tf.keras.constraints.Constraint = None, | |
| **kwargs, | |
| ): | |
| super().__init__(**kwargs) | |
| self.supports_masking = True | |
| self.groups = groups | |
| self.axis = axis | |
| self.epsilon = epsilon | |
| self.center = center | |
| self.scale = scale | |
| self.beta_initializer = tf.keras.initializers.get(beta_initializer) | |
| self.gamma_initializer = tf.keras.initializers.get(gamma_initializer) | |
| self.beta_regularizer = tf.keras.regularizers.get(beta_regularizer) | |
| self.gamma_regularizer = tf.keras.regularizers.get(gamma_regularizer) | |
| self.beta_constraint = tf.keras.constraints.get(beta_constraint) | |
| self.gamma_constraint = tf.keras.constraints.get(gamma_constraint) | |
| self._check_axis() | |
| def build(self, input_shape): | |
| self._check_if_input_shape_is_none(input_shape) | |
| self._set_number_of_groups_for_instance_norm(input_shape) | |
| self._check_size_of_dimensions(input_shape) | |
| self._create_input_spec(input_shape) | |
| self._add_gamma_weight(input_shape) | |
| self._add_beta_weight(input_shape) | |
| self.built = True | |
| super().build(input_shape) | |
| def call(self, inputs): | |
| input_shape = tf.keras.backend.int_shape(inputs) | |
| tensor_input_shape = tf.shape(inputs) | |
| reshaped_inputs, group_shape = self._reshape_into_groups(inputs, input_shape, tensor_input_shape) | |
| normalized_inputs = self._apply_normalization(reshaped_inputs, input_shape) | |
| is_instance_norm = (input_shape[self.axis] // self.groups) == 1 | |
| if not is_instance_norm: | |
| outputs = tf.reshape(normalized_inputs, tensor_input_shape) | |
| else: | |
| outputs = normalized_inputs | |
| return outputs | |
| def get_config(self): | |
| config = { | |
| "groups": self.groups, | |
| "axis": self.axis, | |
| "epsilon": self.epsilon, | |
| "center": self.center, | |
| "scale": self.scale, | |
| "beta_initializer": tf.keras.initializers.serialize(self.beta_initializer), | |
| "gamma_initializer": tf.keras.initializers.serialize(self.gamma_initializer), | |
| "beta_regularizer": tf.keras.regularizers.serialize(self.beta_regularizer), | |
| "gamma_regularizer": tf.keras.regularizers.serialize(self.gamma_regularizer), | |
| "beta_constraint": tf.keras.constraints.serialize(self.beta_constraint), | |
| "gamma_constraint": tf.keras.constraints.serialize(self.gamma_constraint), | |
| } | |
| base_config = super().get_config() | |
| return {**base_config, **config} | |
| def compute_output_shape(self, input_shape): | |
| return input_shape | |
| def _reshape_into_groups(self, inputs, input_shape, tensor_input_shape): | |
| group_shape = [tensor_input_shape[i] for i in range(len(input_shape))] | |
| is_instance_norm = (input_shape[self.axis] // self.groups) == 1 | |
| if not is_instance_norm: | |
| group_shape[self.axis] = input_shape[self.axis] // self.groups | |
| group_shape.insert(self.axis, self.groups) | |
| group_shape = tf.stack(group_shape) | |
| reshaped_inputs = tf.reshape(inputs, group_shape) | |
| return reshaped_inputs, group_shape | |
| else: | |
| return inputs, group_shape | |
| def _apply_normalization(self, reshaped_inputs, input_shape): | |
| group_shape = tf.keras.backend.int_shape(reshaped_inputs) | |
| group_reduction_axes = list(range(1, len(group_shape))) | |
| is_instance_norm = (input_shape[self.axis] // self.groups) == 1 | |
| if not is_instance_norm: | |
| axis = -2 if self.axis == -1 else self.axis - 1 | |
| else: | |
| axis = -1 if self.axis == -1 else self.axis - 1 | |
| group_reduction_axes.pop(axis) | |
| mean, variance = tf.nn.moments(reshaped_inputs, group_reduction_axes, keepdims=True) | |
| gamma, beta = self._get_reshaped_weights(input_shape) | |
| normalized_inputs = tf.nn.batch_normalization( | |
| reshaped_inputs, | |
| mean=mean, | |
| variance=variance, | |
| scale=gamma, | |
| offset=beta, | |
| variance_epsilon=self.epsilon, | |
| ) | |
| return normalized_inputs | |
| def _get_reshaped_weights(self, input_shape): | |
| broadcast_shape = self._create_broadcast_shape(input_shape) | |
| gamma = None | |
| beta = None | |
| if self.scale: | |
| gamma = tf.reshape(self.gamma, broadcast_shape) | |
| if self.center: | |
| beta = tf.reshape(self.beta, broadcast_shape) | |
| return gamma, beta | |
| def _check_if_input_shape_is_none(self, input_shape): | |
| dim = input_shape[self.axis] | |
| if dim is None: | |
| raise ValueError( | |
| "Axis " + str(self.axis) + " of " | |
| "input tensor should have a defined dimension " | |
| "but the layer received an input with shape " + str(input_shape) + "." | |
| ) | |
| def _set_number_of_groups_for_instance_norm(self, input_shape): | |
| dim = input_shape[self.axis] | |
| if self.groups == -1: | |
| self.groups = dim | |
| def _check_size_of_dimensions(self, input_shape): | |
| dim = input_shape[self.axis] | |
| if dim < self.groups: | |
| raise ValueError( | |
| "Number of groups (" + str(self.groups) + ") cannot be " | |
| "more than the number of channels (" + str(dim) + ")." | |
| ) | |
| if dim % self.groups != 0: | |
| raise ValueError( | |
| "Number of groups (" + str(self.groups) + ") must be a " | |
| "multiple of the number of channels (" + str(dim) + ")." | |
| ) | |
| def _check_axis(self): | |
| if self.axis == 0: | |
| raise ValueError( | |
| "You are trying to normalize your batch axis. Do you want to " | |
| "use tf.layer.batch_normalization instead" | |
| ) | |
| def _create_input_spec(self, input_shape): | |
| dim = input_shape[self.axis] | |
| self.input_spec = tf.keras.layers.InputSpec(ndim=len(input_shape), axes={self.axis: dim}) | |
| def _add_gamma_weight(self, input_shape): | |
| dim = input_shape[self.axis] | |
| shape = (dim,) | |
| if self.scale: | |
| self.gamma = self.add_weight( | |
| shape=shape, | |
| name="gamma", | |
| initializer=self.gamma_initializer, | |
| regularizer=self.gamma_regularizer, | |
| constraint=self.gamma_constraint, | |
| ) | |
| else: | |
| self.gamma = None | |
| def _add_beta_weight(self, input_shape): | |
| dim = input_shape[self.axis] | |
| shape = (dim,) | |
| if self.center: | |
| self.beta = self.add_weight( | |
| shape=shape, | |
| name="beta", | |
| initializer=self.beta_initializer, | |
| regularizer=self.beta_regularizer, | |
| constraint=self.beta_constraint, | |
| ) | |
| else: | |
| self.beta = None | |
| def _create_broadcast_shape(self, input_shape): | |
| broadcast_shape = [1] * len(input_shape) | |
| is_instance_norm = (input_shape[self.axis] // self.groups) == 1 | |
| if not is_instance_norm: | |
| broadcast_shape[self.axis] = input_shape[self.axis] // self.groups | |
| broadcast_shape.insert(self.axis, self.groups) | |
| else: | |
| broadcast_shape[self.axis] = self.groups | |
| return broadcast_shape | |
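# Illustrative sketch, not part of the original module: the group norm layer normalizes
# over groups of channels along the last axis. TFHubertGroupNormConvLayer below uses
# `groups=out_conv_dim`, i.e. one group per channel (instance norm); toy values here.
def _example_group_norm():
    layer = TFHubertGroupNorm(groups=4, epsilon=1e-5)
    features = tf.random.normal((2, 50, 16))  # (batch, time, channels)
    return layer(features)                    # same shape, normalized within each group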
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2WeightNormConv1D with Wav2Vec2->Hubert | |
| class TFHubertWeightNormConv1D(tf.keras.layers.Conv1D): | |
| """Adapted from https://www.tensorflow.org/probability/api_docs/python/tfp/layers/weight_norm/WeightNorm""" | |
| def __init__(self, filters, kernel_size, groups, explicit_padding, **kwargs): | |
| super().__init__( | |
| filters=filters, | |
| kernel_size=kernel_size, | |
| groups=groups, | |
| padding="valid", | |
| use_bias=True, | |
| bias_initializer="he_normal", | |
| **kwargs, | |
| ) | |
| self.explicit_padding = explicit_padding | |
| self.filter_axis = 2 | |
| self.initialized = False | |
| self.kernel_norm_axes = tf.constant([0, 1]) | |
| def _init_norm(self): | |
| """Set the norm of the weight vector.""" | |
| kernel_norm = tf.sqrt(tf.reduce_sum(tf.square(self.weight_v), axis=self.kernel_norm_axes)) | |
| self.weight_g.assign(kernel_norm[:, tf.newaxis, tf.newaxis]) | |
| def _normalize_kernel(self): | |
| """Generate normalized weights.""" | |
| kernel = tf.nn.l2_normalize(self.weight_v, axis=self.kernel_norm_axes) * tf.transpose(self.weight_g) | |
| self.kernel = tf.transpose(kernel) | |
| def build(self, input_shape): | |
| if not self.built: | |
| super().build(input_shape) | |
| self.kernel = tf.Variable(tf.transpose(self.kernel), name="weight_v", trainable=True) | |
| self.weight_v = self.kernel | |
| self.weight_g = self.add_weight( | |
| name="weight_g", | |
| shape=(int(self.weight_v.shape[self.filter_axis]), 1, 1), | |
| initializer="ones", | |
| dtype=self.weight_v.dtype, | |
| trainable=True, | |
| ) | |
| self.bias = self.add_weight(name="bias", shape=(self.filters,), initializer="zeros", trainable=True) | |
| def call(self, inputs): | |
| if not self.initialized: | |
| self._init_norm() | |
| self.initialized = True | |
| self._normalize_kernel() | |
| padded_inputs = tf.pad(inputs, ((0, 0), (self.explicit_padding, self.explicit_padding), (0, 0))) | |
| output = super().call(padded_inputs) | |
| return output | |
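# Illustrative sketch, not part of the original module: the weight-normalized convolution
# behind TFHubertPositionalConvEmbedding. On the first call it initializes the magnitude
# `weight_g` from the direction `weight_v`, then always convolves explicitly padded inputs.
def _example_weight_norm_conv():
    conv = TFHubertWeightNormConv1D(filters=8, kernel_size=3, groups=1, explicit_padding=1)
    hidden = tf.random.normal((2, 20, 8))  # (batch, time, channels)
    return conv(hidden)                    # (2, 20, 8): the padding preserves the length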
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2NoLayerNormConvLayer with Wav2Vec2->Hubert | |
| class TFHubertNoLayerNormConvLayer(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, layer_id: int = 0, **kwargs: Any) -> None: | |
| super().__init__(**kwargs) | |
| self.in_conv_dim = config.conv_dim[layer_id] if layer_id > 0 else 1 | |
| self.out_conv_dim = config.conv_dim[layer_id] | |
| self.conv = tf.keras.layers.Conv1D( | |
| filters=self.out_conv_dim, | |
| kernel_size=config.conv_kernel[layer_id], | |
| strides=config.conv_stride[layer_id], | |
| use_bias=config.conv_bias, | |
| name="conv", | |
| ) | |
| self.activation = get_tf_activation(config.feat_extract_activation) | |
| def call(self, hidden_states: tf.Tensor) -> tf.Tensor: | |
| hidden_states = self.conv(hidden_states) | |
| hidden_states = self.activation(hidden_states) | |
| return hidden_states | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2LayerNormConvLayer with Wav2Vec2->Hubert | |
| class TFHubertLayerNormConvLayer(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, layer_id: int = 0, **kwargs: Any) -> None: | |
| super().__init__(**kwargs) | |
| self.in_conv_dim = config.conv_dim[layer_id] if layer_id > 0 else 1 | |
| self.out_conv_dim = config.conv_dim[layer_id] | |
| self.conv = tf.keras.layers.Conv1D( | |
| filters=self.out_conv_dim, | |
| kernel_size=config.conv_kernel[layer_id], | |
| strides=config.conv_stride[layer_id], | |
| use_bias=config.conv_bias, | |
| name="conv", | |
| ) | |
| self.layer_norm = tf.keras.layers.LayerNormalization(name="layer_norm", epsilon=config.layer_norm_eps) | |
| self.activation = get_tf_activation(config.feat_extract_activation) | |
| def call(self, hidden_states: tf.Tensor) -> tf.Tensor: | |
| hidden_states = self.conv(hidden_states) | |
| hidden_states = self.layer_norm(hidden_states) | |
| hidden_states = self.activation(hidden_states) | |
| return hidden_states | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2GroupNormConvLayer with Wav2Vec2->Hubert | |
| class TFHubertGroupNormConvLayer(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, layer_id: int = 0, **kwargs: Any) -> None: | |
| super().__init__(**kwargs) | |
| self.in_conv_dim = config.conv_dim[layer_id] if layer_id > 0 else 1 | |
| self.out_conv_dim = config.conv_dim[layer_id] | |
| self.conv = tf.keras.layers.Conv1D( | |
| filters=self.out_conv_dim, | |
| kernel_size=config.conv_kernel[layer_id], | |
| strides=config.conv_stride[layer_id], | |
| use_bias=config.conv_bias, | |
| name="conv", | |
| ) | |
| self.activation = get_tf_activation(config.feat_extract_activation) | |
| self.layer_norm = TFHubertGroupNorm(groups=self.out_conv_dim, epsilon=config.layer_norm_eps, name="layer_norm") | |
| def call(self, hidden_states: tf.Tensor) -> tf.Tensor: | |
| hidden_states = self.conv(hidden_states) | |
| hidden_states = self.layer_norm(hidden_states) | |
| hidden_states = self.activation(hidden_states) | |
| return hidden_states | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2PositionalConvEmbedding with Wav2Vec2->Hubert | |
| class TFHubertPositionalConvEmbedding(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs: Any) -> None: | |
| super().__init__(**kwargs) | |
| self.conv = TFHubertWeightNormConv1D( | |
| filters=config.hidden_size, | |
| kernel_size=config.num_conv_pos_embeddings, | |
| groups=config.num_conv_pos_embedding_groups, | |
| explicit_padding=config.num_conv_pos_embeddings // 2, | |
| name="conv", | |
| ) | |
| self.padding = TFHubertSamePadLayer(config.num_conv_pos_embeddings) | |
| self.activation = get_tf_activation(config.feat_extract_activation) | |
| def call(self, hidden_states: tf.Tensor) -> tf.Tensor: | |
| hidden_states = self.conv(hidden_states) | |
| hidden_states = self.padding(hidden_states) | |
| hidden_states = self.activation(hidden_states) | |
| return hidden_states | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2SamePadLayer with Wav2Vec2->Hubert | |
| class TFHubertSamePadLayer(tf.keras.layers.Layer): | |
| def __init__(self, num_conv_pos_embeddings, **kwargs): | |
| super().__init__(**kwargs) | |
| self.num_pad_remove = 1 if num_conv_pos_embeddings % 2 == 0 else 0 | |
| def call(self, hidden_states): | |
| if self.num_pad_remove > 0: | |
| hidden_states = hidden_states[:, : -self.num_pad_remove, :] | |
| return hidden_states | |
| class TFHubertFeatureExtractor(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs: Any) -> None: | |
| super().__init__(**kwargs) | |
| if config.feat_extract_norm == "group": | |
| conv_layers = [TFHubertGroupNormConvLayer(config, layer_id=0, name=f"conv_layers.{0}")] + [ | |
| TFHubertNoLayerNormConvLayer(config, layer_id=i + 1, name=f"conv_layers.{i+1}") | |
| for i in range(config.num_feat_extract_layers - 1) | |
| ] | |
| elif config.feat_extract_norm == "layer": | |
| conv_layers = [ | |
| TFHubertLayerNormConvLayer(config, layer_id=i, name=f"conv_layers.{i}") | |
| for i in range(config.num_feat_extract_layers) | |
| ] | |
| else: | |
| raise ValueError( | |
| f"`config.feat_extract_norm` is {config.feat_extract_norm}, but has to be one of ['group', 'layer']" | |
| ) | |
| self.conv_layers = conv_layers | |
| def call(self, input_values): | |
| hidden_states = tf.expand_dims(input_values, -1) | |
| for conv_layer in self.conv_layers: | |
| hidden_states = conv_layer(hidden_states) | |
| return hidden_states | |
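# Illustrative sketch, not part of the original module: the feature extractor maps a raw
# waveform (batch, samples) to (batch, frames, config.conv_dim[-1]); with the default
# config, 16000 samples (one second at 16 kHz) become 49 frames. Toy usage below.
def _example_feature_extractor(config: HubertConfig):
    extractor = TFHubertFeatureExtractor(config, name="feature_extractor")
    waveform = tf.random.uniform((1, 16000))
    return extractor(waveform)  # (1, 49, config.conv_dim[-1]) with the default config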
| class TFHubertFeatureProjection(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm") | |
| self.projection = tf.keras.layers.Dense( | |
| units=config.hidden_size, | |
| kernel_initializer=get_initializer(config.initializer_range), | |
| bias_initializer="zeros", | |
| name="projection", | |
| ) | |
| self.dropout = tf.keras.layers.Dropout(rate=config.feat_proj_dropout) | |
| def call(self, hidden_states: tf.Tensor, training: bool = False) -> tf.Tensor: | |
| hidden_states = self.layer_norm(hidden_states) | |
| hidden_states = self.projection(hidden_states) | |
| hidden_states = self.dropout(hidden_states, training=training) | |
| return hidden_states | |
| # Copied from transformers.models.bart.modeling_tf_bart.TFBartAttention with TFBart->TFHubert | |
| class TFHubertAttention(tf.keras.layers.Layer): | |
| """Multi-headed attention from "Attention Is All You Need""" | |
| def __init__( | |
| self, | |
| embed_dim: int, | |
| num_heads: int, | |
| dropout: float = 0.0, | |
| is_decoder: bool = False, | |
| bias: bool = True, | |
| **kwargs, | |
| ): | |
| super().__init__(**kwargs) | |
| self.embed_dim = embed_dim | |
| self.num_heads = num_heads | |
| self.dropout = tf.keras.layers.Dropout(dropout) | |
| self.head_dim = embed_dim // num_heads | |
| assert self.head_dim * num_heads == self.embed_dim, "embed_dim must be divisible by num_heads" | |
| self.scaling = self.head_dim ** -0.5 | |
| self.is_decoder = is_decoder | |
| self.k_proj = tf.keras.layers.Dense(embed_dim, use_bias=bias, name="k_proj") | |
| self.q_proj = tf.keras.layers.Dense(embed_dim, use_bias=bias, name="q_proj") | |
| self.v_proj = tf.keras.layers.Dense(embed_dim, use_bias=bias, name="v_proj") | |
| self.out_proj = tf.keras.layers.Dense(embed_dim, use_bias=bias, name="out_proj") | |
| def _shape(self, tensor: tf.Tensor, seq_len: int, bsz: int): | |
| return tf.transpose(tf.reshape(tensor, (bsz, seq_len, self.num_heads, self.head_dim)), (0, 2, 1, 3)) | |
| def call( | |
| self, | |
| hidden_states: tf.Tensor, | |
| key_value_states: Optional[tf.Tensor] = None, | |
| past_key_value: Optional[Tuple[Tuple[tf.Tensor]]] = None, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| layer_head_mask: Optional[tf.Tensor] = None, | |
| training=False, | |
| ) -> Tuple[tf.Tensor, Optional[tf.Tensor]]: | |
| """Input shape: Batch x Time x Channel""" | |
| # if key_value_states are provided this layer is used as a cross-attention layer | |
| # for the decoder | |
| is_cross_attention = key_value_states is not None | |
| bsz, tgt_len, embed_dim = shape_list(hidden_states) | |
| # get query proj | |
| query_states = self.q_proj(hidden_states) * self.scaling | |
| # get key, value proj | |
| if is_cross_attention and past_key_value is not None: | |
| # reuse k,v, cross_attentions | |
| key_states = past_key_value[0] | |
| value_states = past_key_value[1] | |
| elif is_cross_attention: | |
| # cross_attentions | |
| key_states = self._shape(self.k_proj(key_value_states), -1, bsz) | |
| value_states = self._shape(self.v_proj(key_value_states), -1, bsz) | |
| elif past_key_value is not None: | |
| # reuse k, v, self_attention | |
| key_states = self._shape(self.k_proj(hidden_states), -1, bsz) | |
| value_states = self._shape(self.v_proj(hidden_states), -1, bsz) | |
| key_states = tf.concat([past_key_value[0], key_states], axis=2) | |
| value_states = tf.concat([past_key_value[1], value_states], axis=2) | |
| else: | |
| # self_attention | |
| key_states = self._shape(self.k_proj(hidden_states), -1, bsz) | |
| value_states = self._shape(self.v_proj(hidden_states), -1, bsz) | |
| if self.is_decoder: | |
| # if cross_attention save Tuple(tf.Tensor, tf.Tensor) of all cross attention key/value_states. | |
| # Further calls to cross_attention layer can then reuse all cross-attention | |
| # key/value_states (first "if" case) | |
| # if uni-directional self-attention (decoder) save Tuple(tf.Tensor, tf.Tensor) of | |
| # all previous decoder key/value_states. Further calls to uni-directional self-attention | |
| # can concat previous decoder key/value_states to current projected key/value_states (third "elif" case) | |
| # if encoder bi-directional self-attention `past_key_value` is always `None` | |
| past_key_value = (key_states, value_states) | |
| proj_shape = (bsz * self.num_heads, -1, self.head_dim) | |
| query_states = tf.reshape(self._shape(query_states, tgt_len, bsz), proj_shape) | |
| key_states = tf.reshape(key_states, proj_shape) | |
| value_states = tf.reshape(value_states, proj_shape) | |
| src_len = shape_list(key_states)[1] | |
| attn_weights = tf.matmul(query_states, key_states, transpose_b=True) | |
| # The tf.debugging asserts are not compliant with XLA, so they | |
| # have to be disabled in modes other than eager. | |
| if tf.executing_eagerly(): | |
| tf.debugging.assert_equal( | |
| shape_list(attn_weights), | |
| [bsz * self.num_heads, tgt_len, src_len], | |
| message=f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is {shape_list(attn_weights)}", | |
| ) | |
| if attention_mask is not None: | |
| # The tf.debugging asserts are not compliant with XLA, so they | |
| # have to be disabled in modes other than eager. | |
| if tf.executing_eagerly(): | |
| tf.debugging.assert_equal( | |
| shape_list(attention_mask), | |
| [bsz, 1, tgt_len, src_len], | |
| message=f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {shape_list(attention_mask)}", | |
| ) | |
| attention_mask = tf.cast(attention_mask, dtype=attn_weights.dtype) | |
| attn_weights = tf.reshape(attn_weights, (bsz, self.num_heads, tgt_len, src_len)) + attention_mask | |
| attn_weights = tf.reshape(attn_weights, (bsz * self.num_heads, tgt_len, src_len)) | |
| attn_weights = tf.nn.softmax(attn_weights, axis=-1) | |
| if layer_head_mask is not None: | |
| # The tf.debugging asserts are not compliant with XLA, so they | |
| # have to be disabled in modes other than eager. | |
| if tf.executing_eagerly(): | |
| tf.debugging.assert_equal( | |
| shape_list(layer_head_mask), | |
| [self.num_heads], | |
| message=f"Head mask for a single layer should be of size {(self.num_heads)}, but is {shape_list(layer_head_mask)}", | |
| ) | |
| attn_weights = tf.reshape(layer_head_mask, (1, -1, 1, 1)) * tf.reshape( | |
| attn_weights, (bsz, self.num_heads, tgt_len, src_len) | |
| ) | |
| attn_weights = tf.reshape(attn_weights, (bsz * self.num_heads, tgt_len, src_len)) | |
| attn_probs = self.dropout(attn_weights, training=training) | |
| attn_output = tf.matmul(attn_probs, value_states) | |
| # The tf.debugging asserts are not compliant with XLA, so they | |
| # have to be disabled in modes other than eager. | |
| if tf.executing_eagerly(): | |
| tf.debugging.assert_equal( | |
| shape_list(attn_output), | |
| [bsz * self.num_heads, tgt_len, self.head_dim], | |
| message=f"`attn_output` should be of size {(bsz, self.num_heads, tgt_len, self.head_dim)}, but is {shape_list(attn_output)}", | |
| ) | |
| attn_output = tf.transpose( | |
| tf.reshape(attn_output, (bsz, self.num_heads, tgt_len, self.head_dim)), (0, 2, 1, 3) | |
| ) | |
| attn_output = tf.reshape(attn_output, (bsz, tgt_len, embed_dim)) | |
| attn_output = self.out_proj(attn_output) | |
| attn_weights: tf.Tensor = tf.reshape(attn_weights, (bsz, self.num_heads, tgt_len, src_len)) | |
| return attn_output, attn_weights, past_key_value | |
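# Illustrative sketch, not part of the original module: self-attention usage as it happens
# inside TFHubertEncoderLayer, i.e. no cross-attention and no cached key/value states.
def _example_attention():
    attn = TFHubertAttention(embed_dim=16, num_heads=4)
    hidden = tf.random.normal((2, 6, 16))  # (batch, time, channels)
    out, weights, _ = attn(hidden)         # out: (2, 6, 16)
    # `weights` has shape (batch, num_heads, tgt_len, src_len) == (2, 4, 6, 6)
    return out, weights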
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2FeedForward with Wav2Vec2->Hubert | |
| class TFHubertFeedForward(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.intermediate_dropout = tf.keras.layers.Dropout(config.activation_dropout) | |
| self.intermediate_dense = tf.keras.layers.Dense( | |
| units=config.intermediate_size, | |
| kernel_initializer=get_initializer(config.initializer_range), | |
| bias_initializer="zeros", | |
| name="intermediate_dense", | |
| ) | |
| self.intermediate_act_fn = get_tf_activation(config.hidden_act) | |
| self.output_dense = tf.keras.layers.Dense( | |
| units=config.hidden_size, | |
| kernel_initializer=get_initializer(config.initializer_range), | |
| bias_initializer="zeros", | |
| name="output_dense", | |
| ) | |
| self.output_dropout = tf.keras.layers.Dropout(config.hidden_dropout) | |
| def call(self, hidden_states: tf.Tensor, training: bool = False) -> tf.Tensor: | |
| hidden_states = self.intermediate_dense(hidden_states) | |
| hidden_states = self.intermediate_act_fn(hidden_states) | |
| hidden_states = self.intermediate_dropout(hidden_states, training=training) | |
| hidden_states = self.output_dense(hidden_states) | |
| hidden_states = self.output_dropout(hidden_states, training=training) | |
| return hidden_states | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2EncoderLayer with Wav2Vec2->Hubert | |
| class TFHubertEncoderLayer(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.attention = TFHubertAttention( | |
| embed_dim=config.hidden_size, | |
| num_heads=config.num_attention_heads, | |
| dropout=config.attention_dropout, | |
| is_decoder=False, | |
| name="attention", | |
| ) | |
| self.dropout = tf.keras.layers.Dropout(config.hidden_dropout) | |
| self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm") | |
| self.feed_forward = TFHubertFeedForward(config, name="feed_forward") | |
| self.final_layer_norm = tf.keras.layers.LayerNormalization( | |
| epsilon=config.layer_norm_eps, name="final_layer_norm" | |
| ) | |
| def call( | |
| self, | |
| hidden_states: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[bool] = False, | |
| training: bool = False, | |
| ) -> Tuple[tf.Tensor]: | |
| attn_residual = hidden_states | |
| hidden_states, attn_weights, _ = self.attention( | |
| hidden_states, attention_mask=attention_mask, training=training | |
| ) | |
| hidden_states = self.dropout(hidden_states, training=training) | |
| hidden_states = attn_residual + hidden_states | |
| hidden_states = self.layer_norm(hidden_states) | |
| hidden_states = hidden_states + self.feed_forward(hidden_states) | |
| hidden_states = self.final_layer_norm(hidden_states) | |
| outputs = (hidden_states,) | |
| if output_attentions: | |
| outputs += (attn_weights,) | |
| return outputs | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2EncoderLayerStableLayerNorm with Wav2Vec2->Hubert | |
| class TFHubertEncoderLayerStableLayerNorm(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.attention = TFHubertAttention( | |
| embed_dim=config.hidden_size, | |
| num_heads=config.num_attention_heads, | |
| dropout=config.attention_dropout, | |
| is_decoder=False, | |
| name="attention", | |
| ) | |
| self.dropout = tf.keras.layers.Dropout(config.hidden_dropout) | |
| self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm") | |
| self.feed_forward = TFHubertFeedForward(config, name="feed_forward") | |
| self.final_layer_norm = tf.keras.layers.LayerNormalization( | |
| epsilon=config.layer_norm_eps, name="final_layer_norm" | |
| ) | |
| def call( | |
| self, | |
| hidden_states: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[bool] = False, | |
| training: bool = False, | |
| ) -> Tuple[tf.Tensor]: | |
| attn_residual = hidden_states | |
| hidden_states = self.layer_norm(hidden_states) | |
| hidden_states, attn_weights, _ = self.attention( | |
| hidden_states, attention_mask=attention_mask, training=training | |
| ) | |
| hidden_states = self.dropout(hidden_states, training=training) | |
| hidden_states = attn_residual + hidden_states | |
| hidden_states = hidden_states + self.feed_forward(self.final_layer_norm(hidden_states)) | |
| outputs = (hidden_states,) | |
| if output_attentions: | |
| outputs += (attn_weights,) | |
| return outputs | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2Encoder with Wav2Vec2->Hubert | |
| class TFHubertEncoder(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.config = config | |
| self.pos_conv_embed = TFHubertPositionalConvEmbedding(config, name="pos_conv_embed") | |
| self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm") | |
| self.dropout = tf.keras.layers.Dropout(config.hidden_dropout) | |
| self.layer = [TFHubertEncoderLayer(config, name=f"layers.{i}") for i in range(config.num_hidden_layers)] | |
| def call( | |
| self, | |
| hidden_states: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[bool] = False, | |
| output_hidden_states: Optional[bool] = False, | |
| return_dict: Optional[bool] = True, | |
| training: Optional[bool] = False, | |
| ) -> Union[TFBaseModelOutput, Tuple[tf.Tensor]]: | |
| all_hidden_states = () if output_hidden_states else None | |
| all_self_attentions = () if output_attentions else None | |
| if attention_mask is not None: | |
| hidden_states = hidden_states * tf.expand_dims(attention_mask, -1) | |
| attention_mask = _expand_mask(attention_mask) | |
| else: | |
| attention_mask = None | |
| position_embeddings = self.pos_conv_embed(hidden_states) | |
| hidden_states = hidden_states + position_embeddings | |
| hidden_states = self.layer_norm(hidden_states) | |
| hidden_states = self.dropout(hidden_states, training=training) | |
| for i, layer_module in enumerate(self.layer): | |
| if output_hidden_states: | |
| all_hidden_states = all_hidden_states + (hidden_states,) | |
| # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description) | |
| dropout_probability = np.random.uniform(0, 1) | |
| if training and (dropout_probability < self.config.layerdrop): # skip the layer | |
| continue | |
| layer_outputs = layer_module( | |
| hidden_states=hidden_states, | |
| attention_mask=attention_mask, | |
| output_attentions=output_attentions, | |
| training=training, | |
| ) | |
| hidden_states = layer_outputs[0] | |
| if output_attentions: | |
| all_self_attentions = all_self_attentions + (layer_outputs[1],) | |
| # Add last layer | |
| if output_hidden_states: | |
| all_hidden_states = all_hidden_states + (hidden_states,) | |
| if not return_dict: | |
| return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None) | |
| return TFBaseModelOutput( | |
| last_hidden_state=hidden_states, | |
| hidden_states=all_hidden_states, | |
| attentions=all_self_attentions, | |
| ) | |
| # Copied from transformers.models.wav2vec2.modeling_tf_wav2vec2.TFWav2Vec2EncoderStableLayerNorm with Wav2Vec2->Hubert | |
| class TFHubertEncoderStableLayerNorm(tf.keras.layers.Layer): | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.config = config | |
| self.pos_conv_embed = TFHubertPositionalConvEmbedding(config, name="pos_conv_embed") | |
| self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm") | |
| self.dropout = tf.keras.layers.Dropout(config.hidden_dropout) | |
| self.layer = [ | |
| TFHubertEncoderLayerStableLayerNorm(config, name=f"layers.{i}") for i in range(config.num_hidden_layers) | |
| ] | |
| def call( | |
| self, | |
| hidden_states: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[bool] = False, | |
| output_hidden_states: Optional[bool] = False, | |
| return_dict: Optional[bool] = True, | |
| training: Optional[bool] = False, | |
| ) -> Union[TFBaseModelOutput, Tuple[tf.Tensor]]: | |
| all_hidden_states = () if output_hidden_states else None | |
| all_self_attentions = () if output_attentions else None | |
| if attention_mask is not None: | |
| hidden_states = hidden_states * tf.expand_dims(attention_mask, -1) | |
| attention_mask = _expand_mask(attention_mask) | |
| else: | |
| attention_mask = None | |
| position_embeddings = self.pos_conv_embed(hidden_states) | |
| hidden_states = hidden_states + position_embeddings | |
| hidden_states = self.dropout(hidden_states, training=training) | |
| for i, layer_module in enumerate(self.layer): | |
| if output_hidden_states: | |
| all_hidden_states = all_hidden_states + (hidden_states,) | |
| # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description) | |
| dropout_probability = np.random.uniform(0, 1) | |
| if training and (dropout_probability < self.config.layerdrop): # skip the layer | |
| continue | |
| layer_outputs = layer_module( | |
| hidden_states=hidden_states, | |
| attention_mask=attention_mask, | |
| output_attentions=output_attentions, | |
| training=training, | |
| ) | |
| hidden_states = layer_outputs[0] | |
| if output_attentions: | |
| all_self_attentions = all_self_attentions + (layer_outputs[1],) | |
| hidden_states = self.layer_norm(hidden_states) | |
| if output_hidden_states: | |
| all_hidden_states = all_hidden_states + (hidden_states,) | |
| if not return_dict: | |
| return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None) | |
| return TFBaseModelOutput( | |
| last_hidden_state=hidden_states, | |
| hidden_states=all_hidden_states, | |
| attentions=all_self_attentions, | |
| ) | |
| @keras_serializable | |
| class TFHubertMainLayer(tf.keras.layers.Layer): | |
| config_class = HubertConfig | |
| def __init__(self, config: HubertConfig, **kwargs): | |
| super().__init__(**kwargs) | |
| self.config = config | |
| self.feature_extractor = TFHubertFeatureExtractor(config, name="feature_extractor") | |
| self.feature_projection = TFHubertFeatureProjection(config, name="feature_projection") | |
| if config.do_stable_layer_norm: | |
| self.encoder = TFHubertEncoderStableLayerNorm(config, name="encoder") | |
| else: | |
| self.encoder = TFHubertEncoder(config, name="encoder") | |
| def build(self, input_shape: tf.TensorShape): | |
| self.masked_spec_embed = self.add_weight( | |
| shape=(self.config.hidden_size,), initializer="uniform", trainable=True, name="masked_spec_embed" | |
| ) | |
| super().build(input_shape) | |
| def _get_feat_extract_output_lengths(self, input_lengths: tf.Tensor): | |
| """ | |
| Computes the output length of the convolutional layers | |
| """ | |
| def _conv_out_length(input_length, kernel_size, stride): | |
| # 1D convolutional layer output length formula taken | |
| # from https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html | |
| return (input_length - kernel_size) // stride + 1 | |
| for kernel_size, stride in zip(self.config.conv_kernel, self.config.conv_stride): | |
| input_lengths = _conv_out_length(input_lengths, kernel_size, stride) | |
| return input_lengths | |
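# Illustrative note, not part of the original code: with the default Hubert feature
# extractor (conv kernels (10, 3, 3, 3, 3, 2, 2) and strides (5, 2, 2, 2, 2, 2, 2)),
# one second of 16 kHz audio, i.e. 16000 samples, maps to 49 frames:
# 16000 -> (16000 - 10) // 5 + 1 = 3199 -> 1599 -> 799 -> 399 -> 199 -> 99 -> 49.
# These lengths are what `call` below uses to rebuild the frame-level `attention_mask`.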
| def _mask_hidden_states(self, hidden_states: tf.Tensor, mask_time_indices: Optional[tf.Tensor] = None): | |
| """ | |
| Masks extracted features along time axis and/or along feature axis according to `SpecAugment | |
| <https://arxiv.org/abs/1904.08779>`__ . | |
| """ | |
| batch_size, sequence_length, hidden_size = shape_list(hidden_states) | |
| # `config.apply_spec_augment` can set masking to False | |
| if not getattr(self.config, "apply_spec_augment", True): | |
| return hidden_states | |
| if mask_time_indices is not None: | |
| # apply SpecAugment along time axis with given mask_time_indices | |
| hidden_states = tf.where( | |
| tf.cast(mask_time_indices[:, :, tf.newaxis], tf.bool), | |
| self.masked_spec_embed[tf.newaxis, tf.newaxis, :], | |
| hidden_states, | |
| ) | |
| elif self.config.mask_time_prob > 0: | |
| # generate indices & apply SpecAugment along time axis | |
| mask_time_indices = _compute_mask_indices( | |
| (batch_size, sequence_length), | |
| mask_prob=self.config.mask_time_prob, | |
| mask_length=self.config.mask_time_length, | |
| min_masks=2, | |
| ) | |
| hidden_states = tf.where( | |
| tf.cast(mask_time_indices[:, :, tf.newaxis], tf.bool), | |
| self.masked_spec_embed[tf.newaxis, tf.newaxis, :], | |
| hidden_states, | |
| ) | |
| # apply SpecAugment along feature axis | |
| if self.config.mask_feature_prob > 0: | |
| mask_feature_indices = _compute_mask_indices( | |
| (batch_size, hidden_size), | |
| mask_prob=self.config.mask_feature_prob, | |
| mask_length=self.config.mask_feature_length, | |
| ) | |
| hidden_states = tf.where(mask_feature_indices[:, tf.newaxis, :], hidden_states, 0) | |
| return hidden_states | |
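# Illustrative note, not part of the original code: during training, e.g. with
# mask_time_prob=0.05 and mask_time_length=10, about mask_prob * seq_len / mask_length
# spans are drawn, so roughly 5% of all frames end up masked (fewer when spans overlap)
# and are replaced by the learned `masked_spec_embed` vector above.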
| def call( | |
| self, | |
| input_values: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| token_type_ids: Optional[tf.Tensor] = None, | |
| position_ids: Optional[tf.Tensor] = None, | |
| head_mask: Optional[tf.Tensor] = None, | |
| inputs_embeds: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[tf.Tensor] = None, | |
| output_hidden_states: Optional[tf.Tensor] = None, | |
| return_dict: Optional[bool] = None, | |
| training: bool = False, | |
| **kwargs: Any, | |
| ): | |
| inputs = input_values_processing( | |
| func=self.call, | |
| config=self.config, | |
| input_values=input_values, | |
| attention_mask=attention_mask, | |
| token_type_ids=token_type_ids, | |
| position_ids=position_ids, | |
| head_mask=head_mask, | |
| inputs_embeds=inputs_embeds, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| training=training, | |
| kwargs_call=kwargs, | |
| ) | |
| hidden_states = self.feature_extractor( | |
| tf.cast(inputs["input_values"], tf.float32), training=inputs["training"] | |
| ) | |
| if inputs["attention_mask"] is not None: | |
| # compute real output lengths according to convolution formula | |
| output_lengths = self._get_feat_extract_output_lengths(tf.reduce_sum(inputs["attention_mask"], -1)) | |
| attention_mask = tf.sequence_mask(output_lengths, dtype=hidden_states.dtype) | |
| hidden_states = self.feature_projection(hidden_states, training=inputs["training"]) | |
| mask_time_indices = kwargs.get("mask_time_indices", None) | |
| if inputs["training"]: | |
| hidden_states = self._mask_hidden_states(hidden_states, mask_time_indices=mask_time_indices) | |
| encoder_outputs = self.encoder( | |
| hidden_states, | |
| attention_mask=attention_mask, | |
| output_attentions=inputs["output_attentions"], | |
| output_hidden_states=inputs["output_hidden_states"], | |
| return_dict=inputs["return_dict"], | |
| training=inputs["training"], | |
| ) | |
| hidden_states = encoder_outputs[0] | |
| if not inputs["return_dict"]: | |
| return (hidden_states,) + encoder_outputs[1:] | |
| return TFBaseModelOutput( | |
| last_hidden_state=hidden_states, | |
| hidden_states=encoder_outputs.hidden_states, | |
| attentions=encoder_outputs.attentions, | |
| ) | |
| class TFHubertPreTrainedModel(TFPreTrainedModel): | |
| """ | |
| An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained | |
| models. | |
| """ | |
| config_class = HubertConfig | |
| base_model_prefix = "hubert" | |
| @property | |
| def dummy_inputs(self) -> Dict[str, tf.Tensor]: | |
| pad_token = 0.0 | |
| input_values = tf.convert_to_tensor(np.random.rand(1, 16000), tf.float32) | |
| dummy_inputs = { | |
| "input_values": input_values, | |
| "attention_mask": tf.cast(tf.not_equal(input_values, pad_token), tf.float32), | |
| } | |
| return dummy_inputs | |
| def serving(self, inputs): | |
| output = self.call(input_values=inputs, training=False) | |
| return self.serving_output(output) | |
| HUBERT_START_DOCSTRING = r""" | |
| This model inherits from :class:`~transformers.TFPreTrainedModel`. Check the superclass documentation for the | |
| generic methods the library implements for all its models (such as downloading or saving, resizing the input | |
| embeddings, pruning heads etc.) | |
| This model is also a `tf.keras.Model <https://www.tensorflow.org/api_docs/python/tf/keras/Model>`__ subclass. Use | |
| it as a regular TF 2.0 Keras Model and refer to the TF 2.0 documentation for all matter related to general usage | |
| and behavior. | |
| .. note:: | |
| TF 2.0 models accept two formats as inputs: | |
| - having all inputs as keyword arguments (like PyTorch models), or | |
| - having all inputs as a list, tuple or dict in the first positional argument. | |
| This second option is useful when using the :meth:`tf.keras.Model.fit` method, which currently requires having | |
| all the tensors in the first argument of the model call function: :obj:`model(inputs)`. | |
| If you choose this second option, there are three possibilities you can use to gather all the input Tensors in | |
| the first positional argument: | |
| - a single Tensor with :obj:`input_values` only and nothing else: :obj:`model(input_values)` | |
| - a list of varying length with one or several input Tensors IN THE ORDER given in the docstring: | |
| :obj:`model([input_values, attention_mask])` or :obj:`model([input_values, attention_mask, token_type_ids])` | |
| - a dictionary with one or several input Tensors associated to the input names given in the docstring: | |
| :obj:`model({"input_values": input_values, "token_type_ids": token_type_ids})` | |
| Args: | |
| config (:class:`~transformers.HubertConfig`): Model configuration class with all the parameters of the model. | |
| Initializing with a config file does not load the weights associated with the model, only the | |
| configuration. Check out the :meth:`~transformers.PreTrainedModel.from_pretrained` method to load the model | |
| weights. | |
| """ | |
| HUBERT_INPUTS_DOCSTRING = r""" | |
| Args: | |
| input_values (:obj:`np.ndarray`, :obj:`tf.Tensor`, :obj:`List[tf.Tensor]`, :obj:`Dict[str, tf.Tensor]` or :obj:`Dict[str, np.ndarray]`, and each example must have the shape :obj:`({0})`): | |
| Float values of the raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file | |
| into an array of type :obj:`List[float]` or :obj:`np.ndarray`, e.g. via the soundfile library, and then | |
| processing it with :class:`~transformers.Wav2Vec2Processor` for padding and conversion into a tensor. See | |
| :meth:`transformers.Wav2Vec2Processor.__call__` for details. | |
| attention_mask (:obj:`np.ndarray` or :obj:`tf.Tensor` of shape :obj:`({0})`, `optional`): | |
| Mask to avoid performing attention on padding token indices. Mask values selected in ``[0, 1]``: | |
| - 1 for tokens that are **not masked**, | |
| - 0 for tokens that are **masked**. | |
| `What are attention masks? <../glossary.html#attention-mask>`__ | |
| token_type_ids (:obj:`np.ndarray` or :obj:`tf.Tensor` of shape :obj:`({0})`, `optional`): | |
| Segment token indices to indicate first and second portions of the inputs. Indices are selected in ``[0, | |
| 1]``: | |
| - 0 corresponds to a `sentence A` token, | |
| - 1 corresponds to a `sentence B` token. | |
| `What are token type IDs? <../glossary.html#token-type-ids>`__ | |
| position_ids (:obj:`np.ndarray` or :obj:`tf.Tensor` of shape :obj:`({0})`, `optional`): | |
| Indices of positions of each input sequence tokens in the position embeddings. Selected in the range ``[0, | |
| config.max_position_embeddings - 1]``. | |
| `What are position IDs? <../glossary.html#position-ids>`__ | |
| head_mask (:obj:`np.ndarray` or :obj:`tf.Tensor` of shape :obj:`(num_heads,)` or :obj:`(num_layers, num_heads)`, `optional`): | |
| Mask to nullify selected heads of the self-attention modules. Mask values selected in ``[0, 1]``: | |
| - 1 indicates the head is **not masked**, | |
| - 0 indicates the head is **masked**. | |
| inputs_embeds (:obj:`np.ndarray` or :obj:`tf.Tensor` of shape :obj:`({0}, hidden_size)`, `optional`): | |
| Optionally, instead of passing :obj:`input_values` you can choose to directly pass an embedded | |
| representation. This is useful if you want more control over how to convert :obj:`input_values` indices | |
| into associated vectors than the model's internal embedding lookup matrix. | |
| output_attentions (:obj:`bool`, `optional`): | |
| Whether or not to return the attentions tensors of all attention layers. See ``attentions`` under returned | |
| tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the | |
| config will be used instead. | |
| output_hidden_states (:obj:`bool`, `optional`): | |
| Whether or not to return the hidden states of all layers. See ``hidden_states`` under returned tensors for | |
| more detail. This argument can be used only in eager mode, in graph mode the value in the config will be | |
| used instead. | |
| return_dict (:obj:`bool`, `optional`): | |
| Whether or not to return a :class:`~transformers.file_utils.ModelOutput` instead of a plain tuple. This | |
| argument can be used in eager mode, in graph mode the value will always be set to True. | |
| training (:obj:`bool`, `optional`, defaults to :obj:`False`): | |
| Whether or not to use the model in training mode (some modules like dropout modules have different | |
| behaviors between training and evaluation). | |
| """ | |
| class TFHubertModel(TFHubertPreTrainedModel): | |
| def __init__(self, config: HubertConfig, *inputs, **kwargs): | |
| super().__init__(config, *inputs, **kwargs) | |
| self.config = config | |
| self.hubert = TFHubertMainLayer(config, name="hubert") | |
| def call( | |
| self, | |
| input_values: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| token_type_ids: Optional[tf.Tensor] = None, | |
| position_ids: Optional[tf.Tensor] = None, | |
| head_mask: Optional[tf.Tensor] = None, | |
| inputs_embeds: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[bool] = None, | |
| output_hidden_states: Optional[bool] = None, | |
| return_dict: Optional[bool] = None, | |
| training: bool = False, | |
| ) -> Union[TFBaseModelOutput, Tuple[tf.Tensor]]: | |
| """ | |
| Returns: | |
| Example:: | |
| >>> from transformers import Wav2Vec2Processor, TFHubertModel | |
| >>> from datasets import load_dataset | |
| >>> import soundfile as sf | |
| >>> processor = Wav2Vec2Processor.from_pretrained("facebook/hubert-large-ls960-ft") | |
| >>> model = TFHubertModel.from_pretrained("facebook/hubert-large-ls960-ft") | |
| >>> def map_to_array(batch): | |
| ... speech, _ = sf.read(batch["file"]) | |
| ... batch["speech"] = speech | |
| ... return batch | |
| >>> ds = load_dataset("patrickvonplaten/librispeech_asr_dummy", "clean", split="validation") | |
| >>> ds = ds.map(map_to_array) | |
| >>> input_values = processor(ds["speech"][0], return_tensors="tf").input_values # Batch size 1 | |
| >>> hidden_states = model(input_values).last_hidden_state | |
| """ | |
| inputs = input_values_processing( | |
| func=self.call, | |
| config=self.config, | |
| input_values=input_values, | |
| attention_mask=attention_mask, | |
| token_type_ids=token_type_ids, | |
| position_ids=position_ids, | |
| head_mask=head_mask, | |
| inputs_embeds=inputs_embeds, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| training=training, | |
| ) | |
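| # resolve the output-control flags: use the call-time value when one was given, otherwise fall | |
| # back to the value stored in the model config (in graph mode only the config values apply) | |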
| inputs["output_hidden_states"] = ( | |
| inputs["output_hidden_states"] if inputs["output_hidden_states"] else self.config.output_hidden_states | |
| ) | |
| inputs["output_attentions"] = ( | |
| inputs["output_attentions"] if inputs["output_attentions"] else self.config.output_attentions | |
| ) | |
| inputs["return_dict"] = inputs["return_dict"] if inputs["return_dict"] else self.config.return_dict | |
| outputs = self.hubert( | |
| input_values=inputs["input_values"], | |
| attention_mask=inputs["attention_mask"], | |
| token_type_ids=inputs["token_type_ids"], | |
| position_ids=inputs["position_ids"], | |
| head_mask=inputs["head_mask"], | |
| inputs_embeds=inputs["inputs_embeds"], | |
| output_attentions=inputs["output_attentions"], | |
| output_hidden_states=inputs["output_hidden_states"], | |
| return_dict=inputs["return_dict"], | |
| training=inputs["training"], | |
| ) | |
| return outputs | |
| def serving_output(self, output): | |
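| # hidden_states and attentions come back as tuples of per-layer tensors; stack them into single | |
| # tensors so the serving signature returns concrete tf.Tensor values | |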
| hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
| attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
| return TFBaseModelOutput(last_hidden_state=output.last_hidden_state, hidden_states=hs, attentions=attns) | |
| class TFHubertForCTC(TFHubertPreTrainedModel): | |
| def __init__(self, config: HubertConfig, *inputs, **kwargs): | |
| super().__init__(config, *inputs, **kwargs) | |
| self.hubert = TFHubertMainLayer(config, name="hubert") | |
| self.dropout = tf.keras.layers.Dropout(config.final_dropout) | |
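| # project the encoder hidden states to per-frame vocabulary logits for CTC decoding | |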
| self.lm_head = tf.keras.layers.Dense(config.vocab_size, name="lm_head") | |
| def freeze_feature_extractor(self): | |
| """ | |
| Calling this function will disable the gradient computation for the feature extractor so that its | |
| parameters will not be updated during training. | |
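| Example (an illustrative sketch; the checkpoint name is an assumption, not prescribed by this class):: | |
| >>> from transformers import TFHubertForCTC | |
| >>> model = TFHubertForCTC.from_pretrained("facebook/hubert-large-ls960-ft") | |
| >>> model.freeze_feature_extractor()  # the convolutional feature extractor no longer receives gradient updates | |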
| """ | |
| self.hubert.feature_extractor.trainable = False | |
| def call( | |
| self, | |
| input_values: tf.Tensor, | |
| attention_mask: Optional[tf.Tensor] = None, | |
| token_type_ids: Optional[tf.Tensor] = None, | |
| position_ids: Optional[tf.Tensor] = None, | |
| head_mask: Optional[tf.Tensor] = None, | |
| inputs_embeds: Optional[tf.Tensor] = None, | |
| output_attentions: Optional[bool] = None, | |
| labels: Optional[tf.Tensor] = None, | |
| output_hidden_states: Optional[bool] = None, | |
| return_dict: Optional[bool] = None, | |
| training: Optional[bool] = False, | |
| ) -> Union[TFCausalLMOutput, Tuple[tf.Tensor]]: | |
| r""" | |
| labels (:obj:`tf.Tensor` or :obj:`np.ndarray` of shape :obj:`(batch_size, sequence_length)`, `optional`): | |
| Labels for computing the connectionist temporal classification (CTC) loss. Indices should be in ``[-100, | |
| 0, ..., config.vocab_size - 1]`` (see the ``input_values`` docstring). Tokens with indices set to ``-100`` | |
| are ignored (masked); the loss is only computed for the tokens with labels in ``[0, ..., config.vocab_size - 1]``. | |
| Returns: | |
| Example:: | |
| >>> import tensorflow as tf | |
| >>> from transformers import Wav2Vec2Processor, TFHubertForCTC | |
| >>> from datasets import load_dataset | |
| >>> import soundfile as sf | |
| >>> processor = Wav2Vec2Processor.from_pretrained("facebook/hubert-large-ls960-ft") | |
| >>> model = TFHubertForCTC.from_pretrained("facebook/hubert-large-ls960-ft") | |
| >>> def map_to_array(batch): | |
| ... speech, _ = sf.read(batch["file"]) | |
| ... batch["speech"] = speech | |
| ... return batch | |
| >>> ds = load_dataset("patrickvonplaten/librispeech_asr_dummy", "clean", split="validation") | |
| >>> ds = ds.map(map_to_array) | |
| >>> input_values = processor(ds["speech"][0], return_tensors="tf").input_values # Batch size 1 | |
| >>> logits = model(input_values).logits | |
| >>> predicted_ids = tf.argmax(logits, axis=-1) | |
| >>> transcription = processor.decode(predicted_ids[0]) | |
| >>> # compute loss | |
| >>> target_transcription = "A MAN SAID TO THE UNIVERSE SIR I EXIST" | |
| >>> # wrap processor as target processor to encode labels | |
| >>> with processor.as_target_processor(): | |
| ... labels = processor(target_transcription, return_tensors="tf").input_ids | |
| >>> loss = model(input_values, labels=labels).loss | |
| """ | |
| inputs = input_values_processing( | |
| func=self.call, | |
| config=self.config, | |
| input_values=input_values, | |
| attention_mask=attention_mask, | |
| token_type_ids=token_type_ids, | |
| position_ids=position_ids, | |
| head_mask=head_mask, | |
| inputs_embeds=inputs_embeds, | |
| output_attentions=output_attentions, | |
| output_hidden_states=output_hidden_states, | |
| return_dict=return_dict, | |
| training=training, | |
| ) | |
| outputs = self.hubert( | |
| input_values=inputs["input_values"], | |
| attention_mask=inputs["attention_mask"], | |
| token_type_ids=inputs["token_type_ids"], | |
| position_ids=inputs["position_ids"], | |
| head_mask=inputs["head_mask"], | |
| inputs_embeds=inputs["inputs_embeds"], | |
| output_attentions=inputs["output_attentions"], | |
| output_hidden_states=inputs["output_hidden_states"], | |
| return_dict=inputs["return_dict"], | |
| training=inputs["training"], | |
| ) | |
| hidden_states = outputs[0] | |
| hidden_states = self.dropout(hidden_states, training=inputs["training"]) | |
| logits = self.lm_head(hidden_states) | |
| if labels is not None: | |
| if tf.reduce_max(labels) >= self.config.vocab_size: | |
| raise ValueError(f"Label values must be <= vocab_size: {self.config.vocab_size}") | |
| attention_mask = ( | |
| inputs["attention_mask"] | |
| if inputs["attention_mask"] is not None | |
| else tf.ones_like(inputs["input_values"], dtype=tf.float32) | |
| ) | |
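| # the attention mask covers raw audio samples; convert the number of attended samples into the | |
| # number of frames produced by the convolutional feature extractor | |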
| input_lengths = self.hubert._get_feat_extract_output_lengths(tf.reduce_sum(attention_mask, axis=-1)) | |
| # assuming that padded tokens are filled with -100 | |
| # when not being attended to | |
| labels_mask = tf.cast(labels >= 0, tf.int32) | |
| target_lengths = tf.reduce_sum(labels_mask, axis=-1) | |
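| # tf.nn.ctc_loss expects batch-major logits of shape (batch_size, frames, vocab_size) when | |
| # logits_time_major=False; the padding token id doubles as the CTC blank index | |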
| loss = tf.nn.ctc_loss( | |
| logits=logits, | |
| labels=labels, | |
| logit_length=input_lengths, | |
| label_length=target_lengths, | |
| blank_index=self.config.pad_token_id, | |
| logits_time_major=False, | |
| ) | |
| if self.config.ctc_loss_reduction == "sum": | |
| loss = tf.reduce_sum(loss) | |
| if self.config.ctc_loss_reduction == "mean": | |
| loss = tf.reduce_mean(loss) | |
| else: | |
| loss = None | |
| if not inputs["return_dict"]: | |
| output = (logits,) + outputs[1:] | |
| return ((loss,) + output) if loss is not None else output | |
| return TFCausalLMOutput( | |
| loss=loss, | |
| logits=logits, | |
| hidden_states=outputs.hidden_states, | |
| attentions=outputs.attentions, | |
| ) | |
| def serving_output(self, output: TFCausalLMOutput) -> TFCausalLMOutput: | |
| hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None | |
| attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None | |
| return TFCausalLMOutput(logits=output.logits, hidden_states=hs, attentions=attns) | |
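| # A minimal fine-tuning sketch (illustrative only): ``raw_speech`` and ``target_transcription`` are | |
| # placeholders, and the checkpoint name is an assumption rather than something this module prescribes. | |
| # | |
| #     import tensorflow as tf | |
| #     from transformers import Wav2Vec2Processor, TFHubertForCTC | |
| # | |
| #     processor = Wav2Vec2Processor.from_pretrained("facebook/hubert-large-ls960-ft") | |
| #     model = TFHubertForCTC.from_pretrained("facebook/hubert-large-ls960-ft") | |
| #     model.freeze_feature_extractor()  # keep the convolutional feature extractor frozen | |
| #     optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5) | |
| # | |
| #     input_values = processor(raw_speech, sampling_rate=16_000, return_tensors="tf").input_values | |
| #     with processor.as_target_processor(): | |
| #         labels = processor(target_transcription, return_tensors="tf").input_ids | |
| # | |
| #     with tf.GradientTape() as tape: | |
| #         loss = model(input_values, labels=labels, training=True).loss | |
| #     grads = tape.gradient(loss, model.trainable_variables) | |
| #     optimizer.apply_gradients(zip(grads, model.trainable_variables)) | |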