import os
import subprocess
import sys
import tempfile
import time
from urllib.parse import urlparse

import gradio as gr
import requests

# Local checkout that faster-whisper is installed from (editable install).
_FASTER_WHISPER_DIR = "./faster-whisper"
# NOTE(review): the clone URL is assumed to be the upstream SYSTRAN
# repository -- confirm before deploying.
_FASTER_WHISPER_REPO = "https://github.com/SYSTRAN/faster-whisper.git"

# Clone and install faster-whisper from GitHub
try:
    # `git clone` fails when the target directory already exists (e.g. after a
    # restart), so only clone when there is no checkout yet.
    if not os.path.isdir(_FASTER_WHISPER_DIR):
        subprocess.run(
            ["git", "clone", _FASTER_WHISPER_REPO, _FASTER_WHISPER_DIR],
            check=True,
        )
    # Run pip through the current interpreter so the package lands in the
    # environment this script is actually executing in.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "-e", _FASTER_WHISPER_DIR],
        check=True,
    )
except (subprocess.CalledProcessError, OSError) as e:
    # OSError covers a missing `git` executable, which subprocess reports as
    # FileNotFoundError rather than CalledProcessError.
    print(f"Error during faster-whisper installation: {e}")
    sys.exit(1)

# Add the faster-whisper directory to the Python path
sys.path.append(_FASTER_WHISPER_DIR)

from faster_whisper import WhisperModel
from faster_whisper.transcribe import BatchedInferencePipeline
import yt_dlp
# Hosts that are routed to the YouTube download backends.
# NOTE(review): confirm this list matches the sites the YouTube backends are
# expected to handle.
_YOUTUBE_HOSTS = frozenset({
    "youtube.com",
    "www.youtube.com",
    "m.youtube.com",
    "music.youtube.com",
    "youtu.be",
})


def download_audio(url, method_choice):
    """Download the audio behind *url* and return what the backend returns.

    URLs whose host is a known YouTube host are handed to
    ``download_youtube_audio``; every other URL is handed to
    ``download_direct_audio``. Both receive *method_choice* unchanged.

    Args:
        url: The http(s) URL to fetch.
        method_choice: Name of the download method selected by the user.

    Returns:
        Whatever the selected downloader returns (a file path on success, or
        an error string produced by the downloader).
    """
    # `hostname` is lower-cased and has any port stripped, so the comparison
    # is not defeated by "WWW.YouTube.com" or "youtube.com:443".
    host = urlparse(url).hostname or ""
    if host in _YOUTUBE_HOSTS:
        return download_youtube_audio(url, method_choice)
    return download_direct_audio(url, method_choice)
# Additional YouTube download methods
def download_youtube_audio(url, method_choice):
    """Fetch audio for a YouTube URL with the backend named by *method_choice*.

    A name that is not in the backend table falls back to
    ``youtube_dl_method``. Exceptions raised by the backend are not
    propagated; they are turned into a string beginning with
    ``"Error downloading using"``.

    Args:
        url: The YouTube URL to download.
        method_choice: Key selecting one of the download backends.

    Returns:
        The backend's return value, or an error string if it raised.
    """
    backends = {
        'yt-dlp': youtube_dl_method,
        'pytube': pytube_method,
        'youtube-dl': youtube_dl_classic_method,
        'yt-dlp-alt': youtube_dl_alternative_method,
        'ffmpeg': ffmpeg_method,
        'aria2': aria2_method,
    }
    downloader = backends.get(method_choice, youtube_dl_method)
    try:
        result = downloader(url)
    except Exception as exc:
        return f"Error downloading using {method_choice}: {exc!s}"
    return result
def youtube_dl_method(url):
    """Download *url* with yt-dlp and extract the audio as MP3.

    The output template has no directory component, so the returned path
    ``<id>.mp3`` is relative.

    Args:
        url: The URL passed to ``YoutubeDL.extract_info``.

    Returns:
        The string ``"<id>.mp3"`` built from the ``id`` field of the
        extracted info.
    """
    # Post-processor that converts the downloaded stream to 192 kbit/s MP3.
    extract_mp3 = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = dict(
        format='bestaudio/best',
        postprocessors=[extract_mp3],
        outtmpl='%(id)s.%(ext)s',
    )
    with yt_dlp.YoutubeDL(options) as downloader:
        video_info = downloader.extract_info(url, download=True)
    return "{}.mp3".format(video_info['id'])
