# -*- coding: utf-8 -*-
"""AudioSpeechSentimentAnalysis_JRMDIOUF.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1tizgeMs7DXaZPQO3V253paATKev0ra0m
"""
#!pip install transformers
#!pip install wandb

import os

os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

import pickle
import re
from collections import defaultdict  # for the (inactive) class-interleaving block below

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import wandb

# from google.colab import userdata
# from huggingface_hub import login
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    precision_score,
    recall_score,
)
from torch.utils.data import DataLoader, Dataset, Subset
from transformers import AutoTokenizer, BertModel, Wav2Vec2ForCTC, Wav2Vec2Processor
"""hf_token = userdata.get("HF_TOKEN") | |
wandb_token = userdata.get("WAND_TOKEN")""" | |
# Commented out IPython magic to ensure Python compatibility.
# %env HF_TOKEN_ENV=$hf_token

"""!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/dev.tsv
!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/fine-tune.tsv
!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/test.tsv
!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/audio/dev.zip
!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/audio/fine-tune.zip
!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/audio/test.zip

if not os.path.exists("dev_raw"):
    print("dev_raw folder not found. Unzipping dev.zip...")
    !unzip -q dev.zip
else:
    print("dev_raw folder already exists. Skipping unzip.")

if not os.path.exists("fine-tune_raw"):
    print("fine-tune_raw folder not found. Unzipping fine-tune.zip...")
    !unzip -q fine-tune.zip
else:
    print("fine-tune_raw folder already exists. Skipping unzip.")

if not os.path.exists("test_raw"):
    print("test_raw folder not found. Unzipping test.zip...")
    !unzip -q test.zip
else:
    print("test_raw folder already exists. Skipping unzip.")"""
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

NUM_EPOCHS = 5
BATCH_SIZE = 16
SAVED_CUSTOM_BERT_TOKEN_MAX_LEN_PATH = "max_len.pkl"
SAVED_CUSTOM_BERT_TOKENIZER_DIR = "bert_tokenizer_local"
SAVED_CUSTOM_BERT_MODEL_PATH = "custom_bert_model.bin"
SAVED_TARGET_CAT_PATH = "categories.bin"
TRAIN_DS_PATH = "fine-tune.tsv"
TEST_DS_PATH = "test.tsv"
BERT_BASE_MODEL = "google-bert/bert-base-uncased"
INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE = 30
SAVED_AUDIO_MODEL_DIR_PATH = "wav2vec2_local"
AUDIO_BASE_MODEL = "facebook/wav2vec2-base-960h"
PROCESSOR_NAME = "preprocessor_config.json"
MODEL_NAME = "config.json"
SENTIMENT_MODALITIES = ["Neutral", "Positive", "Negative"]
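
# Note on the TSV layout assumed by CustomBertDataset below: the parser reads
# column 0 as the audio file id, column 1 as the transcript, and column 4 as
# the sentiment label; all other columns are ignored.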
class CustomBertDataset(Dataset):
    def __init__(
        self,
        file_path,
        audio_folder,
        model_path=BERT_BASE_MODEL,
        saved_target_cats_path=SAVED_TARGET_CAT_PATH,
        saved_max_len_path=SAVED_CUSTOM_BERT_TOKEN_MAX_LEN_PATH,
    ):
        self.model_path = model_path
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        with open(file_path) as f:
            lines = f.readlines()
        # Split each row once on tabs, skip the header row and blank lines,
        # and keep [transcript, sentiment, audio file id] (columns 1, 4, 0)
        rows = [
            re.split(r"\t+", line.rstrip("\n"))
            for i, line in enumerate(lines)
            if line != "\n" and i != 0
        ]
        self.lines = np.array([[row[1], row[4], row[0]] for row in rows])
        self.elem_cats = self.lines[:, 1]
        self.corpus = self.lines[:, 0]
        self.audio_files_id = self.lines[:, 2]
        # Filter corpus and audio files first, while elem_cats still holds the
        # unfiltered labels; elem_cats itself must be filtered last
        self.corpus = [
            sent.lower()
            for sent, cat in zip(self.corpus, self.elem_cats)
            if cat in SENTIMENT_MODALITIES
        ]
        self.audio_files = np.array(
            [
                os.path.join(audio_folder, f"{file_name}.flac")
                for file_name, cat in zip(self.audio_files_id, self.elem_cats)
                if cat in SENTIMENT_MODALITIES
            ]
        )
        self.elem_cats = [cat for cat in self.elem_cats if cat in SENTIMENT_MODALITIES]
        self.unique_cats = sorted(set(self.elem_cats))
        self.num_class = len(self.unique_cats)
        self.cats_dict = {cat: i for i, cat in enumerate(self.unique_cats)}
        self.targets = np.array([self.cats_dict[cat] for cat in self.elem_cats])
        torch.save(self.unique_cats, saved_target_cats_path)
        self.tokenizer.save_pretrained(SAVED_CUSTOM_BERT_TOKENIZER_DIR)
"""entry_dict = DefaultDict(list) | |
for i in range(len(self.corpus)): | |
entry_dict[self.targets[i]].append(self.corpus[i]) | |
self.final_corpus = [] | |
self.final_targets = [] | |
n=0 | |
while n < len(self.corpus): | |
for key in entry_dict.keys(): | |
if len(entry_dict[key]) > 0: | |
self.final_corpus.append(entry_dict[key].pop(0)) | |
self.final_targets.append(key) | |
n+=1 | |
self.corpus = np.array(self.final_corpus) | |
self.targets = np.array(self.final_targets)""" | |
        self.max_len = 0
        for sent in self.corpus:
            input_ids = self.tokenizer.encode(sent, add_special_tokens=True)
            self.max_len = max(self.max_len, len(input_ids))
        self.max_len = min(self.max_len, 512)  # cap at BERT's sequence limit
        print(f"Max length : {self.max_len}")
        print(f"Number of classes : {self.num_class}")
        print(f"Example targets : {np.unique(self.targets)}")
        # Save max_len so inference can pad to the same length
        with open(saved_max_len_path, "wb") as f:
            pickle.dump(self.max_len, f)
        print(f"max_len saved to {saved_max_len_path}")
    def __len__(self):
        return len(self.elem_cats)

    def __getitem__(self, idx):
        text = self.corpus[idx]
        target = self.targets[idx]
        # Sanity check: target must lie in [0, num_class - 1]
        if target < 0 or target >= self.num_class:
            raise ValueError(
                f"Target out of bounds: {target} not in [0, {self.num_class - 1}]"
            )
        encoded_input = self.tokenizer.encode_plus(
            text,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return (
            encoded_input["input_ids"].squeeze(0),
            encoded_input["attention_mask"].squeeze(0),
            torch.tensor(target, dtype=torch.long),
            self.audio_files[idx],
        )
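
# A minimal usage sketch for CustomBertDataset, kept inactive like the other
# example blocks; it assumes fine-tune.tsv and the unzipped fine-tune_raw/
# folder are present in the working directory.
"""example_ds = CustomBertDataset(TRAIN_DS_PATH, "fine-tune_raw")
input_ids, attention_mask, target, audio_path = example_ds[0]
print(input_ids.shape, attention_mask.shape, target, audio_path)"""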
class CustomBertModel(nn.Module):
    def __init__(self, num_class, model_path=BERT_BASE_MODEL):
        super().__init__()
        self.model_path = model_path
        self.num_class = num_class
        self.bert = BertModel.from_pretrained(self.model_path)
        # Freeze the BERT backbone; only the projection head below is trained
        for param in self.bert.parameters():
            param.requires_grad = False
        # Inactive alternative: a deeper projection head
        # self.proj_intermediate = nn.Sequential(
        #     nn.Linear(self.bert.config.hidden_size, INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE),
        #     nn.Linear(INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE, INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE),
        #     nn.Linear(INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE, INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE),
        # )
        self.proj_lin = nn.Linear(self.bert.config.hidden_size, self.num_class)

    def forward(self, input_ids, attention_mask):
        x = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        x = x.last_hidden_state[:, 0, :]  # [CLS] token representation
        # x = self.proj_intermediate(x)
        x = self.proj_lin(x)
        return x
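
# A minimal shape check for CustomBertModel, kept inactive like the other
# example blocks; it assumes the BERT weights download succeeds and uses the
# three SENTIMENT_MODALITIES classes.
"""smoke_tokenizer = AutoTokenizer.from_pretrained(BERT_BASE_MODEL)
smoke_model = CustomBertModel(num_class=len(SENTIMENT_MODALITIES))
enc = smoke_tokenizer("a quick smoke test", return_tensors="pt")
with torch.no_grad():
    logits = smoke_model(enc["input_ids"], enc["attention_mask"])
print(logits.shape)  # expected: torch.Size([1, 3])"""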
def train_step(model, train_dataloader, loss_fn, optimizer):
    num_iterations = len(train_dataloader)
    for i in range(NUM_EPOCHS):
        print(f"Training Epoch n° {i}")
        model.train()
        for j, batch in enumerate(train_dataloader):
            # Each batch is (input_ids, attention_mask, target, audio_path)
            input_ids, attention_mask, target = batch[0], batch[1], batch[2]
            output = model(input_ids.to(device), attention_mask.to(device))
            loss = loss_fn(output, target.to(device))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            run.log({"Training loss": loss.item()})
            print(f"Epoch {i+1} | step {j+1} / {num_iterations} | loss : {loss.item():.4f}")
    # Save model
    torch.save(model.state_dict(), SAVED_CUSTOM_BERT_MODEL_PATH)
    print(f"Custom BERT Model saved at {SAVED_CUSTOM_BERT_MODEL_PATH}")
def eval_step(
    test_dataloader,
    loss_fn,
    num_class,
    saved_model_path=SAVED_CUSTOM_BERT_MODEL_PATH,
    saved_target_cats_path=SAVED_TARGET_CAT_PATH,
):
    y_pred = []
    y_true = []
    num_iterations = len(test_dataloader)
    # Load the saved model
    saved_model = CustomBertModel(num_class)
    saved_model.load_state_dict(
        torch.load(saved_model_path, weights_only=False)
    )  # Explicitly set weights_only to False
    saved_model = saved_model.to(device)
    saved_model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from path : {saved_model_path}")
    with torch.no_grad():
        for j, batch in enumerate(test_dataloader):
            input_ids, attention_mask, target = batch[0], batch[1], batch[2]
            output = saved_model(input_ids.to(device), attention_mask.to(device))
            loss = loss_fn(output, target.to(device))
            run.log({"Eval loss": loss.item()})
            print(f"Step {j+1} / {num_iterations} | Eval loss : {loss.item():.4f}")
            y_pred.extend(output.cpu().numpy().argmax(axis=1))
            y_true.extend(target.cpu().numpy())
    class_labels = torch.load(saved_target_cats_path, weights_only=False)
    true_labels = [class_labels[i] for i in y_true]
    pred_labels = [class_labels[i] for i in y_pred]
    print(f"Accuracy : {accuracy_score(true_labels, pred_labels)}")
    cm = confusion_matrix(true_labels, pred_labels, labels=class_labels)
    df_cm = pd.DataFrame(cm, index=class_labels, columns=class_labels)
    sns.heatmap(df_cm, annot=True, fmt="d")
    plt.title("Confusion Matrix for the sentiment analysis test set")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()
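
# precision_score and recall_score are imported above but currently unused; a
# hedged sketch of how they could complement the accuracy printed in eval_step
# (macro-averaged over the sentiment classes), kept inactive:
"""print(f"Precision : {precision_score(true_labels, pred_labels, average='macro')}")
print(f"Recall : {recall_score(true_labels, pred_labels, average='macro')}")"""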
def eval_pipeline_step(
    test_dataloader,
    loss_fn,
    num_class,
    audio_model_dir=SAVED_AUDIO_MODEL_DIR_PATH,
    audio_model_name=MODEL_NAME,
    audio_processor_name=PROCESSOR_NAME,
    saved_model_path=SAVED_CUSTOM_BERT_MODEL_PATH,
    saved_target_cats_path=SAVED_TARGET_CAT_PATH,
):
    y_pred = []
    y_true = []
    num_iterations = len(test_dataloader)
    # Load the saved model
    saved_model = CustomBertModel(num_class)
    saved_model.load_state_dict(
        torch.load(saved_model_path, weights_only=False)
    )  # Explicitly set weights_only to False
    saved_model = saved_model.to(device)
    saved_model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from path : {saved_model_path}")
    # Check for key files: the preprocessor config and the model config
    processor_path = os.path.join(audio_model_dir, audio_processor_name)
    model_path = os.path.join(audio_model_dir, audio_model_name)
    if (
        os.path.exists(audio_model_dir)
        and os.path.exists(processor_path)
        and os.path.exists(model_path)
    ):
        print("Local Wav2Vec2 processor and model found. Loading from local directory.")
        audio_processor = Wav2Vec2Processor.from_pretrained(audio_model_dir)
        audio_model = Wav2Vec2ForCTC.from_pretrained(audio_model_dir)
    else:
        print(
            "Local Wav2Vec2 processor and model not found. Downloading from Hugging Face Hub."
        )
        audio_processor = Wav2Vec2Processor.from_pretrained(AUDIO_BASE_MODEL)
        audio_model = Wav2Vec2ForCTC.from_pretrained(AUDIO_BASE_MODEL)
        # Save the downloaded model and processor for future use
        audio_processor.save_pretrained(audio_model_dir)
        audio_model.save_pretrained(audio_model_dir)
        print(f"Wav2Vec2 processor and model downloaded and saved to {audio_model_dir}")
    # Move audio model to GPU
    audio_model = audio_model.to(device)
    audio_model.eval()
    # Target sample rate of the ASR model (16 kHz for wav2vec2-base-960h)
    sample_rate = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H.sample_rate
    with torch.no_grad():
        for j, batch in enumerate(test_dataloader):
            target = batch[2]
            audio_file_paths = batch[3]
            encoded_inputs = []
            attention_masks = []
            for audio_file in audio_file_paths:
                waveform, sr = torchaudio.load(audio_file)
                if sr != sample_rate:
                    print("Resampling")
                    resampler = torchaudio.transforms.Resample(
                        orig_freq=sr, new_freq=sample_rate
                    )
                    waveform = resampler(waveform)
                # Move input values to GPU before the forward pass
                input_values = audio_processor(
                    waveform.squeeze().numpy(),
                    sampling_rate=sample_rate,
                    return_tensors="pt",
                ).input_values.to(device)
                logits = audio_model(input_values).logits
                predicted_ids_hf = torch.argmax(logits, dim=-1)
                transcript_hf = audio_processor.decode(
                    predicted_ids_hf[0].cpu().numpy()
                )  # Move the predicted ids back to CPU for decoding
                transcript_hf = (
                    transcript_hf.lower() if transcript_hf is not None else ""
                )
                encoded_input = test_dataloader.dataset.tokenizer.encode_plus(
                    transcript_hf,
                    max_length=test_dataloader.dataset.max_len,
                    padding="max_length",
                    truncation=True,
                    return_tensors="pt",
                )
                encoded_inputs.append(encoded_input["input_ids"].squeeze(0))
                attention_masks.append(encoded_input["attention_mask"].squeeze(0))
            text_input = torch.stack(encoded_inputs)
            attention = torch.stack(attention_masks)
            output = saved_model(text_input.to(device), attention.to(device))
            loss = loss_fn(output, target.to(device))
            run.log({"Pipeline Eval loss": loss.item()})
            print(f"Step {j+1} / {num_iterations} | Pipeline Eval loss : {loss.item():.4f}")
            y_pred.extend(output.cpu().numpy().argmax(axis=1))
            y_true.extend(target.cpu().numpy())
    class_labels = torch.load(saved_target_cats_path, weights_only=False)
    true_labels = [class_labels[i] for i in y_true]
    pred_labels = [class_labels[i] for i in y_pred]
    print(f"Pipeline Accuracy : {accuracy_score(true_labels, pred_labels)}")
    cm = confusion_matrix(true_labels, pred_labels, labels=class_labels)
    df_cm = pd.DataFrame(cm, index=class_labels, columns=class_labels)
    sns.heatmap(df_cm, annot=True, fmt="d")
    plt.title("Confusion Matrix for the sentiment analysis pipeline")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()
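
# The load-or-download logic for Wav2Vec2 above is duplicated verbatim in
# get_audio_sentiment below; a hedged refactoring sketch (load_wav2vec2 is a
# hypothetical helper, not part of the original pipeline), kept inactive:
"""def load_wav2vec2(audio_model_dir=SAVED_AUDIO_MODEL_DIR_PATH):
    processor_path = os.path.join(audio_model_dir, PROCESSOR_NAME)
    model_path = os.path.join(audio_model_dir, MODEL_NAME)
    if os.path.exists(processor_path) and os.path.exists(model_path):
        processor = Wav2Vec2Processor.from_pretrained(audio_model_dir)
        model = Wav2Vec2ForCTC.from_pretrained(audio_model_dir)
    else:
        processor = Wav2Vec2Processor.from_pretrained(AUDIO_BASE_MODEL)
        model = Wav2Vec2ForCTC.from_pretrained(AUDIO_BASE_MODEL)
        processor.save_pretrained(audio_model_dir)
        model.save_pretrained(audio_model_dir)
    return processor, model.to(device).eval()"""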
def get_audio_sentiment(
    input_audio_path,
    num_class=len(SENTIMENT_MODALITIES),
    audio_model_dir=SAVED_AUDIO_MODEL_DIR_PATH,
    audio_model_name=MODEL_NAME,
    audio_processor_name=PROCESSOR_NAME,
    saved_model_path=SAVED_CUSTOM_BERT_MODEL_PATH,
    saved_target_cats_path=SAVED_TARGET_CAT_PATH,
    tokenizer_save_directory=SAVED_CUSTOM_BERT_TOKENIZER_DIR,
    saved_max_len_path=SAVED_CUSTOM_BERT_TOKEN_MAX_LEN_PATH,
):
    # Load the saved model
    saved_model = CustomBertModel(num_class)
    saved_model.load_state_dict(
        torch.load(
            saved_model_path, weights_only=False, map_location=torch.device(device)
        )
    )  # Explicitly set weights_only to False
    saved_model = saved_model.to(device)
    saved_model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from path : {saved_model_path}")
    loaded_tokenizer = AutoTokenizer.from_pretrained(tokenizer_save_directory)
    with open(saved_max_len_path, "rb") as f:
        max_len = pickle.load(f)
    # Check for key files: the preprocessor config and the model config
    processor_path = os.path.join(audio_model_dir, audio_processor_name)
    model_path = os.path.join(audio_model_dir, audio_model_name)
    if (
        os.path.exists(audio_model_dir)
        and os.path.exists(processor_path)
        and os.path.exists(model_path)
    ):
        print("Local Wav2Vec2 processor and model found. Loading from local directory.")
        audio_processor = Wav2Vec2Processor.from_pretrained(audio_model_dir)
        audio_model = Wav2Vec2ForCTC.from_pretrained(audio_model_dir)
    else:
        print(
            "Local Wav2Vec2 processor and model not found. Downloading from Hugging Face Hub."
        )
        audio_processor = Wav2Vec2Processor.from_pretrained(AUDIO_BASE_MODEL)
        audio_model = Wav2Vec2ForCTC.from_pretrained(AUDIO_BASE_MODEL)
        # Save the downloaded model and processor for future use
        audio_processor.save_pretrained(audio_model_dir)
        audio_model.save_pretrained(audio_model_dir)
        print(f"Wav2Vec2 processor and model downloaded and saved to {audio_model_dir}")
    # Move audio model to GPU
    audio_model = audio_model.to(device)
    audio_model.eval()
    # Target sample rate of the ASR model (16 kHz for wav2vec2-base-960h)
    sample_rate = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H.sample_rate
    with torch.no_grad():
        waveform, sr = torchaudio.load(input_audio_path)
        if sr != sample_rate:
            print("Resampling")
            resampler = torchaudio.transforms.Resample(
                orig_freq=sr, new_freq=sample_rate
            )
            waveform = resampler(waveform)
        # Move input values to GPU before the forward pass
        input_values = audio_processor(
            waveform.squeeze().numpy(), sampling_rate=sample_rate, return_tensors="pt"
        ).input_values.to(device)
        logits = audio_model(input_values).logits
        predicted_ids_hf = torch.argmax(logits, dim=-1)
        transcript_hf = audio_processor.decode(
            predicted_ids_hf[0].cpu().numpy()
        )  # Move the predicted ids back to CPU for decoding
        transcript_hf = transcript_hf.lower() if transcript_hf is not None else ""
        encoded_input = loaded_tokenizer.encode_plus(
            transcript_hf,
            max_length=max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # Batch of one: keep the (1, max_len) shape from return_tensors="pt"
        text_input = encoded_input["input_ids"]
        attention = encoded_input["attention_mask"]
        output = saved_model(text_input.to(device), attention.to(device))
    class_labels = torch.load(saved_target_cats_path, weights_only=False)
    return class_labels[output.cpu().numpy().argmax(axis=1)[0]]
# Login using e.g. `huggingface-cli login` to access this dataset
# global_train_ds = load_dataset("asapp/slue-voxceleb", streaming=True, token='jrmd_hf_token')
# global_train_ds = load_dataset('asapp/slue', token='jrmd_hf_token')
# global_train_ds = load_dataset('voxceleb', token='jrmd_hf_token')
# global_test_ds = load_dataset("asapp/slue", "voxceleb", split="test", token='jrmd_hf_token')

# Get torchaudio pipeline components
"""bundle = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H
#model = bundle.get_model()
#labels = bundle.get_labels()
sample_rate = bundle.sample_rate"""
"""waveform, sr = torchaudio.load("/content/dev_raw/id10012_0AXjxNXiEzo_00001.flac") | |
# Resample if sr != sample_rate (or model_hf.config.sampling_rate) | |
if sr != sample_rate: | |
print("Resampling") | |
resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate) | |
waveform = resampler(waveform)""" | |
# Using torchaudio pipeline - Manual Greedy Decoding | |
"""with torch.no_grad(): | |
emission = model(waveform)""" | |
# Assuming emission is log-probabilities or logits | |
# Perform greedy decoding: get the index of the max probability at each time step | |
# predicted_ids_torchaudio = torch.argmax(emission[0], dim=-1) | |
# Process the predicted IDs: remove consecutive duplicates and blank tokens | |
# Assuming the blank token is at index 0 (which is common for CTC, check labels if unsure) | |
"""processed_ids_torchaudio = [] | |
for id in predicted_ids_torchaudio[0]: # emission has shape (batch_size, num_frames, num_labels) | |
if id.item() != 0 and (len(processed_ids_torchaudio) == 0 or id.item() != processed_ids_torchaudio[-1]): | |
processed_ids_torchaudio.append(id.item())""" | |
"""# Convert token IDs to transcript using labels | |
#transcript = "".join([labels[id] for id in processed_ids_torchaudio]) | |
# Using Hugging Face transformers | |
# Note: processor and model_hf are defined in cell DnJDG6P3BTjZ | |
# To make this cell fully self-contained, you might want to include their definitions here as well. | |
# For now, assuming they are defined in a previously executed cell. | |
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h") | |
model_hf = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h") | |
# Load and resample waveform | |
waveform, sr = torchaudio.load("/content/dev_raw/id10012_0AXjxNXiEzo_00001.flac") | |
if sr != sample_rate: | |
print("Resampling") | |
resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate) | |
waveform = resampler(waveform) | |
input_values = processor(waveform.squeeze().numpy(), sampling_rate=sample_rate, return_tensors="pt").input_values | |
with torch.no_grad(): | |
logits = model_hf(input_values).logits | |
predicted_ids_hf = torch.argmax(logits, dim=-1) | |
transcript_hf = processor.decode(predicted_ids_hf[0]) | |
#print("Torchaudio Transcript:", transcript) | |
print("Hugging Face Transcript:", transcript_hf)""" | |
if __name__ == "__main__":
    # The Colab userdata lookup above is inactive, so read the wandb key from
    # the environment instead (WANDB_TOKEN is an assumed variable name)
    wandb.login(key=os.environ.get("WANDB_TOKEN"))
    run = wandb.init(project="DIT-Wav2Vec-Bert-Sentiment-Analysis-project")
    bert_train_dataset = CustomBertDataset(TRAIN_DS_PATH, "fine-tune_raw")
    bert_test_dataset = CustomBertDataset(TEST_DS_PATH, "test_raw")
    print(f"Size of bert dataset : {len(bert_train_dataset)}")
    """train_dataset = Subset(our_bert_dataset, range(int(len(our_bert_dataset) * 0.8)))
    test_dataset = Subset(our_bert_dataset, range(int(len(our_bert_dataset) * 0.8), len(our_bert_dataset)))"""
    train_dataloader = DataLoader(
        bert_train_dataset, batch_size=BATCH_SIZE, shuffle=True
    )
    test_dataloader = DataLoader(
        bert_test_dataset, batch_size=BATCH_SIZE, shuffle=False
    )
    our_bert_model = CustomBertModel(bert_train_dataset.num_class)
    our_bert_model = our_bert_model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    # Only optimize the unfrozen projection-head parameters
    optimizer = optim.SGD(
        filter(lambda p: p.requires_grad, our_bert_model.parameters()), lr=0.01
    )
    train_step(our_bert_model, train_dataloader, loss_fn, optimizer)
    eval_step(test_dataloader, loss_fn, bert_train_dataset.num_class)
    eval_pipeline_step(test_dataloader, loss_fn, bert_train_dataset.num_class)
    test_inference_audio_path = "/content/dev_raw/id10012_0AXjxNXiEzo_00001.flac"
    print(get_audio_sentiment(test_inference_audio_path))