# coding=utf-8 # Copyright 2021 The Fairseq Authors and the HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ PyTorch Hubert model. """ from typing import Optional, Tuple, Union import numpy as np import torch import torch.utils.checkpoint from torch import nn from transformers.deepspeed import is_deepspeed_zero3_enabled from ...activations import ACT2FN from ...file_utils import add_start_docstrings, add_start_docstrings_to_model_forward, replace_return_docstrings from ...modeling_outputs import BaseModelOutput, CausalLMOutput from ...modeling_utils import PreTrainedModel from ...utils import logging from .configuration_hubert import HubertConfig logger = logging.get_logger(__name__) _CONFIG_FOR_DOC = "HubertConfig" _CHECKPOINT_FOR_DOC = "facebook/hubert-base-ls960" HUBERT_PRETRAINED_MODEL_ARCHIVE_LIST = [ "facebook/hubert-base-ls960", # See all Hubert models at https://huggingface.co/models?filter=hubert ] # Copied from transformers.models.wav2vec2.modeling_wav2vec2._compute_mask_indices def _compute_mask_indices( shape: Tuple[int, int], mask_prob: float, mask_length: int, device: torch.device, attention_mask: Optional[torch.tensor] = None, min_masks: int = 0, ) -> torch.tensor: """ Computes random mask spans for a given shape. Used to implement `SpecAugment: A Simple Data Augmentation Method for ASR `__. Args: shape: the the shape for which to compute masks. should be of size 2 where first element is batch size and 2nd is timesteps mask_prob: probability for each token to be chosen as start of the span to be masked. this will be multiplied by number of timesteps divided by length of mask span to mask approximately this percentage of all elements. however due to overlaps, the actual number will be smaller (unless no_overlap is True) mask_length: size of the mask min_masks: minimum number of masked spans """ batch_size, sequence_length = shape if mask_length < 1: raise ValueError("`mask_length` has to be bigger than 0.") if mask_length > sequence_length: raise ValueError( f"`mask_length` has to be smaller than `sequence_length`, but got `mask_length`: {mask_length} and `sequence_length`: {sequence_length}`" ) # compute number of masked spans in batch num_masked_spans = int(mask_prob * sequence_length / mask_length + torch.rand((1,)).item()) num_masked_spans = max(num_masked_spans, min_masks) # make sure num masked indices <= sequence_length if num_masked_spans * mask_length > sequence_length: num_masked_spans = sequence_length // mask_length # SpecAugment mask to fill spec_aug_mask = torch.zeros((batch_size, sequence_length), device=device, dtype=torch.bool) # uniform distribution to sample from, make sure that offset samples are < sequence_length uniform_dist = torch.ones((batch_size, sequence_length - (mask_length - 1)), device=device) # get random indices to mask spec_aug_mask_idxs = torch.multinomial(uniform_dist, num_masked_spans) # expand masked indices to masked spans spec_aug_mask_idxs = ( spec_aug_mask_idxs.unsqueeze(dim=-1) .expand((batch_size, num_masked_spans, mask_length)) .reshape(batch_size, num_masked_spans * mask_length) ) offsets = ( torch.arange(mask_length, device=device)[None, None, :] .expand((batch_size, num_masked_spans, mask_length)) .reshape(batch_size, num_masked_spans * mask_length) ) spec_aug_mask_idxs = spec_aug_mask_idxs + offsets # scatter indices to mask spec_aug_mask = spec_aug_mask.scatter(1, spec_aug_mask_idxs, True) return spec_aug_mask # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2NoLayerNormConvLayer with Wav2Vec2->Hubert class HubertNoLayerNormConvLayer(nn.Module): def __init__(self, config, layer_id=0): super().__init__() self.in_conv_dim = config.conv_dim[layer_id] if layer_id > 0 else 1 self.out_conv_dim = config.conv_dim[layer_id] self.conv = nn.Conv1d( self.in_conv_dim, self.out_conv_dim, kernel_size=config.conv_kernel[layer_id], stride=config.conv_stride[layer_id], bias=config.conv_bias, ) self.activation = ACT2FN[config.feat_extract_activation] def forward(self, hidden_states): hidden_states = self.conv(hidden_states) hidden_states = self.activation(hidden_states) return hidden_states # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2LayerNormConvLayer with Wav2Vec2->Hubert class HubertLayerNormConvLayer(nn.Module): def __init__(self, config, layer_id=0): super().__init__() self.in_conv_dim = config.conv_dim[layer_id] if layer_id > 0 else 1 self.out_conv_dim = config.conv_dim[layer_id] self.conv = nn.Conv1d( self.in_conv_dim, self.out_conv_dim, kernel_size=config.conv_kernel[layer_id], stride=config.conv_stride[layer_id], bias=config.conv_bias, ) self.layer_norm = nn.LayerNorm(self.out_conv_dim, elementwise_affine=True) self.activation = ACT2FN[config.feat_extract_activation] def forward(self, hidden_states): hidden_states = self.conv(hidden_states) hidden_states = hidden_states.transpose(-2, -1) hidden_states = self.layer_norm(hidden_states) hidden_states = hidden_states.transpose(-2, -1) hidden_states = self.activation(hidden_states) return hidden_states # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2GroupNormConvLayer with Wav2Vec2->Hubert class HubertGroupNormConvLayer(nn.Module): def __init__(self, config, layer_id=0): super().__init__() self.in_conv_dim = config.conv_dim[layer_id] if layer_id > 0 else 1 self.out_conv_dim = config.conv_dim[layer_id] self.conv = nn.Conv1d( self.in_conv_dim, self.out_conv_dim, kernel_size=config.conv_kernel[layer_id], stride=config.conv_stride[layer_id], bias=config.conv_bias, ) self.activation = ACT2FN[config.feat_extract_activation] self.layer_norm = nn.GroupNorm(num_groups=self.out_conv_dim, num_channels=self.out_conv_dim, affine=True) def forward(self, hidden_states): hidden_states = self.conv(hidden_states) hidden_states = self.layer_norm(hidden_states) hidden_states = self.activation(hidden_states) return hidden_states # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2PositionalConvEmbedding with Wav2Vec2->Hubert class HubertPositionalConvEmbedding(nn.Module): def __init__(self, config): super().__init__() self.conv = nn.Conv1d( config.hidden_size, config.hidden_size, kernel_size=config.num_conv_pos_embeddings, padding=config.num_conv_pos_embeddings // 2, groups=config.num_conv_pos_embedding_groups, ) if is_deepspeed_zero3_enabled(): import deepspeed with deepspeed.zero.GatheredParameters(self.conv.weight, modifier_rank=0): self.conv = nn.utils.weight_norm(self.conv, name="weight", dim=2) deepspeed.zero.register_external_parameter(self, self.conv.weight_v) deepspeed.zero.register_external_parameter(self, self.conv.weight_g) else: self.conv = nn.utils.weight_norm(self.conv, name="weight", dim=2) self.padding = HubertSamePadLayer(config.num_conv_pos_embeddings) self.activation = ACT2FN[config.feat_extract_activation] def forward(self, hidden_states): hidden_states = hidden_states.transpose(1, 2) hidden_states = self.conv(hidden_states) hidden_states = self.padding(hidden_states) hidden_states = self.activation(hidden_states) hidden_states = hidden_states.transpose(1, 2) return hidden_states # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2SamePadLayer with Wav2Vec2->Hubert class HubertSamePadLayer(nn.Module): def __init__(self, num_conv_pos_embeddings): super().__init__() self.num_pad_remove = 1 if num_conv_pos_embeddings % 2 == 0 else 0 def forward(self, hidden_states): if self.num_pad_remove > 0: hidden_states = hidden_states[:, :, : -self.num_pad_remove] return hidden_states # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2FeatureExtractor with Wav2Vec2->Hubert class HubertFeatureExtractor(nn.Module): """Construct the featurs from raw audio waveform""" def __init__(self, config): super().__init__() if config.feat_extract_norm == "group": conv_layers = [HubertGroupNormConvLayer(config, layer_id=0)] + [ HubertNoLayerNormConvLayer(config, layer_id=i + 1) for i in range(config.num_feat_extract_layers - 1) ] elif config.feat_extract_norm == "layer": conv_layers = [HubertLayerNormConvLayer(config, layer_id=i) for i in range(config.num_feat_extract_layers)] else: raise ValueError( f"`config.feat_extract_norm` is {config.feat_extract_norm}, but has to be one of ['group', 'layer']" ) self.conv_layers = nn.ModuleList(conv_layers) def _freeze_parameters(self): for param in self.parameters(): param.requires_grad = False def forward(self, input_values): hidden_states = input_values[:, None] for conv_layer in self.conv_layers: hidden_states = conv_layer(hidden_states) return hidden_states class HubertFeatureProjection(nn.Module): def __init__(self, config): super().__init__() self.layer_norm = nn.LayerNorm(config.conv_dim[-1], eps=config.layer_norm_eps) self.projection = nn.Linear(config.conv_dim[-1], config.hidden_size) self.dropout = nn.Dropout(config.feat_proj_dropout) def forward(self, hidden_states): # non-projected hidden states are needed for quantization hidden_states = self.layer_norm(hidden_states) hidden_states = self.projection(hidden_states) hidden_states = self.dropout(hidden_states) return hidden_states # Copied from transformers.models.bart.modeling_bart.BartAttention with Bart->Hubert class HubertAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" def __init__( self, embed_dim: int, num_heads: int, dropout: float = 0.0, is_decoder: bool = False, bias: bool = True, ): super().__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.dropout = dropout self.head_dim = embed_dim // num_heads assert ( self.head_dim * num_heads == self.embed_dim ), f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`: {num_heads})." self.scaling = self.head_dim ** -0.5 self.is_decoder = is_decoder self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias) self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias) def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int): return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous() def forward( self, hidden_states: torch.Tensor, key_value_states: Optional[torch.Tensor] = None, past_key_value: Optional[Tuple[torch.Tensor]] = None, attention_mask: Optional[torch.Tensor] = None, layer_head_mask: Optional[torch.Tensor] = None, output_attentions: bool = False, ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]: """Input shape: Batch x Time x Channel""" # if key_value_states are provided this layer is used as a cross-attention layer # for the decoder is_cross_attention = key_value_states is not None bsz, tgt_len, embed_dim = hidden_states.size() # get query proj query_states = self.q_proj(hidden_states) * self.scaling # get key, value proj if is_cross_attention and past_key_value is not None: # reuse k,v, cross_attentions key_states = past_key_value[0] value_states = past_key_value[1] elif is_cross_attention: # cross_attentions key_states = self._shape(self.k_proj(key_value_states), -1, bsz) value_states = self._shape(self.v_proj(key_value_states), -1, bsz) elif past_key_value is not None: # reuse k, v, self_attention key_states = self._shape(self.k_proj(hidden_states), -1, bsz) value_states = self._shape(self.v_proj(hidden_states), -1, bsz) key_states = torch.cat([past_key_value[0], key_states], dim=2) value_states = torch.cat([past_key_value[1], value_states], dim=2) else: # self_attention key_states = self._shape(self.k_proj(hidden_states), -1, bsz) value_states = self._shape(self.v_proj(hidden_states), -1, bsz) if self.is_decoder: # if cross_attention save Tuple(torch.Tensor, torch.Tensor) of all cross attention key/value_states. # Further calls to cross_attention layer can then reuse all cross-attention # key/value_states (first "if" case) # if uni-directional self-attention (decoder) save Tuple(torch.Tensor, torch.Tensor) of # all previous decoder key/value_states. Further calls to uni-directional self-attention # can concat previous decoder key/value_states to current projected key/value_states (third "elif" case) # if encoder bi-directional self-attention `past_key_value` is always `None` past_key_value = (key_states, value_states) proj_shape = (bsz * self.num_heads, -1, self.head_dim) query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape) key_states = key_states.view(*proj_shape) value_states = value_states.view(*proj_shape) src_len = key_states.size(1) attn_weights = torch.bmm(query_states, key_states.transpose(1, 2)) if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len): raise ValueError( f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is {attn_weights.size()}" ) if attention_mask is not None: if attention_mask.size() != (bsz, 1, tgt_len, src_len): raise ValueError( f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {attention_mask.size()}" ) attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + attention_mask attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len) attn_weights = nn.functional.softmax(attn_weights, dim=-1) if layer_head_mask is not None: if layer_head_mask.size() != (self.num_heads,): raise ValueError( f"Head mask for a single layer should be of size {(self.num_heads,)}, but is {layer_head_mask.size()}" ) attn_weights = layer_head_mask.view(1, -1, 1, 1) * attn_weights.view(bsz, self.num_heads, tgt_len, src_len) attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len) if output_attentions: # this operation is a bit awkward, but it's required to # make sure that attn_weights keeps its gradient. # In order to do so, attn_weights have to be reshaped # twice and have to be reused in the following attn_weights_reshaped = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) attn_weights = attn_weights_reshaped.view(bsz * self.num_heads, tgt_len, src_len) else: attn_weights_reshaped = None attn_probs = nn.functional.dropout(attn_weights, p=self.dropout, training=self.training) attn_output = torch.bmm(attn_probs, value_states) if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim): raise ValueError( f"`attn_output` should be of size {(bsz, self.num_heads, tgt_len, self.head_dim)}, but is {attn_output.size()}" ) attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim) attn_output = attn_output.transpose(1, 2) attn_output = attn_output.reshape(bsz, tgt_len, embed_dim) attn_output = self.out_proj(attn_output) return attn_output, attn_weights_reshaped, past_key_value # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2FeedForward with Wav2Vec2->Hubert class HubertFeedForward(nn.Module): def __init__(self, config): super().__init__() self.intermediate_dropout = nn.Dropout(config.activation_dropout) self.intermediate_dense = nn.Linear(config.hidden_size, config.intermediate_size) if isinstance(config.hidden_act, str): self.intermediate_act_fn = ACT2FN[config.hidden_act] else: self.intermediate_act_fn = config.hidden_act self.output_dense = nn.Linear(config.intermediate_size, config.hidden_size) self.output_dropout = nn.Dropout(config.hidden_dropout) def forward(self, hidden_states): hidden_states = self.intermediate_dense(hidden_states) hidden_states = self.intermediate_act_fn(hidden_states) hidden_states = self.intermediate_dropout(hidden_states) hidden_states = self.output_dense(hidden_states) hidden_states = self.output_dropout(hidden_states) return hidden_states # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2EncoderLayer with Wav2Vec2->Hubert class HubertEncoderLayer(nn.Module): def __init__(self, config): super().__init__() self.attention = HubertAttention( embed_dim=config.hidden_size, num_heads=config.num_attention_heads, dropout=config.attention_dropout, is_decoder=False, ) self.dropout = nn.Dropout(config.hidden_dropout) self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.feed_forward = HubertFeedForward(config) self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) def forward(self, hidden_states, attention_mask=None, output_attentions=False): attn_residual = hidden_states hidden_states, attn_weights, _ = self.attention( hidden_states, attention_mask=attention_mask, output_attentions=output_attentions ) hidden_states = self.dropout(hidden_states) hidden_states = attn_residual + hidden_states hidden_states = self.layer_norm(hidden_states) hidden_states = hidden_states + self.feed_forward(hidden_states) hidden_states = self.final_layer_norm(hidden_states) outputs = (hidden_states,) if output_attentions: outputs += (attn_weights,) return outputs # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2EncoderLayerStableLayerNorm with Wav2Vec2->Hubert class HubertEncoderLayerStableLayerNorm(nn.Module): def __init__(self, config): super().__init__() self.attention = HubertAttention( embed_dim=config.hidden_size, num_heads=config.num_attention_heads, dropout=config.attention_dropout, is_decoder=False, ) self.dropout = nn.Dropout(config.hidden_dropout) self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.feed_forward = HubertFeedForward(config) self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) def forward(self, hidden_states, attention_mask=None, output_attentions=False): attn_residual = hidden_states hidden_states = self.layer_norm(hidden_states) hidden_states, attn_weights, _ = self.attention( hidden_states, attention_mask=attention_mask, output_attentions=output_attentions ) hidden_states = self.dropout(hidden_states) hidden_states = attn_residual + hidden_states hidden_states = hidden_states + self.feed_forward(self.final_layer_norm(hidden_states)) outputs = (hidden_states,) if output_attentions: outputs += (attn_weights,) return outputs # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2Encoder with Wav2Vec2->Hubert class HubertEncoder(nn.Module): def __init__(self, config): super().__init__() self.config = config self.pos_conv_embed = HubertPositionalConvEmbedding(config) self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout) self.layers = nn.ModuleList([HubertEncoderLayer(config) for _ in range(config.num_hidden_layers)]) def forward( self, hidden_states, attention_mask=None, output_attentions=False, output_hidden_states=False, return_dict=True, ): all_hidden_states = () if output_hidden_states else None all_self_attentions = () if output_attentions else None if attention_mask is not None: # make sure padded tokens output 0 hidden_states[~attention_mask] = 0.0 # extend attention_mask attention_mask = (1.0 - attention_mask[:, None, None, :].to(dtype=hidden_states.dtype)) * -10000.0 attention_mask = attention_mask.expand( attention_mask.shape[0], 1, attention_mask.shape[-1], attention_mask.shape[-1] ) position_embeddings = self.pos_conv_embed(hidden_states) hidden_states = hidden_states + position_embeddings hidden_states = self.layer_norm(hidden_states) hidden_states = self.dropout(hidden_states) deepspeed_zero3_is_enabled = is_deepspeed_zero3_enabled() for layer in self.layers: if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description) dropout_probability = np.random.uniform(0, 1) skip_the_layer = True if self.training and (dropout_probability < self.config.layerdrop) else False if not skip_the_layer or deepspeed_zero3_is_enabled: # under deepspeed zero3 all gpus must run in sync if getattr(self.config, "gradient_checkpointing", False) and self.training: # create gradient checkpointing function def create_custom_forward(module): def custom_forward(*inputs): return module(*inputs, output_attentions) return custom_forward layer_outputs = torch.utils.checkpoint.checkpoint( create_custom_forward(layer), hidden_states, attention_mask, ) else: layer_outputs = layer( hidden_states, attention_mask=attention_mask, output_attentions=output_attentions ) hidden_states = layer_outputs[0] if skip_the_layer: layer_outputs = (None, None) if output_attentions: all_self_attentions = all_self_attentions + (layer_outputs[1],) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if not return_dict: return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None) return BaseModelOutput( last_hidden_state=hidden_states, hidden_states=all_hidden_states, attentions=all_self_attentions, ) # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2EncoderStableLayerNorm with Wav2Vec2->Hubert class HubertEncoderStableLayerNorm(nn.Module): def __init__(self, config): super().__init__() self.config = config self.pos_conv_embed = HubertPositionalConvEmbedding(config) self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps) self.dropout = nn.Dropout(config.hidden_dropout) self.layers = nn.ModuleList( [HubertEncoderLayerStableLayerNorm(config) for _ in range(config.num_hidden_layers)] ) def forward( self, hidden_states, attention_mask=None, output_attentions=False, output_hidden_states=False, return_dict=True, ): all_hidden_states = () if output_hidden_states else None all_self_attentions = () if output_attentions else None if attention_mask is not None: # make sure padded tokens are not attended to hidden_states[~attention_mask] = 0 # extend attention_mask attention_mask = (1.0 - attention_mask[:, None, None, :].to(dtype=hidden_states.dtype)) * -10000.0 attention_mask = attention_mask.expand( attention_mask.shape[0], 1, attention_mask.shape[-1], attention_mask.shape[-1] ) position_embeddings = self.pos_conv_embed(hidden_states) hidden_states = hidden_states + position_embeddings hidden_states = self.dropout(hidden_states) deepspeed_zero3_is_enabled = is_deepspeed_zero3_enabled() for layer in self.layers: if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description) dropout_probability = np.random.uniform(0, 1) skip_the_layer = True if self.training and (dropout_probability < self.config.layerdrop) else False if not skip_the_layer or deepspeed_zero3_is_enabled: # under deepspeed zero3 all gpus must run in sync # XXX: could optimize this like synced_gpus in generate_utils but not sure if it's worth the code complication if getattr(self.config, "gradient_checkpointing", False) and self.training: # create gradient checkpointing function def create_custom_forward(module): def custom_forward(*inputs): return module(*inputs, output_attentions) return custom_forward layer_outputs = torch.utils.checkpoint.checkpoint( create_custom_forward(layer), hidden_states, attention_mask, ) else: layer_outputs = layer( hidden_states, attention_mask=attention_mask, output_attentions=output_attentions ) hidden_states = layer_outputs[0] if skip_the_layer: layer_outputs = (None, None) if output_attentions: all_self_attentions = all_self_attentions + (layer_outputs[1],) hidden_states = self.layer_norm(hidden_states) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if not return_dict: return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None) return BaseModelOutput( last_hidden_state=hidden_states, hidden_states=all_hidden_states, attentions=all_self_attentions, ) class HubertPreTrainedModel(PreTrainedModel): """ An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained models. """ config_class = HubertConfig base_model_prefix = "hubert" _keys_to_ignore_on_load_missing = [r"position_ids"] def _init_weights(self, module): """Initialize the weights""" if isinstance(module, nn.Linear): # Slightly different from the TF version which uses truncated_normal for initialization # cf https://github.com/pytorch/pytorch/pull/5617 module.weight.data.normal_(mean=0.0, std=self.config.initializer_range) elif isinstance(module, (nn.LayerNorm, nn.GroupNorm)): module.bias.data.zero_() module.weight.data.fill_(1.0) elif isinstance(module, nn.Conv1d): if is_deepspeed_zero3_enabled(): import deepspeed if hasattr(module, "weight_v") and hasattr(module, "weight_g"): with deepspeed.zero.GatheredParameters([module.weight_v, module.weight_g], modifier_rank=0): nn.init.kaiming_normal_(module.weight.data) else: with deepspeed.zero.GatheredParameters(module.weight, modifier_rank=0): nn.init.kaiming_normal_(module.weight.data) else: nn.init.kaiming_normal_(module.weight.data) if isinstance(module, (nn.Linear, nn.Conv1d)) and module.bias is not None: module.bias.data.zero_() def _get_feat_extract_output_lengths(self, input_lengths: Union[torch.LongTensor, int]): """ Computes the output length of the convolutional layers """ def _conv_out_length(input_length, kernel_size, stride): # 1D convolutional layer output length formula taken # from https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html return (input_length - kernel_size) // stride + 1 for kernel_size, stride in zip(self.config.conv_kernel, self.config.conv_stride): input_lengths = _conv_out_length(input_lengths, kernel_size, stride) return input_lengths HUBERT_START_DOCSTRING = r""" Hubert was proposed in `HuBERT: Self-Supervised Speech Representation Learning by Masked Prediction of Hidden Units `__ by Wei-Ning Hsu, Benjamin Bolte, Yao-Hung Hubert Tsai, Kushal Lakhotia, Ruslan Salakhutdinov, Abdelrahman Mohamed. This model inherits from :class:`~transformers.PreTrainedModel`. Check the superclass documentation for the generic methods the library implements for all its model (such as downloading or saving etc.). This model is a PyTorch `torch.nn.Module `_ sub-class. Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage and behavior. Parameters: config (:class:`~transformers.HubertConfig`): Model configuration class with all the parameters of the model. Initializing with a config file does not load the weights associated with the model, only the configuration. Check out the :meth:`~transformers.PreTrainedModel.from_pretrained` method to load the model weights. """ HUBERT_INPUTS_DOCSTRING = r""" Args: input_values (:obj:`torch.FloatTensor` of shape :obj:`(batch_size, sequence_length)`): Float values of input raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file into an array of type `List[float]` or a `numpy.ndarray`, *e.g.* via the soundfile library (`pip install soundfile`). To prepare the array into `input_values`, the :class:`~transformers.Wav2Vec2Processor` should be used for padding and conversion into a tensor of type `torch.FloatTensor`. See :meth:`transformers.Wav2Vec2Processor.__call__` for details. attention_mask (:obj:`torch.LongTensor` of shape :obj:`(batch_size, sequence_length)`, `optional`): Mask to avoid performing convolution and attention on padding token indices. Mask values selected in ``[0, 1]``: - 1 for tokens that are **not masked**, - 0 for tokens that are **masked**. `What are attention masks? <../glossary.html#attention-mask>`__ .. warning:: :obj:`attention_mask` should only be passed if the corresponding processor has ``config.return_attention_mask == True``. For all models whose processor has ``config.return_attention_mask == False``, such as `hubert-base `__, :obj:`attention_mask` should **not** be passed to avoid degraded performance when doing batched inference. For such models :obj:`input_values` should simply be padded with 0 and passed without :obj:`attention_mask`. Be aware that these models also yield slightly different results depending on whether :obj:`input_values` is padded or not. output_attentions (:obj:`bool`, `optional`): Whether or not to return the attentions tensors of all attention layers. See ``attentions`` under returned tensors for more detail. output_hidden_states (:obj:`bool`, `optional`): Whether or not to return the hidden states of all layers. See ``hidden_states`` under returned tensors for more detail. return_dict (:obj:`bool`, `optional`): Whether or not to return a :class:`~transformers.file_utils.ModelOutput` instead of a plain tuple. """ @add_start_docstrings( "The bare Hubert Model transformer outputting raw hidden-states without any specific head on top.", HUBERT_START_DOCSTRING, ) class HubertModel(HubertPreTrainedModel): def __init__(self, config: HubertConfig): super().__init__(config) self.config = config self.feature_extractor = HubertFeatureExtractor(config) self.feature_projection = HubertFeatureProjection(config) self.masked_spec_embed = nn.Parameter(torch.FloatTensor(config.hidden_size).uniform_()) if config.do_stable_layer_norm: self.encoder = HubertEncoderStableLayerNorm(config) else: self.encoder = HubertEncoder(config) self.init_weights() # Copied from transformers.models.wav2vec2.modeling_wav2vec2.Wav2Vec2Model._mask_hidden_states def _mask_hidden_states( self, hidden_states: torch.FloatTensor, mask_time_indices: Optional[torch.FloatTensor] = None, attention_mask: Optional[torch.LongTensor] = None, ): """ Masks extracted features along time axis and/or along feature axis according to `SpecAugment `__ . """ # `config.apply_spec_augment` can set masking to False if not getattr(self.config, "apply_spec_augment", True): return hidden_states # generate indices & apply SpecAugment along time axis batch_size, sequence_length, hidden_size = hidden_states.size() if mask_time_indices is not None: # apply SpecAugment along time axis with given mask_time_indices hidden_states[mask_time_indices] = self.masked_spec_embed.to(hidden_states.dtype) elif self.config.mask_time_prob > 0 and self.training: mask_time_indices = _compute_mask_indices( (batch_size, sequence_length), mask_prob=self.config.mask_time_prob, mask_length=self.config.mask_time_length, device=hidden_states.device, attention_mask=attention_mask, min_masks=2, ) hidden_states[mask_time_indices] = self.masked_spec_embed.to(hidden_states.dtype) if self.config.mask_feature_prob > 0 and self.training: # generate indices & apply SpecAugment along feature axis mask_feature_indices = _compute_mask_indices( (batch_size, hidden_size), mask_prob=self.config.mask_feature_prob, mask_length=self.config.mask_feature_length, device=hidden_states.device, attention_mask=attention_mask, ) hidden_states[mask_feature_indices[:, None].expand(-1, sequence_length, -1)] = 0 return hidden_states @add_start_docstrings_to_model_forward(HUBERT_INPUTS_DOCSTRING) @replace_return_docstrings(output_type=BaseModelOutput, config_class=_CONFIG_FOR_DOC) def forward( self, input_values, attention_mask=None, mask_time_indices=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): """ Returns: Example:: >>> from transformers import Wav2Vec2Processor, HubertModel >>> from datasets import load_dataset >>> import soundfile as sf >>> processor = Wav2Vec2Processor.from_pretrained("facebook/hubert-large-ls960-ft") >>> model = HubertModel.from_pretrained("facebook/hubert-large-ls960-ft") >>> def map_to_array(batch): ... speech, _ = sf.read(batch["file"]) ... batch["speech"] = speech ... return batch >>> ds = load_dataset("patrickvonplaten/librispeech_asr_dummy", "clean", split="validation") >>> ds = ds.map(map_to_array) >>> input_values = processor(ds["speech"][0], return_tensors="pt").input_values # Batch size 1 >>> hidden_states = model(input_values).last_hidden_state """ output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict extract_features = self.feature_extractor(input_values) extract_features = extract_features.transpose(1, 2) if attention_mask is not None: # compute real output lengths according to convolution formula output_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long) attention_mask = torch.zeros( extract_features.shape[:2], dtype=extract_features.dtype, device=extract_features.device ) # these two operations makes sure that all values # before the output lengths indices are attended to attention_mask[ (torch.arange(attention_mask.shape[0], device=extract_features.device), output_lengths - 1) ] = 1 attention_mask = attention_mask.flip([-1]).cumsum(-1).flip([-1]).bool() hidden_states = self.feature_projection(extract_features) hidden_states = self._mask_hidden_states(hidden_states, mask_time_indices=mask_time_indices) encoder_outputs = self.encoder( hidden_states, attention_mask=attention_mask, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = encoder_outputs[0] if not return_dict: return (hidden_states,) + encoder_outputs[1:] return BaseModelOutput( last_hidden_state=hidden_states, hidden_states=encoder_outputs.hidden_states, attentions=encoder_outputs.attentions, ) @add_start_docstrings( """Hubert Model with a `language modeling` head on top for Connectionist Temporal Classification (CTC). """, HUBERT_START_DOCSTRING, ) class HubertForCTC(HubertPreTrainedModel): def __init__(self, config): super().__init__(config) self.hubert = HubertModel(config) self.dropout = nn.Dropout(config.final_dropout) self.lm_head = nn.Linear(config.hidden_size, config.vocab_size) self.init_weights() def freeze_feature_extractor(self): """ Calling this function will disable the gradient computation for the feature extractor so that its parameter will not be updated during training. """ self.hubert.feature_extractor._freeze_parameters() @add_start_docstrings_to_model_forward(HUBERT_INPUTS_DOCSTRING) @replace_return_docstrings(output_type=CausalLMOutput, config_class=_CONFIG_FOR_DOC) def forward( self, input_values, attention_mask=None, output_attentions=None, output_hidden_states=None, return_dict=None, labels=None, ): r""" labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size, target_length)`, `optional`): Labels for connectionist temporal classification. Note that ``target_length`` has to be smaller or equal to the sequence length of the output logits. Indices are selected in ``[-100, 0, ..., config.vocab_size - 1]``. All labels set to ``-100`` are ignored (masked), the loss is only computed for labels in ``[0, ..., config.vocab_size - 1]``. Returns: Example:: >>> import torch >>> from transformers import Wav2Vec2Processor, HubertForCTC >>> from datasets import load_dataset >>> import soundfile as sf >>> processor = Wav2Vec2Processor.from_pretrained("facebook/hubert-large-ls960-ft") >>> model = HubertForCTC.from_pretrained("facebook/hubert-large-ls960-ft") >>> def map_to_array(batch): ... speech, _ = sf.read(batch["file"]) ... batch["speech"] = speech ... return batch >>> ds = load_dataset("patrickvonplaten/librispeech_asr_dummy", "clean", split="validation") >>> ds = ds.map(map_to_array) >>> input_values = processor(ds["speech"][0], return_tensors="pt").input_values # Batch size 1 >>> logits = model(input_values).logits >>> predicted_ids = torch.argmax(logits, dim=-1) >>> transcription = processor.decode(predicted_ids[0]) >>> # compute loss >>> target_transcription = "A MAN SAID TO THE UNIVERSE SIR I EXIST" >>> # wrap processor as target processor to encode labels >>> with processor.as_target_processor(): ... labels = processor(target_transcription, return_tensors="pt").input_ids >>> loss = model(input_values, labels=labels).loss """ return_dict = return_dict if return_dict is not None else self.config.use_return_dict outputs = self.hubert( input_values, attention_mask=attention_mask, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = outputs[0] hidden_states = self.dropout(hidden_states) logits = self.lm_head(hidden_states) loss = None if labels is not None: if labels.max() >= self.config.vocab_size: raise ValueError(f"Label values must be <= vocab_size: {self.config.vocab_size}") # retrieve loss input_lengths from attention_mask attention_mask = ( attention_mask if attention_mask is not None else torch.ones_like(input_values, dtype=torch.long) ) input_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long) # assuming that padded tokens are filled with -100 # when not being attended to labels_mask = labels >= 0 target_lengths = labels_mask.sum(-1) flattened_targets = labels.masked_select(labels_mask) # ctc_loss doesn't support fp16 log_probs = nn.functional.log_softmax(logits, dim=-1, dtype=torch.float32).transpose(0, 1) with torch.backends.cudnn.flags(enabled=False): loss = nn.functional.ctc_loss( log_probs, flattened_targets, input_lengths, target_lengths, blank=self.config.pad_token_id, reduction=self.config.ctc_loss_reduction, zero_infinity=self.config.ctc_zero_infinity, ) if not return_dict: output = (logits,) + outputs[1:] return ((loss,) + output) if loss is not None else output return CausalLMOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions )