import os

import moviepy.editor as mp
import numpy as np
import torch
import torchaudio
from pyannote.audio import Model, Pipeline
def extract_audio_from_video(video_path):
    video = mp.VideoFileClip(video_path)
    audio_path = video_path.rsplit('.', 1)[0] + '.wav'
    video.audio.write_audiofile(audio_path)
    return audio_path
def diarize_speakers(audio_path):
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError("py_annote_hf_token environment variable is not set. Please check your Hugging Face Space's Variables and secrets section.")
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
    diarization = pipeline(audio_path)
    return diarization
def get_speaker_embeddings(audio_path, diarization, model_name="pyannote/embedding"):
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError("py_annote_hf_token environment variable is not set. Please check your Hugging Face Space's Variables and secrets section.")
    model = Model.from_pretrained(model_name, use_auth_token=hf_token)
    model.eval()  # Set the model to evaluation mode
    waveform, sample_rate = torchaudio.load(audio_path)
    print(f"Sample rate: {sample_rate}")
    print(f"Waveform shape: {waveform.shape}")
    # Convert stereo to mono if necessary
    if waveform.shape[0] == 2:
        waveform = torch.mean(waveform, dim=0, keepdim=True)
    embeddings = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_frame = int(turn.start * sample_rate)
        end_frame = int(turn.end * sample_rate)
        segment = waveform[:, start_frame:end_frame]
        print(f"Segment shape before processing: {segment.shape}")
        # Skip empty segments
        if segment.shape[1] == 0:
            continue
        # Ensure the segment is long enough (at least 2 seconds), padding with silence
        if segment.shape[1] < 2 * sample_rate:
            padding = torch.zeros(1, 2 * sample_rate - segment.shape[1])
            segment = torch.cat([segment, padding], dim=1)
        # Ensure the segment is not too long (maximum 10 seconds)
        if segment.shape[1] > 10 * sample_rate:
            segment = segment[:, :10 * sample_rate]
        print(f"Segment shape after processing: {segment.shape}")
        with torch.no_grad():
            embedding = model(segment)  # Pass the waveform tensor directly, not a dictionary
        embeddings.append({"time": turn.start, "embedding": embedding.squeeze().cpu().numpy(), "speaker": speaker})
    return embeddings
def align_voice_embeddings(voice_embeddings, frame_count, fps):
    # For each video frame, pick the embedding of the diarization turn active at that frame's timestamp
    aligned_embeddings = []
    current_embedding_index = 0
    for frame in range(frame_count):
        frame_time = frame / fps
        while (current_embedding_index < len(voice_embeddings) - 1 and
               voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1
        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"])
    return np.array(aligned_embeddings)
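
# Usage sketch: a minimal example of chaining the functions above.
# The video path below is hypothetical, and running this assumes the
# py_annote_hf_token environment variable is set.
if __name__ == "__main__":
    video_path = "example_video.mp4"  # hypothetical input file
    audio_path = extract_audio_from_video(video_path)
    diarization = diarize_speakers(audio_path)
    voice_embeddings = get_speaker_embeddings(audio_path, diarization)

    # Derive fps and frame count from the video so embeddings align frame-by-frame
    clip = mp.VideoFileClip(video_path)
    fps = clip.fps
    frame_count = int(clip.duration * fps)
    aligned = align_voice_embeddings(voice_embeddings, frame_count, fps)
    print(f"Aligned voice embeddings shape: {aligned.shape}")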