NLTM-NITG's picture
Update HuggingFace/model.py
a8b0e92 verified
import torch
import torch.nn as nn
import numpy as np
import torchaudio
import soundfile as sf
from torch import Tensor
# Define your device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Define constants based on the loaded checkpoint
e_dim = 512 # Update with the correct embedding dimension based on your model
n_classes = 2 # Number of language classes, based on your requirement
look_back1 = 30
look_back2 = 60
lan2id = {'MA': 0, 'PU': 1}
# Function to preprocess input data
def Get_data(X):
if isinstance(X, torch.Tensor):
X = X.cpu().numpy()
mu = X.mean(axis=0)
std = X.std(axis=0)
np.place(std, std == 0, 1)
X = (X - mu) / std
Xdata1 = []
Xdata2 = []
for i in range(0, len(X)-look_back1, 1):
a = X[i:(i+look_back1), :]
Xdata1.append(a)
Xdata1 = np.array(Xdata1)
for i in range(0, len(X)-look_back2, 2):
b = X[i+1:(i+look_back2):3, :]
Xdata2.append(b)
Xdata2 = np.array(Xdata2)
return Xdata1, Xdata2
class LSTMNet(nn.Module):
def __init__(self):
super(LSTMNet, self).__init__()
self.lstm1 = nn.LSTM(1024, 512, bidirectional=True)
self.lstm2 = nn.LSTM(1024, 256, bidirectional=True)
self.fc_ha = nn.Linear(e_dim, 256)
self.fc_1 = nn.Linear(256, 1)
self.softmax = nn.Softmax(dim=1)
def forward(self, x):
x1, _ = self.lstm1(x)
x2, _ = self.lstm2(x1)
ht = x2[-1]
ht = torch.unsqueeze(ht, 0)
ha = torch.tanh(self.fc_ha(ht))
alp = self.fc_1(ha)
al = self.softmax(alp)
T = list(ht.shape)[1]
batch_size = list(ht.shape)[0]
D = list(ht.shape)[2]
c = torch.bmm(al.view(batch_size, 1, T), ht.view(batch_size, T, D))
c = torch.squeeze(c, 0)
return c
class CCSL_Net(nn.Module):
def __init__(self, model1, model2):
super(CCSL_Net, self).__init__()
self.model1 = model1
self.model2 = model2
self.att1 = nn.Linear(e_dim, 256)
self.att2 = nn.Linear(256, 1)
self.softmax = nn.Softmax(dim=1)
self.lang_classifier = nn.Linear(e_dim, n_classes, bias=False)
def forward(self, x1, x2):
e1 = self.model1(x1)
e2 = self.model2(x2)
ht_e = torch.cat((e1, e2), dim=0)
ht_e = torch.unsqueeze(ht_e, 0)
ha_e = torch.tanh(self.att1(ht_e))
alp = torch.tanh(self.att2(ha_e))
al = self.softmax(alp)
Tb = list(ht_e.shape)[1]
batch_size = list(ht_e.shape)[0]
D = list(ht_e.shape)[2]
u_vec = torch.bmm(al.view(batch_size, 1, Tb), ht_e.view(batch_size, Tb, D))
u_vec = torch.squeeze(u_vec, 0)
lan_prim = self.lang_classifier(u_vec)
return lan_prim
class DID_Model(nn.Module):
def __init__(self):
super(DID_Model, self).__init__()
self.model1 = LSTMNet()
self.model2 = LSTMNet()
self.ccslnet = CCSL_Net(self.model1, self.model2)
self.wave2vec_model_path = ""
def forward(self, x1, x2):
output = self.ccslnet(x1, x2)
return output
def load_weights(self, checkpoint_path, wave2vec_model_path):
checkpoint = torch.load(checkpoint_path, map_location=device)
self.wave2vec_model_path = wave2vec_model_path
# Load weights for model1
self.model1.lstm1.load_state_dict({
'weight_ih_l0': checkpoint['model1.lstm1.weight_ih_l0'],
'weight_hh_l0': checkpoint['model1.lstm1.weight_hh_l0'],
'bias_ih_l0': checkpoint['model1.lstm1.bias_ih_l0'],
'bias_hh_l0': checkpoint['model1.lstm1.bias_hh_l0'],
'weight_ih_l0_reverse': checkpoint['model1.lstm1.weight_ih_l0_reverse'],
'weight_hh_l0_reverse': checkpoint['model1.lstm1.weight_hh_l0_reverse'],
'bias_ih_l0_reverse': checkpoint['model1.lstm1.bias_ih_l0_reverse'],
'bias_hh_l0_reverse': checkpoint['model1.lstm1.bias_hh_l0_reverse']
})
self.model1.lstm2.load_state_dict({
'weight_ih_l0': checkpoint['model1.lstm2.weight_ih_l0'],
'weight_hh_l0': checkpoint['model1.lstm2.weight_hh_l0'],
'bias_ih_l0': checkpoint['model1.lstm2.bias_ih_l0'],
'bias_hh_l0': checkpoint['model1.lstm2.bias_hh_l0'],
'weight_ih_l0_reverse': checkpoint['model1.lstm2.weight_ih_l0_reverse'],
'weight_hh_l0_reverse': checkpoint['model1.lstm2.weight_hh_l0_reverse'],
'bias_ih_l0_reverse': checkpoint['model1.lstm2.bias_ih_l0_reverse'],
'bias_hh_l0_reverse': checkpoint['model1.lstm2.bias_hh_l0_reverse']
})
self.model1.fc_ha.load_state_dict({
'weight': checkpoint['model1.fc_ha.weight'],
'bias': checkpoint['model1.fc_ha.bias']
})
self.model1.fc_1.load_state_dict({
'weight': checkpoint['model1.fc_1.weight'],
'bias': checkpoint['model1.fc_1.bias']
})
# Load weights for model2
self.model2.lstm1.load_state_dict({
'weight_ih_l0': checkpoint['model2.lstm1.weight_ih_l0'],
'weight_hh_l0': checkpoint['model2.lstm1.weight_hh_l0'],
'bias_ih_l0': checkpoint['model2.lstm1.bias_ih_l0'],
'bias_hh_l0': checkpoint['model2.lstm1.bias_hh_l0'],
'weight_ih_l0_reverse': checkpoint['model2.lstm1.weight_ih_l0_reverse'],
'weight_hh_l0_reverse': checkpoint['model2.lstm1.weight_hh_l0_reverse'],
'bias_ih_l0_reverse': checkpoint['model2.lstm1.bias_ih_l0_reverse'],
'bias_hh_l0_reverse': checkpoint['model2.lstm1.bias_hh_l0_reverse']
})
self.model2.lstm2.load_state_dict({
'weight_ih_l0': checkpoint['model2.lstm2.weight_ih_l0'],
'weight_hh_l0': checkpoint['model2.lstm2.weight_hh_l0'],
'bias_ih_l0': checkpoint['model2.lstm2.bias_ih_l0'],
'bias_hh_l0': checkpoint['model2.lstm2.bias_hh_l0'],
'weight_ih_l0_reverse': checkpoint['model2.lstm2.weight_ih_l0_reverse'],
'weight_hh_l0_reverse': checkpoint['model2.lstm2.weight_hh_l0_reverse'],
'bias_ih_l0_reverse': checkpoint['model2.lstm2.bias_ih_l0_reverse'],
'bias_hh_l0_reverse': checkpoint['model2.lstm2.bias_hh_l0_reverse']
})
self.model2.fc_ha.load_state_dict({
'weight': checkpoint['model2.fc_ha.weight'],
'bias': checkpoint['model2.fc_ha.bias']
})
self.model2.fc_1.load_state_dict({
'weight': checkpoint['model2.fc_1.weight'],
'bias': checkpoint['model2.fc_1.bias']
})
# Load attention weights
self.ccslnet.att1.load_state_dict({
'weight': checkpoint['att1.weight'],
'bias': checkpoint['att1.bias']
})
self.ccslnet.att2.load_state_dict({
'weight': checkpoint['att2.weight'],
'bias': checkpoint['att2.bias']
})
# Load language classifier weights
self.ccslnet.lang_classifier.load_state_dict({
'weight': checkpoint['lang_classifier.weight']
})
print("Weights loaded successfully!")
print("Dialect Identification Model loaded!")
def predict_dialect(self, audio_path):
wave2vec_model_path = self.wave2vec_model_path
input_features = self.extract_wav2vec_features(audio_path, wave2vec_model_path)
X1, X2 = Get_data(input_features)
X1 = np.swapaxes(X1, 0, 1)
X2 = np.swapaxes(X2, 0, 1)
x1 = torch.from_numpy(X1).to(device)
x2 = torch.from_numpy(X2).to(device)
# Pass inputs through the model
with torch.no_grad():
output = self.forward(x1, x2)
predicted_value = output.argmax().cpu().item()
# Convert predicted value to dialect
dialect = next(key for key, value in lan2id.items() if value == predicted_value)
return dialect
def extract_wav2vec_features(self, audio_path, wave2vec_model_path):
wave2vec2_bundle = torchaudio.pipelines.WAV2VEC2_ASR_LARGE_960H
wave2vec2_model = wave2vec2_bundle.get_model()
# Load the state dictionary from the given path
wave2vec2_model.load_state_dict(torch.load(wave2vec_model_path, map_location=device))
wave2vec2_model = wave2vec2_model.to(device)
wave2vec2_model.eval()
print("Wav2Vec 2.0 model loaded!")
print(f"\n\nLoading audio from {audio_path}.")
X, sample_rate = sf.read(audio_path)
waveform = Tensor(X)
waveform = waveform.unsqueeze(0)
if sample_rate != wave2vec2_bundle.sample_rate:
waveform = torchaudio.functional.resample(waveform, sample_rate, wave2vec2_bundle.sample_rate)
waveform = waveform.squeeze(-1)
with torch.inference_mode():
features, _ = wave2vec2_model.extract_features(waveform)
input_features = torch.squeeze(features[2])
return input_features