Spaces:
Runtime error
Runtime error
| import torch | |
| import torchaudio | |
| import torch.nn.functional as F | |
| import numpy as np | |
| from scipy.signal import get_window | |
| import librosa.util as librosa_util | |
| from librosa.util import pad_center, tiny | |
| from librosa.filters import mel as librosa_mel_fn | |
| import io | |
| # spectrogram to mel | |
| class STFT(torch.nn.Module): | |
| """adapted from Prem Seetharaman's https://github.com/pseeth/pytorch-stft""" | |
| def __init__(self, filter_length, hop_length, win_length, window="hann"): | |
| super(STFT, self).__init__() | |
| self.filter_length = filter_length | |
| self.hop_length = hop_length | |
| self.win_length = win_length | |
| self.window = window | |
| self.forward_transform = None | |
| scale = self.filter_length / self.hop_length | |
| fourier_basis = np.fft.fft(np.eye(self.filter_length)) | |
| cutoff = int((self.filter_length / 2 + 1)) | |
| fourier_basis = np.vstack( | |
| [np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])] | |
| ) | |
| forward_basis = torch.FloatTensor(fourier_basis[:, None, :]) | |
| inverse_basis = torch.FloatTensor( | |
| np.linalg.pinv(scale * fourier_basis).T[:, None, :] | |
| ) | |
| if window is not None: | |
| assert filter_length >= win_length | |
| # get window and zero center pad it to filter_length | |
| fft_window = get_window(window, win_length, fftbins=True) | |
| fft_window = pad_center(fft_window, filter_length) | |
| fft_window = torch.from_numpy(fft_window).float() | |
| # window the bases | |
| forward_basis *= fft_window | |
| inverse_basis *= fft_window | |
| self.register_buffer("forward_basis", forward_basis.float()) | |
| self.register_buffer("inverse_basis", inverse_basis.float()) | |
| def transform(self, input_data): | |
| num_batches = input_data.size(0) | |
| num_samples = input_data.size(1) | |
| self.num_samples = num_samples | |
| # similar to librosa, reflect-pad the input | |
| input_data = input_data.view(num_batches, 1, num_samples) | |
| input_data = F.pad( | |
| input_data.unsqueeze(1), | |
| (int(self.filter_length / 2), int(self.filter_length / 2), 0, 0), | |
| mode="reflect", | |
| ) | |
| input_data = input_data.squeeze(1) | |
| forward_transform = F.conv1d( | |
| input_data, | |
| torch.autograd.Variable(self.forward_basis, requires_grad=False), | |
| stride=self.hop_length, | |
| padding=0, | |
| ).cpu() | |
| cutoff = int((self.filter_length / 2) + 1) | |
| real_part = forward_transform[:, :cutoff, :] | |
| imag_part = forward_transform[:, cutoff:, :] | |
| magnitude = torch.sqrt(real_part**2 + imag_part**2) | |
| phase = torch.autograd.Variable(torch.atan2(imag_part.data, real_part.data)) | |
| return magnitude, phase | |
| def inverse(self, magnitude, phase): | |
| recombine_magnitude_phase = torch.cat( | |
| [magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1 | |
| ) | |
| inverse_transform = F.conv_transpose1d( | |
| recombine_magnitude_phase, | |
| torch.autograd.Variable(self.inverse_basis, requires_grad=False), | |
| stride=self.hop_length, | |
| padding=0, | |
| ) | |
| if self.window is not None: | |
| window_sum = window_sumsquare( | |
| self.window, | |
| magnitude.size(-1), | |
| hop_length=self.hop_length, | |
| win_length=self.win_length, | |
| n_fft=self.filter_length, | |
| dtype=np.float32, | |
| ) | |
| # remove modulation effects | |
| approx_nonzero_indices = torch.from_numpy( | |
| np.where(window_sum > tiny(window_sum))[0] | |
| ) | |
| window_sum = torch.autograd.Variable( | |
| torch.from_numpy(window_sum), requires_grad=False | |
| ) | |
| window_sum = window_sum | |
| inverse_transform[:, :, approx_nonzero_indices] /= window_sum[ | |
| approx_nonzero_indices | |
| ] | |
| # scale by hop ratio | |
| inverse_transform *= float(self.filter_length) / self.hop_length | |
| inverse_transform = inverse_transform[:, :, int(self.filter_length / 2) :] | |
| inverse_transform = inverse_transform[:, :, : -int(self.filter_length / 2) :] | |
| return inverse_transform | |
| def forward(self, input_data): | |
| self.magnitude, self.phase = self.transform(input_data) | |
| reconstruction = self.inverse(self.magnitude, self.phase) | |
| return reconstruction | |
| def window_sumsquare( | |
| window, | |
| n_frames, | |
| hop_length, | |
| win_length, | |
| n_fft, | |
| dtype=np.float32, | |
| norm=None, | |
| ): | |
| """ | |
| # from librosa 0.6 | |
| Compute the sum-square envelope of a window function at a given hop length. | |
| This is used to estimate modulation effects induced by windowing | |
| observations in short-time fourier transforms. | |
| Parameters | |
| ---------- | |
| window : string, tuple, number, callable, or list-like | |
| Window specification, as in `get_window` | |
| n_frames : int > 0 | |
| The number of analysis frames | |
| hop_length : int > 0 | |
| The number of samples to advance between frames | |
| win_length : [optional] | |
| The length of the window function. By default, this matches `n_fft`. | |
| n_fft : int > 0 | |
| The length of each analysis frame. | |
| dtype : np.dtype | |
| The data type of the output | |
| Returns | |
| ------- | |
| wss : np.ndarray, shape=`(n_fft + hop_length * (n_frames - 1))` | |
| The sum-squared envelope of the window function | |
| """ | |
| if win_length is None: | |
| win_length = n_fft | |
| n = n_fft + hop_length * (n_frames - 1) | |
| x = np.zeros(n, dtype=dtype) | |
| # Compute the squared window at the desired length | |
| win_sq = get_window(window, win_length, fftbins=True) | |
| win_sq = librosa_util.normalize(win_sq, norm=norm) ** 2 | |
| win_sq = librosa_util.pad_center(win_sq, n_fft) | |
| # Fill the envelope | |
| for i in range(n_frames): | |
| sample = i * hop_length | |
| x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))] | |
| return x | |
| def griffin_lim(magnitudes, stft_fn, n_iters=30): | |
| """ | |
| PARAMS | |
| ------ | |
| magnitudes: spectrogram magnitudes | |
| stft_fn: STFT class with transform (STFT) and inverse (ISTFT) methods | |
| """ | |
| angles = np.angle(np.exp(2j * np.pi * np.random.rand(*magnitudes.size()))) | |
| angles = angles.astype(np.float32) | |
| angles = torch.autograd.Variable(torch.from_numpy(angles)) | |
| signal = stft_fn.inverse(magnitudes, angles).squeeze(1) | |
| for i in range(n_iters): | |
| _, angles = stft_fn.transform(signal) | |
| signal = stft_fn.inverse(magnitudes, angles).squeeze(1) | |
| return signal | |
| def dynamic_range_compression(x, normalize_fun=torch.log, C=1, clip_val=1e-5): | |
| """ | |
| PARAMS | |
| ------ | |
| C: compression factor | |
| """ | |
| return normalize_fun(torch.clamp(x, min=clip_val) * C) | |
| def dynamic_range_decompression(x, C=1): | |
| """ | |
| PARAMS | |
| ------ | |
| C: compression factor used to compress | |
| """ | |
| return torch.exp(x) / C | |
| class TacotronSTFT(torch.nn.Module): | |
| def __init__( | |
| self, | |
| filter_length, | |
| hop_length, | |
| win_length, | |
| n_mel_channels, | |
| sampling_rate, | |
| mel_fmin, | |
| mel_fmax, | |
| ): | |
| super(TacotronSTFT, self).__init__() | |
| self.n_mel_channels = n_mel_channels | |
| self.sampling_rate = sampling_rate | |
| self.stft_fn = STFT(filter_length, hop_length, win_length) | |
| mel_basis = librosa_mel_fn( | |
| sampling_rate, filter_length, n_mel_channels, mel_fmin, mel_fmax | |
| ) | |
| mel_basis = torch.from_numpy(mel_basis).float() | |
| self.register_buffer("mel_basis", mel_basis) | |
| def spectral_normalize(self, magnitudes, normalize_fun): | |
| output = dynamic_range_compression(magnitudes, normalize_fun) | |
| return output | |
| def spectral_de_normalize(self, magnitudes): | |
| output = dynamic_range_decompression(magnitudes) | |
| return output | |
| def mel_spectrogram(self, y, normalize_fun=torch.log): | |
| """Computes mel-spectrograms from a batch of waves | |
| PARAMS | |
| ------ | |
| y: Variable(torch.FloatTensor) with shape (B, T) in range [-1, 1] | |
| RETURNS | |
| ------- | |
| mel_output: torch.FloatTensor of shape (B, n_mel_channels, T) | |
| """ | |
| assert torch.min(y.data) >= -1, torch.min(y.data) | |
| assert torch.max(y.data) <= 1, torch.max(y.data) | |
| magnitudes, phases = self.stft_fn.transform(y) | |
| magnitudes = magnitudes.data | |
| mel_output = torch.matmul(self.mel_basis, magnitudes) | |
| mel_output = self.spectral_normalize(mel_output, normalize_fun) | |
| energy = torch.norm(magnitudes, dim=1) | |
| log_magnitudes = self.spectral_normalize(magnitudes, normalize_fun) | |
| return mel_output, log_magnitudes, energy | |
| def pad_wav(waveform, segment_length): | |
| waveform_length = waveform.shape[-1] | |
| assert waveform_length > 100, "Waveform is too short, %s" % waveform_length | |
| if segment_length is None or waveform_length == segment_length: | |
| return waveform | |
| elif waveform_length > segment_length: | |
| return waveform[:,:segment_length] | |
| elif waveform_length < segment_length: | |
| temp_wav = np.zeros((1, segment_length)) | |
| temp_wav[:, :waveform_length] = waveform | |
| return temp_wav | |
| def normalize_wav(waveform): | |
| waveform = waveform - np.mean(waveform) | |
| waveform = waveform / (np.max(np.abs(waveform)) + 1e-8) | |
| return waveform * 0.5 | |
| def _pad_spec(fbank, target_length=1024): | |
| n_frames = fbank.shape[0] | |
| p = target_length - n_frames | |
| # cut and pad | |
| if p > 0: | |
| m = torch.nn.ZeroPad2d((0, 0, 0, p)) | |
| fbank = m(fbank) | |
| elif p < 0: | |
| fbank = fbank[0:target_length, :] | |
| if fbank.size(-1) % 2 != 0: | |
| fbank = fbank[..., :-1] | |
| return fbank | |
| def get_mel_from_wav(audio, _stft): | |
| audio = torch.clip(torch.FloatTensor(audio).unsqueeze(0), -1, 1) | |
| audio = torch.autograd.Variable(audio, requires_grad=False) | |
| melspec, log_magnitudes_stft, energy = _stft.mel_spectrogram(audio) | |
| melspec = torch.squeeze(melspec, 0).numpy().astype(np.float32) | |
| log_magnitudes_stft = ( | |
| torch.squeeze(log_magnitudes_stft, 0).numpy().astype(np.float32) | |
| ) | |
| energy = torch.squeeze(energy, 0).numpy().astype(np.float32) | |
| return melspec, log_magnitudes_stft, energy | |
| def read_wav_file_io(bytes): | |
| # waveform, sr = librosa.load(filename, sr=None, mono=True) # 4 times slower | |
| waveform, sr = torchaudio.load(bytes, format='mp4') # Faster!!! | |
| waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=16000) | |
| # waveform = waveform.numpy()[0, ...] | |
| # waveform = normalize_wav(waveform) | |
| # waveform = waveform[None, ...] | |
| # waveform = waveform / (np.max(np.abs(waveform)) + 1e-8) | |
| # waveform = 0.5 * waveform | |
| return waveform | |
| def load_audio(bytes, sample_rate=16000): | |
| waveform, sr = torchaudio.load(bytes, format='mp4') | |
| waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=sample_rate) | |
| return waveform | |
| def read_wav_file(filename): | |
| # waveform, sr = librosa.load(filename, sr=None, mono=True) # 4 times slower | |
| waveform, sr = torchaudio.load(filename) # Faster!!! | |
| waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=16000) | |
| waveform = waveform.numpy()[0, ...] | |
| waveform = normalize_wav(waveform) | |
| waveform = waveform[None, ...] | |
| waveform = waveform / np.max(np.abs(waveform)) | |
| waveform = 0.5 * waveform | |
| return waveform | |
| def norm_wav_tensor(waveform: torch.FloatTensor): | |
| waveform = waveform.numpy()[0, ...] | |
| waveform = normalize_wav(waveform) | |
| waveform = waveform[None, ...] | |
| waveform = waveform / (np.max(np.abs(waveform)) + 1e-8) | |
| waveform = 0.5 * waveform | |
| return waveform | |
| def wav_to_fbank(filename, target_length=1024, fn_STFT=None): | |
| if fn_STFT is None: | |
| fn_STFT = TacotronSTFT( | |
| 1024, # filter_length | |
| 160, # hop_length | |
| 1024, # win_length | |
| 64, # n_mel | |
| 16000, # sample_rate | |
| 0, # fmin | |
| 8000, # fmax | |
| ) | |
| # mixup | |
| waveform = read_wav_file(filename, target_length * 160) # hop size is 160 | |
| waveform = waveform[0, ...] | |
| waveform = torch.FloatTensor(waveform) | |
| fbank, log_magnitudes_stft, energy = get_mel_from_wav(waveform, fn_STFT) | |
| fbank = torch.FloatTensor(fbank.T) | |
| log_magnitudes_stft = torch.FloatTensor(log_magnitudes_stft.T) | |
| fbank, log_magnitudes_stft = _pad_spec(fbank, target_length), _pad_spec( | |
| log_magnitudes_stft, target_length | |
| ) | |
| return fbank, log_magnitudes_stft, waveform | |
| def wav_tensor_to_fbank(waveform, target_length=512, fn_STFT=None): | |
| if fn_STFT is None: | |
| fn_STFT = TacotronSTFT( | |
| 1024, # filter_length | |
| 160, # hop_length | |
| 1024, # win_length | |
| 256, # n_mel | |
| 16000, # sample_rate | |
| 0, # fmin | |
| 8000, # fmax | |
| ) # In practice used | |
| fbank, log_magnitudes_stft, energy = get_mel_from_wav(waveform, fn_STFT) | |
| fbank = torch.FloatTensor(fbank.T) | |
| log_magnitudes_stft = torch.FloatTensor(log_magnitudes_stft.T) | |
| fbank, log_magnitudes_stft = _pad_spec(fbank, target_length), _pad_spec( | |
| log_magnitudes_stft, target_length | |
| ) | |
| return fbank |