tan-z-tan's picture
Reset
6274b4a
raw
history blame
5.96 kB
import gradio as gr
import numpy as np
import pandas as pd
import torch
import torchaudio
from datetime import datetime
from lang_id import identify_languages
from whisper import transcribe
# Module-level state shared by the Gradio callbacks.
data: list = []
data_df: pd.DataFrame = pd.DataFrame()  # one row is appended per processed chunk
current_chunk: list = []  # audio fragments buffered until a full chunk is available
SAMPLING_RATE: int = 16_000  # sample rate (Hz) all audio is converted to
CHUNK_DURATION: int = 5  # initial chunk length: 5 seconds
def normalize_audio(audio):
    """Scale *audio* so that its largest absolute sample becomes 1.

    Args:
        audio: Array-like of audio samples.

    Returns:
        A floating-point array equal to ``audio / max(|audio|)``. Empty input
        comes back empty, and all-zero (silent) input comes back as zeros
        rather than NaN.
    """
    samples = np.asarray(audio)
    # Work in floating point: np.abs on integer PCM overflows for the most
    # negative value (e.g. int16 -32768), which would corrupt the peak.
    samples = samples.astype(np.result_type(samples.dtype, np.float32), copy=False)

    if samples.size == 0:
        # np.max raises ValueError on an empty array.
        return samples

    peak = np.max(np.abs(samples))
    if peak == 0:
        # Silence: dividing by a zero peak would fill the output with NaN.
        return np.zeros_like(samples)

    return samples / peak
def resample_audio(audio, orig_sr, target_sr=16000):
    """Return *audio* converted from ``orig_sr`` Hz to ``target_sr`` Hz.

    When both rates already match, the input object is returned untouched.
    Otherwise the samples are cast to float32 and resampled with torchaudio.

    NOTE(review): the resampling path adds a leading axis and removes it
    again, so it presumably expects 1-D (mono) input — confirm with callers.
    """
    if orig_sr == target_sr:
        return audio

    print(f"Resampling audio from {orig_sr} to {target_sr}")
    waveform = torch.from_numpy(audio.astype(np.float32)).unsqueeze(0)
    transform = torchaudio.transforms.Resample(orig_freq=orig_sr, new_freq=target_sr)
    return transform(waveform).squeeze(0).numpy()
def process_chunk(chunk, language_set) -> pd.DataFrame:
    """Run language identification and transcription on one audio chunk.

    Args:
        chunk: Audio samples; the duration is computed as
            ``len(chunk) / SAMPLING_RATE``.
        language_set: Languages forwarded to ``identify_languages``.

    Returns:
        A one-row DataFrame with the columns "Length (s)", "Volume",
        "Japanese_English", "Language", "Lang ID Time", "Transcribe Time"
        and "Text".

    NOTE(review): the 'Japanese' and 'English' scores are looked up
    unconditionally; presumably this fails when ``language_set`` omits
    either of them — confirm against ``identify_languages``.
    """
    # Local import: perf_counter is monotonic, unlike wall-clock
    # datetime.now(), so measured durations cannot be skewed by clock changes.
    import time

    print(f"Processing audio chunk of length {len(chunk)}")
    volume_norm = np.linalg.norm(chunk)
    length = len(chunk) / SAMPLING_RATE  # chunk duration in seconds

    started = time.perf_counter()
    selected_scores, all_scores = identify_languages(chunk, language_set)
    lang_id_time = time.perf_counter() - started

    # Pick whichever of Japanese / English scored higher.
    ja_prob = selected_scores['Japanese']
    en_prob = selected_scores['English']
    ja_en = 'ja' if ja_prob > en_prob else 'en'

    # Three highest-scoring languages over all candidates.
    ranked = sorted(all_scores, key=all_scores.get, reverse=True)[:3]
    top3_languages = ", ".join(f"{lang} ({all_scores[lang]:.2f})" for lang in ranked)

    # Speech recognition in the selected language.
    started = time.perf_counter()
    transcription = transcribe(chunk, language=ja_en)
    transcribe_time = time.perf_counter() - started

    return pd.DataFrame({
        "Length (s)": [length],
        "Volume": [volume_norm],
        "Japanese_English": [f"{ja_en} ({ja_prob:.2f}, {en_prob:.2f})"],
        "Language": [top3_languages],
        "Lang ID Time": [lang_id_time],
        "Transcribe Time": [transcribe_time],
        "Text": [transcription],
    })
