import sys

sys.path.append('..')

import os
import options

import cv2
import dlib
import numpy as np

from tqdm.auto import tqdm
from multiprocessing import Pool
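
# dlib's HOG-based frontal face detector and the pretrained 68-point
# facial landmark predictor; the .dat model file must be downloaded
# separately into ../pretrain.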
predictor_path = '../pretrain/shape_predictor_68_face_landmarks.dat'
predictor = dlib.shape_predictor(predictor_path)
detector = dlib.get_frontal_face_detector()

RUN_PARALLEL = True  # process sentences in a pool of worker processes
FORCE_RATIO = True   # force every crop to a 2:1 width:height ratio
BORDER = 10          # padding in pixels around the detected mouth box
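
# Assumed GRID-style layout: frames live in
# <images_dir>/<speaker>/<sentence>/<frame>.jpg and word alignments in
# <alignments_dir>/<speaker>/<sentence>.align; mouth crops are written
# to a mirrored tree under <crop_images_dir>.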
base = os.path.abspath('..')
image_dir = os.path.join(base, options.images_dir)
anno_dir = os.path.join(base, options.alignments_dir)
crop_dir = os.path.join(base, options.crop_images_dir)
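
# In dlib's 68-point layout, indices 48-67 are the mouth landmarks
# (48-59 outer lip, 60-67 inner lip).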
def get_mouth_marks(shape):
    """
    Go over all mouth-related landmarks of a dlib detection, storing the
    X and Y coordinates separately, and return the mouth bounding box as
    (X_left, Y_top, X_right, Y_bottom).
    """
    marks = np.zeros((2, 20))

    for co, ii in enumerate(range(48, 68)):
        part = shape.part(ii)
        marks[0, co] = part.x
        marks[1, co] = part.y

    mins = np.amin(marks, axis=1)
    maxs = np.amax(marks, axis=1)
    X_left, Y_top = int(mins[0]), int(mins[1])
    X_right, Y_bottom = int(maxs[0]), int(maxs[1])

    return X_left, Y_top, X_right, Y_bottom
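
# Example use of get_mouth_marks on a single frame (hypothetical path):
#   img = cv2.imread('frame.jpg')
#   rect = detector(img, 1)[0]
#   x0, y0, x1, y1 = get_mouth_marks(predictor(img, rect))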
translate_pairs = []

# Collect every (speaker, sentence) pair that has both extracted frames
# and an alignment file. GRID has speakers s1 .. s34.
for speaker_no in range(1, 35):
    speaker_name = f's{speaker_no}'
    speaker_image_dir = os.path.join(image_dir, speaker_name)
    speaker_crop_dir = os.path.join(crop_dir, speaker_name)
    speaker_anno_dir = os.path.join(anno_dir, speaker_name)

    if not os.path.exists(speaker_image_dir):
        continue
    os.makedirs(speaker_crop_dir, exist_ok=True)

    sentence_dirs = os.listdir(speaker_image_dir)

    for sentence in sentence_dirs:
        anno_filepath = os.path.join(speaker_anno_dir, f'{sentence}.align')
        if not os.path.exists(anno_filepath):
            continue

        translate_pairs.append((speaker_no, sentence))

print('PAIRS', len(translate_pairs))
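
# extract_mouth_image runs face detection and landmark prediction on
# every frame of one sentence, crops a padded box around the mouth, and
# writes the crop to the mirrored crop directory. It returns its
# arguments so the parallel driver below can report progress.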
def extract_mouth_image(speaker_no, sentence):
    speaker_name = f's{speaker_no}'
    speaker_image_dir = os.path.join(image_dir, speaker_name)
    speaker_crop_dir = os.path.join(crop_dir, speaker_name)

    img_sentence_dir = os.path.join(speaker_image_dir, sentence)
    crop_sentence_dir = os.path.join(speaker_crop_dir, sentence)
    filenames = os.listdir(img_sentence_dir)

    os.makedirs(crop_sentence_dir, exist_ok=True)

    for filename in filenames:
        img_filepath = os.path.join(img_sentence_dir, filename)
        if not img_filepath.endswith('.jpg'):
            continue

        crop_filepath = os.path.join(crop_sentence_dir, filename)
        image = cv2.imread(img_filepath)

        # Upsample once (second argument) so small faces are still found;
        # skip the frame if no face is detected.
        detections = detector(image, 1)
        if len(detections) == 0:
            continue
        detection_bbox = detections[0]

        shape = predictor(image, detection_bbox)
        X_left, Y_top, X_right, Y_bottom = get_mouth_marks(shape)

        X_center = (X_left + X_right) / 2.0
        Y_center = (Y_top + Y_bottom) / 2.0

        # Pad the landmark box by BORDER on every side, then round each
        # dimension up to an even size so the box can be centred exactly
        # on (X_center, Y_center).
        width_crop_max = (X_right + BORDER) - (X_left - BORDER)
        height_crop_max = (Y_bottom + BORDER) - (Y_top - BORDER)

        if width_crop_max % 2 == 1:
            width_crop_max += 1
        if height_crop_max % 2 == 1:
            height_crop_max += 1

        # Widen (or flatten) the box to a 2:1 width:height ratio.
        if FORCE_RATIO:
            if width_crop_max < height_crop_max * 2:
                width_crop_max = height_crop_max * 2
            else:
                height_crop_max = width_crop_max // 2

        X_left_crop = max(int(X_center - width_crop_max / 2.0), 0)
        X_right_crop = int(X_center + width_crop_max / 2.0)
        Y_top_crop = max(int(Y_center - height_crop_max / 2.0), 0)
        Y_bottom_crop = int(Y_center + height_crop_max / 2.0)

        # NumPy slicing clips indices past the image border, so the crop
        # may come out smaller than requested near the edges; the resize
        # below restores the target ratio in that case.
        mouth = image[Y_top_crop:Y_bottom_crop, X_left_crop:X_right_crop, :]

        if FORCE_RATIO:
            height, width, _ = mouth.shape
            if width != height * 2:
                # cv2.resize takes dsize as (width, height)
                mouth = cv2.resize(
                    mouth, dsize=(height * 2, height),
                    interpolation=cv2.INTER_CUBIC
                )

        cv2.imwrite(crop_filepath, mouth)

    return speaker_no, sentence
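
# Driver: fan the (speaker, sentence) pairs out to a pool of worker
# processes, or fall back to a simple serial loop.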
if RUN_PARALLEL:
    pbar = tqdm(total=len(translate_pairs))
    pool = Pool(processes=12)
    jobs = []

    def callback(resp):
        # Runs in the parent process each time a worker finishes.
        pbar.set_description(str(resp))
        pbar.update(1)

    for speaker_no, sentence in translate_pairs:
        job = pool.apply_async(
            extract_mouth_image,
            kwds={'speaker_no': speaker_no, 'sentence': sentence},
            callback=callback
        )
        jobs.append(job)

    for job in jobs:
        # get() (unlike wait()) re-raises any exception from the worker.
        job.get()

    pool.close()
    pool.join()
    pbar.close()
else:
    for translate_pair in tqdm(translate_pairs):
        extract_mouth_image(*translate_pair)
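
# Note: this script assumes fork-based multiprocessing (the default on
# Linux), where workers inherit the module-level `detector` and
# `predictor`. On spawn-based platforms (Windows, and macOS on Python
# 3.8+) the driver above would need to move under an
# `if __name__ == '__main__':` guard.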