import gradio as gr
import librosa
import numpy as np
import whisperx
from transformers import pipeline
from pydub import AudioSegment
import os
import scipy.signal as signal
import torch
import pandas as pd
from pydub.silence import detect_nonsilent
hf_token = os.getenv('diarizationToken')

print("Initializing Speech-to-Text Model...")
stt_pipeline = pipeline("automatic-speech-recognition", model="boumehdi/wav2vec2-large-xlsr-moroccan-darija")
print("Model Loaded Successfully.")

# Initialize WhisperX for diarization (transcription is handled by the wav2vec2 pipeline above)
device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = whisperx.load_model("large-v2", device)  # loaded here but not used by process_audio below
diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_token, device=device)
print("WhisperX Model Loaded Successfully.")
def remove_phone_tonalities(audio, sr):
    # Band-pass to the 300-3400 Hz telephone band to suppress dial tones and line noise
    nyquist = 0.5 * sr
    low_cut = 300 / nyquist
    high_cut = 3400 / nyquist
    b, a = signal.butter(1, [low_cut, high_cut], btype='band')
    filtered_audio = signal.filtfilt(b, a, audio)
    return filtered_audio

def convert_audio_to_wav(audio_path):
    # Convert any audio format to WAV using pydub (helper; not currently called by process_audio)
    sound = AudioSegment.from_file(audio_path)
    wav_path = "converted_audio.wav"
    sound.export(wav_path, format="wav")
    return wav_path
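
# Optional, illustrative sanity check (a sketch, assuming a nominal 16 kHz sample rate):
# inspect the band-pass response to confirm that energy outside the 300-3400 Hz
# telephone band is attenuated. Uses only the scipy.signal and numpy imports above.
_b, _a = signal.butter(1, [300 / 8000, 3400 / 8000], btype='band')
_w, _h = signal.freqz(_b, _a, worN=2048, fs=16000)
print(f"Band-pass gain at 1 kHz: {abs(_h[np.argmin(np.abs(_w - 1000))]):.2f}, "
      f"at 50 Hz: {abs(_h[np.argmin(np.abs(_w - 50))]):.3f}")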
def process_audio(audio_path):
    print(f"Received audio file: {audio_path}")
    try:
        # Load the audio file using librosa (first 30 seconds only)
        audio, sr = librosa.load(audio_path, sr=None, duration=30)
        print(f"Audio loaded: {len(audio)} samples at {sr} Hz")

        # Remove phone tonalities (note: the filtered array is not written back to disk,
        # so the silence detection and transcription below still read the original file)
        audio = remove_phone_tonalities(audio, sr)
        print("Phone tonalities removed")

        # Convert to AudioSegment for silence detection
        # (assumes the upload is already WAV; otherwise see convert_audio_to_wav above)
        sound = AudioSegment.from_wav(audio_path)

        # Silence detection: split into non-silent chunks
        min_silence_len = 1000            # minimum silence length in ms
        silence_thresh = sound.dBFS - 14  # silence threshold in dBFS (adjust as needed)

        nonsilent_chunks = detect_nonsilent(
            sound,
            min_silence_len=min_silence_len,
            silence_thresh=silence_thresh
        )
        non_silent_chunks = [
            sound[start:end] for start, end in nonsilent_chunks
        ]
        # Apply diarization (WhisperX)
        diarization = diarize_model(audio_path)

        # WhisperX returns the diarization as a DataFrame; flatten it to a list of dicts
        if isinstance(diarization, pd.DataFrame):
            print("Diarization is a DataFrame")
            diarization = diarization.to_dict(orient="records")

        transcriptions = []
        for (start_ms, end_ms), chunk in zip(nonsilent_chunks, non_silent_chunks):
            chunk.export("chunk.wav", format="wav")
            chunk_audio, chunk_sr = librosa.load("chunk.wav", sr=None)
            # Transcribe with the wav2vec2 pipeline, passing the sampling rate explicitly
            transcription = stt_pipeline({"raw": chunk_audio, "sampling_rate": chunk_sr})

            # Match this chunk to a diarization segment by its start time (ms -> s)
            speaker_label = "Unknown"
            for speaker in diarization:
                spk_start, spk_end = speaker['start'], speaker['end']
                label = speaker.get('speaker', speaker.get('label', 'Unknown'))
                if spk_start <= (start_ms / 1000) <= spk_end:
                    speaker_label = label
                    break

            transcriptions.append(f"Speaker {speaker_label}: {transcription['text']}")

        # Clean up the temporary chunk file
        if os.path.exists("chunk.wav"):
            os.remove("chunk.wav")

        return "\n".join(transcriptions)

    except Exception as e:
        print(f"Error: {str(e)}")
        return f"Error: {str(e)}"
# Create the Gradio interface
iface = gr.Interface(
    fn=process_audio,
    inputs=gr.Audio(type="filepath"),
    outputs="text",
    title="Speaker Diarization & Transcription",
    description="Upload an audio file to detect speakers and transcribe speech for each segment."
)

print("Launching Gradio Interface...")
iface.launch()
# launch() blocks while the app is running, so this line prints only after shutdown
print("Gradio Interface Launched Successfully.")