Spaces:
Running
on
Zero
Running
on
Zero
# -*- coding: utf-8 -*- | |
import pdb | |
import cv2 | |
import numpy as np | |
import ffmpeg | |
import os | |
import os.path as osp | |
def video_has_audio(video_file):
    """Report whether ``video_file`` contains at least one audio stream.

    The file is probed with ``ffmpeg.probe`` restricted to audio streams
    (``select_streams='a'``). A probe failure (``ffmpeg.Error``) is treated
    as "no audio" instead of being propagated.

    :param video_file: path of the media file to probe.
    :return: True if the probe lists one or more streams, otherwise False.
    """
    try:
        probe_result = ffmpeg.probe(video_file, select_streams='a')
    except ffmpeg.Error:
        # Best effort: a file that cannot be probed is reported as silent.
        return False
    audio_streams = probe_result["streams"]
    return len(audio_streams) > 0
def get_video_info(video_path):
    """Return the duration and frame rate of a video file.

    :param video_path: path of the video to inspect with ``ffmpeg.probe``.
    :return: tuple ``(duration, fps)``; ``duration`` is the float value of the
        probe's ``format.duration`` field and ``fps`` is the first video
        stream's ``r_frame_rate`` evaluated as numerator / denominator.
    :raises ValueError: if the probe lists no stream whose ``codec_type`` is
        ``'video'``.
    """
    # Query container and stream metadata through ffmpeg.probe.
    info = ffmpeg.probe(video_path)

    video_streams = []
    for stream in info['streams']:
        if stream['codec_type'] == 'video':
            video_streams.append(stream)
    if not video_streams:
        raise ValueError("No video stream found")

    # Duration as reported in the container-level "format" section.
    duration = float(info['format']['duration'])

    # r_frame_rate is usually a fraction string such as "30000/1001".
    rate_text = video_streams[0]['r_frame_rate']
    rate_num, rate_den = (int(part) for part in rate_text.split('/'))
    return duration, rate_num / rate_den
def resize_to_limit(img: np.ndarray, max_dim=1280, division=2):
    """Shrink an image to fit ``max_dim`` and crop it to multiples of ``division``.

    The image is first scaled down (aspect ratio preserved) when its longer
    side exceeds ``max_dim``; it is never enlarged. Afterwards the bottom and
    right edges are cropped so that height and width are multiples of
    ``division``.

    :param img: the image to be processed; the first two axes are height and width.
    :param max_dim: the maximum allowed side length. Values <= 0 disable resizing.
    :param division: the number the height and width must be multiples of.
        Values below 1 are treated as 1 (no cropping).
    :return: the adjusted image. The input array itself is returned when
        nothing needs to change, or when cropping would leave an empty image.
    """
    h, w = img.shape[:2]

    # Scale down so that the longer side equals max_dim.
    if max_dim > 0 and max(h, w) > max_dim:
        if h > w:
            new_h = max_dim
            # Clamp to 1: for very elongated images the scaled short side
            # truncates to 0, which cv2.resize rejects as a target size.
            new_w = max(1, int(w * (max_dim / h)))
        else:
            new_w = max_dim
            new_h = max(1, int(h * (max_dim / w)))
        img = cv2.resize(img, (new_w, new_h))

    # Crop so that both dimensions are multiples of `division`.
    division = max(division, 1)
    new_h = img.shape[0] - (img.shape[0] % division)
    new_w = img.shape[1] - (img.shape[1] % division)

    if new_h == 0 or new_w == 0:
        # A side shorter than `division` would be cropped away entirely;
        # leave the image untouched instead.
        return img

    if new_h != img.shape[0] or new_w != img.shape[1]:
        img = img[:new_h, :new_w]

    return img
def _angle_to_column(angle):
    """Convert one angle input to a NumPy array that has a leading batch axis."""
    # Objects exposing detach()/cpu() (e.g. framework tensors) are brought to
    # host memory first; plain NumPy arrays and Python numbers skip this.
    if hasattr(angle, "detach"):
        angle = angle.detach()
    if hasattr(angle, "cpu"):
        angle = angle.cpu()
    angle = np.asarray(angle)
    if angle.ndim == 0:
        return angle.reshape(1, 1)  # scalar -> batch of one
    if angle.ndim == 1:
        return angle[:, None]  # (bs,) -> (bs, 1)
    return angle


