Spaces:
Sleeping
Sleeping
File size: 7,365 Bytes
1352988 662a6a2 b96d288 06bad2e 1352988 d7328ff 26946a6 d377dff 93b2c8a 1352988 93b2c8a 1352988 cff8ca1 93b2c8a 1352988 cf598a0 1352988 3fae5f0 1352988 d377dff 08ce8d2 d377dff 08ce8d2 3fae5f0 6a076a8 b7f8699 5138fea 1352988 5138fea 1352988 c1ec7d2 5138fea 520369d 7c36af7 520369d 5138fea 3fae5f0 520369d 5138fea 1352988 c1ec7d2 1352988 520369d 5138fea d7328ff 1352988 3fae5f0 d7328ff 1352988 d7328ff c1ec7d2 0bbdba4 c1ec7d2 d7328ff c1ec7d2 d7328ff 1352988 c1ec7d2 1352988 c1ec7d2 1352988 c1ec7d2 1352988 c1ec7d2 1352988 c1ec7d2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
import os
import tempfile
import time
import zipfile
from io import BytesIO
from typing import Optional, Union, Annotated

import openai
import soundfile as sf
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import StreamingResponse
from scipy.spatial import distance as dist
from ultralyticsplus import YOLO

from utils import tts, stt, read_image_file, pil_to_base64, base64_to_pil, get_hist, ffmpeg_read
# ---------------------------------------------------------------------------
# Settings used by the /camera_picture/ endpoint
# ---------------------------------------------------------------------------
# When true, a "New people" result is returned as a zip archive (voice + image
# files) instead of a JSON payload with encoded data.
ZIP = False
# Greeting returned (and synthesised) when a new person is detected.
# NOTE(review): this looks like a misspelling of "おはようございます" — confirm
# before changing, since the string is sent to clients and to TTS as-is.
default_bot_voice = "おはいようございます"
# Minimum share of the frame area a detected box must cover to count as
# close enough to the camera.
area_threshold = 0.3
# Detection model, loaded once at import time and shared by all requests.
model = YOLO("ultralyticsplus/yolov8s")
# Class names as exposed by the loaded model.
CLASS = model.model.names

# ---------------------------------------------------------------------------
# Settings used by the /human_input/ endpoint
# ---------------------------------------------------------------------------
# System prompt (Japanese) that is sent ahead of every user message.
prompt_template = (
    "私はあなたに、Detomo社が作ったロボットのように振る舞ってほしいです。あなたの名前はアイサツです。"
    "あなたのミッションは、子供たちが他の子供たちに挨拶する自信を持ち、幸せになることを助けることです。"
    "質問には簡単な方法でしか答えないようにし、明示的に要求されない限り、追加情報を提供しないでください。"
)
system_prompt = [dict(role="system", content=prompt_template)]
# Raises KeyError at import time when OPENAI_API_KEY is not set.
openai.api_key = os.environ["OPENAI_API_KEY"]

# ---------------------------------------------------------------------------
# Application object
# ---------------------------------------------------------------------------
app = FastAPI()
# Gzip-compress responses, with a minimum_size of 1000.
app.add_middleware(GZipMiddleware, minimum_size=1000)
@app.get("/")
def read_root():
    """Root endpoint: return a fixed message saying startup has completed."""
    startup_notice = {"Message": "Application startup complete"}
    return startup_notice
@app.get("/client_settings/")
def client_settings_api():
    """Return the fixed settings payload served to clients.

    The only setting is ``camera_picture_period`` (presumably the number of
    seconds between camera captures — confirm against the client).
    """
    settings = dict(camera_picture_period=5)
    return settings
def _camera_response(status, image, return_voice, text=None, voice=None):
    """Build the JSON payload shared by every /camera_picture/ outcome.

    The ``text`` and ``voice`` keys are only present when ``return_voice`` is
    true, so clients that opted out of voice get a status/image-only payload.
    """
    payload = {"status": status}
    if return_voice:
        payload["text"] = text
        payload["voice"] = voice
    payload["image"] = image
    return payload


