import os

import moviepy.editor as mp
import numpy as np
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Segment


def extract_audio_from_video(video_path):
    """Write the audio track of a video to a sibling ``.wav`` file.

    The output path is ``video_path`` with its extension replaced by
    ``.wav``.

    Args:
        video_path: Path to the input video file.

    Returns:
        The path of the ``.wav`` file that was written.

    Raises:
        ValueError: If the video has no audio track.
    """
    # splitext only strips an extension from the final path component, so a
    # dot in a directory name (e.g. "./clips/video") is left alone.
    audio_path = os.path.splitext(video_path)[0] + '.wav'

    video = mp.VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        video.audio.write_audiofile(audio_path)
    finally:
        # Release the underlying reader even if writing fails.
        video.close()
    return audio_path


def diarize_speakers(audio_path):
    """Run the ``pyannote/speaker-diarization-3.1`` pipeline on an audio file.

    The Hugging Face access token is read from the ``HF_TOKEN`` environment
    variable. The pipeline is loaded on every call.

    Args:
        audio_path: Path to the audio file to diarize.

    Returns:
        Whatever the pipeline returns for ``audio_path`` (the diarization
        result consumed by ``get_speaker_embeddings``).

    Raises:
        ValueError: If ``HF_TOKEN`` is unset or empty.
    """
    # Load the token from the environment variable
    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        raise ValueError("HF_TOKEN environment variable is not set. Please add it in your Hugging Face Space's Variables and secrets section.")

    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=hf_token,
    )
    diarization = pipeline(audio_path)
    return diarization


def get_speaker_embeddings(audio_path, diarization, model):
    """Compute one embedding per diarized speaker turn.

    Args:
        audio_path: Path to the audio file; loaded with ``torchaudio.load``.
        diarization: Object exposing ``itertracks(yield_label=True)`` that
            yields ``(turn, track, speaker)`` triples, where ``turn`` has
            ``start`` and ``end`` attributes (multiplied by the sample rate
            here, so presumably seconds).
        model: Callable invoked with a dict holding ``"waveform"`` and
            ``"sample_rate"``. Its result must support
            ``.squeeze().cpu().numpy()``.
            NOTE(review): presumably a torch-based embedding model; confirm
            against the caller.

    Returns:
        A list of dicts with keys ``"time"`` (turn start), ``"embedding"``
        (NumPy array) and ``"speaker"`` (label), in the order the turns are
        yielded. Turns that map to an empty sample range are skipped.
    """
    waveform, sample_rate = torchaudio.load(audio_path)
    embeddings = []

    for turn, _, speaker in diarization.itertracks(yield_label=True):
        # Convert the turn boundaries to sample indices.
        start = int(turn.start * sample_rate)
        end = int(turn.end * sample_rate)
        segment = waveform[:, start:end]

        # Nothing to embed if the turn maps to an empty slice.
        if segment.shape[1] == 0:
            continue

        with torch.no_grad():
            embedding = model({"waveform": segment, "sample_rate": sample_rate})

        embeddings.append({
            "time": turn.start,
            "embedding": embedding.squeeze().cpu().numpy(),
            "speaker": speaker,
        })

    return embeddings


def align_voice_embeddings(voice_embeddings, frame_count, fps):
    """Assign a voice embedding to every video frame.

    For each frame, the embedding used is the latest entry whose ``"time"``
    is less than or equal to the frame time (``frame / fps``). Frames that
    precede the first entry's time receive the first entry's embedding.

    Args:
        voice_embeddings: Sequence of dicts with ``"time"`` and
            ``"embedding"`` keys, as produced by ``get_speaker_embeddings``.
            The scan only moves forward, so entries are assumed to be sorted
            by ascending ``"time"``.
        frame_count: Number of video frames to align.
        fps: Frames per second of the video.

    Returns:
        ``np.array`` built from one embedding per frame (empty when
        ``frame_count`` is 0).

    Raises:
        IndexError: If ``voice_embeddings`` is empty and ``frame_count`` > 0.
        ZeroDivisionError: If ``fps`` is 0 and ``frame_count`` > 0.
    """
    aligned_embeddings = []
    current_embedding_index = 0
    last_index = len(voice_embeddings) - 1

    for frame in range(frame_count):
        frame_time = frame / fps
        # Advance while the next embedding has already started by this frame.
        while (current_embedding_index < last_index
               and voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1
        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"])

    return np.array(aligned_embeddings)