def get_rotation_matrix(pitch_, yaw_, roll_):
    """Build batched rotation matrices from Euler angles; the input is in degree.

    Each angle may be a Python number, a 1-D array of shape (bs,), or a 2-D
    array of shape (bs, 1). Objects that expose ``.cpu()`` are moved to host
    memory before being converted to NumPy. All three angles are expected to
    share the same batch size.

    :param pitch_: rotation about the x axis, in degrees.
    :param yaw_: rotation about the y axis, in degrees.
    :param roll_: rotation about the z axis, in degrees.
    :return: array of shape (bs, 3, 3) holding the transpose of
        ``rot_z @ rot_y @ rot_x`` for every batch element.
    """
    PI = np.pi
    # transform to radian
    pitch = _angle_to_column(pitch_ / 180 * PI)
    yaw = _angle_to_column(yaw_ / 180 * PI)
    roll = _angle_to_column(roll_ / 180 * PI)

    # calculate the euler matrix
    bs = pitch.shape[0]
    ones = np.ones([bs, 1])
    zeros = np.zeros([bs, 1])
    x, y, z = pitch, yaw, roll

    # Each matrix is assembled row-major from nine (bs, 1) columns.
    rot_x = np.concatenate([
        ones, zeros, zeros,
        zeros, np.cos(x), -np.sin(x),
        zeros, np.sin(x), np.cos(x)
    ], axis=1).reshape([bs, 3, 3])

    rot_y = np.concatenate([
        np.cos(y), zeros, np.sin(y),
        zeros, ones, zeros,
        -np.sin(y), zeros, np.cos(y)
    ], axis=1).reshape([bs, 3, 3])

    rot_z = np.concatenate([
        np.cos(z), -np.sin(z), zeros,
        np.sin(z), np.cos(z), zeros,
        zeros, zeros, ones
    ], axis=1).reshape([bs, 3, 3])

    rot = np.matmul(rot_z, np.matmul(rot_y, rot_x))
    return np.transpose(rot, (0, 2, 1))  # transpose
def calculate_distance_ratio(lmk: np.ndarray, idx1: int, idx2: int, idx3: int, idx4: int,
                             eps: float = 1e-6) -> np.ndarray:
    """Return the ratio between two point-to-point distances for each batch item.

    :param lmk: landmark array indexed as ``lmk[batch, point]``; the norm is
        taken over axis 1 of the per-point difference.
    :param idx1: first point of the numerator segment.
    :param idx2: second point of the numerator segment.
    :param idx3: first point of the denominator segment.
    :param idx4: second point of the denominator segment.
    :param eps: small constant added to the denominator to avoid division by zero.
    :return: ``|lmk[idx1] - lmk[idx2]| / (|lmk[idx3] - lmk[idx4]| + eps)``
        with the reduced axis kept, i.e. one column per batch item.
    """
    numerator = np.linalg.norm(lmk[:, idx1] - lmk[:, idx2], axis=1, keepdims=True)
    denominator = np.linalg.norm(lmk[:, idx3] - lmk[:, idx4], axis=1, keepdims=True)
    return numerator / (denominator + eps)
def calc_eye_close_ratio(lmk: np.ndarray, target_eye_ratio: np.ndarray = None) -> np.ndarray:
    """Compute the left and right eye distance ratios from landmarks.

    :param lmk: landmark array passed to ``calculate_distance_ratio``; the
        point indices used are 6/18 over 0/12 for the left eye and 30/42 over
        24/36 for the right eye.
    :param target_eye_ratio: optional array appended as extra column(s) after
        the two eye ratios.
    :return: the left ratio, the right ratio and, when given,
        ``target_eye_ratio`` concatenated along axis 1.
    """
    columns = [
        calculate_distance_ratio(lmk, 6, 18, 0, 12),    # left eye
        calculate_distance_ratio(lmk, 30, 42, 24, 36),  # right eye
    ]
    if target_eye_ratio is not None:
        columns.append(target_eye_ratio)
    return np.concatenate(columns, axis=1)
