|
""" |
|
This script demonstrates how to generate a camera-controllable video with the FloVD pipelines built on top of the CogVideoX image-to-video model, using the Hugging Face `diffusers` library.

The script performs image-to-video (i2v) generation: an OMSM LoRA pipeline predicts object-motion flow latents from the input image and prompt, an optional camera flow (derived from sampled camera poses and a depth estimator) can be integrated with the object flow, and an FVSM controlnet pipeline synthesizes the final video conditioned on the resulting flow.

- backbone: THUDM/CogVideoX-5b-I2V
|
|
|
Running the Script: |
|
To run the script, use a command like the following (the image, checkpoint, and output paths are placeholders):
|
|
|
```bash |
|
$ python cli_demo.py --prompt "A girl riding a bike." --image_path ./assets/example.png --fvsm_path ./ckpt/FVSM/checkpoint.pt --omsm_path ./ckpt/OMSM --output_path ./results
|
``` |
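
Camera-control options from this script's argument parser can be added as well; a sketch (paths are placeholders):

```bash
$ python cli_demo.py --prompt "A girl riding a bike." --image_path ./assets/example.png \
    --fvsm_path ./ckpt/FVSM/checkpoint.pt --omsm_path ./ckpt/OMSM --output_path ./results \
    --use_flow_integration --pose_type manual --speed 0.5 --controlnet_guidance_end 0.4 --use_dynamic_cfg
```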
|
|
|
To reduce peak GPU memory, the script enables VAE slicing and tiling (`pipe.vae.enable_slicing()` and `pipe.vae.enable_tiling()`) on both pipelines.
|
|
|
Additional options are available to specify the guidance scale, number of inference steps, resolution, camera pose type and trajectory, flow integration, and output paths.
|
|
|
""" |
|
from typing import TYPE_CHECKING, Any, Dict, List, Tuple |
|
import argparse |
|
import logging |
|
import os |
|
import sys |
|
from typing import Literal, Optional |
|
from pathlib import Path |
|
import json |
|
from datetime import timedelta |
|
import random |
|
from safetensors.torch import load_file, save_file |
|
from tqdm import tqdm |
|
from einops import rearrange, repeat |
|
import math |
|
import numpy as np |
|
from PIL import Image |
|
|
|
import torch |
|
import types |
|
|
|
|
from diffusers import ( |
|
CogVideoXDPMScheduler, |
|
CogVideoXImageToVideoPipeline, |
|
CogVideoXPipeline, |
|
CogVideoXVideoToVideoPipeline, |
|
AutoencoderKLCogVideoX, |
|
CogVideoXTransformer3DModel, |
|
) |
|
from diffusers.utils import export_to_video, load_image, load_video |
|
from peft import LoraConfig, get_peft_model_state_dict, set_peft_model_state_dict |
|
|
|
sys.path.append(os.path.abspath(os.path.join(sys.path[0], "../"))) |
|
from finetune.pipeline.flovd_FVSM_cogvideox_controlnet_pipeline import FloVDCogVideoXControlnetImageToVideoPipeline |
|
from finetune.pipeline.flovd_OMSM_cogvideox_pipeline import FloVDOMSMCogVideoXImageToVideoPipeline |
|
from finetune.schemas import Components, Args |
|
from finetune.modules.cogvideox_controlnet import CogVideoXControlnet |
|
from finetune.modules.cogvideox_custom_model import CustomCogVideoXTransformer3DModel |
|
from transformers import AutoTokenizer, T5EncoderModel |
|
|
|
from finetune.modules.camera_sampler import SampleManualCam |
|
from finetune.modules.camera_flow_generator import CameraFlowGenerator |
|
from finetune.modules.utils import get_camera_flow_generator_input, forward_bilinear_splatting, flow_to_color |
|
from finetune.modules.depth_warping.depth_warping import unnormalize_intrinsic |
|
|
|
from finetune.datasets.utils import ( |
|
preprocess_image_with_resize, |
|
preprocess_video_with_resize, |
|
) |
|
|
|
|
|
from torch.utils.data import Dataset |
|
from torchvision import transforms |
|
|
|
|
|
import pdb |
|
sys.path.append(os.path.abspath(os.path.join(sys.path[-1], 'finetune'))) |
|
|
|
|
|
os.environ["TOKENIZERS_PARALLELISM"] = "false" |
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
RESOLUTION_MAP = { |
|
|
|
"cogvideox1.5-5b-i2v": (768, 1360), |
|
"cogvideox1.5-5b": (768, 1360), |
|
|
|
"cogvideox-5b-i2v": (480, 720), |
|
"cogvideox-5b": (480, 720), |
|
"cogvideox-2b": (480, 720), |
|
} |
|
|
|
def load_cogvideox_flovd_OMSM_lora_pipeline(omsm_path, backbone_path, transformer_lora_config, device, dtype): |
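    """
    Build the FloVD OMSM image-to-video pipeline: load the CogVideoX backbone components
    from `backbone_path`, attach the LoRA adapter, restore the LoRA weights (and any
    selected transformer blocks) from `omsm_path`, and enable VAE slicing/tiling.
    """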
|
tokenizer = AutoTokenizer.from_pretrained(backbone_path, subfolder="tokenizer") |
|
text_encoder = T5EncoderModel.from_pretrained(backbone_path, subfolder="text_encoder") |
|
transformer = CogVideoXTransformer3DModel.from_pretrained( |
|
backbone_path, subfolder="transformer", torch_dtype=dtype, device_map="auto" |
|
) |
|
vae = AutoencoderKLCogVideoX.from_pretrained( |
|
backbone_path, subfolder="vae", torch_dtype=dtype, device_map="auto" |
|
) |
|
scheduler = CogVideoXDPMScheduler.from_pretrained(backbone_path, subfolder="scheduler") |
|
|
|
|
|
transformer.add_adapter(transformer_lora_config) |
|
|
|
lora_state_dict = FloVDOMSMCogVideoXImageToVideoPipeline.lora_state_dict(omsm_path) |
|
transformer_state_dict = { |
|
        k.replace("transformer.", ""): v
|
for k, v in lora_state_dict.items() |
|
if k.startswith("transformer.") |
|
} |
|
incompatible_keys = set_peft_model_state_dict(transformer, transformer_state_dict, adapter_name="default") |
|
if incompatible_keys is not None: |
|
|
|
unexpected_keys = getattr(incompatible_keys, "unexpected_keys", None) |
|
if unexpected_keys: |
|
logger.warning( |
|
f"Loading adapter weights from state_dict led to unexpected keys not found in the model: " |
|
f" {unexpected_keys}. " |
|
) |
|
|
|
|
|
load_path = os.path.join(omsm_path, "selected_blocks.safetensors") |
|
if os.path.exists(load_path): |
|
tensor_dict = load_file(load_path) |
|
|
|
block_state_dicts = {} |
|
for k, v in tensor_dict.items(): |
|
block_name, param_name = k.split(".", 1) |
|
if block_name not in block_state_dicts: |
|
block_state_dicts[block_name] = {} |
|
block_state_dicts[block_name][param_name] = v |
|
|
|
for block_name, state_dict in block_state_dicts.items(): |
|
if hasattr(transformer, block_name): |
|
getattr(transformer, block_name).load_state_dict(state_dict) |
|
else: |
|
raise ValueError(f"Transformer has no attribute '{block_name}'") |
|
|
|
pipe = FloVDOMSMCogVideoXImageToVideoPipeline( |
|
tokenizer=tokenizer, |
|
text_encoder=text_encoder, |
|
vae=vae, |
|
transformer=transformer, |
|
scheduler=scheduler, |
|
) |
|
|
|
pipe.vae.enable_slicing() |
|
pipe.vae.enable_tiling() |
|
|
|
return pipe |
|
|
|
|
|
def load_cogvideox_flovd_FVSM_controlnet_pipeline(controlnet_path, backbone_path, device, dtype): |
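    """
    Build the FloVD FVSM controlnet image-to-video pipeline: load the CogVideoX backbone
    components from `backbone_path`, instantiate the flow controlnet from the backbone's
    transformer weights, and load the controlnet state dict stored at `controlnet_path`.
    """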
|
controlnet_sd = torch.load(controlnet_path, map_location='cpu')['module'] |
|
|
|
tokenizer = AutoTokenizer.from_pretrained(backbone_path, subfolder="tokenizer") |
|
text_encoder = T5EncoderModel.from_pretrained(backbone_path, subfolder="text_encoder") |
|
transformer = CustomCogVideoXTransformer3DModel.from_pretrained( |
|
backbone_path, subfolder="transformer", torch_dtype=dtype, device_map="auto" |
|
) |
|
vae = AutoencoderKLCogVideoX.from_pretrained( |
|
backbone_path, subfolder="vae", torch_dtype=dtype, device_map="auto" |
|
) |
|
scheduler = CogVideoXDPMScheduler.from_pretrained(backbone_path, subfolder="scheduler") |
|
|
|
additional_kwargs = { |
|
'num_layers': 6, |
|
'out_proj_dim_factor': 64, |
|
'out_proj_dim_zero_init': True, |
|
'notextinflow': True, |
|
} |
|
controlnet = CogVideoXControlnet.from_pretrained(backbone_path, subfolder="transformer", **additional_kwargs) |
|
controlnet.eval() |
|
|
|
missing, unexpected = controlnet.load_state_dict(controlnet_sd) |
|
|
|
if len(missing) != 0 or len(unexpected) != 0: |
|
print(f"Missing keys : {missing}") |
|
print(f"Unexpected keys : {unexpected}") |
|
|
|
pipe = FloVDCogVideoXControlnetImageToVideoPipeline( |
|
tokenizer=tokenizer, |
|
text_encoder=text_encoder, |
|
vae=vae, |
|
transformer=transformer, |
|
controlnet=controlnet, |
|
scheduler=scheduler, |
|
) |
|
|
|
pipe.vae.enable_slicing() |
|
pipe.vae.enable_tiling() |
|
|
|
return pipe |
|
|
|
|
|
def initialize_flow_generator(target, ckpt_path): |
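    """
    Instantiate a CameraFlowGenerator whose depth estimator is built from the dotted import
    path `target` and the checkpoint at `ckpt_path`; the hard-coded model_config corresponds
    to a metric ViT-B depth model.
    """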
|
depth_estimator_kwargs = { |
|
"target": target, |
|
"kwargs": { |
|
"ckpt_path": ckpt_path, |
|
"model_config": { |
|
"max_depth": 20, |
|
"encoder": 'vitb', |
|
"features": 128, |
|
"out_channels": [96, 192, 384, 768], |
|
} |
|
|
|
} |
|
} |
|
|
|
return CameraFlowGenerator(depth_estimator_kwargs) |
|
|
|
def integrate_flow(camera_flow, object_flow, depth_ctxt, camera_flow_generator, camera_flow_generator_input): |
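    """
    Warp the predicted object flow into each target camera frame using the context depth
    and the context/target camera parameters, then add the camera-induced flow to obtain
    the integrated flow.
    """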
|
|
|
|
|
|
|
|
|
B, F = camera_flow_generator_input["target"]["intrinsics"].shape[:2] |
|
H, W = object_flow.shape[-2:] |
|
|
|
c2w_ctxt = repeat(camera_flow_generator_input["context"]["extrinsics"], "b t h w -> (b v t) h w", v=F) |
|
c2w_trgt = rearrange(torch.inverse(camera_flow_generator_input["target"]["extrinsics"]), "b t h w -> (b t) h w") |
|
intrinsics_ctxt = unnormalize_intrinsic(repeat(camera_flow_generator_input["context"]["intrinsics"], "b t h w -> (b v t) h w", v=F), size=(H, W)) |
|
|
|
with torch.cuda.amp.autocast(enabled=False): |
|
warped_object_flow = camera_flow_generator.depth_warping_module.warper.forward_warp_displacement( |
|
depth1=repeat(depth_ctxt, "b c h w -> (b f) c h w", f=F), |
|
flow1=object_flow, |
|
transformation1=c2w_ctxt, |
|
transformation2=c2w_trgt, |
|
intrinsic1=intrinsics_ctxt, |
|
intrinsic2=None, |
|
) |
|
|
|
integrated_flow = camera_flow + warped_object_flow |
|
|
|
return integrated_flow |
|
|
|
def save_flow(flow, filename, fps=16): |
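    """Convert a flow sequence to an RGB visualization with flow_to_color and export it as a video at `fps`."""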
|
|
|
flow_RGB = flow_to_color(flow) |
|
|
|
frame_list = [] |
|
for frame in flow_RGB: |
|
        frame = frame.permute(1, 2, 0).float().detach().cpu().numpy().clip(0, 255).astype(np.uint8)
|
frame_list.append(Image.fromarray(frame)) |
|
|
|
export_to_video(frame_list, filename, fps=fps) |
|
|
|
def save_flow_warped_video(image, flow, filename, fps=16): |
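    """Warp the input image with each flow frame via forward bilinear splatting and export the result as a video."""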
|
|
|
|
|
warped_video = forward_bilinear_splatting(repeat(image, 'c h w -> f c h w', f=flow.size(0)), flow.to(torch.float)) |
|
|
|
frame_list = [] |
|
for frame in warped_video: |
|
        frame = frame.permute(1, 2, 0).float().detach().cpu().numpy().clip(0, 255).astype(np.uint8)
|
frame_list.append(Image.fromarray(frame)) |
|
|
|
export_to_video(frame_list, filename, fps=fps) |
|
|
|
def patch_prepare_latents_safe(): |
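    """
    Monkey-patch CogVideoXImageToVideoPipeline.prepare_latents with a version that encodes
    the conditioning image, zero-pads the image latents along the frame axis to `num_frames`,
    and samples the initial noise with the provided generator.
    """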
|
def new_prepare_latents( |
|
self, |
|
image, |
|
batch_size, |
|
latent_channels, |
|
num_frames, |
|
height, |
|
width, |
|
dtype, |
|
device, |
|
generator, |
|
latents=None, |
|
): |
|
image_5d = image.unsqueeze(2) if image.ndim == 4 else image |
|
image_latents = self.vae.encode(image_5d.to(device, dtype=dtype)).latent_dist.sample() |
|
image_latents = image_latents * self.vae.config.scaling_factor |
|
|
|
|
|
if image_latents.shape[2] != num_frames: |
|
latent_padding = torch.zeros( |
|
( |
|
image_latents.shape[0], |
|
image_latents.shape[1], |
|
num_frames - image_latents.shape[2], |
|
image_latents.shape[3], |
|
image_latents.shape[4], |
|
), |
|
device=image_latents.device, |
|
dtype=image_latents.dtype |
|
) |
|
image_latents = torch.cat([image_latents, latent_padding], dim=2) |
|
|
|
if latents is None: |
|
|
|
            # torch.Generator has no .to(); use diffusers' randn_tensor, which handles a
            # CPU generator together with CUDA latents.
            from diffusers.utils.torch_utils import randn_tensor

            noise = randn_tensor(
                image_latents.shape,
                generator=generator,
                dtype=image_latents.dtype,
                device=image_latents.device,
            )
|
|
|
latents = noise.to(device=device, dtype=dtype) |
|
|
|
return latents, image_latents.to(device, dtype=dtype) |
|
|
|
from diffusers.pipelines.cogvideo.pipeline_cogvideox_image2video import CogVideoXImageToVideoPipeline |
|
CogVideoXImageToVideoPipeline.prepare_latents = new_prepare_latents |
|
|
|
|
|
|
|
def generate_video( |
|
prompt: str, |
|
fvsm_path: str, |
|
omsm_path: str, |
|
num_frames: int = 81, |
|
width: Optional[int] = None, |
|
height: Optional[int] = None, |
|
output_path: str = "./output.mp4", |
|
image_path: str = "", |
|
num_inference_steps: int = 50, |
|
guidance_scale: float = 6.0, |
|
num_videos_per_prompt: int = 1, |
|
dtype: torch.dtype = torch.bfloat16, |
|
seed: int = 42, |
|
fps: int = 16, |
|
controlnet_guidance_end: float = 0.4, |
|
use_dynamic_cfg: bool = False, |
|
pose_type: str = "manual", |
|
speed: float = 0.5, |
|
use_flow_integration: bool = False, |
|
cam_pose_name: str = None, |
|
depth_ckpt_path: str = "./ckpt/others/depth_anything_v2_metric_hypersim_vitb.pth", |
|
): |
|
""" |
|
Generates a video based on the given prompt and saves it to the specified path. |
|
|
|
Parameters: |
|
- prompt (str): The description of the video to be generated. |
|
- lora_path (str): The path of the LoRA weights to be used. |
|
- lora_rank (int): The rank of the LoRA weights. |
|
- output_path (str): The path where the generated video will be saved. |
|
- num_inference_steps (int): Number of steps for the inference process. More steps can result in better quality. |
|
- num_frames (int): Number of frames to generate. CogVideoX1.0 generates 49 frames for 6 seconds at 8 fps, while CogVideoX1.5 produces either 81 or 161 frames, corresponding to 5 seconds or 10 seconds at 16 fps. |
|
- width (int): The width of the generated video, applicable only for CogVideoX1.5-5B-I2V |
|
- height (int): The height of the generated video, applicable only for CogVideoX1.5-5B-I2V |
|
- guidance_scale (float): The scale for classifier-free guidance. Higher values can lead to better alignment with the prompt. |
|
- num_videos_per_prompt (int): Number of videos to generate per prompt. |
|
- dtype (torch.dtype): The data type for computation (default is torch.bfloat16). |
|
- generate_type (str): The type of video generation (e.g., 't2v', 'i2v', 'v2v').· |
|
- seed (int): The seed for reproducibility. |
|
- fps (int): The frames per second for the generated video. |
|
""" |
|
|
|
patch_prepare_latents_safe() |
|
|
|
print("at generate video", flush=True) |
|
local_rank = 'cuda' |
|
|
|
torch.manual_seed(seed) |
|
random.seed(seed) |
|
|
|
os.makedirs(os.path.join(output_path, 'generated_videos'), exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
image = None |
|
video = None |
|
|
|
    model_name = "cogvideox-5b-i2v"
|
desired_resolution = RESOLUTION_MAP[model_name] |
|
if width is None or height is None: |
|
height, width = desired_resolution |
|
logging.info(f"\033[1mUsing default resolution {desired_resolution} for {model_name}\033[0m") |
|
    elif (height, width) != desired_resolution:
        logging.warning(
            f"\033[1;31mThe width({width}) and height({height}) are not recommended for {model_name}. The best resolution is {desired_resolution}.\033[0m"
        )
|
|
|
|
|
|
|
""" |
|
# Prepare Pipeline |
|
""" |
|
transformer_lora_config = LoraConfig( |
|
r=128, |
|
lora_alpha=64, |
|
init_lora_weights=True, |
|
target_modules=["to_q", "to_k", "to_v", "to_out.0", "norm1.linear", "norm2.linear", "ff.net.2"], |
|
) |
|
|
|
print(f'Constructing pipeline', flush=True) |
|
pipe_omsm = load_cogvideox_flovd_OMSM_lora_pipeline(omsm_path, backbone_path="THUDM/CogVideoX-5b-I2V", transformer_lora_config=transformer_lora_config, device=local_rank, dtype=dtype) |
|
print("done with omsm", flush=True) |
|
pipe_fvsm = load_cogvideox_flovd_FVSM_controlnet_pipeline(fvsm_path, backbone_path="THUDM/CogVideoX-5b-I2V", device=local_rank, dtype=dtype) |
|
print("done with fvsm", flush=True) |
|
|
|
print(f'Done loading pipeline', flush=True) |
|
|
|
""" |
|
# Prepare inputs |
|
""" |
|
print(f'loading image', flush=True) |
|
|
|
image = load_image(image=image_path) |
|
print(f'done loading image', flush=True) |
|
|
|
|
|
    assert pose_type in ['re10k', 'manual'], "pose_type must be one of ['re10k', 'manual']"
|
if pose_type == 're10k': |
|
root_path = "./assets/re10k_poses" |
|
else: |
|
root_path = "./assets/manual_poses" |
|
|
|
CameraSampler = SampleManualCam(pose_type=pose_type, root_path=root_path) |
|
camera_flow_generator_target = 'finetune.modules.depth_warping.depth_warping.DepthWarping_wrapper' |
|
camera_flow_generator = initialize_flow_generator(camera_flow_generator_target, ckpt_path=depth_ckpt_path).to(local_rank) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pipe_fvsm.scheduler = CogVideoXDPMScheduler.from_config(pipe_fvsm.scheduler.config, timestep_spacing="trailing") |
|
pipe_omsm.scheduler = CogVideoXDPMScheduler.from_config(pipe_omsm.scheduler.config, timestep_spacing="trailing") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pipe_fvsm.vae.enable_slicing() |
|
pipe_fvsm.vae.enable_tiling() |
|
pipe_omsm.vae.enable_slicing() |
|
pipe_omsm.vae.enable_tiling() |
|
|
|
|
|
output_video_path = os.path.join(output_path, 'generated_videos') |
|
|
|
""" |
|
# Inference time |
|
""" |
|
image = rearrange((torch.tensor(np.array(image)).to(torch.float) / 255. * 2. - 1.).unsqueeze(0), 'b h w c -> b c h w') |
|
image = image.to(local_rank) |
|
prompt_short = prompt[:30].strip().replace(" ", "_") |
|
|
|
|
|
camparam, cam_name = CameraSampler.sample(name=cam_pose_name) |
|
image_torch_255 = ((image.detach().clone()+1)/2. * 255.).squeeze(0) |
|
camera_flow_generator_input = get_camera_flow_generator_input(image_torch_255, camparam, device=local_rank, speed=speed) |
|
image_torch = ((image_torch_255.unsqueeze(0) / 255.) * 2. - 1.).to(local_rank) |
|
|
|
with torch.no_grad(): |
|
with torch.cuda.amp.autocast(enabled=True, dtype=dtype): |
|
|
|
flow_latent = pipe_omsm( |
|
num_frames=num_frames, |
|
height=height, |
|
width=width, |
|
prompt=prompt, |
|
image=image, |
|
generator=torch.Generator().manual_seed(seed), |
|
num_inference_steps=num_inference_steps, |
|
use_dynamic_cfg=use_dynamic_cfg, |
|
output_type='latent' |
|
).frames[0] |
|
object_flow = decode_flow(flow_latent.detach().clone().unsqueeze(0).to(local_rank), pipe_omsm.vae, flow_scale_factor=[60, 36]) |
|
|
|
if use_flow_integration: |
|
|
|
|
|
|
|
camera_flow, log_dict = camera_flow_generator(image_torch, camera_flow_generator_input) |
|
camera_flow = camera_flow.to(local_rank, dtype) |
|
|
|
integrated_flow = integrate_flow(camera_flow, object_flow, log_dict['depth_ctxt'], camera_flow_generator, camera_flow_generator_input) |
|
integrated_flow_latent = rearrange(encode_flow(integrated_flow, pipe_omsm.vae, flow_scale_factor=[60, 36]), 'b c f h w -> b f c h w').to(local_rank, dtype) |
|
else: |
|
integrated_flow_latent = rearrange(flow_latent, '(b f) c h w -> b f c h w', b=image.size(0)) |
|
|
|
|
|
|
|
video_generate = pipe_fvsm( |
|
num_frames=num_frames, |
|
height=height, |
|
width=width, |
|
prompt=prompt, |
|
image=image, |
|
flow_latent=integrated_flow_latent, |
|
valid_mask=None, |
|
generator=torch.Generator().manual_seed(seed), |
|
num_inference_steps=num_inference_steps, |
|
controlnet_guidance_start = 0.0, |
|
controlnet_guidance_end = controlnet_guidance_end, |
|
use_dynamic_cfg=use_dynamic_cfg, |
|
).frames[0] |
|
|
|
|
|
save_path = os.path.join(output_video_path, f"{prompt_short}_{cam_name}.mp4") |
|
export_to_video(video_generate, save_path, fps=fps) |
|
|
|
|
|
|
|
|
|
def encode_video(video: torch.Tensor, vae) -> torch.Tensor: |
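    """Encode a video tensor with the VAE and return its latent sample scaled by the VAE scaling factor."""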
|
|
|
video = video.to(vae.device, dtype=vae.dtype) |
|
latent_dist = vae.encode(video).latent_dist |
|
latent = latent_dist.sample() * vae.config.scaling_factor |
|
return latent |
|
|
|
def encode_flow(flow, vae, flow_scale_factor): |
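    """
    Encode an (F, 2, H, W) optical-flow sequence into CogVideoX latents: adaptively normalize
    the flow, expand it to three channels (the third channel is the mean of the first two),
    and encode it with the VAE.
    """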
|
|
|
|
|
assert flow.ndim == 4 |
|
num_frames, _, height, width = flow.shape |
|
|
|
|
|
|
|
flow = rearrange(flow, '(b f) c h w -> b f c h w', b=1) |
|
flow_norm = adaptive_normalize(flow, flow_scale_factor[0], flow_scale_factor[1]) |
|
|
|
|
|
flow_norm = rearrange(flow_norm, 'b f c h w -> (b f) c h w', b=1) |
|
|
|
|
|
num_frames, _, H, W = flow_norm.shape |
|
flow_norm_extended = torch.empty((num_frames, 3, height, width)).to(flow_norm) |
|
flow_norm_extended[:,:2] = flow_norm |
|
flow_norm_extended[:,-1:] = flow_norm.mean(dim=1, keepdim=True) |
|
flow_norm_extended = rearrange(flow_norm_extended, '(b f) c h w -> b c f h w', f=num_frames) |
|
|
|
return encode_video(flow_norm_extended, vae) |
|
|
|
|
|
def decode_flow(flow_latent, vae, flow_scale_factor): |
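    """
    Decode CogVideoX flow latents back to an (F, 2, H, W) optical-flow sequence: decode with
    the VAE, keep the first two channels, and undo the adaptive normalization.
    """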
|
flow_latent = flow_latent.permute(0, 2, 1, 3, 4) |
|
flow_latent = 1 / vae.config.scaling_factor * flow_latent |
|
|
|
flow = vae.decode(flow_latent).sample |
|
|
|
|
|
flow = flow[:,:2].detach().clone() |
|
|
|
|
|
flow = rearrange(flow, 'b c f h w -> b f c h w') |
|
flow = adaptive_unnormalize(flow, flow_scale_factor[0], flow_scale_factor[1]) |
|
|
|
flow = rearrange(flow, 'b f c h w -> (b f) c h w') |
|
return flow |
|
|
|
def adaptive_normalize(flow, sf_x, sf_y): |
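    """
    Compress each flow axis with a sign-preserving square root, x -> sign(x) * sqrt(|x|/sf + 1e-7),
    then clamp the x and y components to +/- sqrt(W/sf_x) and +/- sqrt(H/sf_y) respectively.
    """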
|
|
|
assert flow.ndim == 5, 'Set the shape of the flow input as (B, F, C, H, W)' |
|
assert sf_x is not None and sf_y is not None |
|
b, f, c, h, w = flow.shape |
|
|
|
max_clip_x = math.sqrt(w/sf_x) * 1.0 |
|
max_clip_y = math.sqrt(h/sf_y) * 1.0 |
|
|
|
flow_norm = flow.detach().clone() |
|
flow_x = flow[:, :, 0].detach().clone() |
|
flow_y = flow[:, :, 1].detach().clone() |
|
|
|
flow_x_norm = torch.sign(flow_x) * torch.sqrt(torch.abs(flow_x)/sf_x + 1e-7) |
|
flow_y_norm = torch.sign(flow_y) * torch.sqrt(torch.abs(flow_y)/sf_y + 1e-7) |
|
|
|
flow_norm[:, :, 0] = torch.clamp(flow_x_norm, min=-max_clip_x, max=max_clip_x) |
|
flow_norm[:, :, 1] = torch.clamp(flow_y_norm, min=-max_clip_y, max=max_clip_y) |
|
|
|
return flow_norm |
|
|
|
|
|
def adaptive_unnormalize(flow, sf_x, sf_y): |
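    """Invert adaptive_normalize per flow axis: x -> sign(x) * sf * (x**2 - 1e-7)."""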
|
|
|
assert flow.ndim == 5, 'Set the shape of the flow input as (B, F, C, H, W)' |
|
assert sf_x is not None and sf_y is not None |
|
|
|
flow_orig = flow.detach().clone() |
|
flow_x = flow[:, :, 0].detach().clone() |
|
flow_y = flow[:, :, 1].detach().clone() |
|
|
|
flow_orig[:, :, 0] = torch.sign(flow_x) * sf_x * (flow_x**2 - 1e-7) |
|
flow_orig[:, :, 1] = torch.sign(flow_y) * sf_y * (flow_y**2 - 1e-7) |
|
|
|
return flow_orig |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
    parser = argparse.ArgumentParser(description="Generate a camera-controllable video from a text prompt and an input image using FloVD with CogVideoX")
|
parser.add_argument("--prompt", type=str, required=True, help="The description of the video to be generated") |
|
parser.add_argument("--image_path", type=str, default=None, help="The path of the image to be used as the background of the video",) |
|
parser.add_argument("--fvsm_path", type=str, required=True, help="Path of the pre-trained model use") |
|
parser.add_argument("--omsm_path", type=str, required=True, help="Path of the pre-trained model use") |
|
parser.add_argument("--output_path", type=str, default="./output.mp4", help="The path save generated video") |
|
parser.add_argument("--guidance_scale", type=float, default=6.0, help="The scale for classifier-free guidance") |
|
parser.add_argument("--num_inference_steps", type=int, default=50, help="Inference steps") |
|
parser.add_argument("--num_frames", type=int, default=49, help="Number of steps for the inference process") |
|
parser.add_argument("--width", type=int, default=None, help="The width of the generated video") |
|
parser.add_argument("--height", type=int, default=None, help="The height of the generated video") |
|
parser.add_argument("--fps", type=int, default=16, help="The frames per second for the generated video") |
|
parser.add_argument("--num_videos_per_prompt", type=int, default=1, help="Number of videos to generate per prompt") |
|
parser.add_argument("--dtype", type=str, default="bfloat16", help="The data type for computation") |
|
parser.add_argument("--seed", type=int, default=42, help="The seed for reproducibility") |
|
parser.add_argument("--controlnet_guidance_end", type=float, default=0.4, help="Controlnet guidance end during sampling") |
|
parser.add_argument("--use_dynamic_cfg", action='store_true') |
|
parser.add_argument("--pose_type", type=str, default='manual', help="pose type in the inference time") |
|
parser.add_argument("--speed", type=float, default=0.5, help="pose type in the inference time") |
|
parser.add_argument("--use_flow_integration", action='store_true') |
|
parser.add_argument("--cam_pose_name", type=str, required=False, default=None, help="Camera trajectory name") |
|
parser.add_argument("--depth_ckpt_path", type=str, required=False, default="./ckpt/others/depth_anything_v2_metric_hypersim_vitb.pth", help="Camera trajectory name") |
|
|
|
args = parser.parse_args() |
|
dtype = torch.float16 if args.dtype == "float16" else torch.bfloat16 |
|
|
|
|
|
generate_video( |
|
prompt=args.prompt, |
|
fvsm_path=args.fvsm_path, |
|
omsm_path=args.omsm_path, |
|
output_path=args.output_path, |
|
num_frames=args.num_frames, |
|
width=args.width, |
|
height=args.height, |
|
image_path=args.image_path, |
|
num_inference_steps=args.num_inference_steps, |
|
guidance_scale=args.guidance_scale, |
|
num_videos_per_prompt=args.num_videos_per_prompt, |
|
dtype=dtype, |
|
seed=args.seed, |
|
fps=args.fps, |
|
controlnet_guidance_end=args.controlnet_guidance_end, |
|
use_dynamic_cfg=args.use_dynamic_cfg, |
|
pose_type=args.pose_type, |
|
speed=args.speed, |
|
use_flow_integration=args.use_flow_integration, |
|
cam_pose_name=args.cam_pose_name, |
|
depth_ckpt_path=args.depth_ckpt_path |
|
) |
|
|