hanhaoniao1115's picture
Upload 172 files
911fcc1 verified
import io
import os
import time
from http import HTTPStatus
import numpy as np
import ormsgpack
import soundfile as sf
import torch
from kui.asgi import Body, HTTPException, JSONResponse, Routes, StreamResponse, request
from loguru import logger
from typing_extensions import Annotated
from tools.schema import (
ServeASRRequest,
ServeASRResponse,
ServeChatRequest,
ServeTTSRequest,
ServeVQGANDecodeRequest,
ServeVQGANDecodeResponse,
ServeVQGANEncodeRequest,
ServeVQGANEncodeResponse,
)
from tools.server.agent import get_response_generator
from tools.server.api_utils import (
buffer_to_async_generator,
get_content_type,
inference_async,
)
from tools.server.inference import inference_wrapper as inference
from tools.server.model_manager import ModelManager
from tools.server.model_utils import batch_asr, cached_vqgan_batch_encode, vqgan_decode
MAX_NUM_SAMPLES = int(os.getenv("NUM_SAMPLES", 1))
routes = Routes()
@routes.http.post("/v1/health")
async def health():
return JSONResponse({"status": "ok"})
@routes.http.post("/v1/vqgan/encode")
async def vqgan_encode(req: Annotated[ServeVQGANEncodeRequest, Body(exclusive=True)]):
# Get the model from the app
model_manager: ModelManager = request.app.state.model_manager
decoder_model = model_manager.decoder_model
# Encode the audio
start_time = time.time()
tokens = cached_vqgan_batch_encode(decoder_model, req.audios)
logger.info(f"[EXEC] VQGAN encode time: {(time.time() - start_time) * 1000:.2f}ms")
# Return the response
return ormsgpack.packb(
ServeVQGANEncodeResponse(tokens=[i.tolist() for i in tokens]),
option=ormsgpack.OPT_SERIALIZE_PYDANTIC,
)
@routes.http.post("/v1/vqgan/decode")
async def vqgan_decode(req: Annotated[ServeVQGANDecodeRequest, Body(exclusive=True)]):
# Get the model from the app
model_manager: ModelManager = request.app.state.model_manager
decoder_model = model_manager.decoder_model
# Decode the audio
tokens = [torch.tensor(token, dtype=torch.int) for token in req.tokens]
start_time = time.time()
audios = vqgan_decode(decoder_model, tokens)
logger.info(f"[EXEC] VQGAN decode time: {(time.time() - start_time) * 1000:.2f}ms")
audios = [audio.astype(np.float16).tobytes() for audio in audios]
# Return the response
return ormsgpack.packb(
ServeVQGANDecodeResponse(audios=audios),
option=ormsgpack.OPT_SERIALIZE_PYDANTIC,
)
@routes.http.post("/v1/asr")
async def asr(req: Annotated[ServeASRRequest, Body(exclusive=True)]):
# Get the model from the app
model_manager: ModelManager = request.app.state.model_manager
asr_model = model_manager.asr_model
lock = request.app.state.lock
# Perform ASR
start_time = time.time()
audios = [np.frombuffer(audio, dtype=np.float16) for audio in req.audios]
audios = [torch.from_numpy(audio).float() for audio in audios]
if any(audios.shape[-1] >= 30 * req.sample_rate for audios in audios):
raise HTTPException(status_code=400, content="Audio length is too long")
transcriptions = batch_asr(
asr_model, lock, audios=audios, sr=req.sample_rate, language=req.language
)
logger.info(f"[EXEC] ASR time: {(time.time() - start_time) * 1000:.2f}ms")
# Return the response
return ormsgpack.packb(
ServeASRResponse(transcriptions=transcriptions),
option=ormsgpack.OPT_SERIALIZE_PYDANTIC,
)
@routes.http.post("/v1/tts")
async def tts(req: Annotated[ServeTTSRequest, Body(exclusive=True)]):
# Get the model from the app
app_state = request.app.state
model_manager: ModelManager = app_state.model_manager
engine = model_manager.tts_inference_engine
sample_rate = engine.decoder_model.spec_transform.sample_rate
# Check if the text is too long
if app_state.max_text_length > 0 and len(req.text) > app_state.max_text_length:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
content=f"Text is too long, max length is {app_state.max_text_length}",
)
# Check if streaming is enabled
if req.streaming and req.format != "wav":
raise HTTPException(
HTTPStatus.BAD_REQUEST,
content="Streaming only supports WAV format",
)
# Perform TTS
if req.streaming:
return StreamResponse(
iterable=inference_async(req, engine),
headers={
"Content-Disposition": f"attachment; filename=audio.{req.format}",
},
content_type=get_content_type(req.format),
)
else:
fake_audios = next(inference(req, engine))
buffer = io.BytesIO()
sf.write(
buffer,
fake_audios,
sample_rate,
format=req.format,
)
return StreamResponse(
iterable=buffer_to_async_generator(buffer.getvalue()),
headers={
"Content-Disposition": f"attachment; filename=audio.{req.format}",
},
content_type=get_content_type(req.format),
)
@routes.http.post("/v1/chat")
async def chat(req: Annotated[ServeChatRequest, Body(exclusive=True)]):
# Check that the number of samples requested is correct
if req.num_samples < 1 or req.num_samples > MAX_NUM_SAMPLES:
raise HTTPException(
HTTPStatus.BAD_REQUEST,
content=f"Number of samples must be between 1 and {MAX_NUM_SAMPLES}",
)
# Get the type of content provided
content_type = request.headers.get("Content-Type", "application/json")
json_mode = "application/json" in content_type
# Get the models from the app
model_manager: ModelManager = request.app.state.model_manager
llama_queue = model_manager.llama_queue
tokenizer = model_manager.tokenizer
config = model_manager.config
device = request.app.state.device
# Get the response generators
response_generator = get_response_generator(
llama_queue, tokenizer, config, req, device, json_mode
)
# Return the response in the correct format
if req.streaming is False:
result = response_generator()
if json_mode:
return JSONResponse(result.model_dump())
else:
return ormsgpack.packb(result, option=ormsgpack.OPT_SERIALIZE_PYDANTIC)
return StreamResponse(
iterable=response_generator(), content_type="text/event-stream"
)