import os

import moviepy.editor as mp
import numpy as np
import torch
import torchaudio
from pyannote.audio import Model, Pipeline
from pyannote.core import Segment


def extract_audio_from_video(video_path):
    """Extract the audio track from a video and save it as a WAV file next to it."""
    video = mp.VideoFileClip(video_path)
    audio_path = video_path.rsplit('.', 1)[0] + '.wav'
    video.audio.write_audiofile(audio_path)
    return audio_path


def diarize_speakers(audio_path):
    """Run speaker diarization and return the speaker with the most total speaking time."""
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError(
            "The py_annote_hf_token environment variable is not set. "
            "Please check your Hugging Face Space's 'Variables and secrets' section."
        )

    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=hf_token
    )
    diarization = pipeline(audio_path)

    # Accumulate total speaking time per speaker label.
    speaker_segments = {}
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if speaker not in speaker_segments:
            speaker_segments[speaker] = 0
        speaker_segments[speaker] += turn.end - turn.start

    # The "main" speaker is the one with the largest cumulative duration.
    most_frequent_speaker = max(speaker_segments, key=speaker_segments.get)

    return diarization, most_frequent_speaker


def get_speaker_embeddings(audio_path, diarization, most_frequent_speaker,
                           model_name="pyannote/embedding"):
    """Compute voice embeddings for every segment spoken by the most frequent speaker."""
    model = Model.from_pretrained(model_name, use_auth_token=os.environ.get("py_annote_hf_token"))
    model.eval()

    waveform, sample_rate = torchaudio.load(audio_path)
    duration = waveform.shape[1] / sample_rate

    # Down-mix to mono if the audio has more than one channel.
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    # Embeddings are computed on fixed-length windows of 0.5 s.
    min_segment_duration = 0.5
    min_segment_length = int(min_segment_duration * sample_rate)

    embeddings = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if speaker != most_frequent_speaker:
            continue

        start_frame = int(turn.start * sample_rate)
        end_frame = int(turn.end * sample_rate)
        segment = waveform[:, start_frame:end_frame]

        if segment.shape[1] == 0:
            continue

        # Pad segments shorter than one window up to the minimum length.
        if segment.shape[1] < min_segment_length:
            padding = torch.zeros(1, min_segment_length - segment.shape[1])
            segment = torch.cat([segment, padding], dim=1)

        # Slide over the segment in window-sized chunks, padding the last chunk if needed.
        for i in range(0, segment.shape[1], min_segment_length):
            sub_segment = segment[:, i:i + min_segment_length]
            if sub_segment.shape[1] < min_segment_length:
                padding = torch.zeros(1, min_segment_length - sub_segment.shape[1])
                sub_segment = torch.cat([sub_segment, padding], dim=1)

            # pyannote embedding models expect (batch, channel, samples).
            sub_segment = sub_segment.unsqueeze(0).to(model.device)

            with torch.no_grad():
                embedding = model(sub_segment)

            embeddings.append({
                "time": turn.start + i / sample_rate,
                "duration": min_segment_duration,
                "embedding": embedding.cpu().numpy(),
                "speaker": speaker,
            })

    # Append a zero "silence" marker if the last embedding ends before the audio does.
    if embeddings and embeddings[-1]["time"] + embeddings[-1]["duration"] < duration:
        embeddings.append({
            "time": duration,
            "duration": 0,
            "embedding": np.zeros_like(embeddings[0]["embedding"]),
            "speaker": "silence",
        })

    return embeddings, duration


def align_voice_embeddings(voice_embeddings, frame_count, fps, audio_duration):
    """Assign to each video frame the most recent voice embedding that starts at or before it."""
    aligned_embeddings = []
    current_embedding_index = 0

    for frame in range(frame_count):
        frame_time = frame / fps

        # Advance to the last embedding whose start time is not after this frame.
        while (current_embedding_index < len(voice_embeddings) - 1 and
               voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1

        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"].flatten())

    return aligned_embeddings
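

# --- Example usage: a minimal sketch, not part of the functions above. ---
# The video path is a placeholder; frame count and fps are read from the clip via
# moviepy, which this module already imports. How frames are actually extracted
# downstream may differ in the full pipeline.
if __name__ == "__main__":
    video_path = "example_video.mp4"  # hypothetical input file

    audio_path = extract_audio_from_video(video_path)
    diarization, main_speaker = diarize_speakers(audio_path)
    voice_embeddings, audio_duration = get_speaker_embeddings(
        audio_path, diarization, main_speaker
    )

    clip = mp.VideoFileClip(video_path)
    fps = clip.fps
    frame_count = int(clip.duration * clip.fps)

    aligned = align_voice_embeddings(voice_embeddings, frame_count, fps, audio_duration)
    print(f"Main speaker: {main_speaker}, aligned {len(aligned)} frame embeddings")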