import moviepy.editor as mp
import os

import numpy as np
import torch
import torchaudio
from pyannote.audio import Model, Pipeline


def extract_audio_from_video(video_path):
    video = mp.VideoFileClip(video_path)
    audio_path = video_path.rsplit('.', 1)[0] + '.wav'
    video.audio.write_audiofile(audio_path)
    return audio_path


def diarize_speakers(audio_path):
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError(
            "py_annote_hf_token environment variable is not set. "
            "Please check your Hugging Face Space's Variables and secrets section."
        )
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
    diarization = pipeline(audio_path)
    return diarization


def get_speaker_embeddings(audio_path, diarization, model_name="pyannote/embedding"):
    model = Model.from_pretrained(model_name, use_auth_token=os.environ.get("py_annote_hf_token"))
    waveform, sample_rate = torchaudio.load(audio_path)
    duration = waveform.shape[1] / sample_rate

    # Convert stereo to mono if necessary
    if waveform.shape[0] == 2:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    # Minimum segment duration (in seconds)
    min_segment_duration = 0.5
    min_segment_length = int(min_segment_duration * sample_rate)

    embeddings = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_frame = int(turn.start * sample_rate)
        end_frame = int(turn.end * sample_rate)
        segment = waveform[:, start_frame:end_frame]

        if segment.shape[1] > 0:
            # Pad short segments
            if segment.shape[1] < min_segment_length:
                padding = torch.zeros(1, min_segment_length - segment.shape[1])
                segment = torch.cat([segment, padding], dim=1)

            # Split long segments
            for i in range(0, segment.shape[1], min_segment_length):
                sub_segment = segment[:, i:i + min_segment_length]
                if sub_segment.shape[1] < min_segment_length:
                    padding = torch.zeros(1, min_segment_length - sub_segment.shape[1])
                    sub_segment = torch.cat([sub_segment, padding], dim=1)

                # Ensure the segment is on the correct device
                sub_segment = sub_segment.to(model.device)

                with torch.no_grad():
                    embedding = model(sub_segment)

                embeddings.append({
                    "time": turn.start + i / sample_rate,
                    "duration": min_segment_duration,
                    "embedding": embedding.cpu().numpy(),
                    "speaker": speaker
                })

    # Ensure embeddings cover the entire duration
    if embeddings and embeddings[-1]['time'] + embeddings[-1]['duration'] < duration:
        embeddings.append({
            "time": duration,
            "duration": 0,
            "embedding": np.zeros_like(embeddings[0]['embedding']),
            "speaker": "silence"
        })

    return embeddings, duration


def align_voice_embeddings(voice_embeddings, frame_count, fps, audio_duration):
    aligned_embeddings = []
    current_embedding_index = 0

    for frame in range(frame_count):
        frame_time = frame / fps

        # Advance to the latest embedding whose start time is at or before this frame
        while (current_embedding_index < len(voice_embeddings) - 1 and
               voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1

        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"].flatten())

    return aligned_embeddings
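

# A minimal usage sketch (not part of the original module) chaining the helpers
# above end to end. The video path, fps, and frame count are hypothetical
# placeholders; in practice fps and frame_count would come from the video's own
# metadata (e.g. moviepy's VideoFileClip.fps).
if __name__ == "__main__":
    video_path = "example_video.mp4"  # hypothetical input file

    audio_path = extract_audio_from_video(video_path)
    diarization = diarize_speakers(audio_path)
    voice_embeddings, audio_duration = get_speaker_embeddings(audio_path, diarization)

    # Assume a 25 fps clip and derive a frame count spanning the full audio duration.
    fps = 25
    frame_count = int(audio_duration * fps)

    aligned = align_voice_embeddings(voice_embeddings, frame_count, fps, audio_duration)
    print(f"Aligned {len(aligned)} per-frame voice embeddings "
          f"({audio_duration:.1f}s of audio at {fps} fps).")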