|
import wave |
|
import io |
|
from providers.chunk_provider import chunk_text |
|
from kokoro import generate, generate_full |
|
import numpy as np |
|
import asyncio |
|
from concurrent.futures import ThreadPoolExecutor |
|
tts_pool = ThreadPoolExecutor(max_workers=10) |
|
|
|
|
|
def save_audio_to_file(audio_data, file_number, sample_rate=24000): |
|
filename = f"output-{file_number}.wav" |
|
with wave.open(filename, 'wb') as wav_file: |
|
wav_file.setnchannels(1) |
|
wav_file.setsampwidth(2) |
|
wav_file.setframerate(sample_rate) |
|
audio_int16 = (audio_data * 32767).astype(np.int16) |
|
wav_file.writeframes(audio_int16.tobytes()) |
|
return filename |
|
|
|
|
|
def get_audio_bytes(audio_data, sample_rate=24000): |
|
wav_bytes = io.BytesIO() |
|
with wave.open(wav_bytes, 'wb') as wav_file: |
|
wav_file.setnchannels(1) |
|
wav_file.setsampwidth(2) |
|
wav_file.setframerate(sample_rate) |
|
audio_int16 = (audio_data * 32767).astype(np.int16) |
|
wav_file.writeframes(audio_int16.tobytes()) |
|
wav_bytes.seek(0) |
|
return wav_bytes.read() |
|
|
|
|
|
def dummy_bytes(): |
|
buffer = io.BytesIO() |
|
dummy_data = b"This is a test of dummy byte data." |
|
buffer.write(dummy_data) |
|
buffer.seek(0) |
|
byte_value = buffer.getvalue() |
|
return byte_value |
|
|
|
|
|
def generate_audio_from_chunks(text, model, voicepack, voice_name): |
|
chunks = chunk_text(text) |
|
combined_audio = np.array([]) |
|
for chunk in chunks: |
|
try: |
|
audio, _ = generate(model, chunk, voicepack, lang=voice_name[0]) |
|
combined_audio = np.concatenate( |
|
[combined_audio, audio]) if combined_audio.size > 0 else audio |
|
except Exception: |
|
pass |
|
return combined_audio |
|
|
|
|
|
async def generate_audio_stream(text, model, vp, name): |
|
for chunk in chunk_text(text): |
|
audio, _ = await asyncio.get_running_loop().run_in_executor( |
|
tts_pool, generate_full, model, chunk, vp, name[0] |
|
) |
|
yield audio |
|
|