def pytube_method(url):
    """Download the first audio-only stream of *url* with pytube.

    The downloaded file is renamed so that its extension is ``.mp3``. The
    file is only renamed, not transcoded, so its contents stay in whatever
    container pytube delivered.

    Args:
        url: The YouTube URL to download.

    Returns:
        Path of the renamed file.

    Raises:
        RuntimeError: If the video exposes no audio-only stream.
    """
    from pytube import YouTube

    audio_stream = YouTube(url).streams.filter(only_audio=True).first()
    if audio_stream is None:
        # `.first()` yields None for an empty query; fail with a clear message
        # instead of an AttributeError on the next line.
        raise RuntimeError(f"No audio-only stream available for {url}")
    out_file = audio_stream.download()
    base, _ext = os.path.splitext(out_file)
    new_file = base + '.mp3'
    # os.replace overwrites a leftover file from an earlier run on every
    # platform; os.rename raises on Windows when the target exists.
    os.replace(out_file, new_file)
    return new_file
def youtube_dl_classic_method(url):
    """Download *url* for the 'youtube-dl' method choice.

    Despite the name, this path has always used yt-dlp with exactly the same
    options as ``youtube_dl_method``, so it delegates to that function rather
    than duplicating the option table.

    Args:
        url: The URL to download.

    Returns:
        The value returned by ``youtube_dl_method`` (``"<id>.mp3"``).
    """
    return youtube_dl_method(url)
def youtube_dl_alternative_method(url):
    """Download *url* with yt-dlp using an alternative option set.

    Uses the same format, MP3 post-processor and output template as the
    default yt-dlp path, and additionally passes ``no_warnings``, ``quiet``,
    ``no_check_certificate`` and ``prefer_insecure``.

    Args:
        url: The URL passed to ``YoutubeDL.extract_info``.

    Returns:
        The string ``"<id>.mp3"`` built from the ``id`` field of the
        extracted info.
    """
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': '%(id)s.%(ext)s',
    }
    # NOTE(review): yt-dlp documents the certificate option as
    # `nocheckcertificate`; confirm whether `no_check_certificate` has any
    # effect. Both it and `prefer_insecure` weaken transport security.
    options.update({
        'no_warnings': True,
        'quiet': True,
        'no_check_certificate': True,
        'prefer_insecure': True,
    })
    with yt_dlp.YoutubeDL(options) as downloader:
        video_info = downloader.extract_info(url, download=True)
    return "{}.mp3".format(video_info['id'])
def ffmpeg_method(url):
    """Let ffmpeg read *url* and encode its audio track to a temporary MP3.

    Args:
        url: Input passed to ffmpeg's ``-i`` option.

    Returns:
        Path of the MP3 file that ffmpeg wrote.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        OSError: If the ffmpeg executable cannot be started.
    """
    # mkstemp creates the file atomically; tempfile.mktemp only returns a name
    # and is documented as unsafe because another process can claim it first.
    fd, output_file = tempfile.mkstemp(suffix='.mp3')
    os.close(fd)
    # '-y' is required because the output file now already exists.
    command = ['ffmpeg', '-y', '-i', url, '-vn', '-acodec', 'libmp3lame', '-q:a', '2', output_file]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except Exception:
        # Do not leave the empty/partial output behind; removal is best effort
        # and must not mask the original failure.
        try:
            os.remove(output_file)
        except OSError:
            pass
        raise
    return output_file
