# Voice-conversion inference script (RVC-style): command-line audio conversion
# with optional silence-based splitting, multiprocess batch conversion, and
# post-processing (cleanup, resampling, upscaling).
import gc | |
import re | |
import os | |
import sys | |
import time | |
import torch | |
import faiss | |
import shutil | |
import codecs | |
import pyworld | |
import librosa | |
import logging | |
import argparse | |
import warnings | |
import traceback | |
import torchcrepe | |
import subprocess | |
import parselmouth | |
import logging.handlers | |
import numpy as np | |
import soundfile as sf | |
import noisereduce as nr | |
import torch.nn.functional as F | |
import torch.multiprocessing as mp | |
from tqdm import tqdm | |
from scipy import signal | |
from torch import Tensor | |
from scipy.io import wavfile | |
from audio_upscaler import upscale | |
from distutils.util import strtobool | |
from fairseq import checkpoint_utils | |
from pydub import AudioSegment, silence | |
# Make the project root importable so the `main.*` package imports below
# resolve when this script is launched directly from the repo root.
now_dir = os.getcwd()
sys.path.append(now_dir)
from main.configs.config import Config | |
from main.library.predictors.FCPE import FCPE | |
from main.library.predictors.RMVPE import RMVPE | |
from main.library.algorithm.synthesizers import Synthesizer | |
# Silence noisy warning categories and chatty third-party loggers so only
# this script's own log output reaches the console/file.
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
for _noisy_logger in ("wget", "torch", "faiss", "httpx", "fairseq", "httpcore", "faiss.loader"):
    logging.getLogger(_noisy_logger).setLevel(logging.ERROR)
# --- signal-processing constants and shared module state ---------------------
FILTER_ORDER = 5  # Butterworth filter order
CUTOFF_FREQUENCY = 48  # Hz: high-pass cutoff (strips DC offset / low rumble)
SAMPLE_RATE = 16000  # Hz: all internal processing runs at 16 kHz
# High-pass Butterworth coefficients applied to the input audio in VC.pipeline.
bh, ah = signal.butter(N=FILTER_ORDER, Wn=CUTOFF_FREQUENCY, btype="high", fs=SAMPLE_RATE)
# Cache of raw waveforms keyed by input path (written by hybrid F0 methods).
input_audio_path2wav = {}
log_file = os.path.join("assets", "logs", "convert.log")
logger = logging.getLogger(__name__)
logger.propagate = False  # keep records from bubbling to the root logger
# Localized UI/log strings loaded from the project configuration.
translations = Config().translations
# --- logger wiring -----------------------------------------------------------
# Bug fix: the original attached handlers only in the `else` branch of
# `if logger.hasHandlers()`, so a logger that already had handlers was cleared
# and then left with NO handlers at all (all output silently dropped).
# Now stale handlers are always cleared and a fresh console + rotating-file
# pair is always attached.
if logger.hasHandlers():
    logger.handlers.clear()
# RotatingFileHandler does not create missing directories — ensure the log
# directory exists before opening the file.
os.makedirs(os.path.dirname(log_file), exist_ok=True)
_log_formatter = logging.Formatter(fmt="\n%(asctime)s.%(msecs)03d | %(levelname)s | %(module)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
console_handler = logging.StreamHandler()
console_handler.setFormatter(_log_formatter)
console_handler.setLevel(logging.INFO)  # console shows INFO and above
file_handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8')
file_handler.setFormatter(_log_formatter)
file_handler.setLevel(logging.DEBUG)  # file captures full DEBUG detail
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)
def parse_arguments() -> argparse.Namespace:
    """Parse the command-line options for the conversion script.

    Fixes vs. original: the return annotation was ``-> tuple`` although the
    function returns an ``argparse.Namespace``; boolean options relied on
    ``distutils.util.strtobool`` (distutils was removed in Python 3.12,
    PEP 632) — replaced by a local equivalent.

    Returns:
        argparse.Namespace: parsed arguments with the defaults shown below.
    """
    def _str_to_bool(value: str) -> bool:
        # Same accepted spellings as distutils.util.strtobool.
        value = value.strip().lower()
        if value in ("y", "yes", "t", "true", "on", "1"):
            return True
        if value in ("n", "no", "f", "false", "off", "0"):
            return False
        raise argparse.ArgumentTypeError(f"invalid boolean value: {value!r}")
    parser = argparse.ArgumentParser()
    parser.add_argument("--pitch", type=int, default=0)
    parser.add_argument("--filter_radius", type=int, default=3)
    parser.add_argument("--index_rate", type=float, default=0.5)
    parser.add_argument("--volume_envelope", type=float, default=1)
    parser.add_argument("--protect", type=float, default=0.33)
    parser.add_argument("--hop_length", type=int, default=64)
    parser.add_argument("--f0_method", type=str, default="rmvpe")
    parser.add_argument("--input_path", type=str, required=True)
    parser.add_argument("--output_path", type=str, default="./audios/output.wav")
    parser.add_argument("--pth_path", type=str, required=True)
    parser.add_argument("--index_path", type=str, required=True)
    parser.add_argument("--f0_autotune", type=_str_to_bool, default=False)
    parser.add_argument("--f0_autotune_strength", type=float, default=1)
    parser.add_argument("--clean_audio", type=_str_to_bool, default=False)
    parser.add_argument("--clean_strength", type=float, default=0.7)
    parser.add_argument("--export_format", type=str, default="wav")
    parser.add_argument("--embedder_model", type=str, default="contentvec_base")
    parser.add_argument("--upscale_audio", type=_str_to_bool, default=False)
    parser.add_argument("--resample_sr", type=int, default=0)
    parser.add_argument("--batch_process", type=_str_to_bool, default=False)
    parser.add_argument("--batch_size", type=int, default=2)
    parser.add_argument("--split_audio", type=_str_to_bool, default=False)
    return parser.parse_args()
