# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved. # # This work is licensed under the Creative Commons # Attribution-NonCommercial-ShareAlike 4.0 International License. # To view a copy of this license, visit # http://creativecommons.org/licenses/by-nc-sa/4.0/ or send a letter to # Creative Commons, PO Box 1866, Mountain View, CA 94042, USA. # # Includes modifications proposed by Jeremy Fix # from here: https://github.com/NVlabs/ffhq-dataset/pull/3 import os import sys import json import argparse import numpy as np import multiprocessing from tqdm import tqdm from scipy.ndimage import gaussian_filter1d # Image processing libraries import PIL from PIL import Image, ImageFile # Project-specific imports from lib.preprocess import align_img PIL.ImageFile.LOAD_TRUNCATED_IMAGES = True # avoid "Decompressed Data Too Large" error def save_detection_as_txt(dst, lm5p): outLand = open(dst, "w") for i in range(lm5p.shape[0]): outLand.write(str(float(lm5p[i][0])) + " " + str(float(lm5p[i][1])) + "\n") outLand.close() def process_image(kwargs): """ Processes an image by aligning and cropping it based on facial landmarks. Args: kwargs (dict): Dictionary containing the following keys: - src_dir (str): Directory containing the source image. - dst_dir (str): Directory to save the processed image. - lm5p (np.ndarray): Array of shape (N, 2) representing facial landmarks. - im_name (str): Name of the image file. - save_realign_dir (str or None): Directory to save realigned images. - save_detection_dir (str or None): Directory to save detection results. Returns: None """ # Extract parameters from kwargs src_dir = kwargs['src_dir'] dst_dir = kwargs['dst_dir'] lm5p = kwargs['lm5p'] lm3d = kwargs['lm3d'] im_name = kwargs['im_name'] save_realign_dir = kwargs.get('save_realign_dir', None) save_detection_dir = kwargs.get('save_detection_dir', None) save_align3d_dir = kwargs['save_align3d_dir'] # Ensure the destination directory exists os.makedirs(dst_dir, exist_ok=True) # Construct file paths src_file = os.path.join(src_dir, im_name) # Ensure the source file exists before proceeding assert os.path.isfile(src_file), f"Source file not found: {src_file}" # Open the image img = Image.open(src_file) _, H = img.size # Get image dimensions # Prepare alignment parameters params = {'name': src_file, 'lm': lm5p.tolist()} aligned_lm5p = lm5p.copy() aligned_lm3d = lm3d.copy() # Flip Y-coordinates to match the image coordinate system aligned_lm5p[:, -1] = H - 1 - aligned_lm5p[:, -1] aligned_lm3d[:, 1] = H - 1 - aligned_lm3d[:, 1] # Convert image name to PNG format im_name = im_name.rsplit('.', 1)[0] + '.png' dst_file = os.path.join(dst_dir, im_name) # Optionally save the realigned image if save_realign_dir: img.save(os.path.join(save_realign_dir, im_name)) # Optionally save detected landmarks as a text file if save_detection_dir: save_detection_as_txt( os.path.join(save_detection_dir, im_name.replace('.png', '.txt')), aligned_lm5p ) # Crop the image based on aligned landmarks img_cropped, crop_param, aligned_lm3d_save = crop_image(img, aligned_lm5p.copy(), aligned_lm3d.copy(), output_size=kwargs['output_size']) params['crop'] = crop_param aligned_lm3d_save = np.concatenate([aligned_lm3d_save[:, 0:1], 512 - aligned_lm3d_save[:, 1:2]], 1) np.save(os.path.join(save_align3d_dir, im_name.replace(".png", ".npy").replace(".jpg", ".npy").replace(".jpeg", ".npy")), aligned_lm3d_save) # Save the cropped image img_cropped.save(dst_file) def crop_image(im, lm, ldmk_3d, center_crop_size=700, rescale_factor=300, target_size=1024., output_size=512): """ Crops and resizes an image based on facial landmarks. Args: im (PIL.Image.Image): Input image. lm (np.ndarray): Facial landmarks array of shape (N, 2). center_crop_size (int, optional): Size of the centered crop. Defaults to 700. rescale_factor (int, optional): Scaling factor for alignment. Defaults to 300. target_size (float, optional): Target size for transformation. Defaults to 1024. output_size (int, optional): Final resized output size. Defaults to 512. Returns: tuple: - im_cropped (PIL.Image.Image): The cropped and resized image. - crop_param (list): List of cropping parameters. """ # Get image height _, H = im.size # Define a standardized 3D landmark set for alignment lm3D_std = np.array([ [-0.31148657, 0.09036078, 0.13377953], # Left eye corner [ 0.30979887, 0.08972035, 0.13179526], # Right eye corner [ 0.0032535, -0.24617933, 0.55244243], # Nose tip [-0.25216928, -0.5813392, 0.22405732], # Left mouth corner [ 0.2484662, -0.5812824, 0.22235769], # Right mouth corner ]) # Adjust standard landmarks for better alignment lm3D_std[:, 2] += 0.4 # Adjust depth (Z-axis) lm3D_std[:, 1] += 0.1 # Adjust vertical position (Y-axis) # Align the image based on landmarks _, im_high, _, _, crop_left, crop_up, s, ldmk_3d_align = align_img( im, lm, lm3D_std, ldmk_3d, target_size=target_size, rescale_factor=rescale_factor, rescale_factor_3D=218 ) # Compute center crop coordinates left = int(im_high.size[0] / 2 - center_crop_size / 2) upper = int(im_high.size[1] / 2 - center_crop_size / 2) right = left + center_crop_size lower = upper + center_crop_size # Crop the image im_cropped = im_high.crop((left, upper, right, lower)) # Resize the cropped image to the output size im_cropped = im_cropped.resize((output_size, output_size), resample=Image.LANCZOS) # Define cropping parameters for reference crop_param = [ int(left), int(upper), int(center_crop_size), int(crop_left), int(crop_up), float(H * s), int(target_size) ] return im_cropped, crop_param, ldmk_3d_align def process_video(kwargs): """ Processes a video by aligning images based on facial landmarks. Args: kwargs (dict): Dictionary containing the following keys: - src_dir (str): Directory containing video frames. - dst_dir (str): Directory to save processed images. - lm5p (dict): Dictionary of image filenames and their corresponding 5-point landmarks. - im_names (list): List of image filenames. - output_size (int): Final output image resolution. - transform_size (int): Size used for transformations before cropping. - enable_padding (bool): Whether to apply padding. - enable_warping (bool): Whether to apply warping transformation. - save_realign_dir (str or None): Directory to save realigned images. - save_detection_dir (str or None): Directory to save detection results. - apply_GF (int): Gaussian filtering level for smoothing keypoints. Returns: None """ # Extract parameters from kwargs video_dir = kwargs['src_dir'] dst_dir = kwargs['dst_dir'] lm5p_dict = kwargs['lm5p'] lm3d_dict = kwargs['lm3d'] output_size = kwargs['output_size'] enable_padding = kwargs['enable_padding'] enable_warping = kwargs['enable_warping'] save_realign_dir = kwargs['save_realign_dir'] save_detection_dir = kwargs['save_detection_dir'] save_align3d_dir = kwargs['save_align3d_dir'] apply_GF = kwargs['apply_GF'] # Use landmark dictionary keys as image names im_names = list(lm5p_dict.keys()) # Apply Gaussian filtering for smoother keypoint transitions (if enabled) if apply_GF > 0: im_names.sort(key=lambda x: int(x.split('.')[0])) # Sort images by frame index kps_sequence = np.asarray([lm5p_dict[key] for key in im_names], dtype=np.float32) kps_sequence = gaussian_filter1d(kps_sequence, sigma=apply_GF, axis=0) # Apply Gaussian smoothing else: kps_sequence = np.asarray([lm5p_dict[key] for key in im_names], dtype=np.float32) # Ensure number of images matches the number of keypoints assert len(im_names) == kps_sequence.shape[0], "Mismatch between image count and keypoint data." # Create directories for saving realigned images and detections (if specified) if save_realign_dir: os.makedirs(save_realign_dir, exist_ok=True) if save_detection_dir: os.makedirs(save_detection_dir, exist_ok=True) kps_sequence_3d = np.asarray([lm3d_dict[key] for key in im_names], dtype=np.float32) # Process each image in the video sequence for idx, im_name in enumerate(im_names): lm5p = kps_sequence[idx].reshape([-1, 2]) # Reshape keypoints to (N, 2) format lm3d = kps_sequence_3d[idx].reshape([-1, 3]) # Prepare input dictionary for image processing input_data = { 'src_dir': video_dir, 'dst_dir': dst_dir, 'im_name': im_name, 'lm5p': lm5p, 'lm3d': lm3d, 'save_realign_dir': save_realign_dir, 'save_detection_dir': save_detection_dir, 'save_align3d_dir':save_align3d_dir, 'output_size': output_size, 'enable_padding': enable_padding, 'enable_warping': enable_warping } # Process the image using the defined function process_image(input_data) # Create a 'finish' file to mark completion of processing with open(os.path.join(dst_dir, 'finish'), "w") as f: pass # Creates an empty file def recreate_aligned_images( root_dir, lms_root_dir, dst_dir, valid_imgs_json, output_size=512, enable_padding=True, already_align=False ): """ Recreates aligned images by applying facial landmark-based transformations. Args: root_dir (str): Directory containing original images. lms_root_dir (str): Directory containing facial landmark JSON files. dst_dir (str): Directory to save aligned images. save_realign_dir (str): Directory to save realigned images. valid_imgs_json (str): JSON file containing valid video names and image lists. output_size (int, optional): Final output image resolution. Defaults to 512. enable_padding (bool, optional): Whether to apply padding. Defaults to True. Returns: None """ print("Recreating aligned images...") # Load valid video names and corresponding image lists from JSON file with open(valid_imgs_json, 'r') as f: valid_idx = json.load(f) inputs = [] # List to store image processing parameters # Iterate over each valid video for video_name, img_names in valid_idx: video_dir = os.path.join(root_dir, video_name) # Path to video images dst_save_dir = os.path.join(dst_dir, video_name) # Destination folder for aligned images base_dir = os.path.dirname(os.path.dirname(dst_dir)) save_realign_dir = os.path.join(base_dir, 'realign', video_name) save_detection_dir = os.path.join(base_dir, 'realign_detections', video_name) save_align3d_dir = os.path.join(base_dir, 'align_3d_landmark', video_name) os.makedirs(save_align3d_dir, exist_ok=True) if save_realign_dir: os.makedirs(save_realign_dir, exist_ok=True) os.makedirs( save_detection_dir, exist_ok=True) # Skip processing if video directory does not exist # if not os.path.isdir(video_dir): # continue # Load facial landmark data for this video lm5p_path = os.path.join(lms_root_dir, f"{video_name}.json") lm3d_path = os.path.join(lms_root_dir, f"{video_name}3d.json") with open(lm5p_path, 'r') as f: lm5p_dict = json.load(f) with open(lm3d_path, 'r') as f: lm3d_dict = json.load(f) # Iterate over images in the video for im_name in img_names: if im_name not in lm5p_dict: continue # Skip if landmarks for this image are missing if im_name not in lm3d_dict: continue # Convert and reshape landmark points lm5p = np.asarray(lm5p_dict[im_name], dtype=np.float32).reshape([-1, 2]) lm3d = np.asarray(lm3d_dict[im_name], dtype=np.float32).reshape([-1, 3]) # Prepare input dictionary for processing input_data = { 'src_dir': video_dir, 'dst_dir': dst_save_dir, 'im_name': im_name, 'lm5p': lm5p, 'lm3d': lm3d, 'save_realign_dir': save_realign_dir, 'save_detection_dir': save_detection_dir, 'save_align3d_dir':save_align3d_dir, 'output_size': output_size, 'enable_padding': enable_padding } inputs.append(input_data) # break # Stops after processing the first video (Is this intentional?) # Parallel Processing using multiprocessing (commented out for now) # with multiprocessing.Pool(n_threads) as pool: # results = list(tqdm(pool.imap(process_image, inputs), total=len(inputs), smoothing=0.1)) # Sequential processing (useful for debugging) if already_align: for input_data in tqdm(inputs, desc="Processing images"): src_dir = input_data['src_dir'] dst_dir = input_data['dst_dir'] im_name = input_data['im_name'] lm5p = input_data['lm5p'] save_realign_dir = input_data.get('save_realign_dir', None) save_detection_dir = input_data.get('save_detection_dir', None) save_align3d_dir = input_data['save_align3d_dir'] # Ensure the destination directory exists os.makedirs(dst_dir, exist_ok=True) # Construct file paths src_file = os.path.join(src_dir, im_name) # Ensure the source file exists before proceeding assert os.path.isfile(src_file), f"Source file not found: {src_file}" # Open the image img = Image.open(src_file) _, H = img.size # Get image dimensions im_name = im_name.rsplit('.', 1)[0] + '.png' dst_file = os.path.join(dst_dir, im_name) # Optionally save the realigned image if save_realign_dir: os.makedirs(save_realign_dir, exist_ok=True) img.save(os.path.join(save_realign_dir, im_name)) aligned_lm5p = lm5p.copy() # Flip Y-coordinates to match the image coordinate system aligned_lm5p[:, -1] = H - 1 - aligned_lm5p[:, -1] # Optionally save detected landmarks as a text file if save_detection_dir: os.makedirs(save_detection_dir, exist_ok=True) save_detection_as_txt( os.path.join(save_detection_dir, im_name.replace('.png', '.txt')), aligned_lm5p ) # Save the cropped image img.save(dst_file) lm3d = input_data['lm3d'][:, 0:2] np.save(os.path.join(save_align3d_dir, im_name.replace(".png", ".npy").replace(".jpg", ".npy").replace(".jpeg", ".npy")), lm3d) else: for input_data in tqdm(inputs, desc="Processing images"): process_image(input_data) def recreate_aligned_videos_multiprocessing( root_dir, lms_root_dir, dst_dir, valid_video_json, save_realign=True, skip=True, enable_warping=False, output_size=512, enable_padding='zero_padding', n_threads=12, apply_GF=0 ): """ Recreates aligned videos by processing images with landmark-based transformations. Args: root_dir (str): Directory containing original video frames. lms_root_dir (str): Directory with corresponding facial landmark JSON files. dst_dir (str): Directory to save aligned images. valid_video_json (str): JSON file containing valid video names and frame lists. save_realign (bool, optional): Whether to save realigned images. Defaults to True. skip (bool, optional): Skip already processed videos if 'finish' file exists. Defaults to False. enable_warping (bool, optional): Apply warping transformation. Defaults to True. output_size (int, optional): Desired output image resolution. Defaults to 1024. transform_size (int, optional): Size used for transformation before cropping. Defaults to 4096. enable_padding (str, optional): Padding mode ('zero_padding', 'blur_padding', 'reflect_padding', or None). Defaults to None. n_threads (int, optional): Number of parallel threads for processing. Defaults to 12. apply_GF (int, optional): Gaussian filtering level. Defaults to 0. Returns: None """ print("Recreating aligned images...") # Validate `enable_padding` argument assert enable_padding in [None, 'zero_padding', 'blur_padding', 'reflect_padding'], \ f"Invalid enable_padding value: {enable_padding}" # Load valid video indices from JSON with open(valid_video_json, 'r') as f: valid_idx = json.load(f) inputs = [] # List to store parameters for multiprocessing # Iterate through each valid video and prepare processing inputs for video_name, im_names in valid_idx: video_dir = os.path.join(root_dir, video_name) # Path to video frames dst_save_dir = os.path.join(dst_dir, video_name) # Destination path for aligned images base_dir = os.path.dirname(os.path.dirname(dst_dir)) save_align3d_dir = os.path.join(base_dir, 'align_3d_landmark', video_name) os.makedirs(save_align3d_dir, exist_ok=True) # Paths for saving realigned images and detections (if enabled) save_realign_dir = save_detection_dir = None if save_realign: save_realign_dir = os.path.join(base_dir, 'realign', video_name) save_detection_dir = os.path.join(base_dir, 'realign_detections', video_name) # Skip processing if video directory or landmark JSON does not exist if not os.path.isdir(video_dir): continue if not os.path.exists(os.path.join(lms_root_dir, f"{video_name}.json")): continue # Skip if already processed and `skip=True` if skip and os.path.exists(os.path.join(dst_save_dir, 'finish')): continue # Load facial landmark data with open(os.path.join(lms_root_dir, f"{video_name}.json"), 'r') as f: lm5p_dict = json.load(f) with open(os.path.join(lms_root_dir, f"{video_name}3d.json"), 'r') as f: lm3d_dict = json.load(f) # Prepare input dictionary for processing input_data = { 'src_dir': video_dir, 'dst_dir': dst_save_dir, 'lm5p': lm5p_dict, 'lm3d': lm3d_dict, 'im_names': im_names, 'save_realign_dir': save_realign_dir, 'save_detection_dir': save_detection_dir, 'save_align3d_dir':save_align3d_dir, 'output_size': output_size, 'enable_padding': enable_padding, 'apply_GF': apply_GF, 'enable_warping': enable_warping } inputs.append(input_data) # Process videos in parallel using multiprocessing with multiprocessing.Pool(n_threads) as pool: results = list(tqdm(pool.imap(process_video, inputs), total=len(inputs), smoothing=0.1)) # Alternative: Process sequentially (useful for debugging) # for input_data in tqdm(inputs): # process_video(input_data) # # ---------------------------------------------------------------------------- # # if __name__ == "__main__": # parser = argparse.ArgumentParser() # parser.add_argument('--source', type=str, default='.') # parser.add_argument('--lm_source', type=str, default='') # parser.add_argument('--dest', type=str, default='realign1500') # parser.add_argument('--valid_video_json', type=str, default=None) # parser.add_argument('--threads', type=int, default=12) # parser.add_argument('--output_size', type=int, default=768) # parser.add_argument('--transform_size', type=int, default=768) # parser.add_argument('--apply_GF', type=float, default=0) # parser.add_argument('--save_realign', action='store_true') # parser.add_argument('--skip', action='store_true') # parser.add_argument('--disable_warping', action='store_true') # parser.add_argument('--padding_mode', type=str, default=None) # args = parser.parse_args() # # # recreate_aligned_images_fast(args.source, args.lm_source, args.dest, args.save_realign_dir, args.valid_video_json, # # output_size=args.output_size, transform_size=args.transform_size, n_threads=args.threads) # recreate_aligned_videos_fast(args.source, args.lm_source, args.dest, args.valid_video_json, # save_realign=args.save_realign, skip=args.skip, # output_size=args.output_size, transform_size=args.transform_size, # n_threads=args.threads, apply_GF=args.apply_GF, # enable_padding=args.padding_mode, enable_warping=False) # # # run_cmdline(sys.argv) # ----------------------------------------------------------------------------