# vhap/util/landmark_detector_fa.py
#
# Toyota Motor Europe NV/SA and its affiliated companies retain all intellectual
# property and proprietary rights in and to this software and related documentation.
# Any commercial use, reproduction, disclosure or distribution of this software and
# related documentation without an express license agreement from Toyota Motor Europe NV/SA
# is strictly prohibited.
#
from vhap.util.log import get_logger
from typing import Literal
from tqdm import tqdm
import face_alignment
import numpy as np
import matplotlib.path as mpltPath
from fdlite import (
FaceDetection,
FaceLandmark,
face_detection_to_roi,
IrisLandmark,
iris_roi_from_face_landmarks,
)
# Module-level logger shared by everything in this file.
logger = get_logger(__name__)
class LandmarkDetectorFA:
    """Annotates frames with 68 facial landmarks (face-alignment) and,
    optionally, 2 iris landmarks (fdlite / MediaPipe).

    All coordinates are normalized to [0, 1] by the image width/height.
    Frames without a valid detection are marked with -1 sentinel values.
    """

    IMAGE_FILE_NAME = "image_0000.png"
    LMK_FILE_NAME = "keypoints_static_0000.json"

    def __init__(
        self,
        face_detector: Literal["sfd", "blazeface"] = "sfd",
    ):
        """Initializes the 68-point facial landmark detector on CUDA.

        :param face_detector: face-detection backbone used by face-alignment
            ("sfd" or "blazeface").
        """
        logger.info("Initialize FaceAlignment module...")
        # 68 facial landmark detector (2.5D landmarks: x, y + relative depth)
        self.fa = face_alignment.FaceAlignment(
            face_alignment.LandmarksType.TWO_HALF_D,
            face_detector=face_detector,
            flip_input=True,
            device="cuda",
        )

    def detect_single_image(self, img):
        """Detects the most confident face and its 68 landmarks in one image.

        :param img: HxWx3 RGB image as a numpy array.
        :return: (bbox, lmks) where bbox is a list with at most one
            [x1, y1, x2, y2, score] array (coordinates normalized to [0, 1])
            and lmks is a (68, 3) array of [x, y, confidence]. When no face
            is found, bbox is empty and lmks is all -1.
        """
        bbox = self.fa.face_detector.detect_from_image(img)

        if len(bbox) == 0:
            lmks = np.zeros([68, 3]) - 1  # set to -1 when landmarks are unavailable
        else:
            if len(bbox) > 1:
                # if multiple boxes detected, use the one with highest confidence
                bbox = [bbox[np.argmax(np.array(bbox)[:, -1])]]

            lmks = self.fa.get_landmarks_from_image(img, detected_faces=bbox)[0]
            # append a confidence column: 0 if any coordinate carries the -1
            # sentinel, else 1
            lmks = np.concatenate([lmks, np.ones_like(lmks[:, :1])], axis=1)
            if (lmks[:, :2] == -1).sum() > 0:
                lmks[:, 2:] = 0.0
            else:
                lmks[:, 2:] = 1.0

            # normalize landmark and bbox coordinates to [0, 1]
            h, w = img.shape[:2]
            lmks[:, 0] /= w
            lmks[:, 1] /= h
            bbox[0][[0, 2]] /= w
            bbox[0][[1, 3]] /= h

        return bbox, lmks

    def detect_dataset(self, dataloader):
        """
        Annotates each frame with 68 facial landmarks
        :return: dict mapping camera_id -> timestep_id -> (68, 3) landmarks
            array, and the same nesting for the (5,) bboxes
        """
        landmarks = {}
        bboxes = {}

        logger.info("Begin annotating landmarks...")
        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            logger.info(
                f"Annotate facial landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()
            bbox, lmks = self.detect_single_image(img)
            if len(bbox) == 0:
                logger.error(
                    f"No bbox found for frame: {timestep_id}, camera: {camera_id}. Setting landmarks to all -1."
                )

            landmarks.setdefault(camera_id, {})[timestep_id] = lmks
            # an all -1 bbox marks a frame without a detection
            bboxes.setdefault(camera_id, {})[timestep_id] = (
                bbox[0] if len(bbox) > 0 else np.zeros(5) - 1
            )
        return landmarks, bboxes

    def annotate_iris_landmarks(self, dataloader):
        """
        Annotates each frame with 2 iris landmarks
        :return: dict mapping camera_id -> timestep_id -> (2, 3) landmarks
            array of [x, y, 1.0] normalized to [0, 1], or None on failure
        """
        # iris detection pipeline: face box -> face landmarks -> iris
        detect_faces = FaceDetection()
        detect_face_landmarks = FaceLandmark()
        detect_iris_landmarks = IrisLandmark()

        landmarks = {}

        for item in tqdm(dataloader):
            timestep_id = item["timestep_id"][0]
            camera_id = item["camera_id"][0]
            # BUGFIX: key camera-first to match detect_dataset(); the original
            # keyed this dict timestep-first, which broke annotate_landmarks()
            if camera_id not in landmarks:
                landmarks[camera_id] = {}
            logger.info(
                f"Annotate iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
            )

            img = item["rgb"][0].numpy()

            height, width = img.shape[:2]
            img_size = (width, height)

            face_detections = detect_faces(img)
            if len(face_detections) != 1:
                logger.error("Empty iris landmarks (type 1)")
                landmarks[camera_id][timestep_id] = None
            else:
                for face_detection in face_detections:
                    try:
                        face_roi = face_detection_to_roi(face_detection, img_size)
                    except ValueError:
                        logger.error("Empty iris landmarks (type 2)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    face_landmarks = detect_face_landmarks(img, face_roi)
                    if len(face_landmarks) == 0:
                        logger.error("Empty iris landmarks (type 3)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    iris_rois = iris_roi_from_face_landmarks(face_landmarks, img_size)

                    if len(iris_rois) != 2:
                        logger.error("Empty iris landmarks (type 4)")
                        landmarks[camera_id][timestep_id] = None
                        break

                    lmks = []
                    # reversed ROI order so the output is [right iris, left iris]
                    for iris_roi in iris_rois[::-1]:
                        try:
                            iris_landmarks = detect_iris_landmarks(img, iris_roi).iris[
                                0:1
                            ]
                        except np.linalg.LinAlgError:
                            logger.error("Failed to get iris landmarks")
                            landmarks[camera_id][timestep_id] = None
                            break

                        for landmark in iris_landmarks:
                            lmks.append([landmark.x * width, landmark.y * height, 1.0])
                    else:
                        # BUGFIX: only runs when both irises succeeded; the
                        # original fell through after `break` and crashed on
                        # (and clobbered the None sentinel with) a partial list
                        lmks = np.array(lmks, dtype=np.float32)
                        lmks[:, 0] /= width
                        lmks[:, 1] /= height
                        landmarks[camera_id][timestep_id] = lmks
        return landmarks

    def iris_consistency(self, lm_iris, lm_eye):
        """
        Checks if landmarks for eye and iris are consistent
        :param lm_iris: (1, 3) iris landmark [x, y, conf]
        :param lm_eye: (6, 3) eye-contour landmarks from the 68-point set
        :return: True if the iris center lies inside the eye-contour polygon
        """
        lm_iris = lm_iris[:, :2]
        lm_eye = lm_eye[:, :2]

        polygon_eye = mpltPath.Path(lm_eye)
        valid = polygon_eye.contains_points(lm_iris)

        return valid[0]

    def annotate_landmarks(self, dataloader, add_iris=False):
        """
        Annotates each frame with landmarks for face and iris. Assumes frames have been extracted
        :param dataloader: yields items with "rgb", "timestep_id", "camera_id"
        :param add_iris: additionally detect and validate iris landmarks
        :return: None; results are written per camera with np.savez
        """
        lmks_face, bboxes_faces = self.detect_dataset(dataloader)

        if add_iris:
            lmks_iris = self.annotate_iris_landmarks(dataloader)

            # check consistency of iris landmarks and facial keypoints
            for camera_id, lmk_face_camera in lmks_face.items():
                for timestep_id in lmk_face_camera.keys():
                    discard_iris_lmks = False
                    bboxes_face_i = bboxes_faces[camera_id][timestep_id]
                    # BUGFIX: detect_dataset() marks missing faces with an all
                    # -1 bbox and never stores None, so also treat the -1
                    # sentinel as "no face available"
                    if bboxes_face_i is not None and not np.all(bboxes_face_i == -1):
                        lmks_face_i = lmks_face[camera_id][timestep_id]
                        lmks_iris_i = lmks_iris[camera_id][timestep_id]
                        if lmks_iris_i is not None:
                            # validate iris landmarks against the 68-point eye
                            # contours (indices 36-41 left, 42-47 right)
                            left_face = lmks_face_i[36:42]
                            right_face = lmks_face_i[42:48]
                            right_iris = lmks_iris_i[:1]
                            left_iris = lmks_iris_i[1:]
                            if not (
                                self.iris_consistency(left_iris, left_face)
                                and self.iris_consistency(right_iris, right_face)
                            ):
                                logger.error(
                                    f"Inconsistent iris landmarks for timestep: {timestep_id}, camera: {camera_id}"
                                )
                                discard_iris_lmks = True
                        else:
                            logger.error(
                                f"No iris landmarks detected for timestep: {timestep_id}, camera: {camera_id}"
                            )
                            discard_iris_lmks = True
                    else:
                        logger.error(
                            f"Discarding iris landmarks because no face landmark is available for timestep: {timestep_id}, camera: {camera_id}"
                        )
                        discard_iris_lmks = True

                    if discard_iris_lmks:
                        # BUGFIX: camera-first nesting (the original indexed
                        # timestep-first here, inconsistent with its reads)
                        lmks_iris[camera_id][timestep_id] = (
                            np.zeros([2, 3]) - 1
                        )  # set to -1 for inconsistent iris landmarks

        # construct final json
        for camera_id, lmk_face_camera in lmks_face.items():
            bounding_box = []
            face_landmark_2d = []
            iris_landmark_2d = []
            for timestep_id in lmk_face_camera.keys():
                # [None] adds a leading axis so per-frame arrays can be stacked
                bounding_box.append(bboxes_faces[camera_id][timestep_id][None])
                face_landmark_2d.append(lmks_face[camera_id][timestep_id][None])
                if add_iris:
                    iris_landmark_2d.append(lmks_iris[camera_id][timestep_id][None])
            lmk_dict = {
                "bounding_box": bounding_box,
                "face_landmark_2d": face_landmark_2d,
            }
            if len(iris_landmark_2d) > 0:
                lmk_dict["iris_landmark_2d"] = iris_landmark_2d

            # stack each per-frame list into a single (T, ...) array
            for k, v in lmk_dict.items():
                if len(v) > 0:
                    lmk_dict[k] = np.concatenate(v, axis=0)
            out_path = dataloader.dataset.get_property_path(
                "landmark2d/face-alignment", camera_id=camera_id
            )
            logger.info(f"Saving landmarks to: {out_path}")
            if not out_path.parent.exists():
                out_path.parent.mkdir(parents=True)
            np.savez(out_path, **lmk_dict)
if __name__ == "__main__":
    import tyro
    from torch.utils.data import DataLoader

    from vhap.config.base import DataConfig, import_module

    cfg = tyro.cli(DataConfig)
    dataset = import_module(cfg._target)(
        cfg=cfg,
        img_to_tensor=False,
        batchify_all_views=True,
    )
    # NOTE(review): only the first two items are processed — looks like a
    # leftover debugging truncation; confirm before a full run
    dataset.items = dataset.items[:2]
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=4)

    detector = LandmarkDetectorFA()
    detector.annotate_landmarks(dataloader)