def calc_lip_close_ratio(lmk: np.ndarray) -> np.ndarray:
    """Compute the lip distance ratio from landmarks.

    :param lmk: landmark array passed to ``calculate_distance_ratio``.
    :return: distance between points 90 and 102 divided by the distance
        between points 48 and 66, one column per batch item.
    """
    lip_close_ratio = calculate_distance_ratio(lmk, idx1=90, idx2=102, idx3=48, idx4=66)
    return lip_close_ratio
def _transform_img(img, M, dsize, flags=cv2.INTER_LINEAR, borderMode=None):
    """Warp an image with a similarity or affine transformation.

    :param img: the image to warp.
    :param M: 2x3 matrix or 3x3 matrix; only its first two rows are used.
    :param dsize: target shape as (width, height), or a single value used for
        both width and height.
    :param flags: interpolation flags forwarded to ``cv2.warpAffine``.
    :param borderMode: optional border mode. When given, it is forwarded
        together with a black border value ``(0, 0, 0)``; when None, the
        border arguments are left at the ``cv2.warpAffine`` defaults.
    :return: the warped image.
    """
    out_size = tuple(dsize) if isinstance(dsize, (tuple, list)) else (dsize, dsize)
    affine = M[:2, :]

    if borderMode is None:
        return cv2.warpAffine(img, affine, dsize=out_size, flags=flags)
    return cv2.warpAffine(img, affine, dsize=out_size, flags=flags,
                          borderMode=borderMode, borderValue=(0, 0, 0))
def prepare_paste_back(mask_crop, crop_M_c2o, dsize):
    """Prepare the mask used later when pasting an image back.

    :param mask_crop: mask image to warp.
    :param crop_M_c2o: transformation matrix handed to ``_transform_img``
        (presumably crop -> original, judging by the name; confirm with caller).
    :param dsize: target size forwarded to ``_transform_img``.
    :return: the warped mask as float32, divided by 255.
    """
    warped_mask = _transform_img(mask_crop, crop_M_c2o, dsize)
    return warped_mask.astype(np.float32) / 255.
def transform_keypoint(pitch, yaw, roll, t, exp, scale, kp):
    """Transform the implicit keypoints with pose, expression and shift.

    Implements Eqn.2: ``s * (R * x_c,s + exp) + t``, where only the x and y
    components of ``t`` are applied.

    :param pitch: pitch angle(s), forwarded to ``get_rotation_matrix``.
    :param yaw: yaw angle(s), forwarded to ``get_rotation_matrix``.
    :param roll: roll angle(s), forwarded to ``get_rotation_matrix``.
    :param t: translation; columns 0 and 1 are added to x and y.
    :param exp: expression deformation, reshaped to (B, num_kp, 3).
    :param scale: per-batch scale factor, broadcast over the keypoints.
    :param kp: keypoints, either B x num_kp x 3 or flattened B x (num_kp * 3).
    :return: transformed keypoints of shape (B, num_kp, 3).
    """
    batch = kp.shape[0]
    # A 2-D kp is the flattened layout, so the keypoint count is a third of
    # its second axis; otherwise the second axis is the count itself.
    num_kp = kp.shape[1] // 3 if kp.ndim == 2 else kp.shape[1]

    rot_mat = get_rotation_matrix(pitch, yaw, roll)  # (batch, 3, 3)

    # Rotate, then add the expression offsets.
    rotated = kp.reshape(batch, num_kp, 3) @ rot_mat
    kp_transformed = rotated + exp.reshape(batch, num_kp, 3)

    # (batch, k, 3) * (batch, 1, 1) = (batch, k, 3)
    kp_transformed *= scale[..., None]

    # z is left untouched: only tx and ty are applied.
    kp_transformed[:, :, 0:2] += t[:, None, 0:2]
    return kp_transformed
def concat_feat(x, y):
    """Flatten two batched arrays per sample and join them feature-wise.

    :param x: array whose first axis is the batch axis.
    :param y: array reshaped to the same batch size as ``x``.
    :return: array of shape (batch, features_x + features_y).
    """
    batch = x.shape[0]
    flat_x = x.reshape(batch, -1)
    flat_y = y.reshape(batch, -1)
    return np.concatenate((flat_x, flat_y), axis=1)
