Mohssinibra's picture
Update app.py
0cf693f verified
raw
history blame
3.8 kB
import gradio as gr
import librosa
import numpy as np
import soundfile as sf
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from transformers import pipeline
import noisereduce as nr
from sklearn.metrics import silhouette_score
print("Chargement du modèle Wav2Vec2...")
stt_pipeline = pipeline("automatic-speech-recognition", model="boumehdi/wav2vec2-large-xlsr-moroccan-darija")
print("Modèle chargé avec succès !")
def find_optimal_clusters(mfccs_scaled):
"""Trouve le nombre optimal de locuteurs en utilisant la méthode du score silhouette"""
best_score = -1
best_n_clusters = 1 # Au moins 1 cluster (1 locuteur)
for n_clusters in range(1, 3): # On teste pour 1 ou 2 locuteurs
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
labels = kmeans.fit_predict(mfccs_scaled)
if n_clusters > 1:
score = silhouette_score(mfccs_scaled, labels) # Score d’évaluation
if score > best_score:
best_score = score
best_n_clusters = n_clusters
return best_n_clusters
def process_audio(audio_path):
print(f"Fichier reçu : {audio_path}")
try:
# Charger uniquement les 30 premières secondes
audio, sr = librosa.load(audio_path, sr=None, duration=30)
print(f"Audio chargé : {len(audio)} échantillons à {sr} Hz")
# Réduction du bruit (amélioration du SNR)
audio_denoised = nr.reduce_noise(y=audio, sr=sr)
print("Bruit réduit.")
# Extraction des MFCC après réduction du bruit
mfccs = librosa.feature.mfcc(y=audio_denoised, sr=sr, n_mfcc=13)
print(f"MFCC extrait, shape: {mfccs.shape}")
# Normalisation
scaler = StandardScaler()
mfccs_scaled = scaler.fit_transform(mfccs.T)
print("MFCC normalisé.")
# Trouver le nombre optimal de locuteurs
optimal_clusters = find_optimal_clusters(mfccs_scaled)
print(f"Nombre optimal de locuteurs détecté : {optimal_clusters}")
# Appliquer KMeans avec le bon nombre de locuteurs
kmeans = KMeans(n_clusters=optimal_clusters, random_state=42, n_init=10)
speaker_labels = kmeans.fit_predict(mfccs_scaled)
# Regrouper les segments audio par speaker
speaker_audio = {speaker: [] for speaker in set(speaker_labels)}
segment_duration = len(audio_denoised) // len(speaker_labels)
for i in range(len(speaker_labels)):
start = i * segment_duration
end = start + segment_duration
speaker_id = speaker_labels[i]
speaker_audio[speaker_id].extend(audio_denoised[start:end])
# Transcrire les segments fusionnés
result = []
for speaker, audio_segment in speaker_audio.items():
if len(audio_segment) == 0:
continue
temp_filename = f"temp_speaker_{speaker}.wav"
sf.write(temp_filename, np.array(audio_segment), sr) # Sauvegarder le segment
transcription = stt_pipeline(temp_filename) # Transcrire
result.append(f"Speaker {speaker}: {transcription['text']}")
print(f"Transcription Speaker {speaker} terminée.")
return "\n".join(result)
except Exception as e:
print(f"Erreur : {e}")
return "Une erreur s'est produite."
# Interface Gradio
print("Démarrage de Gradio...")
iface = gr.Interface(
fn=process_audio,
inputs=gr.Audio(type="filepath"),
outputs="text",
title="Speaker Diarization & Transcription",
description="Upload an audio file to detect speakers and transcribe speech for each segment."
)
iface.launch()
print("Interface lancée avec succès !")