File size: 2,781 Bytes
a4db1a6
 
 
 
7bbb7f4
a4db1a6
93892b7
8a421c8
b568300
a4db1a6
 
 
 
 
d811c94
a4db1a6
a202d1e
3414f18
 
7bbb7f4
3414f18
a202d1e
a4db1a6
 
36f6fd8
9642c0e
4a67bd7
a4db1a6
4a67bd7
38c3415
36f6fd8
 
 
 
a4db1a6
 
7c1ee96
 
 
9263021
4a67bd7
36f6fd8
 
 
4a67bd7
36f6fd8
4a67bd7
 
 
 
 
 
 
d811c94
ce25ed6
a4db1a6
 
 
 
 
 
 
 
 
 
c5ca481
a4db1a6
ce25ed6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import moviepy.editor as mp
from pyannote.audio import Pipeline
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Segment
from pyannote.audio import Model
import os

def extract_audio_from_video(video_path):
    video = mp.VideoFileClip(video_path)
    audio_path = video_path.rsplit('.', 1)[0] + '.wav'
    video.audio.write_audiofile(audio_path)
    return audio_path

def diarize_speakers(audio_path):
    hf_token = os.environ.get("py_annote_hf_token")
    
    if not hf_token:
        raise ValueError("py_annote_hf_token environment variable is not set. Please check your Hugging Face Space's Variables and secrets section.")
    
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=hf_token)
    diarization = pipeline(audio_path)
    return diarization
    
def get_speaker_embeddings(audio_path, diarization, model_name="pyannote/embedding"):
    model = Model.from_pretrained(model_name, use_auth_token=os.environ.get("py_annote_hf_token"))
    waveform, sample_rate = torchaudio.load(audio_path)
    duration = waveform.shape[1] / sample_rate
    
    # Convert stereo to mono if necessary
    if waveform.shape[0] == 2:
        waveform = torch.mean(waveform, dim=0, keepdim=True)
    
    embeddings = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_frame = int(turn.start * sample_rate)
        end_frame = int(turn.end * sample_rate)
        segment = waveform[:, start_frame:end_frame]
        
        if segment.shape[1] > 0:
            # Ensure the segment is on the correct device
            segment = segment.to(model.device)
            
            with torch.no_grad():
                embedding = model(segment)
            embeddings.append({"time": turn.start, "duration": turn.duration, "embedding": embedding.cpu().numpy(), "speaker": speaker})
    
    # Ensure embeddings cover the entire duration
    if embeddings and embeddings[-1]['time'] + embeddings[-1]['duration'] < duration:
        embeddings.append({"time": duration, "duration": 0, "embedding": np.zeros_like(embeddings[0]['embedding']), "speaker": "silence"})
    
    return embeddings, duration

def align_voice_embeddings(voice_embeddings, frame_count, fps, audio_duration):
    aligned_embeddings = []
    current_embedding_index = 0
    
    for frame in range(frame_count):
        frame_time = frame / fps
        
        while (current_embedding_index < len(voice_embeddings) - 1 and 
               voice_embeddings[current_embedding_index + 1]["time"] <= frame_time):
            current_embedding_index += 1
        
        aligned_embeddings.append(voice_embeddings[current_embedding_index]["embedding"].flatten())
    
    return aligned_embeddings