|
import os
import struct
import warnings
from pathlib import Path
from typing import Literal, Union

import lightgbm as lgb
import numpy as np
import torch
import torch.nn.functional as F
import torchaudio
from datasets import Dataset
from datasets.formatting import query_table
from huggingface_hub import hf_hub_download
from joblib import dump, load
from sklearn.exceptions import NotFittedError
from torch import Tensor
from torchaudio.transforms import Spectrogram
|
|
|
warnings.filterwarnings("ignore")  # Silence noisy warnings (e.g. torch.frombuffer's non-writable buffer warning).
|
|
|
SR = 12000  # Target sampling rate (Hz); all features are computed at this rate.
|
|
|
|
|
class FastModel: |
|
""" |
|
    Trains and predicts with LightGBM on spectral and cepstral features extracted from raw audio.
|
|
|
Workflow: |
|
1. Batch Loading and Decoding: |
|
Load audio data in batches directly from a table and decode byte-encoded information. |
|
|
|
2. Processing Audio: |
|
- Resampling, Padding, or Truncating: |
|
Adjust audio durations by padding, cutting, or resampling as needed. |
|
- Spectral and Cepstral Feature Extraction: |
|
- Compute the spectrogram for audio signals. |
|
- Focus on a selected frequency range (~50-1500 Hz) to derive the cepstrum, calculated as the FFT of the logarithm of the spectrogram. |
|
- Average both spectrogram and cepstral features over the time axis and combine them into a unified feature vector. |
|
|
|
3. Model Application: |
|
Use the extracted features as input for the LightGBM model to perform predictions. |
|
|
|
Attributes |
|
---------- |
|
audio_processing_params : dict |
|
Parameters for configuring audio processing. |
|
feature_params : dict |
|
Parameters for configuring the Spectrogram and Cepstrogram transformation. |
|
lgbm_params : dict, optional |
|
Parameters for configuring the LightGBM model. |
|
device : str |
|
Device used for computation ("cpu" or "cuda"). |
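
    Examples
    --------
    An illustrative configuration; the parameter values below are
    hypothetical, not tuned defaults.

    >>> model = FastModel(
    ...     audio_processing_params={
    ...         "sample_rate": 12000, "duration": 3, "padding_method": "zero"
    ...     },
    ...     feature_params={
    ...         "n_fft": 1024, "hop_length": 256, "pad": 0, "power": 2.0,
    ...         "pad_mode": "reflect", "f_min": 50, "f_max": 1500,
    ...         "fc_min": 0.05, "fc_max": 0.5,
    ...     },
    ...     lgbm_params={"objective": "binary", "verbosity": -1},
    ...     device="cpu",
    ... )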
|
""" |
|
|
|
def __init__( |
|
self, |
|
audio_processing_params: dict, |
|
feature_params: dict, |
|
lgbm_params: dict, |
|
device: str = "cuda", |
|
): |
|
self.audio_processing_params = audio_processing_params |
|
self.feature_params = feature_params |
|
self.lgbm_params = lgbm_params |
|
self.device = torch.device( |
|
"cuda" if device == "cuda" and torch.cuda.is_available() else "cpu" |
|
) |
|
self.model = None |
|
|
|
|
|
self.spectrogram_transformer = Spectrogram( |
|
n_fft=self.feature_params["n_fft"], |
|
hop_length=self.feature_params["hop_length"], |
|
pad=self.feature_params["pad"], |
|
window_fn=torch.hamming_window, |
|
power=self.feature_params["power"], |
|
pad_mode=self.feature_params["pad_mode"], |
|
onesided=True, |
|
center=False, |
|
).to(self.device) |
|
        self.f = torch.fft.rfftfreq(self.feature_params["n_fft"], d=1.0 / SR)
        # Boolean mask selecting the frequency bins inside (f_min, f_max).
        self.ind_f_filtered = (
            (self.f > self.feature_params["f_min"])
            & (self.f < self.feature_params["f_max"])
        ).to(self.device)
        # Spectrogram expects a plain int for n_fft, not a 0-dim tensor.
        self.n_fft_cepstral = int(self.ind_f_filtered.sum().item())
|
self.cepstral_transformer = Spectrogram( |
|
n_fft=self.n_fft_cepstral, |
|
hop_length=self.n_fft_cepstral, |
|
pad=0, |
|
window_fn=torch.hamming_window, |
|
power=self.feature_params["power"], |
|
pad_mode=self.feature_params["pad_mode"], |
|
onesided=True, |
|
center=False, |
|
).to(self.device) |
|
self.cf = torch.fft.rfftfreq(self.n_fft_cepstral, d=0.5) |
|
        self.ind_cf_filtered = (
            (self.cf > self.feature_params["fc_min"])
            & (self.cf < self.feature_params["fc_max"])
        ).to(self.device)
|
|
|
def fit(self, dataset: Dataset, batch_size: int = 5000): |
|
"""Trains a LightGBM model on features extracted from the dataset. |
|
|
|
Parameters |
|
---------- |
|
dataset : Dataset |
|
Arrow Dataset object containing audio samples and their corresponding labels. |
|
batch_size : int, optional |
|
Number of audio samples per batch (default is 5000). |
|
|
|
Raises |
|
------ |
|
ValueError |
|
If the dataset is empty or invalid. |
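
        Examples
        --------
        A sketch; ``train_dataset`` is a hypothetical Arrow dataset holding
        byte-encoded WAV audio and integer labels.

        >>> model.fit(train_dataset, batch_size=2048)  # doctest: +SKIP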
|
""" |
|
        features, labels = [], []
        for audio, label in self.batch_audio_loader(
            dataset,
            batch_size=batch_size,
        ):
            features.append(self.get_features(audio))
            labels.append(label.cpu())
        x_train = torch.cat(features, dim=0).cpu().numpy()
        y_train = torch.cat(labels).numpy()
        # LightGBM expects numpy arrays, not torch tensors.
        train_data = lgb.Dataset(x_train, label=y_train)
        self.model = lgb.train(self.lgbm_params, train_data)
|
|
|
def predict(self, dataset: Dataset, get_proba: bool = False, batch_size: int = 5000): |
|
"""Predicts labels or probabilities for a dataset using the trained model. |
|
|
|
Parameters |
|
---------- |
|
dataset : Dataset |
|
The dataset containing audio data for prediction. |
|
get_proba : bool, optional |
|
If True, returns class probabilities rather than binary predictions (default is False). |
|
batch_size : int, optional |
|
Number of audio samples per batch (default is 5000). |
|
|
|
Returns |
|
------- |
|
numpy.ndarray |
|
If `get_proba` is True, returns a 1D array of class probabilities. |
|
If `get_proba` is False, returns a 1D array of binary predictions (0 or 1). |
|
|
|
Raises |
|
------ |
|
NotFittedError |
|
If the model is not yet trained. |
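
        Examples
        --------
        A sketch with a hypothetical ``test_dataset``:

        >>> labels = model.predict(test_dataset)                 # doctest: +SKIP
        >>> probs = model.predict(test_dataset, get_proba=True)  # doctest: +SKIP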
|
""" |
|
        if self.model is None:
|
raise NotFittedError("LGBM model is not fitted yet.") |
|
features = [] |
|
for audio, _ in self.batch_audio_loader( |
|
dataset, |
|
batch_size=batch_size, |
|
): |
|
feature = self.get_features(audio) |
|
features.append(feature) |
|
features = torch.cat(features, dim=0) |
|
torch.cuda.empty_cache() |
|
|
|
        y_score = self.model.predict(features.cpu().numpy())
|
|
|
return y_score if get_proba else (y_score >= 0.5).astype(int) |
|
|
|
def get_features(self, audios: Tensor): |
|
""" |
|
Extracts features from raw audio using spectrogram and cepstrum transformations. |
|
|
|
Parameters |
|
---------- |
|
audios : torch.Tensor |
|
            A batch of audio waveforms as a 2D tensor of shape (n_audios, n_samples_per_audio).
|
|
|
Returns |
|
------- |
|
torch.Tensor |
|
Extracted features for the audio batch. Includes both cepstral and log-scaled spectrogram features. |
|
|
|
Raises |
|
------ |
|
ValueError |
|
If the input audio tensor is empty or invalid. |
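
        Examples
        --------
        A sketch on random audio (4 clips of 3 s at 12 kHz; ``model`` is a
        constructed FastModel):

        >>> audios = torch.randn(4, 3 * 12000)
        >>> features = model.get_features(audios)  # doctest: +SKIP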
|
""" |
|
audios = audios.to(self.device) |
|
sxx = self.spectrogram_transformer(audios) |
|
        # (batch, freq, time) -> (batch, time, freq); log-compress with a floor to avoid log(0).
        sxx = torch.log10(torch.clamp(sxx.permute(0, 2, 1), min=1e-10))
|
        # Cepstrum: spectrum of the log-spectrogram over the filtered band, one
        # full-length frame per time step (hop_length == n_fft, center=False).
        cepstral_mat = self.cepstral_transformer(sxx[:, :, self.ind_f_filtered]).squeeze(dim=3)[
            :, :, self.ind_cf_filtered
        ]
|
|
|
return torch.cat( |
|
[ |
|
cepstral_mat.mean(dim=1), |
|
sxx.mean(dim=1), |
|
], |
|
dim=1, |
|
) |
|
|
|
def batch_audio_loader( |
|
self, dataset: Dataset, batch_size: int = 1, offset: int = 0, device="cpu" |
|
): |
|
"""Optimized loader for audio data from a dataset for training or inference in batches. |
|
|
|
Parameters |
|
---------- |
|
dataset : Dataset |
|
The dataset containing audio samples and labels. |
|
        batch_size : int, optional
            Number of audio samples per batch (default is 1).
        offset : int, optional
            Byte offset into each waveform's PCM payload, passed to
            ``torch.frombuffer`` (default is 0).
        device : str, optional
            Device for processing ("cpu" or "cuda") (default is "cpu").

        Waveform duration, target sampling rate, and padding method are read
        from ``self.audio_processing_params`` rather than passed as arguments.
|
|
|
Yields |
|
------ |
|
tuple (Tensor, Tensor) |
|
A tuple (batch_audios, batch_labels), where: |
|
- batch_audios is a torch.tensor of processed audio waveforms. |
|
- batch_labels is a torch.tensor of corresponding audio labels. |
|
|
|
Raises |
|
------ |
|
ValueError |
|
If an unsupported sampling rate is encountered in the dataset. |
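
        Examples
        --------
        A sketch with a hypothetical ``dataset``:

        >>> loader = model.batch_audio_loader(dataset, batch_size=256)  # doctest: +SKIP
        >>> for audios, labels in loader:  # doctest: +SKIP
        ...     print(audios.shape, labels.shape)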
|
""" |
|
|
|
def process_resampling(resample_buffer, resample_indices, batch_audios, sr, target_sr): |
|
if resample_buffer: |
|
resampler = torchaudio.transforms.Resample( |
|
orig_freq=sr, new_freq=target_sr, lowpass_filter_width=6 |
|
) |
|
resampled = resampler(torch.stack(resample_buffer)) |
|
for idx, original_idx in enumerate(resample_indices): |
|
batch_audios[original_idx] = resampled[idx] |
|
|
|
|
|
sr = self.audio_processing_params["sample_rate"] |
|
waveform_duration = self.audio_processing_params["duration"] |
|
padding_method = self.audio_processing_params["padding_method"] |
|
|
|
device = torch.device( |
|
"cuda" if device == "cuda" and torch.cuda.is_available() else "cpu" |
|
) |
|
batch_audios, batch_labels = [], [] |
|
resample_24000, resample_24000_indices = [], [] |
|
|
|
for i in range(len(dataset)): |
|
pa_subtable = query_table(dataset._data, i, indices=dataset._indices) |
|
wav_bytes = pa_subtable[0][0][0].as_py() |
|
            # Bytes 24-27 of a canonical 44-byte RIFF/WAVE header: sample rate (little-endian uint32).
            sampling_rate = struct.unpack("<I", wav_bytes[24:28])[0]
|
|
|
if sampling_rate not in [sr, sr * 2]: |
|
raise ValueError( |
|
f"Unsupported sampling rate: {sampling_rate}Hz. Only {sr}Hz and {sr * 2}Hz are allowed." |
|
) |
|
|
|
            # Bytes 40-43: data-chunk size in bytes; halve it for the count of 16-bit samples.
            data_size = struct.unpack("<I", wav_bytes[40:44])[0] // 2
|
if data_size == 0: |
|
batch_audios.append(torch.zeros(int(waveform_duration * SR))) |
|
else: |
|
try: |
|
waveform = ( |
|
torch.frombuffer(wav_bytes[44:], dtype=torch.int16, offset=offset)[ |
|
: int(waveform_duration * sampling_rate) |
|
].float() |
|
/ 32767 |
|
) |
|
                except Exception:
                    # Skip samples whose PCM payload cannot be decoded (the label is skipped too).
                    continue
|
waveform = apply_padding( |
|
waveform, int(waveform_duration * sampling_rate), padding_method |
|
) |
|
|
|
if sampling_rate == sr: |
|
batch_audios.append(waveform) |
|
elif sampling_rate == 2 * sr: |
|
resample_24000.append(waveform) |
|
resample_24000_indices.append(len(batch_audios)) |
|
batch_audios.append(None) |
|
|
|
batch_labels.append(pa_subtable[1][0].as_py()) |
|
|
|
if len(batch_audios) == batch_size: |
|
|
|
process_resampling(resample_24000, resample_24000_indices, batch_audios, sr * 2, SR) |
|
|
|
batch_audios_on_device = torch.stack(batch_audios).to(device) |
|
batch_labels_on_device = torch.tensor(batch_labels).to(device) |
|
|
|
yield batch_audios_on_device, batch_labels_on_device |
|
|
|
batch_audios, batch_labels = [], [] |
|
resample_24000, resample_24000_indices = [], [] |
|
|
|
if batch_audios: |
|
process_resampling(resample_24000, resample_24000_indices, batch_audios, sr * 2, SR) |
|
batch_audios_on_device = torch.stack(batch_audios).to(device) |
|
batch_labels_on_device = torch.tensor(batch_labels).to(device) |
|
|
|
yield batch_audios_on_device, batch_labels_on_device |
|
|
|
|
|
def apply_padding( |
|
waveform: torch.Tensor, |
|
output_size: int, |
|
padding_method: Literal["zero", "reflect", "replicate", "circular"] = "zero", |
|
) -> torch.Tensor: |
|
""" |
|
Applies padding to the waveform when its size is smaller than the desired output size. |
|
|
|
Parameters |
|
---------- |
|
waveform : torch.Tensor |
|
Input 1D waveform tensor. |
|
output_size : int |
|
Desired output size after padding or truncation. |
|
padding_method : str, default="zero" |
|
Padding method to apply. |
|
|
|
Returns |
|
------- |
|
torch.Tensor |
|
Padded or truncated waveform of size `output_size`. |
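
    Examples
    --------
    Runnable sketches of zero and replicate padding:

    >>> import torch
    >>> apply_padding(torch.tensor([1.0, 2.0]), 5)
    tensor([1., 2., 0., 0., 0.])
    >>> apply_padding(torch.tensor([1.0, 2.0, 3.0]), 5, "replicate")
    tensor([1., 2., 3., 3., 3.])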
|
""" |
|
if waveform.size(0) >= output_size: |
|
return waveform[:output_size] |
|
|
|
total_pad = output_size - waveform.size(0) |
|
if padding_method == "zero": |
|
return F.pad(waveform, (0, total_pad), mode="constant", value=0) |
|
if padding_method in ["reflect", "replicate", "circular"]: |
|
|
|
if waveform.size(0) < total_pad: |
|
num_repeats = (total_pad // waveform.size(0)) + 1 |
|
waveform = torch.tile(waveform, (num_repeats,)) |
|
total_pad = output_size - waveform.size(0) |
|
|
|
        return F.pad(waveform.unsqueeze(0), (0, total_pad), mode=padding_method).squeeze(0)
|
raise ValueError(f"Invalid padding method: {padding_method}") |
|
|
|
|
|
class FastModelHuggingFace: |
|
""" |
|
Class for loading a FastModel instance from the Hugging Face Hub. |
|
Includes preprocessing pipelines and a LightGBM model. |
|
|
|
Attributes |
|
---------- |
|
pipeline : object |
|
        The loaded FastModel preprocessing pipeline.
|
model : lgb.Booster |
|
The LightGBM model instance used for predictions. |
|
|
|
Methods |
|
------- |
|
from_pretrained(repo_id: str, revision: str = "main", |
|
pipeline_file_name: str = "pipeline.joblib", |
|
model_file_name: str = "model_lightgbm.txt") -> "FastModelHuggingFace": |
|
Loads the FastModel pipeline and model from the Hugging Face Hub. |
|
    predict(input_data: Union[str, Dataset], get_proba: bool = False,
            batch_size: int = 5000, device: str = "cuda") -> np.ndarray:
|
Predicts labels or probabilities for a WAV file or dataset. |
|
""" |
|
|
|
def __init__(self, pipeline: object, lightgbm_model: lgb.Booster): |
|
""" |
|
Initializes a FastModelHuggingFace instance. |
|
|
|
Parameters |
|
---------- |
|
pipeline : object |
|
            The FastModel preprocessing pipeline instance.
|
lightgbm_model : lgb.Booster |
|
A LightGBM booster model for predictions. |
|
""" |
|
self.pipeline = pipeline |
|
self.model = lightgbm_model |
|
|
|
@classmethod |
|
def from_pretrained( |
|
cls, |
|
repo_id: str, |
|
revision: str = "main", |
|
pipeline_file_name: str = "pipeline.joblib", |
|
model_file_name: str = "model_lightgbm.txt", |
|
) -> "FastModelHuggingFace": |
|
""" |
|
Loads the FastModel pipeline and LightGBM model from the Hugging Face Hub. |
|
|
|
Parameters |
|
---------- |
|
repo_id : str |
|
The Hugging Face repository ID. |
|
revision : str, optional |
|
The specific revision of the repository to use (default is "main"). |
|
pipeline_file_name : str, optional |
|
The filename of the serialized pipeline (default is "pipeline.joblib"). |
|
model_file_name : str, optional |
|
The filename of the LightGBM model (default is "model_lightgbm.txt"). |
|
|
|
Returns |
|
------- |
|
FastModelHuggingFace |
|
A FastModelHuggingFace instance with the loaded pipeline and model. |
|
|
|
Raises |
|
------ |
|
FileNotFoundError |
|
If either the pipeline or LightGBM model files are missing or corrupted. |
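
        Examples
        --------
        The repository ID below is a placeholder, not a published model.

        >>> model = FastModelHuggingFace.from_pretrained("user/fast-model")  # doctest: +SKIP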
|
""" |
|
pipeline_path = hf_hub_download(repo_id, filename=pipeline_file_name, revision=revision) |
|
model_lgbm_path = hf_hub_download(repo_id, filename=model_file_name, revision=revision) |
|
|
|
if not os.path.exists(pipeline_path): |
|
raise FileNotFoundError(f"Pipeline file {pipeline_path} is missing or corrupted.") |
|
pipeline = load(pipeline_path) |
|
|
|
if not os.path.exists(model_lgbm_path): |
|
raise FileNotFoundError( |
|
f"LightGBM model file {model_lgbm_path} is missing or corrupted." |
|
) |
|
lightgbm_model = lgb.Booster(model_file=model_lgbm_path) |
|
|
|
return cls(pipeline=pipeline, lightgbm_model=lightgbm_model) |
|
|
|
def predict( |
|
self, |
|
        input_data: Union[str, Dataset],
|
get_proba: bool = False, |
|
batch_size: int = 5000, |
|
device: Literal["cpu", "cuda"] = "cuda", |
|
) -> np.ndarray: |
|
""" |
|
Predicts labels or probabilities for a given audio input. |
|
|
|
Parameters |
|
---------- |
|
        input_data : Union[str, Dataset]
|
The input for prediction, either the path to a WAV file or a Hugging Face dataset. |
|
get_proba : bool, optional |
|
If True, returns class probabilities instead of binary predictions (default is False). |
|
batch_size : int, optional |
|
Number of audio samples per batch (default is 5000). |
|
        device : Literal["cpu", "cuda"], optional
            Device used for processing (default is "cuda").
|
|
|
Returns |
|
------- |
|
np.ndarray |
|
If `get_proba` is True, returns an array of probabilities. |
|
If `get_proba` is False, returns binary predictions. |
|
|
|
Raises |
|
------ |
|
ValueError |
|
If the input data type is neither a WAV file path string nor a Hugging Face dataset. |
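
        Examples
        --------
        File and dataset names are hypothetical.

        >>> model.predict("recording.wav")                        # doctest: +SKIP
        >>> model.predict(dataset, get_proba=True, device="cpu")  # doctest: +SKIP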
|
""" |
|
if isinstance(input_data, str): |
|
audio_waveform, sr = torchaudio.load(input_data) |
|
            # Down-mix multi-channel audio to mono.
            audio_waveform = audio_waveform.mean(dim=0)
|
if sr != self.pipeline.audio_processing_params["sample_rate"]: |
|
resampler = torchaudio.transforms.Resample( |
|
orig_freq=sr, new_freq=self.pipeline.audio_processing_params["sample_rate"] |
|
) |
|
audio_waveform = resampler(audio_waveform) |
|
            # get_features moves inputs to the pipeline's own device, so no explicit .to(device) is needed.
            features = self.pipeline.get_features(audio_waveform.unsqueeze(0))
|
predictions = self.model.predict(features.cpu().numpy()) |
|
return predictions if get_proba else (predictions >= 0.5).astype(int) |
|
|
|
        elif isinstance(input_data, Dataset):
|
features = [] |
|
for batch_audios, _ in self.pipeline.batch_audio_loader( |
|
input_data, |
|
batch_size=batch_size, |
|
device=device, |
|
): |
|
batch_features = self.pipeline.get_features(batch_audios) |
|
features.append(batch_features) |
|
features = torch.cat(features, dim=0) |
|
predictions = self.model.predict(features.cpu().numpy()) |
|
return predictions if get_proba else (predictions >= 0.5).astype(int) |
|
else: |
|
raise ValueError("Input must be either a path to a WAV file or a Hugging Face Dataset.") |
|
|
|
|
|
def save_pipeline( |
|
model_class_instance: FastModel, |
|
path: str, |
|
lgbm_file_name: str = None, |
|
pipeline_file_name: str = None, |
|
): |
|
""" |
|
Serializes the complete FastModel instance for saving. |
|
|
|
Parameters |
|
---------- |
|
    model_class_instance : FastModel
|
The trained FastModel instance to serialize. |
|
path : str |
|
The directory to save the FastModel instance. |
|
lgbm_file_name : str, optional |
|
        The filename for saving the LightGBM model (default is "model_lightgbm.txt").
|
pipeline_file_name : str, optional |
|
The filename for saving the pipeline (default is "pipeline.joblib"). |
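
    Examples
    --------
    A round-trip sketch with a hypothetical trained ``model``:

    >>> save_pipeline(model, "./artifacts")      # doctest: +SKIP
    >>> restored = load_pipeline("./artifacts")  # doctest: +SKIP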
|
""" |
|
lgbm_file_name = lgbm_file_name or "model_lightgbm.txt" |
|
pipeline_file_name = pipeline_file_name or "pipeline.joblib" |
|
|
|
    Path(path).mkdir(parents=True, exist_ok=True)
    lightgbm_path = Path(path) / lgbm_file_name
    if model_class_instance.model is not None:
|
model_class_instance.model_file_name = str(lightgbm_path) |
|
model_class_instance.model.save_model(model_class_instance.model_file_name) |
|
|
|
pipeline_path = Path(path) / pipeline_file_name |
|
dump(model_class_instance, pipeline_path) |
|
|
|
|
|
def load_pipeline( |
|
path: str, lgbm_file_name: str = None, pipeline_file_name: str = None |
|
) -> FastModelHuggingFace: |
|
""" |
|
Loads a serialized pipeline and LightGBM model. |
|
|
|
Parameters |
|
---------- |
|
path : str |
|
The directory containing the serialized FastModel. |
|
lgbm_file_name : str, optional |
|
        The filename for the LightGBM model (default is "model_lightgbm.txt").
|
pipeline_file_name : str, optional |
|
The filename for the pipeline (default is "pipeline.joblib"). |
|
|
|
Returns |
|
------- |
|
FastModelHuggingFace |
|
An instance of the loaded FastModel. |
|
|
|
Raises |
|
------ |
|
FileNotFoundError |
|
If either the LightGBM model or pipeline file is not found. |
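
    Examples
    --------
    The directory below is hypothetical; it must contain the files written by
    ``save_pipeline``.

    >>> model = load_pipeline("./artifacts")  # doctest: +SKIP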
|
""" |
|
    # Default must match the name used by save_pipeline.
    lgbm_file_name = lgbm_file_name or "model_lightgbm.txt"
|
pipeline_file_name = pipeline_file_name or "pipeline.joblib" |
|
|
|
pipeline_path = Path(path) / pipeline_file_name |
|
if not pipeline_path.exists(): |
|
raise FileNotFoundError(f"Pipeline file {pipeline_path} not found.") |
|
|
|
model_class_instance = load(pipeline_path) |
|
|
|
lightgbm_path = Path(path) / lgbm_file_name |
|
if not lightgbm_path.exists(): |
|
raise FileNotFoundError(f"LightGBM file {lightgbm_path} not found.") |
|
model_class_instance.model = lgb.Booster(model_file=str(lightgbm_path)) |
|
|
|
return model_class_instance |
|
|