# aisatsu-api / main.py
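"""Greeting-bot API.

POST /human_detect/ runs YOLOv8 person detection on an uploaded frame,
crops the largest (closest) person, optionally compares that crop against
the previously seen one via a colour-histogram distance, and, when a close
and sufficiently different person is found, answers with a Japanese TTS
greeting plus the crop (base64 JSON by default, a zip archive when ZIP is
True).
"""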
import time
import zipfile
from io import BytesIO
from typing import Optional

from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from huggingface_hub import hf_hub_download
from scipy.spatial import distance as dist
from ultralytics import YOLO

from utils import tts, read_image_file, pil_to_base64, base64_to_pil, get_hist

# Download the YOLOv8-small checkpoint from the Hugging Face Hub and load it.
model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename="yolov8s.pt")
model = YOLO(model_path)
CLASS = model.model.names  # COCO class names; index 0 is "person"
default_bot_voice = "γŠγ―γ„γ‚ˆγ†γ”γ–γ„γΎγ™"  # Japanese for "Good morning"
area_threshold = 0.3  # minimum fraction of the frame the person box must cover
diff_threshold = 0.5  # minimum histogram distance for a "new" person
ZIP = False  # if True, respond with a zip archive instead of base64 JSON

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)  # gzip responses over 1 kB


@app.get("/")
def read_root():
    return {"Message": "Application startup complete"}


@app.post("/human_detect/")
async def predict_api(
    file: UploadFile = File(...),
    # last_seen: Optional[UploadFile] = File(None),  # alternative: previous crop as a file upload
    last_seen: Optional[str] = Form(None),  # previous crop as a base64 string
):
    # timers and defaults
    total_time = time.time()
    start_time = time.time()
    most_close = 0  # largest person-box area seen so far, as a fraction of the frame
    out_img = None  # crop of the closest detected person, if any
    diff_value = diff_threshold  # histogram distance; passes the check when no previous crop is given
    # read the uploaded image and run YOLOv8 on it
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height
    # keep the person box covering the largest share of the frame (the closest person)
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:  # class 0 is "person"
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    print("Get face time", time.time() - start_time)
    # compare against the previously seen crop, if one was provided
    start_time = time.time()
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:  # only reachable with the commented-out UploadFile variant above
            last_seen = read_image_file(await last_seen.read())
        if out_img is not None:
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print("Hist time", time.time() - start_time)
    # respond only when the person is close enough and differs from the last one seen
    start_time = time.time()
    print(f"Area rate: {most_close}. Difference value: {diff_value}")
    if most_close >= area_threshold and diff_value >= diff_threshold:
        if ZIP:
            # in this mode tts() and pil_to_base64() are expected to return
            # file paths, which are bundled into an in-memory zip archive
            voice_bot_path = tts(default_bot_voice, language="ja")
            image_bot_path = pil_to_base64(out_img)
            print("Voice time", time.time() - start_time)
            zip_buffer = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment; filename={zip_filename}"},
            )
        else:
            # with encode=True, tts() and pil_to_base64() presumably return base64-encoded payloads
            voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
            image_bot_path = pil_to_base64(out_img, encode=True)
            print("Voice time", time.time() - start_time)
            print("Total time", time.time() - total_time)
            return {
                "voice": voice_bot_path,
                "image": image_bot_path,
            }
    else:
        return {"message": "No face detected"}