|
from transformers import PretrainedConfig, PreTrainedModel, AutoModel, AutoConfig |
|
import torch |
|
import os |
|
import json |
|
from huggingface_hub import snapshot_download |
|
|
|
class IndicASRConfig(PretrainedConfig):
    """Configuration for :class:`IndicASRModel`.

    Args:
        ts_folder: Directory whose ``assets`` sub-folder holds the TorchScript
            components (``*.ts``) together with ``vocab.json`` and
            ``language_masks.json``.
        BLANK_ID: Token id that the decoders treat as "blank"; it is dropped
            from CTC output and ends symbol emission for a frame in RNN-T
            decoding.
        RNNT_MAX_SYMBOLS: Upper bound on the number of symbols RNN-T decoding
            may emit per encoder frame; ``None`` disables the bound.
        PRED_RNN_LAYERS: First dimension of the zero-initialised RNN-T decoder
            state tensors.
        PRED_RNN_HIDDEN_DIM: Last dimension of the zero-initialised RNN-T
            decoder state tensors.
        SOS: Token id used to seed the RNN-T hypothesis; it is removed from
            the final transcript.
        **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    model_type = "iasr"

    def __init__(
        self,
        ts_folder: str = "path",
        BLANK_ID: int = 256,
        RNNT_MAX_SYMBOLS: int = 10,
        PRED_RNN_LAYERS: int = 2,
        PRED_RNN_HIDDEN_DIM: int = 640,
        SOS: int = 5632,
        **kwargs,
    ):
        # Let the base class consume the generic options first, exactly as
        # before, then attach the ASR-specific fields.
        super().__init__(**kwargs)

        # Where the serialized components live.
        self.ts_folder = ts_folder

        # Decoding constants.
        self.BLANK_ID = BLANK_ID
        self.RNNT_MAX_SYMBOLS = RNNT_MAX_SYMBOLS

        # Shape of the initial RNN-T decoder state.
        self.PRED_RNN_LAYERS = PRED_RNN_LAYERS
        self.PRED_RNN_HIDDEN_DIM = PRED_RNN_HIDDEN_DIM

        # Start-of-sequence token for RNN-T decoding.
        self.SOS = SOS
|
|
|
class IndicASRModel(PreTrainedModel):
    """Speech recogniser assembled from TorchScript components.

    The components are loaded from ``<config.ts_folder>/assets/<name>.ts`` and
    kept in the plain dictionary ``self.models`` (a missing file is stored as
    ``None``). ``vocab.json`` and ``language_masks.json`` from the same folder
    are loaded into ``self.vocab`` and ``self.language_masks``; both are
    indexed by the language code passed to :meth:`forward`.
    """

    config_class = IndicASRConfig

    def __init__(self, config):
        """Load every TorchScript component and the JSON lookup tables.

        Args:
            config: An ``IndicASRConfig`` whose ``ts_folder`` points at the
                directory containing the ``assets`` folder.

        Raises:
            FileNotFoundError: If ``vocab.json`` or ``language_masks.json`` is
                missing. Missing ``.ts`` components do not raise; they are
                reported on stdout and stored as ``None``.
        """
        super().__init__(config)

        self.models = {}
        names = ['preprocessor', 'encoder', 'ctc_decoder', 'rnnt_decoder', 'joint_enc', 'joint_pred', 'joint_pre_net'] + \
            [f'joint_post_net_{z}' for z in ['as', 'bn', 'brx', 'doi', 'gu', 'hi', 'kn', 'kok', 'ks', 'mai', 'ml', 'mni', 'mr', 'ne', 'or', 'pa', 'sa', 'sat', 'sd', 'ta', 'te', 'ur']]

        for n in names:
            component_name = f'{config.ts_folder}/assets/{n}.ts'
            if os.path.exists(component_name):
                self.models[n] = torch.jit.load(component_name)
            else:
                # Best effort: keep going so the remaining components can
                # still be used; calling the missing one fails later.
                self.models[n] = None
                print(f'Failed to load {component_name}')

        # JSON is UTF-8 by specification; do not depend on the platform's
        # default encoding when reading the vocabulary text.
        with open(f'{config.ts_folder}/assets/vocab.json', encoding='utf-8') as reader:
            self.vocab = json.load(reader)

        with open(f'{config.ts_folder}/assets/language_masks.json', encoding='utf-8') as reader:
            self.language_masks = json.load(reader)

    def forward(self, wav, lang, decoding='ctc'):
        """Transcribe ``wav`` in language ``lang``.

        Args:
            wav: Audio tensor handed to the preprocessor as ``input_signal``;
                its last dimension is used as the signal length.
            lang: Language code used to index ``self.language_masks``,
                ``self.vocab`` and the ``joint_post_net_<lang>`` component.
            decoding: ``'ctc'`` (default) or ``'rnnt'``.

        Returns:
            The decoded transcript as a string.

        Raises:
            ValueError: If ``decoding`` is neither ``'ctc'`` nor ``'rnnt'``.
        """
        # Reject unknown modes up front instead of running the encoder and
        # then silently returning None.
        if decoding not in ('ctc', 'rnnt'):
            raise ValueError(
                f"Unsupported decoding {decoding!r}; expected 'ctc' or 'rnnt'"
            )

        encoder_outputs, encoded_lengths = self.encode(wav)
        if decoding == 'ctc':
            return self._ctc_decode(encoder_outputs, encoded_lengths, lang)
        return self._rnnt_decode(encoder_outputs, encoded_lengths, lang)

    def encode(self, wav):
        """Run the preprocessor and the encoder on ``wav``.

        Args:
            wav: Audio tensor; ``wav.shape[-1]`` is passed as the (single)
                signal length, so one utterance per call is assumed.

        Returns:
            The ``(outputs, encoded_lengths)`` pair produced by the encoder.
        """
        audio_signal, length = self.models['preprocessor'](input_signal=wav, length=torch.tensor([wav.shape[-1]]))
        outputs, encoded_lengths = self.models['encoder'](audio_signal=audio_signal, length=length)
        return outputs, encoded_lengths

    def _ctc_decode(self, encoder_outputs, encoded_lengths, lang):
        """Greedy CTC decoding of the first batch element.

        Args:
            encoder_outputs: Encoder output passed to the CTC decoder.
            encoded_lengths: Unused; kept for a signature parallel to
                :meth:`_rnnt_decode`.
            lang: Language code selecting the mask and the vocabulary.

        Returns:
            The transcript with ``'▁'`` replaced by spaces and surrounding
            whitespace stripped.
        """
        logprobs = self.models['ctc_decoder'](encoder_output=encoder_outputs)
        # Restrict the last dimension to this language's entries and
        # renormalise over them.
        logprobs = logprobs[:, :, self.language_masks[lang]].log_softmax(dim=-1)
        indices = torch.argmax(logprobs[0], dim=-1)
        # Merge runs of identical predictions, then work with plain ints so
        # the vocabulary lookup and blank test do not operate on tensors.
        token_ids = torch.unique_consecutive(indices, dim=-1).tolist()

        vocab = self.vocab[lang]
        blank_id = self.config.BLANK_ID
        text = ''.join(vocab[i] for i in token_ids if i != blank_id)
        return text.replace('▁', ' ').strip()

    def _rnnt_decode(self, encoder_outputs, encoded_lengths, lang):
        """Greedy RNN-T decoding for a single utterance.

        For each position along dimension 1 of the projected encoder output,
        symbols are emitted until the blank token is predicted or
        ``config.RNNT_MAX_SYMBOLS`` symbols have been added for that position.

        Args:
            encoder_outputs: Encoder output; dimensions 1 and 2 are swapped
                before the ``joint_enc`` projection.
            encoded_lengths: Unused; every position of the encoder output is
                decoded.
            lang: Language code selecting ``joint_post_net_<lang>`` and the
                vocabulary.

        Returns:
            The transcript with ``'▁'`` replaced by spaces and surrounding
            whitespace stripped.
        """
        cfg = self.config
        joint_enc = self.models['joint_enc'](encoder_outputs.transpose(1, 2))
        hyp = [cfg.SOS]
        # Zero initial decoder state; the middle dimension of 1 means a
        # single hypothesis is decoded.
        prev_dec_state = (torch.zeros(cfg.PRED_RNN_LAYERS, 1, cfg.PRED_RNN_HIDDEN_DIM),
                          torch.zeros(cfg.PRED_RNN_LAYERS, 1, cfg.PRED_RNN_HIDDEN_DIM))

        for t in range(joint_enc.size(1)):
            f = joint_enc[:, t, :].unsqueeze(1)
            symbols_added = 0

            while (cfg.RNNT_MAX_SYMBOLS is None) or (symbols_added < cfg.RNNT_MAX_SYMBOLS):
                # Build the integer id tensor directly rather than via a
                # float tensor cast to long.
                targets = torch.tensor([[hyp[-1]]], dtype=torch.long)
                g, _, dec_state = self.models['rnnt_decoder'](targets=targets, target_length=torch.tensor([1]), states=prev_dec_state)
                g = self.models['joint_pred'](g.transpose(1, 2))
                joint_out = f + g
                joint_out = self.models['joint_pre_net'](joint_out)
                logits = self.models[f'joint_post_net_{lang}'](joint_out)
                log_probs = logits.log_softmax(dim=-1)
                pred_token = log_probs.argmax(dim=-1).item()

                if pred_token == cfg.BLANK_ID:
                    # Blank: move on to the next position without updating
                    # the hypothesis or the decoder state.
                    break

                hyp.append(pred_token)
                prev_dec_state = dec_state
                symbols_added += 1

        return ''.join([self.vocab[lang][x] for x in hyp if x != cfg.SOS]).replace('▁', ' ').strip()

    def _save_pretrained(self, save_directory) -> None:
        """Write the loaded components and lookup tables to disk.

        Creates ``<save_directory>/assets`` and stores each non-``None``
        component as ``<name>.ts`` plus ``vocab.json`` and
        ``language_masks.json``, mirroring the layout ``__init__`` reads.

        Args:
            save_directory: Target directory; created if necessary.
        """
        assets_dir = os.path.join(save_directory, 'assets')
        os.makedirs(assets_dir, exist_ok=True)

        for m_name, m in self.models.items():
            # Components that failed to load are stored as None; skip them.
            if m is not None:
                m.save(os.path.join(assets_dir, m_name + '.ts'))

        for filename, payload in (('vocab.json', self.vocab),
                                  ('language_masks.json', self.language_masks)):
            with open(os.path.join(assets_dir, filename), 'w', encoding='utf-8') as writer:
                # One JSON document followed by a newline.
                writer.write(json.dumps(payload) + '\n')

    @classmethod
    def from_pretrained(cls,
                        pretrained_model_name_or_path,
                        *,
                        force_download=False,
                        resume_download=None,
                        proxies=None,
                        token=None,
                        cache_dir=None,
                        local_files_only=False,
                        revision=None, **kwargs):
        """Build a model from a local directory or a Hub repository.

        Args:
            pretrained_model_name_or_path: Path of an existing directory that
                contains the ``assets`` folder, or a Hub repository id.
            force_download: Forwarded to ``snapshot_download``.
            resume_download: Accepted for signature compatibility; not
                forwarded.
            proxies: Accepted for signature compatibility; not forwarded.
            token: Forwarded to ``snapshot_download``.
            cache_dir: Forwarded to ``snapshot_download``.
            local_files_only: Forwarded to ``snapshot_download``.
            revision: Forwarded to ``snapshot_download``.
            **kwargs: Accepted for signature compatibility; ignored.

        Returns:
            A new instance whose config has ``ts_folder`` set to the resolved
            local directory.
        """
        if os.path.isdir(pretrained_model_name_or_path):
            # A directory on disk is used as-is, so a locally saved model can
            # be reloaded without contacting the Hub.
            loc = str(pretrained_model_name_or_path)
        else:
            # Honour the caller's download options instead of dropping them;
            # the defaults match snapshot_download's own defaults.
            loc = snapshot_download(
                repo_id=pretrained_model_name_or_path,
                revision=revision,
                cache_dir=cache_dir,
                force_download=force_download,
                token=token,
                local_files_only=local_files_only,
            )
        return cls(IndicASRConfig(ts_folder=loc))
|
|
|
if __name__ == '__main__':
    # Executed only when this file is run as a script: make the custom
    # config/model pair discoverable through the transformers Auto* factories.
    from transformers import AutoConfig
    from transformers import AutoModel

    # Map the "iasr" model type to its configuration class ...
    AutoConfig.register("iasr", IndicASRConfig)
    # ... and that configuration class to the model implementation.
    AutoModel.register(IndicASRConfig, IndicASRModel)