def main():
    """CLI entry point.

    Parses arguments, logs the effective settings, ensures the required
    predictor/embedder checkpoints exist locally, then runs the conversion.

    Fix vs. original: removed a duplicate debug line that logged the input
    path under the misleading label "f0_method".
    """
    args = parse_arguments()
    pitch = args.pitch
    filter_radius = args.filter_radius
    index_rate = args.index_rate
    volume_envelope = args.volume_envelope
    protect = args.protect
    hop_length = args.hop_length
    f0_method = args.f0_method
    input_path = args.input_path
    output_path = args.output_path
    pth_path = args.pth_path
    index_path = args.index_path
    f0_autotune = args.f0_autotune
    f0_autotune_strength = args.f0_autotune_strength
    clean_audio = args.clean_audio
    clean_strength = args.clean_strength
    export_format = args.export_format
    embedder_model = args.embedder_model
    upscale_audio = args.upscale_audio
    resample_sr = args.resample_sr
    batch_process = args.batch_process
    batch_size = args.batch_size
    split_audio = args.split_audio
    # Echo the effective settings at DEBUG level (localized labels).
    logger.debug(f"{translations['pitch']}: {pitch}")
    logger.debug(f"{translations['filter_radius']}: {filter_radius}")
    logger.debug(f"{translations['index_strength']} {index_rate}")
    logger.debug(f"{translations['volume_envelope']}: {volume_envelope}")
    logger.debug(f"{translations['protect']}: {protect}")
    # Hop length only affects the crepe family of F0 estimators.
    if f0_method == "crepe" or f0_method == "crepe-tiny": logger.debug(f"Hop length: {hop_length}")
    logger.debug(f"{translations['f0_method']}: {f0_method}")
    logger.debug(f"{translations['audio_path']}: {input_path}")
    logger.debug(f"{translations['output_path']}: {output_path.replace('.wav', f'.{export_format}')}")
    logger.debug(f"{translations['model_path']}: {pth_path}")
    logger.debug(f"{translations['indexpath']}: {index_path}")
    logger.debug(f"{translations['autotune']}: {f0_autotune}")
    logger.debug(f"{translations['clear_audio']}: {clean_audio}")
    if clean_audio: logger.debug(f"{translations['clean_strength']}: {clean_strength}")
    logger.debug(f"{translations['export_format']}: {export_format}")
    logger.debug(f"{translations['hubert_model']}: {embedder_model}")
    logger.debug(f"{translations['upscale_audio']}: {upscale_audio}")
    if resample_sr != 0: logger.debug(f"{translations['sample_rate']}: {resample_sr}")
    if split_audio: logger.debug(f"{translations['batch_process']}: {batch_process}")
    if batch_process and split_audio: logger.debug(f"{translations['batch_size']}: {batch_size}")
    logger.debug(f"{translations['split_audio']}: {split_audio}")
    if f0_autotune: logger.debug(f"{translations['autotune_rate_info']}: {f0_autotune_strength}")
    # Download any missing model checkpoints before starting the conversion.
    check_rmvpe_fcpe(f0_method)
    check_hubert(embedder_model)
    run_convert_script(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, input_path=input_path, output_path=output_path, pth_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, embedder_model=embedder_model, upscale_audio=upscale_audio, resample_sr=resample_sr, batch_process=batch_process, batch_size=batch_size, split_audio=split_audio)
def check_rmvpe_fcpe(method):
    """Ensure the pitch-predictor checkpoint(s) needed by `method` exist locally.

    Missing checkpoints are fetched with wget. Fixes vs. original: the hybrid
    regex is now a raw string (the bare "\\[" escape raises a SyntaxWarning on
    modern Python), a failed hybrid match no longer crashes with a NameError
    on the unbound `methods` variable, and the duplicated download logic is
    factored into one helper.
    """
    predictors_dir = os.path.join("assets", "model", "predictors")
    # The download URL is stored rot13-obfuscated in the source.
    base_url = codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13")
    def _download(filename):
        # Fetch `filename` into predictors_dir unless it is already present.
        if not os.path.exists(os.path.join(predictors_dir, filename)):
            subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", base_url + filename, "-P", predictors_dir], check=True)
    if method == "rmvpe":
        _download("rmvpe.pt")
    elif method == "fcpe":
        _download("fcpe.pt")
    elif "hybrid" in method:
        match = re.search(r"hybrid\[(.+)\]", method)
        if match is None:
            return  # malformed hybrid spec — nothing to download
        for sub_method in (m.strip() for m in match.group(1).split("+")):
            if sub_method == "rmvpe":
                _download("rmvpe.pt")
            elif sub_method == "fcpe":
                _download("fcpe.pt")
def check_hubert(hubert):
    """Download the embedder checkpoint for a known HuBERT variant if missing."""
    # Only these embedder names can be fetched automatically.
    supported = ("contentvec_base", "hubert_base", "japanese_hubert_base", "korean_hubert_base", "chinese_hubert_base")
    if hubert in supported:
        model_path = os.path.join(now_dir, "assets", "model", "embedders", f"{hubert}.pt")
        if not os.path.exists(model_path):
            # Download URL is rot13-obfuscated in the source.
            download_url = codecs.decode("uggcf://uhttvatsnpr.pb/NauC/Pbyno_EIP_Cebwrpg_2/erfbyir/znva/", "rot13") + f"{hubert}.pt"
            subprocess.run(["wget", "-q", "--show-progress", "--no-check-certificate", download_url, "-P", os.path.join("assets", "model", "embedders")], check=True)
def load_audio_infer(file, sample_rate):
    """Load an audio file and return it as a flat mono array at `sample_rate`."""
    try:
        # Trim stray spaces/quotes/newlines that sneak in from pasted paths
        # (same sequential strips as before).
        for junk in (" ", '"', "\n", '"', " "):
            file = file.strip(junk)
        if not os.path.isfile(file):
            raise FileNotFoundError(translations["not_found"].format(name=file))
        audio, sr = sf.read(file)
        # Collapse multi-channel audio to mono before any resampling.
        if len(audio.shape) > 1:
            audio = librosa.to_mono(audio.T)
        if sr != sample_rate:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=sample_rate)
    except Exception as e:
        raise RuntimeError(f"{translations['errors_loading_audio']}: {e}")
    return audio.flatten()
def process_audio(file_path, output_path):
    """Split `file_path` on silence and export each nonsilent chunk as a wav.

    Returns (chunk_file_paths, [(start_ms, end_ms), ...]) for later merging.
    """
    try:
        song = AudioSegment.from_file(file_path)
        nonsilent_parts = silence.detect_nonsilent(song, min_silence_len=750, silence_thresh=-70)
        min_chunk_duration = 30  # ms — anything shorter is skipped
        cut_files, time_stamps = [], []
        for idx, (chunk_start, chunk_end) in enumerate(nonsilent_parts):
            piece = song[chunk_start:chunk_end]
            if len(piece) < min_chunk_duration:
                logger.debug(translations["skip_file"].format(i=idx, chunk=len(piece)))
                continue
            chunk_file_path = os.path.join(output_path, f"chunk{idx}.wav")
            if os.path.exists(chunk_file_path):
                os.remove(chunk_file_path)
            piece.export(chunk_file_path, format="wav")
            cut_files.append(chunk_file_path)
            time_stamps.append((chunk_start, chunk_end))
        logger.info(f"{translations['split_total']}: {len(cut_files)}")
        return cut_files, time_stamps
    except Exception as e:
        raise RuntimeError(f"{translations['process_audio_error']}: {e}")
