"""Harvest F0 estimator.

F0 candidates are generated from zero crossings of band-pass filtered signals,
refined with instantaneous-frequency analysis, and the resulting contour is
fixed and smoothed.  Entry point: harvest(x, fs, ...).
"""
import copy
import math
import multiprocessing as mp
from decimal import Decimal, ROUND_HALF_UP

import numba as nb
import numpy as np
from scipy import signal
from scipy.fftpack import fft
from scipy.interpolate import interp1d
from scipy.signal import lfilter

# Worker processes used by RefineCandidates are created with the "spawn" start method.
mp.set_start_method("spawn", force=True)

EPS = 2.2204460492503131e-16  # float64 machine epsilon, used to avoid division by zero


def harvest(x, fs, f0_floor=50, f0_ceil=1100, frame_period=10):
    """Estimate the F0 contour of waveform x sampled at fs Hz.

    Returns (f0, temporal_positions): per-frame F0 in Hz (0 for unvoiced frames)
    and frame times in seconds, both float32, one frame every frame_period
    milliseconds.
    """
    basic_temporal_positions = np.arange(0, int(1000 * len(x) / fs + 1)) / 1000
    channels_in_octave = 40
    f0_floor_adjusted = f0_floor * 0.9

    # Downsample to ~8 kHz for candidate detection.
    y, actual_fs = CalculateDownsampledSignal(x, fs, 8000)

    # One band-pass channel every 1/40 octave between the adjusted floor and 1.1 * f0_ceil.
    number_of_channels = int(np.ceil(np.log2((f0_ceil * 1.1) / f0_floor_adjusted) * channels_in_octave) + 1)
    boundary_f0_list = np.array([f0_floor_adjusted * pow(2.0, (i + 1) / channels_in_octave)
                                 for i in range(number_of_channels)])
    fft_size = int(2 ** np.ceil(np.log2(len(y) + int(fs / f0_floor_adjusted * 4 + 0.5) + 1)))
    y_spectrum = np.fft.fft(y, fft_size)

    raw_f0_candidates = CalculateCandidates(len(basic_temporal_positions), boundary_f0_list, len(y),
                                            basic_temporal_positions, actual_fs, y_spectrum,
                                            f0_floor, f0_ceil)
    f0_candidates, number_of_candidates = DetectCandidates(raw_f0_candidates)
    f0_candidates = OverlapF0Candidates(f0_candidates, number_of_candidates)
    f0_candidates, f0_candidates_score = RefineCandidates(y, actual_fs, basic_temporal_positions,
                                                          f0_candidates, f0_floor, f0_ceil)
    f0_candidates, f0_candidates_score = RemoveUnreliableCandidates(f0_candidates, f0_candidates_score)

    smoothed_f0 = SmoothF0(FixF0Contour(f0_candidates, f0_candidates_score))
    temporal_positions = np.arange(0, int(1000 * len(x) / fs / frame_period + 1)) * frame_period / 1000
    frame_index = np.array(np.minimum(len(smoothed_f0) - 1, round_matlab(temporal_positions * 1000)), dtype=int)
    return (np.array(smoothed_f0[frame_index], dtype=np.float32),
            np.array(temporal_positions, dtype=np.float32))


def CalculateDownsampledSignal(x, fs, target_fs):
    """Decimate x to roughly target_fs and remove the DC offset."""
    decimation_ratio = int(fs / target_fs + 0.5)
    if fs <= target_fs:
        y = copy.deepcopy(x)
        actual_fs = fs
    else:
        # Pad both ends with the edge samples to suppress filter transients.
        offset = int(np.ceil(140 / decimation_ratio) * decimation_ratio)
        actual_fs = fs / decimation_ratio
        padded = np.append(np.append(np.ones(offset) * x[0], x), np.ones(offset) * x[-1])
        y = decimate_matlab(padded, decimation_ratio, n=3)[int(offset / decimation_ratio): int(-offset / decimation_ratio)]
    y -= np.mean(y)
    return y, actual_fs


def CalculateCandidates(number_of_frames, boundary_f0_list, y_length, temporal_positions,
                        actual_fs, y_spectrum, f0_floor, f0_ceil):
    """Collect raw F0 candidates from every band-pass channel."""
    raw_f0_candidates = np.zeros((len(boundary_f0_list), number_of_frames))
    for i in range(len(boundary_f0_list)):
        raw_f0_candidates[i, :] = CalculateRawEvent(boundary_f0_list[i], actual_fs, y_spectrum,
                                                    y_length, temporal_positions, f0_floor, f0_ceil)
    return raw_f0_candidates


def DetectCandidates(raw_f0_candidates):
    """Keep, per frame, the candidates supported by at least 10 consecutive channels."""
    number_of_channels, number_of_frames = raw_f0_candidates.shape
    f0_candidates = np.zeros((int(number_of_channels / 10 + 0.5), number_of_frames))
    number_of_candidates = 0
    threshold = 10
    for i in np.arange(number_of_frames):
        tmp = np.array(raw_f0_candidates[:, i])
        tmp[tmp > 0] = 1
        tmp[0] = 0
        tmp[-1] = 0
        tmp = np.diff(tmp)
        st = np.where(tmp == 1)[0]
        ed = np.where(tmp == -1)[0]
        count = 0
        for j in np.arange(len(st)):
            dif = ed[j] - st[j]
            if dif >= threshold:
                f0_candidates[count, i] = np.mean(raw_f0_candidates[st[j] + 1: ed[j] + 1, i])
                count += 1
        number_of_candidates = max(number_of_candidates, count)
    return f0_candidates, number_of_candidates
def OverlapF0Candidates(f0_candidates, max_candidates):
    """Copy each frame's candidates onto the three neighbouring frames on either side."""
    n = 3
    number_of_candidates = n * 2 + 1
    new_f0_candidates = np.zeros((number_of_candidates * max_candidates, f0_candidates.shape[1]))
    new_f0_candidates[0, :] = f0_candidates[number_of_candidates - 1, :]
    for i in np.arange(number_of_candidates):
        st1 = max(-(i - n) + 1, 1)
        ed1 = min(-(i - n), 0)
        new_f0_candidates[np.arange(max_candidates) + i * max_candidates,
                          st1 - 1: new_f0_candidates.shape[1] + ed1] = \
            f0_candidates[np.arange(max_candidates), -ed1: new_f0_candidates.shape[1] - (st1 - 1)]
    return new_f0_candidates


def RefineCandidates(x, fs, temporal_positions, f0_candidates, f0_floor, f0_ceil):
    """Refine every (candidate, frame) pair in parallel worker processes."""
    N, f = f0_candidates.shape
    with mp.Pool(mp.cpu_count()) as pool:
        results = np.array(pool.starmap(GetRefinedF0,
                                        [(x, fs, temporal_positions[i], f0_candidates[j, i], f0_floor, f0_ceil)
                                         for j in np.arange(N) for i in np.arange(f)]))
    return np.reshape(results[:, 0], [N, f]), np.reshape(results[:, 1], [N, f])


@nb.jit((nb.float64[:],), nopython=True, cache=True)
def round_matlab(x):
    """MATLAB-style rounding helper: add/subtract 0.5 so a later int cast rounds half away from zero."""
    y = x.copy()
    y[x > 0] += 0.5
    y[x <= 0] -= 0.5
    return y


def GetRefinedF0(x, fs, current_time, current_f0, f0_floor, f0_ceil):
    """Refine one F0 candidate from the instantaneous frequencies of its harmonics."""
    if current_f0 == 0:
        return 0, 0
    half_window_length = np.ceil(3 * fs / current_f0 / 2)
    fft_size = int(2 ** np.ceil(np.log2((half_window_length * 2 + 1)) + 1))
    index_raw = round_matlab((current_time + (np.arange(-half_window_length, half_window_length + 1) / fs)) * fs + 0.001)

    # Blackman-type window and its derivative for instantaneous-frequency estimation.
    common = math.pi * ((index_raw - 1) / fs - current_time) / ((2 * half_window_length + 1) / fs)
    main_window = 0.42 + 0.5 * np.cos(2 * common) + 0.08 * np.cos(4 * common)
    diff_window = np.empty_like(main_window)
    diff_window[0] = -main_window[1] / 2
    diff_window[-1] = main_window[-2] / 2
    diff = np.diff(main_window)
    diff_window[1:-1] = -(diff[1:] + diff[:-1]) / 2

    index = (np.maximum(1, np.minimum(len(x), index_raw)) - 1).astype(int)
    spectrum = fft(x[index] * main_window, fft_size)
    diff_spectrum = fft(x[index] * diff_window, fft_size)
    power_spectrum = np.abs(spectrum) ** 2

    number_of_harmonics = min(np.floor(fs / 2 / current_f0), 6)
    harmonic_index = np.arange(1, number_of_harmonics + 1)
    index = round_matlab(current_f0 * fft_size / fs * harmonic_index).astype(int)
    instantaneous_frequency_list = ((np.arange(fft_size) / fft_size +
                                     (spectrum.real * diff_spectrum.imag - spectrum.imag * diff_spectrum.real)
                                     / power_spectrum / 2 / math.pi) * fs)[index]
    amplitude_list = np.sqrt(power_spectrum[index])

    # Amplitude-weighted average of the harmonic frequencies, then sanity checks.
    refined_f0 = np.sum(amplitude_list * instantaneous_frequency_list) / np.sum(amplitude_list * harmonic_index)
    refined_score = 1 / (1e-12 + np.mean(np.abs(((instantaneous_frequency_list / harmonic_index) - current_f0) / current_f0)))
    if refined_f0 < f0_floor or refined_f0 > f0_ceil or refined_score < 2.5:
        refined_f0 = refined_score = 0
    return refined_f0, refined_score


def RemoveUnreliableCandidates(f0_candidates, f0_candidates_score):
    """Zero out candidates whose best match in the neighbouring frames deviates by more than 5 %."""
    new_f0_candidates = np.array(f0_candidates)
    new_f0_candidates_score = np.array(f0_candidates_score)
    for i in np.arange(1, f0_candidates.shape[1] - 1):
        for j in np.arange(0, f0_candidates.shape[0]):
            reference_f0 = f0_candidates[j, i]
            if reference_f0 == 0:
                continue
            _, min_error1 = SelectBestF0(reference_f0, f0_candidates[:, i + 1], 1)
            _, min_error2 = SelectBestF0(reference_f0, f0_candidates[:, i - 1], 1)
            min_error = min([min_error1, min_error2])
            if min_error > 0.05:
                new_f0_candidates[j, i] = new_f0_candidates_score[j, i] = 0
    return new_f0_candidates, new_f0_candidates_score


@nb.jit((nb.float64, nb.float64[:], nb.float64), nopython=True, cache=True)
def SelectBestF0(reference_f0, f0_candidates, allowed_range):
    """Return the candidate closest to reference_f0 (0 if none is within allowed_range) and its relative error."""
    best_f0 = 0
    best_error = allowed_range
    for i in np.arange(len(f0_candidates)):
        tmp = np.abs(reference_f0 - f0_candidates[i]) / reference_f0
        if tmp > best_error:
            continue
        best_f0 = f0_candidates[i]
        best_error = tmp
    return best_f0, best_error
def CalculateRawEvent(boundary_f0, fs, y_spectrum, y_length, temporal_positions, f0_floor, f0_ceil):
    """Estimate F0 in one channel by band-pass filtering around boundary_f0 and analysing zero crossings."""
    filter_length_half = int(Decimal(fs / boundary_f0 * 2).quantize(0, ROUND_HALF_UP))
    band_pass_filter = nuttall(filter_length_half * 2 + 1) * \
        np.cos(2 * math.pi * boundary_f0 * np.arange(-filter_length_half, filter_length_half + 1) / fs)
    filtered_signal = np.real(np.fft.ifft(np.fft.fft(band_pass_filter, len(y_spectrum)) * y_spectrum))
    filtered_signal = filtered_signal[(filter_length_half + 1) + np.arange(y_length)]

    # Four interval-based estimates: negative/positive zero crossings, peaks and dips.
    neg_loc, neg_f0 = ZeroCrossingEngine(filtered_signal, fs)
    pos_loc, pos_f0 = ZeroCrossingEngine(-filtered_signal, fs)
    peak_loc, peak_f0 = ZeroCrossingEngine(np.diff(filtered_signal), fs)
    dip_loc, dip_f0 = ZeroCrossingEngine(-np.diff(filtered_signal), fs)
    f0_candidates = GetF0Candidates(neg_loc, neg_f0, pos_loc, pos_f0,
                                    peak_loc, peak_f0, dip_loc, dip_f0, temporal_positions)

    # Reject estimates outside +-10 % of the channel frequency or outside the search range.
    f0_candidates[f0_candidates > boundary_f0 * 1.1] = 0
    f0_candidates[f0_candidates < boundary_f0 * 0.9] = 0
    f0_candidates[f0_candidates > f0_ceil] = 0
    f0_candidates[f0_candidates < f0_floor] = 0
    return f0_candidates


@nb.jit((nb.float64[:], nb.float64), nopython=True, cache=True)
def ZeroCrossingEngine(x, fs):
    """Locate negative-going zero crossings and the F0 implied by their spacing."""
    y = np.empty_like(x)
    y[:-1] = x[1:]
    y[-1] = x[-1]
    negative_going_points = np.arange(1, len(x) + 1) * ((y * x < 0) * (y < x))
    edge_list = negative_going_points[negative_going_points > 0]
    # Refine each crossing location by linear interpolation between samples.
    fine_edge_list = edge_list - x[edge_list - 1] / (x[edge_list] - x[edge_list - 1])
    return ((fine_edge_list[:len(fine_edge_list) - 1] + fine_edge_list[1:]) / 2 / fs,
            fs / np.diff(fine_edge_list))


def FixF0Contour(f0_candidates, f0_candidates_score):
    """Build the F0 contour from the best-scored candidates and repair it in four steps."""
    f0_base = SearchF0Base(f0_candidates, f0_candidates_score)
    f0_step1 = FixStep1(f0_base, 0.008)   # remove outliers that break continuity
    f0_step2 = FixStep2(f0_step1, 6)      # remove very short voiced segments
    f0_step3 = FixStep3(f0_step2, f0_candidates, 0.18, f0_candidates_score)  # extend and merge segments
    return FixStep4(f0_step3, 9)          # bridge short unvoiced gaps


def SearchF0Base(f0_candidates, f0_candidates_score):
    """Pick, for every frame, the candidate with the highest score."""
    f0_base = np.zeros((f0_candidates.shape[1]))
    for i in range(len(f0_base)):
        f0_base[i] = f0_candidates[np.argmax(f0_candidates_score[:, i]), i]
    return f0_base


@nb.jit((nb.float64[:], nb.float64), nopython=True, cache=True)
def FixStep1(f0_base, allowed_range):
    """Zero out frames whose F0 jumps by more than allowed_range relative to the local trend."""
    f0_step1 = np.empty_like(f0_base)
    f0_step1[:] = f0_base
    f0_step1[0] = f0_step1[1] = 0
    for i in np.arange(2, len(f0_base)):
        if f0_base[i] == 0:
            continue
        reference_f0 = f0_base[i - 1] * 2 - f0_base[i - 2]
        if np.abs((f0_base[i] - reference_f0) / (reference_f0 + EPS)) > allowed_range and \
                np.abs((f0_base[i] - f0_base[i - 1]) / (f0_base[i - 1] + EPS)) > allowed_range:
            f0_step1[i] = 0
    return f0_step1


def FixStep2(f0_step1, voice_range_minimum):
    """Zero out voiced segments shorter than voice_range_minimum frames."""
    f0_step2 = np.empty_like(f0_step1)
    f0_step2[:] = f0_step1
    boundary_list = GetBoundaryList(f0_step1)
    for i in np.arange(1, len(boundary_list) // 2 + 1):
        if boundary_list[2 * i - 1] - boundary_list[(2 * i) - 2] < voice_range_minimum:
            f0_step2[boundary_list[(2 * i) - 2]: boundary_list[2 * i - 1] + 1] = 0
    return f0_step2


def FixStep3(f0_step2, f0_candidates, allowed_range, f0_candidates_score):
    """Extend each voiced segment in both directions and merge the extended segments."""
    f0_step3 = np.array(f0_step2)
    boundary_list = GetBoundaryList(f0_step2)
    multi_channel_f0 = GetMultiChannelF0(f0_step2, boundary_list)
    range_ = np.zeros((len(boundary_list) // 2, 2))
    count = -1
    for i in np.arange(1, len(boundary_list) // 2 + 1):
        tmp_range = np.zeros(2)
        extended_f0, tmp_range[1] = ExtendF0(multi_channel_f0[i - 1, :], boundary_list[i * 2 - 1],
                                             min(len(f0_step2) - 2, boundary_list[i * 2 - 1] + 100),
                                             1, f0_candidates, allowed_range)
        tmp_f0_sequence, tmp_range[0] = ExtendF0(extended_f0, boundary_list[(i * 2) - 2],
                                                 max(1, boundary_list[(i * 2) - 2] - 100),
                                                 -1, f0_candidates, allowed_range)
        # Keep the segment only if it spans more than about 2200 / mean(F0) frames.
        if 2200 / np.mean(tmp_f0_sequence[int(tmp_range[0]): int(tmp_range[1]) + 1]) < tmp_range[1] - tmp_range[0]:
            count += 1
            multi_channel_f0[count, :] = tmp_f0_sequence
            range_[count, :] = tmp_range
    if count > -1:
        f0_step3 = MergeF0(multi_channel_f0[0: count + 1, :], range_[0: count + 1, :],
                           f0_candidates, f0_candidates_score)
    return f0_step3
def FixStep4(f0_step3, threshold):
    """Linearly interpolate over unvoiced gaps shorter than threshold frames."""
    f0_step4 = np.empty_like(f0_step3)
    f0_step4[:] = f0_step3
    boundary_list = GetBoundaryList(f0_step3)
    for i in np.arange(1, len(boundary_list) // 2):
        distance = boundary_list[2 * i] - boundary_list[2 * i - 1] - 1
        if distance >= threshold:
            continue
        tmp0 = f0_step3[boundary_list[2 * i - 1]] + 1
        c = ((f0_step3[boundary_list[2 * i]] - 1) - tmp0) / (distance + 1)
        count = 1
        for j in np.arange(boundary_list[2 * i - 1] + 1, boundary_list[2 * i]):
            f0_step4[j] = tmp0 + c * count
            count += 1
    return f0_step4


def ExtendF0(f0, origin, last_point, shift, f0_candidates, allowed_range):
    """Extend a voiced segment from origin towards last_point, one frame at a time.

    shift is +1 (forwards) or -1 (backwards); extension stops after four
    consecutive frames without a matching candidate.  Returns the extended
    contour and the last frame that was successfully extended.
    """
    extended_f0 = np.array(f0)
    tmp_f0 = extended_f0[origin]
    shifted_origin = origin
    count = 0
    if shift == 1:
        last_point += 1
    elif shift == -1:
        last_point -= 1
    for i in np.arange(origin, last_point, shift):
        extended_f0[i + shift], _ = SelectBestF0(tmp_f0, f0_candidates[:, i + shift], allowed_range)
        if extended_f0[i + shift] != 0:
            tmp_f0 = extended_f0[i + shift]
            count = 0
            shifted_origin = i + shift
        else:
            count += 1
        if count == 4:
            break
    return extended_f0, shifted_origin


def GetMultiChannelF0(f0, boundary_list):
    """Split the contour into one row per voiced segment, zero elsewhere."""
    multi_channel_f0 = np.zeros((len(boundary_list) // 2, len(f0)))
    for i in np.arange(1, len(boundary_list) // 2 + 1):
        multi_channel_f0[i - 1, boundary_list[(i * 2) - 2]: boundary_list[i * 2 - 1] + 1] = \
            f0[boundary_list[(i * 2) - 2]: boundary_list[(i * 2) - 1] + 1]
    return multi_channel_f0


def MergeF0(multi_channel_f0, range_, f0_candidates, f0_candidates_score):
    """Merge the extended segments into a single contour, earliest segment first."""
    sorted_order = np.argsort(range_[:, 0], axis=0, kind='quicksort')
    f0 = multi_channel_f0[sorted_order[0], :]
    range_ = range_.astype(int)
    for i in np.arange(1, multi_channel_f0.shape[0]):
        if range_[sorted_order[i], 0] - range_[sorted_order[0], 1] > 0:
            # No overlap with the merged contour: copy the segment as is.
            f0[range_[sorted_order[i], 0]: range_[sorted_order[i], 1] + 1] = \
                multi_channel_f0[sorted_order[i], range_[sorted_order[i], 0]: range_[sorted_order[i], 1] + 1]
            range_[sorted_order[0], 0] = range_[sorted_order[i], 0]
            range_[sorted_order[0], 1] = range_[sorted_order[i], 1]
        else:
            # Overlapping segments: resolve the overlap by candidate score.
            f0, range_[sorted_order[0], 1] = MergeF0Sub(f0, range_[sorted_order[0], 0], range_[sorted_order[0], 1],
                                                        multi_channel_f0[sorted_order[i], :],
                                                        range_[sorted_order[i], 0], range_[sorted_order[i], 1],
                                                        f0_candidates, f0_candidates_score)
    return f0


def MergeF0Sub(f0_1, st1, ed1, f0_2, st2, ed2, f0_candidates, f0_candidates_score):
    """Merge two overlapping segments, preferring the one with the higher total candidate score."""
    merged_f0 = copy.deepcopy(f0_1)
    st1, st2, ed1, ed2 = int(st1), int(st2), int(ed1), int(ed2)
    if st1 <= st2 and ed1 >= ed2:
        # The second segment is completely covered by the first one.
        new_ed = ed1
        return merged_f0, new_ed
    new_ed = ed2
    score1, score2 = 0, 0
    for i in np.arange(st2, ed1 + 1):
        score1 = score1 + SearchScore(f0_1[i], f0_candidates[:, i], f0_candidates_score[:, i])
        score2 = score2 + SearchScore(f0_2[i], f0_candidates[:, i], f0_candidates_score[:, i])
    if score1 > score2:
        merged_f0[ed1: ed2 + 1] = f0_2[ed1: ed2 + 1]
    else:
        merged_f0[st2: ed2 + 1] = f0_2[st2: ed2 + 1]
    return merged_f0, new_ed


def SearchScore(f0, f0_candidates, f0_candidates_score):
    """Return the best score among candidates equal to f0 (0 if none matches)."""
    score = 0
    for i in range(f0_candidates.shape[0]):
        if f0 == f0_candidates[i] and score < f0_candidates_score[i]:
            score = f0_candidates_score[i]
    return score


def GetF0Candidates(neg_loc, neg_f0, pos_loc, pos_f0, peak_loc, peak_f0, dip_loc, dip_f0, temporal_positions):
    """Average the four interval-based F0 estimates at the frame positions."""
    interpolated_f0_list = np.zeros((4, np.size(temporal_positions)))
    if max(0, np.size(neg_loc) - 2) * max(0, np.size(pos_loc) - 2) * \
            max(0, np.size(peak_loc) - 2) * max(0, np.size(dip_f0) - 2) > 0:
        interpolated_f0_list[0, :] = interp1d(neg_loc, neg_f0, fill_value='extrapolate')(temporal_positions)
        interpolated_f0_list[1, :] = interp1d(pos_loc, pos_f0, fill_value='extrapolate')(temporal_positions)
        interpolated_f0_list[2, :] = interp1d(peak_loc, peak_f0, fill_value='extrapolate')(temporal_positions)
        interpolated_f0_list[3, :] = interp1d(dip_loc, dip_f0, fill_value='extrapolate')(temporal_positions)
        interpolated_f0 = np.mean(interpolated_f0_list, axis=0)
    else:
        interpolated_f0 = temporal_positions * 0
    return interpolated_f0
def SmoothF0(f0):
    """Smooth each voiced segment with a zero-phase low-pass filter."""
    smoothed_f0 = np.append(np.append(np.zeros(300), f0), np.zeros(300))
    boundary_list = GetBoundaryList(smoothed_f0)
    b = np.array([0.0078202080334971724, 0.015640416066994345, 0.007822412033497172])
    a = np.array([1.0, -1.7347257688092754, 0.76600660094326412])
    for i in np.arange(1, len(boundary_list) // 2 + 1):
        tmp_f0_contour = FilterF0(GetMultiChannelF0(smoothed_f0, boundary_list)[i - 1, :],
                                  boundary_list[i * 2 - 2], boundary_list[i * 2 - 1], b, a)
        smoothed_f0[boundary_list[i * 2 - 2]: boundary_list[i * 2 - 1] + 1] = \
            tmp_f0_contour[boundary_list[i * 2 - 2]: boundary_list[i * 2 - 1] + 1]
    return smoothed_f0[300: len(smoothed_f0) - 300]


def FilterF0(f0_contour, st, ed, b, a):
    """Apply the IIR filter (b, a) forwards and backwards between frames st and ed."""
    smoothed_f0 = copy.deepcopy(f0_contour)
    # Hold the edge values so the filter does not see the jump to zero.
    smoothed_f0[0: st] = smoothed_f0[st]
    smoothed_f0[ed + 1:] = smoothed_f0[ed]
    smoothed_f0 = lfilter(b, a, lfilter(b, a, smoothed_f0, axis=0)[-1::-1], axis=0)[-1::-1]
    smoothed_f0[0: st] = smoothed_f0[ed + 1:] = 0
    return smoothed_f0


def nuttall(N):
    """Nuttall window of length N."""
    t = np.arange(N) * 2 * math.pi / (N - 1)
    coefs = np.array([0.355768, -0.487396, 0.144232, -0.012604])
    return coefs @ np.cos(np.outer(np.arange(4), t))


def GetBoundaryList(f0):
    """Return the start/end frame indices of every voiced segment as a flat [st1, ed1, st2, ed2, ...] array."""
    vuv = np.array(f0)
    vuv[vuv != 0] = 1
    vuv[0] = vuv[-1] = 0
    boundary_list = np.where(np.diff(vuv) != 0)[0]
    boundary_list[0::2] += 1
    return boundary_list


def decimate_matlab(x, q, n=None, axis=-1):
    """MATLAB-like decimation: zero-phase Chebyshev type I low-pass filtering, then keep every q-th sample."""
    if not isinstance(q, int):
        raise TypeError("q must be an integer")
    if n is not None and not isinstance(n, int):
        raise TypeError("n must be an integer")
    system = signal.dlti(*signal.cheby1(n, 0.05, 0.8 / q))
    y = signal.filtfilt(system.num, system.den, x, axis=axis,
                        padlen=3 * (max(len(system.den), len(system.num)) - 1))
    nd = len(y)
    return y[int(q - (q * np.ceil(nd / q) - nd)) - 1::q]
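# ---------------------------------------------------------------------------
# Illustrative usage sketch: estimating the F0 of a synthetic 220 Hz tone.
# The sample rate, tone frequency and duration below are arbitrary
# assumptions for this demo.  The __main__ guard is required because the
# module forces the "spawn" start method and RefineCandidates creates
# worker processes.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_fs = 16000                                    # assumed sample rate in Hz
    t = np.arange(demo_fs // 2) / demo_fs              # half a second of audio
    demo_x = 0.5 * np.sin(2 * math.pi * 220.0 * t)     # synthetic 220 Hz sine
    f0, temporal_positions = harvest(demo_x, demo_fs, f0_floor=50, f0_ceil=1100, frame_period=10)
    voiced = f0 > 0
    print("frames:", len(f0),
          "voiced frames:", int(np.sum(voiced)),
          "median voiced F0 (Hz):", float(np.median(f0[voiced])) if voiced.any() else 0.0)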