from logging import getLogger import numpy as np import torch import torch.nn as nn import torch.nn.functional as F import librosa from accelerate import Accelerator from datasets import Dataset from .f0 import F0Extractor, RMVPE, load_rmvpe from .hubert import HubertFeatureExtractor, HubertModel, load_hubert from .synthesizer import SynthesizerTrnMs768NSFsid from .constants import * logger = getLogger(__name__) class Synthesizer(SynthesizerTrnMs768NSFsid): def forward(self, phone, pitch, pitchf, sid): if type(phone.shape[1]) == int: phone_lengths = torch.tensor( [phone.shape[1]], device=phone.device, dtype=torch.int32 ) else: phone_lengths = phone.shape[1] g = self.emb_g(sid).unsqueeze(-1) m_p, logs_p, x_mask = self.enc_p(phone, pitch, phone_lengths) z_p = (m_p + torch.exp(logs_p) * torch.randn_like(m_p) * 0.66666) * x_mask z = self.flow(z_p, x_mask, g=g, reverse=True) o = self.dec(z * x_mask, pitchf, g=g, n_res=None) return o class FeatureExtractor(nn.Module): def __init__(self, hubert: HubertModel, rmvpe: RMVPE): super().__init__() self.hubert = hubert self.rmvpe = rmvpe def to(self, device): self.hubert = self.hubert.to(device) self.rmvpe = self.rmvpe.to(device) return super().to(device) def forward(self, audio16k, pitch_modification): phone = self.hubert(audio16k, output_hidden_states=True)["hidden_states"][12] phone = phone.squeeze(0).float() phone_lengths = phone.shape[0] if type(phone_lengths) == int: phone_lengths = torch.tensor( [phone_lengths], device=phone.device, dtype=torch.int32 ) pitchf = self.rmvpe.infer(audio16k.squeeze(0), thred=0.03, return_tensor=True) pitchf *= torch.pow( 2, torch.tensor( pitch_modification / 12.0, dtype=torch.float32, device=pitchf.device ), ) pitch = self.calculate_f0_from_f0nsf_torch(pitchf) pitch = pitch.unsqueeze(0) pitchf = pitchf.unsqueeze(0) phone = phone.unsqueeze(0) logger.info( f"{phone.shape=}, {phone_lengths=}, {pitch.shape=}, {pitchf.shape=}" ) feats0 = phone.clone() feats: torch.Tensor = F.interpolate( phone.permute(0, 2, 1), scale_factor=2 ).permute(0, 2, 1) feats0: torch.Tensor = F.interpolate( feats0.permute(0, 2, 1), scale_factor=2 ).permute(0, 2, 1) phone_len = feats.shape[1] pitch = pitch[:, :phone_len] pitchf = pitchf[:, :phone_len] pitchff = pitchf.clone() pitchff[pitchf > 0] = 1 pitchff[pitchf < 1] = 0.33 pitchff = pitchff.unsqueeze(-1) feats = feats * pitchff + feats0 * (1 - pitchff) feats = feats.to(feats0.dtype) if type(phone_len) == int: phone_len = torch.tensor( [phone_len], device=feats.device, dtype=torch.int32 ) else: phone_len = phone_len.unsqueeze(0) logger.info(f"{feats.shape=}, {pitch.shape=}, {pitchf.shape=}, {phone_len=}") return feats, phone_len, pitch, pitchf def calculate_f0_from_f0nsf_torch(self, f0nsf: torch.Tensor): f0_mel = 1127 * torch.log(1 + f0nsf / 700) f0_max = torch.tensor(1100.0) f0_min = torch.tensor(50.0) f0_bin = torch.tensor(256) f0_mel_max = 1127 * torch.log(1 + f0_max / 700) f0_mel_min = 1127 * torch.log(1 + f0_min / 700) f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * (f0_bin - 2) / ( f0_mel_max - f0_mel_min ) + 1 # use 0 or 1 f0_mel[f0_mel <= 1] = 1 f0_mel[f0_mel > f0_bin - 1] = f0_bin - 1 f0 = torch.round(f0_mel).long() f0 = torch.clamp(f0, 1, 255) return f0 class RVC: """ RVC (Retrieval-based Voice Conversion) class for converting speech using a pre-trained model. Args: name (str | SynthesizerTrnMs768NSFsid): The name of the pre-trained model or the model instance itself. sr (int, optional): The sample rate of the input audio. Defaults to SR_48K. segment_size (float, optional): The segment size for splitting the input audio. Defaults to 30.0 seconds. hubert (str | HubertModel | None, optional): The name of the pre-trained Hubert model or the model instance itself. Defaults to None. rmvpe (str | RMVPE | None, optional): The name of the pre-trained RMVPE model or the model instance itself. Defaults to None. accelerator (Accelerator, optional): The accelerator device for model inference. Defaults to Accelerator(). from_pretrained_kwargs (dict, optional): Additional keyword arguments for loading the pre-trained model. Defaults to {}. Methods: from_pretrained(name, sr=SR_48K, hubert=None, rmvpe=None, accelerator=Accelerator(), **from_pretrained_kwargs): Creates an instance of RVC using the from_pretrained method. convert(audio, protect=0.33): Converts the input audio to the target voice using the pre-trained model. convert_dataset(dataset, protect=0.33): Converts a dataset of audio samples to the target voice using the pre-trained model. convert_file(audio, protect=0.33): Converts a single audio file to the target voice using the pre-trained model. convert_from_wav16k(wav16k, protect=0.33): Converts a 16kHz waveform to the target voice using the pre-trained model. convert_from_features(phone, pitchf, pitch, protect=0.33): Converts audio features (phone, pitchf, pitch) to the target voice using the pre-trained model. """ def __init__( self, synthesizer: str | Synthesizer, hubert: HubertModel | None = None, rmvpe: RMVPE | None = None, sr=SR_48K, segment_size=30.0, accelerator: Accelerator | None = None, from_pretrained_kwargs={}, ): """ Initializes an instance of the RVC class. Args: synthesizer (str | Synthesizer): The name of the pre-trained model or the model instance itself. hubert (str | HubertModel | None, optional): The name of the pre-trained Hubert model or the model instance itself. Defaults to None. rmvpe (str | RMVPE | None, optional): The name of the pre-trained RMVPE model or the model instance itself. Defaults to None. sr (int, optional): The sample rate of the input audio. Defaults to SR_48K. segment_size (float, optional): The segment size for splitting the input audio. Defaults to 30.0 seconds. accelerator (Accelerator, optional): The accelerator device for model inference. Defaults to Accelerator(). from_pretrained_kwargs (dict, optional): Additional keyword arguments for loading the pre-trained model. Defaults to {}. """ accelerator = accelerator or Accelerator() self.accelerator = accelerator self.synthesizer = ( Synthesizer.from_pretrained(synthesizer, **from_pretrained_kwargs) if isinstance(synthesizer, str) else synthesizer ) self.synthesizer = self.synthesizer.to(accelerator.device) hubert = hubert or load_hubert() rmvpe = rmvpe or load_rmvpe() self.feature_extractor = FeatureExtractor(hubert, rmvpe) self.feature_extractor = self.feature_extractor.to(accelerator.device) self.sr = sr self.segment_size = segment_size @staticmethod def from_pretrained( name: str, hubert: HubertModel | None = None, rmvpe: RMVPE | None = None, sr=SR_48K, segment_size=30.0, accelerator: Accelerator | None = None, **from_pretrained_kwargs, ): """ Creates an instance of RVC using the from_pretrained method. Args: name (str): The name of the pre-trained model. hubert (HubertModel | None, optional): The name of the pre-trained Hubert model or the model instance itself. Defaults to None. rmvpe (RMVPE | None, optional): The name of the pre-trained RMVPE model or the model instance itself. Defaults to None. sr (int, optional): The sample rate of the input audio. Defaults to SR_48K. segment_size (float, optional): The segment size for splitting the input audio. Defaults to 30.0 seconds. accelerator (Accelerator, optional): The accelerator device for model inference. Defaults to Accelerator(). from_pretrained_kwargs (dict): Additional keyword arguments for loading the pre-trained model. Returns: RVC: An instance of the RVC class. """ return RVC( name, hubert=hubert, rmvpe=rmvpe, sr=sr, segment_size=segment_size, accelerator=accelerator, from_pretrained_kwargs=from_pretrained_kwargs, ) def convert(self, audio: str | Dataset | np.ndarray, pitch_modification=0.0): """ Converts the input audio to the target voice using the pre-trained model. Args: audio (str | Dataset | np.ndarray): The input audio to be converted. It can be a file path, a dataset of audio samples, or a numpy array. pitch_modification (float, optional): The pitch modification factor. Defaults to 0.0. Returns: np.ndarray: The converted audio in the target voice. If the input is a dataset, it yields the converted audio samples one by one. """ logger.info(f"audio: {audio}, pitch_modification: {pitch_modification}") if isinstance(audio, str): return self.convert_file(audio, pitch_modification=pitch_modification) if isinstance(audio, Dataset): return self.convert_dataset(audio, pitch_modification=pitch_modification) return self.convert_from_wav16k(audio, pitch_modification=pitch_modification) def convert_dataset(self, dataset: Dataset, pitch_modification=0.0): """ Converts a dataset of audio samples to the target voice using the pre-trained model. Args: dataset (Dataset): The dataset of audio samples to be converted. pitch_modification (float, optional): The pitch modification factor. Defaults to 0.0. Yields: np.ndarray: The converted audio samples in the target voice. """ for i, data in enumerate(dataset): logger.info(f"Converting data {i}") phone = data["hubert_feats"] pitchf = data["f0nsf"] pitch = data["f0"] yield self.convert_from_features( phone=phone, pitchf=pitchf, pitch=pitch, pitch_modification=pitch_modification, ) def convert_file(self, audio: str, pitch_modification=0.0) -> np.ndarray: """ Converts a single audio file to the target voice using the pre-trained model. Args: audio (str): The path to the audio file to be converted. pitch_modification (float, optional): The pitch modification factor. Defaults to 0.0. Returns: np.ndarray: The converted audio in the target voice. """ wav16k, _ = librosa.load(audio, sr=SR_16K) logger.info(f"Loaded {audio} with shape {wav16k.shape}") return self.convert_from_wav16k(wav16k, pitch_modification=pitch_modification) @torch.no_grad() def convert_from_wav16k( self, wav16k: np.ndarray, pitch_modification=0.0 ) -> np.ndarray: """ Converts a 16kHz waveform to the target voice using the pre-trained model. Args: wav16k (np.ndarray): The 16kHz waveform to be converted. pitch_modification (float, optional): The pitch modification factor. Defaults to 0.0. Returns: np.ndarray: The converted audio in the target voice. """ self.feature_extractor.eval() feature_extractor_device = next(self.feature_extractor.parameters()).device ret = [] segment_size = int(self.segment_size * SR_16K) for i in range(0, len(wav16k), segment_size): segment = wav16k[i : i + segment_size] segment = np.pad(segment, (SR_16K, SR_16K), mode="reflect") logger.info(f"Padded audio with shape {segment.shape}") phone, phone_lengths, pitch, pitchf = self.feature_extractor( torch.from_numpy(segment) .unsqueeze(0) .to(device=feature_extractor_device), pitch_modification, ) print(f"{phone.shape=}, {phone_lengths=}, {pitch.shape=}, {pitchf.shape=}") ret.append( self.convert_from_features(phone, pitchf, pitch)[self.sr : -self.sr] ) return np.concatenate(ret) @torch.no_grad() def convert_from_features( self, phone: np.ndarray | torch.Tensor, pitchf: np.ndarray | torch.Tensor, pitch: np.ndarray | torch.Tensor, ) -> np.ndarray: """ Converts audio features (phone, pitchf, pitch) to the target voice using the pre-trained model. Args: phone (np.ndarray): The phone features of the audio. pitchf (np.ndarray): The pitch features of the audio. pitch (np.ndarray): The pitch values of the audio. Returns: np.ndarray: The converted audio in the target voice. """ self.synthesizer.eval() synthesizer_device = next(self.synthesizer.parameters()).device if isinstance(phone, np.ndarray): phone = torch.from_numpy(phone).to(device=synthesizer_device) if isinstance(pitchf, np.ndarray): pitchf = torch.from_numpy(pitchf).to(device=synthesizer_device) if isinstance(pitch, np.ndarray): pitch = torch.from_numpy(pitch).to(device=synthesizer_device) if phone.dim() == 2: phone = phone.unsqueeze(0) if pitchf.dim() == 1: pitchf = pitchf.unsqueeze(0) if pitch.dim() == 1: pitch = pitch.unsqueeze(0) sid = torch.tensor([0], device=synthesizer_device, dtype=torch.int32) audio_segment = ( self.synthesizer(phone, pitch, pitchf, sid).squeeze().cpu().float().numpy() ) logger.info( f"Generated audio shape: {audio_segment.shape} {audio_segment.dtype}" ) return audio_segment