Spaces:
Paused
Paused
| import os | |
| import numpy as np | |
| import cv2 | |
| from moviepy.editor import VideoFileClip | |
| from .face_det import FaceAnalysis | |
| from .super_resolution import BSRGAN | |
| from dofaker.face_swap import get_swapper_model | |
| from dofaker.face_enhance import GFPGAN | |
| class FaceSwapper: | |
| def __init__(self, | |
| face_det_model='buffalo_l', | |
| face_swap_model='inswapper', | |
| image_sr_model='bsrgan', | |
| face_enhance_model='gfpgan', | |
| face_det_model_dir='weights/models', | |
| face_swap_model_dir='weights/models', | |
| image_sr_model_dir='weights/models', | |
| face_enhance_model_dir='weights/models', | |
| face_sim_thre=0.5, | |
| log_iters=10, | |
| use_enhancer=True, | |
| use_sr=True, | |
| scale=1): | |
| self.face_sim_thre = face_sim_thre | |
| self.log_iters = log_iters | |
| self.det_model = FaceAnalysis(name=face_det_model, | |
| root=face_det_model_dir) | |
| self.det_model.prepare(ctx_id=1, det_size=(640, 640)) | |
| self.swapper_model = get_swapper_model(name=face_swap_model, | |
| root=face_swap_model_dir) | |
| if use_enhancer: | |
| self.face_enhance = GFPGAN(name=face_enhance_model, | |
| root=face_enhance_model_dir) | |
| else: | |
| self.face_enhance = None | |
| if use_sr: | |
| self.sr = BSRGAN(name=image_sr_model, | |
| root=image_sr_model_dir, | |
| scale=scale) | |
| self.scale = scale | |
| else: | |
| self.sr = None | |
| self.scale = scale | |
| def run(self, | |
| input_path: str, | |
| dst_face_paths, | |
| src_face_paths, | |
| output_dir='output'): | |
| if isinstance(dst_face_paths, str): | |
| dst_face_paths = [dst_face_paths] | |
| if isinstance(src_face_paths, str): | |
| src_face_paths = [src_face_paths] | |
| if input_path.lower().endswith(('jpg', 'jpeg', 'webp', 'png', 'bmp')): | |
| return self.swap_image(input_path, dst_face_paths, src_face_paths, | |
| output_dir) | |
| else: | |
| return self.swap_video(input_path, dst_face_paths, src_face_paths, | |
| output_dir) | |
| def swap_video(self, | |
| input_video_path, | |
| dst_face_paths, | |
| src_face_paths, | |
| output_dir='output'): | |
| assert os.path.exists( | |
| input_video_path), 'The input video path {} not exist.' | |
| os.makedirs(output_dir, exist_ok=True) | |
| src_faces = self.get_faces(src_face_paths) | |
| if dst_face_paths is not None: | |
| dst_faces = self.get_faces(dst_face_paths) | |
| dst_face_embeddings = self.get_faces_embeddings(dst_faces) | |
| assert len(dst_faces) == len( | |
| src_faces | |
| ), 'The detected faces in source images not equal target image faces.' | |
| video = cv2.VideoCapture(input_video_path) | |
| fps = video.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| frame_size = (width, height) | |
| print('video fps: {}, total_frames: {}, width: {}, height: {}'.format( | |
| fps, total_frames, width, height)) | |
| video_name = os.path.basename(input_video_path).split('.')[0] | |
| four_cc = cv2.VideoWriter_fourcc('m', 'p', '4', 'v') | |
| temp_video_path = os.path.join(output_dir, | |
| 'temp_{}.mp4'.format(video_name)) | |
| save_video_path = os.path.join(output_dir, '{}.mp4'.format(video_name)) | |
| output_video = cv2.VideoWriter( | |
| temp_video_path, four_cc, fps, | |
| (int(frame_size[0] * self.scale), int(frame_size[1] * self.scale))) | |
| i = 0 | |
| while video.isOpened(): | |
| ret, frame = video.read() | |
| if ret: | |
| if dst_face_paths is not None: | |
| swapped_image = self.swap_faces(frame, | |
| dst_face_embeddings, | |
| src_faces=src_faces) | |
| else: | |
| swapped_image = self.swap_all_faces(frame, | |
| src_faces=src_faces) | |
| i += 1 | |
| if i % self.log_iters == 0: | |
| print('processing {}/{}'.format(i, total_frames)) | |
| output_video.write(swapped_image) | |
| else: | |
| break | |
| video.release() | |
| output_video.release() | |
| self.add_audio_to_video(input_video_path, temp_video_path, | |
| save_video_path) | |
| os.remove(temp_video_path) | |
| return save_video_path | |
| def swap_image(self, | |
| image_path, | |
| dst_face_paths, | |
| src_face_paths, | |
| output_dir='output'): | |
| os.makedirs(output_dir, exist_ok=True) | |
| src_faces = self.get_faces(src_face_paths) | |
| if dst_face_paths is not None: | |
| dst_faces = self.get_faces(dst_face_paths) | |
| dst_face_embeddings = self.get_faces_embeddings(dst_faces) | |
| assert len(dst_faces) == len( | |
| src_faces | |
| ), 'The detected faces in source images not equal target image faces.' | |
| image = cv2.imread(image_path) | |
| if dst_face_paths is not None: | |
| swapped_image = self.swap_faces(image, | |
| dst_face_embeddings, | |
| src_faces=src_faces) | |
| else: | |
| swapped_image = self.swap_all_faces(image, src_faces=src_faces) | |
| base_name = os.path.basename(image_path) | |
| save_path = os.path.join(output_dir, base_name) | |
| cv2.imwrite(save_path, swapped_image) | |
| return save_path | |
| def add_audio_to_video(self, src_video_path, target_video_path, | |
| save_video_path): | |
| audio = VideoFileClip(src_video_path).audio | |
| target_video = VideoFileClip(target_video_path) | |
| target_video = target_video.set_audio(audio) | |
| target_video.write_videofile(save_video_path) | |
| return target_video_path | |
| def get_faces(self, image_paths): | |
| if isinstance(image_paths, str): | |
| image_paths = [image_paths] | |
| faces = [] | |
| for image_path in image_paths: | |
| image = cv2.imread(image_path) | |
| assert image is not None, "the source image is None, please check your image {} format.".format( | |
| image_path) | |
| img_faces = self.det_model.get(image, max_num=1) | |
| assert len( | |
| img_faces | |
| ) == 1, 'The detected face in image {} must be 1, but got {}, please ensure your image including one face.'.format( | |
| image_path, len(img_faces)) | |
| faces += img_faces | |
| return faces | |
| def swap_faces(self, image, dst_face_embeddings: np.ndarray, | |
| src_faces: list) -> np.ndarray: | |
| res = image.copy() | |
| image_faces = self.det_model.get(image) | |
| if len(image_faces) == 0: | |
| return res | |
| image_face_embeddings = self.get_faces_embeddings(image_faces) | |
| sim = np.dot(dst_face_embeddings, image_face_embeddings.T) | |
| for i in range(dst_face_embeddings.shape[0]): | |
| index = np.where(sim[i] > self.face_sim_thre)[0].tolist() | |
| for idx in index: | |
| res = self.swapper_model.get(res, | |
| image_faces[idx], | |
| src_faces[i], | |
| paste_back=True) | |
| if self.face_enhance is not None: | |
| res = self.face_enhance.get(res, | |
| image_faces[idx], | |
| paste_back=True) | |
| if self.sr is not None: | |
| res = self.sr.get(res, image_format='bgr') | |
| return res | |
| def swap_all_faces(self, image, src_faces: list) -> np.ndarray: | |
| assert len( | |
| src_faces | |
| ) == 1, 'If replace all faces in source, the number of src face should be 1, but got {}.'.format( | |
| len(src_faces)) | |
| res = image.copy() | |
| image_faces = self.det_model.get(image) | |
| if len(image_faces) == 0: | |
| return res | |
| for image_face in image_faces: | |
| res = self.swapper_model.get(res, | |
| image_face, | |
| src_faces[0], | |
| paste_back=True) | |
| if self.face_enhance is not None: | |
| res = self.face_enhance.get(res, image_face, paste_back=True) | |
| if self.sr is not None: | |
| res = self.sr.get(res, image_format='bgr') | |
| return res | |
| def get_faces_embeddings(self, faces): | |
| feats = [] | |
| for face in faces: | |
| feats.append(face.normed_embedding) | |
| if len(feats) == 1: | |
| feats = np.array(feats, dtype=np.float32).reshape(1, -1) | |
| else: | |
| feats = np.array(feats, dtype=np.float32) | |
| return feats | |