# VocRT/providers/audio_provider.py
import asyncio
import io
import wave
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from kokoro import generate, generate_full

from providers.chunk_provider import chunk_text

# Shared worker pool so blocking Kokoro synthesis calls do not stall the event loop.
tts_pool = ThreadPoolExecutor(max_workers=10)


def save_audio_to_file(audio_data, file_number, sample_rate=24000):
    """Write a float audio array to a mono 16-bit PCM WAV file and return its name."""
    filename = f"output-{file_number}.wav"
    with wave.open(filename, 'wb') as wav_file:
        wav_file.setnchannels(1)       # mono
        wav_file.setsampwidth(2)       # 16-bit samples
        wav_file.setframerate(sample_rate)
        # Scale float samples in [-1.0, 1.0] to int16 before writing.
        audio_int16 = (audio_data * 32767).astype(np.int16)
        wav_file.writeframes(audio_int16.tobytes())
    return filename


def get_audio_bytes(audio_data, sample_rate=24000):
    """Encode a float audio array as in-memory WAV bytes (mono, 16-bit PCM)."""
    wav_bytes = io.BytesIO()
    with wave.open(wav_bytes, 'wb') as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        audio_int16 = (audio_data * 32767).astype(np.int16)
        wav_file.writeframes(audio_int16.tobytes())
    wav_bytes.seek(0)
    return wav_bytes.read()


def dummy_bytes():
    """Return placeholder bytes, useful as a stand-in payload during testing."""
    buffer = io.BytesIO()
    buffer.write(b"This is a test of dummy byte data.")
    buffer.seek(0)
    return buffer.getvalue()


def generate_audio_from_chunks(text, model, voicepack, voice_name):
    """Synthesize the full text chunk by chunk and return one concatenated audio array."""
    chunks = chunk_text(text)
    combined_audio = np.array([])
    for chunk in chunks:
        try:
            # Kokoro infers the language from the first letter of the voice name.
            audio, _ = generate(model, chunk, voicepack, lang=voice_name[0])
            combined_audio = (
                np.concatenate([combined_audio, audio])
                if combined_audio.size > 0
                else audio
            )
        except Exception:
            # Skip chunks that fail to synthesize rather than aborting the whole text.
            pass
    return combined_audio


async def generate_audio_stream(text, model, vp, name):
    """Yield synthesized audio for each text chunk as soon as it is ready."""
    for chunk in chunk_text(text):
        # Run the blocking Kokoro call in the thread pool so the event loop stays responsive.
        audio, _ = await asyncio.get_running_loop().run_in_executor(
            tts_pool, generate_full, model, chunk, vp, name[0]
        )
        yield audio
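

# Illustrative only: a minimal sketch of how the streaming generator could be
# consumed. The model/voicepack loading below follows the upstream Kokoro-82M
# demo (build_model plus a torch-loaded voicepack) and is an assumption, as are
# the checkpoint and voice paths; adapt the names to how VocRT actually loads them.
#
#   import torch
#   from models import build_model
#
#   async def _demo():
#       model = build_model('kokoro-v0_19.pth', 'cpu')
#       voicepack = torch.load('voices/af.pt', weights_only=True)
#       n = 0
#       async for audio in generate_audio_stream("Hello from VocRT.", model, voicepack, 'af'):
#           save_audio_to_file(audio, n)   # one WAV per synthesized chunk
#           n += 1
#
#   asyncio.run(_demo())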