import os
import re
import sys
import math
import torch
import parselmouth

import numba as nb
import numpy as np

from scipy.signal import medfilt
from librosa import yin, pyin, piptrack

sys.path.append(os.getcwd())

from main.library.utils import get_providers
from main.library.predictors.FCN.FCN import FCN
from main.library.predictors.FCPE.FCPE import FCPE
from main.library.predictors.CREPE.CREPE import CREPE
from main.library.predictors.RMVPE.RMVPE import RMVPE
from main.library.predictors.WORLD.WORLD import PYWORLD
from main.app.variables import configs, logger, translations
from main.library.predictors.CREPE.filter import mean, median
from main.library.predictors.WORLD.SWIPE import swipe, stonemask
from main.inference.conversion.utils import autotune_f0, proposal_f0_up_key
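
# post_process: shifts the f0 curve by f0_up_key semitones, optionally splices in a manually
# drawn pitch track, then quantises the result onto a coarse 1-255 mel scale.
# Returns (coarse_f0, shifted_f0); compiled with numba for speed.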
@nb.jit(nopython=True)
def post_process(tf0, f0, f0_up_key, manual_x_pad, f0_mel_min, f0_mel_max, manual_f0 = None):
    f0 = np.multiply(f0, pow(2, f0_up_key / 12))

    if manual_f0 is not None:
        replace_f0 = np.interp(
            list(
                range(
                    np.round(
                        (manual_f0[:, 0].max() - manual_f0[:, 0].min()) * tf0 + 1
                    ).astype(np.int16)
                )
            ),
            manual_f0[:, 0] * 100,
            manual_f0[:, 1]
        )
        f0[manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)] = replace_f0[:f0[manual_x_pad * tf0 : manual_x_pad * tf0 + len(replace_f0)].shape[0]]

    f0_mel = 1127 * np.log(1 + f0 / 700)
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255

    return np.rint(f0_mel).astype(np.int32), f0
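
# Generator bundles the supported f0 (pitch) extractors behind a single interface and turns
# their output into the coarse/fine pitch pair produced by post_process.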
class Generator:
    def __init__(self, sample_rate = 16000, hop_length = 160, f0_min = 50, f0_max = 1100, is_half = False, device = "cpu", f0_onnx_mode = False, del_onnx_model = True):
        self.sample_rate = sample_rate
        self.hop_length = hop_length
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.is_half = is_half
        self.device = device
        self.providers = get_providers() if f0_onnx_mode else None
        self.f0_onnx_mode = f0_onnx_mode
        self.del_onnx_model = del_onnx_model
        self.window = 160
        self.batch_size = 512
        self.ref_freqs = [49.00, 51.91, 55.00, 58.27, 61.74, 65.41, 69.30, 73.42, 77.78, 82.41, 87.31, 92.50, 98.00, 103.83, 110.00, 116.54, 123.47, 130.81, 138.59, 146.83, 155.56, 164.81, 174.61, 185.00, 196.00, 207.65, 220.00, 233.08, 246.94, 261.63, 277.18, 293.66, 311.13, 329.63, 349.23, 369.99, 392.00, 415.30, 440.00, 466.16, 493.88, 523.25, 554.37, 587.33, 622.25, 659.25, 698.46, 739.99, 783.99, 830.61, 880.00, 932.33, 987.77, 1046.50]
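
    # calculator: main entry point. Runs the requested extractor (or a hybrid blend),
    # optionally proposes an automatic key shift and applies autotune, then hands the curve
    # to post_process for semitone shifting and coarse mel quantisation.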
    def calculator(self, x_pad, f0_method, x, f0_up_key = 0, p_len = None, filter_radius = 3, f0_autotune = False, f0_autotune_strength = 1, manual_f0 = None, proposal_pitch = False, proposal_pitch_threshold = 255.0):
        if p_len is None: p_len = x.shape[0] // self.window
        if "hybrid" in f0_method: logger.debug(translations["hybrid_calc"].format(f0_method=f0_method))

        model = self.get_f0_hybrid if "hybrid" in f0_method else self.compute_f0
        f0 = model(f0_method, x, p_len, filter_radius if filter_radius % 2 != 0 else filter_radius + 1)

        if isinstance(f0, tuple): f0 = f0[0]

        if proposal_pitch:
            up_key = proposal_f0_up_key(f0, proposal_pitch_threshold, configs["limit_f0"])
            logger.debug(translations["proposal_f0"].format(up_key=up_key))
            f0_up_key += up_key

        if f0_autotune:
            logger.debug(translations["startautotune"])
            f0 = autotune_f0(self.ref_freqs, f0, f0_autotune_strength)

        return post_process(
            self.sample_rate // self.window,
            f0,
            f0_up_key,
            x_pad,
            1127 * math.log(1 + self.f0_min / 700),
            1127 * math.log(1 + self.f0_max / 700),
            manual_f0
        )
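
    # _resize_f0: linearly resamples an f0 curve to target_len frames, treating values below
    # 0.001 Hz as unvoiced (NaN) during interpolation and zero-filling them afterwards.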
    def _resize_f0(self, x, target_len):
        source = np.array(x)
        source[source < 0.001] = np.nan

        return np.nan_to_num(
            np.interp(
                np.arange(0, len(source) * target_len, len(source)) / target_len,
                np.arange(0, len(source)),
                source
            )
        )
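
    # compute_f0: dispatch table mapping each single-method name to its extractor;
    # unknown names raise a KeyError.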
    def compute_f0(self, f0_method, x, p_len, filter_radius):
        return {
            "pm-ac": lambda: self.get_f0_pm(x, p_len, filter_radius=filter_radius, mode="ac"),
            "pm-cc": lambda: self.get_f0_pm(x, p_len, filter_radius=filter_radius, mode="cc"),
            "pm-shs": lambda: self.get_f0_pm(x, p_len, filter_radius=filter_radius, mode="shs"),
            "dio": lambda: self.get_f0_pyworld(x, p_len, filter_radius, "dio"),
            "mangio-crepe-tiny": lambda: self.get_f0_mangio_crepe(x, p_len, "tiny"),
            "mangio-crepe-small": lambda: self.get_f0_mangio_crepe(x, p_len, "small"),
            "mangio-crepe-medium": lambda: self.get_f0_mangio_crepe(x, p_len, "medium"),
            "mangio-crepe-large": lambda: self.get_f0_mangio_crepe(x, p_len, "large"),
            "mangio-crepe-full": lambda: self.get_f0_mangio_crepe(x, p_len, "full"),
            "crepe-tiny": lambda: self.get_f0_crepe(x, p_len, "tiny", filter_radius=filter_radius),
            "crepe-small": lambda: self.get_f0_crepe(x, p_len, "small", filter_radius=filter_radius),
            "crepe-medium": lambda: self.get_f0_crepe(x, p_len, "medium", filter_radius=filter_radius),
            "crepe-large": lambda: self.get_f0_crepe(x, p_len, "large", filter_radius=filter_radius),
            "crepe-full": lambda: self.get_f0_crepe(x, p_len, "full", filter_radius=filter_radius),
            "fcpe": lambda: self.get_f0_fcpe(x, p_len, filter_radius=filter_radius),
            "fcpe-legacy": lambda: self.get_f0_fcpe(x, p_len, legacy=True, filter_radius=filter_radius),
            "rmvpe": lambda: self.get_f0_rmvpe(x, p_len, filter_radius=filter_radius),
            "rmvpe-legacy": lambda: self.get_f0_rmvpe(x, p_len, legacy=True, filter_radius=filter_radius),
            "harvest": lambda: self.get_f0_pyworld(x, p_len, filter_radius, "harvest"),
            "yin": lambda: self.get_f0_librosa(x, p_len, mode="yin"),
            "pyin": lambda: self.get_f0_librosa(x, p_len, mode="pyin"),
            "piptrack": lambda: self.get_f0_librosa(x, p_len, mode="piptrack"),
            "swipe": lambda: self.get_f0_swipe(x, p_len, filter_radius=filter_radius),
            "fcn": lambda: self.get_f0_fcn(x, p_len, filter_radius=filter_radius)
        }[f0_method]()
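
    # get_f0_hybrid: parses "hybrid[a+b+...]" into individual method names, runs each one,
    # resamples every curve to p_len frames and blends them with a NaN-aware median.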
    def get_f0_hybrid(self, methods_str, x, p_len, filter_radius):
        methods_str = re.search(r"hybrid\[(.+)\]", methods_str)
        if methods_str: methods = [method.strip() for method in methods_str.group(1).split("+")]
        f0_computation_stack, resampled_stack = [], []

        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)

        for method in methods:
            f0 = self.compute_f0(method, x, p_len, filter_radius)
            f0_computation_stack.append(f0)

        for f0 in f0_computation_stack:
            resampled_stack.append(
                np.interp(
                    np.linspace(0, len(f0), p_len),
                    np.arange(len(f0)),
                    f0
                )
            )

        return resampled_stack[0] if len(resampled_stack) == 1 else np.nanmedian(np.vstack(resampled_stack), axis=0)
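
    # get_f0_pm: Praat (parselmouth) pitch tracking. mode selects autocorrelation (ac),
    # cross-correlation (cc) or subharmonic summation (shs); the result is padded to p_len frames.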
    def get_f0_pm(self, x, p_len, filter_radius=3, mode="ac"):
        model = parselmouth.Sound(
            x,
            self.sample_rate
        )

        time_step = self.window / self.sample_rate
        model_mode = {"ac": model.to_pitch_ac, "cc": model.to_pitch_cc, "shs": model.to_pitch_shs}.get(mode, model.to_pitch_ac)

        if mode != "shs":
            f0 = (
                model_mode(
                    time_step=time_step,
                    voicing_threshold=filter_radius / 10 * 2,
                    pitch_floor=self.f0_min,
                    pitch_ceiling=self.f0_max
                ).selected_array["frequency"]
            )
        else:
            f0 = (
                model_mode(
                    time_step=time_step,
                    minimum_pitch=self.f0_min,
                    maximum_frequency_component=self.f0_max
                ).selected_array["frequency"]
            )

        pad_size = (p_len - len(f0) + 1) // 2

        if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
        return f0
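
    # get_f0_mangio_crepe: CREPE variant that normalises the audio, runs the model with a
    # hop-length-derived batch size and no periodicity output, then resizes the curve to
    # p_len frames. The ONNX session is released afterwards when del_onnx_model is set.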
    def get_f0_mangio_crepe(self, x, p_len, model="full"):
        if not hasattr(self, "mangio_crepe"):
            self.mangio_crepe = CREPE(
                os.path.join(
                    configs["predictors_path"],
                    f"crepe_{model}.{'onnx' if self.f0_onnx_mode else 'pth'}"
                ),
                model_size=model,
                hop_length=self.hop_length,
                batch_size=self.hop_length * 2,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.f0_onnx_mode,
                return_periodicity=False
            )

        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)

        audio = torch.unsqueeze(torch.from_numpy(x).to(self.device, copy=True), dim=0)
        if audio.ndim == 2 and audio.shape[0] > 1: audio = torch.mean(audio, dim=0, keepdim=True).detach()

        f0 = self.mangio_crepe.compute_f0(audio.detach(), pad=True)
        if self.f0_onnx_mode and self.del_onnx_model: del self.mangio_crepe.model, self.mangio_crepe

        return self._resize_f0(f0.squeeze(0).cpu().float().numpy(), p_len)
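
    # get_f0_crepe: standard CREPE inference with periodicity; both curves are smoothed over
    # filter_radius frames and frames with periodicity below 0.1 are zeroed before resizing.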
    def get_f0_crepe(self, x, p_len, model="full", filter_radius=3):
        if not hasattr(self, "crepe"):
            self.crepe = CREPE(
                os.path.join(
                    configs["predictors_path"],
                    f"crepe_{model}.{'onnx' if self.f0_onnx_mode else 'pth'}"
                ),
                model_size=model,
                hop_length=self.hop_length,
                batch_size=self.batch_size,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.f0_onnx_mode,
                return_periodicity=True
            )

        f0, pd = self.crepe.compute_f0(torch.tensor(np.copy(x))[None].float(), pad=True)
        if self.f0_onnx_mode and self.del_onnx_model: del self.crepe.model, self.crepe

        f0, pd = mean(f0, filter_radius), median(pd, filter_radius)
        f0[pd < 0.1] = 0

        return self._resize_f0(f0[0].cpu().numpy(), p_len)
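
    # get_f0_fcpe: FCPE estimator (current or legacy checkpoint); filter_radius is rescaled
    # into the model's decoding threshold and the model returns p_len frames directly.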
    def get_f0_fcpe(self, x, p_len, legacy=False, filter_radius=3):
        if not hasattr(self, "fcpe"):
            self.fcpe = FCPE(
                configs,
                os.path.join(
                    configs["predictors_path"],
                    ("fcpe_legacy" if legacy else "fcpe") + (".onnx" if self.f0_onnx_mode else ".pt")
                ),
                hop_length=self.hop_length,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                dtype=torch.float32,
                device=self.device,
                sample_rate=self.sample_rate,
                threshold=(filter_radius / 100) if legacy else (filter_radius / 1000 * 2),
                providers=self.providers,
                onnx=self.f0_onnx_mode,
                legacy=legacy
            )

        f0 = self.fcpe.compute_f0(x, p_len)
        if self.f0_onnx_mode and self.del_onnx_model: del self.fcpe.model.model, self.fcpe

        return f0
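
    # get_f0_rmvpe: RMVPE inference; filter_radius / 100 is passed as the model's threshold,
    # the legacy path additionally constrains the range to [f0_min, f0_max], and the curve
    # is resized to p_len frames.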
    def get_f0_rmvpe(self, x, p_len, legacy=False, filter_radius=3):
        if not hasattr(self, "rmvpe"):
            self.rmvpe = RMVPE(
                os.path.join(
                    configs["predictors_path"],
                    "rmvpe" + (".onnx" if self.f0_onnx_mode else ".pt")
                ),
                is_half=self.is_half,
                device=self.device,
                onnx=self.f0_onnx_mode,
                providers=self.providers
            )

        filter_radius = filter_radius / 100
        f0 = self.rmvpe.infer_from_audio_with_pitch(x, thred=filter_radius, f0_min=self.f0_min, f0_max=self.f0_max) if legacy else self.rmvpe.infer_from_audio(x, thred=filter_radius)

        if self.f0_onnx_mode and self.del_onnx_model: del self.rmvpe.model, self.rmvpe
        return self._resize_f0(f0, p_len)
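
    # get_f0_pyworld: WORLD-based tracking (harvest or dio) refined with stonemask;
    # harvest output is median-filtered when filter_radius > 2, dio output is rounded
    # to one decimal place.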
    def get_f0_pyworld(self, x, p_len, filter_radius, model="harvest"):
        if not hasattr(self, "pw"): self.pw = PYWORLD(configs)

        x = x.astype(np.double)
        pw = self.pw.harvest if model == "harvest" else self.pw.dio

        f0, t = pw(
            x,
            fs=self.sample_rate,
            f0_ceil=self.f0_max,
            f0_floor=self.f0_min,
            frame_period=1000 * self.window / self.sample_rate
        )

        f0 = self.pw.stonemask(
            x,
            self.sample_rate,
            t,
            f0
        )

        if filter_radius > 2 and model == "harvest": f0 = medfilt(f0, filter_radius)
        elif model == "dio":
            for index, pitch in enumerate(f0):
                f0[index] = round(pitch, 1)

        return self._resize_f0(f0, p_len)
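
    # get_f0_swipe: SWIPE pitch tracking refined with stonemask; filter_radius / 10 is
    # passed as the pitch-strength threshold (sTHR).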
    def get_f0_swipe(self, x, p_len, filter_radius=3):
        f0, t = swipe(
            x.astype(np.float32),
            self.sample_rate,
            f0_floor=self.f0_min,
            f0_ceil=self.f0_max,
            frame_period=1000 * self.window / self.sample_rate,
            sTHR=filter_radius / 10
        )

        return self._resize_f0(
            stonemask(
                x,
                self.sample_rate,
                t,
                f0
            ),
            p_len
        )
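
    # get_f0_librosa: librosa-based estimators. "yin" and "pyin" track f0 directly, while
    # "piptrack" keeps the pitch of the strongest spectral peak in each frame.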
    def get_f0_librosa(self, x, p_len, mode="yin"):
        if mode != "piptrack":
            self.if_yin = mode == "yin"
            self.yin = yin if self.if_yin else pyin

            f0 = self.yin(
                x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length
            )

            if not self.if_yin: f0 = f0[0]
        else:
            pitches, magnitudes = piptrack(
                y=x.astype(np.float32),
                sr=self.sample_rate,
                fmin=self.f0_min,
                fmax=self.f0_max,
                hop_length=self.hop_length,
            )

            max_indexes = np.argmax(magnitudes, axis=0)
            f0 = pitches[max_indexes, range(magnitudes.shape[1])]

        return self._resize_f0(f0, p_len)
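
    # get_f0_fcn: FCN-based estimator handled like the CREPE path (normalise, smooth, zero
    # out low-periodicity frames); the curve is scaled by a fixed factor of ~2.019 before
    # being resized to p_len frames.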
    def get_f0_fcn(self, x, p_len, filter_radius=3):
        if not hasattr(self, "fcn"):
            self.fcn = FCN(
                os.path.join(
                    configs["predictors_path"],
                    f"fcn.{'onnx' if self.f0_onnx_mode else 'pt'}"
                ),
                hop_length=self.hop_length,
                batch_size=self.batch_size,
                f0_min=self.f0_min,
                f0_max=self.f0_max,
                device=self.device,
                sample_rate=self.sample_rate,
                providers=self.providers,
                onnx=self.f0_onnx_mode
            )

        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)

        audio = torch.unsqueeze(torch.from_numpy(x).to(self.device, copy=True), dim=0)
        if audio.ndim == 2 and audio.shape[0] > 1: audio = torch.mean(audio, dim=0, keepdim=True).detach()

        f0, pd = self.fcn.compute_f0(audio.detach())
        if self.f0_onnx_mode and self.del_onnx_model: del self.fcn.model, self.fcn

        f0, pd = mean(f0, filter_radius), median(pd, filter_radius)
        f0[pd < 0.1] = 0

        f0 = f0[0].cpu().numpy()
        for index, pitch in enumerate(f0):
            f0[index] = pitch * 2.0190475097926434038940242706786

        return self._resize_f0(f0, p_len)
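

# Minimal usage sketch (illustration only, not part of the module): load a 16 kHz mono
# waveform, extract pitch with RMVPE and shift it up two semitones. The "audio.wav" path
# and the soundfile dependency are placeholders; the predictor weights referenced by
# configs["predictors_path"] must already be present for this to run.
if __name__ == "__main__":
    import soundfile as sf

    audio, sr = sf.read("audio.wav", dtype="float32")  # assumed to already be 16 kHz mono
    generator = Generator(sample_rate=sr, device="cpu")

    # coarse_f0 is the 1-255 mel-quantised pitch, f0 the shifted curve in Hz.
    coarse_f0, f0 = generator.calculator(x_pad=1, f0_method="rmvpe", x=audio, f0_up_key=2)
    print(coarse_f0.shape, f0.shape)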