import os

import moviepy.editor as mp
import numpy as np
import torch
import torchaudio
from pyannote.audio import Model, Pipeline

def extract_audio_from_video(video_path):
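    """Extract the audio track from a video and write it as a .wav with the same basename."""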
    video = mp.VideoFileClip(video_path)
    audio_path = video_path.rsplit('.', 1)[0] + '.wav'
    video.audio.write_audiofile(audio_path)
    return audio_path

def diarize_speakers(audio_path):
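    """Run pyannote speaker-diarization-3.1 on an audio file and return the diarization annotation."""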
    hf_token = os.environ.get("py_annote_hf_token")
    
    if not hf_token:
        raise ValueError("py_annote_hf_token environment variable is not set. Please check your Hugging Face Space's Variables and secrets section.")
    
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
    diarization = pipeline(audio_path)
    return diarization

def get_speaker_embeddings(audio_path, diarization, model_name="pyannote/embedding"):
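    """Compute one speaker embedding per diarized turn using a pyannote embedding model."""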
    hf_token = os.environ.get("py_annote_hf_token")
    
    if not hf_token:
        raise ValueError("py_annote_hf_token environment variable is not set. Please check your Hugging Face Space's Variables and secrets section.")

    model = Model.from_pretrained(model_name, use_auth_token=hf_token)
    model.eval()  # Set the model to evaluation mode
    
    waveform, sample_rate = torchaudio.load(audio_path)
    print(f"Sample rate: {sample_rate}")
    print(f"Waveform shape: {waveform.shape}")
    
    # Downmix to mono if the audio has more than one channel
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)
    
    embeddings = []

    for turn, _, speaker in diarization.itertracks(yield_label=True):
        # Convert the turn boundaries from seconds to sample indices
        start_sample = int(turn.start * sample_rate)
        end_sample = int(turn.end * sample_rate)
        
        segment = waveform[:, start_sample:end_sample]
        print(f"Segment shape before processing: {segment.shape}")
        
        if segment.shape[1] == 0:
            continue

        # Ensure the segment is long enough (at least 2 seconds)
        if segment.shape[1] < 2 * sample_rate:
            padding = torch.zeros(1, 2 * sample_rate - segment.shape[1])
            segment = torch.cat([segment, padding], dim=1)
        
        # Ensure the segment is not too long (maximum 10 seconds)
        if segment.shape[1] > 10 * sample_rate:
            segment = segment[:, :10 * sample_rate]

        print(f"Segment shape after processing: {segment.shape}")

        with torch.no_grad():
            embedding = model(segment)  # Pass the tensor directly, not a dictionary
        
        embeddings.append({"time": turn.start, "embedding": embedding.squeeze().cpu().numpy(), "speaker": speaker})

    return embeddings

def align_voice_embeddings(voice_embeddings, frame_count, fps):
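    """Assign each video frame the embedding of the most recent speaker turn.

    Assumes voice_embeddings is non-empty and ordered by start time, as
    produced by get_speaker_embeddings.
    """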
    aligned_embeddings = []
    current_embedding_index = 0
    
    for frame in range(frame_count):
        frame_time = frame / fps
        
        while (current_embedding_index < len(voice_embeddings) - 1 and 
               voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1
        
        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"])
    
    return np.array(aligned_embeddings)
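
# Illustrative sketch of how the functions above can be chained together.
# "input.mp4" is a placeholder path, and the frame count is estimated from the
# clip's fps and duration via moviepy; both are assumptions about how a caller
# would drive this module, not part of the original code.
if __name__ == "__main__":
    video_path = "input.mp4"  # placeholder; replace with a real video file

    audio_path = extract_audio_from_video(video_path)
    diarization = diarize_speakers(audio_path)
    voice_embeddings = get_speaker_embeddings(audio_path, diarization)

    clip = mp.VideoFileClip(video_path)
    frame_count = int(clip.fps * clip.duration)
    aligned = align_voice_embeddings(voice_embeddings, frame_count, clip.fps)
    print(f"Aligned embeddings shape: {aligned.shape}")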