def aria2_method(url):
    """Download *url* with aria2c into a temporary ``.mp3`` file.

    The content is stored as delivered by the server; no conversion is done.

    Args:
        url: The URL handed to aria2c.

    Returns:
        Path of the downloaded file.

    Raises:
        subprocess.CalledProcessError: If aria2c exits with a non-zero status.
        OSError: If the aria2c executable cannot be started.
    """
    # mkstemp creates the file atomically; tempfile.mktemp only returns a name
    # and is documented as unsafe.
    fd, output_file = tempfile.mkstemp(suffix='.mp3')
    os.close(fd)
    # aria2c interprets --out relative to --dir, so the absolute temp path is
    # split into its directory and file name.
    out_dir, out_name = os.path.split(output_file)
    command = [
        'aria2c',
        '--split=4',
        '--max-connection-per-server=4',
        # The placeholder file already exists: overwrite it instead of letting
        # aria2c pick a different, renamed output file.
        '--allow-overwrite=true',
        '--auto-file-renaming=false',
        '--dir', out_dir,
        '--out', out_name,
        url,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except Exception:
        # Best-effort cleanup that must not mask the original failure.
        try:
            os.remove(output_file)
        except OSError:
            pass
        raise
    return output_file
def download_direct_audio(url, method_choice):
    """Download a directly linked audio file to a temporary ``.mp3`` file.

    With ``method_choice == 'wget'`` the download is delegated to
    ``wget_method``; every other choice uses ``requests``.

    Args:
        url: The URL of the audio file.
        method_choice: Name of the download method selected by the user.

    Returns:
        Path of the downloaded file, or a string beginning with
        ``"Error downloading direct audio:"`` if the download failed.
    """
    temp_path = None
    try:
        if method_choice == 'wget':
            return wget_method(url)
        # stream=True avoids holding the whole file in memory; the timeout
        # keeps an unresponsive server from blocking the request forever.
        with requests.get(url, stream=True, timeout=30) as response:
            if response.status_code != 200:
                raise Exception(f"Failed to download audio from {url}")
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    temp_file.write(chunk)
        return temp_path
    except Exception as e:
        if temp_path is not None:
            # Drop a partially written file; best effort only.
            try:
                os.remove(temp_path)
            except OSError:
                pass
        return f"Error downloading direct audio: {str(e)}"
def wget_method(url):
    """Download *url* with wget into a temporary ``.mp3`` file.

    Args:
        url: The URL handed to wget.

    Returns:
        Path of the downloaded file.

    Raises:
        subprocess.CalledProcessError: If wget exits with a non-zero status.
        OSError: If the wget executable cannot be started.
    """
    # mkstemp creates the file atomically; tempfile.mktemp only returns a name
    # and is documented as unsafe. wget's -O overwrites the placeholder file.
    fd, output_file = tempfile.mkstemp(suffix='.mp3')
    os.close(fd)
    command = ['wget', '-O', output_file, url]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except Exception:
        # Best-effort cleanup that must not mask the original failure.
        try:
            os.remove(output_file)
        except OSError:
            pass
        raise
    return output_file
# Pipeline shared by all requests; created on first use so the model is
# loaded once instead of on every call.
_BATCHED_MODEL = None


def _get_batched_model():
    """Return the shared BatchedInferencePipeline, creating it on first use."""
    global _BATCHED_MODEL
    if _BATCHED_MODEL is None:
        model = WhisperModel("cstr/whisper-large-v3-turbo-int8_float32", device="auto", compute_type="int8")
        _BATCHED_MODEL = BatchedInferencePipeline(model=model)
    return _BATCHED_MODEL


def _format_metrics(info, audio_file_size, transcription_time=None):
    """Build the metrics text; timing lines appear once the time is known."""
    lines = [
        f"Language: {info.language}, Probability: {info.language_probability:.2f}\n",
        f"Duration: {info.duration:.2f}s, Duration after VAD: {info.duration_after_vad:.2f}s\n",
    ]
    if transcription_time is not None:
        # Guard against a zero elapsed time on very coarse clocks.
        if transcription_time > 0:
            real_time_factor = info.duration / transcription_time
        else:
            real_time_factor = float("inf")
        lines.append(f"Transcription time: {transcription_time:.2f} seconds\n")
        lines.append(f"Real-time factor: {real_time_factor:.2f}x\n")
    lines.append(f"Audio file size: {audio_file_size:.2f} MB\n")
    return "".join(lines)


def transcribe_audio(input_source, batch_size, download_method):
    """Transcribe a local file or URL and stream the result.

    This is a generator. Each yielded item is a 3-tuple
    ``(metrics_text, transcription_text, transcription_file)``:

    * first the metrics known before decoding (language, duration, size),
    * then one update per decoded segment with the growing transcription,
    * finally the complete metrics (including transcription time and
      real-time factor) together with the path of the saved transcription.

    On failure a single tuple with an error message, an empty transcription
    and ``None`` is yielded instead.

    Args:
        input_source: Local file path, or an ``http://`` / ``https://`` URL
            that is downloaded first.
        batch_size: Batch size passed to the batched pipeline.
        download_method: Download method name forwarded to ``download_audio``.
    """
    is_url = isinstance(input_source, str) and input_source.startswith(('http://', 'https://'))
    # Set only once a download succeeded, so cleanup never touches a local
    # input file or an error string.
    downloaded_path = None
    try:
        if is_url:
            # It's a URL, download the audio
            audio_path = download_audio(input_source, download_method)
            if not isinstance(audio_path, str) or audio_path.startswith("Error"):
                yield f"Error: {audio_path}", "", None
                return
            downloaded_path = audio_path
        else:
            # It's a local file path
            audio_path = input_source

        batched_model = _get_batched_model()
        audio_file_size = os.path.getsize(audio_path) / (1024 * 1024)  # Size in MB

        start_time = time.perf_counter()
        segments, info = batched_model.transcribe(audio_path, batch_size=int(batch_size))

        # Show the metrics that are already known as soon as possible.
        initial_metrics = _format_metrics(info, audio_file_size)
        yield initial_metrics, "", None

        transcription = ""
        # `segments` is lazy: decoding happens while it is iterated, so the
        # timer has to keep running until the loop is done.
        for segment in segments:
            transcription += f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}\n"
            yield initial_metrics, transcription, None
        # Includes the time the consumer spends handling each streamed update.
        transcription_time = time.perf_counter() - start_time

        # Final output with complete metrics and download option
        metrics_output = _format_metrics(info, audio_file_size, transcription_time)
        transcription_file = save_transcription(transcription)
        yield metrics_output, transcription, transcription_file
    except Exception as e:
        yield f"An error occurred: {str(e)}", "", None
    finally:
        # Clean up the downloaded file if the input was a URL.
        if downloaded_path is not None:
            try:
                os.remove(downloaded_path)
            except OSError:
                # Best effort: the file may already be gone.
                pass
