Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	| import time | |
| from ultralytics import YOLO | |
| from base64 import b64encode | |
| from speech_recognition import AudioFile, Recognizer | |
| import numpy as np | |
| from scipy.spatial import distance as dist | |
| from typing import Union, Optional | |
| from fastapi import FastAPI, File, UploadFile, Form | |
| from fastapi.responses import StreamingResponse | |
| from fastapi.middleware.gzip import GZipMiddleware | |
| from utils import tts, read_image_file, pil_to_base64, base64_to_pil, get_hist | |
| from huggingface_hub import hf_hub_download | |
| from io import BytesIO | |
| import zipfile | |
# Fetch the YOLOv8s checkpoint via the Hugging Face Hub client and load it
# once at import time so every request reuses the same model instance.
model_path = hf_hub_download(
    repo_id="ultralyticsplus/yolov8s",
    filename='yolov8s.pt',
)
model = YOLO(model_path)

# Label table exposed by the loaded model.
CLASS = model.model.names

# Text handed to tts() with language="ja" by predict_api.
# NOTE(review): this literal looks like mis-decoded UTF-8 (probably a Japanese
# greeting); it is kept byte-for-byte here -- confirm against the real file.
default_bot_voice = "γγ―γγγγγγγΎγ"

# Minimum share of the frame area the selected detection must cover before
# predict_api answers with voice + image.
area_threshold = 0.3

# When True predict_api streams a zip archive; when False it returns a JSON
# payload with the encoded voice and image.
ZIP = False

app = FastAPI()
# minimum_size=1000 is forwarded to GZipMiddleware unchanged.
app.add_middleware(GZipMiddleware, minimum_size=1000)
def read_root():
    """Return the fixed status payload reporting that startup finished.

    NOTE(review): no route decorator is visible above this function in this
    view of the file -- confirm how it is registered on ``app``.
    """
    status = {"Message": "Application startup complete"}
    return status
def _largest_person_crop(image, boxes):
    """Return ``(crop, area_rate)`` for the largest class-0 detection.

    ``area_rate`` is the detection's box area divided by the full image area.
    ``crop`` is that box cut out of ``image`` and resized to 64x64. When
    ``boxes`` is None or holds no class-0 detection, ``(None, 0)`` is returned.
    """
    best_box, best_rate = None, 0
    if boxes is not None:
        image_area = image.width * image.height
        for xyxy, cls in zip(boxes.xyxy, boxes.cls):
            # Only class id 0 is considered (presumably "person" for this
            # checkpoint -- confirm against CLASS).
            if int(cls) != 0:
                continue
            box = xyxy.tolist()
            rate = (box[2] - box[0]) * (box[3] - box[1]) / image_area
            # ">=" keeps the later detection on ties.
            if rate >= best_rate:
                best_box, best_rate = box, rate
    if best_box is None:
        return None, best_rate
    # Crop once for the winner instead of once per improving candidate.
    return image.crop(tuple(best_box)).resize((64, 64)), best_rate


def _zip_bytes(file_paths):
    """Return the bytes of a deflate-compressed zip containing ``file_paths``."""
    buffer = BytesIO()
    # The context manager closes the archive; no explicit close() is needed.
    with zipfile.ZipFile(buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for file_path in file_paths:
            archive.write(file_path)
    return buffer.getvalue()


async def predict_api(
    file: UploadFile = File(...),
    last_seen: Optional[str] = Form(None),
):
    """Detect the closest subject in an uploaded image and answer with a greeting.

    Args:
        file: Uploaded image to run the detector on.
        last_seen: Optional previous crop. A ``str`` is decoded with
            ``base64_to_pil``; any other non-None value is read as an uploaded
            file (kept for callers that still pass one).

    Returns:
        * ``{"message": "No face detected"}`` when the largest class-0
          detection covers less than ``area_threshold`` of the frame, or when
          its histogram distance to ``last_seen`` is below 0.5.
        * Otherwise, if ``ZIP`` is true, a ``StreamingResponse`` carrying a zip
          archive of the voice and image outputs; if false, a dict with the
          ``"voice"`` and ``"image"`` values produced with ``encode=True``.

    NOTE(review): ``model.predict`` is a synchronous call inside an ``async``
    function, so it likely blocks the event loop while it runs -- confirm this
    is acceptable for the deployment.
    """
    total_time = time.time()
    start_time = time.time()
    # Default equals the 0.5 cut-off below, so a request without a usable
    # comparison always passes the difference check.
    diff_value = 0.5

    # Read the image, run detection and pick the largest class-0 box.
    image = read_image_file(await file.read())
    results = model.predict(image, show=False)[0]
    out_img, most_close = _largest_person_crop(image, results.boxes)
    print("Get face time", time.time() - start_time)

    # Compare with the previous crop when one was supplied.
    start_time = time.time()
    if last_seen is not None:
        if isinstance(last_seen, str):
            last_seen = base64_to_pil(last_seen)
        else:
            last_seen = read_image_file(await last_seen.read())
        if out_img is not None:
            diff_value = dist.euclidean(get_hist(out_img), get_hist(last_seen))
    print("Hist time", time.time() - start_time)

    # Build the response.
    start_time = time.time()
    print(f"Distance: {most_close}. Different value: {diff_value}")
    if not (most_close >= area_threshold and diff_value >= 0.5):
        return {"message": "No face detected"}

    if ZIP:
        # Presumably both helpers return file paths when encode is not set,
        # since their results are written into the archive -- confirm in utils.
        voice_bot_path = tts(default_bot_voice, language="ja")
        image_bot_path = pil_to_base64(out_img)
        print("Voice time", time.time() - start_time)
        zip_filename = "final_archive.zip"
        payload = _zip_bytes([voice_bot_path, image_bot_path])
        print("Total time", time.time() - total_time)
        return StreamingResponse(
            iter([payload]),
            media_type="application/x-zip-compressed",
            headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
        )

    voice_bot_path = tts(default_bot_voice, language="ja", encode=True)
    image_bot_path = pil_to_base64(out_img, encode=True)
    print("Voice time", time.time() - start_time)
    print("Total time", time.time() - total_time)
    return {
        "voice": voice_bot_path,
        "image": image_bot_path,
    }

