from typing import Literal

import face_alignment
import matplotlib.path as mpltPath
import numpy as np
from fdlite import (
    FaceDetection,
    FaceLandmark,
    face_detection_to_roi,
    IrisLandmark,
    iris_roi_from_face_landmarks,
)
from tqdm import tqdm

from vhap.util.log import get_logger
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
class LandmarkDetectorFA: |
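    """
    Annotates video frames with 2D facial landmarks.

    Uses the face-alignment package for 68-point face landmarks and, optionally,
    fdlite for iris landmarks. All returned coordinates are normalized by the
    image width and height.
    """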
|
|
|
IMAGE_FILE_NAME = "image_0000.png" |
|
LMK_FILE_NAME = "keypoints_static_0000.json" |
|
|
|
def __init__( |
|
self, |
|
        face_detector: Literal["sfd", "blazeface"] = "sfd",
    ):
        """
        Initializes the face_alignment landmark detector.

        :param face_detector: face detection backend used by face_alignment;
            either "sfd" (more accurate) or "blazeface" (faster)
        """
|
|
|
logger.info("Initialize FaceAlignment module...") |
|
|
|
self.fa = face_alignment.FaceAlignment( |
|
face_alignment.LandmarksType.TWO_HALF_D, |
|
face_detector=face_detector, |
|
flip_input=True, |
|
device="cuda" |
|
) |
|
|
|
def detect_single_image(self, img): |
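        """
        Detects the face bounding box and 68 facial landmarks in a single image.

        :param img: RGB image as an (H, W, 3) numpy array
        :return: a list with at most one bounding box [x1, y1, x2, y2, score] and a
            (68, 3) array of landmarks (x, y, valid flag), both normalized to [0, 1]
        """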
|
bbox = self.fa.face_detector.detect_from_image(img) |
|
|
|
if len(bbox) == 0: |
|
            lmks = np.zeros([68, 3]) - 1  # placeholder when no face is detected
|
|
|
else: |
|
            if len(bbox) > 1:
                # keep the detection with the highest confidence score
                bbox = [bbox[np.argmax(np.array(bbox)[:, -1])]]
|
|
|
lmks = self.fa.get_landmarks_from_image(img, detected_faces=bbox)[0] |
|
lmks = np.concatenate([lmks, np.ones_like(lmks[:, :1])], axis=1) |
|
|
|
if (lmks[:, :2] == -1).sum() > 0: |
|
lmks[:, 2:] = 0.0 |
|
else: |
|
lmks[:, 2:] = 1.0 |
|
|
|
        # normalize landmark and bounding box coordinates to [0, 1]
        h, w = img.shape[:2]
        lmks[:, 0] /= w
        lmks[:, 1] /= h
        if len(bbox) > 0:
            bbox[0][[0, 2]] /= w
            bbox[0][[1, 3]] /= h
        return bbox, lmks
|
|
|
def detect_dataset(self, dataloader): |
|
""" |
|
Annotates each frame with 68 facial landmarks |
|
:return: dict mapping frame number to landmarks numpy array and the same thing for bboxes |
|
""" |
|
landmarks = {} |
|
bboxes = {} |
|
|
|
logger.info("Begin annotating landmarks...") |
|
for item in tqdm(dataloader): |
|
timestep_id = item["timestep_id"][0] |
|
camera_id = item["camera_id"][0] |
|
scale_factor = item["scale_factor"][0] |
|
|
|
logger.info( |
|
f"Annotate facial landmarks for timestep: {timestep_id}, camera: {camera_id}" |
|
) |
|
img = item["rgb"][0].numpy() |
|
|
|
bbox, lmks = self.detect_single_image(img) |
|
|
|
if len(bbox) == 0: |
|
logger.error( |
|
f"No bbox found for frame: {timestep_id}, camera: {camera_id}. Setting landmarks to all -1." |
|
) |
|
|
|
if camera_id not in landmarks: |
|
landmarks[camera_id] = {} |
|
if camera_id not in bboxes: |
|
bboxes[camera_id] = {} |
|
landmarks[camera_id][timestep_id] = lmks |
|
bboxes[camera_id][timestep_id] = bbox[0] if len(bbox) > 0 else np.zeros(5) - 1 |
|
return landmarks, bboxes |
|
|
|
def annotate_iris_landmarks(self, dataloader): |
|
""" |
|
Annotates each frame with 2 iris landmarks |
|
:return: dict mapping frame number to landmarks numpy array |
|
""" |
|
|
|
|
|
detect_faces = FaceDetection() |
|
detect_face_landmarks = FaceLandmark() |
|
detect_iris_landmarks = IrisLandmark() |
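        # these three detectors form the fdlite pipeline:
        # face detection -> face landmarks -> iris ROIs -> iris landmarks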
|
|
|
landmarks = {} |
|
|
|
for item in tqdm(dataloader): |
|
timestep_id = item["timestep_id"][0] |
|
camera_id = item["camera_id"][0] |
|
scale_factor = item["scale_factor"][0] |
|
            if camera_id not in landmarks:
                landmarks[camera_id] = {}
|
logger.info( |
|
f"Annotate iris landmarks for timestep: {timestep_id}, camera: {camera_id}" |
|
) |
|
|
|
img = item["rgb"][0].numpy() |
|
|
|
height, width = img.shape[:2] |
|
img_size = (width, height) |
|
|
|
face_detections = detect_faces(img) |
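            # iris annotation requires exactly one detected face in the frame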
|
if len(face_detections) != 1: |
|
logger.error("Empty iris landmarks (type 1)") |
|
                landmarks[camera_id][timestep_id] = None
|
else: |
|
for face_detection in face_detections: |
|
try: |
|
face_roi = face_detection_to_roi(face_detection, img_size) |
|
except ValueError: |
|
logger.error("Empty iris landmarks (type 2)") |
|
                        landmarks[camera_id][timestep_id] = None
|
break |
|
|
|
face_landmarks = detect_face_landmarks(img, face_roi) |
|
if len(face_landmarks) == 0: |
|
logger.error("Empty iris landmarks (type 3)") |
|
                        landmarks[camera_id][timestep_id] = None
|
break |
|
|
|
iris_rois = iris_roi_from_face_landmarks(face_landmarks, img_size) |
|
|
|
if len(iris_rois) != 2: |
|
logger.error("Empty iris landmarks (type 4)") |
|
                        landmarks[camera_id][timestep_id] = None
|
break |
|
|
|
lmks = [] |
|
for iris_roi in iris_rois[::-1]: |
|
try: |
|
iris_landmarks = detect_iris_landmarks(img, iris_roi).iris[ |
|
0:1 |
|
] |
|
except np.linalg.LinAlgError: |
|
logger.error("Failed to get iris landmarks") |
|
                            landmarks[camera_id][timestep_id] = None
|
break |
|
|
|
for landmark in iris_landmarks: |
|
lmks.append([landmark.x * width, landmark.y * height, 1.0]) |
|
|
|
lmks = np.array(lmks, dtype=np.float32) |
|
|
|
h, w = img.shape[:2] |
|
lmks[:, 0] /= w |
|
lmks[:, 1] /= h |
|
|
|
                    landmarks[camera_id][timestep_id] = lmks
|
|
|
return landmarks |
|
|
|
def iris_consistency(self, lm_iris, lm_eye): |
|
""" |
|
Checks if landmarks for eye and iris are consistent |
|
:param lm_iris: |
|
:param lm_eye: |
|
:return: |
|
""" |
|
lm_iris = lm_iris[:, :2] |
|
lm_eye = lm_eye[:, :2] |
|
|
|
polygon_eye = mpltPath.Path(lm_eye) |
|
valid = polygon_eye.contains_points(lm_iris) |
|
|
|
return valid[0] |
|
|
|
def annotate_landmarks(self, dataloader, add_iris=False): |
|
""" |
|
Annotates each frame with landmarks for face and iris. Assumes frames have been extracted |
|
:param add_iris: |
|
:return: |
|
""" |
|
lmks_face, bboxes_faces = self.detect_dataset(dataloader) |
|
|
|
if add_iris: |
|
lmks_iris = self.annotate_iris_landmarks(dataloader) |
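            # check consistency of iris landmarks: each iris must lie inside the
            # polygon spanned by the contour landmarks of the corresponding eye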
|
|
|
|
|
for camera_id, lmk_face_camera in lmks_face.items(): |
|
for timestep_id in lmk_face_camera.keys(): |
|
|
|
discard_iris_lmks = False |
|
bboxes_face_i = bboxes_faces[camera_id][timestep_id] |
|
if bboxes_face_i is not None: |
|
lmks_face_i = lmks_face[camera_id][timestep_id] |
|
lmks_iris_i = lmks_iris[camera_id][timestep_id] |
|
if lmks_iris_i is not None: |
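                            # eye contours in the 68-point convention: indices 36-41 and 42-47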
|
|
|
|
|
left_face = lmks_face_i[36:42] |
|
right_face = lmks_face_i[42:48] |
|
|
|
right_iris = lmks_iris_i[:1] |
|
left_iris = lmks_iris_i[1:] |
|
|
|
if not ( |
|
self.iris_consistency(left_iris, left_face) |
|
and self.iris_consistency(right_iris, right_face) |
|
): |
|
logger.error( |
|
f"Inconsistent iris landmarks for timestep: {timestep_id}, camera: {camera_id}" |
|
) |
|
discard_iris_lmks = True |
|
else: |
|
logger.error( |
|
f"No iris landmarks detected for timestep: {timestep_id}, camera: {camera_id}" |
|
) |
|
discard_iris_lmks = True |
|
|
|
else: |
|
logger.error( |
|
f"Discarding iris landmarks because no face landmark is available for timestep: {timestep_id}, camera: {camera_id}" |
|
) |
|
discard_iris_lmks = True |
|
|
|
if discard_iris_lmks: |
|
                        lmks_iris[camera_id][timestep_id] = (
                            np.zeros([2, 3]) - 1  # set to -1 when iris landmarks are unreliable
                        )
|
|
|
|
|
        # stack per-timestep annotations and save one .npz file per camera
        for camera_id, lmk_face_camera in lmks_face.items():
|
bounding_box = [] |
|
face_landmark_2d = [] |
|
iris_landmark_2d = [] |
|
for timestep_id in lmk_face_camera.keys(): |
|
bounding_box.append(bboxes_faces[camera_id][timestep_id][None]) |
|
face_landmark_2d.append(lmks_face[camera_id][timestep_id][None]) |
|
|
|
if add_iris: |
|
iris_landmark_2d.append(lmks_iris[camera_id][timestep_id][None]) |
|
|
|
lmk_dict = { |
|
"bounding_box": bounding_box, |
|
"face_landmark_2d": face_landmark_2d, |
|
} |
|
if len(iris_landmark_2d) > 0: |
|
lmk_dict["iris_landmark_2d"] = iris_landmark_2d |
|
|
|
for k, v in lmk_dict.items(): |
|
if len(v) > 0: |
|
lmk_dict[k] = np.concatenate(v, axis=0) |
|
out_path = dataloader.dataset.get_property_path( |
|
"landmark2d/face-alignment", camera_id=camera_id |
|
) |
|
logger.info(f"Saving landmarks to: {out_path}") |
|
if not out_path.parent.exists(): |
|
out_path.parent.mkdir(parents=True) |
|
np.savez(out_path, **lmk_dict) |
|
|
|
|
|
if __name__ == "__main__": |
|
    import tyro
    from torch.utils.data import DataLoader

    from vhap.config.base import DataConfig, import_module
|
|
|
cfg = tyro.cli(DataConfig) |
|
dataset = import_module(cfg._target)( |
|
cfg=cfg, |
|
img_to_tensor=False, |
|
batchify_all_views=True, |
|
) |
|
    dataset.items = dataset.items[:2]  # only process the first two items (quick test)
|
|
|
dataloader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=4) |
|
|
|
detector = LandmarkDetectorFA() |
|
detector.annotate_landmarks(dataloader) |
|
|