def save_transcription(transcription):
    """Write *transcription* to a new temporary ``.txt`` file.

    The file is created atomically and is not deleted automatically; the
    caller owns it. Text is always written as UTF-8 so transcripts with
    non-ASCII characters do not depend on the platform's default encoding.

    Args:
        transcription: The text to store.

    Returns:
        Path of the file that was written.
    """
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.txt', encoding='utf-8', delete=False
    ) as f:
        f.write(transcription)
        return f.name
# Gradio interface
# Wires transcribe_audio (a generator) to three inputs and three outputs; the
# order of the outputs matches the tuples the generator yields.
iface = gr.Interface(
    fn=transcribe_audio,
    inputs=[
        gr.Textbox(label="Audio Source (Upload, MP3 URL, or YouTube URL)"),
        gr.Slider(minimum=1, maximum=32, step=1, value=16, label="Batch Size"),
        gr.Dropdown(choices=["yt-dlp", "pytube", "youtube-dl", "yt-dlp-alt", "ffmpeg", "aria2", "wget"], label="Download Method", value="yt-dlp")
    ],
    outputs=[
        # `live` is an Interface option, not a Textbox parameter, so it is not
        # passed to the output components.
        gr.Textbox(label="Transcription Metrics and Verbose Messages"),
        gr.Textbox(label="Transcription"),
        gr.File(label="Download Transcription")
    ],
    title="Faster Whisper Multi-Input Transcription",
    description="Enter an audio file path, MP3 URL, or YouTube URL to transcribe using Faster Whisper (GitHub version). Adjust the batch size and choose a download method.",
    # NOTE(review): the first two examples have an empty audio source; fill in
    # real sample URLs.
    examples=[
        ["", 16, "yt-dlp"],
        ["", 16, "ffmpeg"],
        ["path/to/local/audio.mp3", 16, "yt-dlp"]
    ],
    cache_examples=False  # Prevents automatic processing of examples
)
iface.launch()