Spaces:
Build error
Build error
import datetime | |
import os | |
os.system('pip install git+https://github.com/openai/whisper.git') | |
from whisper.audio import N_SAMPLES | |
import gradio as gr | |
import wave | |
import whisper | |
import logging | |
import torchaudio | |
import torchaudio.functional as F | |
LOGGING_FORMAT = '%(asctime)s %(message)s' | |
logging.basicConfig(format=LOGGING_FORMAT,level=logging.INFO) | |
RECOGNITION_INTERVAL = 2 | |
CNT_PER_CHUNK = 6 | |
# tmp dir to store audio files. | |
if not os.path.isdir('./tmp/'): | |
os.mkdir('./tmp') | |
class WhisperStreaming(): | |
def __init__(self, model_name='base', language='en', fp16=False): | |
self.model_name = model_name | |
self.language = language | |
self.fp16 = fp16 | |
self.whisper_model = whisper.load_model(f'{model_name}.{language}') | |
self.decode_option = whisper.DecodingOptions(language=self.language, | |
without_timestamps=True, | |
fp16=self.fp16) | |
self.whisper_sample_rate = 16000 | |
def transcribe_audio_file(self, wave_file_path): | |
waveform, sample_rate = torchaudio.load(wave_file_path) | |
resampled_waveform = F.resample(waveform, sample_rate, self.whisper_sample_rate, lowpass_filter_width=6) | |
audio_tmp = whisper.pad_or_trim(resampled_waveform[0], length=N_SAMPLES) | |
mel = whisper.log_mel_spectrogram(audio_tmp) | |
results = self.whisper_model.decode(mel, self.decode_option) | |
return results | |
def concat_multiple_wav_files(wav_files): | |
logging.info(f'Concat {wav_files}') | |
concat_audio = [] | |
for wav_file in wav_files: | |
w = wave.open(wav_file, 'rb') | |
concat_audio.append([w.getparams(), w.readframes(w.getnframes())]) | |
w.close() | |
logging.info(f'Delete audio file {wav_file}') | |
os.remove(wav_file) | |
output_file_name = f'{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")}.wav' | |
output_file_path = os.path.join('./tmp', output_file_name) | |
output = wave.open(output_file_path, 'wb') | |
output.setparams(concat_audio[0][0]) | |
for i in range(len(concat_audio)): | |
output.writeframes(concat_audio[i][1]) | |
output.close() | |
logging.info(f'Concat past {len(wav_files)} wav files into {output_file_path}') | |
return output_file_path | |
# fp16 indicates whether using Float16 or Float32. Normally, PyTorch does not support fp16 when run on CPU | |
whisper_model = WhisperStreaming(model_name='base', language='en', fp16=False) | |
def transcribe(audio, state={}): | |
logging.info(f'Transcribe audio file {audio}') | |
print('=====================') | |
logging.info(state) | |
# Whisper only take maximum 30s of audio as input. | |
# And the gradio streaming does not guarantee each callback is 1s, And I set CNT_PER_CHUNK as 6, it's just a rough guess that 6 callbacks does not sum up an audio longer than 30s. | |
# The logic of chunk splitting could be improved by reading exact how many samples in audio files. | |
# After count reach CNT_PER_CHUNK * n, a new audio file is created. | |
# However the text should not change. | |
if not state: | |
state['all_chunk_texts'] = 'Waitting...' | |
state['count'] = 0 | |
state['chunks'] = {} | |
return state['all_chunk_texts'], state | |
chunk = state['count'] // CNT_PER_CHUNK | |
chunk_offset = state['count'] % CNT_PER_CHUNK | |
if chunk_offset == 0: | |
state['chunks'][chunk] = {} | |
state['chunks'][chunk]['concated_audio'] = audio | |
state['chunks'][chunk]['result_text'] = '' | |
else: | |
state['chunks'][chunk]['concated_audio'] = concat_multiple_wav_files([state['chunks'][chunk]['concated_audio'], audio]) | |
state['count'] += 1 | |
# Determin if recognizes current chunk. | |
if (chunk_offset + 1) % RECOGNITION_INTERVAL == 0 and chunk_offset > 0: | |
logging.info(f'start to transcribe chunk: {chunk}, offset: {chunk_offset}') | |
result = whisper_model.transcribe_audio_file(state['chunks'][chunk]['concated_audio']) | |
logging.info('complete transcribe.......') | |
state['chunks'][chunk]['result_text'] = result.text | |
logging.info('The text is:' + state['chunks'][chunk]['result_text']) | |
else: | |
logging.info(f'The offset of streaming chunk is {chunk_offset}, and skip speech recognition') | |
# Concat result_texts of all chunks | |
result_texts = '' | |
for tmp_chunk_idx, tmp_chunk_values in state['chunks'].items(): | |
result_texts += tmp_chunk_values['result_text'] + ' ' | |
state['all_chunk_texts'] = result_texts | |
return state['all_chunk_texts'], state | |
# Make sure not missing any audio clip. | |
assert CNT_PER_CHUNK % RECOGNITION_INTERVAL == 0 | |
STEP_ONE_DESCRIPTION = ''' | |
Model: base | |
Language: en | |
<div> | |
<h3> | |
Step1. Click button <i>"Record from microphone"</i> and allow this site to use your microphone. | |
</h3> | |
<note>Right now the continuous Speech to text transcription is lag and sometimes missing some sentences...</note> | |
</div> | |
''' | |
STEP_TWO_DESCRIPTION = ''' | |
<div align=center> | |
<h3 style="font-weight: 900; margin-bottom: 7px;"> | |
Step2. Try to play the video and see how Whisper transcribe! | |
</h3> | |
<p> | |
Note: make sure using speaker that your computer microphone is able to hear! i.e. computer default speaker | |
</p> | |
<video id="video" width=50% controls="" preload="none"> | |
<source id="mp4" src="https://nomorewzx.github.io/near-continuous-whispering/demo_video/whisper_demo.mp4" type="video/mp4"> | |
</videos> | |
</div> | |
''' | |
gr.Interface(fn=transcribe, | |
inputs=[gr.Audio(source="microphone", type='filepath', streaming=True), 'state'], | |
outputs = ['text', 'state'], | |
description=STEP_ONE_DESCRIPTION, | |
article=STEP_TWO_DESCRIPTION, | |
live=True).queue(concurrency_count=5).launch() |