import os
import json
import logging
from datetime import datetime

import cv2
import torch
import numpy as np
from tqdm import tqdm
from scipy.ndimage import gaussian_filter1d
from PIL import Image
from natsort import ns, natsorted

from lib.config.config_demo import cfg
from lib.faceverse_process.fit_faceverse import fit_faceverse
from lib.face_detect_ldmk_pipeline import FaceLdmkDetector
from lib.model_builder import make_model
from lib.align_in_the_wild import recreate_aligned_images, recreate_aligned_videos_multiprocessing
from lib.preprocess_faceverse import make_cam_dataset_FFHQ, render_orth_mp
from lib.pdfgc.encoder import FanEncoder
from lib.pdfgc.utils import get_motion_feature

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
def is_image_file(filename):
    """
    Check if a file has a common image format extension.

    Args:
        filename (str): The name or path of the file.

    Returns:
        bool: True if the file has an image extension (.jpg, .png, etc.), otherwise False.
    """
    image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"}
    return os.path.splitext(filename)[1].lower() in image_extensions
def get_images_in_folder(folder_path):
    """
    Check if a given folder contains images and return a list of image file names.

    Args:
        folder_path (str): The path of the folder to check.

    Returns:
        list: A list of image file names if found, otherwise an empty list.
    """
    if not os.path.isdir(folder_path):  # Check if the path is a directory
        return []
    image_files = [file for file in os.listdir(folder_path)
                   if os.path.isfile(os.path.join(folder_path, file)) and is_image_file(file)]
    return image_files  # Return a list of image names
def is_video_file(file_path):
    """Check if a file has a common video format extension."""
    video_extensions = {".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv"}
    ext = os.path.splitext(file_path)[1].lower()
    return ext in video_extensions
def extract_imgs(input_path, save_dir, skip=1, center_crop=False, res=512, is_video=False, is_img=True):
    os.makedirs(save_dir, exist_ok=True)
    if is_video and is_video_file(input_path):
        videoCapture = cv2.VideoCapture(input_path)
        size = (int(videoCapture.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(videoCapture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        if center_crop:
            # Crop a centered square of side min(width, height)
            length = min(size) // 2
            top = max(0, size[1] // 2 - length)
            bottom = min(size[1], size[1] // 2 + length)
            left = max(0, size[0] // 2 - length)
            right = min(size[0], size[0] // 2 + length)
        else:
            # Pad symmetrically to a square of side max(width, height)
            length = max(size) // 2
            top = bottom = max(0, length - size[1] // 2)
            left = right = max(0, length - size[0] // 2)
        count = -1
        while True:
            flag, frame = videoCapture.read()
            count += 1
            if not flag:
                break
            if skip > 1 and count % skip != 0:
                continue
            if center_crop:
                crop_frame = frame[top:bottom, left:right]
            else:
                crop_frame = cv2.copyMakeBorder(frame, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0)
            if crop_frame.shape[0] != res:
                crop_frame = cv2.resize(crop_frame, dsize=(res, res), interpolation=cv2.INTER_LINEAR)
            cv2.imwrite(os.path.join(save_dir, str(count) + '.png'), crop_frame)
        videoCapture.release()
        logging.info(f"Video frames saved in {save_dir}")
    elif is_img:
        all_imgs = get_images_in_folder(input_path)
        if len(all_imgs) == 0:
            raise ValueError("The input folder contains no images")
        for image_name in all_imgs:
            img = cv2.imread(os.path.join(input_path, image_name))
            size = (img.shape[1], img.shape[0])
            if center_crop:
                # Crop a centered square of side min(width, height)
                length = min(size) // 2
                top = max(0, size[1] // 2 - length)
                bottom = min(size[1], size[1] // 2 + length)
                left = max(0, size[0] // 2 - length)
                right = min(size[0], size[0] // 2 + length)
                crop_frame = img[top:bottom, left:right]
            else:
                # Pad symmetrically to a square of side max(width, height)
                length = max(size) // 2
                top = bottom = max(0, length - size[1] // 2)
                left = right = max(0, length - size[0] // 2)
                crop_frame = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0)
            if crop_frame.shape[0] != res:
                crop_frame = cv2.resize(crop_frame, dsize=(res, res), interpolation=cv2.INTER_LINEAR)
            cv2.imwrite(os.path.join(save_dir, image_name.split('.')[0].replace(" ", "_") + '.png'), crop_frame)
        logging.info(f"Images saved in {save_dir}")
    else:
        raise ValueError("The input is neither a valid video file nor an image folder")
def get_valid_input_idx(input_path, save_dir, is_video, is_img, skip=1):
    """
    Extracts images from the input, organizes them, and creates a JSON file listing valid images.

    Args:
        input_path (str): Path to the input (a video file or an image folder).
        save_dir (str): Directory to save extracted images.
        is_video (bool): True when the input is a video file.
        is_img (bool): True when the input is an image folder.
        skip (int, optional): Step size for selecting images (default: 1).

    Returns:
        str: Path to the generated JSON file containing valid image names.
    """
    # Extract images to a subdirectory named after the input file
    raw_imgs_save_dir = os.path.join(save_dir, os.path.splitext(os.path.basename(input_path))[0])
    extract_imgs(input_path, raw_imgs_save_dir, is_video=is_video, is_img=is_img)

    # Define JSON save path
    valid_imgs_json_save_path = os.path.join(save_dir, 'valid_imgs.json')
    valid_videos, count, img_nums = [], 0, [0]

    # Process subdirectories in `save_dir`
    for video_imgs_name in tqdm(os.listdir(save_dir)):
        video_imgs_dir = os.path.join(save_dir, video_imgs_name)
        if not os.path.isdir(video_imgs_dir):
            continue
        # Collect and sort image files
        img_names = natsorted([x for x in os.listdir(video_imgs_dir) if x.endswith((".jpg", ".png", ".webp"))],
                              alg=ns.PATH)
        # Store selected images with skip interval
        valid_videos.append([video_imgs_name, img_names[::skip]])
        count += len(valid_videos[-1][1])
        img_nums.append(count)

    # Save results to JSON
    with open(valid_imgs_json_save_path, 'w') as f:
        json.dump(valid_videos, f, indent=4)
    return valid_imgs_json_save_path  # Return JSON file path
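# Sketch of the resulting JSON layout (names are assumed examples):
# [["demo", ["0.png", "1.png", ...]], ...] -- one [sequence_name, frame_list]
# pair per subdirectory of save_dir, with frames subsampled by `skip`.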
def make_coeff_dataset_FFHQ(tracking_dir, save_dir, smooth=False, is_img=False):
    """
    Processes and organizes FaceVerse coefficients into a structured dataset.

    Parameters:
        tracking_dir (str): Source directory containing tracked coefficient sequences
        save_dir (str): Target directory to save processed coefficients
        smooth (bool): Apply temporal smoothing to coefficients when True
        is_img (bool): True when frames come from an unordered image folder rather than a video
    """
    # Iterate through each sequence directory
    for prefix in tqdm(os.listdir(tracking_dir)):
        seq_path = os.path.join(tracking_dir, prefix)
        # Skip non-directory entries
        if not os.path.isdir(seq_path):
            continue
        # Collect valid frame directories containing 'finish' flag file
        frame_dirs = [
            name for name in os.listdir(seq_path)
            if os.path.exists(os.path.join(seq_path, name, 'finish'))
        ]
        # Sort frames numerically (requires directory names to be integer strings)
        if not is_img:
            frame_dirs.sort(key=lambda x: int(x))
        try:
            # Load all coefficient sequences for this sequence
            coeff_seq = np.stack([
                np.load(os.path.join(seq_path, fname, 'coeffs.npy'))
                for fname in frame_dirs
            ], axis=0)  # Shape: [num_frames, num_coeffs]
            # Apply temporal smoothing if enabled
            if smooth:
                # Gaussian filter with σ=0.5 across time dimension
                coeff_seq = gaussian_filter1d(coeff_seq, sigma=0.5, axis=0)
            # Create output directory structure
            output_seq_dir = os.path.join(save_dir, prefix)
            os.makedirs(output_seq_dir, exist_ok=True)
            # Save processed coefficients per frame
            for idx, fname in enumerate(frame_dirs):
                output_path = os.path.join(output_seq_dir, fname + '.npy')
                np.save(output_path, coeff_seq[idx])
        except Exception as e:
            logging.warning(f"Skipping sequence {prefix} due to error: {e}")
            continue
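# Minimal sketch (hedged) of the smoothing step above on synthetic data:
# a [num_frames, num_coeffs] array is filtered along the time axis only, so
# each coefficient channel is smoothed independently. The shape 257 is a
# hypothetical example, not the actual FaceVerse coefficient count.
# demo_seq = np.random.randn(100, 257).astype(np.float32)
# smoothed = gaussian_filter1d(demo_seq, sigma=0.5, axis=0)
# assert smoothed.shape == demo_seq.shape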
class Process(object):
    def __init__(self, cfg):
        self.cfg = cfg
        self.net_fd, self.net_ldmk, self.net_ldmk_3d = make_model(self.cfg)
        self.net_fd.cuda()
        self.net_ldmk.cuda()
        self.net_ldmk_3d.cuda()
        pd_fgc = FanEncoder()
        weight_dict = torch.load(cfg.pdfgc_path)
        pd_fgc.load_state_dict(weight_dict, strict=False)
        pd_fgc = pd_fgc.eval().cuda()
        self.pd_fgc = pd_fgc
        # Set models to eval mode for inference
        self.net_fd.eval()
        self.net_ldmk.eval()
        self.net_ldmk_3d.eval()
        # Indices of the five standard facial keypoints used for alignment
        self.stand_index = np.array([96, 97, 54, 76, 82])
        self.fd_ldmk_detector = FaceLdmkDetector(self.net_fd, self.net_ldmk, self.net_ldmk_3d)
    def get_faceverse_labels_FFHQ(self, tracking_dir, root_dir,
                                  fv2fl_T_path='data_process/lib/FaceVerse/v3/fv2fl_30.npy',
                                  focal=4.2647, need_render=False,
                                  save_uv=True, save_mesh=False, save_name=None, render_normal_uv=False,
                                  num_thread=1, use_smooth=False, test_data=False, skip=False, is_img=False):
        """
        Processes FaceVerse tracking data to generate dataset labels for FFHQ-style datasets.

        Parameters:
            tracking_dir (str): Path to directory containing FaceVerse tracking data
            root_dir (str): Root directory for dataset outputs
            fv2fl_T_path (str): Path to FaceVerse-to-FLAME transformation matrix
            focal (float): Camera focal length
            need_render (bool): Whether to render visualizations
            save_uv (bool): Save UV texture maps if True
            save_mesh (bool): Save 3D meshes if True
            save_name (str): Custom name for output directory
            render_normal_uv (bool): Render normal maps in UV space if True
            num_thread (int): Number of parallel processing threads
            use_smooth (bool): Apply temporal smoothing to coefficients if True
            test_data (bool): Process as test data with different output structure if True
            skip (bool): Skip existing files if True
            is_img (bool): True when frames come from an image folder rather than a video
        """
        # Setup base directories
        save_dir = os.path.join(root_dir, 'dataset')
        os.makedirs(save_dir, exist_ok=True)

        # Load FaceVerse to FLAME transformation matrix
        fv2fl_T = np.load(fv2fl_T_path).astype(np.float32)

        # Coordinate transformation parameters
        orth_scale = 5.00  # Orthographic projection scaling factor
        orth_shift = np.array([0, 0.005, 0.], dtype=np.float32)  # Coordinate shift
        box_warp = 2.0  # Normalization scaling factor

        # Path configurations
        face_model_dir = 'data_process/lib/FaceVerse/v3'  # Pre-trained FaceVerse model
        save_render_dir = os.path.join(save_dir, 'orthRender256x256_face_eye' if save_name is None else save_name)
        save_mesh_dir = os.path.join(save_dir, 'FVmeshes512x512') if save_mesh else None
        save_uv_dir = os.path.join(save_dir, 'uvRender256x256') if save_uv else None

        # Render orthographic projections and process geometry
        render_orth_mp(
            tracking_dir,
            save_render_dir,
            face_model_dir,
            fv2fl_T,
            {'scale': orth_scale, 'shift': orth_shift},
            focal,
            render_vis=need_render,
            save_mesh_dir=save_mesh_dir,
            save_uv_dir=save_uv_dir,
            render_normal_uv=render_normal_uv,
            skip=skip,
            num_thread=num_thread,
            crop_param=[128, 114, 256, 256],  # Crop parameters: x_offset, y_offset, width, height
            save_coeff=True
        )

        # Compute normalization transformation matrix
        normalizeFL_T = np.eye(4, dtype=np.float32)
        scale_T = (orth_scale / box_warp) * np.eye(3, dtype=np.float32)  # Scaling component
        shift_T = scale_T.dot(orth_shift.reshape(3, 1))  # Translation component
        normalizeFL_T[:3, :3], normalizeFL_T[:3, 3:] = scale_T, shift_T

        # Update FaceVerse to FLAME transformation with normalization
        fv2fl_T = np.dot(normalizeFL_T, fv2fl_T)

        # Generate camera parameters dataset
        cam_params, cond_cam_params, fv_exp_eye_params = make_cam_dataset_FFHQ(
            tracking_dir, fv2fl_T, focal, test_data=test_data
        )

        # Handle different output structures for test vs training data
        if test_data:
            # Save per-sequence camera parameters
            for prefix in cam_params.keys():
                save_json_name = f'dataset_{prefix}_realcam.json'
                output_path = os.path.join(save_dir, 'images512x512', save_json_name)
                with open(output_path, "w") as f:
                    json.dump({"labels": cam_params[prefix]}, f, indent=4)
        else:
            # Save unified camera parameters with optional temporal smoothing
            save_json_name = 'dataset_realcam.json'
            if use_smooth:
                smoothed_params = []
                # Process each subdirectory sequence
                for sub_name in os.listdir(os.path.join(save_dir, 'images512x512')):
                    sub_path = os.path.join(save_dir, 'images512x512', sub_name)
                    if not os.path.isdir(sub_path):
                        continue
                    # Extract and sort sequence frames
                    sub_json = [case for case in cam_params if case[0].split('/')[0] == sub_name]
                    sub_json.sort(key=lambda x: int(x[0].split('/')[1].split('.')[0]))
                    # Apply Gaussian smoothing to coefficients
                    coeff_seq = np.array([x[1] for x in sub_json], dtype=np.float32)
                    coeff_seq = gaussian_filter1d(coeff_seq, sigma=1.5, axis=0)
                    # Rebuild parameter list with smoothed coefficients
                    smoothed_params.extend([
                        [x[0], coeff_seq[idx].tolist()]
                        for idx, x in enumerate(sub_json)
                    ])
                cam_params = smoothed_params

            # Save final parameters
            output_path = os.path.join(save_dir, 'images512x512', save_json_name)
            with open(output_path, "w") as f:
                json.dump({"labels": cam_params}, f, indent=4)

        # Generate final coefficient dataset
        make_coeff_dataset_FFHQ(tracking_dir, os.path.join(save_dir, 'coeffs'), smooth=use_smooth, is_img=is_img)
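    # Hedged sketch of the normalization composed above: normalizeFL_T shifts
    # by orth_shift and then scales by orth_scale / box_warp, so for a
    # homogeneous point p:
    #   normalizeFL_T @ p == (orth_scale / box_warp) * (p[:3] + orth_shift)
    # e.g. with orth_scale=5.0 and box_warp=2.0, the origin maps to
    # 2.5 * orth_shift = [0, 0.0125, 0].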
    def get_landmarks(self, imgs_root, save_dir, valid_imgs_json, skip=False, is_img=False):
        self.fd_ldmk_detector.reset()
        out_detection = save_dir
        os.makedirs(out_detection, exist_ok=True)
        with open(valid_imgs_json) as f:
            valid_idx = json.load(f)
        no_face_log = []
        for vidx, (video_name, imgs) in enumerate(valid_idx):
            if skip and os.path.exists(os.path.join(out_detection, video_name + '.json')):
                continue
            bar = tqdm(imgs)
            save_kps = dict()
            save_kps_3d = dict()
            for img_name in bar:
                bar.set_description('%d/%d: %s' % (vidx, len(valid_idx), video_name))
                img_path = os.path.join(imgs_root, video_name, img_name)
                img = cv2.imread(img_path)
                with torch.no_grad():
                    try:
                        ldmks, ldmks_3d, boxes = self.fd_ldmk_detector.inference(img)
                    except Exception as e:
                        self.fd_ldmk_detector.reset()
                        logging.error(f"Error during inference: {e}")
                        no_face_log.append([video_name, img_name])
                        continue
                if is_img:
                    self.fd_ldmk_detector.reset()
                # Use the first detected face by default
                keypoints = ldmks[0, self.stand_index]
                ldmks_3d = ldmks_3d[0]
                kps = [[float(int(pt[0])), float(int(pt[1]))] for pt in keypoints]
                save_kps[img_name] = kps
                save_kps_3d[img_name] = ldmks_3d.tolist()
            logging.info(f"landmarks: {os.path.join(out_detection, video_name + '.json')}")
            with open(os.path.join(out_detection, video_name + '.json'), 'w') as f:
                json.dump(save_kps, f, indent=4)
            with open(os.path.join(out_detection, video_name + '3d.json'), 'w') as f:
                json.dump(save_kps_3d, f, indent=4)
        if len(no_face_log) > 0:
            with open(os.path.join(out_detection, str(datetime.now()) + '_total_no_face_log.json'), 'w') as f:
                json.dump(no_face_log, f, indent=4)
        self.fd_ldmk_detector.reset()
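    # Usage sketch (hedged), assuming frames were extracted under
    # './raw_frames' with a matching valid_imgs.json:
    # p = Process(cfg)
    # p.get_landmarks('./raw_frames', './raw_detections',
    #                 './raw_frames/valid_imgs.json', is_img=False)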
    def get_pdfgc(self, input_imgs_dir, input_ldm3d_dir, motion_save_base_dir):
        all_items = os.listdir(input_ldm3d_dir)
        folders = [item for item in all_items if os.path.isdir(os.path.join(input_ldm3d_dir, item))]
        with torch.no_grad():
            for file_name in folders:
                motion_save_dir = os.path.join(motion_save_base_dir, file_name)
                os.makedirs(motion_save_dir, exist_ok=True)
                img_list = sorted(
                    [f for f in os.listdir(os.path.join(input_imgs_dir, file_name))
                     if os.path.splitext(f)[-1].lower() in {'.jpg', '.jpeg', '.png', '.bmp', '.gif'}]
                )
                for img_name in img_list:
                    img_path = os.path.join(input_imgs_dir, file_name, img_name)
                    # Landmark file shares the image's base name with a .npy extension
                    lmks_path = os.path.join(input_ldm3d_dir, file_name,
                                             os.path.splitext(img_name)[0] + '.npy')
                    img = np.array(Image.open(img_path))
                    # Normalize pixel values from [0, 255] to [-1, 1]
                    img = torch.from_numpy((img.astype(np.float32) / 127.5 - 1)).cuda()
                    img = img.permute([2, 0, 1]).unsqueeze(0)
                    lmks = torch.from_numpy(np.load(lmks_path)).cuda().unsqueeze(0)
                    motion = get_motion_feature(self.pd_fgc, img, lmks).squeeze(0).cpu().numpy()
                    np.save(os.path.join(motion_save_dir, os.path.splitext(img_name)[0] + '.npy'), motion)
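    # Hedged numeric check of the [0, 255] -> [-1, 1] mapping used above:
    #   0 / 127.5 - 1 == -1.0;  127.5 / 127.5 - 1 == 0.0;  255 / 127.5 - 1 == 1.0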
    def get_faceverse(self, save_dir, save_tracking_dir, is_img):
        fit_faceverse(save_dir, save_tracking_dir, is_img=is_img)
    def clean_labels(self, tracking_dir, labels_path, final_label_path):
        """
        Filter out labels containing frames with no detected faces.

        Args:
            tracking_dir: Path to directory containing no-face detection logs
            labels_path: Path to original labels JSON file
            final_label_path: Output path for cleaned labels JSON
        """
        # Initialize collection of frames with no faces
        no_face_entries = []

        # Load all no-face detection logs
        for log_file in os.listdir(tracking_dir):
            if not log_file.endswith("_total_no_face_log.json"):
                continue
            with open(os.path.join(tracking_dir, log_file)) as f:
                no_face_entries.extend(json.load(f))

        # Load original labels and extract filenames
        with open(labels_path) as f:
            original_labels = json.load(f)['labels']
        label_filenames = [label[0] for label in original_labels]

        # Identify frames to exclude
        excluded_frames = set()
        for entry in no_face_entries:
            # Extract video and frame names from log entry path
            path_parts = entry[1].split('/')
            video_name = path_parts[-2]
            frame_name = path_parts[-1]
            composite_key = f"{video_name}/{frame_name}"
            logging.debug(f"Processing no-face entry: {composite_key}")
            if composite_key in label_filenames:
                excluded_frames.add(frame_name)
        logging.info(f"Identified {len(excluded_frames)} frames to exclude")

        # Filter out excluded frames from labels
        cleaned_labels = []
        for label in tqdm(original_labels, desc="Filtering labels"):
            # Label format: "video_name/frame_name"
            frame_id = label[0].split('/')[1]
            if frame_id not in excluded_frames:
                cleaned_labels.append(label)

        # Save cleaned labels
        logging.info(f"Original labels: {len(original_labels)}, Cleaned labels: {len(cleaned_labels)}")
        with open(final_label_path, 'w') as f:
            json.dump({'labels': cleaned_labels}, f, indent=4)
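    # Hedged sketch of the data consumed above (values are assumed examples):
    # labels look like ["demo/0.png", [...camera params...]], and each no-face
    # log entry carries a path whose last two components give "demo/0.png";
    # labels matching such an entry are dropped.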
    def inference(self, input_dir, save_dir, is_video=True, is_img=False, smooth_cropping_mode=3.0,
                  no_extract_frames=False, no_extract_landmarks=False, no_align=False,
                  no_fitting_faceverse=False, no_render_faceverse=False, already_align=False, no_pdfgc_motion=False):
        """
        End-to-end processing pipeline for facial analysis and reconstruction.

        Parameters:
            input_dir (str): Source directory containing input videos/images
            save_dir (str): Root directory for all processed outputs
            is_video (bool): True when processing video files
            is_img (bool): True when processing image sequences
            smooth_cropping_mode (float): Smoothing strength for video frame alignment
            no_extract_frames (bool): Skip frame extraction if True
            no_extract_landmarks (bool): Skip facial landmark detection if True
            no_align (bool): Skip face alignment and cropping if True
            no_fitting_faceverse (bool): Skip FaceVerse model fitting if True
            no_render_faceverse (bool): Skip FaceVerse rendering if True
            already_align (bool): Input images are already aligned if True
            no_pdfgc_motion (bool): Skip PD-FGC motion feature extraction if True
        """
        # Initialize directory paths
        motions_save_dir = os.path.join(save_dir, 'dataset', "motions")
        data_save_dir = os.path.join(save_dir, 'dataset', "images512x512")  # Final processed images
        raw_detection_dir = os.path.join(save_dir, "raw_detections")  # Landmark detection results
        save_tracking_dir = os.path.join(save_dir, 'crop_fv_tracking')  # FaceVerse tracking data
        indir = os.path.join(save_dir, 'raw_frames')  # Raw extracted frames
        aligned3d_save_dir = os.path.join(save_dir, 'align_3d_landmark')

        # --- Frame Extraction Stage ---
        if not no_extract_frames:
            logging.info("Extracting frames from input source")
            valid_imgs_json = get_valid_input_idx(
                input_dir,
                indir,
                is_video=is_video,
                is_img=is_img,
                skip=1  # Frame sampling interval
            )
            logging.info(f"Frame extraction complete. Results in {indir}")
        else:
            valid_imgs_json = os.path.join(indir, 'valid_imgs.json')
            logging.info(f"Using pre-extracted frames from {indir}")

        # --- Landmark Detection Stage ---
        if not no_extract_landmarks:
            logging.info("Performing facial landmark detection")
            self.get_landmarks(indir, raw_detection_dir, valid_imgs_json, is_img=is_img)
            logging.info(f"Landmark detection complete. Results in {raw_detection_dir}")
        else:
            logging.info(f"Using pre-computed landmarks from {raw_detection_dir}")

        # --- Face Alignment Stage ---
        if not no_align:
            logging.info("Aligning and cropping face regions")
            if is_video:
                # Video processing with temporal smoothing
                recreate_aligned_videos_multiprocessing(
                    indir,
                    raw_detection_dir,
                    data_save_dir,
                    valid_imgs_json,
                    apply_GF=smooth_cropping_mode  # Gaussian filtering strength
                )
            elif is_img:
                # Image sequence processing
                recreate_aligned_images(
                    indir,
                    raw_detection_dir,
                    data_save_dir,
                    already_align=already_align,
                    valid_imgs_json=valid_imgs_json
                )
            else:
                raise ValueError("Invalid input type - must be video or image sequence")
            logging.info(f"Alignment complete. Results in {data_save_dir}")
        else:
            logging.info(f"Using pre-aligned images from {data_save_dir}")

        # --- PD-FGC Motion Extraction Stage ---
        if not no_pdfgc_motion:
            logging.info("Extracting PD-FGC motion features")
            self.get_pdfgc(data_save_dir, aligned3d_save_dir, motions_save_dir)
            logging.info(f"Motion extraction complete. Results in {motions_save_dir}")
        else:
            logging.info(f"Using pre-computed motion features from {motions_save_dir}")

        # --- FaceVerse Model Fitting Stage ---
        if not no_fitting_faceverse:
            logging.info("Fitting FaceVerse 3D face model")
            self.get_faceverse(
                data_save_dir,
                save_tracking_dir,
                is_img  # Different processing for images vs video
            )
            logging.info(f"FaceVerse fitting complete. Results in {save_tracking_dir}")
        else:
            logging.info(f"Using pre-computed FaceVerse fits from {save_tracking_dir}")

        # --- Rendering and Final Output Stage ---
        if not no_render_faceverse:
            logging.info("Generating FaceVerse renders and camera parameters")
            self.get_faceverse_labels_FFHQ(
                save_tracking_dir,
                save_dir,
                is_img=is_img
            )
            logging.info("Rendering and label generation complete")
        else:
            logging.info("Skipping final rendering stage")

        # --- Label Cleaning Stage (videos only) ---
        if is_video:
            logging.info("Cleaning labels")
            self.clean_labels(save_tracking_dir,
                              os.path.join(data_save_dir, 'dataset_realcam.json'),
                              os.path.join(data_save_dir, 'dataset_realcam_clean.json'))
def main():
    os.makedirs(cfg.save_dir, exist_ok=True)
    process = Process(cfg)
    process.inference(cfg.input_dir, cfg.save_dir, is_video=cfg.is_video, is_img=cfg.is_img,
                      no_extract_frames=cfg.no_extract_frames, no_extract_landmarks=cfg.no_extract_landmarks,
                      no_align=cfg.no_align,
                      no_fitting_faceverse=cfg.no_fitting_faceverse, no_render_faceverse=cfg.no_render_faceverse,
                      already_align=cfg.already_align, no_pdfgc_motion=cfg.no_pdfgc_motion)


if __name__ == "__main__":
    import torch.multiprocessing as mp

    # Use 'spawn' so CUDA works correctly in worker processes
    mp.set_start_method('spawn', force=True)
    main()