import time | |
from ultralytics import YOLO | |
from base64 import b64encode | |
from speech_recognition import AudioFile, Recognizer | |
import numpy as np | |
from scipy.spatial import distance as dist | |
from typing import Union, Optional | |
from fastapi import FastAPI, File, UploadFile, Form | |
from fastapi.responses import StreamingResponse | |
from fastapi.middleware.gzip import GZipMiddleware | |
from utils import tts, read_image_file, pil_to_base64, base64_to_pil, get_hist | |
from huggingface_hub import hf_hub_download | |
from io import BytesIO | |
import zipfile | |
model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename='') | |
model = YOLO(model_path) | |
CLASS = model.model.names | |
default_bot_voice = "γγ―γγγγγγγΎγ" | |
area_threshold = 0.3 | |
ZIP = False | |
app = FastAPI() | |
app.add_middleware(GZipMiddleware, minimum_size=1000) | |
def read_root(): | |
return {"Message": "Application startup complete"} | |
async def predict_api( | |
file: UploadFile = File(...), | |
# last_seen: Union[UploadFile, None] = File(None), | |
last_seen: Optional[str] = Form(None), | |
): | |
# parameters | |
total_time = time.time() | |
start_time = time.time() | |
most_close = 0 | |
out_img = None | |
diff_value = 0.5 | |
# read image and predict | |
image = read_image_file(await | |
results = model.predict(image, show=False)[0] | |
masks, boxes = results.masks, results.boxes | |
area_image = image.width * image.height | |
# select and crop face image | |
if boxes is not None: | |
for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls): | |
if int(cls) != 0: | |
continue | |
box = xyxy.tolist() | |
area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image | |
if area_rate >= most_close: | |
out_img = image.crop(tuple(box)).resize((64, 64)) | |
most_close = area_rate | |
print("Get face time", time.time() - start_time) | |
# check with previous image if have | |
start_time = time.time() | |
if last_seen is not None: | |
if type(last_seen) == str: | |
last_seen = base64_to_pil(last_seen) | |
else: | |
last_seen = read_image_file(await | |
if out_img is not None: | |
diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen)) | |
print("Hist time", time.time() - start_time) | |
# return results | |
start_time = time.time() | |
print(f"Distance: {most_close}. Different value: {diff_value}") | |
if most_close >= area_threshold and diff_value >= 0.5: | |
if ZIP: | |
voice_bot_path = tts(default_bot_voice, language="ja") | |
image_bot_path = pil_to_base64(out_img) | |
print("Voice time", time.time() - start_time) | |
io = BytesIO() | |
zip_filename = "" | |
with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: | |
for file_path in [voice_bot_path, image_bot_path]: | |
zf.write(file_path) | |
zf.close() | |
print("Total time", time.time() - total_time) | |
return StreamingResponse( | |
iter([io.getvalue()]), | |
media_type="application/x-zip-compressed", | |
headers={"Content-Disposition": f"attachment;filename=%s" % zip_filename} | |
) | |
else: | |
voice_bot_path = tts(default_bot_voice, language="ja", encode=True) | |
image_bot_path = pil_to_base64(out_img, encode=True) | |
print("Voice time", time.time() - start_time) | |
print("Total time", time.time() - total_time) | |
return { | |
"voice": voice_bot_path, | |
"image": image_bot_path | |
} | |
else: | |
return {"message": "No face detected"} |