# VOICEVN — main/inference/convert.py
# (Removed non-Python Hugging Face file-viewer residue: "AnhP's picture /
#  Upload 65 files / 98bb602 verified / raw / history blame / 47.8 kB")
import gc
import re
import os
import sys
import time
import torch
import faiss
import shutil
import codecs
import pyworld
import librosa
import logging
import argparse
import warnings
import traceback
import torchcrepe
import subprocess
import parselmouth
import logging.handlers
import numpy as np
import soundfile as sf
import noisereduce as nr
import torch.nn.functional as F
import torch.multiprocessing as mp
from tqdm import tqdm
from scipy import signal
from torch import Tensor
from scipy.io import wavfile
from audio_upscaler import upscale
from distutils.util import strtobool
from fairseq import checkpoint_utils
from pydub import AudioSegment, silence
now_dir = os.getcwd()
sys.path.append(now_dir)
from main.configs.config import Config
from main.library.predictors.FCPE import FCPE
from main.library.predictors.RMVPE import RMVPE
from main.library.algorithm.synthesizers import Synthesizer
# Silence deprecation chatter and noisy third-party loggers.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
logging.getLogger("wget").setLevel(logging.ERROR)
logging.getLogger("torch").setLevel(logging.ERROR)
logging.getLogger("faiss").setLevel(logging.ERROR)
logging.getLogger("httpx").setLevel(logging.ERROR)
logging.getLogger("fairseq").setLevel(logging.ERROR)
logging.getLogger("httpcore").setLevel(logging.ERROR)
logging.getLogger("faiss.loader").setLevel(logging.ERROR)
# High-pass Butterworth filter applied to input audio before f0/feature extraction.
FILTER_ORDER = 5
CUTOFF_FREQUENCY = 48  # Hz
SAMPLE_RATE = 16000  # Hz — analysis rate used throughout the pipeline
bh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)
input_audio_path2wav = {}  # cache of raw waveforms keyed by input path (written in VC.get_f0)
log_file = os.path.join("assets", "logs", "convert.log")
logger = logging.getLogger(__name__)
logger.propagate = False
translations = Config().translations
# Configure console + rotating-file logging once per process.
# NOTE(review): when handlers already exist they are cleared and NOT re-installed,
# leaving the logger handler-less — confirm this is intended.
if logger.hasHandlers(): logger.handlers.clear()
else:
    console_handler = logging.StreamHandler()
    console_formatter = logging.Formatter(fmt="\n%(asctime)s.%(msecs)03d | %(levelname)s | %(module)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    console_handler.setFormatter(console_formatter)
    console_handler.setLevel(logging.INFO)
    file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=5*1024*1024, backupCount=3, encoding='utf-8')
    file_formatter = logging.Formatter(fmt="\n%(asctime)s.%(msecs)03d | %(levelname)s | %(module)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    file_handler.setFormatter(file_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.setLevel(logging.DEBUG)
def _str_to_bool(value) -> bool:
    """Parse a textual boolean flag ("y/yes/t/true/on/1" or "n/no/f/false/off/0").

    Local replacement for distutils.util.strtobool, which was removed from the
    standard library in Python 3.12 (PEP 632). Raises ArgumentTypeError on
    anything else so argparse reports a clean usage error.
    """
    normalized = str(value).strip().lower()
    if normalized in ("y", "yes", "t", "true", "on", "1"):
        return True
    if normalized in ("n", "no", "f", "false", "off", "0"):
        return False
    raise argparse.ArgumentTypeError(f"invalid boolean value: {value!r}")

def parse_arguments() -> argparse.Namespace:
    """Define and parse the conversion CLI.

    Returns the parsed argparse.Namespace (the original annotation said
    ``tuple``, but ``parse_args`` returns a Namespace — fixed).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--pitch", type=int, default=0)
    parser.add_argument("--filter_radius", type=int, default=3)
    parser.add_argument("--index_rate", type=float, default=0.5)
    parser.add_argument("--volume_envelope", type=float, default=1)
    parser.add_argument("--protect", type=float, default=0.33)
    parser.add_argument("--hop_length", type=int, default=64)
    parser.add_argument("--f0_method", type=str, default="rmvpe")
    parser.add_argument("--input_path", type=str, required=True)
    parser.add_argument("--output_path", type=str, default="./audios/output.wav")
    parser.add_argument("--pth_path", type=str, required=True)
    parser.add_argument("--index_path", type=str, required=True)
    parser.add_argument("--f0_autotune", type=_str_to_bool, default=False)
    parser.add_argument("--f0_autotune_strength", type=float, default=1)
    parser.add_argument("--clean_audio", type=_str_to_bool, default=False)
    parser.add_argument("--clean_strength", type=float, default=0.7)
    parser.add_argument("--export_format", type=str, default="wav")
    parser.add_argument("--embedder_model", type=str, default="contentvec_base")
    parser.add_argument("--upscale_audio", type=_str_to_bool, default=False)
    parser.add_argument("--resample_sr", type=int, default=0)
    parser.add_argument("--batch_process", type=_str_to_bool, default=False)
    parser.add_argument("--batch_size", type=int, default=2)
    parser.add_argument("--split_audio", type=_str_to_bool, default=False)
    return parser.parse_args()
def main():
    """CLI entry point: parse arguments, fetch any missing model files, run the conversion."""
    args = parse_arguments()
    # Log every effective setting for debugging.
    logger.debug(f"{translations['pitch']}: {args.pitch}")
    logger.debug(f"{translations['filter_radius']}: {args.filter_radius}")
    logger.debug(f"{translations['index_strength']} {args.index_rate}")
    logger.debug(f"{translations['volume_envelope']}: {args.volume_envelope}")
    logger.debug(f"{translations['protect']}: {args.protect}")
    if args.f0_method in ("crepe", "crepe-tiny"): logger.debug(f"Hop length: {args.hop_length}")
    logger.debug(f"{translations['f0_method']}: {args.f0_method}")
    # NOTE: the original also logged the input path mislabeled as "f0_method"
    # here, duplicating the audio_path line below — removed as a copy-paste bug.
    logger.debug(f"{translations['audio_path']}: {args.input_path}")
    logger.debug(f"{translations['output_path']}: {args.output_path.replace('.wav', f'.{args.export_format}')}")
    logger.debug(f"{translations['model_path']}: {args.pth_path}")
    logger.debug(f"{translations['indexpath']}: {args.index_path}")
    logger.debug(f"{translations['autotune']}: {args.f0_autotune}")
    logger.debug(f"{translations['clear_audio']}: {args.clean_audio}")
    if args.clean_audio: logger.debug(f"{translations['clean_strength']}: {args.clean_strength}")
    logger.debug(f"{translations['export_format']}: {args.export_format}")
    logger.debug(f"{translations['hubert_model']}: {args.embedder_model}")
    logger.debug(f"{translations['upscale_audio']}: {args.upscale_audio}")
    if args.resample_sr != 0: logger.debug(f"{translations['sample_rate']}: {args.resample_sr}")
    if args.split_audio: logger.debug(f"{translations['batch_process']}: {args.batch_process}")
    if args.batch_process and args.split_audio: logger.debug(f"{translations['batch_size']}: {args.batch_size}")
    logger.debug(f"{translations['split_audio']}: {args.split_audio}")
    if args.f0_autotune: logger.debug(f"{translations['autotune_rate_info']}: {args.f0_autotune_strength}")
    # Download predictor / embedder checkpoints if they are missing locally.
    check_rmvpe_fcpe(args.f0_method)
    check_hubert(args.embedder_model)
    run_convert_script(pitch=args.pitch, filter_radius=args.filter_radius, index_rate=args.index_rate, volume_envelope=args.volume_envelope, protect=args.protect, hop_length=args.hop_length, f0_method=args.f0_method, input_path=args.input_path, output_path=args.output_path, pth_path=args.pth_path, index_path=args.index_path, f0_autotune=args.f0_autotune, f0_autotune_strength=args.f0_autotune_strength, clean_audio=args.clean_audio, clean_strength=args.clean_strength, export_format=args.export_format, embedder_model=args.embedder_model, upscale_audio=args.upscale_audio, resample_sr=args.resample_sr, batch_process=args.batch_process, batch_size=args.batch_size, split_audio=args.split_audio)
def check_rmvpe_fcpe(method):
    """Ensure the f0-predictor checkpoints required by ``method`` exist locally.

    Downloads rmvpe.pt / fcpe.pt into assets/model/predictors when missing.
    The download base URL is stored rot13-encoded in the source.

    Fixes over the original: raw regex string (invalid escape), no NameError on
    a malformed "hybrid[...]" spec, no shadowing of the ``method`` parameter,
    and a single shared download helper.
    """
    def _download(filename):
        # Only fetch when the checkpoint is not already on disk.
        target_dir = os.path.join("assets", "model", "predictors")
        if not os.path.exists(os.path.join(target_dir, filename)):
            subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13") + filename, "-P", target_dir], check=True)

    if method == "rmvpe":
        _download("rmvpe.pt")
    elif method == "fcpe":
        _download("fcpe.pt")
    elif "hybrid" in method:
        match = re.search(r"hybrid\[(.+)\]", method)
        if not match:
            # Original raised NameError here (unbound `methods`); malformed
            # specs are now ignored and reported by the pipeline later.
            return
        for sub_method in (m.strip() for m in match.group(1).split("+")):
            if sub_method == "rmvpe":
                _download("rmvpe.pt")
            elif sub_method == "fcpe":
                _download("fcpe.pt")
def check_hubert(hubert):
    """Ensure the requested HuBERT/ContentVec embedder checkpoint exists locally.

    Downloads ``<hubert>.pt`` into assets/model/embedders when it is one of the
    supported embedders and the file is missing. Unknown names are ignored.
    """
    supported = ("contentvec_base", "hubert_base", "japanese_hubert_base", "korean_hubert_base", "chinese_hubert_base")
    if hubert in supported:
        model_path = os.path.join(now_dir, "assets", "model", "embedders", hubert + '.pt')
        if not os.path.exists(model_path):
            # URL is rot13-encoded in the source; wget places the file under assets/model/embedders.
            subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13") + f"{hubert}.pt", "-P", os.path.join("assets", "model", "embedders")], check=True)
def load_audio_infer(file, sample_rate):
    """Read an audio file, downmix to mono, resample to ``sample_rate``, and
    return the samples as a flat float array.

    Raises RuntimeError (wrapping the original error) on any failure.
    """
    try:
        # Remove stray spaces/quotes/newlines that drag-and-dropped paths often carry.
        file = file.strip(" ").strip('"').strip("\n").strip('"').strip(" ")
        if not os.path.isfile(file):
            raise FileNotFoundError(translations["not_found"].format(name=file))
        data, native_sr = sf.read(file)
        if len(data.shape) > 1:
            data = librosa.to_mono(data.T)
        if native_sr != sample_rate:
            data = librosa.resample(data, orig_sr=native_sr, target_sr=sample_rate)
    except Exception as e:
        raise RuntimeError(f"{translations['errors_loading_audio']}: {e}")
    return data.flatten()
def process_audio(file_path, output_path):
    """Split ``file_path`` on silence and export each non-silent chunk as wav
    into ``output_path``.

    Returns (chunk_file_paths, [(start_ms, end_ms), ...]) so the converted
    chunks can later be merged back onto the original timeline.
    """
    try:
        song = AudioSegment.from_file(file_path)
        detected = silence.detect_nonsilent(song, min_silence_len=750, silence_thresh=-70)
        min_chunk_duration = 30  # chunks shorter than this (ms) are skipped
        cut_files, time_stamps = [], []
        for i, (start_ms, end_ms) in enumerate(detected):
            segment = song[start_ms:end_ms]
            if len(segment) < min_chunk_duration:
                logger.debug(translations["skip_file"].format(i=i, chunk=len(segment)))
                continue
            chunk_path = os.path.join(output_path, f"chunk{i}.wav")
            if os.path.exists(chunk_path):
                os.remove(chunk_path)
            segment.export(chunk_path, format="wav")
            cut_files.append(chunk_path)
            time_stamps.append((start_ms, end_ms))
        logger.info(f"{translations['split_total']}: {len(cut_files)}")
        return cut_files, time_stamps
    except Exception as e:
        raise RuntimeError(f"{translations['process_audio_error']}: {e}")
def merge_audio(files_list, time_stamps, original_file_path, output_path, format):
    """Stitch converted segments back together in timeline order, padding the
    gaps (and the tail) with silence so the result matches the original length.

    Returns ``output_path``. Raises RuntimeError on any failure.
    """
    try:
        def _segment_index(filename):
            # Segment files are named "..._<n>.<ext>"; sort on that number.
            m = re.search(r'_(\d+)', filename)
            return int(m.group(1)) if m else 0

        ordered_files = sorted(files_list, key=_segment_index)
        total_duration = len(AudioSegment.from_file(original_file_path))
        combined = AudioSegment.empty()
        cursor = 0
        for file, (start_ms, end_ms) in zip(ordered_files, time_stamps):
            gap = start_ms - cursor
            if gap > 0:
                combined += AudioSegment.silent(duration=gap)
            combined += AudioSegment.from_file(file)
            cursor = end_ms
        if cursor < total_duration:
            combined += AudioSegment.silent(duration=total_duration - cursor)
        combined.export(output_path, format=format)
        return output_path
    except Exception as e:
        raise RuntimeError(f"{translations['merge_error']}: {e}")
def run_batch_convert(params):
    """Convert one pre-cut audio chunk described by ``params`` (a dict built by
    run_convert_script) and return the path of the converted segment.

    Deletes the input chunk afterwards; exits the process when the converted
    segment is missing.
    """
    converter = VoiceConverter()
    path = params["path"]
    cut_files = params["cut_files"]
    export_format = params["export_format"]
    # Segment index mirrors the chunk's position in the original cut list.
    segment_output_path = os.path.join(params["audio_temp"], f"output_{cut_files.index(path)}.{export_format}")
    if os.path.exists(segment_output_path):
        os.remove(segment_output_path)
    converter.convert_audio(
        pitch=params["pitch"],
        filter_radius=params["filter_radius"],
        index_rate=params["index_rate"],
        volume_envelope=params["volume_envelope"],
        protect=params["protect"],
        hop_length=params["hop_length"],
        f0_method=params["f0_method"],
        audio_input_path=path,
        audio_output_path=segment_output_path,
        model_path=params["pth_path"],
        index_path=params["index_path"],
        f0_autotune=params["f0_autotune"],
        f0_autotune_strength=params["f0_autotune_strength"],
        clean_audio=params["clean_audio"],
        clean_strength=params["clean_strength"],
        export_format=export_format,
        upscale_audio=params["upscale_audio"],
        embedder_model=params["embedder_model"],
        resample_sr=params["resample_sr"],
    )
    os.remove(path)  # the raw chunk is no longer needed
    if os.path.exists(segment_output_path):
        return segment_output_path
    logger.warning(f"{translations['not_found_convert_file']}: {segment_output_path}")
    sys.exit(1)
def run_convert_script(pitch, filter_radius, index_rate, volume_envelope, protect, hop_length, f0_method, input_path, output_path, pth_path, index_path, f0_autotune, f0_autotune_strength, clean_audio, clean_strength, export_format, upscale_audio, embedder_model, resample_sr, batch_process, batch_size, split_audio):
    """Drive a full conversion run.

    ``input_path`` may be a single audio file or a directory of audio files.
    With ``split_audio`` the input is cut on silence, every chunk is converted
    (in a process pool when ``batch_process`` is set) and the results are
    merged back onto the original timeline.

    Fixes over the original version:
    - the sequential (non-batch) split path now collects the converted segment
      paths, so merge_audio no longer receives an empty list;
    - in directory mode the collected segments are reset per input file instead
      of accumulating across all files;
    - the duplicated split/convert/merge logic is factored into helpers.
    """
    cvt = VoiceConverter()
    start_time = time.time()
    # Refuse to run without a valid .pth model and .index file.
    if not pth_path or not os.path.exists(pth_path) or os.path.isdir(pth_path) or not pth_path.endswith(".pth"):
        logger.warning(translations["provide_file"].format(filename=translations["model"]))
        sys.exit(1)
    if not index_path or not os.path.exists(index_path) or os.path.isdir(index_path) or not index_path.endswith(".index"):
        logger.warning(translations["provide_file"].format(filename=translations["index"]))
        sys.exit(1)
    # Make sure the output directory exists.
    output_dir = os.path.dirname(output_path)
    output_dir = output_path if not output_dir else output_dir
    if output_dir is None: output_dir = "audios"
    if not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True)
    audio_temp = os.path.join("audios_temp")  # scratch directory for split chunks
    if not os.path.exists(audio_temp) and split_audio: os.makedirs(audio_temp, exist_ok=True)

    def _chunk_params(cut_files):
        # One kwargs dict per chunk, consumed by run_batch_convert.
        return [{"path": path, "audio_temp": audio_temp, "export_format": export_format, "cut_files": cut_files, "pitch": pitch, "filter_radius": filter_radius, "index_rate": index_rate, "volume_envelope": volume_envelope, "protect": protect, "hop_length": hop_length, "f0_method": f0_method, "pth_path": pth_path, "index_path": index_path, "f0_autotune": f0_autotune, "f0_autotune_strength": f0_autotune_strength, "clean_audio": clean_audio, "clean_strength": clean_strength, "upscale_audio": upscale_audio, "embedder_model": embedder_model, "resample_sr": resample_sr} for path in cut_files]

    def _convert_split(source_path, merged_output_path):
        # Split on silence, convert each chunk, merge back; always cleans audio_temp.
        try:
            cut_files, time_stamps = process_audio(source_path, audio_temp)
            params_list = _chunk_params(cut_files)
            processed_segments = []  # reset per call — the original leaked segments across files
            if batch_process:
                with mp.Pool(processes=min(batch_size, len(cut_files))) as pool:
                    with tqdm(total=len(params_list), desc=translations["convert_audio"]) as pbar:
                        for result in pool.imap_unordered(run_batch_convert, params_list):
                            processed_segments.append(result)
                            pbar.update(1)
            else:
                for params in tqdm(params_list, desc=translations["convert_audio"]):
                    # FIX: the original discarded the return value here, so the
                    # sequential path always merged an empty segment list.
                    processed_segments.append(run_batch_convert(params))
            merge_audio(processed_segments, time_stamps, source_path, merged_output_path, export_format)
        except Exception as e:
            logger.error(translations["error_convert_batch"].format(e=e))
        finally:
            if os.path.exists(audio_temp): shutil.rmtree(audio_temp, ignore_errors=True)

    def _convert_whole(source_path, target_path, announce=False):
        # Convert one file in a single pass; `announce` also logs and clears the target first.
        try:
            if announce:
                logger.info(f"{translations['convert_audio']} '{source_path}'...")
                if os.path.exists(target_path): os.remove(target_path)
            with tqdm(total=1, desc=translations["convert_audio"]) as pbar:
                cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=source_path, audio_output_path=target_path, model_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, upscale_audio=upscale_audio, embedder_model=embedder_model, resample_sr=resample_sr)
                pbar.update(1)
        except Exception as e:
            logger.error(translations["error_convert"].format(e=e))

    if os.path.isdir(input_path):
        # Directory mode: convert every audio file found directly in input_path.
        try:
            logger.info(translations["convert_batch"])
            audio_files = [f for f in os.listdir(input_path) if f.endswith(("wav", "mp3", "flac", "ogg", "opus", "m4a", "mp4", "aac", "alac", "wma", "aiff", "webm", "ac3"))]
            if not audio_files:
                logger.warning(translations["not_found_audio"])
                sys.exit(1)
            logger.info(translations["found_audio"].format(audio_files=len(audio_files)))
            for audio in audio_files:
                audio_path = os.path.join(input_path, audio)
                output_audio = os.path.join(input_path, os.path.splitext(audio)[0] + f"_output.{export_format}")
                if split_audio: _convert_split(audio_path, output_audio)
                else: _convert_whole(audio_path, output_audio, announce=True)
            elapsed_time = time.time() - start_time
            logger.info(translations["convert_batch_success"].format(elapsed_time=f"{elapsed_time:.2f}", output_path=output_path.replace('.wav', f'.{export_format}')))
        except Exception as e:
            logger.error(translations["error_convert_batch_2"].format(e=e))
    else:
        # Single-file mode.
        logger.info(f"{translations['convert_audio']} '{input_path}'...")
        if not os.path.exists(input_path):
            logger.warning(translations["not_found_audio"])
            sys.exit(1)
        if os.path.isdir(output_path): output_path = os.path.join(output_path, f"output.{export_format}")
        if os.path.exists(output_path): os.remove(output_path)
        if split_audio: _convert_split(input_path, output_path.replace(".wav", f".{export_format}"))
        else: _convert_whole(input_path, output_path)
        elapsed_time = time.time() - start_time
        logger.info(translations["convert_audio_success"].format(input_path=input_path, elapsed_time=f"{elapsed_time:.2f}", output_path=output_path.replace('.wav', f'.{export_format}')))
def change_rms(source_audio: np.ndarray, source_rate: int, target_audio: np.ndarray, target_rate: int, rate: float) -> np.ndarray:
    """Blend the target audio's loudness envelope toward the source's.

    rate=1 leaves the target untouched; lower values pull the frame-wise RMS of
    the target toward the source signal's envelope. Returns the re-weighted
    target samples as a numpy array.
    """
    # Frame-wise RMS envelopes (half-second frames, half-second hop).
    source_rms = librosa.feature.rms(y=source_audio, frame_length=source_rate // 2 * 2, hop_length=source_rate // 2)
    target_rms = librosa.feature.rms(y=target_audio, frame_length=target_rate // 2 * 2, hop_length=target_rate // 2)

    def _per_sample(envelope):
        # Stretch an RMS envelope to one value per target sample.
        return F.interpolate(torch.from_numpy(envelope).float().unsqueeze(0), size=target_audio.shape[0], mode="linear").squeeze()

    source_env = _per_sample(source_rms)
    target_env = _per_sample(target_rms)
    # Floor the target envelope to avoid division by zero in the gain below.
    target_env = torch.maximum(target_env, torch.zeros_like(target_env) + 1e-6)
    gain = (torch.pow(source_env, 1 - rate) * torch.pow(target_env, rate - 1)).numpy()
    return target_audio * gain
class Autotune:
    """Snap an f0 contour toward a fixed table of reference note frequencies."""

    def __init__(self, ref_freqs):
        # ref_freqs: iterable of reference frequencies in Hz.
        self.ref_freqs = ref_freqs
        self.note_dict = self.ref_freqs  # alias kept for backward compatibility

    def autotune_f0(self, f0, f0_autotune_strength):
        """Pull each f0 value toward its nearest reference note.

        f0_autotune_strength: 0 leaves f0 unchanged, 1 snaps fully to the note.
        Returns an array with the same shape and dtype as ``f0``.

        Vectorized with numpy broadcasting (the original was an
        O(len(f0) * len(notes)) Python loop); ties break toward the first note
        in the table, matching ``min``'s behavior in the original.
        """
        notes = np.asarray(self.note_dict, dtype=np.float64)
        values = np.asarray(f0)
        # For each f0 value, index of the nearest reference note.
        nearest = notes[np.argmin(np.abs(notes[None, :] - values[:, None]), axis=1)]
        # Write through zeros_like to preserve the input dtype exactly,
        # as the original element-wise assignment did.
        autotuned_f0 = np.zeros_like(f0)
        autotuned_f0[:] = values + (nearest - values) * f0_autotune_strength
        return autotuned_f0
class VC:
    """RVC inference pipeline: f0 extraction, FAISS feature retrieval, synthesis.

    Operates at a fixed 16 kHz analysis rate with a 160-sample (10 ms) hop;
    long inputs are chunked at low-energy points and the converted pieces are
    concatenated (see pipeline()).
    """
    def __init__(self, tgt_sr, config):
        # Chunking / padding lengths (in seconds) come from the shared Config.
        self.x_pad = config.x_pad
        self.x_query = config.x_query
        self.x_center = config.x_center
        self.x_max = config.x_max
        self.is_half = config.is_half
        self.sample_rate = 16000  # analysis sample rate expected by the embedder / f0 models
        self.window = 160  # hop size in samples (10 ms at 16 kHz)
        self.t_pad = self.sample_rate * self.x_pad  # reflect padding, analysis-rate samples
        self.t_pad_tgt = tgt_sr * self.x_pad  # the same padding measured in output-rate samples
        self.t_pad2 = self.t_pad * 2
        self.t_query = self.sample_rate * self.x_query  # search span around each cut point
        self.t_center = self.sample_rate * self.x_center  # nominal spacing of cut points
        self.t_max = self.sample_rate * self.x_max  # inputs longer than this are split into chunks
        self.time_step = self.window / self.sample_rate * 1000  # hop expressed in milliseconds
        self.f0_min = 50
        self.f0_max = 1100
        # Mel-scale bounds used by get_f0 to quantize f0 into 255 coarse bins.
        self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700)
        self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700)
        self.device = config.device
        # Reference note frequencies (equal-tempered, roughly G1..C6) for autotune.
        self.ref_freqs = [
            49.00, 51.91, 55.00, 58.27, 61.74, 65.41, 69.30, 73.42, 77.78, 82.41,
            87.31, 92.50, 98.00, 103.83, 110.00, 116.54, 123.47, 130.81, 138.59,
            146.83, 155.56, 164.81, 174.61, 185.00, 196.00, 207.65, 220.00, 233.08,
            246.94, 261.63, 277.18, 293.66, 311.13, 329.63, 349.23, 369.99, 392.00,
            415.30, 440.00, 466.16, 493.88, 523.25, 554.37, 587.33, 622.25, 659.25,
            698.46, 739.99, 783.99, 830.61, 880.00, 932.33, 987.77, 1046.50
        ]
        self.autotune = Autotune(self.ref_freqs)
        self.note_dict = self.autotune.note_dict
    def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"):
        """Estimate f0 with torchcrepe and resample the contour to p_len frames.

        model: torchcrepe capacity, "full" or "tiny".
        Returns a float array of length p_len with unvoiced frames set to 0.
        """
        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)  # normalize by the 99.9th percentile to tame outliers
        audio = torch.from_numpy(x).to(self.device, copy=True)
        audio = torch.unsqueeze(audio, dim=0)
        # Downmix in the unlikely case a multi-channel tensor slips through.
        if audio.ndim == 2 and audio.shape[0] > 1: audio = torch.mean(audio, dim=0, keepdim=True).detach()
        audio = audio.detach()
        pitch: Tensor = torchcrepe.predict(audio, self.sample_rate, hop_length, f0_min, f0_max, model, batch_size=hop_length * 2, device=self.device, pad=True)
        p_len = p_len or x.shape[0] // hop_length
        source = np.array(pitch.squeeze(0).cpu().float().numpy())
        source[source < 0.001] = np.nan  # mark near-zero (unvoiced) frames
        # Linearly resample the contour onto exactly p_len frames.
        target = np.interp(
            np.arange(0, len(source) * p_len, len(source)) / p_len,
            np.arange(0, len(source)),
            source,
        )
        f0 = np.nan_to_num(target)  # unvoiced frames back to 0
        return f0
    def get_f0_hybrid(self, methods_str, x, f0_min, f0_max, p_len, hop_length, filter_radius):
        """Run every f0 estimator named in "hybrid[a+b+...]" and return the
        element-wise median of their resampled contours."""
        methods_str = re.search("hybrid\[(.+)\]", methods_str)
        # NOTE(review): if the pattern does not match, `methods` below stays
        # unbound and the next line raises NameError — confirm callers always
        # pass a well-formed hybrid spec.
        if methods_str: methods = [method.strip() for method in methods_str.group(1).split("+")]
        f0_computation_stack = []
        logger.debug(translations["hybrid_methods"].format(methods=methods))
        x = x.astype(np.float32)
        x /= np.quantile(np.abs(x), 0.999)  # percentile normalization, same as get_f0_crepe
        for method in methods:
            f0 = None
            if method == "pm":
                # Praat autocorrelation pitch; padded to p_len frames below.
                f0 = (parselmouth.Sound(x, self.sample_rate).to_pitch_ac(time_step=self.window / self.sample_rate * 1000 / 1000, voicing_threshold=0.6, pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array["frequency"])
                pad_size = (p_len - len(f0) + 1) // 2
                if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
            elif method == 'dio':
                f0, t = pyworld.dio(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
                f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)  # refine raw DIO estimates
                f0 = signal.medfilt(f0, 3)
            elif method == "crepe-tiny":
                f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length), "tiny")
            elif method == "crepe":
                f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length))
            elif method == "fcpe":
                self.model_fcpe = FCPE(os.path.join("assets", "model", "predictors", "fcpe.pt"), hop_length=int(hop_length), f0_min=int(f0_min), f0_max=int(f0_max), dtype=torch.float32, device=self.device, sample_rate=self.sample_rate, threshold=0.03)
                f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
                # Free the FCPE model immediately; it is re-created per call.
                del self.model_fcpe
                gc.collect()
            elif method == "rmvpe":
                f0 = RMVPE(os.path.join("assets", "model", "predictors", "rmvpe.pt"), is_half=self.is_half, device=self.device).infer_from_audio(x, thred=0.03)
                f0 = f0[1:]  # drops the first frame — presumably to align with the other estimators; TODO confirm
            elif method == "harvest":
                f0, t = pyworld.harvest(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
                f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
                if filter_radius > 2: f0 = signal.medfilt(f0, 3)
            else: raise ValueError(translations["method_not_valid"])
            f0_computation_stack.append(f0)
        # Resample every contour to exactly p_len frames, then take the median.
        resampled_stack = []
        for f0 in f0_computation_stack:
            resampled_f0 = np.interp(np.linspace(0, len(f0), p_len), np.arange(len(f0)), f0)
            resampled_stack.append(resampled_f0)
        f0_median_hybrid = resampled_stack[0] if len(resampled_stack) == 1 else np.nanmedian(np.vstack(resampled_stack), axis=0)
        return f0_median_hybrid
    def get_f0(self, input_audio_path, x, p_len, pitch, f0_method, filter_radius, hop_length, f0_autotune, f0_autotune_strength):
        """Extract f0 with the chosen method, apply transpose/autotune, quantize.

        pitch: semitone transpose applied as f0 *= 2 ** (pitch / 12).
        Returns (f0_coarse, f0) where f0_coarse holds 1..255 mel-spaced bins for
        the synthesizer's pitch embedding and f0 is the continuous contour in Hz.
        """
        global input_audio_path2wav
        if f0_method == "pm":
            f0 = (parselmouth.Sound(x, self.sample_rate).to_pitch_ac(time_step=self.window / self.sample_rate * 1000 / 1000, voicing_threshold=0.6, pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array["frequency"])
            pad_size = (p_len - len(f0) + 1) // 2
            if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant")
        elif f0_method == "dio":
            f0, t = pyworld.dio(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
            f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
            f0 = signal.medfilt(f0, 3)
        elif f0_method == "crepe-tiny":
            f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length), "tiny")
        elif f0_method == "crepe":
            f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length))
        elif f0_method == "fcpe":
            self.model_fcpe = FCPE(os.path.join("assets", "model", "predictors", "fcpe.pt"), hop_length=int(hop_length), f0_min=int(self.f0_min), f0_max=int(self.f0_max), dtype=torch.float32, device=self.device, sample_rate=self.sample_rate, threshold=0.03)
            f0 = self.model_fcpe.compute_f0(x, p_len=p_len)
            # Free the FCPE model immediately; it is re-created per call.
            del self.model_fcpe
            gc.collect()
        elif f0_method == "rmvpe":
            f0 = RMVPE(os.path.join("assets", "model", "predictors", "rmvpe.pt"), is_half=self.is_half, device=self.device).infer_from_audio(x, thred=0.03)
        elif f0_method == "harvest":
            f0, t = pyworld.harvest(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10)
            f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate)
            if filter_radius > 2: f0 = signal.medfilt(f0, 3)
        elif "hybrid" in f0_method:
            input_audio_path2wav[input_audio_path] = x.astype(np.double)  # module-level cache, written but not read here
            f0 = self.get_f0_hybrid(f0_method, x, self.f0_min, self.f0_max, p_len, hop_length, filter_radius)
        else: raise ValueError(translations["method_not_valid"])
        # NOTE(review): unbound-method call passing a VC instance as `self` —
        # it only works because VC also defines self.note_dict; consider
        # self.autotune.autotune_f0(...) instead.
        if f0_autotune: f0 = Autotune.autotune_f0(self, f0, f0_autotune_strength)
        f0 *= pow(2, pitch / 12)  # semitone transpose
        f0bak = f0.copy()
        # Quantize to 255 coarse mel-spaced bins (1 = unvoiced/minimum).
        f0_mel = 1127 * np.log(1 + f0 / 700)
        f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - self.f0_mel_min) * 254 / (self.f0_mel_max - self.f0_mel_min) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        f0_coarse = np.rint(f0_mel).astype(np.int32)
        return f0_coarse, f0bak
    def voice_conversion(self, model, net_g, sid, audio0, pitch, pitchf, index, big_npy, index_rate, version, protect):
        """Convert one audio chunk: embed with HuBERT, optionally blend with
        FAISS-retrieved training features, and synthesize with net_g.

        pitch/pitchf may be None (no pitch guidance). protect < 0.5 preserves
        the original embedder features on unvoiced frames.
        Returns the synthesized waveform as a float numpy array.
        """
        pitch_guidance = pitch != None and pitchf != None
        feats = (torch.from_numpy(audio0).half() if self.is_half else torch.from_numpy(audio0).float())
        if feats.dim() == 2: feats = feats.mean(-1)  # downmix stereo input
        assert feats.dim() == 1, feats.dim()
        feats = feats.view(1, -1)
        padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False)
        inputs = {
            "source": feats.to(self.device),
            "padding_mask": padding_mask,
            # v1 models use layer-9 features + a projection; v2 use layer 12 directly.
            "output_layer": 9 if version == "v1" else 12,
        }
        with torch.no_grad():
            logits = model.extract_features(**inputs)
            feats = model.final_proj(logits[0]) if version == "v1" else logits[0]
        # Keep a copy of the raw features for the `protect` blend below.
        if protect < 0.5 and pitch_guidance: feats0 = feats.clone()
        if (not isinstance(index, type(None)) and not isinstance(big_npy, type(None)) and index_rate != 0):
            npy = feats[0].cpu().numpy()
            if self.is_half: npy = npy.astype("float32")  # FAISS needs float32
            # k-NN retrieval: replace each frame with a distance-weighted mix of
            # its 8 nearest training features, then blend by index_rate.
            score, ix = index.search(npy, k=8)
            weight = np.square(1 / score)
            weight /= weight.sum(axis=1, keepdims=True)
            npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1)
            if self.is_half: npy = npy.astype("float16")
            feats = (torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate + (1 - index_rate) * feats)
        # Upsample features 2x in time to match the synthesizer's frame rate.
        feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        if protect < 0.5 and pitch_guidance: feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1)
        p_len = audio0.shape[0] // self.window
        if feats.shape[1] < p_len:
            # Clamp frame count (and pitch tensors) to the features we actually have.
            p_len = feats.shape[1]
            if pitch_guidance:
                pitch = pitch[:, :p_len]
                pitchf = pitchf[:, :p_len]
        if protect < 0.5 and pitch_guidance:
            # On unvoiced frames (pitchf == 0) keep `protect` worth of the raw
            # features; voiced frames use the index-blended features fully.
            pitchff = pitchf.clone()
            pitchff[pitchf > 0] = 1
            pitchff[pitchf < 1] = protect
            pitchff = pitchff.unsqueeze(-1)
            feats = feats * pitchff + feats0 * (1 - pitchff)
            feats = feats.to(feats0.dtype)
        p_len = torch.tensor([p_len], device=self.device).long()
        with torch.no_grad():
            audio1 = ((net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]).data.cpu().float().numpy()) if pitch_guidance else ((net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy())
        del feats, p_len, padding_mask
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        return audio1
    def pipeline(self, model, net_g, sid, audio, input_audio_path, pitch, f0_method, file_index, index_rate, pitch_guidance, filter_radius, tgt_sr, resample_sr, volume_envelope, version, protect, hop_length, f0_autotune, f0_autotune_strength):
        """Full conversion of one waveform: filter, chunk, extract f0, convert
        each chunk via voice_conversion(), then post-process.

        Returns int16 samples at tgt_sr (or resample_sr when resampling applies).
        """
        # Load the FAISS retrieval index once per call, if configured.
        if file_index != "" and os.path.exists(file_index) and index_rate != 0:
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
            except Exception as e:
                logger.error(translations["read_faiss_index_error"].format(e=e))
                index = big_npy = None
        else: index = big_npy = None
        audio = signal.filtfilt(bh, ah, audio)  # high-pass (48 Hz) to remove DC/rumble
        audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
        opt_ts = []
        if audio_pad.shape[0] > self.t_max:
            # Long input: pick cut points near t_center multiples where the
            # windowed energy sum is smallest, to split at quiet moments.
            audio_sum = np.zeros_like(audio)
            for i in range(self.window):
                audio_sum += audio_pad[i : i - self.window]
            for t in range(self.t_center, audio.shape[0], self.t_center):
                opt_ts.append(t - self.t_query + np.where(np.abs(audio_sum[t - self.t_query : t + self.t_query]) == np.abs(audio_sum[t - self.t_query : t + self.t_query]).min())[0][0])
        s = 0
        audio_opt = []
        t = None
        audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
        p_len = audio_pad.shape[0] // self.window
        sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()
        if pitch_guidance:
            pitch, pitchf = self.get_f0(input_audio_path, audio_pad, p_len, pitch, f0_method, filter_radius, hop_length, f0_autotune, f0_autotune_strength)
            pitch = pitch[:p_len]
            pitchf = pitchf[:p_len]
            if self.device == "mps": pitchf = pitchf.astype(np.float32)  # MPS backend lacks float64 support
            pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()
            pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
        # Convert chunk by chunk; t_pad_tgt trims the reflect padding from each output.
        for t in opt_ts:
            t = t // self.window * self.window  # align cut to the hop grid
            if pitch_guidance: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[s : t + self.t_pad2 + self.window], pitch[:, s // self.window : (t + self.t_pad2) // self.window], pitchf[:, s // self.window : (t + self.t_pad2) // self.window], index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
            else: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[s : t + self.t_pad2 + self.window], None, None, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
            s = t
        # Final (or only) chunk: t is None when the input was short enough to skip chunking.
        if pitch_guidance: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[t:], pitch[:, t // self.window :] if t is not None else pitch, pitchf[:, t // self.window :] if t is not None else pitchf, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
        else: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[t:], None, None, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
        audio_opt = np.concatenate(audio_opt)
        if volume_envelope != 1: audio_opt = change_rms(audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope)
        if resample_sr >= self.sample_rate and tgt_sr != resample_sr: audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)
        # Scale to int16, attenuating only if the signal would clip.
        audio_max = np.abs(audio_opt).max() / 0.99
        max_int16 = 32768
        if audio_max > 1: max_int16 /= audio_max
        audio_opt = (audio_opt * max_int16).astype(np.int16)
        if pitch_guidance: del pitch, pitchf
        del sid
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        return audio_opt
class VoiceConverter:
    """High-level wrapper around RVC-style voice conversion.

    Owns the lazily-loaded HuBERT embedder, the Synthesizer generator restored
    from a ``.pth`` checkpoint, and a ``VC`` pipeline instance, and exposes
    ``convert_audio`` as the single entry point for converting one audio file.
    Model state is cached between calls; reloading happens only when the
    requested weight file changes (see ``get_vc``).
    """

    def __init__(self):
        # Project-wide runtime configuration (device, fp16 flag, translations).
        self.config = Config()
        # fairseq HuBERT embedder; loaded on first use by load_hubert().
        self.hubert_model = (None)
        # Target sample rate taken from the checkpoint config (last entry).
        self.tgt_sr = None
        # Synthesizer generator network restored from the checkpoint.
        self.net_g = None
        # VC pipeline object driving the actual conversion.
        self.vc = None
        # Raw checkpoint dict as returned by torch.load().
        self.cpt = None
        # Model version string from the checkpoint ("v1" or "v2").
        self.version = None
        # Number of speakers in the checkpoint (config[-3]).
        self.n_spk = None
        # Whether the model was trained with pitch (f0) guidance.
        self.use_f0 = None
        # Path of the weight file currently loaded, to avoid redundant reloads.
        self.loaded_model = None

    def load_hubert(self, embedder_model):
        """Load the HuBERT embedder ``assets/model/embedders/<embedder_model>.pt``.

        Moves the model to the configured device, matches the synthesizer's
        precision (fp16 vs fp32), and puts it in eval mode.

        Raises:
            ImportError: with a translated message if fairseq fails to load
                the checkpoint for any reason.
        """
        try:
            models, _, _ = checkpoint_utils.load_model_ensemble_and_task([os.path.join(now_dir, "assets", "model", "embedders", embedder_model + '.pt')], suffix="")
        except Exception as e:
            raise ImportError(translations["read_model_error"].format(e=e))
        self.hubert_model = models[0].to(self.config.device)
        # Keep embedder precision consistent with the generator network.
        self.hubert_model = (self.hubert_model.half() if self.config.is_half else self.hubert_model.float())
        self.hubert_model.eval()

    @staticmethod
    def remove_audio_noise(input_audio_path, reduction_strength=0.7):
        """Denoise a WAV file with noisereduce.

        Returns the denoised sample array, or None (after logging) if reading
        or denoising fails — callers treat None as "keep the original audio".
        """
        try:
            rate, data = wavfile.read(input_audio_path)
            reduced_noise = nr.reduce_noise(y=data, sr=rate, prop_decrease=reduction_strength)
            return reduced_noise
        except Exception as e:
            logger.error(translations["denoise_error"].format(e=e))
            return None

    @staticmethod
    def convert_audio_format(input_path, output_path, output_format):
        """Convert ``input_path`` to ``output_format``, writing ``output_path``.

        When the target format is not "wav", the audio is first resampled to
        the nearest common sample rate (soundfile can reject unusual rates for
        some encoders). Returns the output path.

        Raises:
            RuntimeError: with a translated message on any read/resample/write
                failure.
        """
        try:
            if output_format != "wav":
                logger.debug(translations["change_format"].format(output_format=output_format))
                audio, sample_rate = sf.read(input_path)
                common_sample_rates = [
                    8000,
                    11025,
                    12000,
                    16000,
                    22050,
                    24000,
                    32000,
                    44100,
                    48000
                ]
                # Snap to the closest standard rate before encoding.
                target_sr = min(common_sample_rates, key=lambda x: abs(x - sample_rate))
                audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=target_sr)
                sf.write(output_path, audio, target_sr, format=output_format)
            return output_path
        except Exception as e:
            raise RuntimeError(translations["change_format_error"].format(e=e))

    def convert_audio(self, audio_input_path, audio_output_path, model_path, index_path, embedder_model, pitch, f0_method, index_rate, volume_envelope, protect, hop_length, f0_autotune, f0_autotune_strength, filter_radius, clean_audio, clean_strength, export_format, upscale_audio, resample_sr = 0, sid = 0):
        """Convert one audio file through the loaded voice model.

        Loads/reuses the model for ``model_path``, optionally upscales the
        input, runs the VC pipeline, then optionally denoises and converts the
        result to ``export_format``. Errors are logged (with traceback), not
        re-raised; on failure nothing may be written to ``audio_output_path``.
        """
        self.get_vc(model_path, sid)
        try:
            # NOTE(review): upscales in place, overwriting the input file.
            if upscale_audio: upscale(audio_input_path, audio_input_path)
            # The pipeline's feature extractor works at 16 kHz.
            audio = load_audio_infer(audio_input_path, 16000)
            # Normalize peaks above 0.95 to avoid clipping downstream.
            audio_max = np.abs(audio).max() / 0.95
            if audio_max > 1: audio /= audio_max
            if not self.hubert_model:
                if not os.path.exists(os.path.join(now_dir, "assets", "model", "embedders", embedder_model + '.pt')): raise FileNotFoundError(f"Không tìm thấy mô hình: {embedder_model}")
                self.load_hubert(embedder_model)
            # Chained comparison: equivalent to
            # (self.tgt_sr != resample_sr) and (resample_sr >= 16000),
            # i.e. only adopt resample_sr as the output rate when it is a
            # sensible rate that differs from the checkpoint's rate.
            if self.tgt_sr != resample_sr >= 16000: self.tgt_sr = resample_sr
            # Users often pass the "trained" index; the usable one is "added".
            file_index = (index_path.strip().strip('"').strip("\n").strip('"').strip().replace("trained", "added"))
            audio_opt = self.vc.pipeline(model=self.hubert_model, net_g=self.net_g, sid=sid, audio=audio, input_audio_path=audio_input_path, pitch=pitch, f0_method=f0_method, file_index=file_index, index_rate=index_rate, pitch_guidance=self.use_f0, filter_radius=filter_radius, tgt_sr=self.tgt_sr, resample_sr=resample_sr, volume_envelope=volume_envelope, version=self.version, protect=protect, hop_length=hop_length, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength)
            if audio_output_path: sf.write(audio_output_path, audio_opt, self.tgt_sr, format="wav")
            if clean_audio:
                # Denoise the written WAV; keep the original on denoise failure.
                cleaned_audio = self.remove_audio_noise(audio_output_path, clean_strength)
                if cleaned_audio is not None: sf.write(audio_output_path, cleaned_audio, self.tgt_sr, format="wav")
            output_path_format = audio_output_path.replace(".wav", f".{export_format}")
            audio_output_path = self.convert_audio_format(audio_output_path, output_path_format, export_format)
        except Exception as e:
            logger.error(translations["error_convert"].format(e=e))
            logger.error(traceback.format_exc())

    def get_vc(self, weight_root, sid):
        """Ensure the model at ``weight_root`` is loaded and the VC pipeline is ready.

        An empty ``sid`` ("" or []) is treated as a request to unload the
        current model. Reloads only when ``weight_root`` differs from the
        currently loaded path.
        """
        if sid == "" or sid == []:
            self.cleanup_model()
            if torch.cuda.is_available(): torch.cuda.empty_cache()
        if not self.loaded_model or self.loaded_model != weight_root:
            self.load_model(weight_root)
            if self.cpt is not None:
                self.setup_network()
                self.setup_vc_instance()
            self.loaded_model = weight_root

    def cleanup_model(self):
        """Release model objects and free CUDA memory.

        The del/None ordering is deliberate: drop references first so the
        tensors become collectable, then empty the CUDA cache. ``net_g``/``cpt``
        attributes always exist (set in __init__), so the second ``del`` is
        safe even when the first branch was skipped.
        """
        if self.hubert_model is not None:
            del self.net_g, self.n_spk, self.vc, self.hubert_model, self.tgt_sr
            self.hubert_model = self.net_g = self.n_spk = self.vc = self.tgt_sr = None
            if torch.cuda.is_available(): torch.cuda.empty_cache()
        del self.net_g, self.cpt
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        self.cpt = None

    def load_model(self, weight_root):
        """Load the checkpoint dict from ``weight_root`` (None if file missing)."""
        self.cpt = (torch.load(weight_root, map_location="cpu") if os.path.isfile(weight_root) else None)

    def setup_network(self):
        """Build the Synthesizer generator from the loaded checkpoint.

        Reads sample rate / speaker count / f0 flag / version from the
        checkpoint, instantiates the generator, drops the training-only
        posterior encoder (enc_q), loads weights non-strictly, and moves the
        model to the configured device/precision.
        """
        if self.cpt is not None:
            # config[-1] is the model's native sample rate.
            self.tgt_sr = self.cpt["config"][-1]
            # Patch speaker count from the actual embedding table shape.
            self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]
            self.use_f0 = self.cpt.get("f0", 1)
            self.version = self.cpt.get("version", "v1")
            # v2 models use 768-dim HuBERT features, v1 uses 256.
            self.text_enc_hidden_dim = 768 if self.version == "v2" else 256
            self.net_g = Synthesizer(*self.cpt["config"], use_f0=self.use_f0, text_enc_hidden_dim=self.text_enc_hidden_dim, is_half=self.config.is_half)
            # enc_q is only needed during training; remove it for inference.
            del self.net_g.enc_q
            self.net_g.load_state_dict(self.cpt["weight"], strict=False)
            self.net_g.eval().to(self.config.device)
            self.net_g = (self.net_g.half() if self.config.is_half else self.net_g.float())

    def setup_vc_instance(self):
        """Create the VC pipeline for the loaded checkpoint and cache n_spk."""
        if self.cpt is not None:
            self.vc = VC(self.tgt_sr, self.config)
            self.n_spk = self.cpt["config"][-3]
if __name__ == "__main__":
    # "spawn" avoids CUDA state being inherited by forked workers, which
    # breaks torch multiprocessing on Linux; force=True overrides any method
    # set earlier in the process.
    mp.set_start_method("spawn", force=True)
    # main() is defined elsewhere in this module (outside this chunk).
    main()