def is_image(file_path):
    """Tell whether a path has a recognised image file extension.

    The check is case-insensitive and looks only at the end of the string;
    the file is not opened.

    :param file_path: path string to check.
    :return: True for .jpg, .jpeg, .png, .gif, .bmp or .tiff, else False.
    """
    lowered = file_path.lower()
    return lowered.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'))
def is_video(file_path):
    """Tell whether a path should be treated as a video source.

    A path qualifies when it ends (case-insensitively) with .mp4, .mov, .avi
    or .webm, or when it is an existing directory.

    :param file_path: path string to check.
    :return: True if the path qualifies, else False.
    """
    has_video_ext = file_path.lower().endswith((".mp4", ".mov", ".avi", ".webm"))
    # The directory check only runs when the extension test fails.
    return has_video_ext or os.path.isdir(file_path)
def make_abs_path(fn):
    """Join ``fn`` onto the parent of the directory that contains this file.

    Symbolic links in this file's own path are resolved first.

    :param fn: path to append to that base directory.
    :return: the joined path.
    """
    this_file = osp.realpath(__file__)
    base_dir = os.path.dirname(osp.dirname(this_file))
    return osp.join(base_dir, fn)
class LowPassFilter:
    """Exponential smoothing filter that remembers the previous sample.

    The first processed value is passed through unchanged; afterwards each
    output is ``alpha * value + (1 - alpha) * previous_output``.
    """

    def __init__(self):
        # Last raw input and last filtered output; None until the first call.
        self.prev_raw_value = None
        self.prev_filtered_value = None

    def process(self, value, alpha):
        """Filter one sample and update the stored state.

        :param value: the new raw sample.
        :param alpha: weight of the new sample in the blend.
        :return: the filtered sample.
        """
        smoothed = (
            value
            if self.prev_raw_value is None
            else alpha * value + (1.0 - alpha) * self.prev_filtered_value
        )
        self.prev_raw_value, self.prev_filtered_value = value, smoothed
        return smoothed
class OneEuroFilter:
    """Adaptive low-pass filter whose cutoff grows with the signal's speed.

    Two ``LowPassFilter`` instances are used: one smooths the signal and one
    smooths its derivative. The smoothed derivative raises the signal
    filter's cutoff by ``beta * |derivative|`` above ``mincutoff``.
    """

    def __init__(self, mincutoff=1.0, beta=0.0, dcutoff=1.0, freq=30):
        """Store the filter parameters and create the two internal filters.

        :param mincutoff: cutoff used when the derivative is zero.
        :param beta: how strongly the derivative magnitude raises the cutoff.
        :param dcutoff: cutoff used for smoothing the derivative.
        :param freq: sampling frequency; its inverse is the sample period.
        """
        self.freq = freq
        self.mincutoff = mincutoff
        self.beta = beta
        self.dcutoff = dcutoff
        self.x_filter = LowPassFilter()
        self.dx_filter = LowPassFilter()

    def compute_alpha(self, cutoff):
        """Return the smoothing factor that corresponds to ``cutoff``."""
        sample_period = 1.0 / self.freq
        time_constant = 1.0 / (2 * np.pi * cutoff)
        return 1.0 / (1.0 + time_constant / sample_period)

    def get_pre_x(self):
        """Return the previous filtered value (None before the first sample)."""
        return self.x_filter.prev_filtered_value

    def process(self, x):
        """Filter one sample and return the smoothed value."""
        last_raw = self.x_filter.prev_raw_value
        if last_raw is None:
            # No history yet, so the derivative is taken as zero.
            derivative = 0.0
        else:
            derivative = (x - last_raw) * self.freq

        derivative_alpha = self.compute_alpha(self.dcutoff)
        smoothed_derivative = self.dx_filter.process(derivative, derivative_alpha)

        adaptive_cutoff = self.mincutoff + self.beta * np.abs(smoothed_derivative)
        return self.x_filter.process(x, self.compute_alpha(adaptive_cutoff))