|
import os

import moviepy.editor as mp
import numpy as np
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Segment
|
|
|
def extract_audio_from_video(video_path):
    """Extract the audio track of a video into a sibling ``.wav`` file.

    Args:
        video_path: Path to the video file to read.

    Returns:
        Path of the written audio file: ``video_path`` with its file
        extension (if any) replaced by ``.wav``.

    Raises:
        ValueError: If the video has no audio track.
    """
    # splitext only strips an extension from the final path component, so a
    # dot inside a directory name (e.g. "clips.v2/take1") is left untouched.
    audio_path = os.path.splitext(video_path)[0] + '.wav'

    video = mp.VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        video.audio.write_audiofile(audio_path)
    finally:
        # Always release the reader resources held by the clip, even when
        # writing the audio fails.
        video.close()

    return audio_path
|
|
|
def diarize_speakers(audio_path):
    """Run the pretrained pyannote speaker-diarization pipeline on a file.

    The Hugging Face access token is read from the ``HF_TOKEN`` environment
    variable and used to load ``pyannote/speaker-diarization-3.1``.

    Args:
        audio_path: Path to the audio file to diarize.

    Returns:
        The diarization result returned by calling the pipeline on
        ``audio_path``.

    Raises:
        ValueError: If ``HF_TOKEN`` is unset or empty.
        RuntimeError: If the pretrained pipeline could not be loaded.
    """
    hf_token = os.environ.get("HF_TOKEN")
    if not hf_token:
        raise ValueError(
            "HF_TOKEN environment variable is not set. "
            "Please add it in your Hugging Face Space's Variables and secrets section."
        )

    # Authenticate with the token validated above.
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=hf_token,
    )
    # NOTE(review): pyannote.audio 3.x is understood to return None (rather
    # than raise) when a gated model cannot be downloaded; fail with a clear
    # message instead of "'NoneType' object is not callable".
    if pipeline is None:
        raise RuntimeError(
            "Could not load 'pyannote/speaker-diarization-3.1'. Check that "
            "HF_TOKEN is valid and that the model's user conditions have "
            "been accepted on Hugging Face."
        )

    return pipeline(audio_path)
|
|
|
def get_speaker_embeddings(audio_path, diarization, model):
    """Compute one embedding per diarized speaker turn.

    The audio file is loaded once with torchaudio; for every track yielded by
    ``diarization.itertracks(yield_label=True)`` the matching slice of the
    waveform is passed to ``model`` with gradients disabled.

    Args:
        audio_path: Path to the audio file that was diarized.
        diarization: Object exposing ``itertracks(yield_label=True)`` that
            yields ``(turn, track, speaker)`` triples, where ``turn`` has
            ``start`` and ``end`` attributes.
        model: Callable taking a ``{"waveform", "sample_rate"}`` dict; its
            result must support ``.squeeze().cpu().numpy()`` (presumably a
            torch tensor -- confirm against the caller).

    Returns:
        A list of dicts with keys ``"time"`` (the turn start), ``"embedding"``
        (the squeezed model output as a NumPy array) and ``"speaker"``, in
        the order the tracks were yielded. Turns that map to zero samples
        are omitted.
    """
    waveform, sample_rate = torchaudio.load(audio_path)

    results = []
    for turn, _track, speaker in diarization.itertracks(yield_label=True):
        # Convert the turn boundaries to sample indices.
        first_sample = int(turn.start * sample_rate)
        last_sample = int(turn.end * sample_rate)
        clip = waveform[:, first_sample:last_sample]

        # Nothing to embed when the turn covers no samples.
        if not clip.shape[1]:
            continue

        model_input = {"waveform": clip, "sample_rate": sample_rate}
        with torch.no_grad():
            raw_embedding = model(model_input)

        results.append(
            {
                "time": turn.start,
                "embedding": raw_embedding.squeeze().cpu().numpy(),
                "speaker": speaker,
            }
        )

    return results
|
|
|
def align_voice_embeddings(voice_embeddings, frame_count, fps):
    """Assign a voice embedding to every video frame.

    Frame ``i`` is timestamped ``i / fps`` and receives the embedding of the
    latest entry whose ``"time"`` is less than or equal to that timestamp.
    Frames that precede the first entry's time receive the first embedding.

    Args:
        voice_embeddings: Sequence of dicts with ``"time"`` and
            ``"embedding"`` keys. The single forward pass below assumes the
            entries are ordered by ascending ``"time"``, expressed in the
            same unit as ``frame / fps``.
        frame_count: Number of frames to produce embeddings for.
        fps: Frame rate used to convert a frame index to a timestamp.

    Returns:
        ``np.ndarray`` built from ``frame_count`` embeddings, one per frame
        (an empty array when ``frame_count`` is 0).

    Raises:
        ValueError: If ``fps`` is not positive, or if ``frame_count`` is
            positive but ``voice_embeddings`` is empty.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")
    if frame_count > 0 and len(voice_embeddings) == 0:
        # Without this guard the lookup below fails with a bare IndexError.
        raise ValueError(
            "voice_embeddings is empty; cannot align embeddings to frames"
        )

    last_index = len(voice_embeddings) - 1
    aligned = []
    cursor = 0

    for frame in range(frame_count):
        frame_time = frame / fps

        # Frame times only increase, so the cursor never needs to move back.
        while (cursor < last_index
               and voice_embeddings[cursor + 1]["time"] <= frame_time):
            cursor += 1

        aligned.append(voice_embeddings[cursor]["embedding"])

    return np.array(aligned)