import os
import re
import sys
import torch
import codecs
import librosa
import logging
import onnxruntime

import numpy as np
import torch.nn as nn
import soundfile as sf

from pydub import AudioSegment
from transformers import HubertModel

sys.path.append(os.getcwd())

from main.tools import huggingface
from main.library.architectures import fairseq
from main.app.variables import translations, configs, config, embedders_model, logger

# Silence the noisy HTTP client loggers pulled in by the download helpers.
for l in ["httpx", "httpcore"]:
    logging.getLogger(l).setLevel(logging.ERROR)

class HubertModelWithFinalProj(HubertModel):
    # HuBERT with the final projection layer kept, so transformers-format
    # embedders expose the same projected feature dimension as the fairseq
    # checkpoints.
    def __init__(self, config):
        super().__init__(config)
        self.final_proj = nn.Linear(config.hidden_size, config.classifier_proj_size)
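
# A minimal sketch of how the projected features are typically obtained
# (hypothetical call site; the real usage lives in the inference pipeline):
#   model = HubertModelWithFinalProj.from_pretrained("path/to/embedder")
#   hidden = model(wav_tensor)["last_hidden_state"]
#   feats = model.final_proj(hidden)  # hidden_size -> classifier_proj_size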

def check_assets(method, hubert, f0_onnx=False, embedders_mode="fairseq"):
    # Download URLs are stored rot13-obfuscated and decoded at call time.
    predictors_url = codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/cerqvpgbef/", "rot13")
    embedders_url = codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/rzorqqref/", "rot13")

    if f0_onnx: method += "-onnx"
    if embedders_mode == "spin": embedders_mode, hubert = "transformers", "spin"

    def download_predictor(predictor):
        model_path = os.path.join(configs["predictors_path"], predictor)

        if not os.path.exists(model_path):
            huggingface.HF_download_file(predictors_url + predictor, model_path)

        return os.path.exists(model_path)

    def download_embedder(embedders_mode, hubert):
        model_path = os.path.join(configs["embedders_path"], hubert)

        if embedders_mode != "transformers" and not os.path.exists(model_path):
            huggingface.HF_download_file("".join([embedders_url, "fairseq/" if embedders_mode == "fairseq" else "onnx/", hubert]), model_path)
        elif embedders_mode == "transformers":
            url, hubert = ("transformers/", hubert) if hubert != "spin" else ("spin", "")

            bin_file = os.path.join(model_path, "model.safetensors")
            config_file = os.path.join(model_path, "config.json")

            os.makedirs(model_path, exist_ok=True)

            if not os.path.exists(bin_file): huggingface.HF_download_file("".join([embedders_url, url, hubert, "/model.safetensors"]), bin_file)
            if not os.path.exists(config_file): huggingface.HF_download_file("".join([embedders_url, url, hubert, "/config.json"]), config_file)

            return os.path.exists(bin_file) and os.path.exists(config_file)

        return os.path.exists(model_path)

    # Map each F0 method name to the predictor checkpoint file it needs.
    model_dict = {
        **dict.fromkeys(["rmvpe", "rmvpe-legacy"], "rmvpe.pt"),
        **dict.fromkeys(["rmvpe-onnx", "rmvpe-legacy-onnx"], "rmvpe.onnx"),
        **dict.fromkeys(["fcpe"], "fcpe.pt"),
        **dict.fromkeys(["fcpe-legacy"], "fcpe_legacy.pt"),
        **dict.fromkeys(["fcpe-onnx"], "fcpe.onnx"),
        **dict.fromkeys(["fcpe-legacy-onnx"], "fcpe_legacy.onnx"),
        **dict.fromkeys(["crepe-full", "mangio-crepe-full"], "crepe_full.pth"),
        **dict.fromkeys(["crepe-full-onnx", "mangio-crepe-full-onnx"], "crepe_full.onnx"),
        **dict.fromkeys(["crepe-large", "mangio-crepe-large"], "crepe_large.pth"),
        **dict.fromkeys(["crepe-large-onnx", "mangio-crepe-large-onnx"], "crepe_large.onnx"),
        **dict.fromkeys(["crepe-medium", "mangio-crepe-medium"], "crepe_medium.pth"),
        **dict.fromkeys(["crepe-medium-onnx", "mangio-crepe-medium-onnx"], "crepe_medium.onnx"),
        **dict.fromkeys(["crepe-small", "mangio-crepe-small"], "crepe_small.pth"),
        **dict.fromkeys(["crepe-small-onnx", "mangio-crepe-small-onnx"], "crepe_small.onnx"),
        **dict.fromkeys(["crepe-tiny", "mangio-crepe-tiny"], "crepe_tiny.pth"),
        **dict.fromkeys(["crepe-tiny-onnx", "mangio-crepe-tiny-onnx"], "crepe_tiny.onnx"),
        **dict.fromkeys(["fcn"], "fcn.pt"),
        **dict.fromkeys(["fcn-onnx"], "fcn.onnx")
    }

    results = []
    count = configs.get("num_of_restart", 5)

    # Retry all downloads up to `count` times before giving up.
    for _ in range(count):
        if "hybrid" in method:
            methods_str = re.search(r"hybrid\[(.+)\]", method)
            methods = [m.strip() for m in methods_str.group(1).split("+")] if methods_str else []

            for m in methods:
                if m in model_dict: results.append(download_predictor(model_dict[m]))
        elif method in model_dict: results.append(download_predictor(model_dict[method]))

        if hubert in embedders_model:
            # Resolve the on-disk filename without mutating `hubert`, so the
            # membership check above still holds on later retries.
            hubert_file = hubert if embedders_mode == "transformers" else hubert + (".pt" if embedders_mode == "fairseq" else ".onnx")
            results.append(download_embedder(embedders_mode, hubert_file))

        if all(results): return
        else: results = []

    logger.warning(translations["check_assets_error"].format(count=count))
    sys.exit(1)
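
# Hedged usage sketch (the method and embedder names are illustrative; the
# "hybrid[...]" syntax is the one parsed above):
#   check_assets("hybrid[rmvpe+fcpe]", "hubert_base", f0_onnx=False, embedders_mode="fairseq")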

def check_spk_diarization(model_size):
    whisper_model = os.path.join(configs["speaker_diarization_path"], "models", f"{model_size}.pt")

    if not os.path.exists(whisper_model):
        huggingface.HF_download_file("".join([codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/fcrnxre_qvnevmngvba/", "rot13"), model_size, ".pt"]), whisper_model)

    speechbrain_path = os.path.join(configs["speaker_diarization_path"], "models", "speechbrain")
    os.makedirs(speechbrain_path, exist_ok=True)

    for f in ["classifier.ckpt", "config.json", "embedding_model.ckpt", "hyperparams.yaml", "mean_var_norm_emb.ckpt"]:
        speechbrain_model = os.path.join(speechbrain_path, f)

        if not os.path.exists(speechbrain_model):
            huggingface.HF_download_file(codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Ivrganzrfr-EIP-Cebwrpg/erfbyir/znva/fcrnxre_qvnevmngvba/fcrrpuoenva/", "rot13") + f, speechbrain_model)
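
# Usage sketch (the size string is illustrative and must match an uploaded
# Whisper checkpoint name):
#   check_spk_diarization("medium")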

def load_audio(file, sample_rate=16000, formant_shifting=False, formant_qfrency=0.8, formant_timbre=0.8):
    try:
        file = file.strip(' "\n')
        if not os.path.isfile(file): raise FileNotFoundError(translations["not_found"].format(name=file))

        try:
            audio, sr = sf.read(file, dtype=np.float32)
        except Exception:
            # soundfile cannot decode every container; fall back to librosa.
            audio, sr = librosa.load(file, sr=None)

        if audio.ndim > 1: audio = librosa.to_mono(audio.T)
        if sr != sample_rate: audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate, res_type="soxr_vhq")

        if formant_shifting:
            # Imported lazily so the pitch-shift dependency is only loaded when used.
            from main.library.algorithm.stftpitchshift import StftPitchShift

            pitchshifter = StftPitchShift(1024, 32, sample_rate)
            audio = pitchshifter.shiftpitch(audio, factors=1, quefrency=formant_qfrency * 1e-3, distortion=formant_timbre)
    except Exception as e:
        raise RuntimeError(f"{translations['errors_loading_audio']}: {e}")

    return audio.flatten()
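
# Minimal usage sketch (paths are hypothetical): decode to mono float32 at
# 16 kHz, optionally formant-shifting on the way in.
#   wav = load_audio("input.wav")
#   shifted = load_audio("input.wav", formant_shifting=True, formant_qfrency=0.8, formant_timbre=0.8)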

def pydub_load(input_path, volume=None):
    try:
        if input_path.endswith(".wav"): audio = AudioSegment.from_wav(input_path)
        elif input_path.endswith(".mp3"): audio = AudioSegment.from_mp3(input_path)
        elif input_path.endswith(".ogg"): audio = AudioSegment.from_ogg(input_path)
        else: audio = AudioSegment.from_file(input_path)
    except Exception:
        # Let ffmpeg sniff the format if the extension-based loader fails.
        audio = AudioSegment.from_file(input_path)

    return audio if volume is None else audio + volume
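
# Usage sketch: pydub's `+` applies gain in dB, so `volume=-6` attenuates by 6 dB.
#   seg = pydub_load("input.mp3", volume=-6)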

def load_embedders_model(embedder_model, embedders_mode="fairseq"):
    # Resolve the embedder's on-disk name from its mode, then load it with
    # the matching backend.
    if embedders_mode == "fairseq": embedder_model += ".pt"
    elif embedders_mode == "onnx": embedder_model += ".onnx"
    elif embedders_mode == "spin": embedders_mode, embedder_model = "transformers", "spin"

    embedder_model_path = os.path.join(configs["embedders_path"], embedder_model)
    if not os.path.exists(embedder_model_path): raise FileNotFoundError(f"{translations['not_found'].format(name=translations['model'])}: {embedder_model}")

    try:
        if embedders_mode == "fairseq":
            embed_suffix = ".pt"
            hubert_model = fairseq.load_model(embedder_model_path)
        elif embedders_mode == "onnx":
            sess_options = onnxruntime.SessionOptions()
            sess_options.log_severity_level = 3
            embed_suffix = ".onnx"
            hubert_model = onnxruntime.InferenceSession(embedder_model_path, sess_options=sess_options, providers=get_providers())
        elif embedders_mode == "transformers":
            embed_suffix = ".safetensors"
            hubert_model = HubertModelWithFinalProj.from_pretrained(embedder_model_path)
        else: raise ValueError(translations["option_not_valid"])
    except Exception as e:
        raise RuntimeError(translations["read_model_error"].format(e=e))

    return hubert_model, embed_suffix
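
# Hedged usage sketch: the returned object is a fairseq module, an ONNX
# InferenceSession, or a transformers model depending on `embedders_mode`,
# so callers dispatch on the returned `embed_suffix`.
#   hubert_model, embed_suffix = load_embedders_model("hubert_base", embedders_mode="fairseq")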

def cut(audio, sr, db_thresh=-60, min_interval=250):
    # Split the audio on silence using the project's Slicer2 (threshold in dB).
    from main.inference.preprocess.slicer2 import Slicer2

    slicer = Slicer2(sr=sr, threshold=db_thresh, min_interval=min_interval)
    return slicer.slice2(audio)

def restore(segments, total_len, dtype=np.float32):
    # Stitch processed segments back together, zero-filling the gaps between
    # them so the output matches the original length.
    out = []
    last_end = 0

    for start, end, processed_seg in segments:
        if start > last_end: out.append(np.zeros(start - last_end, dtype=dtype))

        out.append(processed_seg)
        last_end = end

    if last_end < total_len: out.append(np.zeros(total_len - last_end, dtype=dtype))
    return np.concatenate(out, axis=-1)
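
# Round-trip sketch (assumes the slicer yields (start, end, segment) triples
# in samples, which is the shape `restore` consumes; `process` is a
# hypothetical per-segment transform):
#   segments = [(start, end, process(seg)) for start, end, seg in cut(audio, sr)]
#   restored = restore(segments, total_len=len(audio), dtype=audio.dtype)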

def get_providers():
    # Pick the ONNX Runtime execution provider that matches the configured
    # torch device, falling back to CPU.
    ort_providers = onnxruntime.get_available_providers()

    if "CUDAExecutionProvider" in ort_providers and config.device.startswith("cuda"):
        providers = ["CUDAExecutionProvider"]
    elif "DmlExecutionProvider" in ort_providers and config.device.startswith("ocl"):
        providers = ["DmlExecutionProvider"]
    elif "CoreMLExecutionProvider" in ort_providers and config.device.startswith("mps"):
        providers = ["CoreMLExecutionProvider"]
    else:
        providers = ["CPUExecutionProvider"]
        logger.info(translations["running_in_cpu"])

    if providers[0] != "CPUExecutionProvider": logger.debug(translations["onnx_have"].format(have=providers[0]))
    return providers

def extract_features(model, feats, version):
    # Run the ONNX embedder and pick the output matching the model version:
    # v1 reads the first output, v2 the projected second output.
    feats0 = model.run(
        [model.get_outputs()[0].name, model.get_outputs()[1].name],
        {"feats": feats.detach().cpu().numpy()}
    )[0 if version == "v1" else 1]

    return torch.as_tensor(feats0, dtype=torch.float32, device=feats.device)
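
# Hedged usage sketch: `model` is an ONNX InferenceSession from
# load_embedders_model(..., embedders_mode="onnx"); the "feats" input name
# comes from the run() call above, while the tensor shape is an assumption.
#   session, _ = load_embedders_model("hubert_base", embedders_mode="onnx")
#   units = extract_features(session, feats_tensor, version="v2")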