def process_audio_stream(audio, chunk_duration, language_set):
    """Buffer streamed audio and analyse it once a full chunk is available.

    Each call resamples and normalizes the new fragment, appends it to the
    module-level buffer and, if the buffer now holds at least one chunk,
    processes exactly one chunk and keeps the remainder buffered.

    Args:
        audio: ``(sample_rate, samples)`` tuple, or ``None``.
        chunk_duration: Chunk length in seconds; must be positive.
        language_set: Comma-separated language names.

    Returns:
        ``(audio_out, data_df)``. ``audio_out`` is ``None`` when *audio* is
        ``None``, ``(SAMPLING_RATE, chunk)`` when a chunk was processed, and
        ``(SAMPLING_RATE, buffered_samples)`` otherwise.

    Raises:
        ValueError: If *chunk_duration* is missing or not positive.
    """
    global data_df, current_chunk
    print("Process_audio_stream")
    if audio is None:
        return None, data_df

    if chunk_duration is None or chunk_duration <= 0:
        raise ValueError(
            f"chunk_duration must be a positive number of seconds, got {chunk_duration!r}"
        )

    sr, audio_data = audio

    # Parse the language set, ignoring empty entries such as a trailing comma.
    languages = [name for name in (part.strip() for part in language_set.split(",")) if name]
    print(audio_data.shape, audio_data.dtype)

    # Slice bounds must be integers; a fractional duration would otherwise
    # make the product a float and raise TypeError when slicing.
    chunk_size = max(1, int(SAMPLING_RATE * chunk_duration))

    # Bring the fragment to the common sampling rate first, then normalize.
    audio_data = resample_audio(audio_data, sr, target_sr=SAMPLING_RATE)
    audio_data = normalize_audio(audio_data)

    current_chunk.append(audio_data)
    total_chunk = np.concatenate(current_chunk)

    if len(total_chunk) < chunk_size:
        # Not enough audio yet: keep the merged buffer so the fragment list
        # does not grow (and get re-concatenated) on every call.
        current_chunk = [total_chunk]
        return (SAMPLING_RATE, total_chunk), data_df

    chunk = total_chunk[:chunk_size]
    df = process_chunk(chunk, languages)
    data_df = pd.concat([data_df, df], ignore_index=True)
    current_chunk = [total_chunk[chunk_size:]]  # keep the unprocessed remainder
    return (SAMPLING_RATE, chunk), data_df
def process_audio(audio, chunk_duration, language_set):
    """Split an audio clip into fixed-length chunks and analyse each one.

    This is a generator: module-level state is reset first, then one
    ``((SAMPLING_RATE, chunk), data_df)`` pair is yielded per complete chunk.
    A trailing remainder shorter than one chunk is not processed; it is only
    stored in ``current_chunk``.

    Args:
        audio: ``(sample_rate, samples)`` tuple, or ``None`` (yields nothing).
        chunk_duration: Chunk length in seconds; must be positive.
        language_set: Comma-separated language names.

    Yields:
        ``((SAMPLING_RATE, chunk), data_df)`` after each processed chunk.

    Raises:
        ValueError: If *chunk_duration* is missing or not positive.
    """
    global data, data_df, current_chunk

    # Reset state so results of a previous run do not leak into this one.
    data = []
    data_df = pd.DataFrame()
    current_chunk = []

    print("Process_audio")
    print(audio)
    if audio is None:
        return

    # A non-positive duration would make the chunking loop below spin forever
    # on empty chunks.
    if chunk_duration is None or chunk_duration <= 0:
        raise ValueError(
            f"chunk_duration must be a positive number of seconds, got {chunk_duration!r}"
        )

    sr, audio_data = audio

    # Parse the language set, ignoring empty entries such as a trailing comma.
    languages = [name for name in (part.strip() for part in language_set.split(",")) if name]
    print(audio_data.shape, audio_data.dtype)

    # Slice bounds must be integers; a fractional duration would otherwise
    # make the product a float and raise TypeError when slicing.
    chunk_size = max(1, int(SAMPLING_RATE * chunk_duration))

    # Bring the clip to the common sampling rate first, then normalize.
    audio_data = resample_audio(audio_data, sr, target_sr=SAMPLING_RATE)
    remaining = normalize_audio(audio_data)

    while len(remaining) >= chunk_size:
        chunk = remaining[:chunk_size]
        remaining = remaining[chunk_size:]  # drop the part being processed
        print(f"Processing audio chunk of length {len(chunk)}")
        df = process_chunk(chunk, languages)
        data_df = pd.concat([data_df, df], ignore_index=True)
        yield (SAMPLING_RATE, chunk), data_df

    # Keep the unprocessed remainder.
    current_chunk = [remaining]
# Parameter input components, shared by the upload and microphone input lists.
# The default chunk length comes from CHUNK_DURATION instead of a second
# hard-coded 5.
chunk_duration_input = gr.Number(value=CHUNK_DURATION, label="Chunk Duration (seconds)")
language_set_input = gr.Textbox(value="Japanese,English", label="Language Set (comma-separated)")
inputs_file = [gr.Audio(sources=["upload"], type="numpy"), chunk_duration_input, language_set_input]
inputs_stream = [gr.Audio(sources=["microphone"], type="numpy", streaming=True), chunk_duration_input, language_set_input]
# Headers mirror the columns of the DataFrame built by process_chunk, so the
# empty table matches the results that are displayed later.
outputs = [
    gr.Audio(type="numpy"),
    gr.DataFrame(
        headers=[
            "Length (s)",
            "Volume",
            "Japanese_English",
            "Language",
            "Lang ID Time",
            "Transcribe Time",
            "Text",
        ],
    ),
]
# Two-tab UI: one tab for uploaded files, one for live microphone input.
with gr.Blocks() as demo:
    with gr.TabItem("Upload"):
        # Runs process_audio on demand (live=False) for an uploaded file.
        gr.Interface(
            process_audio,
            inputs_file,
            outputs,
            title="File Audio Processing",
            description="Upload an audio file to see the processing results.",
            live=False,
        )
    with gr.TabItem("Microphone"):
        # Runs process_audio_stream continuously (live=True) on mic input.
        gr.Interface(
            process_audio_stream,
            inputs_stream,
            outputs,
            title="Real-time Audio Processing",
            description="Speak into the microphone and see real-time audio processing results.",
            live=True,
        )
# Launch the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo.launch()