File size: 5,188 Bytes
99269d6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
from dataclasses import dataclass
from typing import Optional, Union, Tuple
from rich import print
import torch
import torch.nn as nn
from transformers import PreTrainedModel
from transformers.utils import ModelOutput
from .configuration_xvector import XVectorConfig
from .audio_processing import AudioToMelSpectrogramPreprocessor
from .audio_processing import SpectrogramAugmentation
from .conv_asr import XVectorEncoder, SpeakerDecoder
from .angular_loss import AdditiveMarginSoftmaxLoss, AdditiveAngularMarginSoftmaxLoss
@dataclass
class XVectorBaseModelOutput(ModelOutput):
encoder_outputs: torch.FloatTensor = None
extract_features: torch.FloatTensor = None
output_lengths: torch.FloatTensor = None
@dataclass
class XVectorSequenceClassifierOutput(ModelOutput):
loss: torch.FloatTensor = None
logits: torch.FloatTensor = None
embeddings: torch.FloatTensor = None
class XVectorPreTrainedModel(PreTrainedModel):
config_class = XVectorConfig
base_model_prefix = "xvector"
main_input_name = "input_values"
def _init_weights(self, module):
"""Initialize the weights"""
config: XVectorConfig = self.config
if isinstance(module, (nn.Linear, nn.Conv1d)):
module.weight.data.normal_(mean=0.0, std=config.initializer_range)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Conv2d):
module.weight.data.normal_(mean=0.0, std=config.initializer_range)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.constant_(module.weight, 1)
nn.init.constant_(module.bias, 0)
@property
def num_weights(self):
"""
Utility property that returns the total number of parameters of NeuralModule.
"""
return self._num_weights()
@torch.jit.ignore
def _num_weights(self):
num: int = 0
for p in self.parameters():
if p.requires_grad:
num += p.numel()
return num
class XVectorModel(XVectorPreTrainedModel):
def __init__(self, config: XVectorConfig):
super().__init__(config)
self.config = config
self.preprocessor = AudioToMelSpectrogramPreprocessor(**config.mel_spectrogram_config)
self.spec_augment = SpectrogramAugmentation(**config.spectrogram_augmentation_config)
self.encoder = XVectorEncoder(**config.encoder_config)
# Initialize weights and apply final processing
self.post_init()
def forward(
self,
input_values: Optional[torch.Tensor],
attention_mask: Optional[torch.Tensor] = None,
) -> Union[Tuple, XVectorBaseModelOutput]:
if attention_mask is None:
attention_mask = torch.ones_like(input_values).to(input_values)
lengths = attention_mask.sum(dim=1).long()
extract_features, output_lengths = self.preprocessor(input_values, lengths)
if self.training:
extract_features = self.spec_augment(extract_features, output_lengths)
encoder_outputs, output_lengths = self.encoder(extract_features, output_lengths)
return XVectorBaseModelOutput(
encoder_outputs=encoder_outputs,
extract_features=extract_features,
output_lengths=output_lengths,
)
class XVectorForSequenceClassification(XVectorPreTrainedModel):
def __init__(self, config: XVectorConfig):
super().__init__(config)
self.xvector = XVectorModel(config)
self.classifier = SpeakerDecoder(**config.decoder_config)
if config.objective == 'additive_angular_margin':
self.loss_fct = AdditiveAngularMarginSoftmaxLoss(**config.objective_config)
elif config.objective == 'additive_margin':
self.loss_fct = AdditiveMarginSoftmaxLoss(**config.objective_config)
elif config.objective == 'cross_entropy':
self.loss_fct = nn.CrossEntropyLoss(**config.objective_config)
self.init_weights()
def freeze_base_model(self):
for param in self.xvector.parameters():
param.requires_grad = False
def forward(
self,
input_values: Optional[torch.Tensor],
attention_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
) -> Union[Tuple, XVectorSequenceClassifierOutput]:
xvector_outputs = self.xvector(
input_values,
attention_mask,
)
logits, output_embeddings = self.classifier(
xvector_outputs.encoder_outputs,
xvector_outputs.output_lengths
)
logits = logits.view(-1, self.config.num_labels)
loss = None
if labels is not None:
loss = self.loss_fct(logits, labels.view(-1))
return XVectorSequenceClassifierOutput(
loss=loss,
logits=logits,
embeddings=output_embeddings,
) |