|
import torch |
|
import torchaudio |
|
import torch.nn.functional as F |
|
import numpy as np |
|
from scipy.signal import get_window |
|
import librosa.util as librosa_util |
|
from librosa.util import pad_center, tiny |
|
from librosa.filters import mel as librosa_mel_fn |
|
import io |
|
|
|
|
|
class STFT(torch.nn.Module): |
|
"""adapted from Prem Seetharaman's https://github.com/pseeth/pytorch-stft""" |
|
|
|
def __init__(self, filter_length, hop_length, win_length, window="hann"): |
|
super(STFT, self).__init__() |
|
self.filter_length = filter_length |
|
self.hop_length = hop_length |
|
self.win_length = win_length |
|
self.window = window |
|
self.forward_transform = None |
|
scale = self.filter_length / self.hop_length |
|
fourier_basis = np.fft.fft(np.eye(self.filter_length)) |
|
|
|
cutoff = int((self.filter_length / 2 + 1)) |
|
fourier_basis = np.vstack( |
|
[np.real(fourier_basis[:cutoff, :]), np.imag(fourier_basis[:cutoff, :])] |
|
) |
|
|
|
forward_basis = torch.FloatTensor(fourier_basis[:, None, :]) |
|
inverse_basis = torch.FloatTensor( |
|
np.linalg.pinv(scale * fourier_basis).T[:, None, :] |
|
) |
|
|
|
if window is not None: |
|
assert filter_length >= win_length |
|
|
|
fft_window = get_window(window, win_length, fftbins=True) |
|
fft_window = pad_center(fft_window, filter_length) |
|
fft_window = torch.from_numpy(fft_window).float() |
|
|
|
|
|
forward_basis *= fft_window |
|
inverse_basis *= fft_window |
|
|
|
self.register_buffer("forward_basis", forward_basis.float()) |
|
self.register_buffer("inverse_basis", inverse_basis.float()) |
|
|
|
def transform(self, input_data): |
|
num_batches = input_data.size(0) |
|
num_samples = input_data.size(1) |
|
|
|
self.num_samples = num_samples |
|
|
|
|
|
input_data = input_data.view(num_batches, 1, num_samples) |
|
input_data = F.pad( |
|
input_data.unsqueeze(1), |
|
(int(self.filter_length / 2), int(self.filter_length / 2), 0, 0), |
|
mode="reflect", |
|
) |
|
input_data = input_data.squeeze(1) |
|
|
|
forward_transform = F.conv1d( |
|
input_data, |
|
torch.autograd.Variable(self.forward_basis, requires_grad=False), |
|
stride=self.hop_length, |
|
padding=0, |
|
).cpu() |
|
|
|
cutoff = int((self.filter_length / 2) + 1) |
|
real_part = forward_transform[:, :cutoff, :] |
|
imag_part = forward_transform[:, cutoff:, :] |
|
|
|
magnitude = torch.sqrt(real_part**2 + imag_part**2) |
|
phase = torch.autograd.Variable(torch.atan2(imag_part.data, real_part.data)) |
|
|
|
return magnitude, phase |
|
|
|
def inverse(self, magnitude, phase): |
|
recombine_magnitude_phase = torch.cat( |
|
[magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1 |
|
) |
|
|
|
inverse_transform = F.conv_transpose1d( |
|
recombine_magnitude_phase, |
|
torch.autograd.Variable(self.inverse_basis, requires_grad=False), |
|
stride=self.hop_length, |
|
padding=0, |
|
) |
|
|
|
if self.window is not None: |
|
window_sum = window_sumsquare( |
|
self.window, |
|
magnitude.size(-1), |
|
hop_length=self.hop_length, |
|
win_length=self.win_length, |
|
n_fft=self.filter_length, |
|
dtype=np.float32, |
|
) |
|
|
|
approx_nonzero_indices = torch.from_numpy( |
|
np.where(window_sum > tiny(window_sum))[0] |
|
) |
|
window_sum = torch.autograd.Variable( |
|
torch.from_numpy(window_sum), requires_grad=False |
|
) |
|
window_sum = window_sum |
|
inverse_transform[:, :, approx_nonzero_indices] /= window_sum[ |
|
approx_nonzero_indices |
|
] |
|
|
|
|
|
inverse_transform *= float(self.filter_length) / self.hop_length |
|
|
|
inverse_transform = inverse_transform[:, :, int(self.filter_length / 2) :] |
|
inverse_transform = inverse_transform[:, :, : -int(self.filter_length / 2) :] |
|
|
|
return inverse_transform |
|
|
|
def forward(self, input_data): |
|
self.magnitude, self.phase = self.transform(input_data) |
|
reconstruction = self.inverse(self.magnitude, self.phase) |
|
return reconstruction |
|
|
|
def window_sumsquare( |
|
window, |
|
n_frames, |
|
hop_length, |
|
win_length, |
|
n_fft, |
|
dtype=np.float32, |
|
norm=None, |
|
): |
|
""" |
|
# from librosa 0.6 |
|
Compute the sum-square envelope of a window function at a given hop length. |
|
|
|
This is used to estimate modulation effects induced by windowing |
|
observations in short-time fourier transforms. |
|
|
|
Parameters |
|
---------- |
|
window : string, tuple, number, callable, or list-like |
|
Window specification, as in `get_window` |
|
|
|
n_frames : int > 0 |
|
The number of analysis frames |
|
|
|
hop_length : int > 0 |
|
The number of samples to advance between frames |
|
|
|
win_length : [optional] |
|
The length of the window function. By default, this matches `n_fft`. |
|
|
|
n_fft : int > 0 |
|
The length of each analysis frame. |
|
|
|
dtype : np.dtype |
|
The data type of the output |
|
|
|
Returns |
|
------- |
|
wss : np.ndarray, shape=`(n_fft + hop_length * (n_frames - 1))` |
|
The sum-squared envelope of the window function |
|
""" |
|
if win_length is None: |
|
win_length = n_fft |
|
|
|
n = n_fft + hop_length * (n_frames - 1) |
|
x = np.zeros(n, dtype=dtype) |
|
|
|
|
|
win_sq = get_window(window, win_length, fftbins=True) |
|
win_sq = librosa_util.normalize(win_sq, norm=norm) ** 2 |
|
win_sq = librosa_util.pad_center(win_sq, n_fft) |
|
|
|
|
|
for i in range(n_frames): |
|
sample = i * hop_length |
|
x[sample : min(n, sample + n_fft)] += win_sq[: max(0, min(n_fft, n - sample))] |
|
return x |
|
|
|
|
|
def griffin_lim(magnitudes, stft_fn, n_iters=30): |
|
""" |
|
PARAMS |
|
------ |
|
magnitudes: spectrogram magnitudes |
|
stft_fn: STFT class with transform (STFT) and inverse (ISTFT) methods |
|
""" |
|
|
|
angles = np.angle(np.exp(2j * np.pi * np.random.rand(*magnitudes.size()))) |
|
angles = angles.astype(np.float32) |
|
angles = torch.autograd.Variable(torch.from_numpy(angles)) |
|
signal = stft_fn.inverse(magnitudes, angles).squeeze(1) |
|
|
|
for i in range(n_iters): |
|
_, angles = stft_fn.transform(signal) |
|
signal = stft_fn.inverse(magnitudes, angles).squeeze(1) |
|
return signal |
|
|
|
def dynamic_range_compression(x, normalize_fun=torch.log, C=1, clip_val=1e-5): |
|
""" |
|
PARAMS |
|
------ |
|
C: compression factor |
|
""" |
|
return normalize_fun(torch.clamp(x, min=clip_val) * C) |
|
|
|
|
|
def dynamic_range_decompression(x, C=1): |
|
""" |
|
PARAMS |
|
------ |
|
C: compression factor used to compress |
|
""" |
|
return torch.exp(x) / C |
|
class TacotronSTFT(torch.nn.Module): |
|
def __init__( |
|
self, |
|
filter_length, |
|
hop_length, |
|
win_length, |
|
n_mel_channels, |
|
sampling_rate, |
|
mel_fmin, |
|
mel_fmax, |
|
): |
|
super(TacotronSTFT, self).__init__() |
|
self.n_mel_channels = n_mel_channels |
|
self.sampling_rate = sampling_rate |
|
self.stft_fn = STFT(filter_length, hop_length, win_length) |
|
mel_basis = librosa_mel_fn( |
|
sampling_rate, filter_length, n_mel_channels, mel_fmin, mel_fmax |
|
) |
|
mel_basis = torch.from_numpy(mel_basis).float() |
|
self.register_buffer("mel_basis", mel_basis) |
|
|
|
def spectral_normalize(self, magnitudes, normalize_fun): |
|
output = dynamic_range_compression(magnitudes, normalize_fun) |
|
return output |
|
|
|
def spectral_de_normalize(self, magnitudes): |
|
output = dynamic_range_decompression(magnitudes) |
|
return output |
|
|
|
def mel_spectrogram(self, y, normalize_fun=torch.log): |
|
"""Computes mel-spectrograms from a batch of waves |
|
PARAMS |
|
------ |
|
y: Variable(torch.FloatTensor) with shape (B, T) in range [-1, 1] |
|
|
|
RETURNS |
|
------- |
|
mel_output: torch.FloatTensor of shape (B, n_mel_channels, T) |
|
""" |
|
assert torch.min(y.data) >= -1, torch.min(y.data) |
|
assert torch.max(y.data) <= 1, torch.max(y.data) |
|
|
|
magnitudes, phases = self.stft_fn.transform(y) |
|
magnitudes = magnitudes.data |
|
mel_output = torch.matmul(self.mel_basis, magnitudes) |
|
mel_output = self.spectral_normalize(mel_output, normalize_fun) |
|
energy = torch.norm(magnitudes, dim=1) |
|
|
|
log_magnitudes = self.spectral_normalize(magnitudes, normalize_fun) |
|
|
|
return mel_output, log_magnitudes, energy |
|
|
|
def pad_wav(waveform, segment_length): |
|
waveform_length = waveform.shape[-1] |
|
assert waveform_length > 100, "Waveform is too short, %s" % waveform_length |
|
if segment_length is None or waveform_length == segment_length: |
|
return waveform |
|
elif waveform_length > segment_length: |
|
return waveform[:,:segment_length] |
|
elif waveform_length < segment_length: |
|
temp_wav = np.zeros((1, segment_length)) |
|
temp_wav[:, :waveform_length] = waveform |
|
return temp_wav |
|
|
|
def normalize_wav(waveform): |
|
waveform = waveform - np.mean(waveform) |
|
waveform = waveform / (np.max(np.abs(waveform)) + 1e-8) |
|
return waveform * 0.5 |
|
|
|
def _pad_spec(fbank, target_length=1024): |
|
n_frames = fbank.shape[0] |
|
p = target_length - n_frames |
|
|
|
if p > 0: |
|
m = torch.nn.ZeroPad2d((0, 0, 0, p)) |
|
fbank = m(fbank) |
|
elif p < 0: |
|
fbank = fbank[0:target_length, :] |
|
|
|
if fbank.size(-1) % 2 != 0: |
|
fbank = fbank[..., :-1] |
|
|
|
return fbank |
|
|
|
def get_mel_from_wav(audio, _stft): |
|
audio = torch.clip(torch.FloatTensor(audio).unsqueeze(0), -1, 1) |
|
audio = torch.autograd.Variable(audio, requires_grad=False) |
|
melspec, log_magnitudes_stft, energy = _stft.mel_spectrogram(audio) |
|
melspec = torch.squeeze(melspec, 0).numpy().astype(np.float32) |
|
log_magnitudes_stft = ( |
|
torch.squeeze(log_magnitudes_stft, 0).numpy().astype(np.float32) |
|
) |
|
energy = torch.squeeze(energy, 0).numpy().astype(np.float32) |
|
return melspec, log_magnitudes_stft, energy |
|
|
|
def read_wav_file_io(bytes): |
|
|
|
waveform, sr = torchaudio.load(bytes, format='mp4') |
|
waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=16000) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return waveform |
|
|
|
def load_audio(bytes, sample_rate=16000): |
|
waveform, sr = torchaudio.load(bytes, format='mp4') |
|
waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=sample_rate) |
|
return waveform |
|
|
|
def read_wav_file(filename): |
|
|
|
waveform, sr = torchaudio.load(filename) |
|
waveform = torchaudio.functional.resample(waveform, orig_freq=sr, new_freq=16000) |
|
waveform = waveform.numpy()[0, ...] |
|
waveform = normalize_wav(waveform) |
|
waveform = waveform[None, ...] |
|
|
|
waveform = waveform / np.max(np.abs(waveform)) |
|
waveform = 0.5 * waveform |
|
|
|
return waveform |
|
|
|
def norm_wav_tensor(waveform: torch.FloatTensor): |
|
waveform = waveform.numpy()[0, ...] |
|
waveform = normalize_wav(waveform) |
|
waveform = waveform[None, ...] |
|
waveform = waveform / (np.max(np.abs(waveform)) + 1e-8) |
|
waveform = 0.5 * waveform |
|
return waveform |
|
|
|
def wav_to_fbank(filename, target_length=1024, fn_STFT=None): |
|
if fn_STFT is None: |
|
fn_STFT = TacotronSTFT( |
|
1024, |
|
160, |
|
1024, |
|
64, |
|
16000, |
|
0, |
|
8000, |
|
) |
|
|
|
|
|
waveform = read_wav_file(filename, target_length * 160) |
|
|
|
waveform = waveform[0, ...] |
|
waveform = torch.FloatTensor(waveform) |
|
|
|
fbank, log_magnitudes_stft, energy = get_mel_from_wav(waveform, fn_STFT) |
|
|
|
fbank = torch.FloatTensor(fbank.T) |
|
log_magnitudes_stft = torch.FloatTensor(log_magnitudes_stft.T) |
|
|
|
fbank, log_magnitudes_stft = _pad_spec(fbank, target_length), _pad_spec( |
|
log_magnitudes_stft, target_length |
|
) |
|
|
|
return fbank, log_magnitudes_stft, waveform |
|
|
|
def wav_tensor_to_fbank(waveform, target_length=512, fn_STFT=None): |
|
if fn_STFT is None: |
|
fn_STFT = TacotronSTFT( |
|
1024, |
|
160, |
|
1024, |
|
256, |
|
16000, |
|
0, |
|
8000, |
|
) |
|
|
|
fbank, log_magnitudes_stft, energy = get_mel_from_wav(waveform, fn_STFT) |
|
|
|
fbank = torch.FloatTensor(fbank.T) |
|
log_magnitudes_stft = torch.FloatTensor(log_magnitudes_stft.T) |
|
|
|
fbank, log_magnitudes_stft = _pad_spec(fbank, target_length), _pad_spec( |
|
log_magnitudes_stft, target_length |
|
) |
|
|
|
return fbank |