# -*- coding: utf-8 -*-
"""Image/video utilities: media probing, resizing, batched Euler rotations,
landmark ratio computation, affine warping, implicit-keypoint transforms,
and One-Euro smoothing filters."""

import pdb
import cv2
import numpy as np
import ffmpeg
import os
import os.path as osp


def video_has_audio(video_file):
    """Return True if `video_file` contains at least one audio stream."""
    try:
        ret = ffmpeg.probe(video_file, select_streams='a')
        return len(ret["streams"]) > 0
    except ffmpeg.Error:
        # probe failed (unreadable / not a media file): treat as "no audio"
        return False


def get_video_info(video_path):
    """Probe `video_path` with ffmpeg and return (duration_seconds, fps).

    :param video_path: path to the media file.
    :return: tuple of (duration in seconds as float, frames per second as float).
    :raises ValueError: if the file contains no video stream.
    """
    # use ffmpeg.probe to retrieve media information
    probe = ffmpeg.probe(video_path)
    video_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'video']
    if not video_streams:
        raise ValueError("No video stream found")

    # total duration of the container
    duration = float(probe['format']['duration'])

    # r_frame_rate is a fraction string such as "30000/1001"
    fps_string = video_streams[0]['r_frame_rate']
    numerator, denominator = map(int, fps_string.split('/'))
    fps = numerator / denominator

    return duration, fps


def resize_to_limit(img: np.ndarray, max_dim=1280, division=2):
    """
    Adjust the size of the image so that the maximum dimension does not exceed
    `max_dim`, and the width and height of the image are multiples of `division`.

    :param img: the image to be processed.
    :param max_dim: the maximum dimension constraint; values <= 0 disable the limit.
    :param division: the number the output dimensions must be multiples of.
    :return: the adjusted image.
    """
    h, w = img.shape[:2]

    # downscale (preserving aspect ratio) if the larger side exceeds max_dim
    if max_dim > 0 and max(h, w) > max_dim:
        if h > w:
            new_h = max_dim
            new_w = int(w * (max_dim / h))
        else:
            new_w = max_dim
            new_h = int(h * (max_dim / w))
        img = cv2.resize(img, (new_w, new_h))

    # crop so that both dimensions are multiples of `division`
    division = max(division, 1)
    new_h = img.shape[0] - (img.shape[0] % division)
    new_w = img.shape[1] - (img.shape[1] % division)
    if new_h == 0 or new_w == 0:
        # when the width or height is less than `division`, no need to process
        return img
    if new_h != img.shape[0] or new_w != img.shape[1]:
        img = img[:new_h, :new_w]
    return img


def get_rotation_matrix(pitch_, yaw_, roll_):
    """Build batched 3x3 rotation matrices from Euler angles.

    The inputs are in degrees, shaped (bs,) or (bs, 1).
    :return: array of shape (bs, 3, 3); the product rot_z @ rot_y @ rot_x, transposed.
    """
    PI = np.pi

    # transform degrees to radians
    pitch = pitch_ / 180 * PI
    yaw = yaw_ / 180 * PI
    roll = roll_ / 180 * PI

    def _as_column(a):
        # BUGFIX: np.ndarray has no .cpu() — the original unconditionally called
        # it and raised AttributeError for 1-D numpy input. Call .cpu() only on
        # torch-like tensors; use numpy arrays as-is.
        if a.ndim == 1:
            a = np.expand_dims(a.cpu() if hasattr(a, "cpu") else a, axis=1)
        return a

    pitch = _as_column(pitch)
    yaw = _as_column(yaw)
    roll = _as_column(roll)

    # assemble the per-axis Euler rotation matrices
    bs = pitch.shape[0]
    ones = np.ones([bs, 1])
    zeros = np.zeros([bs, 1])
    x, y, z = pitch, yaw, roll

    rot_x = np.concatenate([
        ones, zeros, zeros,
        zeros, np.cos(x), -np.sin(x),
        zeros, np.sin(x), np.cos(x)
    ], axis=1).reshape([bs, 3, 3])

    rot_y = np.concatenate([
        np.cos(y), zeros, np.sin(y),
        zeros, ones, zeros,
        -np.sin(y), zeros, np.cos(y)
    ], axis=1).reshape([bs, 3, 3])

    rot_z = np.concatenate([
        np.cos(z), -np.sin(z), zeros,
        np.sin(z), np.cos(z), zeros,
        zeros, zeros, ones
    ], axis=1).reshape([bs, 3, 3])

    rot = np.matmul(rot_z, np.matmul(rot_y, rot_x))
    return np.transpose(rot, (0, 2, 1))  # transpose


