import os
import random

import numpy as np
import torch
import torch.utils.data
from tqdm import tqdm

from . import spec_utils


class VocalRemoverValidationSet(torch.utils.data.Dataset):
    def __init__(self, patch_list):
        self.patch_list = patch_list

    def __len__(self):
        return len(self.patch_list)

    def __getitem__(self, idx):
        path = self.patch_list[idx]
        data = np.load(path)

        X, y = data["X"], data["y"]

        # The cached patches are complex spectrograms; training consumes
        # their magnitudes.
        X_mag = np.abs(X)
        y_mag = np.abs(y)

        return X_mag, y_mag
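

# Example: serving the cached validation patches through a DataLoader.
# A minimal sketch; the batch size, worker count, and STFT settings below
# are illustrative, not values fixed by this module.
#
#   val_dataset = make_validation_set(
#       val_filelist, cropsize=256, sr=44100,
#       hop_length=1024, n_fft=2048, offset=64,
#   )
#   val_loader = torch.utils.data.DataLoader(
#       val_dataset, batch_size=4, shuffle=False, num_workers=2
#   )
#   for X_mag, y_mag in val_loader:
#       ...  # compute the validation loss on magnitude spectrograms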


def make_pair(mix_dir, inst_dir):
    input_exts = [".wav", ".m4a", ".mp3", ".mp4", ".flac"]

    # Match extensions case-insensitively so e.g. ".WAV" files are not skipped.
    X_list = sorted(
        [
            os.path.join(mix_dir, fname)
            for fname in os.listdir(mix_dir)
            if os.path.splitext(fname)[1].lower() in input_exts
        ]
    )
    y_list = sorted(
        [
            os.path.join(inst_dir, fname)
            for fname in os.listdir(inst_dir)
            if os.path.splitext(fname)[1].lower() in input_exts
        ]
    )

    # Sorting aligns the two listings; this assumes the mixture and
    # instrumental directories contain matching filenames.
    filelist = list(zip(X_list, y_list))

    return filelist
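

# make_pair returns aligned (mixture_path, instrumental_path) tuples, e.g.
# [("mixtures/song1.wav", "instruments/song1.wav"), ...]; the paths shown
# here are illustrative.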


def train_val_split(dataset_dir, split_mode, val_rate, val_filelist):
    if split_mode == "random":
        filelist = make_pair(
            os.path.join(dataset_dir, "mixtures"),
            os.path.join(dataset_dir, "instruments"),
        )

        random.shuffle(filelist)

        if len(val_filelist) == 0:
            # Slice via an explicit split point so that val_size == 0
            # leaves the whole list in the training split.
            val_size = int(len(filelist) * val_rate)
            split = len(filelist) - val_size
            train_filelist = filelist[:split]
            val_filelist = filelist[split:]
        else:
            # An explicit validation list wins; its entries are assumed to
            # be list-typed pairs, hence the list(pair) comparison.
            train_filelist = [
                pair for pair in filelist if list(pair) not in val_filelist
            ]
    elif split_mode == "subdirs":
        if len(val_filelist) != 0:
            raise ValueError(
                "The `val_filelist` option is not available in `subdirs` mode"
            )

        train_filelist = make_pair(
            os.path.join(dataset_dir, "training/mixtures"),
            os.path.join(dataset_dir, "training/instruments"),
        )

        val_filelist = make_pair(
            os.path.join(dataset_dir, "validation/mixtures"),
            os.path.join(dataset_dir, "validation/instruments"),
        )
    else:
        # Fail fast instead of hitting an UnboundLocalError at the return.
        raise ValueError("`split_mode` must be either 'random' or 'subdirs'")

    return train_filelist, val_filelist
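

# Expected on-disk layout, as implied by the paths above:
#
#   split_mode="random":   dataset_dir/mixtures/  and  dataset_dir/instruments/
#   split_mode="subdirs":  dataset_dir/training/{mixtures,instruments}/  and
#                          dataset_dir/validation/{mixtures,instruments}/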


def augment(X, y, reduction_rate, reduction_mask, mixup_rate, mixup_alpha):
    perm = np.random.permutation(len(X))
    for i, idx in enumerate(tqdm(perm)):
        if np.random.uniform() < reduction_rate:
            y[idx] = spec_utils.reduce_vocal_aggressively(
                X[idx], y[idx], reduction_mask
            )

        if np.random.uniform() < 0.5:
            # swap the stereo channels
            X[idx] = X[idx, ::-1]
            y[idx] = y[idx, ::-1]
        if np.random.uniform() < 0.02:
            # fold down to mono (broadcast back across both channels)
            X[idx] = X[idx].mean(axis=0, keepdims=True)
            y[idx] = y[idx].mean(axis=0, keepdims=True)
        if np.random.uniform() < 0.02:
            # use the instrumental target as the input: a vocal-free sample
            X[idx] = y[idx]

        if np.random.uniform() < mixup_rate and i < len(perm) - 1:
            # mixup: convex blend with the next patch in the permutation,
            # with lam drawn from Beta(mixup_alpha, mixup_alpha)
            lam = np.random.beta(mixup_alpha, mixup_alpha)
            X[idx] = lam * X[idx] + (1 - lam) * X[perm[i + 1]]
            y[idx] = lam * y[idx] + (1 - lam) * y[perm[i + 1]]

    return X, y
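

# Note: augment modifies X and y in place (the arrays produced by
# make_training_set) and also returns them for convenience; pass copies if
# the originals must be preserved.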


def make_padding(width, cropsize, offset):
    left = offset
    roi_size = cropsize - left * 2
    if roi_size == 0:
        roi_size = cropsize
    right = roi_size - (width % roi_size) + left

    return left, right, roi_size
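

# Worked example with illustrative numbers: width=1000, cropsize=256,
# offset=32 gives left = 32, roi_size = 256 - 2*32 = 192, and
# right = 192 - (1000 % 192) + 32 = 184, so the padded width is
# 32 + 1000 + 184 = 1216 and the valid region 1216 - 2*32 = 1152 spans
# exactly 6 ROIs of 192 frames.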


def make_training_set(filelist, cropsize, patches, sr, hop_length, n_fft, offset):
    len_dataset = patches * len(filelist)

    X_dataset = np.zeros(
        (len_dataset, 2, n_fft // 2 + 1, cropsize), dtype=np.complex64
    )
    y_dataset = np.zeros(
        (len_dataset, 2, n_fft // 2 + 1, cropsize), dtype=np.complex64
    )

    for i, (X_path, y_path) in enumerate(tqdm(filelist)):
        X, y = spec_utils.cache_or_load(X_path, y_path, sr, hop_length, n_fft)
        # Normalize the pair jointly by their shared peak magnitude so the
        # mixture/instrumental relationship is preserved.
        coef = np.max([np.abs(X).max(), np.abs(y).max()])
        X, y = X / coef, y / coef

        l, r, roi_size = make_padding(X.shape[2], cropsize, offset)
        X_pad = np.pad(X, ((0, 0), (0, 0), (l, r)), mode="constant")
        y_pad = np.pad(y, ((0, 0), (0, 0), (l, r)), mode="constant")

        # max(1, ...) guards tracks no longer than one crop, where randint's
        # exclusive upper bound would otherwise be zero and raise.
        starts = np.random.randint(0, max(1, X_pad.shape[2] - cropsize), patches)
        ends = starts + cropsize
        for j in range(patches):
            idx = i * patches + j
            X_dataset[idx] = X_pad[:, :, starts[j] : ends[j]]
            y_dataset[idx] = y_pad[:, :, starts[j] : ends[j]]

    return X_dataset, y_dataset
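

# The training set is materialized fully in RAM. With illustrative settings
# n_fft=2048 (1025 bins) and cropsize=256, one complex64 patch occupies
# 2 * 1025 * 256 * 8 bytes, about 4.2 MB, held once in X_dataset and once
# in y_dataset.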


def make_validation_set(filelist, cropsize, sr, hop_length, n_fft, offset):
    patch_list = []
    patch_dir = "cs{}_sr{}_hl{}_nf{}_of{}".format(
        cropsize, sr, hop_length, n_fft, offset
    )
    # Patches are cached on disk (under the current working directory) so
    # later runs with the same settings can reuse them.
    os.makedirs(patch_dir, exist_ok=True)

    for X_path, y_path in tqdm(filelist):
        basename = os.path.splitext(os.path.basename(X_path))[0]

        X, y = spec_utils.cache_or_load(X_path, y_path, sr, hop_length, n_fft)
        coef = np.max([np.abs(X).max(), np.abs(y).max()])
        X, y = X / coef, y / coef

        l, r, roi_size = make_padding(X.shape[2], cropsize, offset)
        X_pad = np.pad(X, ((0, 0), (0, 0), (l, r)), mode="constant")
        y_pad = np.pad(y, ((0, 0), (0, 0), (l, r)), mode="constant")

        len_dataset = int(np.ceil(X.shape[2] / roi_size))
        for j in range(len_dataset):
            outpath = os.path.join(patch_dir, "{}_p{}.npz".format(basename, j))
            start = j * roi_size
            if not os.path.exists(outpath):
                np.savez(
                    outpath,
                    X=X_pad[:, :, start : start + cropsize],
                    y=y_pad[:, :, start : start + cropsize],
                )
            # Record the patch whether it was just written or already cached.
            patch_list.append(outpath)

    return VocalRemoverValidationSet(patch_list)
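

# End-to-end sketch of how these helpers combine during training. All
# argument values are illustrative; this module does not define defaults.
#
#   train_filelist, val_filelist = train_val_split(
#       "dataset", split_mode="subdirs", val_rate=0.1, val_filelist=[]
#   )
#   X_train, y_train = make_training_set(
#       train_filelist, cropsize=256, patches=16,
#       sr=44100, hop_length=1024, n_fft=2048, offset=64,
#   )
#   X_train, y_train = augment(
#       X_train, y_train, reduction_rate=0.5, reduction_mask=0.2,
#       mixup_rate=0.5, mixup_alpha=1.0,
#   )
#   val_dataset = make_validation_set(
#       val_filelist, cropsize=256, sr=44100,
#       hop_length=1024, n_fft=2048, offset=64,
#   )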