import os import struct from pathlib import Path from typing import Literal, Union import numpy as np import torch import lightgbm as lgb import torchaudio from huggingface_hub import hf_hub_download from joblib import dump, load from sklearn.exceptions import NotFittedError from torch import Tensor from torchaudio.transforms import Spectrogram import torch.nn.functional as F from datasets.formatting import query_table from datasets import Dataset import warnings warnings.filterwarnings("ignore") SR = 12000 class FastModel: """ A class designed for training and predicting using LightGBM, incorporating spectral and cepstral features. Workflow: 1. Batch Loading and Decoding: Load audio data in batches directly from a table and decode byte-encoded information. 2. Processing Audio: - Resampling, Padding, or Truncating: Adjust audio durations by padding, cutting, or resampling as needed. - Spectral and Cepstral Feature Extraction: - Compute the spectrogram for audio signals. - Focus on a selected frequency range (~50-1500 Hz) to derive the cepstrum, calculated as the FFT of the logarithm of the spectrogram. - Average both spectrogram and cepstral features over the time axis and combine them into a unified feature vector. 3. Model Application: Use the extracted features as input for the LightGBM model to perform predictions. Attributes ---------- audio_processing_params : dict Parameters for configuring audio processing. feature_params : dict Parameters for configuring the Spectrogram and Cepstrogram transformation. lgbm_params : dict, optional Parameters for configuring the LightGBM model. device : str Device used for computation ("cpu" or "cuda"). """ def __init__( self, audio_processing_params: dict, feature_params: dict, lgbm_params: dict, device: str = "cuda", ): self.audio_processing_params = audio_processing_params self.feature_params = feature_params self.lgbm_params = lgbm_params self.device = torch.device( "cuda" if device == "cuda" and torch.cuda.is_available() else "cpu" ) self.model = None # Initialize Spectrogram & Cepstrogram self.spectrogram_transformer = Spectrogram( n_fft=self.feature_params["n_fft"], hop_length=self.feature_params["hop_length"], pad=self.feature_params["pad"], window_fn=torch.hamming_window, power=self.feature_params["power"], pad_mode=self.feature_params["pad_mode"], onesided=True, center=False, ).to(self.device) self.f = torch.fft.rfftfreq(self.feature_params["n_fft"], d=1.0 / SR) self.ind_f_filtered = torch.tensor( (self.f > self.feature_params["f_min"]) & (self.f < self.feature_params["f_max"]), device=self.device, ) self.n_fft_cepstral = self.ind_f_filtered.sum() self.cepstral_transformer = Spectrogram( n_fft=self.n_fft_cepstral, hop_length=self.n_fft_cepstral, pad=0, window_fn=torch.hamming_window, power=self.feature_params["power"], pad_mode=self.feature_params["pad_mode"], onesided=True, center=False, ).to(self.device) self.cf = torch.fft.rfftfreq(self.n_fft_cepstral, d=0.5) self.ind_cf_filtered = torch.tensor( (self.cf > self.feature_params["fc_min"]) & (self.cf < self.feature_params["fc_max"]), device=self.device, ) def fit(self, dataset: Dataset, batch_size: int = 5000): """Trains a LightGBM model on features extracted from the dataset. Parameters ---------- dataset : Dataset Arrow Dataset object containing audio samples and their corresponding labels. batch_size : int, optional Number of audio samples per batch (default is 5000). Raises ------ ValueError If the dataset is empty or invalid. """ features, labels = [], [] for audio, label in self.batch_audio_loader( dataset, batch_size=batch_size, ): feature = self.get_features(audio) features.append(feature) labels.extend(label) x_train = torch.cat(features, dim=0) train_data = lgb.Dataset(x_train.cpu(), label=labels) self.model = lgb.train(self.lgbm_params, train_data) def predict(self, dataset: Dataset, get_proba: bool = False, batch_size: int = 5000): """Predicts labels or probabilities for a dataset using the trained model. Parameters ---------- dataset : Dataset The dataset containing audio data for prediction. get_proba : bool, optional If True, returns class probabilities rather than binary predictions (default is False). batch_size : int, optional Number of audio samples per batch (default is 5000). Returns ------- numpy.ndarray If `get_proba` is True, returns a 1D array of class probabilities. If `get_proba` is False, returns a 1D array of binary predictions (0 or 1). Raises ------ NotFittedError If the model is not yet trained. """ if not self.model: raise NotFittedError("LGBM model is not fitted yet.") features = [] for audio, _ in self.batch_audio_loader( dataset, batch_size=batch_size, ): feature = self.get_features(audio) features.append(feature) features = torch.cat(features, dim=0) torch.cuda.empty_cache() y_score = self.model.predict(features.cpu()) return y_score if get_proba else (y_score >= 0.5).astype(int) def get_features(self, audios: Tensor): """ Extracts features from raw audio using spectrogram and cepstrum transformations. Parameters ---------- audios : torch.Tensor A batch of audio waveforms as 2D tensors (n_audios, n_samples_per_audio). Returns ------- torch.Tensor Extracted features for the audio batch. Includes both cepstral and log-scaled spectrogram features. Raises ------ ValueError If the input audio tensor is empty or invalid. """ audios = audios.to(self.device) sxx = self.spectrogram_transformer(audios) # shape : (n_audios, n_f, n_blocks) sxx = torch.log10(torch.clamp(sxx.permute(0, 2, 1), min=1e-10)) cepstral_mat = self.cepstral_transformer(sxx[:, :, self.ind_f_filtered]).squeeze(dim=3)[ :, :, self.ind_cf_filtered ] return torch.cat( [ cepstral_mat.mean(dim=1), sxx.mean(dim=1), ], dim=1, ) def batch_audio_loader( self, dataset: Dataset, batch_size: int = 1, offset: int = 0, device="cpu" ): """Optimized loader for audio data from a dataset for training or inference in batches. Parameters ---------- dataset : Dataset The dataset containing audio samples and labels. waveform_duration : int, optional Desired duration of the audio waveforms in seconds (default is 3). batch_size : int, optional Number of audio samples per batch (default is 1). sr : int, optional Target sampling rate for audio processing (default is 12000). device : str, optional Device for processing ("cpu" or "cuda") (default is "cpu"). padding_method : str, optional Method to pad audio waveforms smaller than the desired size (e.g., "zero", "reflect"). offset : int, optional Number of samples to skip before processing the first audio sample (default is 0). Yields ------ tuple (Tensor, Tensor) A tuple (batch_audios, batch_labels), where: - batch_audios is a torch.tensor of processed audio waveforms. - batch_labels is a torch.tensor of corresponding audio labels. Raises ------ ValueError If an unsupported sampling rate is encountered in the dataset. """ def process_resampling(resample_buffer, resample_indices, batch_audios, sr, target_sr): if resample_buffer: resampler = torchaudio.transforms.Resample( orig_freq=sr, new_freq=target_sr, lowpass_filter_width=6 ) resampled = resampler(torch.stack(resample_buffer)) for idx, original_idx in enumerate(resample_indices): batch_audios[original_idx] = resampled[idx] # For readability sr = self.audio_processing_params["sample_rate"] waveform_duration = self.audio_processing_params["duration"] padding_method = self.audio_processing_params["padding_method"] device = torch.device( "cuda" if device == "cuda" and torch.cuda.is_available() else "cpu" ) batch_audios, batch_labels = [], [] resample_24000, resample_24000_indices = [], [] for i in range(len(dataset)): pa_subtable = query_table(dataset._data, i, indices=dataset._indices) wav_bytes = pa_subtable[0][0][0].as_py() sampling_rate = struct.unpack(" torch.Tensor: """ Applies padding to the waveform when its size is smaller than the desired output size. Parameters ---------- waveform : torch.Tensor Input 1D waveform tensor. output_size : int Desired output size after padding or truncation. padding_method : str, default="zero" Padding method to apply. Returns ------- torch.Tensor Padded or truncated waveform of size `output_size`. """ if waveform.size(0) >= output_size: return waveform[:output_size] total_pad = output_size - waveform.size(0) if padding_method == "zero": return F.pad(waveform, (0, total_pad), mode="constant", value=0) if padding_method in ["reflect", "replicate", "circular"]: # Pad not possible if waveform.size(0) < total_pad. if waveform.size(0) < total_pad: num_repeats = (total_pad // waveform.size(0)) + 1 waveform = torch.tile(waveform, (num_repeats,)) total_pad = output_size - waveform.size(0) return F.pad(waveform.unsqueeze(0), (0, total_pad), mode=padding_method).squeeze() raise ValueError(f"Invalid padding method: {padding_method}") class FastModelHuggingFace: """ Class for loading a FastModel instance from the Hugging Face Hub. Includes preprocessing pipelines and a LightGBM model. Attributes ---------- pipeline : object The serialized preprocessing pipeline. model : lgb.Booster The LightGBM model instance used for predictions. Methods ------- from_pretrained(repo_id: str, revision: str = "main", pipeline_file_name: str = "pipeline.joblib", model_file_name: str = "model_lightgbm.txt") -> "FastModelHuggingFace": Loads the FastModel pipeline and model from the Hugging Face Hub. predict(input_data: Union[str, "HuggingFaceDataset"], get_proba: bool = False) -> np.ndarray: Predicts labels or probabilities for a WAV file or dataset. """ def __init__(self, pipeline: object, lightgbm_model: lgb.Booster): """ Initializes a FastModelHuggingFace instance. Parameters ---------- pipeline : object The serialized preprocessing pipeline. lightgbm_model : lgb.Booster A LightGBM booster model for predictions. """ self.pipeline = pipeline self.model = lightgbm_model @classmethod def from_pretrained( cls, repo_id: str, revision: str = "main", pipeline_file_name: str = "pipeline.joblib", model_file_name: str = "model_lightgbm.txt", ) -> "FastModelHuggingFace": """ Loads the FastModel pipeline and LightGBM model from the Hugging Face Hub. Parameters ---------- repo_id : str The Hugging Face repository ID. revision : str, optional The specific revision of the repository to use (default is "main"). pipeline_file_name : str, optional The filename of the serialized pipeline (default is "pipeline.joblib"). model_file_name : str, optional The filename of the LightGBM model (default is "model_lightgbm.txt"). Returns ------- FastModelHuggingFace A FastModelHuggingFace instance with the loaded pipeline and model. Raises ------ FileNotFoundError If either the pipeline or LightGBM model files are missing or corrupted. """ pipeline_path = hf_hub_download(repo_id, filename=pipeline_file_name, revision=revision) model_lgbm_path = hf_hub_download(repo_id, filename=model_file_name, revision=revision) if not os.path.exists(pipeline_path): raise FileNotFoundError(f"Pipeline file {pipeline_path} is missing or corrupted.") pipeline = load(pipeline_path) if not os.path.exists(model_lgbm_path): raise FileNotFoundError( f"LightGBM model file {model_lgbm_path} is missing or corrupted." ) lightgbm_model = lgb.Booster(model_file=model_lgbm_path) return cls(pipeline=pipeline, lightgbm_model=lightgbm_model) def predict( self, input_data: Union[str, "HuggingFaceDataset"], get_proba: bool = False, batch_size: int = 5000, device: Literal["cpu", "cuda"] = "cuda", ) -> np.ndarray: """ Predicts labels or probabilities for a given audio input. Parameters ---------- input_data : Union[str, HuggingFaceDataset] The input for prediction, either the path to a WAV file or a Hugging Face dataset. get_proba : bool, optional If True, returns class probabilities instead of binary predictions (default is False). batch_size : int, optional Number of audio samples per batch (default is 5000). device : Literal["cpu", "cuda"] Returns ------- np.ndarray If `get_proba` is True, returns an array of probabilities. If `get_proba` is False, returns binary predictions. Raises ------ ValueError If the input data type is neither a WAV file path string nor a Hugging Face dataset. """ if isinstance(input_data, str): audio_waveform, sr = torchaudio.load(input_data) audio_waveform = audio_waveform.mean(dim=0) if sr != self.pipeline.audio_processing_params["sample_rate"]: resampler = torchaudio.transforms.Resample( orig_freq=sr, new_freq=self.pipeline.audio_processing_params["sample_rate"] ) audio_waveform = resampler(audio_waveform) features = self.pipeline.get_features(audio_waveform.unsqueeze(0).to(device)) predictions = self.model.predict(features.cpu().numpy()) return predictions if get_proba else (predictions >= 0.5).astype(int) elif hasattr(input_data, "_data"): features = [] for batch_audios, _ in self.pipeline.batch_audio_loader( input_data, batch_size=batch_size, device=device, ): batch_features = self.pipeline.get_features(batch_audios) features.append(batch_features) features = torch.cat(features, dim=0) predictions = self.model.predict(features.cpu().numpy()) return predictions if get_proba else (predictions >= 0.5).astype(int) else: raise ValueError("Input must be either a path to a WAV file or a Hugging Face Dataset.") def save_pipeline( model_class_instance: FastModel, path: str, lgbm_file_name: str = None, pipeline_file_name: str = None, ): """ Serializes the complete FastModel instance for saving. Parameters ---------- model_class_instance : FastModelHuggingFace The trained FastModel instance to serialize. path : str The directory to save the FastModel instance. lgbm_file_name : str, optional The filename for saving the LightGBM model (default is "model_fast_model.txt"). pipeline_file_name : str, optional The filename for saving the pipeline (default is "pipeline.joblib"). """ lgbm_file_name = lgbm_file_name or "model_lightgbm.txt" pipeline_file_name = pipeline_file_name or "pipeline.joblib" lightgbm_path = Path(path) / lgbm_file_name if model_class_instance.model: model_class_instance.model_file_name = str(lightgbm_path) model_class_instance.model.save_model(model_class_instance.model_file_name) pipeline_path = Path(path) / pipeline_file_name dump(model_class_instance, pipeline_path) def load_pipeline( path: str, lgbm_file_name: str = None, pipeline_file_name: str = None ) -> FastModelHuggingFace: """ Loads a serialized pipeline and LightGBM model. Parameters ---------- path : str The directory containing the serialized FastModel. lgbm_file_name : str, optional The filename for the LightGBM model (default is "model_fast_model.txt"). pipeline_file_name : str, optional The filename for the pipeline (default is "pipeline.joblib"). Returns ------- FastModelHuggingFace An instance of the loaded FastModel. Raises ------ FileNotFoundError If either the LightGBM model or pipeline file is not found. """ lgbm_file_name = lgbm_file_name or "model_fast_model.txt" pipeline_file_name = pipeline_file_name or "pipeline.joblib" pipeline_path = Path(path) / pipeline_file_name if not pipeline_path.exists(): raise FileNotFoundError(f"Pipeline file {pipeline_path} not found.") model_class_instance = load(pipeline_path) lightgbm_path = Path(path) / lgbm_file_name if not lightgbm_path.exists(): raise FileNotFoundError(f"LightGBM file {lightgbm_path} not found.") model_class_instance.model = lgb.Booster(model_file=str(lightgbm_path)) return model_class_instance