Runtime error
import os

import moviepy.editor as mp
import numpy as np
import torch
import torchaudio
from pyannote.audio import Model, Pipeline
from pyannote.core import Segment
def extract_audio_from_video(video_path):
    video = mp.VideoFileClip(video_path)
    audio_path = video_path.rsplit('.', 1)[0] + '.wav'
    video.audio.write_audiofile(audio_path)
    return audio_path
def diarize_speakers(audio_path):
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError(
            "py_annote_hf_token environment variable is not set. "
            "Please check your Hugging Face Space's Variables and secrets section."
        )
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
    diarization = pipeline(audio_path)
    return diarization
def get_speaker_embeddings(audio_path, diarization, model_name="pyannote/embedding"):
    hf_token = os.environ.get("py_annote_hf_token")
    if not hf_token:
        raise ValueError(
            "py_annote_hf_token environment variable is not set. "
            "Please check your Hugging Face Space's Variables and secrets section."
        )
    model = Model.from_pretrained(model_name, use_auth_token=hf_token)
    model.eval()  # Set the model to evaluation mode

    waveform, sample_rate = torchaudio.load(audio_path)
    print(f"Sample rate: {sample_rate}")
    print(f"Waveform shape: {waveform.shape}")

    # Convert stereo to mono if necessary
    if waveform.shape[0] == 2:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    embeddings = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_frame = int(turn.start * sample_rate)
        end_frame = int(turn.end * sample_rate)
        segment = waveform[:, start_frame:end_frame]
        print(f"Segment shape before processing: {segment.shape}")
        if segment.shape[1] == 0:
            continue

        # Ensure the segment is long enough (at least 2 seconds)
        if segment.shape[1] < 2 * sample_rate:
            padding = torch.zeros(1, 2 * sample_rate - segment.shape[1])
            segment = torch.cat([segment, padding], dim=1)

        # Ensure the segment is not too long (maximum 10 seconds)
        if segment.shape[1] > 10 * sample_rate:
            segment = segment[:, :10 * sample_rate]
        print(f"Segment shape after processing: {segment.shape}")

        with torch.no_grad():
            embedding = model(segment)  # Pass the tensor directly, not a dictionary
        embeddings.append({
            "time": turn.start,
            "embedding": embedding.squeeze().cpu().numpy(),
            "speaker": speaker,
        })
    return embeddings
def align_voice_embeddings(voice_embeddings, frame_count, fps):
    # For each video frame, keep the embedding of the most recent speech turn
    # whose start time is at or before that frame's timestamp.
    aligned_embeddings = []
    current_embedding_index = 0
    for frame in range(frame_count):
        frame_time = frame / fps
        while (current_embedding_index < len(voice_embeddings) - 1 and
               voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1
        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"])
    return np.array(aligned_embeddings)
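
A minimal sketch of how these helpers could be chained end to end. The input file name is hypothetical, and OpenCV (cv2) is assumed here only to read the frame count and frame rate; any other way of obtaining those two values would work the same.

# Hypothetical driver code, not part of the original Space.
import cv2

video_path = "interview.mp4"  # hypothetical input file

audio_path = extract_audio_from_video(video_path)
diarization = diarize_speakers(audio_path)
voice_embeddings = get_speaker_embeddings(audio_path, diarization)

# Read frame count and fps so voice embeddings can be aligned per frame.
cap = cv2.VideoCapture(video_path)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()

aligned = align_voice_embeddings(voice_embeddings, frame_count, fps)
print(aligned.shape)  # (frame_count, embedding_dim)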