|
import torch |
|
import gradio as gr |
|
import time |
|
import numpy as np |
|
import scipy.io.wavfile |
|
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline |
|
|
|
|
|
# Checkpoint identifier on the Hugging Face Hub; used to load the model and
# processor and to build the links shown in the UI.
MODEL_NAME = "openai/whisper-tiny"

# Inference settings: run on the CPU in full (32-bit float) precision.
torch_dtype = torch.float32
device = "cpu"
|
|
|
|
|
# Processor for the checkpoint; its tokenizer and feature extractor are
# handed to the ASR pipeline.
processor = AutoProcessor.from_pretrained(MODEL_NAME)

# Speech seq2seq model weights, loaded from safetensors in the configured
# dtype and then moved to the configured device.
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    MODEL_NAME,
    use_safetensors=True,
    torch_dtype=torch_dtype,
)
model.to(device)
|
|
|
# Shared speech-recognition pipeline used by both the streaming and the
# file-upload callbacks. `chunk_length_s=2` requests 2-second chunking of
# the audio inside the pipeline.
pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    feature_extractor=processor.feature_extractor,
    tokenizer=processor.tokenizer,
    device=device,
    torch_dtype=torch_dtype,
    chunk_length_s=2,
)
|
|
|
|
|
def stream_transcribe(stream, new_chunk):
    """Transcribe everything recorded so far from a live microphone stream.

    The newest chunk is down-mixed to mono, converted to float32 and
    peak-normalized, then appended to the running buffer. The whole buffer is
    passed to the module-level ``pipe`` on every call.

    Args:
        stream: Samples accumulated by earlier calls, or ``None`` on the
            first call.
        new_chunk: ``(sampling_rate, samples)`` tuple for the newest chunk.
            ``samples`` may be 1-D, or 2-D with the channels on axis 1.

    Returns:
        ``(stream, text, latency)``: the updated buffer, the transcription of
        the full buffer and the elapsed time formatted as ``"<x.xx> sec"``.
        On failure the tuple is ``(stream, error message, "Error")``; the
        buffer may already contain the new chunk if the failure happened
        after it was appended.
    """
    start_time = time.perf_counter()
    try:
        sr, y = new_chunk

        # Collapse multi-channel audio to a single channel.
        if y.ndim > 1:
            y = y.mean(axis=1)

        y = y.astype(np.float32)

        # A silent (all-zero) chunk has a peak of 0; dividing by it would
        # fill the chunk with NaNs that then poison the accumulated buffer
        # for every later call. An empty chunk has no peak at all.
        peak = np.max(np.abs(y)) if y.size else 0.0
        if peak > 0:
            y /= peak

        if stream is not None:
            stream = np.concatenate([stream, y])
        else:
            stream = y

        transcription = pipe({"sampling_rate": sr, "raw": stream})["text"]
        latency = time.perf_counter() - start_time

        return stream, transcription, f"{latency:.2f} sec"

    except Exception as e:
        # UI callback boundary: report the problem in the textbox instead of
        # letting the exception break the stream.
        print(f"Error: {e}")
        return stream, str(e), "Error"
|
|
|
|
|
def transcribe(inputs, previous_transcription):
    """Transcribe an uploaded audio clip and append it to the existing text.

    Args:
        inputs: ``(sample_rate, audio_data)`` tuple from the upload widget.
            ``audio_data`` may be 1-D, or 2-D with the channels on axis 1.
        previous_transcription: Text already shown in the output box.

    Returns:
        ``(text, latency)``: the previous text with the new transcription
        appended, and the elapsed time formatted as ``"<x.xx> sec"``. On any
        failure (including no file selected) the previous text is returned
        unchanged together with ``"Error"``.
    """
    start_time = time.perf_counter()
    try:
        sample_rate, audio_data = inputs

        # Apply the same preparation as the live microphone path: mono,
        # float32, peak-normalized. The ASR pipeline expects single-channel
        # audio, so a stereo upload passed through untouched is rejected.
        if audio_data.ndim > 1:
            audio_data = audio_data.mean(axis=1)
        audio_data = audio_data.astype(np.float32)

        # Skip normalization for silent or empty clips to avoid dividing by
        # zero (which would turn the samples into NaNs).
        peak = np.max(np.abs(audio_data)) if audio_data.size else 0.0
        if peak > 0:
            audio_data /= peak

        transcription = pipe(
            {"sampling_rate": sample_rate, "raw": audio_data}
        )["text"]

        previous_transcription += transcription
        latency = time.perf_counter() - start_time

        return previous_transcription, f"{latency:.2f} sec"

    except Exception as e:
        # UI callback boundary: keep the existing text and flag the failure.
        print(f"Error: {e}")
        return previous_transcription, "Error"
|
|
|
|
|
def clear():
    """Return an empty string, used to blank a transcription textbox."""
    blank = ""
    return blank
|
|
|
|
|
# Microphone tab: streams audio chunks to `stream_transcribe` and shows the
# running transcription and the per-call latency.
with gr.Blocks() as microphone:
    gr.Markdown("# Whisper Tiny - Real-Time Transcription (CPU) 🎙️")
    gr.Markdown(f"Using [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) for ultra-fast speech-to-text.")

    with gr.Row():
        input_audio_microphone = gr.Audio(sources=["microphone"], type="numpy", streaming=True)
        output = gr.Textbox(label="Live Transcription", value="")
        latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0")

    with gr.Row():
        clear_button = gr.Button("Clear Output")

    # Per-session audio buffer threaded through `stream_transcribe`.
    state = gr.State()
    input_audio_microphone.stream(
        stream_transcribe,
        [state, input_audio_microphone],
        [state, output, latency_textbox],
        time_limit=30,
        stream_every=1,
    )

    # Clearing must also drop the buffered audio: otherwise the next stream
    # tick re-transcribes the old samples and the cleared text reappears.
    clear_button.click(lambda: (None, clear()), outputs=[state, output])
|
|
|
|
|
# Upload tab: transcribes a whole uploaded clip with `transcribe` when the
# user presses "Submit", appending to whatever the output box already holds.
with gr.Blocks() as file:
    gr.Markdown("# Upload Audio File for Transcription 🎵")
    gr.Markdown(f"Using [{MODEL_NAME}](https://huggingface.co/{MODEL_NAME}) for speech-to-text.")

    with gr.Row():
        input_audio = gr.Audio(sources=["upload"], type="numpy")
        output = gr.Textbox(label="Transcription", value="")
        latency_textbox = gr.Textbox(label="Latency (seconds)", value="0.0")

    with gr.Row():
        submit_button = gr.Button("Submit")
        clear_button = gr.Button("Clear Output")

    # The current textbox content is passed in so new text is appended to it.
    submit_button.click(transcribe, [input_audio, output], [output, latency_textbox])
    clear_button.click(clear, outputs=[output])
|
|
|
|
|
# Top-level app: wraps the two sub-apps in a tabbed layout with the Ocean theme.
with gr.Blocks(theme=gr.themes.Ocean()) as demo:
    gr.TabbedInterface(
        interface_list=[microphone, file],
        tab_names=["Microphone", "Upload Audio"],
    )


# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|