def merge_audio(files_list, time_stamps, original_file_path, output_path, format):
    """Stitch converted segments back together, restoring the original silences.

    Segments are ordered by the numeric suffix in their filenames and placed
    at their recorded (start, end) positions; gaps are filled with silence so
    the result matches the original file's duration.
    """
    try:
        def extract_number(filename):
            found = re.search(r'_(\d+)', filename)
            return int(found.group(1)) if found else 0
        ordered_files = sorted(files_list, key=extract_number)
        total_duration = len(AudioSegment.from_file(original_file_path))
        combined = AudioSegment.empty()
        cursor = 0  # current write position, in ms
        for segment_file, (seg_start, seg_end) in zip(ordered_files, time_stamps):
            gap = seg_start - cursor
            if gap > 0:
                combined += AudioSegment.silent(duration=gap)
            combined += AudioSegment.from_file(segment_file)
            cursor = seg_end
        # Pad the tail so the output length matches the original.
        if cursor < total_duration:
            combined += AudioSegment.silent(duration=total_duration - cursor)
        combined.export(output_path, format=format)
        return output_path
    except Exception as e:
        raise RuntimeError(f"{translations['merge_error']}: {e}")
def run_batch_convert(params):
    """Convert one pre-cut audio chunk (worker entry for the batch pool).

    `params` is a dict carrying the chunk path plus every conversion setting;
    returns the converted segment's path, or exits the process if conversion
    produced no output file.
    """
    converter = VoiceConverter()
    path = params["path"]
    audio_temp = params["audio_temp"]
    export_format = params["export_format"]
    cut_files = params["cut_files"]
    # Output name is keyed by the chunk's position in the cut list.
    segment_output_path = os.path.join(audio_temp, f"output_{cut_files.index(path)}.{export_format}")
    if os.path.exists(segment_output_path):
        os.remove(segment_output_path)
    converter.convert_audio(
        pitch=params["pitch"], filter_radius=params["filter_radius"], index_rate=params["index_rate"],
        volume_envelope=params["volume_envelope"], protect=params["protect"], hop_length=params["hop_length"],
        f0_method=params["f0_method"], audio_input_path=path, audio_output_path=segment_output_path,
        model_path=params["pth_path"], index_path=params["index_path"], f0_autotune=params["f0_autotune"],
        f0_autotune_strength=params["f0_autotune_strength"], clean_audio=params["clean_audio"],
        clean_strength=params["clean_strength"], export_format=export_format,
        upscale_audio=params["upscale_audio"], embedder_model=params["embedder_model"],
        resample_sr=params["resample_sr"],
    )
    os.remove(path)  # the source chunk is no longer needed
    if not os.path.exists(segment_output_path):
        logger.warning(f"{translations['not_found_convert_file']}: {segment_output_path}")
        sys.exit(1)
    return segment_output_path
