# Command-line voice conversion: loads a synthesizer checkpoint (.pth or .onnx)
# and converts one audio file or every audio file in a directory.
import os
import sys
import json
import onnx
import time
import torch
import librosa
import logging
import argparse
import warnings
import onnxruntime

import numpy as np
import soundfile as sf

from tqdm import tqdm

try:
    from distutils.util import strtobool  # distutils was removed in Python 3.12
except ImportError:
    def strtobool(value):
        # Minimal fallback with the same semantics as distutils.util.strtobool.
        value = value.lower()
        if value in ("y", "yes", "t", "true", "on", "1"): return 1
        if value in ("n", "no", "f", "false", "off", "0"): return 0
        raise ValueError(f"invalid truth value {value!r}")

warnings.filterwarnings("ignore")
sys.path.append(os.getcwd())

from main.inference.conversion.pipeline import Pipeline
from main.app.variables import config, logger, translations
from main.library.algorithm.synthesizers import Synthesizer
from main.inference.conversion.utils import clear_gpu_cache
from main.library.utils import check_assets, load_audio, load_embedders_model, cut, restore, get_providers

for l in ["torch", "faiss", "omegaconf", "httpx", "httpcore", "faiss.loader", "numba.core", "urllib3", "transformers", "matplotlib"]:
    logging.getLogger(l).setLevel(logging.ERROR)


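# Example invocation (illustrative paths; substitute this file's actual name or
# module path in the repository):
#   python convert.py --input_path ./audios/input.wav --pth_path ./weights/model.pth \
#       --index_path ./logs/model/added.index --f0_method rmvpe --export_format wav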
def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--convert", action='store_true')
    parser.add_argument("--pitch", type=int, default=0)
    parser.add_argument("--filter_radius", type=int, default=3)
    parser.add_argument("--index_rate", type=float, default=0.5)
    parser.add_argument("--rms_mix_rate", type=float, default=1)
    parser.add_argument("--protect", type=float, default=0.33)
    parser.add_argument("--hop_length", type=int, default=64)
    parser.add_argument("--f0_method", type=str, default="rmvpe")
    parser.add_argument("--embedder_model", type=str, default="contentvec_base")
    parser.add_argument("--input_path", type=str, required=True)
    parser.add_argument("--output_path", type=str, default="./audios/output.wav")
    parser.add_argument("--export_format", type=str, default="wav")
    parser.add_argument("--pth_path", type=str, required=True)
    parser.add_argument("--index_path", type=str, default="")
    parser.add_argument("--f0_autotune", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--f0_autotune_strength", type=float, default=1)
    parser.add_argument("--clean_audio", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--clean_strength", type=float, default=0.7)
    parser.add_argument("--resample_sr", type=int, default=0)
    parser.add_argument("--split_audio", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--checkpointing", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--f0_file", type=str, default="")
    parser.add_argument("--f0_onnx", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--embedders_mode", type=str, default="fairseq")
    parser.add_argument("--formant_shifting", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--formant_qfrency", type=float, default=0.8)
    parser.add_argument("--formant_timbre", type=float, default=0.8)
    parser.add_argument("--proposal_pitch", type=lambda x: bool(strtobool(x)), default=False)
    parser.add_argument("--proposal_pitch_threshold", type=float, default=255.0)

    return parser.parse_args()


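# Parses the CLI flags and forwards them unchanged to run_convert_script().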
def main():
    args = parse_arguments()
    run_convert_script(pitch=args.pitch, filter_radius=args.filter_radius, index_rate=args.index_rate, rms_mix_rate=args.rms_mix_rate, protect=args.protect, hop_length=args.hop_length, f0_method=args.f0_method, input_path=args.input_path, output_path=args.output_path, pth_path=args.pth_path, index_path=args.index_path, f0_autotune=args.f0_autotune, f0_autotune_strength=args.f0_autotune_strength, clean_audio=args.clean_audio, clean_strength=args.clean_strength, export_format=args.export_format, embedder_model=args.embedder_model, resample_sr=args.resample_sr, split_audio=args.split_audio, checkpointing=args.checkpointing, f0_file=args.f0_file, f0_onnx=args.f0_onnx, embedders_mode=args.embedders_mode, formant_shifting=args.formant_shifting, formant_qfrency=args.formant_qfrency, formant_timbre=args.formant_timbre, proposal_pitch=args.proposal_pitch, proposal_pitch_threshold=args.proposal_pitch_threshold)


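# Validates the model path, logs the effective settings, writes a PID file
# (assets/convert_pid.txt) so the running process can be tracked externally,
# then converts either a single file or every audio file in a directory.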
def run_convert_script(pitch=0, filter_radius=3, index_rate=0.5, rms_mix_rate=1, protect=0.5, hop_length=64, f0_method="rmvpe", input_path=None, output_path="./output.wav", pth_path=None, index_path=None, f0_autotune=False, f0_autotune_strength=1, clean_audio=False, clean_strength=0.7, export_format="wav", embedder_model="contentvec_base", resample_sr=0, split_audio=False, checkpointing=False, f0_file=None, f0_onnx=False, embedders_mode="fairseq", formant_shifting=False, formant_qfrency=0.8, formant_timbre=0.8, proposal_pitch=False, proposal_pitch_threshold=255.0):
    check_assets(f0_method, embedder_model, f0_onnx=f0_onnx, embedders_mode=embedders_mode)

    log_data = {translations['pitch']: pitch, translations['filter_radius']: filter_radius, translations['index_strength']: index_rate, translations['rms_mix_rate']: rms_mix_rate, translations['protect']: protect, "Hop length": hop_length, translations['f0_method']: f0_method, translations['audio_path']: input_path, translations['output_path']: output_path.replace('wav', export_format), translations['model_path']: pth_path, translations['indexpath']: index_path, translations['autotune']: f0_autotune, translations['clear_audio']: clean_audio, translations['export_format']: export_format, translations['hubert_model']: embedder_model, translations['split_audio']: split_audio, translations['memory_efficient_training']: checkpointing, translations["f0_onnx_mode"]: f0_onnx, translations["embed_mode"]: embedders_mode, translations["proposal_pitch"]: proposal_pitch}

    if clean_audio: log_data[translations['clean_strength']] = clean_strength
    if resample_sr != 0: log_data[translations['sample_rate']] = resample_sr
    if f0_autotune: log_data[translations['autotune_rate_info']] = f0_autotune_strength
    if f0_file and os.path.isfile(f0_file): log_data[translations['f0_file']] = f0_file  # guard: the default f0_file=None would crash os.path.isfile
    if proposal_pitch: log_data[translations["proposal_pitch_threshold"]] = proposal_pitch_threshold

    if formant_shifting:
        log_data[translations['formant_qfrency']] = formant_qfrency
        log_data[translations['formant_timbre']] = formant_timbre

    for key, value in log_data.items():
        logger.debug(f"{key}: {value}")

    if not pth_path or not os.path.exists(pth_path) or os.path.isdir(pth_path) or not pth_path.endswith((".pth", ".onnx")):
        logger.warning(translations["provide_file"].format(filename=translations["model"]))
        sys.exit(1)

    cvt = VoiceConverter(pth_path, 0)
    start_time = time.time()

    pid_path = os.path.join("assets", "convert_pid.txt")
    with open(pid_path, "w") as pid_file:
        pid_file.write(str(os.getpid()))

    if os.path.isdir(input_path):
        logger.info(translations["convert_batch"])
        audio_files = [f for f in os.listdir(input_path) if f.lower().endswith(("wav", "mp3", "flac", "ogg", "opus", "m4a", "mp4", "aac", "alac", "wma", "aiff", "webm", "ac3"))]

        if not audio_files:
            logger.warning(translations["not_found_audio"])
            sys.exit(1)

        logger.info(translations["found_audio"].format(audio_files=len(audio_files)))

        for audio in audio_files:
            audio_path = os.path.join(input_path, audio)
            output_audio = os.path.join(input_path, os.path.splitext(audio)[0] + f"_output.{export_format}")

            logger.info(f"{translations['convert_audio']} '{audio_path}'...")
            if os.path.exists(output_audio): os.remove(output_audio)

            cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, rms_mix_rate=rms_mix_rate, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=audio_path, audio_output_path=output_audio, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, embedder_model=embedder_model, resample_sr=resample_sr, checkpointing=checkpointing, f0_file=f0_file, f0_onnx=f0_onnx, embedders_mode=embedders_mode, formant_shifting=formant_shifting, formant_qfrency=formant_qfrency, formant_timbre=formant_timbre, split_audio=split_audio, proposal_pitch=proposal_pitch, proposal_pitch_threshold=proposal_pitch_threshold)

        logger.info(translations["convert_batch_success"].format(elapsed_time=f"{(time.time() - start_time):.2f}", output_path=output_path.replace('wav', export_format)))
    else:
        if not os.path.exists(input_path):
            logger.warning(translations["not_found_audio"])
            sys.exit(1)

        logger.info(f"{translations['convert_audio']} '{input_path}'...")
        if os.path.exists(output_path): os.remove(output_path)

        cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, rms_mix_rate=rms_mix_rate, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=input_path, audio_output_path=output_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, embedder_model=embedder_model, resample_sr=resample_sr, checkpointing=checkpointing, f0_file=f0_file, f0_onnx=f0_onnx, embedders_mode=embedders_mode, formant_shifting=formant_shifting, formant_qfrency=formant_qfrency, formant_timbre=formant_timbre, split_audio=split_audio, proposal_pitch=proposal_pitch, proposal_pitch_threshold=proposal_pitch_threshold)

        logger.info(translations["convert_audio_success"].format(input_path=input_path, elapsed_time=f"{(time.time() - start_time):.2f}", output_path=output_path.replace('wav', export_format)))

    if os.path.exists(pid_path): os.remove(pid_path)


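# Wraps checkpoint loading (.pth via torch, .onnx via onnxruntime) and the
# conversion pipeline. Typical standalone use (illustrative):
#   cvt = VoiceConverter("./weights/model.pth", 0)
#   cvt.convert_audio(audio_input_path="in.wav", audio_output_path="out.wav", ...)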
class VoiceConverter:
    def __init__(self, model_path, sid=0):
        self.config = config
        self.device = config.device
        self.hubert_model = None
        self.tgt_sr = None
        self.net_g = None
        self.vc = None
        self.cpt = None
        self.version = None
        self.n_spk = None
        self.use_f0 = None
        self.loaded_model = None
        self.vocoder = "Default"
        self.checkpointing = False
        self.sample_rate = 16000
        self.sid = sid
        self.suffix = None        # filled in by setup(): ".pth" or ".onnx"
        self.embed_suffix = None  # filled in by convert_audio() when the embedder loads
        self.energy = False       # filled in by setup() from checkpoint metadata
        self.get_vc(model_path, sid)

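    # Runs the full conversion for one file: load audio, normalize, load the
    # embedder on first use, optionally split into chunks, run the pipeline per
    # chunk, then resample/denoise/pad and write the result.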
    def convert_audio(self, audio_input_path, audio_output_path, index_path, embedder_model, pitch, f0_method, index_rate, rms_mix_rate, protect, hop_length, f0_autotune, f0_autotune_strength, filter_radius, clean_audio, clean_strength, export_format, resample_sr=0, checkpointing=False, f0_file=None, f0_onnx=False, embedders_mode="fairseq", formant_shifting=False, formant_qfrency=0.8, formant_timbre=0.8, split_audio=False, proposal_pitch=False, proposal_pitch_threshold=255.0):
        try:
            with tqdm(total=10, desc=translations["convert_audio"], ncols=100, unit="a", leave=not split_audio) as pbar:
                audio = load_audio(audio_input_path, self.sample_rate, formant_shifting=formant_shifting, formant_qfrency=formant_qfrency, formant_timbre=formant_timbre)
                self.checkpointing = checkpointing

                # Normalize if the waveform peaks above 0.95.
                audio_max = np.abs(audio).max() / 0.95
                if audio_max > 1: audio /= audio_max

                if not self.hubert_model:
                    models, embed_suffix = load_embedders_model(embedder_model, embedders_mode)
                    self.hubert_model = (models.to(self.device).half() if self.config.is_half else models.to(self.device).float()).eval() if embed_suffix in [".pt", ".safetensors"] else models
                    self.embed_suffix = embed_suffix

                pbar.update(1)
                if split_audio:
                    pbar.close()
                    chunks = cut(audio, self.sample_rate, db_thresh=-60, min_interval=500)

                    logger.info(f"{translations['split_total']}: {len(chunks)}")
                    pbar = tqdm(total=len(chunks) * 5 + 4, desc=translations["convert_audio"], ncols=100, unit="a", leave=True)
                else: chunks = [(audio, 0, 0)]

                pbar.update(1)
                converted_chunks = [(
                    start,
                    end,
                    self.vc.pipeline(
                        logger=logger,
                        model=self.hubert_model,
                        net_g=self.net_g,
                        sid=self.sid,
                        audio=waveform,
                        f0_up_key=pitch,
                        f0_method=f0_method,
                        file_index=(index_path.strip().strip('"').strip("\n").strip('"').strip().replace("trained", "added")),
                        index_rate=index_rate,
                        pitch_guidance=self.use_f0,
                        filter_radius=filter_radius,
                        rms_mix_rate=rms_mix_rate,
                        version=self.version,
                        protect=protect,
                        hop_length=hop_length,
                        f0_autotune=f0_autotune,
                        f0_autotune_strength=f0_autotune_strength,
                        suffix=self.suffix,
                        embed_suffix=self.embed_suffix,
                        f0_file=f0_file,
                        f0_onnx=f0_onnx,
                        pbar=pbar,
                        proposal_pitch=proposal_pitch,
                        proposal_pitch_threshold=proposal_pitch_threshold,
                        energy_use=self.energy
                    )
                ) for waveform, start, end in chunks]

                pbar.update(1)

                # Keep net_g and the embedder loaded so repeated convert_audio()
                # calls (batch mode) can reuse them.
                audio_output = restore(converted_chunks, total_len=len(audio), dtype=converted_chunks[0][2].dtype) if split_audio else converted_chunks[0][2]

                if self.tgt_sr != resample_sr and resample_sr > 0:
                    audio_output = librosa.resample(audio_output, orig_sr=self.tgt_sr, target_sr=resample_sr, res_type="soxr_vhq")
                    self.tgt_sr = resample_sr

                pbar.update(1)
                if clean_audio:
                    from main.tools.noisereduce import reduce_noise
                    audio_output = reduce_noise(y=audio_output, sr=self.tgt_sr, prop_decrease=clean_strength, device=self.device)

                # Pad with silence when the converted audio is shorter than the input.
                if len(audio) / self.sample_rate > len(audio_output) / self.tgt_sr:
                    padding = np.zeros(int(np.round(len(audio) / self.sample_rate * self.tgt_sr) - len(audio_output)), dtype=audio_output.dtype)
                    audio_output = np.concatenate([audio_output, padding])

                try:
                    sf.write(audio_output_path, audio_output, self.tgt_sr, format=export_format)
                except Exception:
                    # Fall back to 48 kHz when the target format rejects the model's rate.
                    sf.write(audio_output_path, librosa.resample(audio_output, orig_sr=self.tgt_sr, target_sr=48000, res_type="soxr_vhq"), 48000, format=export_format)

                pbar.update(1)
        except Exception as e:
            logger.error(translations["error_convert"].format(e=e))
            import traceback
            logger.debug(traceback.format_exc())

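    # Loads (or reloads) model weights when the requested checkpoint differs from
    # the one currently held; an empty speaker id triggers a full cleanup first.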
    def get_vc(self, weight_root, sid):
        if sid == "" or sid == []:
            self.cleanup()
            clear_gpu_cache()

        if not self.loaded_model or self.loaded_model != weight_root:
            self.loaded_model = weight_root
            self.load_model()
            if self.cpt is not None: self.setup()

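    # Releases the loaded models and clears GPU memory.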
    def cleanup(self):
        if self.hubert_model is not None:
            del self.net_g, self.n_spk, self.vc, self.hubert_model, self.tgt_sr
            self.hubert_model = self.net_g = self.n_spk = self.vc = self.tgt_sr = None
            clear_gpu_cache()

        del self.net_g, self.cpt
        self.net_g = self.cpt = None  # keep the attributes defined for later access
        clear_gpu_cache()

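    # Loads the checkpoint: .pth files via torch.load, anything else as an
    # ONNX Runtime inference session.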
    def load_model(self):
        if os.path.isfile(self.loaded_model):
            if self.loaded_model.endswith(".pth"): self.cpt = torch.load(self.loaded_model, map_location="cpu", weights_only=True)
            else:
                sess_options = onnxruntime.SessionOptions()
                sess_options.log_severity_level = 3
                self.cpt = onnxruntime.InferenceSession(self.loaded_model, sess_options=sess_options, providers=get_providers())
        else: self.cpt = None

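    # Builds the synthesizer and the conversion Pipeline from the loaded
    # checkpoint; ONNX models read their settings from "model_info" metadata.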
    def setup(self):
        if self.cpt is not None:
            if self.loaded_model.endswith(".pth"):
                self.tgt_sr = self.cpt["config"][-1]
                self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]

                self.use_f0 = self.cpt.get("f0", 1)
                self.version = self.cpt.get("version", "v1")
                self.vocoder = self.cpt.get("vocoder", "Default")
                self.energy = self.cpt.get("energy", False)

                if self.vocoder != "Default": self.config.is_half = False
                self.net_g = Synthesizer(*self.cpt["config"], use_f0=self.use_f0, text_enc_hidden_dim=768 if self.version == "v2" else 256, vocoder=self.vocoder, checkpointing=self.checkpointing, energy=self.energy)
                del self.net_g.enc_q

                self.net_g.load_state_dict(self.cpt["weight"], strict=False)
                self.net_g.eval().to(self.device)
                self.net_g = (self.net_g.half() if self.config.is_half else self.net_g.float())
                self.n_spk = self.cpt["config"][-3]
                self.suffix = ".pth"
            else:
                metadata_dict = None
                for prop in onnx.load(self.loaded_model).metadata_props:
                    if prop.key == "model_info":
                        metadata_dict = json.loads(prop.value)
                        break

                # Fall back to defaults if the model carries no "model_info" metadata.
                if metadata_dict is None: metadata_dict = {}

                self.net_g = self.cpt
                self.tgt_sr = metadata_dict.get("sr", 32000)
                self.use_f0 = metadata_dict.get("f0", 1)
                self.version = metadata_dict.get("version", "v1")
                self.energy = metadata_dict.get("energy", False)
                self.suffix = ".onnx"

            self.vc = Pipeline(self.tgt_sr, self.config)


if __name__ == "__main__": main()