def calculate_distance_ratio(lmk: np.ndarray, idx1: int, idx2: int, idx3: int, idx4: int, eps: float = 1e-6) -> np.ndarray:
    """Per-batch ratio of the distance |lmk[idx1]-lmk[idx2]| over |lmk[idx3]-lmk[idx4]|.

    :param lmk: landmarks shaped (bs, num_points, dim).
    :param eps: added to the denominator to avoid division by zero.
    :return: ratios shaped (bs, 1).
    """
    return (np.linalg.norm(lmk[:, idx1] - lmk[:, idx2], axis=1, keepdims=True) /
            (np.linalg.norm(lmk[:, idx3] - lmk[:, idx4], axis=1, keepdims=True) + eps))


def calc_eye_close_ratio(lmk: np.ndarray, target_eye_ratio: np.ndarray = None) -> np.ndarray:
    """Compute left/right eye close ratios from fixed landmark indices.

    When `target_eye_ratio` is given it is appended as extra columns.
    :return: (bs, 2) array, or (bs, 2 + k) when target_eye_ratio is provided.
    """
    lefteye_close_ratio = calculate_distance_ratio(lmk, 6, 18, 0, 12)
    righteye_close_ratio = calculate_distance_ratio(lmk, 30, 42, 24, 36)
    if target_eye_ratio is not None:
        return np.concatenate([lefteye_close_ratio, righteye_close_ratio, target_eye_ratio], axis=1)
    else:
        return np.concatenate([lefteye_close_ratio, righteye_close_ratio], axis=1)


def calc_lip_close_ratio(lmk: np.ndarray) -> np.ndarray:
    """Compute the lip close ratio (vertical over horizontal distance) from fixed landmark indices."""
    return calculate_distance_ratio(lmk, 90, 102, 48, 66)


def _transform_img(img, M, dsize, flags=cv2.INTER_LINEAR, borderMode=None):
    """
    Conduct similarity or affine transformation to the image; do not do border operation!

    img: source image.
    M: 2x3 matrix or 3x3 matrix (only the top two rows are used).
    dsize: target shape (width, height), or a single int for a square output.
    """
    if isinstance(dsize, (tuple, list)):
        _dsize = tuple(dsize)
    else:
        _dsize = (dsize, dsize)

    if borderMode is not None:
        return cv2.warpAffine(img, M[:2, :], dsize=_dsize, flags=flags,
                              borderMode=borderMode, borderValue=(0, 0, 0))
    else:
        return cv2.warpAffine(img, M[:2, :], dsize=_dsize, flags=flags)


def prepare_paste_back(mask_crop, crop_M_c2o, dsize):
    """Prepare mask for later image paste back.

    Warps `mask_crop` into the original image frame with `crop_M_c2o` and
    normalizes it to [0, 1] float32.
    """
    mask_ori = _transform_img(mask_crop, crop_M_c2o, dsize)
    mask_ori = mask_ori.astype(np.float32) / 255.
    return mask_ori