def run_convert_script(pitch, filter_radius, index_rate, volume_envelope, protect, hop_length, f0_method, input_path, output_path, pth_path, index_path, f0_autotune, f0_autotune_strength, clean_audio, clean_strength, export_format, upscale_audio, embedder_model, resample_sr, batch_process, batch_size, split_audio):
    """Top-level conversion driver.

    Handles two input modes: if `input_path` is a directory, every audio file
    inside is converted (batch mode); otherwise the single file is converted.
    In either mode, `split_audio` optionally cuts the audio at silences,
    converts the chunks (optionally in a multiprocessing pool of up to
    `batch_size` workers), and merges them back. Exits the process on invalid
    model/index paths or missing input.
    """
    cvt = VoiceConverter()
    start_time = time.time()
    # Validate the model (.pth) and index (.index) paths up front.
    if not pth_path or not os.path.exists(pth_path) or os.path.isdir(pth_path) or not pth_path.endswith(".pth"):
        logger.warning(translations["provide_file"].format(filename=translations["model"]))
        sys.exit(1)
    if not index_path or not os.path.exists(index_path) or os.path.isdir(index_path) or not index_path.endswith(".index"):
        logger.warning(translations["provide_file"].format(filename=translations["index"]))
        sys.exit(1)
    # Resolve the output directory; fall back to the output path itself when
    # it has no directory component.
    output_dir = os.path.dirname(output_path)
    output_dir = output_path if not output_dir else output_dir
    # NOTE(review): output_dir can no longer be None after the line above
    # (empty string is already replaced), so this branch looks unreachable.
    if output_dir is None: output_dir = "audios"
    if not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True)
    # Scratch directory for split chunks; only created when splitting.
    audio_temp = os.path.join("audios_temp")
    if not os.path.exists(audio_temp) and split_audio: os.makedirs(audio_temp, exist_ok=True)
    processed_segments = []
    if os.path.isdir(input_path):
        # --- directory (batch) mode ---
        try:
            logger.info(translations["convert_batch"])
            audio_files = [f for f in os.listdir(input_path) if f.endswith(("wav", "mp3", "flac", "ogg", "opus", "m4a", "mp4", "aac", "alac", "wma", "aiff", "webm", "ac3"))]
            if not audio_files:
                logger.warning(translations["not_found_audio"])
                sys.exit(1)
            logger.info(translations["found_audio"].format(audio_files=len(audio_files)))
            for audio in audio_files:
                audio_path = os.path.join(input_path, audio)
                # Converted files are written next to their sources with an
                # "_output" suffix.
                output_audio = os.path.join(input_path, os.path.splitext(audio)[0] + f"_output.{export_format}")
                if split_audio:
                    try:
                        cut_files, time_stamps = process_audio(audio_path, audio_temp)
                        num_threads = min(batch_size, len(cut_files))
                        # One params dict per chunk for the worker function.
                        params_list = [
                            {
                                "path": path,
                                "audio_temp": audio_temp,
                                "export_format": export_format,
                                "cut_files": cut_files,
                                "pitch": pitch,
                                "filter_radius": filter_radius,
                                "index_rate": index_rate,
                                "volume_envelope": volume_envelope,
                                "protect": protect,
                                "hop_length": hop_length,
                                "f0_method": f0_method,
                                "pth_path": pth_path,
                                "index_path": index_path,
                                "f0_autotune": f0_autotune,
                                "f0_autotune_strength": f0_autotune_strength,
                                "clean_audio": clean_audio,
                                "clean_strength": clean_strength,
                                "upscale_audio": upscale_audio,
                                "embedder_model": embedder_model,
                                "resample_sr": resample_sr
                            }
                            for path in cut_files
                        ]
                        if batch_process:
                            # Convert chunks in parallel worker processes.
                            with mp.Pool(processes=num_threads) as pool:
                                with tqdm(total=len(params_list), desc=translations["convert_audio"]) as pbar:
                                    for results in pool.imap_unordered(run_batch_convert, params_list):
                                        processed_segments.append(results)
                                        pbar.update(1)
                        else:
                            # NOTE(review): in this sequential path the returned
                            # segment paths are discarded, so merge_audio below
                            # receives an empty processed_segments list —
                            # confirm whether this is intended.
                            for params in tqdm(params_list, desc=translations["convert_audio"]):
                                run_batch_convert(params)
                        merge_audio(processed_segments, time_stamps, audio_path, output_audio, export_format)
                    except Exception as e:
                        logger.error(translations["error_convert_batch"].format(e=e))
                    finally:
                        # Always clean up the chunk scratch directory.
                        if os.path.exists(audio_temp): shutil.rmtree(audio_temp, ignore_errors=True)
                else:
                    try:
                        logger.info(f"{translations['convert_audio']} '{audio_path}'...")
                        if os.path.exists(output_audio): os.remove(output_audio)
                        with tqdm(total=1, desc=translations["convert_audio"]) as pbar:
                            cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=audio_path, audio_output_path=output_audio, model_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, upscale_audio=upscale_audio, embedder_model=embedder_model, resample_sr=resample_sr)
                            pbar.update(1)
                    except Exception as e:
                        logger.error(translations["error_convert"].format(e=e))
                # Per-file elapsed time (cumulative from start_time).
                elapsed_time = time.time() - start_time
                logger.info(translations["convert_batch_success"].format(elapsed_time=f"{elapsed_time:.2f}", output_path=output_path.replace('.wav', f'.{export_format}')))
        except Exception as e:
            logger.error(translations["error_convert_batch_2"].format(e=e))
    else:
        # --- single-file mode ---
        logger.info(f"{translations['convert_audio']} '{input_path}'...")
        if not os.path.exists(input_path):
            logger.warning(translations["not_found_audio"])
            sys.exit(1)
        if os.path.isdir(output_path): output_path = os.path.join(output_path, f"output.{export_format}")
        if os.path.exists(output_path): os.remove(output_path)
        if split_audio:
            try:
                cut_files, time_stamps = process_audio(input_path, audio_temp)
                num_threads = min(batch_size, len(cut_files))
                params_list = [
                    {
                        "path": path,
                        "audio_temp": audio_temp,
                        "export_format": export_format,
                        "cut_files": cut_files,
                        "pitch": pitch,
                        "filter_radius": filter_radius,
                        "index_rate": index_rate,
                        "volume_envelope": volume_envelope,
                        "protect": protect,
                        "hop_length": hop_length,
                        "f0_method": f0_method,
                        "pth_path": pth_path,
                        "index_path": index_path,
                        "f0_autotune": f0_autotune,
                        "f0_autotune_strength": f0_autotune_strength,
                        "clean_audio": clean_audio,
                        "clean_strength": clean_strength,
                        "upscale_audio": upscale_audio,
                        "embedder_model": embedder_model,
                        "resample_sr": resample_sr
                    }
                    for path in cut_files
                ]
                if batch_process:
                    with mp.Pool(processes=num_threads) as pool:
                        with tqdm(total=len(params_list), desc=translations["convert_audio"]) as pbar:
                            for results in pool.imap_unordered(run_batch_convert, params_list):
                                processed_segments.append(results)
                                pbar.update(1)
                else:
                    # NOTE(review): same as above — sequential results are not
                    # collected into processed_segments before the merge.
                    for params in tqdm(params_list, desc=translations["convert_audio"]):
                        run_batch_convert(params)
                merge_audio(processed_segments, time_stamps, input_path, output_path.replace(".wav", f".{export_format}"), export_format)
            except Exception as e:
                logger.error(translations["error_convert_batch"].format(e=e))
            finally:
                if os.path.exists(audio_temp): shutil.rmtree(audio_temp, ignore_errors=True)
        else:
            try:
                with tqdm(total=1, desc=translations["convert_audio"]) as pbar:
                    cvt.convert_audio(pitch=pitch, filter_radius=filter_radius, index_rate=index_rate, volume_envelope=volume_envelope, protect=protect, hop_length=hop_length, f0_method=f0_method, audio_input_path=input_path, audio_output_path=output_path, model_path=pth_path, index_path=index_path, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength, clean_audio=clean_audio, clean_strength=clean_strength, export_format=export_format, upscale_audio=upscale_audio, embedder_model=embedder_model, resample_sr=resample_sr)
                    pbar.update(1)
            except Exception as e:
                logger.error(translations["error_convert"].format(e=e))
        elapsed_time = time.time() - start_time
        logger.info(translations["convert_audio_success"].format(input_path=input_path, elapsed_time=f"{elapsed_time:.2f}", output_path=output_path.replace('.wav', f'.{export_format}')))
