File size: 3,804 Bytes
b96d288
 
f840717
06bad2e
 
 
 
d7328ff
b7f8699
d7328ff
26946a6
d377dff
26946a6
d7328ff
cf598a0
b96d288
93b2c8a
 
 
cf598a0
 
b96d288
cf598a0
3fae5f0
 
d7328ff
d377dff
08ce8d2
d377dff
08ce8d2
3fae5f0
6a076a8
b7f8699
5138fea
 
 
0f2bf45
5138fea
 
d7328ff
 
5138fea
520369d
7c36af7
b96d288
520369d
 
 
 
 
5138fea
 
 
 
3fae5f0
520369d
5138fea
 
 
 
 
 
 
 
 
7c36af7
520369d
 
f8685aa
5138fea
d7328ff
 
 
 
5138fea
 
f8685aa
3fae5f0
 
f8685aa
 
3fae5f0
d7328ff
3fae5f0
d377dff
 
d7328ff
 
 
 
 
 
 
 
 
 
 
 
 
d377dff
 
 
d7328ff
 
 
 
 
ca0120c
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import time

from ultralytics import YOLO
from base64 import b64encode
from speech_recognition import AudioFile, Recognizer
import numpy as np
from scipy.spatial import distance as dist
from typing import Union, Optional

from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware

from utils import tts, read_image_file, pil_to_base64, base64_to_pil, get_hist
from huggingface_hub import hf_hub_download

from io import BytesIO
import zipfile

model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename='yolov8s.pt')
model = YOLO(model_path)

CLASS = model.model.names
default_bot_voice = "γŠγ―γ„γ‚ˆγ†γ”γ–γ„γΎγ™"
area_threshold = 0.3
ZIP = False

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)


@app.get("/")
def read_root():
    return {"Message": "Application startup complete"}


@app.post("/human_detect/")
async def predict_api(
        file: UploadFile = File(...),
        # last_seen: Union[UploadFile, None] = File(None),
        last_seen: Optional[str] = Form(None),
):
    # parameters
    total_time = time.time()
    start_time = time.time()
    most_close = 0
    out_img = None
    diff_value = 0.5

    # read image and predict
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    masks, boxes = results.masks, results.boxes
    area_image = image.width * image.height

    # select and crop face image
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    print("Get face time", time.time() - start_time)

    # check with previous image if have
    start_time = time.time()
    if last_seen is not None:
        if type(last_seen) == str:
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        if out_img is not None:
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print("Hist time", time.time() - start_time)
    
    # return results
    start_time = time.time()
    print(f"Distance: {most_close}. Different value: {diff_value}")
    if most_close >= area_threshold and diff_value >= 0.5:
        if ZIP:
            voice_bot_path = tts(default_bot_voice, language="ja")
            image_bot_path = pil_to_base64(out_img)
            print("Voice time", time.time() - start_time)
            io = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
                zf.close()
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([io.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment;filename=%s" % zip_filename}
            )
        else:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
            image_bot_path = pil_to_base64(out_img, encode=True)
            print("Voice time", time.time() - start_time)
            print("Total time", time.time() - total_time)
            return {
                "voice": voice_bot_path,
                "image": image_bot_path
            }
    else:
        return {"message": "No face detected"}