@app.post("/camera_picture/")
async def camera_picture_api(
    file: UploadFile = File(...),
    last_seen: Optional[Union[str, UploadFile]] = Form(None),
    return_voice: Annotated[bool, Form()] = True,
):
    """Detect the closest class-0 object in a camera frame and classify it.

    The uploaded frame is run through the module-level YOLO ``model``. Among
    the boxes of class index 0 (presumably "person" in the model's label map —
    confirm against ``CLASS``) the one covering the largest share of the frame
    is cropped and resized to 64x64. That crop is optionally compared with the
    previously seen crop via the Euclidean distance of their ``get_hist``
    results.

    Args:
        file: The camera frame to analyse.
        last_seen: The previously returned crop, either as a base64 string or
            as an uploaded image file. When omitted, the crop is treated as
            different enough from the previous one.
        return_voice: When true, JSON responses also carry ``text`` and
            ``voice`` keys.

    Returns:
        A dict with ``status`` set to one of ``"No face detected"``,
        ``"New people"``, ``"People far from camera"`` or ``"Old people"``,
        plus ``image`` (and ``text``/``voice`` when ``return_voice`` is true).
        When the module-level ``ZIP`` flag is set and the result is
        "New people", a ``StreamingResponse`` containing a zip archive of the
        voice and image files is returned instead.
    """
    started_at = time.time()
    # Histogram distance at or above which the crop counts as a different
    # person; also the default when there is nothing to compare against.
    diff_threshold = 0.5
    diff_value = diff_threshold
    most_close = 0
    out_img = None

    # Read the frame and run detection.
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    boxes = results.boxes
    area_image = image.width * image.height

    # Keep the class-0 box that covers the largest fraction of the frame.
    if boxes is not None:
        for xyxy, cls in zip(boxes.xyxy, boxes.cls):
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            area_rate = (box[2] - box[0]) * (box[3] - box[1]) / area_image
            if area_rate >= most_close:
                out_img = image.crop(tuple(box)).resize((64, 64))
                most_close = area_rate

    if out_img is None:
        return _camera_response("No face detected", None, return_voice)

    # In ZIP mode the helper is asked not to encode (the result is later
    # written into the archive as a file path); otherwise it is encoded.
    image_bot_path = pil_to_base64(out_img, encode=not ZIP)

    # Compare against the previous crop, if the client sent one.
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print(f"Distance: {most_close}. Different value: {diff_value}")

    text = None
    voice = None
    if most_close >= area_threshold and diff_value >= diff_threshold:
        if ZIP:
            voice_bot_path = tts(default_bot_voice, language="ja", encode=False)
            archive = BytesIO()
            zip_filename = "final_archive.zip"
            # The context manager closes the archive; no explicit close needed.
            with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
                for file_path in (voice_bot_path, image_bot_path):
                    zf.write(file_path)
            print("Total time", time.time() - started_at)
            return StreamingResponse(
                iter([archive.getvalue()]),
                media_type="application/x-zip-compressed",
                headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
            )
        status = "New people"
        if return_voice:
            text = default_bot_voice
            voice = tts(default_bot_voice, language="ja", encode=True)
    elif most_close < area_threshold:
        status = "People far from camera"
    else:
        status = "Old people"

    # Logged after any TTS call so the figure covers the whole request, as it
    # already does in the ZIP branch.
    print("Total time", time.time() - started_at)
    return _camera_response(status, image_bot_path, return_voice, text=text, voice=voice)
@app.post("/human_input/")
async def human_input_api(
    voice_input: bytes = File(None),
    text_input: str = Form(None),
    temperature: Annotated[float, Form()] = 0.7,
    max_tokens: Annotated[int, Form()] = 100,
    return_voice: Annotated[bool, Form()] = False,
):
    """Answer a user's text or voice message with the chat model.

    Non-empty ``text_input`` wins. Otherwise non-empty ``voice_input`` is
    decoded with ``ffmpeg_read``, written to a WAV file and transcribed with
    ``stt``. The resulting text is sent to ``gpt-3.5-turbo`` after the
    module-level ``system_prompt``.

    Args:
        voice_input: Raw audio bytes of the user's utterance.
        text_input: The user's message as text.
        temperature: Sampling temperature forwarded to the chat completion.
        max_tokens: Token limit forwarded to the chat completion.
        return_voice: When true, the response also carries ``robot_voice``
            (the reply synthesised via ``tts``).

    Returns:
        A dict with ``human_text`` and ``robot_text`` (plus ``robot_voice``
        when ``return_voice`` is true). All values are ``None`` when neither
        usable text nor audio was supplied.
    """
    sampling_rate = 24000

    if text_input:
        text = text_input
    elif voice_input:
        # Fall back to audio whenever there is no usable text — including an
        # empty-string text field — and skip empty uploads instead of handing
        # zero bytes to the decoder.
        upload_audio = ffmpeg_read(voice_input, sampling_rate=sampling_rate)
        # A per-request temporary directory keeps concurrent requests from
        # overwriting each other's audio and removes the file afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            wav_path = os.path.join(tmp_dir, "temp.wav")
            sf.write(wav_path, upload_audio, sampling_rate, subtype="PCM_16")
            text = stt(wav_path)
        print(text)
    else:
        empty_reply = {"human_text": None, "robot_text": None}
        if return_voice:
            empty_reply["robot_voice"] = None
        return empty_reply

    messages = system_prompt + [{"role": "user", "content": text}]
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    print(completion['usage']['total_tokens'])

    robot_text = completion.choices[0].message.content
    reply = {"human_text": text, "robot_text": robot_text}
    if return_voice:
        reply["robot_voice"] = tts(robot_text, language="ja", encode=True)
    return reply