def change_rms(source_audio: np.ndarray, source_rate: int, target_audio: np.ndarray, target_rate: int, rate: float) -> np.ndarray:
    """Blend the target audio's loudness envelope toward the source's.

    `rate` controls the mix: 1.0 keeps the target's own envelope, 0.0 fully
    imposes the source's envelope. Returns the gain-adjusted target audio.
    """
    # Frame-level RMS envelopes, one frame per half second of audio.
    source_rms = librosa.feature.rms(y=source_audio, frame_length=source_rate // 2 * 2, hop_length=source_rate // 2)
    target_rms = librosa.feature.rms(y=target_audio, frame_length=target_rate // 2 * 2, hop_length=target_rate // 2)
    def _stretch(envelope):
        # Linearly interpolate an envelope to one value per target sample.
        return F.interpolate(torch.from_numpy(envelope).float().unsqueeze(0), size=target_audio.shape[0], mode="linear").squeeze()
    source_env = _stretch(source_rms)
    target_env = _stretch(target_rms)
    # Floor the target envelope to avoid division by zero in the gain below.
    target_env = torch.maximum(target_env, torch.zeros_like(target_env) + 1e-6)
    gain = (torch.pow(source_env, 1 - rate) * torch.pow(target_env, rate - 1)).numpy()
    return target_audio * gain
class Autotune:
    """Snap F0 values toward the nearest reference note frequency."""
    def __init__(self, ref_freqs):
        # ref_freqs: iterable of reference note frequencies in Hz.
        self.ref_freqs = ref_freqs
        self.note_dict = self.ref_freqs  # alias kept for backward compatibility
    def autotune_f0(self, f0, f0_autotune_strength):
        """Pull each f0 sample toward its closest note.

        `f0_autotune_strength` scales the correction: 0 leaves the contour
        unchanged, 1 snaps fully onto the note grid.
        """
        corrected = np.zeros_like(f0)
        for idx in range(len(f0)):
            freq = f0[idx]
            nearest_note = min(self.note_dict, key=lambda note: abs(note - freq))
            corrected[idx] = freq + (nearest_note - freq) * f0_autotune_strength
        return corrected
class VC: | |
def __init__(self, tgt_sr, config): | |
self.x_pad = config.x_pad | |
self.x_query = config.x_query | |
self.x_center = config.x_center | |
self.x_max = config.x_max | |
self.is_half = config.is_half | |
self.sample_rate = 16000 | |
self.window = 160 | |
self.t_pad = self.sample_rate * self.x_pad | |
self.t_pad_tgt = tgt_sr * self.x_pad | |
self.t_pad2 = self.t_pad * 2 | |
self.t_query = self.sample_rate * self.x_query | |
self.t_center = self.sample_rate * self.x_center | |
self.t_max = self.sample_rate * self.x_max | |
self.time_step = self.window / self.sample_rate * 1000 | |
self.f0_min = 50 | |
self.f0_max = 1100 | |
self.f0_mel_min = 1127 * np.log(1 + self.f0_min / 700) | |
self.f0_mel_max = 1127 * np.log(1 + self.f0_max / 700) | |
self.device = config.device | |
self.ref_freqs = [ | |
49.00, | |
51.91, | |
55.00, | |
58.27, | |
61.74, | |
65.41, | |
69.30, | |
73.42, | |
77.78, | |
82.41, | |
87.31, | |
92.50, | |
98.00, | |
103.83, | |
110.00, | |
116.54, | |
123.47, | |
130.81, | |
138.59, | |
146.83, | |
155.56, | |
164.81, | |
174.61, | |
185.00, | |
196.00, | |
207.65, | |
220.00, | |
233.08, | |
246.94, | |
261.63, | |
277.18, | |
293.66, | |
311.13, | |
329.63, | |
349.23, | |
369.99, | |
392.00, | |
415.30, | |
440.00, | |
466.16, | |
493.88, | |
523.25, | |
554.37, | |
587.33, | |
622.25, | |
659.25, | |
698.46, | |
739.99, | |
783.99, | |
830.61, | |
880.00, | |
932.33, | |
987.77, | |
1046.50 | |
] | |
self.autotune = Autotune(self.ref_freqs) | |
self.note_dict = self.autotune.note_dict | |
def get_f0_crepe(self, x, f0_min, f0_max, p_len, hop_length, model="full"): | |
x = x.astype(np.float32) | |
x /= np.quantile(np.abs(x), 0.999) | |
audio = torch.from_numpy(x).to(self.device, copy=True) | |
audio = torch.unsqueeze(audio, dim=0) | |
if audio.ndim == 2 and audio.shape[0] > 1: audio = torch.mean(audio, dim=0, keepdim=True).detach() | |
audio = audio.detach() | |
pitch: Tensor = torchcrepe.predict(audio, self.sample_rate, hop_length, f0_min, f0_max, model, batch_size=hop_length * 2, device=self.device, pad=True) | |
p_len = p_len or x.shape[0] // hop_length | |
source = np.array(pitch.squeeze(0).cpu().float().numpy()) | |
source[source < 0.001] = np.nan | |
target = np.interp( | |
np.arange(0, len(source) * p_len, len(source)) / p_len, | |
np.arange(0, len(source)), | |
source, | |
) | |
f0 = np.nan_to_num(target) | |
return f0 | |
def get_f0_hybrid(self, methods_str, x, f0_min, f0_max, p_len, hop_length, filter_radius): | |
methods_str = re.search("hybrid\[(.+)\]", methods_str) | |
if methods_str: methods = [method.strip() for method in methods_str.group(1).split("+")] | |
f0_computation_stack = [] | |
logger.debug(translations["hybrid_methods"].format(methods=methods)) | |
x = x.astype(np.float32) | |
x /= np.quantile(np.abs(x), 0.999) | |
for method in methods: | |
f0 = None | |
if method == "pm": | |
f0 = (parselmouth.Sound(x, self.sample_rate).to_pitch_ac(time_step=self.window / self.sample_rate * 1000 / 1000, voicing_threshold=0.6, pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array["frequency"]) | |
pad_size = (p_len - len(f0) + 1) // 2 | |
if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant") | |
elif method == 'dio': | |
f0, t = pyworld.dio(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10) | |
f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate) | |
f0 = signal.medfilt(f0, 3) | |
elif method == "crepe-tiny": | |
f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length), "tiny") | |
elif method == "crepe": | |
f0 = self.get_f0_crepe(x, f0_min, f0_max, p_len, int(hop_length)) | |
elif method == "fcpe": | |
self.model_fcpe = FCPE(os.path.join("assets", "model", "predictors", "fcpe.pt"), hop_length=int(hop_length), f0_min=int(f0_min), f0_max=int(f0_max), dtype=torch.float32, device=self.device, sample_rate=self.sample_rate, threshold=0.03) | |
f0 = self.model_fcpe.compute_f0(x, p_len=p_len) | |
del self.model_fcpe | |
gc.collect() | |
elif method == "rmvpe": | |
f0 = RMVPE(os.path.join("assets", "model", "predictors", "rmvpe.pt"), is_half=self.is_half, device=self.device).infer_from_audio(x, thred=0.03) | |
f0 = f0[1:] | |
elif method == "harvest": | |
f0, t = pyworld.harvest(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10) | |
f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate) | |
if filter_radius > 2: f0 = signal.medfilt(f0, 3) | |
else: raise ValueError(translations["method_not_valid"]) | |
f0_computation_stack.append(f0) | |
resampled_stack = [] | |
for f0 in f0_computation_stack: | |
resampled_f0 = np.interp(np.linspace(0, len(f0), p_len), np.arange(len(f0)), f0) | |
resampled_stack.append(resampled_f0) | |
f0_median_hybrid = resampled_stack[0] if len(resampled_stack) == 1 else np.nanmedian(np.vstack(resampled_stack), axis=0) | |
return f0_median_hybrid | |
def get_f0(self, input_audio_path, x, p_len, pitch, f0_method, filter_radius, hop_length, f0_autotune, f0_autotune_strength): | |
global input_audio_path2wav | |
if f0_method == "pm": | |
f0 = (parselmouth.Sound(x, self.sample_rate).to_pitch_ac(time_step=self.window / self.sample_rate * 1000 / 1000, voicing_threshold=0.6, pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array["frequency"]) | |
pad_size = (p_len - len(f0) + 1) // 2 | |
if pad_size > 0 or p_len - len(f0) - pad_size > 0: f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode="constant") | |
elif f0_method == "dio": | |
f0, t = pyworld.dio(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10) | |
f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate) | |
f0 = signal.medfilt(f0, 3) | |
elif f0_method == "crepe-tiny": | |
f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length), "tiny") | |
elif f0_method == "crepe": | |
f0 = self.get_f0_crepe(x, self.f0_min, self.f0_max, p_len, int(hop_length)) | |
elif f0_method == "fcpe": | |
self.model_fcpe = FCPE(os.path.join("assets", "model", "predictors", "fcpe.pt"), hop_length=int(hop_length), f0_min=int(self.f0_min), f0_max=int(self.f0_max), dtype=torch.float32, device=self.device, sample_rate=self.sample_rate, threshold=0.03) | |
f0 = self.model_fcpe.compute_f0(x, p_len=p_len) | |
del self.model_fcpe | |
gc.collect() | |
elif f0_method == "rmvpe": | |
f0 = RMVPE(os.path.join("assets", "model", "predictors", "rmvpe.pt"), is_half=self.is_half, device=self.device).infer_from_audio(x, thred=0.03) | |
elif f0_method == "harvest": | |
f0, t = pyworld.harvest(x.astype(np.double), fs=self.sample_rate, f0_ceil=self.f0_max, f0_floor=self.f0_min, frame_period=10) | |
f0 = pyworld.stonemask(x.astype(np.double), f0, t, self.sample_rate) | |
if filter_radius > 2: f0 = signal.medfilt(f0, 3) | |
elif "hybrid" in f0_method: | |
input_audio_path2wav[input_audio_path] = x.astype(np.double) | |
f0 = self.get_f0_hybrid(f0_method, x, self.f0_min, self.f0_max, p_len, hop_length, filter_radius) | |
else: raise ValueError(translations["method_not_valid"]) | |
if f0_autotune: f0 = Autotune.autotune_f0(self, f0, f0_autotune_strength) | |
f0 *= pow(2, pitch / 12) | |
f0bak = f0.copy() | |
f0_mel = 1127 * np.log(1 + f0 / 700) | |
f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - self.f0_mel_min) * 254 / (self.f0_mel_max - self.f0_mel_min) + 1 | |
f0_mel[f0_mel <= 1] = 1 | |
f0_mel[f0_mel > 255] = 255 | |
f0_coarse = np.rint(f0_mel).astype(np.int32) | |
return f0_coarse, f0bak | |
def voice_conversion(self, model, net_g, sid, audio0, pitch, pitchf, index, big_npy, index_rate, version, protect): | |
pitch_guidance = pitch != None and pitchf != None | |
feats = (torch.from_numpy(audio0).half() if self.is_half else torch.from_numpy(audio0).float()) | |
if feats.dim() == 2: feats = feats.mean(-1) | |
assert feats.dim() == 1, feats.dim() | |
feats = feats.view(1, -1) | |
padding_mask = torch.BoolTensor(feats.shape).to(self.device).fill_(False) | |
inputs = { | |
"source": feats.to(self.device), | |
"padding_mask": padding_mask, | |
"output_layer": 9 if version == "v1" else 12, | |
} | |
with torch.no_grad(): | |
logits = model.extract_features(**inputs) | |
feats = model.final_proj(logits[0]) if version == "v1" else logits[0] | |
if protect < 0.5 and pitch_guidance: feats0 = feats.clone() | |
if (not isinstance(index, type(None)) and not isinstance(big_npy, type(None)) and index_rate != 0): | |
npy = feats[0].cpu().numpy() | |
if self.is_half: npy = npy.astype("float32") | |
score, ix = index.search(npy, k=8) | |
weight = np.square(1 / score) | |
weight /= weight.sum(axis=1, keepdims=True) | |
npy = np.sum(big_npy[ix] * np.expand_dims(weight, axis=2), axis=1) | |
if self.is_half: npy = npy.astype("float16") | |
feats = (torch.from_numpy(npy).unsqueeze(0).to(self.device) * index_rate + (1 - index_rate) * feats) | |
feats = F.interpolate(feats.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1) | |
if protect < 0.5 and pitch_guidance: feats0 = F.interpolate(feats0.permute(0, 2, 1), scale_factor=2).permute(0, 2, 1) | |
p_len = audio0.shape[0] // self.window | |
if feats.shape[1] < p_len: | |
p_len = feats.shape[1] | |
if pitch_guidance: | |
pitch = pitch[:, :p_len] | |
pitchf = pitchf[:, :p_len] | |
if protect < 0.5 and pitch_guidance: | |
pitchff = pitchf.clone() | |
pitchff[pitchf > 0] = 1 | |
pitchff[pitchf < 1] = protect | |
pitchff = pitchff.unsqueeze(-1) | |
feats = feats * pitchff + feats0 * (1 - pitchff) | |
feats = feats.to(feats0.dtype) | |
p_len = torch.tensor([p_len], device=self.device).long() | |
with torch.no_grad(): | |
audio1 = ((net_g.infer(feats, p_len, pitch, pitchf, sid)[0][0, 0]).data.cpu().float().numpy()) if pitch_guidance else ((net_g.infer(feats, p_len, sid)[0][0, 0]).data.cpu().float().numpy()) | |
del feats, p_len, padding_mask | |
if torch.cuda.is_available(): torch.cuda.empty_cache() | |
return audio1 | |
    def pipeline(self, model, net_g, sid, audio, input_audio_path, pitch, f0_method, file_index, index_rate, pitch_guidance, filter_radius, tgt_sr, resample_sr, volume_envelope, version, protect, hop_length, f0_autotune, f0_autotune_strength):
        """Run the full conversion pipeline over one mono audio array.

        Steps: high-pass filter the input, optionally load a faiss feature
        index, choose split points in quiet regions for long audio, extract
        F0 when `pitch_guidance` is truthy, convert each segment via
        `self.voice_conversion`, then apply RMS envelope mixing, optional
        resampling, and int16 quantization.

        Returns the converted audio as a 1-D int16 numpy array.
        """
        # Load the faiss retrieval index and reconstruct all of its vectors
        # when a valid index path is given and index mixing is enabled;
        # otherwise both `index` and `big_npy` stay None.
        if file_index != "" and os.path.exists(file_index) and index_rate != 0:
            try:
                index = faiss.read_index(file_index)
                big_npy = index.reconstruct_n(0, index.ntotal)
            except Exception as e:
                logger.error(translations["read_faiss_index_error"].format(e=e))
                index = big_npy = None
        else: index = big_npy = None
        # Zero-phase high-pass filter using the module-level Butterworth
        # coefficients (bh, ah).
        audio = signal.filtfilt(bh, ah, audio)
        audio_pad = np.pad(audio, (self.window // 2, self.window // 2), mode="reflect")
        opt_ts = []
        # For audio longer than t_max, pick one cut point near each t_center
        # multiple where the window-summed amplitude is smallest, so segment
        # boundaries land in the quietest nearby region (± t_query samples).
        if audio_pad.shape[0] > self.t_max:
            audio_sum = np.zeros_like(audio)
            for i in range(self.window):
                audio_sum += audio_pad[i : i - self.window]
            for t in range(self.t_center, audio.shape[0], self.t_center):
                opt_ts.append(t - self.t_query + np.where(np.abs(audio_sum[t - self.t_query : t + self.t_query]) == np.abs(audio_sum[t - self.t_query : t + self.t_query]).min())[0][0])
        s = 0
        audio_opt = []
        t = None  # stays None when opt_ts is empty; audio_pad[None:] == audio_pad[:]
        audio_pad = np.pad(audio, (self.t_pad, self.t_pad), mode="reflect")
        p_len = audio_pad.shape[0] // self.window
        sid = torch.tensor(sid, device=self.device).unsqueeze(0).long()
        if pitch_guidance:
            # Extract coarse (quantized) and fine (continuous) F0 curves,
            # trimmed to the frame count of the padded audio.
            pitch, pitchf = self.get_f0(input_audio_path, audio_pad, p_len, pitch, f0_method, filter_radius, hop_length, f0_autotune, f0_autotune_strength)
            pitch = pitch[:p_len]
            pitchf = pitchf[:p_len]
            # NOTE(review): float32 cast on MPS — presumably because the MPS
            # backend rejects float64 tensors; confirm against torch MPS docs.
            if self.device == "mps": pitchf = pitchf.astype(np.float32)
            pitch = torch.tensor(pitch, device=self.device).unsqueeze(0).long()
            pitchf = torch.tensor(pitchf, device=self.device).unsqueeze(0).float()
        # Convert each [s, t] chunk with t_pad2 overlap, snapping t to a
        # window boundary and trimming the padded margins (t_pad_tgt) from
        # every converted piece before concatenation.
        for t in opt_ts:
            t = t // self.window * self.window
            if pitch_guidance: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[s : t + self.t_pad2 + self.window], pitch[:, s // self.window : (t + self.t_pad2) // self.window], pitchf[:, s // self.window : (t + self.t_pad2) // self.window], index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
            else: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[s : t + self.t_pad2 + self.window], None, None, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
            s = t
        # Final (or only) chunk: from the last cut point to the end.
        if pitch_guidance: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[t:], pitch[:, t // self.window :] if t is not None else pitch, pitchf[:, t // self.window :] if t is not None else pitchf, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
        else: audio_opt.append(self.voice_conversion(model, net_g, sid, audio_pad[t:], None, None, index, big_npy, index_rate, version, protect)[self.t_pad_tgt : -self.t_pad_tgt])
        audio_opt = np.concatenate(audio_opt)
        # Blend the output loudness envelope toward the input's when requested.
        if volume_envelope != 1: audio_opt = change_rms(audio, self.sample_rate, audio_opt, tgt_sr, volume_envelope)
        if resample_sr >= self.sample_rate and tgt_sr != resample_sr: audio_opt = librosa.resample(audio_opt, orig_sr=tgt_sr, target_sr=resample_sr)
        # Quantize to int16, attenuating first if the peak would clip.
        audio_max = np.abs(audio_opt).max() / 0.99
        max_int16 = 32768
        if audio_max > 1: max_int16 /= audio_max
        audio_opt = (audio_opt * max_int16).astype(np.int16)
        if pitch_guidance: del pitch, pitchf
        del sid
        if torch.cuda.is_available(): torch.cuda.empty_cache()
        return audio_opt
class VoiceConverter:
    """Load an RVC checkpoint plus a HuBERT embedder and convert audio files.

    Construct once, then call `convert_audio(...)`; the synthesizer and the
    embedder are loaded lazily and cached between calls via `get_vc`.
    """
    def __init__(self):
        self.config = Config()  # runtime config: device, half precision, translations
        self.hubert_model = None  # HuBERT content encoder, loaded on demand
        self.tgt_sr = None  # output sample rate taken from the checkpoint
        self.net_g = None  # synthesizer network
        self.vc = None  # VC pipeline helper bound to the loaded model
        self.cpt = None  # raw checkpoint dict from torch.load
        self.version = None  # model version, "v1" or "v2"
        self.n_spk = None  # speaker count from the checkpoint config
        self.use_f0 = None  # whether the model expects pitch guidance
        self.loaded_model = None  # path of the currently loaded weight file

    def load_hubert(self, embedder_model):
        """Load the `embedder_model` HuBERT checkpoint onto the configured device.

        Raises ImportError (wrapping the original error) when the checkpoint
        cannot be read.
        """
        try:
            models, _, _ = checkpoint_utils.load_model_ensemble_and_task([os.path.join(now_dir, "assets", "model", "embedders", embedder_model + '.pt')], suffix="")
        except Exception as e:
            raise ImportError(translations["read_model_error"].format(e=e))
        self.hubert_model = models[0].to(self.config.device)
        self.hubert_model = self.hubert_model.half() if self.config.is_half else self.hubert_model.float()
        self.hubert_model.eval()

    @staticmethod
    def remove_audio_noise(input_audio_path, reduction_strength=0.7):
        """Denoise a wav file; return the cleaned samples, or None on failure.

        Declared @staticmethod: the original definition had no `self`
        parameter, so the bound call `self.remove_audio_noise(path, strength)`
        passed the instance as an extra positional argument and raised
        TypeError.
        """
        try:
            rate, data = wavfile.read(input_audio_path)
            return nr.reduce_noise(y=data, sr=rate, prop_decrease=reduction_strength)
        except Exception as e:
            logger.error(translations["denoise_error"].format(e=e))
            return None

    @staticmethod
    def convert_audio_format(input_path, output_path, output_format):
        """Re-encode `input_path` into `output_format` at the nearest common
        sample rate and return the written path.

        Declared @staticmethod for the same reason as `remove_audio_noise`:
        without it, `self.convert_audio_format(...)` received four positional
        arguments and raised TypeError. For "wav" output no conversion is
        performed and `output_path` is returned unchanged (original behavior).
        Raises RuntimeError on any failure.
        """
        try:
            if output_format != "wav":
                logger.debug(translations["change_format"].format(output_format=output_format))
                audio, sample_rate = sf.read(input_path)
                # Snap to the closest standard rate before re-encoding.
                common_sample_rates = [8000, 11025, 12000, 16000, 22050, 24000, 32000, 44100, 48000]
                target_sr = min(common_sample_rates, key=lambda x: abs(x - sample_rate))
                audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=target_sr)
                sf.write(output_path, audio, target_sr, format=output_format)
            return output_path
        except Exception as e:
            raise RuntimeError(translations["change_format_error"].format(e=e))

    def convert_audio(self, audio_input_path, audio_output_path, model_path, index_path, embedder_model, pitch, f0_method, index_rate, volume_envelope, protect, hop_length, f0_autotune, f0_autotune_strength, filter_radius, clean_audio, clean_strength, export_format, upscale_audio, resample_sr = 0, sid = 0):
        """End-to-end file conversion: load models, run the pipeline, write output.

        Errors are logged (with traceback) rather than raised, preserving the
        original best-effort contract.
        """
        self.get_vc(model_path, sid)
        try:
            if upscale_audio: upscale(audio_input_path, audio_input_path)
            audio = load_audio_infer(audio_input_path, 16000)
            # Attenuate if the peak exceeds ~0.95 full scale.
            audio_max = np.abs(audio).max() / 0.95
            if audio_max > 1: audio /= audio_max
            if not self.hubert_model:
                if not os.path.exists(os.path.join(now_dir, "assets", "model", "embedders", embedder_model + '.pt')): raise FileNotFoundError(f"Không tìm thấy mô hình: {embedder_model}")
                self.load_hubert(embedder_model)
            # Chained comparison: adopt resample_sr only when it is >= 16 kHz
            # AND differs from the model's own output rate.
            if self.tgt_sr != resample_sr >= 16000: self.tgt_sr = resample_sr
            # Normalize the index path and point it at the "added_*" index.
            file_index = (index_path.strip().strip('"').strip("\n").strip('"').strip().replace("trained", "added"))
            audio_opt = self.vc.pipeline(model=self.hubert_model, net_g=self.net_g, sid=sid, audio=audio, input_audio_path=audio_input_path, pitch=pitch, f0_method=f0_method, file_index=file_index, index_rate=index_rate, pitch_guidance=self.use_f0, filter_radius=filter_radius, tgt_sr=self.tgt_sr, resample_sr=resample_sr, volume_envelope=volume_envelope, version=self.version, protect=protect, hop_length=hop_length, f0_autotune=f0_autotune, f0_autotune_strength=f0_autotune_strength)
            if audio_output_path: sf.write(audio_output_path, audio_opt, self.tgt_sr, format="wav")
            if clean_audio:
                cleaned_audio = self.remove_audio_noise(audio_output_path, clean_strength)
                if cleaned_audio is not None: sf.write(audio_output_path, cleaned_audio, self.tgt_sr, format="wav")
            output_path_format = audio_output_path.replace(".wav", f".{export_format}")
            audio_output_path = self.convert_audio_format(audio_output_path, output_path_format, export_format)
        except Exception as e:
            logger.error(translations["error_convert"].format(e=e))
            logger.error(traceback.format_exc())

    def get_vc(self, weight_root, sid):
        """Ensure `weight_root` is the loaded model, (re)building networks if needed."""
        if sid == "" or sid == []:
            # Empty speaker id means "unload": drop everything and free VRAM.
            self.cleanup_model()
            if torch.cuda.is_available(): torch.cuda.empty_cache()
        if not self.loaded_model or self.loaded_model != weight_root:
            self.load_model(weight_root)
            if self.cpt is not None:
                self.setup_network()
                self.setup_vc_instance()
            self.loaded_model = weight_root

    def cleanup_model(self):
        """Release all loaded networks/state and free CUDA memory.

        Attributes are reset to None instead of `del`-eted: the original
        `del self.net_g, self.cpt` removed the attributes entirely, so any
        access before the next `setup_network()` raised AttributeError.
        Rebinding to None releases the references just the same.
        """
        if self.hubert_model is not None:
            self.net_g = self.n_spk = self.vc = self.hubert_model = self.tgt_sr = None
        self.net_g = None
        self.cpt = None
        if torch.cuda.is_available(): torch.cuda.empty_cache()

    def load_model(self, weight_root):
        """Load the checkpoint into self.cpt (None when the file does not exist)."""
        self.cpt = torch.load(weight_root, map_location="cpu") if os.path.isfile(weight_root) else None

    def setup_network(self):
        """Build the synthesizer from self.cpt and move it to the target device."""
        if self.cpt is not None:
            self.tgt_sr = self.cpt["config"][-1]
            # Patch the stored speaker count from the actual embedding table shape.
            self.cpt["config"][-3] = self.cpt["weight"]["emb_g.weight"].shape[0]
            self.use_f0 = self.cpt.get("f0", 1)
            self.version = self.cpt.get("version", "v1")
            self.text_enc_hidden_dim = 768 if self.version == "v2" else 256
            self.net_g = Synthesizer(*self.cpt["config"], use_f0=self.use_f0, text_enc_hidden_dim=self.text_enc_hidden_dim, is_half=self.config.is_half)
            # enc_q is dropped before inference; strict=False below tolerates
            # the checkpoint's extra/missing keys.
            del self.net_g.enc_q
            self.net_g.load_state_dict(self.cpt["weight"], strict=False)
            self.net_g.eval().to(self.config.device)
            self.net_g = self.net_g.half() if self.config.is_half else self.net_g.float()

    def setup_vc_instance(self):
        """Create the VC pipeline helper for the loaded checkpoint."""
        if self.cpt is not None:
            self.vc = VC(self.tgt_sr, self.config)
            self.n_spk = self.cpt["config"][-3]
if __name__ == "__main__":
    # Force the "spawn" start method for worker processes before running
    # `main()` (defined earlier in this file). NOTE(review): presumably
    # chosen because fork-based workers do not play well with an initialized
    # CUDA context — confirm against torch.multiprocessing docs.
    mp.set_start_method("spawn", force=True)
    main()