def transform_keypoint(pitch, yaw, roll, t, exp, scale, kp):
    """
    Transform the implicit keypoints with the pose, shift, and expression deformation.

    kp: BxNx3, or Bx(N*3) flattened.
    :return: transformed keypoints shaped (bs, num_kp, 3).
    """
    bs = kp.shape[0]
    if kp.ndim == 2:
        num_kp = kp.shape[1] // 3  # Bx(num_kpx3)
    else:
        num_kp = kp.shape[1]  # Bxnum_kpx3

    rot_mat = get_rotation_matrix(pitch, yaw, roll)  # (bs, 3, 3)

    # Eqn.2: s * (R * x_c,s + exp) + t
    kp_transformed = kp.reshape(bs, num_kp, 3) @ rot_mat + exp.reshape(bs, num_kp, 3)
    kp_transformed *= scale[..., None]  # (bs, k, 3) * (bs, 1, 1) = (bs, k, 3)
    kp_transformed[:, :, 0:2] += t[:, None, 0:2]  # remove z, only apply tx ty

    return kp_transformed


def concat_feat(x, y):
    """Flatten both inputs per batch item and concatenate them along axis 1."""
    bs = x.shape[0]
    return np.concatenate([x.reshape(bs, -1), y.reshape(bs, -1)], axis=1)


def is_image(file_path):
    """Return True if the path has a common image file extension."""
    image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff')
    return file_path.lower().endswith(image_extensions)


def is_video(file_path):
    """Return True if the path has a common video extension, or is a directory
    (presumably a folder of frames — TODO confirm against callers)."""
    if file_path.lower().endswith((".mp4", ".mov", ".avi", ".webm")) or os.path.isdir(file_path):
        return True
    return False


def make_abs_path(fn):
    """Resolve `fn` relative to the parent directory of this file's directory."""
    return osp.join(os.path.dirname(osp.dirname(osp.realpath(__file__))), fn)


class LowPassFilter:
    """Exponential low-pass filter: s = alpha * value + (1 - alpha) * prev."""

    def __init__(self):
        # prev_raw_value: last unfiltered input; prev_filtered_value: last output
        self.prev_raw_value = None
        self.prev_filtered_value = None

    def process(self, value, alpha):
        """Filter `value` with smoothing factor `alpha` and return the result."""
        if self.prev_raw_value is None:
            # first sample passes through unchanged
            s = value
        else:
            s = alpha * value + (1.0 - alpha) * self.prev_filtered_value
        self.prev_raw_value = value
        self.prev_filtered_value = s
        return s


class OneEuroFilter:
    """One-Euro filter: adaptive low-pass whose cutoff grows with signal speed.

    :param mincutoff: minimum cutoff frequency (Hz) — jitter reduction at rest.
    :param beta: speed coefficient — higher values reduce lag during fast motion.
    :param dcutoff: cutoff for the derivative estimate.
    :param freq: sampling frequency (Hz).
    """

    def __init__(self, mincutoff=1.0, beta=0.0, dcutoff=1.0, freq=30):
        self.freq = freq
        self.mincutoff = mincutoff
        self.beta = beta
        self.dcutoff = dcutoff
        self.x_filter = LowPassFilter()
        self.dx_filter = LowPassFilter()

    def compute_alpha(self, cutoff):
        """Smoothing factor for a given cutoff frequency at the configured sampling rate."""
        te = 1.0 / self.freq
        tau = 1.0 / (2 * np.pi * cutoff)
        return 1.0 / (1.0 + tau / te)

    def get_pre_x(self):
        """Return the previous filtered value (None before the first sample)."""
        return self.x_filter.prev_filtered_value

    def process(self, x):
        """Filter sample `x` and return the smoothed value."""
        prev_x = self.x_filter.prev_raw_value
        # estimate the signal's derivative (0 for the very first sample)
        dx = 0.0 if prev_x is None else (x - prev_x) * self.freq
        edx = self.dx_filter.process(dx, self.compute_alpha(self.dcutoff))
        # raise the cutoff with speed: less lag when the signal moves fast
        cutoff = self.mincutoff + self.beta * np.abs(edx)
        return self.x_filter.process(x, self.compute_alpha(cutoff))