File size: 3,844 Bytes
b96d288
 
f840717
06bad2e
 
 
 
d7328ff
b7f8699
d7328ff
26946a6
d377dff
26946a6
d7328ff
06bad2e
cf598a0
b96d288
06bad2e
93b2c8a
 
 
cf598a0
 
b96d288
cf598a0
 
 
d7328ff
d377dff
08ce8d2
d377dff
08ce8d2
6a076a8
b7f8699
5138fea
 
 
 
 
 
d7328ff
 
5138fea
7c36af7
b96d288
5138fea
b96d288
7c36af7
5138fea
7c36af7
5138fea
 
 
 
 
f8685aa
5138fea
 
 
 
 
 
 
 
 
7c36af7
d377dff
f8685aa
5138fea
d7328ff
 
 
 
5138fea
 
f8685aa
d377dff
f8685aa
 
5138fea
d7328ff
d377dff
 
 
d7328ff
 
 
 
 
 
 
 
 
 
 
 
 
d377dff
 
 
d7328ff
 
 
 
 
ca0120c
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import time

from ultralytics import YOLO
from base64 import b64encode
from speech_recognition import AudioFile, Recognizer
import numpy as np
from scipy.spatial import distance as dist
from typing import Union, Optional

from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import StreamingResponse
from fastapi.middleware.gzip import GZipMiddleware

from utils import tts, read_image_file, pil_to_base64, base64_to_pil, get_hist
from typing import Optional
from huggingface_hub import hf_hub_download


from io import BytesIO
import zipfile

model_path = hf_hub_download(repo_id="ultralyticsplus/yolov8s", filename='yolov8s.pt')
model = YOLO(model_path)

CLASS = model.model.names
defaul_bot_voice = "γŠγ―γ„γ‚ˆγ†γ”γ–γ„γΎγ™"
area_thres = 0.3
ZIP = False

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

@app.get("/")
def read_root():
    return {"Message": "Application startup complete"}


@app.post("/aisatsu_api/")
async def predict_api(
        file: UploadFile = File(...),
        # last_seen: Union[UploadFile, None] = File(None),
        last_seen: Optional[str] = Form(None),
):
    total_time = time.time()
    start_time = time.time()
    image = read_image_file(await file.read())
    print("Read image", time.time() - start_time)
    start_time = time.time()
    results = model.predict(image, show=False)[0]
    print("Model predict", time.time() - start_time)
    masks, boxes = results.masks, results.boxes
    area_image = image.width * image.height
    most_close = 0
    out_img = None
    diff_value = 0.5
    start_time = time.time()
    if boxes is not None:
        for xyxy, conf, cls in zip(boxes.xyxy, boxes.conf, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate
    print("Get face time", time.time() - start_time)
    
    start_time = time.time()
    if last_seen is not None:
        if type(last_seen) == str:
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        if out_img is not None:
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print("Hist time", time.time() - start_time)
    
    start_time = time.time()
    print(f"Distance: {most_close}. Different value: {diff_value}")
    if most_close >= area_thres and diff_value >= 0.5:
        if ZIP:
            voice_bot_path = tts(defaul_bot_voice, language="ja")
            image_bot_path = pil_to_base64(out_img)
            print("Voice time", time.time() - start_time)
            io = BytesIO()
            zip_filename = "final_archive.zip"
            with zipfile.ZipFile(io, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
                for file_path in [voice_bot_path, image_bot_path]:
                    zf.write(file_path)
                zf.close()
            print("Total time", time.time() - total_time)
            return StreamingResponse(
                iter([io.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment;filename=%s" % zip_filename}
            )
        else:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
            image_bot_path = pil_to_base64(out_img, encode=True)
            print("Voice time", time.time() - start_time)
            print("Total time", time.time() - total_time)
            return {
                "voice": voice_bot_path,
                "image": image_bot_path
            }
    else:
        return {"message": "No face detected"}