from pyannote.audio import Pipeline
from pydub import AudioSegment
from tool.file_name import *  # presumably provides dir_sample_groups_json and dir_speaker_groups_json used below
import torch
import json
import gc
import os

# Free any lingering memory before loading the diarization model.
gc.collect()
torch.cuda.empty_cache()

# The pretrained pyannote pipeline requires a Hugging Face access token.
hugging_face_token = os.environ["HUGGING_FACE_TOKEN"]
pipeline = Pipeline.from_pretrained(
    'pyannote/speaker-diarization', use_auth_token=hugging_face_token)

# Run on GPU when available, otherwise fall back to CPU.
use_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
pipeline.to(use_device)


def start_diarization(input_file):
    """Diarize input_file, save the speaker groups as JSON, and export one WAV per turn."""
    diarization = pipeline(input_file)

    sample_groups = []   # unique speaker labels, in order of first appearance
    speaker_groups = {}  # "<speaker>-<n>" -> [start_seconds, end_seconds]
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if speaker not in sample_groups:
            sample_groups.append(str(speaker))

        # Build a unique key for this turn, e.g. "SPEAKER_00-1", "SPEAKER_00-2", ...
        suffix = 1
        file_name = f"{speaker}-{suffix}"
        while file_name in speaker_groups:
            suffix += 1
            file_name = f"{speaker}-{suffix}"
        speaker_groups[file_name] = [turn.start, turn.end]

        print(f"speaker_groups {file_name}: {speaker_groups[file_name]}")
        print(f"start={turn.start:.3f}s stop={turn.end:.3f}s speaker_{speaker}")

    save_groups_json(input_file, sample_groups, speaker_groups)
    audio_segmentation(input_file, speaker_groups)
    print(str(speaker_groups))
    return str(speaker_groups)


def audio_segmentation(input_file, speaker_groups_dict):
    """Cut input_file into one WAV file per diarized speaker turn."""
    audio_segment = AudioSegment.from_wav(input_file)
    for speaker, time in speaker_groups_dict.items():
        # pydub slices are in milliseconds; the diarization times are in seconds.
        audio_segment[time[0] * 1000: time[1] * 1000].export(
            f"{speaker}.wav", format='wav')
        print(f"group {speaker}: {time[0]*1000}--{time[1]*1000}")


def save_groups_json(input_file, sample_groups_list: list, speaker_groups_dict: dict):
    """Persist the speaker labels and the per-turn time ranges as JSON files."""
    with open(dir_sample_groups_json, "w", encoding="utf-8") as json_file_sample:
        json.dump(sample_groups_list, json_file_sample)
    with open(dir_speaker_groups_json, "w", encoding="utf-8") as json_file_speaker:
        json.dump(speaker_groups_dict, json_file_speaker)
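

# Usage sketch (not part of the original module): when run directly, diarize an
# example file. The path "meeting.wav" is a hypothetical placeholder; in the
# project this module is presumably imported and start_diarization() is called
# with a real WAV path.
if __name__ == "__main__":
    start_diarization("meeting.wav")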