aisatsu-api / main.py
vumichien's picture
Update main.py
d377dff
raw
history blame
3.84 kB
import time
from ultralytics import YOLO
from base64 import b64encode
from speech_recognition import AudioFile, Recognizer
import numpy as np
from scipy.spatial import distance as dist
from typing import Union, Optional
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware
from utils import tts, read_image_file, pil_to_base64, base64_to_pil, get_hist
from typing import Optional
from huggingface_hub import hf_hub_download
from io import BytesIO
import zipfile
model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename='yolov8s.pt')
model = YOLO(model_path)
CLASS = model.model.names
defaul_bot_voice = "γŠγ―γ„γ‚ˆγ†γ”γ–γ„γΎγ™"
area_thres = 0.3
ZIP = False
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/")
def read_root():
return {"Message": "Application startup complete"}
@app.post("/aisatsu_api/")
async def predict_api(
file: UploadFile = File(...),
# last_seen: Union[UploadFile, None] = File(None),
last_seen: Optional[str] = Form(None),
):
total_time = time.time()
start_time = time.time()
image = read_image_file(await file.read())
print("Read image", time.time() - start_time)
start_time = time.time()
results = model.predict(image, show=False)[0]
print("Model predict", time.time() - start_time)
masks, boxes = results.masks, results.boxes
area_image = image.width * image.height
most_close = 0
out_img = None
diff_value = 0.5
start_time = time.time()
if boxes is not None:
for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
if int(cls) != 0:
continue
box = xyxy.tolist()
area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
if area_rate >= most_close:
out_img = image.crop(tuple(box)).resize((64, 64))
most_close = area_rate
print("Get face time", time.time() - start_time)
start_time = time.time()
if last_seen is not None:
if type(last_seen) == str:
last_seen = base64_to_pil(last_seen)
else:
last_seen = read_image_file(await last_seen.read())
if out_img is not None:
diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
print("Hist time", time.time() - start_time)
start_time = time.time()
print(f"Distance: {most_close}. Different value: {diff_value}")
if most_close >= area_thres and diff_value >= 0.5:
if ZIP:
voice_bot_path = tts(defaul_bot_voice, language="ja")
image_bot_path = pil_to_base64(out_img)
print("Voice time", time.time() - start_time)
io = BytesIO()
zip_filename = "final_archive.zip"
with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
for file_path in [voice_bot_path, image_bot_path]:
zf.write(file_path)
zf.close()
print("Total time", time.time() - total_time)
return StreamingResponse(
iter([io.getvalue()]),
media_type="application/x-zip-compressed",
headers={"Content-Disposition": f"attachment;filename=%s" % zip_filename}
)
else:
voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
image_bot_path = pil_to_base64(out_img, encode=True)
print("Voice time", time.time() - start_time)
print("Total time", time.time() - total_time)
return {
"voice": voice_bot_path,
"image": image_bot_path
}
else:
return {"message": "No face detected"}