"""FastAPI greeting ("aisatsu") service.

Runs YOLOv8 person detection on an uploaded image; when the closest person
fills enough of the frame and differs sufficiently from the previously seen
face, returns a TTS greeting voice plus the cropped face image (base64 JSON
fields, or a zip archive when ZIP is enabled).
"""

import time
import zipfile
from base64 import b64encode
from io import BytesIO
from typing import Optional, Union

import numpy as np
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from huggingface_hub import hf_hub_download
from scipy.spatial import distance as dist
from speech_recognition import AudioFile, Recognizer
from ultralytics import YOLO

from utils import base64_to_pil, get_hist, pil_to_base64, read_image_file, tts

# Download the YOLOv8s weights once at startup and load the detector.
model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename='yolov8s.pt')
model = YOLO(model_path)
CLASS = model.model.names

# Greeting the bot speaks (Japanese).  NOTE(review): "おはいようございます" looks like a
# typo for "おはようございます" — left unchanged since it is runtime output; confirm.
defaul_bot_voice = "おはいようございます"
# Minimum ratio of (person bounding-box area / full image area) to count as "close".
area_thres = 0.3
# When True the endpoint streams a zip archive instead of returning base64 JSON.
ZIP = False

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")
def read_root():
    """Health-check / startup-confirmation endpoint."""
    return {"Message": "Application startup complete"}


@app.post("/aisatsu_api/")
async def predict_api(
    file: UploadFile = File(...),
    # last_seen: Union[UploadFile, None] = File(None),
    last_seen: Optional[str] = Form(None),
):
    """Detect the closest person and return a greeting if they are new.

    Args:
        file: Uploaded image to run detection on.
        last_seen: Optional base64-encoded image of the previously greeted
            face, used to suppress repeated greetings for the same person.

    Returns:
        Either a dict with base64 ``voice``/``image`` fields, a zip
        StreamingResponse (when ``ZIP`` is True), or a "No face detected"
        message when no sufficiently close person is found.
    """
    total_time = time.time()
    start_time = time.time()
    image = read_image_file(await file.read())
    print("Read image", time.time() - start_time)

    start_time = time.time()
    results = model.predict(image, show=False)[0]
    print("Model predict", time.time() - start_time)
    masks, boxes = results.masks, results.boxes
    area_image = image.width * image.height

    # Find the person whose bounding box covers the largest fraction of the image.
    most_close = 0
    out_img = None
    diff_value = 0.5
    start_time = time.time()
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:  # keep only class 0 ("person" in COCO ordering)
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    print("Get face time", time.time() - start_time)

    # Compare colour histograms against the previously seen face, if provided.
    start_time = time.time()
    if last_seen is not None:
        if isinstance(last_seen, str):  # was `type(last_seen) == str`
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        if out_img is not None:
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print("Hist time", time.time() - start_time)

    start_time = time.time()
    print(f"Distance: {most_close}. Different value: {diff_value}")
    if most_close >= area_thres and diff_value >= 0.5:
        if ZIP:
            voice_bot_path = tts(defaul_bot_voice, language="ja")
            image_bot_path = pil_to_base64(out_img)
            print("Voice time", time.time() - start_time)
            # Renamed from `io` — the original shadowed the stdlib `io` module.
            zip_buffer = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(zip_buffer, mode='w',
                                 compression=zipfile.ZIP_DEFLATED) as zf:
                # NOTE(review): pil_to_base64 appears to return base64 text rather
                # than a file path, but zf.write() expects a path on disk —
                # confirm against utils before enabling ZIP mode.
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
            # redundant zf.close() removed — the `with` block closes the archive
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
            )
        else:
            # BUG FIX: original referenced undefined `default_bot_voice`
            # (NameError on the default ZIP=False path); the module constant
            # is spelled `defaul_bot_voice`.
            voice_bot_path = tts(defaul_bot_voice, language="ja", encode=True)
            image_bot_path = pil_to_base64(out_img, encode=True)
            print("Voice time", time.time() - start_time)
            print("Total time", time.time() - total_time)
            return {
                "voice": voice_bot_path,
                "image": image_bot_path,
            }
    else:
        return {"message": "No face detected"}