import io
import os
import six
import sys
import math
import librosa
import tempfile
import platform
import traceback
import audioread
import subprocess

import numpy as np
import soundfile as sf

from scipy.signal import correlate, hilbert

now_dir = os.getcwd()
sys.path.append(now_dir)

from main.configs.config import Config

translations = Config().translations

OPERATING_SYSTEM = platform.system()
SYSTEM_ARCH = platform.platform()
SYSTEM_PROC = platform.processor()
ARM = "arm"

AUTO_PHASE = "Automatic"
POSITIVE_PHASE = "Positive Phase"
NEGATIVE_PHASE = "Negative Phase"

NONE_P = ("None",)
LOW_P = ("Shifts: Low",)
MED_P = ("Shifts: Medium",)
HIGH_P = ("Shifts: High",)
VHIGH_P = "Shifts: Very High"
MAXIMUM_P = "Shifts: Maximum"

BASE_PATH_RUB = sys._MEIPASS if getattr(sys, "frozen", False) else os.path.dirname(os.path.abspath(__file__))
DEVNULL = open(os.devnull, "w") if six.PY2 else subprocess.DEVNULL

MAX_SPEC = "Max Spec"
MIN_SPEC = "Min Spec"
LIN_ENSE = "Linear Ensemble"

MAX_WAV = MAX_SPEC
MIN_WAV = MIN_SPEC
AVERAGE = "Average"

progress_value = 0
last_update_time = 0
is_macos = False

if OPERATING_SYSTEM == "Darwin":
    wav_resolution = "polyphase" if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH else "sinc_fastest"
    wav_resolution_float_resampling = "kaiser_best" if SYSTEM_PROC == ARM or ARM in SYSTEM_ARCH else wav_resolution
    is_macos = True
else:
    wav_resolution = "sinc_fastest"
    wav_resolution_float_resampling = wav_resolution


def crop_center(h1, h2):
    h1_shape = h1.size()
    h2_shape = h2.size()

    if h1_shape[3] == h2_shape[3]:
        return h1
    elif h1_shape[3] < h2_shape[3]:
        raise ValueError("h1_shape[3] must be >= h2_shape[3]")

    s_time = (h1_shape[3] - h2_shape[3]) // 2
    e_time = s_time + h2_shape[3]
    return h1[:, :, :, s_time:e_time]


def preprocess(X_spec):
    return np.abs(X_spec), np.angle(X_spec)


def make_padding(width, cropsize, offset):
    left = offset
    roi_size = cropsize - offset * 2

    if roi_size == 0:
        roi_size = cropsize

    right = roi_size - (width % roi_size) + left
    return left, right, roi_size


def normalize(wave, max_peak=1.0):
    maxv = np.abs(wave).max()

    if maxv > max_peak:
        wave *= max_peak / maxv

    return wave


def auto_transpose(audio_array: np.ndarray):
    if audio_array.shape[1] == 2:
        return audio_array.T

    return audio_array


def write_array_to_mem(audio_data, subtype):
    if isinstance(audio_data, np.ndarray):
        audio_buffer = io.BytesIO()
        sf.write(audio_buffer, audio_data, 44100, subtype=subtype, format="WAV")
        audio_buffer.seek(0)
        return audio_buffer
    else:
        return audio_data


def spectrogram_to_image(spec, mode="magnitude"):
    if mode == "magnitude":
        y = np.abs(spec) if np.iscomplexobj(spec) else spec
        y = np.log10(y**2 + 1e-8)
    elif mode == "phase":
        y = np.angle(spec) if np.iscomplexobj(spec) else spec

    y -= y.min()
    y *= 255 / y.max()
    img = np.uint8(y)

    if y.ndim == 3:
        img = img.transpose(1, 2, 0)
        img = np.concatenate([np.max(img, axis=2, keepdims=True), img], axis=2)

    return img


def reduce_vocal_aggressively(X, y, softmask):
    v = X - y
    y_mag_tmp = np.abs(y)
    v_mag_tmp = np.abs(v)
    v_mask = v_mag_tmp > y_mag_tmp
    y_mag = np.clip(y_mag_tmp - v_mag_tmp * v_mask * softmask, 0, np.inf)

    return y_mag * np.exp(1.0j * np.angle(y))
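
# Hedged usage sketch (not part of the original module): demonstrates how
# make_padding() computes left/right STFT padding for a crop window and how
# normalize() rescales peaks. The values below are illustrative only.
def _demo_padding_and_normalize():
    # Pad a 1000-frame spectrogram for a 256-frame crop window with a 32-frame offset.
    left, right, roi_size = make_padding(1000, 256, 32)
    print(left, right, roi_size)  # padding amounts and the effective ROI size

    # Peak-normalize a waveform that clips above 1.0.
    wave = np.array([0.5, -2.0, 1.5])
    print(normalize(wave.copy(), max_peak=1.0))  # scaled so max |sample| == 1.0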

def merge_artifacts(y_mask, thres=0.01, min_range=64, fade_size=32):
    mask = y_mask

    try:
        if min_range < fade_size * 2:
            raise ValueError("min_range must be >= fade_size * 2")

        idx = np.where(y_mask.min(axis=(0, 1)) > thres)[0]
        start_idx = np.insert(idx[np.where(np.diff(idx) != 1)[0] + 1], 0, idx[0])
        end_idx = np.append(idx[np.where(np.diff(idx) != 1)[0]], idx[-1])
        artifact_idx = np.where(end_idx - start_idx > min_range)[0]
        weight = np.zeros_like(y_mask)

        if len(artifact_idx) > 0:
            start_idx = start_idx[artifact_idx]
            end_idx = end_idx[artifact_idx]
            old_e = None

            for s, e in zip(start_idx, end_idx):
                if old_e is not None and s - old_e < fade_size:
                    s = old_e - fade_size * 2

                if s != 0:
                    weight[:, :, s : s + fade_size] = np.linspace(0, 1, fade_size)
                else:
                    s -= fade_size

                if e != y_mask.shape[2]:
                    weight[:, :, e - fade_size : e] = np.linspace(1, 0, fade_size)
                else:
                    e += fade_size

                weight[:, :, s + fade_size : e - fade_size] = 1
                old_e = e

        v_mask = 1 - y_mask
        y_mask += weight * v_mask
        mask = y_mask
    except Exception as e:
        error_name = type(e).__name__
        traceback_text = "".join(traceback.format_tb(e.__traceback__))
        message = f'{error_name}: "{e}"\n{traceback_text}'
        print(translations["not_success"], message)

    return mask


def align_wave_head_and_tail(a, b):
    l = min(a[0].size, b[0].size)
    return a[:, :l], b[:, :l]


def convert_channels(spec, mp, band):
    cc = mp.param["band"][band].get("convert_channels")

    if "mid_side_c" == cc:
        spec_left = np.add(spec[0], spec[1] * 0.25)
        spec_right = np.subtract(spec[1], spec[0] * 0.25)
    elif "mid_side" == cc:
        spec_left = np.add(spec[0], spec[1]) / 2
        spec_right = np.subtract(spec[0], spec[1])
    elif "stereo_n" == cc:
        spec_left = np.add(spec[0], spec[1] * 0.25) / 0.9375
        spec_right = np.add(spec[1], spec[0] * 0.25) / 0.9375
    else:
        return spec

    return np.asfortranarray([spec_left, spec_right])


def combine_spectrograms(specs, mp, is_v51_model=False):
    l = min([specs[i].shape[2] for i in specs])
    spec_c = np.zeros(shape=(2, mp.param["bins"] + 1, l), dtype=np.complex64)
    offset = 0
    bands_n = len(mp.param["band"])

    for d in range(1, bands_n + 1):
        h = mp.param["band"][d]["crop_stop"] - mp.param["band"][d]["crop_start"]
        spec_c[:, offset : offset + h, :l] = specs[d][:, mp.param["band"][d]["crop_start"] : mp.param["band"][d]["crop_stop"], :l]
        offset += h

    if offset > mp.param["bins"]:
        raise ValueError("Too many bins")

    if mp.param["pre_filter_start"] > 0:
        if is_v51_model:
            spec_c *= get_lp_filter_mask(spec_c.shape[1], mp.param["pre_filter_start"], mp.param["pre_filter_stop"])
        else:
            if bands_n == 1:
                spec_c = fft_lp_filter(spec_c, mp.param["pre_filter_start"], mp.param["pre_filter_stop"])
            else:
                gp = 1

                for b in range(mp.param["pre_filter_start"] + 1, mp.param["pre_filter_stop"]):
                    g = math.pow(10, -(b - mp.param["pre_filter_start"]) * (3.5 - gp) / 20.0)
                    gp = g
                    spec_c[:, b, :] *= g

    return np.asfortranarray(spec_c)


def wave_to_spectrogram(wave, hop_length, n_fft, mp, band, is_v51_model=False):
    if wave.ndim == 1:
        wave = np.asfortranarray([wave, wave])

    if not is_v51_model:
        if mp.param["reverse"]:
            wave_left = np.flip(np.asfortranarray(wave[0]))
            wave_right = np.flip(np.asfortranarray(wave[1]))
        elif mp.param["mid_side"]:
            wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
            wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
        elif mp.param["mid_side_b2"]:
            wave_left = np.asfortranarray(np.add(wave[1], wave[0] * 0.5))
            wave_right = np.asfortranarray(np.subtract(wave[0], wave[1] * 0.5))
        else:
            wave_left = np.asfortranarray(wave[0])
            wave_right = np.asfortranarray(wave[1])
    else:
        wave_left = np.asfortranarray(wave[0])
        wave_right = np.asfortranarray(wave[1])

    spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
    spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)
    spec = np.asfortranarray([spec_left, spec_right])

    if is_v51_model:
        spec = convert_channels(spec, mp, band)

    return spec
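
# Hedged usage sketch (not part of the original module): merge_artifacts()
# pushes sustained above-threshold regions of a (channels, bins, frames) mask
# toward 1.0, with linear fades at the region edges. Shapes are illustrative.
def _demo_merge_artifacts():
    mask = np.zeros((2, 8, 512), dtype=np.float32)
    mask[:, :, 100:300] = 0.5  # a 200-frame stretch above the 0.01 threshold

    merged = merge_artifacts(mask.copy(), thres=0.01, min_range=64, fade_size=32)
    print(merged[:, :, 200].max())  # interior of the region is filled to 1.0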

def spectrogram_to_wave(spec, hop_length=1024, mp={}, band=0, is_v51_model=True):
    spec_left = np.asfortranarray(spec[0])
    spec_right = np.asfortranarray(spec[1])
    wave_left = librosa.istft(spec_left, hop_length=hop_length)
    wave_right = librosa.istft(spec_right, hop_length=hop_length)

    if is_v51_model:
        cc = mp.param["band"][band].get("convert_channels")

        if "mid_side_c" == cc:
            return np.asfortranarray([np.subtract(wave_left / 1.0625, wave_right / 4.25), np.add(wave_right / 1.0625, wave_left / 4.25)])
        elif "mid_side" == cc:
            return np.asfortranarray([np.add(wave_left, wave_right / 2), np.subtract(wave_left, wave_right / 2)])
        elif "stereo_n" == cc:
            return np.asfortranarray([np.subtract(wave_left, wave_right * 0.25), np.subtract(wave_right, wave_left * 0.25)])
    else:
        if mp.param["reverse"]:
            return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)])
        elif mp.param["mid_side"]:
            return np.asfortranarray([np.add(wave_left, wave_right / 2), np.subtract(wave_left, wave_right / 2)])
        elif mp.param["mid_side_b2"]:
            return np.asfortranarray([np.add(wave_right / 1.25, 0.4 * wave_left), np.subtract(wave_left / 1.25, 0.4 * wave_right)])

    return np.asfortranarray([wave_left, wave_right])


def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None, is_v51_model=False):
    bands_n = len(mp.param["band"])
    offset = 0

    for d in range(1, bands_n + 1):
        bp = mp.param["band"][d]
        spec_s = np.zeros(shape=(2, bp["n_fft"] // 2 + 1, spec_m.shape[2]), dtype=complex)
        h = bp["crop_stop"] - bp["crop_start"]
        spec_s[:, bp["crop_start"] : bp["crop_stop"], :] = spec_m[:, offset : offset + h, :]
        offset += h

        if d == bands_n:
            if extra_bins_h:
                max_bin = bp["n_fft"] // 2
                spec_s[:, max_bin - extra_bins_h : max_bin, :] = extra_bins[:, :extra_bins_h, :]

            if bp["hpf_start"] > 0:
                if is_v51_model:
                    spec_s *= get_hp_filter_mask(spec_s.shape[1], bp["hpf_start"], bp["hpf_stop"] - 1)
                else:
                    spec_s = fft_hp_filter(spec_s, bp["hpf_start"], bp["hpf_stop"] - 1)

            if bands_n == 1:
                wave = spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model)
            else:
                wave = np.add(wave, spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model))
        else:
            sr = mp.param["band"][d + 1]["sr"]

            if d == 1:
                if is_v51_model:
                    spec_s *= get_lp_filter_mask(spec_s.shape[1], bp["lpf_start"], bp["lpf_stop"])
                else:
                    spec_s = fft_lp_filter(spec_s, bp["lpf_start"], bp["lpf_stop"])

                try:
                    wave = librosa.resample(spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model), orig_sr=bp["sr"], target_sr=sr, res_type=wav_resolution)
                except ValueError as e:
                    print(f"{translations['resample_error']}: {e}")
                    print(f"{translations['shapes']} Spec_s: {spec_s.shape}, SR: {sr}, {translations['wav_resolution']}: {wav_resolution}")
            else:
                if is_v51_model:
                    spec_s *= get_hp_filter_mask(spec_s.shape[1], bp["hpf_start"], bp["hpf_stop"] - 1)
                    spec_s *= get_lp_filter_mask(spec_s.shape[1], bp["lpf_start"], bp["lpf_stop"])
                else:
                    spec_s = fft_hp_filter(spec_s, bp["hpf_start"], bp["hpf_stop"] - 1)
                    spec_s = fft_lp_filter(spec_s, bp["lpf_start"], bp["lpf_stop"])

                wave2 = np.add(wave, spectrogram_to_wave(spec_s, bp["hl"], mp, d, is_v51_model))

                try:
                    wave = librosa.resample(wave2, orig_sr=bp["sr"], target_sr=sr, res_type=wav_resolution)
                except ValueError as e:
                    print(f"{translations['resample_error']}: {e}")
                    print(f"{translations['shapes']} Spec_s: {spec_s.shape}, SR: {sr}, {translations['wav_resolution']}: {wav_resolution}")

    return wave


def get_lp_filter_mask(n_bins, bin_start, bin_stop):
    return np.concatenate([np.ones((bin_start - 1, 1)), np.linspace(1, 0, bin_stop - bin_start + 1)[:, None], np.zeros((n_bins - bin_stop, 1))], axis=0)


def get_hp_filter_mask(n_bins, bin_start, bin_stop):
    return np.concatenate([np.zeros((bin_stop + 1, 1)), np.linspace(0, 1, 1 + bin_start - bin_stop)[:, None], np.ones((n_bins - bin_start - 2, 1))], axis=0)
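
# Hedged usage sketch (not part of the original module): the v5.1 filter masks
# are (n_bins, 1) columns that broadcast across the time axis of a
# (channels, n_bins, frames) spectrogram. The bin numbers are illustrative.
def _demo_lp_filter_mask():
    mask = get_lp_filter_mask(1025, 700, 900)
    print(mask.shape)  # (1025, 1): ones below bin 700, linear roll-off, zeros above

    spec = np.ones((2, 1025, 100), dtype=np.complex64)
    filtered = spec * mask  # broadcasts over channels and frames
    print(np.abs(filtered[0, :, 0]).min(), np.abs(filtered[0, :, 0]).max())  # 0.0 1.0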

def fft_lp_filter(spec, bin_start, bin_stop):
    g = 1.0

    for b in range(bin_start, bin_stop):
        g -= 1 / (bin_stop - bin_start)
        spec[:, b, :] = g * spec[:, b, :]

    spec[:, bin_stop:, :] *= 0
    return spec


def fft_hp_filter(spec, bin_start, bin_stop):
    g = 1.0

    for b in range(bin_start, bin_stop, -1):
        g -= 1 / (bin_start - bin_stop)
        spec[:, b, :] = g * spec[:, b, :]

    spec[:, 0 : bin_stop + 1, :] *= 0
    return spec


def spectrogram_to_wave_old(spec, hop_length=1024):
    if spec.ndim == 2:
        wave = librosa.istft(spec, hop_length=hop_length)
    elif spec.ndim == 3:
        spec_left = np.asfortranarray(spec[0])
        spec_right = np.asfortranarray(spec[1])
        wave_left = librosa.istft(spec_left, hop_length=hop_length)
        wave_right = librosa.istft(spec_right, hop_length=hop_length)
        wave = np.asfortranarray([wave_left, wave_right])

    return wave


def wave_to_spectrogram_old(wave, hop_length, n_fft):
    wave_left = np.asfortranarray(wave[0])
    wave_right = np.asfortranarray(wave[1])
    spec_left = librosa.stft(wave_left, n_fft=n_fft, hop_length=hop_length)
    spec_right = librosa.stft(wave_right, n_fft=n_fft, hop_length=hop_length)
    return np.asfortranarray([spec_left, spec_right])


def mirroring(a, spec_m, input_high_end, mp):
    if "mirroring" == a:
        mirror = np.flip(np.abs(spec_m[:, mp.param["pre_filter_start"] - 10 - input_high_end.shape[1] : mp.param["pre_filter_start"] - 10, :]), 1)
        mirror = mirror * np.exp(1.0j * np.angle(input_high_end))
        return np.where(np.abs(input_high_end) <= np.abs(mirror), input_high_end, mirror)

    if "mirroring2" == a:
        mirror = np.flip(np.abs(spec_m[:, mp.param["pre_filter_start"] - 10 - input_high_end.shape[1] : mp.param["pre_filter_start"] - 10, :]), 1)
        mi = np.multiply(mirror, input_high_end * 1.7)
        return np.where(np.abs(input_high_end) <= np.abs(mi), input_high_end, mi)


def adjust_aggr(mask, is_non_accom_stem, aggressiveness):
    aggr = aggressiveness["value"] * 2

    if aggr != 0:
        if is_non_accom_stem:
            aggr = 1 - aggr

        if np.any(aggr > 10) or np.any(aggr < -10):
            print(f"{translations['warnings']}: {aggr}")

        aggr = [aggr, aggr]

        if aggressiveness["aggr_correction"] is not None:
            aggr[0] += aggressiveness["aggr_correction"]["left"]
            aggr[1] += aggressiveness["aggr_correction"]["right"]

        for ch in range(2):
            mask[ch, : aggressiveness["split_bin"]] = np.power(mask[ch, : aggressiveness["split_bin"]], 1 + aggr[ch] / 3)
            mask[ch, aggressiveness["split_bin"] :] = np.power(mask[ch, aggressiveness["split_bin"] :], 1 + aggr[ch])

    return mask


def stft(wave, nfft, hl):
    wave_left = np.asfortranarray(wave[0])
    wave_right = np.asfortranarray(wave[1])
    spec_left = librosa.stft(wave_left, n_fft=nfft, hop_length=hl)
    spec_right = librosa.stft(wave_right, n_fft=nfft, hop_length=hl)
    return np.asfortranarray([spec_left, spec_right])


def istft(spec, hl):
    spec_left = np.asfortranarray(spec[0])
    spec_right = np.asfortranarray(spec[1])
    wave_left = librosa.istft(spec_left, hop_length=hl)
    wave_right = librosa.istft(spec_right, hop_length=hl)
    return np.asfortranarray([wave_left, wave_right])


def spec_effects(wave, algorithm="Default", value=None):
    if np.isnan(wave).any() or np.isinf(wave).any():
        print(f"{translations['warnings_2']}: {wave.shape}")

    spec = [stft(wave[0], 2048, 1024), stft(wave[1], 2048, 1024)]

    if algorithm == "Min_Mag":
        v_spec_m = np.where(np.abs(spec[1]) <= np.abs(spec[0]), spec[1], spec[0])
        wave = istft(v_spec_m, 1024)
    elif algorithm == "Max_Mag":
        v_spec_m = np.where(np.abs(spec[1]) >= np.abs(spec[0]), spec[1], spec[0])
        wave = istft(v_spec_m, 1024)
    elif algorithm == "Default":
        wave = (wave[1] * value) + (wave[0] * (1 - value))
    elif algorithm == "Invert_p":
        X_mag = np.abs(spec[0])
        y_mag = np.abs(spec[1])
        max_mag = np.where(X_mag >= y_mag, X_mag, y_mag)
        v_spec = spec[1] - max_mag * np.exp(1.0j * np.angle(spec[0]))
        wave = istft(v_spec, 1024)

    return wave
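
# Hedged usage sketch (not part of the original module): spec_effects() blends
# two stereo takes; "Default" is a time-domain crossfade, while "Min_Mag" /
# "Max_Mag" pick per-bin spectral minima/maxima. The buffers below are
# synthetic stand-ins for real audio.
def _demo_spec_effects():
    rng = np.random.default_rng(0)
    take_a = rng.standard_normal((2, 44100)).astype(np.float32) * 0.1
    take_b = rng.standard_normal((2, 44100)).astype(np.float32) * 0.1

    mixed = spec_effects([take_a, take_b], algorithm="Default", value=0.25)
    print(mixed.shape)  # (2, 44100): 25% of take_b plus 75% of take_a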

def spectrogram_to_wave_no_mp(spec, n_fft=2048, hop_length=1024):
    wave = librosa.istft(spec, n_fft=n_fft, hop_length=hop_length)

    if wave.ndim == 1:
        wave = np.asfortranarray([wave, wave])

    return wave


def wave_to_spectrogram_no_mp(wave):
    spec = librosa.stft(wave, n_fft=2048, hop_length=1024)

    if spec.ndim == 1:
        spec = np.asfortranarray([spec, spec])

    return spec


def invert_audio(specs, invert_p=True):
    ln = min([specs[0].shape[2], specs[1].shape[2]])
    specs[0] = specs[0][:, :, :ln]
    specs[1] = specs[1][:, :, :ln]

    if invert_p:
        X_mag = np.abs(specs[0])
        y_mag = np.abs(specs[1])
        max_mag = np.where(X_mag >= y_mag, X_mag, y_mag)
        v_spec = specs[1] - max_mag * np.exp(1.0j * np.angle(specs[0]))
    else:
        specs[1] = reduce_vocal_aggressively(specs[0], specs[1], 0.2)
        v_spec = specs[0] - specs[1]

    return v_spec


def invert_stem(mixture, stem):
    mixture = wave_to_spectrogram_no_mp(mixture)
    stem = wave_to_spectrogram_no_mp(stem)
    output = spectrogram_to_wave_no_mp(invert_audio([mixture, stem]))
    return -output.T


def ensembling(a, inputs, is_wavs=False):
    for i in range(1, len(inputs)):
        if i == 1:
            input = inputs[0]

        if is_wavs:
            ln = min([input.shape[1], inputs[i].shape[1]])
            input = input[:, :ln]
            inputs[i] = inputs[i][:, :ln]
        else:
            ln = min([input.shape[2], inputs[i].shape[2]])
            input = input[:, :, :ln]
            inputs[i] = inputs[i][:, :, :ln]

        if MIN_SPEC == a:
            input = np.where(np.abs(inputs[i]) <= np.abs(input), inputs[i], input)
        if MAX_SPEC == a:
            input = np.where(np.abs(inputs[i]) >= np.abs(input), inputs[i], input)

    return input


def ensemble_for_align(waves):
    specs = []

    for wav in waves:
        spec = wave_to_spectrogram_no_mp(wav.T)
        specs.append(spec)

    wav_aligned = spectrogram_to_wave_no_mp(ensembling(MIN_SPEC, specs)).T
    wav_aligned = match_array_shapes(wav_aligned, waves[1], is_swap=True)
    return wav_aligned


def ensemble_inputs(audio_input, algorithm, is_normalization, wav_type_set, save_path, is_wave=False, is_array=False):
    wavs_ = []

    if algorithm == AVERAGE:
        output = average_audio(audio_input)
        samplerate = 44100
    else:
        specs = []

        for i in range(len(audio_input)):
            wave, samplerate = librosa.load(audio_input[i], mono=False, sr=44100)
            wavs_.append(wave)
            spec = wave if is_wave else wave_to_spectrogram_no_mp(wave)
            specs.append(spec)

        wave_shapes = [w.shape[1] for w in wavs_]
        target_shape = wavs_[wave_shapes.index(max(wave_shapes))]
        output = ensembling(algorithm, specs, is_wavs=True) if is_wave else spectrogram_to_wave_no_mp(ensembling(algorithm, specs))
        output = to_shape(output, target_shape.shape)

    # Only peak-normalize when requested; passing the boolean straight into
    # normalize() would be interpreted as the max_peak value.
    sf.write(save_path, normalize(output.T) if is_normalization else output.T, samplerate, subtype=wav_type_set)


def to_shape(x, target_shape):
    padding_list = []

    for x_dim, target_dim in zip(x.shape, target_shape):
        pad_value = target_dim - x_dim
        padding_list.append((0, pad_value))

    return np.pad(x, tuple(padding_list), mode="constant")


def to_shape_minimize(x: np.ndarray, target_shape):
    padding_list = []

    for x_dim, target_dim in zip(x.shape, target_shape):
        pad_value = target_dim - x_dim
        padding_list.append((0, pad_value))

    return np.pad(x, tuple(padding_list), mode="constant")


def detect_leading_silence(audio, sr, silence_threshold=0.007, frame_length=1024):
    if len(audio.shape) == 2:
        channel = np.argmax(np.sum(np.abs(audio), axis=1))
        audio = audio[channel]

    for i in range(0, len(audio), frame_length):
        if np.max(np.abs(audio[i : i + frame_length])) > silence_threshold:
            return (i / sr) * 1000

    return (len(audio) / sr) * 1000
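
# Hedged usage sketch (not part of the original module): detect_leading_silence()
# returns the silence duration in milliseconds, quantized to the frame grid.
# The half-second of zeros below is synthetic.
def _demo_detect_leading_silence():
    audio = np.concatenate([np.zeros(22050), np.full(22050, 0.5)])
    ms = detect_leading_silence(audio, sr=44100, frame_length=1024)
    print(ms)  # ~487.6: the last full 1024-sample frame boundary before the signal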

def adjust_leading_silence(target_audio, reference_audio, silence_threshold=0.01, frame_length=1024):
    def find_silence_end(audio):
        if len(audio.shape) == 2:
            channel = np.argmax(np.sum(np.abs(audio), axis=1))
            audio_mono = audio[channel]
        else:
            audio_mono = audio

        for i in range(0, len(audio_mono), frame_length):
            if np.max(np.abs(audio_mono[i : i + frame_length])) > silence_threshold:
                return i

        return len(audio_mono)

    ref_silence_end = find_silence_end(reference_audio)
    target_silence_end = find_silence_end(target_audio)
    silence_difference = ref_silence_end - target_silence_end

    try:
        ref_silence_end_p = (ref_silence_end / 44100) * 1000
        target_silence_end_p = (target_silence_end / 44100) * 1000
        silence_difference_p = ref_silence_end_p - target_silence_end_p
        print("silence difference: ", silence_difference_p)
    except Exception:
        pass

    if silence_difference > 0:
        silence_to_add = np.zeros((target_audio.shape[0], silence_difference)) if len(target_audio.shape) == 2 else np.zeros(silence_difference)
        return np.hstack((silence_to_add, target_audio))
    elif silence_difference < 0:
        if len(target_audio.shape) == 2:
            return target_audio[:, -silence_difference:]
        else:
            return target_audio[-silence_difference:]
    else:
        return target_audio


def match_array_shapes(array_1: np.ndarray, array_2: np.ndarray, is_swap=False):
    if is_swap:
        array_1, array_2 = array_1.T, array_2.T

    if array_1.shape[1] > array_2.shape[1]:
        array_1 = array_1[:, : array_2.shape[1]]
    elif array_1.shape[1] < array_2.shape[1]:
        padding = array_2.shape[1] - array_1.shape[1]
        array_1 = np.pad(array_1, ((0, 0), (0, padding)), "constant", constant_values=0)

    if is_swap:
        array_1, array_2 = array_1.T, array_2.T

    return array_1


def match_mono_array_shapes(array_1: np.ndarray, array_2: np.ndarray):
    if len(array_1) > len(array_2):
        array_1 = array_1[: len(array_2)]
    elif len(array_1) < len(array_2):
        padding = len(array_2) - len(array_1)
        array_1 = np.pad(array_1, (0, padding), "constant", constant_values=0)

    return array_1


def change_pitch_semitones(y, sr, semitone_shift):
    factor = 2 ** (semitone_shift / 12)
    y_pitch_tuned = []

    for y_channel in y:
        y_pitch_tuned.append(librosa.resample(y_channel, orig_sr=sr, target_sr=sr * factor, res_type=wav_resolution_float_resampling))

    y_pitch_tuned = np.array(y_pitch_tuned)
    new_sr = sr * factor
    return y_pitch_tuned, new_sr


def augment_audio(export_path, audio_file, rate, is_normalization, wav_type_set, save_format=None, is_pitch=False, is_time_correction=True):
    wav, sr = librosa.load(audio_file, sr=44100, mono=False)

    if wav.ndim == 1:
        wav = np.asfortranarray([wav, wav])

    if not is_time_correction:
        wav_mix = change_pitch_semitones(wav, 44100, semitone_shift=-rate)[0]
    else:
        if is_pitch:
            wav_1 = pitch_shift(wav[0], sr, rate, rbargs=None)
            wav_2 = pitch_shift(wav[1], sr, rate, rbargs=None)
        else:
            wav_1 = time_stretch(wav[0], sr, rate, rbargs=None)
            wav_2 = time_stretch(wav[1], sr, rate, rbargs=None)

        if wav_1.shape > wav_2.shape:
            wav_2 = to_shape(wav_2, wav_1.shape)
        if wav_1.shape < wav_2.shape:
            wav_1 = to_shape(wav_1, wav_2.shape)

        wav_mix = np.asfortranarray([wav_1, wav_2])

    # Only peak-normalize when requested (see ensemble_inputs for the same fix).
    sf.write(export_path, normalize(wav_mix.T) if is_normalization else wav_mix.T, sr, subtype=wav_type_set)
    save_format(export_path)


def average_audio(audio):
    waves = []
    wave_shapes = []
    final_waves = []

    for i in range(len(audio)):
        wave = librosa.load(audio[i], sr=44100, mono=False)
        waves.append(wave[0])
        wave_shapes.append(wave[0].shape[1])

    wave_shapes_index = wave_shapes.index(max(wave_shapes))
    target_shape = waves[wave_shapes_index]
    waves.pop(wave_shapes_index)
    final_waves.append(target_shape)

    for n_array in waves:
        wav_target = to_shape(n_array, target_shape.shape)
        final_waves.append(wav_target)

    waves = sum(final_waves)
    waves = waves / len(audio)
    return waves
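
# Hedged usage sketch (not part of the original module): match_array_shapes()
# crops or zero-pads the second axis of a (channels, samples) buffer so two
# stereo takes line up sample-for-sample. Lengths below are illustrative.
def _demo_match_array_shapes():
    short_take = np.ones((2, 1000))
    long_take = np.ones((2, 1500))

    padded = match_array_shapes(short_take, long_take)
    cropped = match_array_shapes(long_take, short_take)
    print(padded.shape, cropped.shape)  # (2, 1500) (2, 1000)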

def average_dual_sources(wav_1, wav_2, value):
    if wav_1.shape > wav_2.shape:
        wav_2 = to_shape(wav_2, wav_1.shape)
    if wav_1.shape < wav_2.shape:
        wav_1 = to_shape(wav_1, wav_2.shape)

    wave = (wav_1 * value) + (wav_2 * (1 - value))
    return wave


def reshape_sources(wav_1: np.ndarray, wav_2: np.ndarray):
    if wav_1.shape > wav_2.shape:
        wav_2 = to_shape(wav_2, wav_1.shape)

    if wav_1.shape < wav_2.shape:
        ln = min([wav_1.shape[1], wav_2.shape[1]])
        wav_2 = wav_2[:, :ln]

    ln = min([wav_1.shape[1], wav_2.shape[1]])
    wav_1 = wav_1[:, :ln]
    wav_2 = wav_2[:, :ln]
    return wav_2


def reshape_sources_ref(wav_1_shape, wav_2: np.ndarray):
    if wav_1_shape > wav_2.shape:
        wav_2 = to_shape(wav_2, wav_1_shape)

    return wav_2


def combine_arrarys(audio_sources, is_swap=False):
    source = np.zeros_like(max(audio_sources, key=np.size))

    for v in audio_sources:
        v = match_array_shapes(v, source, is_swap=is_swap)
        source += v

    return source


def combine_audio(paths: list, audio_file_base=None, wav_type_set="FLOAT", save_format=None):
    source = combine_arrarys([load_audio(i) for i in paths])
    save_path = f"{audio_file_base}_combined.wav"
    sf.write(save_path, source.T, 44100, subtype=wav_type_set)
    save_format(save_path)


def reduce_mix_bv(inst_source, voc_source, reduction_rate=0.9):
    inst_source = inst_source * (1 - reduction_rate)
    mix_reduced = combine_arrarys([inst_source, voc_source], is_swap=True)
    return mix_reduced


def organize_inputs(inputs):
    input_list = {"target": None, "reference": None, "reverb": None, "inst": None}

    for i in inputs:
        if i.endswith("_(Vocals).wav"):
            input_list["reference"] = i
        elif "_RVC_" in i:
            input_list["target"] = i
        elif i.endswith("reverbed_stem.wav"):
            input_list["reverb"] = i
        elif i.endswith("_(Instrumental).wav"):
            input_list["inst"] = i

    return input_list


def check_if_phase_inverted(wav1, wav2, is_mono=False):
    if not is_mono:
        wav1 = np.mean(wav1, axis=0)
        wav2 = np.mean(wav2, axis=0)

    correlation = np.corrcoef(wav1[:1000], wav2[:1000])
    return correlation[0, 1] < 0
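
# Hedged usage sketch (not part of the original module): check_if_phase_inverted()
# correlates the first 1000 samples of the channel means; a negative correlation
# is treated as a polarity flip. The sine below is synthetic.
def _demo_check_if_phase_inverted():
    t = np.linspace(0, 1, 44100)
    wav = np.sin(2 * np.pi * 440 * t)
    stereo = np.asfortranarray([wav, wav])

    print(check_if_phase_inverted(stereo, -stereo))  # True: inverted copy
    print(check_if_phase_inverted(stereo, stereo))   # False: identical copy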

def align_audio(file1, file2, file2_aligned, file_subtracted, wav_type_set, is_save_aligned, command_Text, save_format, align_window: list, align_intro_val: list, db_analysis: tuple, set_progress_bar, phase_option, phase_shifts, is_match_silence, is_spec_match):
    global progress_value
    progress_value = 0
    is_mono = False

    def get_diff(a, b):
        corr = np.correlate(a, b, "full")
        diff = corr.argmax() - (b.shape[0] - 1)
        return diff

    def progress_bar(length):
        global progress_value
        progress_value += 1

        if (0.90 / length * progress_value) >= 0.9:
            length = progress_value + 1

        set_progress_bar(0.1, (0.9 / length * progress_value))

    if file1.endswith(".mp3") and is_macos:
        length1 = rerun_mp3(file1)
        wav1, sr1 = librosa.load(file1, duration=length1, sr=44100, mono=False)
    else:
        wav1, sr1 = librosa.load(file1, sr=44100, mono=False)

    if file2.endswith(".mp3") and is_macos:
        length2 = rerun_mp3(file2)
        wav2, sr2 = librosa.load(file2, duration=length2, sr=44100, mono=False)
    else:
        wav2, sr2 = librosa.load(file2, sr=44100, mono=False)

    if wav1.ndim == 1 and wav2.ndim == 1:
        is_mono = True
    elif wav1.ndim == 1:
        wav1 = np.asfortranarray([wav1, wav1])
    elif wav2.ndim == 1:
        wav2 = np.asfortranarray([wav2, wav2])

    if phase_option == AUTO_PHASE:
        if check_if_phase_inverted(wav1, wav2, is_mono=is_mono):
            wav2 = -wav2
    elif phase_option == POSITIVE_PHASE:
        wav2 = +wav2
    elif phase_option == NEGATIVE_PHASE:
        wav2 = -wav2

    if is_match_silence:
        wav2 = adjust_leading_silence(wav2, wav1)

    wav1_length = int(librosa.get_duration(y=wav1, sr=44100))
    wav2_length = int(librosa.get_duration(y=wav2, sr=44100))

    if not is_mono:
        wav1 = wav1.transpose()
        wav2 = wav2.transpose()

    wav2_org = wav2.copy()
    command_Text(translations["process_file"])

    seconds_length = min(wav1_length, wav2_length)
    wav2_aligned_sources = []

    for sec_len in align_intro_val:
        sec_seg = 1 if sec_len == 1 else int(seconds_length // sec_len)
        index = sr1 * sec_seg

        if is_mono:
            samp1, samp2 = wav1[index : index + sr1], wav2[index : index + sr1]
            diff = get_diff(samp1, samp2)
        else:
            index = sr1 * sec_seg
            samp1, samp2 = wav1[index : index + sr1, 0], wav2[index : index + sr1, 0]
            samp1_r, samp2_r = wav1[index : index + sr1, 1], wav2[index : index + sr1, 1]
            diff, diff_r = get_diff(samp1, samp2), get_diff(samp1_r, samp2_r)

        if diff > 0:
            zeros_to_append = np.zeros(diff) if is_mono else np.zeros((diff, 2))
            wav2_aligned = np.append(zeros_to_append, wav2_org, axis=0)
        elif diff < 0:
            wav2_aligned = wav2_org[-diff:]
        else:
            wav2_aligned = wav2_org

        if not any(np.array_equal(wav2_aligned, source) for source in wav2_aligned_sources):
            wav2_aligned_sources.append(wav2_aligned)

    unique_sources = len(wav2_aligned_sources)
    sub_mapper_big_mapper = {}

    for s in wav2_aligned_sources:
        wav2_aligned = match_mono_array_shapes(s, wav1) if is_mono else match_array_shapes(s, wav1, is_swap=True)

        if align_window:
            wav_sub = time_correction(wav1, wav2_aligned, seconds_length, align_window=align_window, db_analysis=db_analysis, progress_bar=progress_bar, unique_sources=unique_sources, phase_shifts=phase_shifts)
            wav_sub_size = np.abs(wav_sub).mean()
            sub_mapper_big_mapper = {**sub_mapper_big_mapper, **{wav_sub_size: wav_sub}}
        else:
            wav2_aligned = wav2_aligned * np.power(10, db_analysis[0] / 20)
            db_range = db_analysis[1]

            for db_adjustment in db_range:
                s_adjusted = wav2_aligned * (10 ** (db_adjustment / 20))
                wav_sub = wav1 - s_adjusted
                wav_sub_size = np.abs(wav_sub).mean()
                sub_mapper_big_mapper = {**sub_mapper_big_mapper, **{wav_sub_size: wav_sub}}

    sub_mapper_value_list = list(sub_mapper_big_mapper.values())
    wav_sub = ensemble_for_align(list(sub_mapper_big_mapper.values())) if is_spec_match and len(sub_mapper_value_list) >= 2 else ensemble_wav(list(sub_mapper_big_mapper.values()))
    wav_sub = np.clip(wav_sub, -1, +1)
    command_Text(translations["save_instruments"])

    if is_save_aligned or is_spec_match:
        wav1 = match_mono_array_shapes(wav1, wav_sub) if is_mono else match_array_shapes(wav1, wav_sub, is_swap=True)
        wav2_aligned = wav1 - wav_sub

        if is_spec_match:
            if wav1.ndim == 1 and wav2.ndim == 1:
                wav2_aligned = np.asfortranarray([wav2_aligned, wav2_aligned]).T
                wav1 = np.asfortranarray([wav1, wav1]).T

            wav2_aligned = ensemble_for_align([wav2_aligned, wav1])
            wav_sub = wav1 - wav2_aligned

        if is_save_aligned:
            sf.write(file2_aligned, wav2_aligned, sr1, subtype=wav_type_set)
            save_format(file2_aligned)

    sf.write(file_subtracted, wav_sub, sr1, subtype=wav_type_set)
    save_format(file_subtracted)


def phase_shift_hilbert(signal, degree):
    analytic_signal = hilbert(signal)
    return np.cos(np.radians(degree)) * analytic_signal.real - np.sin(np.radians(degree)) * analytic_signal.imag


def get_phase_shifted_tracks(track, phase_shift):
    if phase_shift == 180:
        return [track, -track]

    step = phase_shift
    end = 180 - (180 % step) if 180 % step == 0 else 181
    phase_range = range(step, end, step)
    flipped_list = [track, -track]

    for i in phase_range:
        flipped_list.extend([phase_shift_hilbert(track, i), phase_shift_hilbert(track, -i)])

    return flipped_list
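
# Hedged usage sketch (not part of the original module): phase_shift_hilbert()
# rotates a signal's phase via its analytic (Hilbert) representation. A
# 90-degree rotation of a cosine yields approximately -sin, up to edge effects.
def _demo_phase_shift_hilbert():
    t = np.linspace(0, 1, 4096, endpoint=False)
    x = np.cos(2 * np.pi * 32 * t)
    shifted = phase_shift_hilbert(x, 90)

    expected = np.cos(2 * np.pi * 32 * t + np.pi / 2)  # == -sin(2*pi*32*t)
    print(np.max(np.abs(shifted[256:-256] - expected[256:-256])))  # small residual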

def time_correction(mix: np.ndarray, instrumental: np.ndarray, seconds_length, align_window, db_analysis, sr=44100, progress_bar=None, unique_sources=None, phase_shifts=NONE_P):
    def align_tracks(track1, track2):
        shifted_tracks = {}
        track2 = track2 * np.power(10, db_analysis[0] / 20)
        db_range = db_analysis[1]
        # 190 is used by the callers as the "no phase shifts" sentinel value.
        track2_flipped = [track2] if phase_shifts == 190 else get_phase_shifted_tracks(track2, phase_shifts)

        for db_adjustment in db_range:
            for t in track2_flipped:
                track2_adjusted = t * (10 ** (db_adjustment / 20))
                corr = correlate(track1, track2_adjusted)
                delay = np.argmax(np.abs(corr)) - (len(track1) - 1)
                track2_shifted = np.roll(track2_adjusted, shift=delay)
                track2_shifted_sub = track1 - track2_shifted
                mean_abs_value = np.abs(track2_shifted_sub).mean()
                shifted_tracks[mean_abs_value] = track2_shifted

        return shifted_tracks[min(shifted_tracks.keys())]

    assert mix.shape == instrumental.shape, translations["assert"].format(mixshape=mix.shape, instrumentalshape=instrumental.shape)

    seconds_length = seconds_length // 2
    sub_mapper = {}
    progress_update_interval = 120
    total_iterations = 0

    if len(align_window) > 2:
        progress_update_interval = 320

    for secs in align_window:
        step = secs / 2
        window_size = int(sr * secs)
        step_size = int(sr * step)

        if len(mix.shape) == 1:
            total_mono = (len(range(0, len(mix) - window_size, step_size)) // progress_update_interval) * unique_sources
            total_iterations += total_mono
        else:
            total_stereo_ = len(range(0, len(mix[:, 0]) - window_size, step_size)) * 2
            total_stereo = (total_stereo_ // progress_update_interval) * unique_sources
            total_iterations += total_stereo

    for secs in align_window:
        sub = np.zeros_like(mix)
        divider = np.zeros_like(mix)
        step = secs / 2
        window_size = int(sr * secs)
        step_size = int(sr * step)
        window = np.hanning(window_size)

        if len(mix.shape) == 1:
            counter = 0

            for i in range(0, len(mix) - window_size, step_size):
                counter += 1

                if counter % progress_update_interval == 0:
                    progress_bar(total_iterations)

                window_mix = mix[i : i + window_size] * window
                window_instrumental = instrumental[i : i + window_size] * window
                window_instrumental_aligned = align_tracks(window_mix, window_instrumental)
                sub[i : i + window_size] += window_mix - window_instrumental_aligned
                divider[i : i + window_size] += window
        else:
            counter = 0

            for ch in range(mix.shape[1]):
                for i in range(0, len(mix[:, ch]) - window_size, step_size):
                    counter += 1

                    if counter % progress_update_interval == 0:
                        progress_bar(total_iterations)

                    window_mix = mix[i : i + window_size, ch] * window
                    window_instrumental = instrumental[i : i + window_size, ch] * window
                    window_instrumental_aligned = align_tracks(window_mix, window_instrumental)
                    sub[i : i + window_size, ch] += window_mix - window_instrumental_aligned
                    divider[i : i + window_size, ch] += window

        sub = np.where(divider > 1e-6, sub / divider, sub)
        sub_size = np.abs(sub).mean()
        sub_mapper = {**sub_mapper, **{sub_size: sub}}

    sub = ensemble_wav(list(sub_mapper.values()), split_size=12)
    return sub


def ensemble_wav(waveforms, split_size=240):
    waveform_thirds = {i: np.array_split(waveform, split_size) for i, waveform in enumerate(waveforms)}
    final_waveform = []

    for third_idx in range(split_size):
        means = [np.abs(waveform_thirds[i][third_idx]).mean() for i in range(len(waveforms))]
        min_index = np.argmin(means)
        final_waveform.append(waveform_thirds[min_index][third_idx])

    final_waveform = np.concatenate(final_waveform)
    return final_waveform
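
# Hedged usage sketch (not part of the original module): ensemble_wav() splits
# each candidate into segments and keeps, per segment, the candidate with the
# lowest mean absolute amplitude. Two synthetic candidates are compared below.
def _demo_ensemble_wav():
    quiet = np.full(2400, 0.1)
    loud = np.full(2400, 0.9)

    result = ensemble_wav([loud, quiet], split_size=4)
    print(np.allclose(result, quiet))  # True: the quieter candidate wins every segment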

def ensemble_wav_min(waveforms):
    for i in range(1, len(waveforms)):
        if i == 1:
            wave = waveforms[0]

        ln = min(len(wave), len(waveforms[i]))
        wave = wave[:ln]
        waveforms[i] = waveforms[i][:ln]
        wave = np.where(np.abs(waveforms[i]) <= np.abs(wave), waveforms[i], wave)

    return wave


def align_audio_test(wav1, wav2, sr1=44100):
    def get_diff(a, b):
        corr = np.correlate(a, b, "full")
        diff = corr.argmax() - (b.shape[0] - 1)
        return diff

    wav1 = wav1.transpose()
    wav2 = wav2.transpose()
    wav2_org = wav2.copy()

    index = sr1
    samp1 = wav1[index : index + sr1, 0]
    samp2 = wav2[index : index + sr1, 0]
    diff = get_diff(samp1, samp2)

    if diff > 0:
        wav2_aligned = np.append(np.zeros((diff, 1)), wav2_org, axis=0)
    elif diff < 0:
        wav2_aligned = wav2_org[-diff:]
    else:
        wav2_aligned = wav2_org

    return wav2_aligned


def load_audio(audio_file):
    wav, sr = librosa.load(audio_file, sr=44100, mono=False)

    if wav.ndim == 1:
        wav = np.asfortranarray([wav, wav])

    return wav


def rerun_mp3(audio_file):
    with audioread.audio_open(audio_file) as f:
        track_length = int(f.duration)

    return track_length


def __rubberband(y, sr, **kwargs):
    assert sr > 0

    fd, infile = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    fd, outfile = tempfile.mkstemp(suffix=".wav")
    os.close(fd)

    sf.write(infile, y, sr)

    try:
        arguments = [os.path.join(BASE_PATH_RUB, "rubberband"), "-q"]

        for key, value in six.iteritems(kwargs):
            arguments.append(str(key))
            arguments.append(str(value))

        arguments.extend([infile, outfile])
        subprocess.check_call(arguments, stdout=DEVNULL, stderr=DEVNULL)
        y_out, _ = sf.read(outfile, always_2d=True)

        if y.ndim == 1:
            y_out = np.squeeze(y_out)
    except OSError as exc:
        six.raise_from(RuntimeError(translations["rubberband"]), exc)
    finally:
        os.unlink(infile)
        os.unlink(outfile)

    return y_out


def time_stretch(y, sr, rate, rbargs=None):
    if rate <= 0:
        raise ValueError(translations["rate"])

    if rate == 1.0:
        return y

    if rbargs is None:
        rbargs = dict()

    rbargs.setdefault("--tempo", rate)
    return __rubberband(y, sr, **rbargs)


def pitch_shift(y, sr, n_steps, rbargs=None):
    if n_steps == 0:
        return y

    if rbargs is None:
        rbargs = dict()

    rbargs.setdefault("--pitch", n_steps)
    return __rubberband(y, sr, **rbargs)
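
# Hedged usage sketch (not part of the original module): time_stretch() and
# pitch_shift() shell out to a bundled "rubberband" executable resolved from
# BASE_PATH_RUB, so this only runs where that binary exists. The input file
# name is hypothetical.
if __name__ == "__main__":
    demo_y, demo_sr = librosa.load("example.wav", sr=44100, mono=False)  # hypothetical file
    stretched = time_stretch(demo_y.T, demo_sr, 1.25)  # 25% faster
    shifted = pitch_shift(demo_y.T, demo_sr, 2)        # up two semitones
    sf.write("example_shifted.wav", shifted, demo_sr)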