ChaolongYang's picture
Upload 242 files
475d332 verified
# -*- coding: utf-8 -*-
import pdb
import cv2
import numpy as np
import ffmpeg
import os
import os.path as osp
def video_has_audio(video_file):
try:
ret = ffmpeg.probe(video_file, select_streams='a')
return len(ret["streams"]) > 0
except ffmpeg.Error:
return False
def get_video_info(video_path):
# 使用 ffmpeg.probe 获取视频信息
probe = ffmpeg.probe(video_path)
video_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'video']
if not video_streams:
raise ValueError("No video stream found")
# 获取视频时长
duration = float(probe['format']['duration'])
# 获取帧率 (r_frame_rate),通常是一个分数字符串,如 "30000/1001"
fps_string = video_streams[0]['r_frame_rate']
numerator, denominator = map(int, fps_string.split('/'))
fps = numerator / denominator
return duration, fps
def resize_to_limit(img: np.ndarray, max_dim=1280, division=2):
"""
ajust the size of the image so that the maximum dimension does not exceed max_dim, and the width and the height of the image are multiples of n.
:param img: the image to be processed.
:param max_dim: the maximum dimension constraint.
:param n: the number that needs to be multiples of.
:return: the adjusted image.
"""
h, w = img.shape[:2]
# ajust the size of the image according to the maximum dimension
if max_dim > 0 and max(h, w) > max_dim:
if h > w:
new_h = max_dim
new_w = int(w * (max_dim / h))
else:
new_w = max_dim
new_h = int(h * (max_dim / w))
img = cv2.resize(img, (new_w, new_h))
# ensure that the image dimensions are multiples of n
division = max(division, 1)
new_h = img.shape[0] - (img.shape[0] % division)
new_w = img.shape[1] - (img.shape[1] % division)
if new_h == 0 or new_w == 0:
# when the width or height is less than n, no need to process
return img
if new_h != img.shape[0] or new_w != img.shape[1]:
img = img[:new_h, :new_w]
return img
def get_rotation_matrix(pitch_, yaw_, roll_):
""" the input is in degree
"""
PI = np.pi
# transform to radian
pitch = pitch_ / 180 * PI
yaw = yaw_ / 180 * PI
roll = roll_ / 180 * PI
if pitch.ndim == 1:
pitch = np.expand_dims(pitch.cpu(), axis=1)
if yaw.ndim == 1:
yaw = np.expand_dims(yaw.cpu(), axis=1)
if roll.ndim == 1:
roll = np.expand_dims(roll.cpu(), axis=1)
# calculate the euler matrix
bs = pitch.shape[0]
ones = np.ones([bs, 1])
zeros = np.zeros([bs, 1])
x, y, z = pitch, yaw, roll
rot_x = np.concatenate([
ones, zeros, zeros,
zeros, np.cos(x), -np.sin(x),
zeros, np.sin(x), np.cos(x)
], axis=1).reshape([bs, 3, 3])
rot_y = np.concatenate([
np.cos(y), zeros, np.sin(y),
zeros, ones, zeros,
-np.sin(y), zeros, np.cos(y)
], axis=1).reshape([bs, 3, 3])
rot_z = np.concatenate([
np.cos(z), -np.sin(z), zeros,
np.sin(z), np.cos(z), zeros,
zeros, zeros, ones
], axis=1).reshape([bs, 3, 3])
rot = np.matmul(rot_z, np.matmul(rot_y, rot_x))
return np.transpose(rot, (0, 2, 1)) # transpose
def calculate_distance_ratio(lmk: np.ndarray, idx1: int, idx2: int, idx3: int, idx4: int,
eps: float = 1e-6) -> np.ndarray:
return (np.linalg.norm(lmk[:, idx1] - lmk[:, idx2], axis=1, keepdims=True) /
(np.linalg.norm(lmk[:, idx3] - lmk[:, idx4], axis=1, keepdims=True) + eps))
def calc_eye_close_ratio(lmk: np.ndarray, target_eye_ratio: np.ndarray = None) -> np.ndarray:
lefteye_close_ratio = calculate_distance_ratio(lmk, 6, 18, 0, 12)
righteye_close_ratio = calculate_distance_ratio(lmk, 30, 42, 24, 36)
if target_eye_ratio is not None:
return np.concatenate([lefteye_close_ratio, righteye_close_ratio, target_eye_ratio], axis=1)
else:
return np.concatenate([lefteye_close_ratio, righteye_close_ratio], axis=1)
def calc_lip_close_ratio(lmk: np.ndarray) -> np.ndarray:
return calculate_distance_ratio(lmk, 90, 102, 48, 66)
def _transform_img(img, M, dsize, flags=cv2.INTER_LINEAR, borderMode=None):
""" conduct similarity or affine transformation to the image, do not do border operation!
img:
M: 2x3 matrix or 3x3 matrix
dsize: target shape (width, height)
"""
if isinstance(dsize, tuple) or isinstance(dsize, list):
_dsize = tuple(dsize)
else:
_dsize = (dsize, dsize)
if borderMode is not None:
return cv2.warpAffine(img, M[:2, :], dsize=_dsize, flags=flags, borderMode=borderMode, borderValue=(0, 0, 0))
else:
return cv2.warpAffine(img, M[:2, :], dsize=_dsize, flags=flags)
def prepare_paste_back(mask_crop, crop_M_c2o, dsize):
"""prepare mask for later image paste back
"""
mask_ori = _transform_img(mask_crop, crop_M_c2o, dsize)
mask_ori = mask_ori.astype(np.float32) / 255.
return mask_ori
def transform_keypoint(pitch, yaw, roll, t, exp, scale, kp):
"""
transform the implicit keypoints with the pose, shift, and expression deformation
kp: BxNx3
"""
bs = kp.shape[0]
if kp.ndim == 2:
num_kp = kp.shape[1] // 3 # Bx(num_kpx3)
else:
num_kp = kp.shape[1] # Bxnum_kpx3
rot_mat = get_rotation_matrix(pitch, yaw, roll) # (bs, 3, 3)
# Eqn.2: s * (R * x_c,s + exp) + t
kp_transformed = kp.reshape(bs, num_kp, 3) @ rot_mat + exp.reshape(bs, num_kp, 3)
kp_transformed *= scale[..., None] # (bs, k, 3) * (bs, 1, 1) = (bs, k, 3)
kp_transformed[:, :, 0:2] += t[:, None, 0:2] # remove z, only apply tx ty
return kp_transformed
def concat_feat(x, y):
bs = x.shape[0]
return np.concatenate([x.reshape(bs, -1), y.reshape(bs, -1)], axis=1)
def is_image(file_path):
image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff')
return file_path.lower().endswith(image_extensions)
def is_video(file_path):
if file_path.lower().endswith((".mp4", ".mov", ".avi", ".webm")) or os.path.isdir(file_path):
return True
return False
def make_abs_path(fn):
return osp.join(os.path.dirname(osp.dirname(osp.realpath(__file__))), fn)
class LowPassFilter:
def __init__(self):
self.prev_raw_value = None
self.prev_filtered_value = None
def process(self, value, alpha):
if self.prev_raw_value is None:
s = value
else:
s = alpha * value + (1.0 - alpha) * self.prev_filtered_value
self.prev_raw_value = value
self.prev_filtered_value = s
return s
class OneEuroFilter:
def __init__(self, mincutoff=1.0, beta=0.0, dcutoff=1.0, freq=30):
self.freq = freq
self.mincutoff = mincutoff
self.beta = beta
self.dcutoff = dcutoff
self.x_filter = LowPassFilter()
self.dx_filter = LowPassFilter()
def compute_alpha(self, cutoff):
te = 1.0 / self.freq
tau = 1.0 / (2 * np.pi * cutoff)
return 1.0 / (1.0 + tau / te)
def get_pre_x(self):
return self.x_filter.prev_filtered_value
def process(self, x):
prev_x = self.x_filter.prev_raw_value
dx = 0.0 if prev_x is None else (x - prev_x) * self.freq
edx = self.dx_filter.process(dx, self.compute_alpha(self.dcutoff))
cutoff = self.mincutoff + self.beta * np.abs(edx)
return self.x_filter.process(x, self.compute_alpha(cutoff))