import os
import sys
import json
import onnx
import time
import torch
import librosa
import logging
import argparse
import warnings
import onnxruntime
import numpy as np
import soundfile as sf
from tqdm import tqdm
try:
    from distutils.util import strtobool
except ImportError:  # distutils was removed from the standard library in Python 3.12
    def strtobool(value):
        value = value.lower()
        if value in ("y", "yes", "t", "true", "on", "1"): return 1
        if value in ("n", "no", "f", "false", "off", "0"): return 0
        raise ValueError(f"invalid truth value {value!r}")
warnings.filterwarnings("ignore")
sys.path.append(os.getcwd())
from main.inference.conversion.pipeline import Pipeline
from main.app.variables import config, logger, translations
from main.library.algorithm.synthesizers import Synthesizer
from main.inference.conversion.utils import clear_gpu_cache
from main.library.utils import check_assets, load_audio, load_embedders_model, cut, restore, get_providers
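# Silence noisy third-party loggers so only this script's own messages are shown.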
for l in ["torch", "faiss", "omegaconf", "httpx", "httpcore", "faiss.loader", "numba.core", "urllib3", "transformers", "matplotlib"]:
logging.getLogger(l).setLevel(logging.ERROR)
def parse_arguments():
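    """Parse command-line arguments for the voice-conversion CLI."""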
parser = argparse.ArgumentParser()
parser.add_argument("--convert", action='store_true')
parser.add_argument("--pitch", type=int, default=0)
parser.add_argument("--filter_radius", type=int, default=3)
parser.add_argument("--index_rate", type=float, default=0.5)
parser.add_argument("--rms_mix_rate", type=float, default=1)
parser.add_argument("--protect", type=float, default=0.33)
parser.add_argument("--hop_length", type=int, default=64)
parser.add_argument("--f0_method", type=str, default="rmvpe")
parser.add_argument("--embedder_model", type=str, default="contentvec_base")
parser.add_argument("--input_path", type=str, required=True)
parser.add_argument("--output_path", type=str, default="./audios/output.wav")
parser.add_argument("--export_format", type=str, default="wav")
parser.add_argument("--pth_path", type=str, required=True)
parser.add_argument("--index_path", type=str, default="")
parser.add_argument("--f0_autotune", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--f0_autotune_strength", type=float, default=1)
parser.add_argument("--clean_audio", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--clean_strength", type=float, default=0.7)
parser.add_argument("--resample_sr", type=int, default=0)
parser.add_argument("--split_audio", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--checkpointing", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--f0_file", type=str, default="")
parser.add_argument("--f0_onnx", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--embedders_mode", type=str, default="fairseq")
parser.add_argument("--formant_shifting", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--formant_qfrency", type=float, default=0.8)
parser.add_argument("--formant_timbre", type=float, default=0.8)
parser.add_argument("--proposal_pitch", type=lambda x: bool(strtobool(x)), default=False)
parser.add_argument("--proposal_pitch_threshold", type=float, default=255.0)
return parser.parse_args()
def main():
    kwargs = vars(parse_arguments())
    kwargs.pop("convert", None)  # CLI flag kept for compatibility; not a conversion parameter
    run_convert_script(**kwargs)
def run_convert_script(pitch=0, filter_radius=3, index_rate=0.5, rms_mix_rate=1, protect=0.5, hop_length=64, f0_method="rmvpe", input_path=None, output_path="./output.wav", pth_path=None, index_path=None, f0_autotune=False, f0_autotune_strength=1, clean_audio=False, clean_strength=0.7, export_format="wav", embedder_model="contentvec_base", resample_sr=0, split_audio=False, checkpointing=False, f0_file=None, f0_onnx=False, embedders_mode="fairseq", formant_shifting=False, formant_qfrency=0.8, formant_timbre=0.8, proposal_pitch=False, proposal_pitch_threshold=255.0):
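    """Validate the inputs, log the conversion settings, and convert either a single file or every audio file in a directory."""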
check_assets(f0_method, embedder_model, f0_onnx=f0_onnx, embedders_mode=embedders_mode)
    log_data = {
        translations['pitch']: pitch,
        translations['filter_radius']: filter_radius,
        translations['index_strength']: index_rate,
        translations['rms_mix_rate']: rms_mix_rate,
        translations['protect']: protect,
        "Hop length": hop_length,
        translations['f0_method']: f0_method,
        translations['audio_path']: input_path,
        translations['output_path']: output_path.replace('wav', export_format),
        translations['model_path']: pth_path,
        translations['indexpath']: index_path,
        translations['autotune']: f0_autotune,
        translations['clear_audio']: clean_audio,
        translations['export_format']: export_format,
        translations['hubert_model']: embedder_model,
        translations['split_audio']: split_audio,
        translations['memory_efficient_training']: checkpointing,
        translations["f0_onnx_mode"]: f0_onnx,
        translations["embed_mode"]: embedders_mode,
        translations["proposal_pitch"]: proposal_pitch
    }
if clean_audio: log_data[translations['clean_strength']] = clean_strength
if resample_sr != 0: log_data[translations['sample_rate']] = resample_sr
if f0_autotune: log_data[translations['autotune_rate_info']] = f0_autotune_strength
    if f0_file and os.path.isfile(f0_file): log_data[translations['f0_file']] = f0_file
if proposal_pitch: log_data[translations["proposal_pitch_threshold"]] = proposal_pitch_threshold
if formant_shifting:
log_data[translations['formant_qfrency']] = formant_qfrency
log_data[translations['formant_timbre']] = formant_timbre
for key, value in log_data.items():
logger.debug(f"{key}: {value}")
if not pth_path or not os.path.exists(pth_path) or os.path.isdir(pth_path) or not pth_path.endswith((".pth", ".onnx")):
logger.warning(translations["provide_file"].format(filename=translations["model"]))
sys.exit(1)
cvt = VoiceConverter(pth_path, 0)
start_time = time.time()
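    # Record this process's PID so external tooling can monitor or stop the conversion.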
pid_path = os.path.join("assets", "convert_pid.txt")
with open(pid_path, "w") as pid_file:
pid_file.write(str(os.getpid()))
if os.path.isdir(input_path):
logger.info(translations["convert_batch"])
        audio_files = [f for f in os.listdir(input_path) if f.lower().endswith((".wav", ".mp3", ".flac", ".ogg", ".opus", ".m4a", ".mp4", ".aac", ".alac", ".wma", ".aiff", ".webm", ".ac3"))]
if not audio_files:
logger.warning(translations["not_found_audio"])
sys.exit(1)
logger.info(translations["found_audio"].format(audio_files=len(audio_files)))
for audio in audio_files:
audio_path = os.path.join(input_path, audio)
output_audio = os.path.join(input_path, os.path.splitext(audio)[0] + f"_output.{export_format}")
logger.info(f"{translations['convert_audio']} '{audio_path}'...")
if os.path.exists(output_audio): os.remove(output_audio)
cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, rms_mix_rate=rms_mix_rate, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=audio_path, audio_output_path=output_audio, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, embedder_model=embedder_model, resample_sr=resample_sr, checkpointing=checkpointing, f0_file=f0_file, f0_onnx=f0_onnx, embedders_mode=embedders_mode, formant_shifting=formant_shifting, formant_qfrency=formant_qfrency, formant_timbre=formant_timbre, split_audio=split_audio, proposal_pitch=proposal_pitch, proposal_pitch_threshold=proposal_pitch_threshold)
logger.info(translations["convert_batch_success"].format(elapsed_time=f"{(time.time() - start_time):.2f}", output_path=output_path.replace('wav', export_format)))
else:
if not os.path.exists(input_path):
logger.warning(translations["not_found_audio"])
sys.exit(1)
logger.info(f"{translations['convert_audio']} '{input_path}'...")
if os.path.exists(output_path): os.remove(output_path)
cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, rms_mix_rate=rms_mix_rate, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=input_path, audio_output_path=output_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, embedder_model=embedder_model, resample_sr=resample_sr, checkpointing=checkpointing, f0_file=f0_file, f0_onnx=f0_onnx, embedders_mode=embedders_mode, formant_shifting=formant_shifting, formant_qfrency=formant_qfrency, formant_timbre=formant_timbre, split_audio=split_audio, proposal_pitch=proposal_pitch, proposal_pitch_threshold=proposal_pitch_threshold)
logger.info(translations["convert_audio_success"].format(input_path=input_path, elapsed_time=f"{(time.time() - start_time):.2f}", output_path=output_path.replace('wav', export_format)))
if os.path.exists(pid_path): os.remove(pid_path)
class VoiceConverter:
def __init__(self, model_path, sid = 0):
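        """Load the voice model at ``model_path`` and initialize the conversion pipeline."""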
self.config = config
self.device = config.device
self.hubert_model = None
self.tgt_sr = None
self.net_g = None
self.vc = None
self.cpt = None
self.version = None
self.n_spk = None
self.use_f0 = None
self.loaded_model = None
self.vocoder = "Default"
self.checkpointing = False
self.sample_rate = 16000
self.sid = sid
self.get_vc(model_path, sid)
def convert_audio(self, audio_input_path, audio_output_path, index_path, embedder_model, pitch, f0_method, index_rate, rms_mix_rate, protect, hop_length, f0_autotune, f0_autotune_strength, filter_radius, clean_audio, clean_strength, export_format, resample_sr = 0, checkpointing = False, f0_file = None, f0_onnx = False, embedders_mode = "fairseq", formant_shifting = False, formant_qfrency = 0.8, formant_timbre = 0.8, split_audio = False, proposal_pitch = False, proposal_pitch_threshold = 255.0):
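        """Convert one audio file: extract features, run the synthesizer, then post-process (resample, denoise, pad) and write the result."""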
try:
with tqdm(total=10, desc=translations["convert_audio"], ncols=100, unit="a", leave=not split_audio) as pbar:
audio = load_audio(audio_input_path, self.sample_rate, formant_shifting=formant_shifting, formant_qfrency=formant_qfrency, formant_timbre=formant_timbre)
                self.checkpointing = checkpointing
                # Reload the synthesizer if a previous call released it (see the cleanup
                # at the end of this method); needed when converting a batch of files.
                if self.net_g is None and self.cpt is not None: self.setup()
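                # Peak-normalize so the absolute amplitude stays within ~0.95 of full scale.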
audio_max = np.abs(audio).max() / 0.95
if audio_max > 1: audio /= audio_max
if not self.hubert_model:
models, embed_suffix = load_embedders_model(embedder_model, embedders_mode)
self.hubert_model = (models.to(self.device).half() if self.config.is_half else models.to(self.device).float()).eval() if embed_suffix in [".pt", ".safetensors"] else models
self.embed_suffix = embed_suffix
pbar.update(1)
if split_audio:
pbar.close()
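                    # Split the input on silent stretches (db_thresh/min_interval) so long files are converted chunk by chunk.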
chunks = cut(audio, self.sample_rate, db_thresh=-60, min_interval=500)
logger.info(f"{translations['split_total']}: {len(chunks)}")
pbar = tqdm(total=len(chunks) * 5 + 4, desc=translations["convert_audio"], ncols=100, unit="a", leave=True)
else: chunks = [(audio, 0, 0)]
pbar.update(1)
converted_chunks = [(
start,
end,
self.vc.pipeline(
logger=logger,
model=self.hubert_model,
net_g=self.net_g,
sid=self.sid,
audio=waveform,
f0_up_key=pitch,
f0_method=f0_method,
                        # Normalize the index path (strip stray quotes/newlines), tolerate an empty
                        # value, and prefer the "added_*" index over the raw "trained_*" one.
                        file_index=(index_path or "").strip().strip('"').strip("\n").strip('"').strip().replace("trained", "added"),
index_rate=index_rate,
pitch_guidance=self.use_f0,
filter_radius=filter_radius,
rms_mix_rate=rms_mix_rate,
version=self.version,
protect=protect,
hop_length=hop_length,
f0_autotune=f0_autotune,
f0_autotune_strength=f0_autotune_strength,
suffix=self.suffix,
embed_suffix=self.embed_suffix,
f0_file=f0_file,
f0_onnx=f0_onnx,
pbar=pbar,
proposal_pitch=proposal_pitch,
proposal_pitch_threshold=proposal_pitch_threshold,
energy_use=self.energy
)
) for waveform, start, end in chunks]
pbar.update(1)
                # Release model references (keeping the attributes defined as None so
                # later calls can detect and reload them) and reclaim GPU memory.
                self.net_g = self.hubert_model = None
                clear_gpu_cache()
audio_output = restore(converted_chunks, total_len=len(audio), dtype=converted_chunks[0][2].dtype) if split_audio else converted_chunks[0][2]
if self.tgt_sr != resample_sr and resample_sr > 0:
audio_output = librosa.resample(audio_output, orig_sr=self.tgt_sr, target_sr=resample_sr, res_type="soxr_vhq")
self.tgt_sr = resample_sr
pbar.update(1)
if clean_audio:
from main.tools.noisereduce import reduce_noise
audio_output = reduce_noise(y=audio_output, sr=self.tgt_sr, prop_decrease=clean_strength, device=self.device)
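                # If rounding in the pipeline shortened the output, pad it with silence to match the input duration.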
if len(audio) / self.sample_rate > len(audio_output) / self.tgt_sr:
padding = np.zeros(int(np.round(len(audio) / self.sample_rate * self.tgt_sr) - len(audio_output)), dtype=audio_output.dtype)
audio_output = np.concatenate([audio_output, padding])
                try:
                    sf.write(audio_output_path, audio_output, self.tgt_sr, format=export_format)
                except Exception:
                    # Some container formats reject unusual sample rates; retry at 48 kHz.
                    sf.write(audio_output_path, librosa.resample(audio_output, orig_sr=self.tgt_sr, target_sr=48000, res_type="soxr_vhq"), 48000, format=export_format)
pbar.update(1)
except Exception as e:
logger.error(translations["error_convert"].format(e=e))
import traceback
logger.debug(traceback.format_exc())
def get_vc(self, weight_root, sid):
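        """Load ``weight_root`` if it differs from the currently loaded model, then (re)build the conversion pipeline."""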
if sid == "" or sid == []:
self.cleanup()
clear_gpu_cache()
if not self.loaded_model or self.loaded_model != weight_root:
self.loaded_model = weight_root
self.load_model()
if self.cpt is not None: self.setup()
    def cleanup(self):
        # Drop every model reference and reclaim GPU memory. Attributes are set to
        # None rather than deleted so later `is None` checks keep working.
        self.hubert_model = self.net_g = self.n_spk = self.vc = self.tgt_sr = self.cpt = None
        clear_gpu_cache()
def load_model(self):
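        """Load the checkpoint: ``torch.load`` for .pth files, an ONNX Runtime inference session otherwise."""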
if os.path.isfile(self.loaded_model):
if self.loaded_model.endswith(".pth"): self.cpt = torch.load(self.loaded_model, map_location="cpu", weights_only=True)
else:
sess_options = onnxruntime.SessionOptions()
sess_options.log_severity_level = 3
self.cpt = onnxruntime.InferenceSession(self.loaded_model, sess_options=sess_options, providers=get_providers())
else: self.cpt = None
def setup(self):
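        """Build the synthesizer and read model properties (sample rate, f0 use, version, vocoder, energy) from the loaded checkpoint."""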
if self.cpt is not None:
if self.loaded_model.endswith(".pth"):
self.tgt_sr = self.cpt["config"][-1]
self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]
self.use_f0 = self.cpt.get("f0", 1)
self.version = self.cpt.get("version", "v1")
self.vocoder = self.cpt.get("vocoder", "Default")
self.energy = self.cpt.get("energy", False)
if self.vocoder != "Default": self.config.is_half = False
self.net_g = Synthesizer(*self.cpt["config"], use_f0=self.use_f0, text_enc_hidden_dim=768 if self.version == "v2" else 256, vocoder=self.vocoder, checkpointing=self.checkpointing, energy=self.energy)
del self.net_g.enc_q
self.net_g.load_state_dict(self.cpt["weight"], strict=False)
self.net_g.eval().to(self.device)
self.net_g = (self.net_g.half() if self.config.is_half else self.net_g.float())
self.n_spk = self.cpt["config"][-3]
self.suffix = ".pth"
else:
                # Read the metadata embedded in the ONNX file; fall back to an empty
                # dict (and the defaults below) if the "model_info" property is absent.
                metadata_dict = {}
                for prop in onnx.load(self.loaded_model).metadata_props:
                    if prop.key == "model_info":
                        metadata_dict = json.loads(prop.value)
                        break
self.net_g = self.cpt
self.tgt_sr = metadata_dict.get("sr", 32000)
self.use_f0 = metadata_dict.get("f0", 1)
self.version = metadata_dict.get("version", "v1")
self.energy = metadata_dict.get("energy", False)
self.suffix = ".onnx"
self.vc = Pipeline(self.tgt_sr, self.config)
if __name__ == "__main__": main()