|
import torch |
|
import numpy as np |
|
from speechbrain.pretrained import EncoderClassifier |
|
from pydub import AudioSegment |
|
from sklearn.cluster import DBSCAN |
|
import librosa |
|
|
|
# Run the embedding model on a GPU when one is available, else fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pretrained SpeechBrain ECAPA speaker-embedding model ("spkrec-ecapa-voxceleb").
# Weights are downloaded/cached under pretrained_models/spkrec-ecapa-voxceleb;
# run_opts pins inference to the device chosen above.
classifier = EncoderClassifier.from_hparams(source="speechbrain/spkrec-ecapa-voxceleb", savedir="pretrained_models/spkrec-ecapa-voxceleb", run_opts={"device": device})
|
|
|
def extract_voice_embedding(audio_segment):
    """Compute a speaker embedding for one pydub AudioSegment.

    Fixes vs. the naive conversion:
      * multi-channel audio is down-mixed to mono — pydub interleaves the
        channel samples, so feeding them directly would garble the waveform;
      * normalization uses the segment's actual bit depth (``sample_width``
        is in bytes) instead of assuming 16-bit audio.

    NOTE(review): the ECAPA-VoxCeleb model expects 16 kHz input — confirm
    upstream that segments are at that frame rate (resample if needed).

    Args:
        audio_segment: pydub.AudioSegment holding the raw waveform.

    Returns:
        A numpy.ndarray embedding (squeezed, moved to CPU).
    """
    samples = np.array(audio_segment.get_array_of_samples())
    if audio_segment.channels > 1:
        # Interleaved samples -> (frames, channels) -> average to mono.
        samples = samples.reshape(-1, audio_segment.channels).mean(axis=1)
    # Scale to roughly [-1, 1) using the true full-scale value for this bit depth.
    full_scale = float(1 << (8 * audio_segment.sample_width - 1))
    signal = samples.astype(np.float32) / full_scale
    embedding = classifier.encode_batch(torch.tensor(signal).unsqueeze(0))
    return embedding.squeeze().cpu().numpy()
|
|
|
def process_audio(audio_path, segment_duration=1000):
    """Load an audio file, slice it into fixed-length chunks, and embed each.

    Args:
        audio_path: path to any file format pydub/ffmpeg can decode.
        segment_duration: chunk length in milliseconds (pydub slices by ms);
            the final chunk may be shorter.

    Returns:
        List with one voice embedding per chunk, in temporal order.
    """
    recording = AudioSegment.from_file(audio_path)
    chunk_embeddings = []
    for start in range(0, len(recording), segment_duration):
        chunk = recording[start:start + segment_duration]
        chunk_embeddings.append(extract_voice_embedding(chunk))
    return chunk_embeddings
|
|
|
def cluster_voices(embeddings):
    """Group voice embeddings into speaker clusters with cosine DBSCAN.

    Falls back to a single cluster of zeros when there are fewer than two
    segments, or when DBSCAN labels every point as noise (-1).

    Args:
        embeddings: sequence of 1-D embedding vectors.

    Returns:
        numpy array of integer cluster labels, one per embedding.
    """
    n_segments = len(embeddings)
    if n_segments < 2:
        print("Not enough voice segments for clustering. Assigning all to one cluster.")
        return np.zeros(n_segments, dtype=int)

    features = np.stack(embeddings)
    labels = DBSCAN(eps=0.3, min_samples=5, metric='cosine').fit_predict(features)

    if (labels == -1).all():
        print("DBSCAN assigned all to noise. Considering as one cluster.")
        return np.zeros(n_segments, dtype=int)

    return labels
|
|
|
def get_most_frequent_voice(embeddings, clusters):
    """Return the embeddings belonging to the largest voice cluster.

    DBSCAN marks noise points with the label -1; noise is not a voice, so
    -1 is excluded from the "largest cluster" vote unless every segment was
    noise (then all segments are returned as one group, matching the
    all-noise fallback in cluster_voices). Counting uses numpy.unique
    instead of the O(n^2) list.count-per-label of the original.

    Args:
        embeddings: sequence of per-segment embeddings.
        clusters: parallel sequence of integer cluster labels.

    Returns:
        List of embeddings whose label equals the most frequent non-noise
        label; an empty list for empty input (the original raised here).
    """
    labels = np.asarray(clusters)
    if labels.size == 0:  # guard: argmax on empty input would raise
        return []
    unique, counts = np.unique(labels, return_counts=True)
    # Drop DBSCAN's noise label unless noise is all there is.
    non_noise = unique != -1
    if non_noise.any():
        unique, counts = unique[non_noise], counts[non_noise]
    largest_cluster = unique[np.argmax(counts)]
    return [emb for emb, cluster in zip(embeddings, clusters) if cluster == largest_cluster]