import gradio as gr
import librosa
import numpy as np
import soundfile as sf
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from transformers import pipeline
import noisereduce as nr
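# Dependencies inferred from the imports above: gradio, librosa, numpy, soundfile,
# scikit-learn, transformers, noisereduce, plus a deep-learning backend for the
# transformers pipeline (typically PyTorch for this Wav2Vec2 model).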
print("Chargement du modèle Wav2Vec2...") | |
stt_pipeline = pipeline("automatic-speech-recognition", model="boumehdi/wav2vec2-large-xlsr-moroccan-darija") | |
print("Modèle chargé avec succès !") | |
def find_optimal_clusters(mfccs_scaled):
    """Find the optimal number of speakers using the silhouette score method."""
    best_score = -1
    best_n_clusters = 1  # At least 1 cluster (1 speaker)
    for n_clusters in range(1, 3):  # Test for 1 or 2 speakers
        kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        labels = kmeans.fit_predict(mfccs_scaled)
        if n_clusters > 1:
            score = silhouette_score(mfccs_scaled, labels)  # Evaluation score
            if score > best_score:
                best_score = score
                best_n_clusters = n_clusters
    return best_n_clusters
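# Note: silhouette_score is undefined for a single cluster, so the n_clusters == 1
# pass only keeps the default; in practice the loop decides between the 1-speaker
# default and the 2-speaker clustering.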
def process_audio(audio_path):
    print(f"Received file: {audio_path}")
    try:
        # Load only the first 30 seconds
        audio, sr = librosa.load(audio_path, sr=None, duration=30)
        print(f"Audio loaded: {len(audio)} samples at {sr} Hz")
        # Noise reduction (improves the SNR)
        audio_denoised = nr.reduce_noise(y=audio, sr=sr)
        print("Noise reduced.")
        # Extract MFCCs after noise reduction
        mfccs = librosa.feature.mfcc(y=audio_denoised, sr=sr, n_mfcc=13)
        print(f"MFCCs extracted, shape: {mfccs.shape}")
        # Normalization: mfccs.T puts frames in rows and coefficients in columns,
        # so each coefficient is standardized across frames
        scaler = StandardScaler()
        mfccs_scaled = scaler.fit_transform(mfccs.T)
        print("MFCCs normalized.")
        # Find the optimal number of speakers
        optimal_clusters = find_optimal_clusters(mfccs_scaled)
        print(f"Optimal number of speakers detected: {optimal_clusters}")
        # Apply KMeans with the chosen number of speakers
        kmeans = KMeans(n_clusters=optimal_clusters, random_state=42, n_init=10)
        speaker_labels = kmeans.fit_predict(mfccs_scaled)
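        # speaker_labels assigns one label per MFCC frame. Each frame covers
        # hop_length samples (librosa's default is 512), so splitting the signal
        # into len(speaker_labels) equal chunks below approximates the frame grid.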
        # Group the audio segments by speaker
        speaker_audio = {speaker: [] for speaker in set(speaker_labels)}
        segment_duration = len(audio_denoised) // len(speaker_labels)
        for i in range(len(speaker_labels)):
            start = i * segment_duration
            end = start + segment_duration
            speaker_id = speaker_labels[i]
            speaker_audio[speaker_id].extend(audio_denoised[start:end])
        # Transcribe the merged segments
        result = []
        for speaker, audio_segment in speaker_audio.items():
            if len(audio_segment) == 0:
                continue
            temp_filename = f"temp_speaker_{speaker}.wav"
            sf.write(temp_filename, np.array(audio_segment), sr)  # Save the segment
            transcription = stt_pipeline(temp_filename)  # Transcribe
            result.append(f"Speaker {speaker}: {transcription['text']}")
            print(f"Speaker {speaker} transcription finished.")
        return "\n".join(result)
    except Exception as e:
        print(f"Error: {e}")
        return "An error occurred."
# Gradio interface
print("Starting Gradio...")
iface = gr.Interface(
    fn=process_audio,
    inputs=gr.Audio(type="filepath"),
    outputs="text",
    title="Speaker Diarization & Transcription",
    description="Upload an audio file to detect speakers and transcribe speech for each segment."
)
iface.launch()  # launch() blocks by default, so the next line prints only after the server stops
print("Interface launched successfully!")