#
# Toyota Motor Europe NV/SA and its affiliated companies retain all intellectual
# property and proprietary rights in and to this software and related documentation.
# Any commercial use, reproduction, disclosure or distribution of this software and
# related documentation without an express license agreement from Toyota Motor Europe NV/SA
# is strictly prohibited.
#
from vhap.util.log import get_logger
from typing import Literal

from tqdm import tqdm
import face_alignment
import numpy as np
import matplotlib.path as mpltPath
from fdlite import (
    FaceDetection,
    FaceLandmark,
    face_detection_to_roi,
    IrisLandmark,
    iris_roi_from_face_landmarks,
)

logger = get_logger(__name__)


class LandmarkDetectorFA:
    IMAGE_FILE_NAME = "image_0000.png"
    LMK_FILE_NAME = "keypoints_static_0000.json"

    def __init__(
        self,
        face_detector: Literal["sfd", "blazeface"] = "sfd",
    ):
        """
        Initializes the 68-point facial landmark detector.

        :param face_detector: face detection backend used by face_alignment
                              ("sfd" or "blazeface")
        """
        logger.info("Initialize FaceAlignment module...")

        # 68 facial landmark detector
        self.fa = face_alignment.FaceAlignment(
            face_alignment.LandmarksType.TWO_HALF_D,
            face_detector=face_detector,
            flip_input=True,
            device="cuda",
        )

    def detect_single_image(self, img):
        bbox = self.fa.face_detector.detect_from_image(img)

        if len(bbox) == 0:
            lmks = np.zeros([68, 3]) - 1  # set to -1 when landmarks are unavailable
        else:
            if len(bbox) > 1:
                # if multiple boxes are detected, use the one with the highest confidence
                bbox = [bbox[np.argmax(np.array(bbox)[:, -1])]]

            lmks = self.fa.get_landmarks_from_image(img, detected_faces=bbox)[0]
            lmks = np.concatenate([lmks, np.ones_like(lmks[:, :1])], axis=1)
            if (lmks[:, :2] == -1).sum() > 0:
                lmks[:, 2:] = 0.0
            else:
                lmks[:, 2:] = 1.0

            # normalize landmark and bounding-box coordinates to [0, 1]
            h, w = img.shape[:2]
            lmks[:, 0] /= w
            lmks[:, 1] /= h
            bbox[0][[0, 2]] /= w
            bbox[0][[1, 3]] /= h

        return bbox, lmks
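
    # A minimal usage sketch (not part of the pipeline): `img` is assumed to be an RGB
    # image as a numpy array of shape (H, W, 3). When a face is found, the returned
    # landmarks are normalized to [0, 1]; multiplying by the image size recovers pixels.
    #
    #   detector = LandmarkDetectorFA()
    #   bbox, lmks = detector.detect_single_image(img)  # lmks: (68, 3) = [x, y, confidence]
    #   h, w = img.shape[:2]
    #   lmks_px = lmks.copy()
    #   lmks_px[:, 0] *= w  # x back to pixel coordinates
    #   lmks_px[:, 1] *= h  # y back to pixel coordinates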

    def detect_dataset(self, dataloader):
        """
        Annotates each frame with 68 facial landmarks
        :return: dicts mapping camera_id -> timestep_id -> landmarks / bounding boxes
        """
        landmarks = {}
        bboxes = {}

        logger.info("Begin annotating landmarks...")
        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            scale_factor = item["scale_factor"][0]
            logger.info(
                f"Annotate facial landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()
            bbox, lmks = self.detect_single_image(img)
            if len(bbox) == 0:
                logger.error(
                    f"No bbox found for frame: {timestep_id}, camera: {camera_id}. Setting landmarks to all -1."
                )

            if camera_id not in landmarks:
                landmarks[camera_id] = {}
            if camera_id not in bboxes:
                bboxes[camera_id] = {}
            landmarks[camera_id][timestep_id] = lmks
            bboxes[camera_id][timestep_id] = bbox[0] if len(bbox) > 0 else np.zeros(5) - 1

        return landmarks, bboxes

    def annotate_iris_landmarks(self, dataloader):
        """
        Annotates each frame with 2 iris landmarks
        :return: dict mapping camera_id -> timestep_id -> (2, 3) landmark array, or None on failure
        """
        # iris detector
        detect_faces = FaceDetection()
        detect_face_landmarks = FaceLandmark()
        detect_iris_landmarks = IrisLandmark()

        landmarks = {}

        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            scale_factor = item["scale_factor"][0]
            # key by camera_id first, then timestep_id, to match detect_dataset()
            if camera_id not in landmarks:
                landmarks[camera_id] = {}
            logger.info(
                f"Annotate iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()
            height, width = img.shape[:2]
            img_size = (width, height)

            face_detections = detect_faces(img)
            if len(face_detections) != 1:
                logger.error("Empty iris landmarks (type 1)")
                landmarks[camera_id][timestep_id] = None
            else:
                for face_detection in face_detections:
                    try:
                        face_roi = face_detection_to_roi(face_detection, img_size)
                    except ValueError:
                        logger.error("Empty iris landmarks (type 2)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    face_landmarks = detect_face_landmarks(img, face_roi)
                    if len(face_landmarks) == 0:
                        logger.error("Empty iris landmarks (type 3)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    iris_rois = iris_roi_from_face_landmarks(face_landmarks, img_size)
                    if len(iris_rois) != 2:
                        logger.error("Empty iris landmarks (type 4)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    lmks = []
                    for iris_roi in iris_rois[::-1]:
                        try:
                            iris_landmarks = detect_iris_landmarks(img, iris_roi).iris[0:1]
                        except np.linalg.LinAlgError:
                            logger.error("Failed to get iris landmarks")
                            landmarks[camera_id][timestep_id] = None
                            break

                        for landmark in iris_landmarks:
                            lmks.append([landmark.x * width, landmark.y * height, 1.0])

                    if len(lmks) != 2:
                        # iris detection failed for at least one eye; keep the None recorded above
                        continue

                    lmks = np.array(lmks, dtype=np.float32)

                    # normalize coordinates to [0, 1], matching the facial landmarks
                    h, w = img.shape[:2]
                    lmks[:, 0] /= w
                    lmks[:, 1] /= h

                    landmarks[camera_id][timestep_id] = lmks

        return landmarks

    def iris_consistency(self, lm_iris, lm_eye):
        """
        Checks if landmarks for eye and iris are consistent
        :param lm_iris: (1, 2+) array with a single iris landmark
        :param lm_eye: (6, 2+) array with the eye-contour landmarks
        :return: True if the iris landmark lies inside the eye polygon
        """
        lm_iris = lm_iris[:, :2]
        lm_eye = lm_eye[:, :2]

        polygon_eye = mpltPath.Path(lm_eye)
        valid = polygon_eye.contains_points(lm_iris)

        return valid[0]
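
    # Sketch of the point-in-polygon test above, with made-up coordinates: the eye
    # contour is treated as a closed polygon and the iris center must fall inside it.
    #
    #   eye = np.array([[0.0, 0.0], [1.0, 0.3], [2.0, 0.3], [3.0, 0.0], [2.0, -0.3], [1.0, -0.3]])
    #   iris = np.array([[1.5, 0.0]])
    #   mpltPath.Path(eye).contains_points(iris)  # -> array([ True])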

    def annotate_landmarks(self, dataloader, add_iris=False):
        """
        Annotates each frame with landmarks for face and iris. Assumes frames have been extracted
        :param add_iris: additionally annotate and validate iris landmarks
        :return: nothing; landmarks are saved to disk per camera
        """
        lmks_face, bboxes_faces = self.detect_dataset(dataloader)
        if add_iris:
            lmks_iris = self.annotate_iris_landmarks(dataloader)

            # check consistency of iris landmarks and facial keypoints
            for camera_id, lmk_face_camera in lmks_face.items():
                for timestep_id in lmk_face_camera.keys():
                    discard_iris_lmks = False
                    bboxes_face_i = bboxes_faces[camera_id][timestep_id]
                    if not (bboxes_face_i == -1).all():  # an all -1 bbox marks a missing face detection
                        lmks_face_i = lmks_face[camera_id][timestep_id]
                        lmks_iris_i = lmks_iris[camera_id][timestep_id]
                        if lmks_iris_i is not None:
                            # validate iris landmarks against the eye-contour keypoints
                            left_face = lmks_face_i[36:42]
                            right_face = lmks_face_i[42:48]
                            right_iris = lmks_iris_i[:1]
                            left_iris = lmks_iris_i[1:]

                            if not (
                                self.iris_consistency(left_iris, left_face)
                                and self.iris_consistency(right_iris, right_face)
                            ):
                                logger.error(
                                    f"Inconsistent iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
                                )
                                discard_iris_lmks = True
                        else:
                            logger.error(
                                f"No iris landmarks detected for timestep: {timestep_id}, camera: {camera_id}"
                            )
                            discard_iris_lmks = True
                    else:
                        logger.error(
                            f"Discarding iris landmarks because no face landmark is available for timestep: {timestep_id}, camera: {camera_id}"
                        )
                        discard_iris_lmks = True

                    if discard_iris_lmks:
                        lmks_iris[camera_id][timestep_id] = np.zeros([2, 3]) - 1  # set to -1 for inconsistent iris landmarks

        # construct the final per-camera arrays and save them
        for camera_id, lmk_face_camera in lmks_face.items():
            bounding_box = []
            face_landmark_2d = []
            iris_landmark_2d = []
            for timestep_id in lmk_face_camera.keys():
                bounding_box.append(bboxes_faces[camera_id][timestep_id][None])
                face_landmark_2d.append(lmks_face[camera_id][timestep_id][None])
                if add_iris:
                    iris_landmark_2d.append(lmks_iris[camera_id][timestep_id][None])

            lmk_dict = {
                "bounding_box": bounding_box,
                "face_landmark_2d": face_landmark_2d,
            }
            if len(iris_landmark_2d) > 0:
                lmk_dict["iris_landmark_2d"] = iris_landmark_2d

            for k, v in lmk_dict.items():
                if len(v) > 0:
                    lmk_dict[k] = np.concatenate(v, axis=0)
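
            # The arrays saved below hold, per camera (T = number of annotated timesteps):
            #   bounding_box      (T, 5)      [x1, y1, x2, y2, score], coordinates normalized to [0, 1]
            #   face_landmark_2d  (T, 68, 3)  [x, y, confidence], coordinates normalized to [0, 1]
            #   iris_landmark_2d  (T, 2, 3)   only present when add_iris=True
            # Rows are all -1 where detection failed.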
            out_path = dataloader.dataset.get_property_path(
                "landmark2d/face-alignment", camera_id=camera_id
            )
            logger.info(f"Saving landmarks to: {out_path}")
            if not out_path.parent.exists():
                out_path.parent.mkdir(parents=True)
            np.savez(out_path, **lmk_dict)


if __name__ == "__main__":
    import tyro
    from torch.utils.data import DataLoader
    from vhap.config.base import DataConfig, import_module

    cfg = tyro.cli(DataConfig)
    dataset = import_module(cfg._target)(
        cfg=cfg,
        img_to_tensor=False,
        batchify_all_views=True,
    )
    dataset.items = dataset.items[:2]  # keep only the first two items
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=4)

    detector = LandmarkDetectorFA()
    detector.annotate_landmarks(dataloader)