"""FastAPI service that detects the closest human face in an uploaded image.

When a detected person's bounding box covers a large-enough fraction of the
frame AND the cropped face differs enough from the previously seen one, the
service returns a Japanese greeting (TTS audio) plus the cropped face image —
either as base64 JSON or, when ZIP is enabled, as a streamed zip archive.
"""

import time
import zipfile
from base64 import b64encode  # kept: may be used by other tooling importing this module
from io import BytesIO
from typing import Optional, Union

import numpy as np
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from huggingface_hub import hf_hub_download
from scipy.spatial import distance as dist
from speech_recognition import AudioFile, Recognizer  # kept: unused here, possibly used elsewhere
from ultralytics import YOLO

from utils import base64_to_pil, get_hist, pil_to_base64, read_image_file, tts

# Download the YOLOv8-small checkpoint once at import time and load the model.
model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename="yolov8s.pt")
model = YOLO(model_path)
CLASS = model.model.names  # class-id -> class-name mapping from the model

# Japanese greeting spoken when a close, new face is detected.
# Fixed typo: "おはいようございます" -> "おはようございます" ("good morning").
default_bot_voice = "おはようございます"
# Minimum fraction of the frame a person box must cover to count as "close".
area_threshold = 0.3
# When True, respond with a zip stream instead of base64-encoded JSON fields.
ZIP = False

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")
def read_root():
    """Health-check endpoint confirming the app started."""
    return {"Message": "Application startup complete"}


@app.post("/human_detect/")
async def predict_api(
    file: UploadFile = File(...),
    # last_seen: Union[UploadFile, None] = File(None),
    last_seen: Optional[str] = Form(None),
):
    """Detect the closest person in `file` and greet if it is a new face.

    Parameters
    ----------
    file : UploadFile
        Image to run person detection on.
    last_seen : Optional[str]
        Base64-encoded previously-seen face crop; used to suppress repeated
        greetings for the same person via histogram distance.

    Returns
    -------
    dict or StreamingResponse
        ``{"voice": ..., "image": ...}`` (or a zip stream when ZIP is set)
        on a new close face, otherwise ``{"message": "No face detected"}``.
    """
    total_time = time.time()
    start_time = time.time()
    most_close = 0
    out_img = None
    # Default "different enough" so a greeting fires when no last_seen given.
    diff_value = 0.5

    # Read the uploaded image and run detection.
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height

    # Keep the largest person box (assumed closest) and crop it to 64x64.
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:  # class 0 is "person" in the COCO label set
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    print("Get face time", time.time() - start_time)

    # Compare against the previously seen face via histogram distance.
    start_time = time.time()
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            # Fallback for the commented-out UploadFile variant of last_seen.
            last_seen = read_image_file(await last_seen.read())
        if out_img is not None:
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print("Hist time", time.time() - start_time)

    # Respond only when the face is close enough AND sufficiently different.
    start_time = time.time()
    print(f"Distance: {most_close}. Different value: {diff_value}")
    if most_close >= area_threshold and diff_value >= 0.5:
        if ZIP:
            voice_bot_path = tts(default_bot_voice, language="ja")
            image_bot_path = pil_to_base64(out_img)
            print("Voice time", time.time() - start_time)
            zip_buffer = BytesIO()  # renamed from `io`: shadowed the io module
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(
                zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED
            ) as zf:
                # NOTE(review): zf.write() expects filesystem paths; confirm
                # pil_to_base64 (without encode=True) returns a file path and
                # not a base64 string, or this archive entry will fail.
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
                # removed redundant zf.close(): the `with` block closes it
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([zip_buffer.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={
                    "Content-Disposition": f"attachment;filename={zip_filename}"
                },
            )
        else:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
            image_bot_path = pil_to_base64(out_img, encode=True)
            print("Voice time", time.time() - start_time)
            print("Total time", time.time() - total_time)
            return {
                "voice": voice_bot_path,
                "image": image_bot_path,
            }
    else:
        return {"message": "No face detected"}