"""Modified from https://github.com/kijai/ComfyUI-EasyAnimateWrapper/blob/main/nodes.py | |
""" | |
import gc
import os

import comfy.model_management as mm
import cv2
import folder_paths
import numpy as np
import torch
from comfy.utils import ProgressBar
from diffusers import (DDIMScheduler, DPMSolverMultistepScheduler,
                       EulerAncestralDiscreteScheduler, EulerDiscreteScheduler,
                       PNDMScheduler)
from einops import rearrange
from PIL import Image

from ..cogvideox.data.bucket_sampler import ASPECT_RATIO_512, get_closest_ratio
from ..cogvideox.models.autoencoder_magvit import AutoencoderKLCogVideoX
from ..cogvideox.models.transformer3d import CogVideoXTransformer3DModel
from ..cogvideox.pipeline.pipeline_cogvideox import CogVideoX_Fun_Pipeline
from ..cogvideox.pipeline.pipeline_cogvideox_control import \
    CogVideoX_Fun_Pipeline_Control
from ..cogvideox.pipeline.pipeline_cogvideox_inpaint import \
    CogVideoX_Fun_Pipeline_Inpaint
from ..cogvideox.utils.lora_utils import merge_lora, unmerge_lora
from ..cogvideox.utils.utils import (get_image_to_video_latent,
                                     get_video_to_video_latent)

# Compatible with Alibaba EAS for quick launch
eas_cache_dir = '/stable-diffusion-cache/models'
# Directory containing this CogVideoX-Fun extension
script_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
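

# ComfyUI passes IMAGE inputs as float tensors in [0, 1] with shape
# (batch, height, width, channels); the helpers below convert a single frame
# of such a batch (or a raw numpy array) to a PIL image for the pipeline.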
def tensor2pil(image):
    return Image.fromarray(np.clip(255. * image.cpu().numpy(), 0, 255).astype(np.uint8))


def numpy2pil(image):
    return Image.fromarray(np.clip(255. * image, 0, 255).astype(np.uint8))


def to_pil(image):
    if isinstance(image, Image.Image):
        return image
    if isinstance(image, torch.Tensor):
        return tensor2pil(image)
    if isinstance(image, np.ndarray):
        return numpy2pil(image)
    raise ValueError(f"Cannot convert {type(image)} to PIL.Image")


class LoadCogVideoX_Fun_Model:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "model": (
                    [
                        'CogVideoX-Fun-2b-InP',
                        'CogVideoX-Fun-5b-InP',
                        'CogVideoX-Fun-V1.1-2b-InP',
                        'CogVideoX-Fun-V1.1-5b-InP',
                        'CogVideoX-Fun-V1.1-2b-Pose',
                        'CogVideoX-Fun-V1.1-5b-Pose',
                    ],
                    {
                        "default": 'CogVideoX-Fun-V1.1-2b-InP',
                    }
                ),
                "model_type": (
                    ["Inpaint", "Control"],
                    {
                        "default": "Inpaint",
                    }
                ),
                "low_gpu_memory_mode": (
                    [False, True],
                    {
                        "default": False,
                    }
                ),
                "precision": (
                    ['fp16', 'bf16'],
                    {
                        "default": 'fp16'
                    }
                ),
            },
        }

    RETURN_TYPES = ("CogVideoXFUNSMODEL",)
    RETURN_NAMES = ("cogvideoxfun_model",)
    FUNCTION = "loadmodel"
    CATEGORY = "CogVideoXFUNWrapper"

    def loadmodel(self, low_gpu_memory_mode, model, model_type, precision):
        # Init weight_dtype and device
        device = mm.get_torch_device()
        offload_device = mm.unet_offload_device()
        weight_dtype = {"bf16": torch.bfloat16, "fp16": torch.float16, "fp32": torch.float32}[precision]

        # Init progress bar
        pbar = ProgressBar(3)

        # Check whether the model exists locally (accepting both directory
        # spellings), then fall back to the EAS cache.
        candidate_paths = [
            os.path.join(folder_paths.models_dir, "CogVideoX_Fun", model),
            os.path.join(folder_paths.models_dir, "CogVideoX-Fun", model),
            os.path.join(eas_cache_dir, 'CogVideoX_Fun', model),
        ]
        model_path = next((path for path in candidate_paths if os.path.exists(path)), candidate_paths[0])
        if not os.path.exists(model_path):
            print(f"Please download cogvideoxfun model to: {model_path}")
        vae = AutoencoderKLCogVideoX.from_pretrained(
            model_path,
            subfolder="vae",
        ).to(weight_dtype)
        # Update pbar
        pbar.update(1)

        # Load Sampler
        print("Load Sampler.")
        scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        # Update pbar
        pbar.update(1)

        # Get Transformer
        transformer = CogVideoXTransformer3DModel.from_pretrained_2d(
            model_path,
            subfolder="transformer",
        ).to(weight_dtype)
        # Update pbar
        pbar.update(1)

        # Get pipeline
        if model_type == "Inpaint":
            # InP checkpoints take extra mask/reference channels, so their
            # transformer input width differs from the plain VAE latent width.
            if transformer.config.in_channels != vae.config.latent_channels:
                pipeline = CogVideoX_Fun_Pipeline_Inpaint.from_pretrained(
                    model_path,
                    vae=vae,
                    transformer=transformer,
                    scheduler=scheduler,
                    torch_dtype=weight_dtype
                )
            else:
                pipeline = CogVideoX_Fun_Pipeline.from_pretrained(
                    model_path,
                    vae=vae,
                    transformer=transformer,
                    scheduler=scheduler,
                    torch_dtype=weight_dtype
                )
        else:
            pipeline = CogVideoX_Fun_Pipeline_Control.from_pretrained(
                model_path,
                vae=vae,
                transformer=transformer,
                scheduler=scheduler,
                torch_dtype=weight_dtype
            )
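
        # enable_sequential_cpu_offload streams submodules through the GPU one
        # at a time (lowest VRAM use, slowest), while enable_model_cpu_offload
        # swaps whole components (faster, needs more VRAM).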
        if low_gpu_memory_mode:
            pipeline.enable_sequential_cpu_offload()
        else:
            pipeline.enable_model_cpu_offload()

        cogvideoxfun_model = {
            'pipeline': pipeline,
            'dtype': weight_dtype,
            'model_path': model_path,
            'model_type': model_type,
            'loras': [],
            'strength_model': [],
        }
        return (cogvideoxfun_model,)


class LoadCogVideoX_Fun_Lora:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "cogvideoxfun_model": ("CogVideoXFUNSMODEL",),
                "lora_name": (folder_paths.get_filename_list("loras"), {"default": None,}),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
            }
        }

    RETURN_TYPES = ("CogVideoXFUNSMODEL",)
    RETURN_NAMES = ("cogvideoxfun_model",)
    FUNCTION = "load_lora"
    CATEGORY = "CogVideoXFUNWrapper"

    def load_lora(self, cogvideoxfun_model, lora_name, strength_model):
        if lora_name is not None:
            return (
                {
                    'pipeline': cogvideoxfun_model["pipeline"],
                    'dtype': cogvideoxfun_model["dtype"],
                    'model_path': cogvideoxfun_model["model_path"],
                    # Keep model_type so downstream samplers can still branch on it
                    'model_type': cogvideoxfun_model["model_type"],
                    'loras': cogvideoxfun_model.get("loras", []) + [folder_paths.get_full_path("loras", lora_name)],
                    'strength_model': cogvideoxfun_model.get("strength_model", []) + [strength_model],
                },
            )
        else:
            return (cogvideoxfun_model,)


class CogVideoX_FUN_TextBox:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "prompt": ("STRING", {"multiline": True, "default": "",}),
            }
        }

    RETURN_TYPES = ("STRING_PROMPT",)
    RETURN_NAMES = ("prompt",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process(self, prompt):
        return (prompt,)


class CogVideoX_Fun_I2VSampler:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "cogvideoxfun_model": (
                    "CogVideoXFUNSMODEL",
                ),
                "prompt": (
                    "STRING_PROMPT",
                ),
                "negative_prompt": (
                    "STRING_PROMPT",
                ),
                "video_length": (
                    "INT", {"default": 49, "min": 5, "max": 49, "step": 4}
                ),
                "base_resolution": (
                    [
                        512,
                        768,
                        960,
                        1024,
                    ], {"default": 768}
                ),
                "seed": (
                    "INT", {"default": 43, "min": 0, "max": 0xffffffffffffffff}
                ),
                "steps": (
                    "INT", {"default": 50, "min": 1, "max": 200, "step": 1}
                ),
                "cfg": (
                    "FLOAT", {"default": 6.0, "min": 1.0, "max": 20.0, "step": 0.01}
                ),
                "scheduler": (
                    [
                        "Euler",
                        "Euler A",
                        "DPM++",
                        "PNDM",
                        "DDIM",
                    ],
                    {
                        "default": 'DDIM'
                    }
                )
            },
            "optional": {
                "start_img": ("IMAGE",),
                "end_img": ("IMAGE",),
            },
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process(self, cogvideoxfun_model, prompt, negative_prompt, video_length, base_resolution, seed, steps, cfg, scheduler, start_img=None, end_img=None):
        device = mm.get_torch_device()
        offload_device = mm.unet_offload_device()
        mm.soft_empty_cache()
        gc.collect()

        start_img = [to_pil(_start_img) for _start_img in start_img] if start_img is not None else None
        end_img = [to_pil(_end_img) for _end_img in end_img] if end_img is not None else None

        # Pick the closest training bucket to the input aspect ratio, scaled
        # from the 512 base to the requested base_resolution.
        aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]] for key in ASPECT_RATIO_512.keys()}
        original_width, original_height = start_img[0].size if type(start_img) is list else Image.open(start_img).size
        closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
        height, width = [int(x / 16) * 16 for x in closest_size]

        # Get Pipeline
        pipeline = cogvideoxfun_model['pipeline']
        model_path = cogvideoxfun_model['model_path']

        # Load Sampler
        if scheduler == "DPM++":
            noise_scheduler = DPMSolverMultistepScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "Euler":
            noise_scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "Euler A":
            noise_scheduler = EulerAncestralDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "PNDM":
            noise_scheduler = PNDMScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "DDIM":
            noise_scheduler = DDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
        pipeline.scheduler = noise_scheduler
        generator = torch.Generator(device).manual_seed(seed)

        with torch.no_grad():
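            # Snap the frame count to 1 + k * temporal_compression_ratio (4 for
            # the CogVideoX VAE) so the sequence divides evenly under temporal
            # compression.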
            video_length = int((video_length - 1) // pipeline.vae.config.temporal_compression_ratio * pipeline.vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
            input_video, input_video_mask, clip_image = get_image_to_video_latent(start_img, end_img, video_length=video_length, sample_size=(height, width))
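
            # LoRA weights are merged into the pipeline for this run and
            # unmerged afterwards, leaving the cached model unchanged.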
            for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
                pipeline = merge_lora(pipeline, _lora_path, _lora_weight)

            sample = pipeline(
                prompt,
                num_frames=video_length,
                negative_prompt=negative_prompt,
                height=height,
                width=width,
                generator=generator,
                guidance_scale=cfg,
                num_inference_steps=steps,
                video=input_video,
                mask_video=input_video_mask,
                comfyui_progressbar=True,
            ).videos
            videos = rearrange(sample, "b c t h w -> (b t) h w c")

            for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
                pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)
        return (videos,)


class CogVideoX_Fun_T2VSampler:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "cogvideoxfun_model": (
                    "CogVideoXFUNSMODEL",
                ),
                "prompt": (
                    "STRING_PROMPT",
                ),
                "negative_prompt": (
                    "STRING_PROMPT",
                ),
                "video_length": (
                    "INT", {"default": 49, "min": 5, "max": 49, "step": 4}
                ),
                "width": (
                    "INT", {"default": 1008, "min": 64, "max": 2048, "step": 16}
                ),
                "height": (
                    "INT", {"default": 576, "min": 64, "max": 2048, "step": 16}
                ),
                "is_image": (
                    [
                        False,
                        True
                    ],
                    {
                        "default": False,
                    }
                ),
                "seed": (
                    "INT", {"default": 43, "min": 0, "max": 0xffffffffffffffff}
                ),
                "steps": (
                    "INT", {"default": 50, "min": 1, "max": 200, "step": 1}
                ),
                "cfg": (
                    "FLOAT", {"default": 6.0, "min": 1.0, "max": 20.0, "step": 0.01}
                ),
                "scheduler": (
                    [
                        "Euler",
                        "Euler A",
                        "DPM++",
                        "PNDM",
                        "DDIM",
                    ],
                    {
                        "default": 'DDIM'
                    }
                ),
            },
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process(self, cogvideoxfun_model, prompt, negative_prompt, video_length, width, height, is_image, seed, steps, cfg, scheduler):
        device = mm.get_torch_device()
        offload_device = mm.unet_offload_device()
        mm.soft_empty_cache()
        gc.collect()

        # Get Pipeline
        pipeline = cogvideoxfun_model['pipeline']
        model_path = cogvideoxfun_model['model_path']

        # Load Sampler
        if scheduler == "DPM++":
            noise_scheduler = DPMSolverMultistepScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "Euler":
            noise_scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "Euler A":
            noise_scheduler = EulerAncestralDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "PNDM":
            noise_scheduler = PNDMScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "DDIM":
            noise_scheduler = DDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
        pipeline.scheduler = noise_scheduler
        generator = torch.Generator(device).manual_seed(seed)

        video_length = 1 if is_image else video_length
        with torch.no_grad():
            video_length = int((video_length - 1) // pipeline.vae.config.temporal_compression_ratio * pipeline.vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
            input_video, input_video_mask, clip_image = get_image_to_video_latent(None, None, video_length=video_length, sample_size=(height, width))

            for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
                pipeline = merge_lora(pipeline, _lora_path, _lora_weight)

            sample = pipeline(
                prompt,
                num_frames=video_length,
                negative_prompt=negative_prompt,
                height=height,
                width=width,
                generator=generator,
                guidance_scale=cfg,
                num_inference_steps=steps,
                video=input_video,
                mask_video=input_video_mask,
                comfyui_progressbar=True,
            ).videos
            videos = rearrange(sample, "b c t h w -> (b t) h w c")

            for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
                pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)
        return (videos,)


class CogVideoX_Fun_V2VSampler:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "cogvideoxfun_model": (
                    "CogVideoXFUNSMODEL",
                ),
                "prompt": (
                    "STRING_PROMPT",
                ),
                "negative_prompt": (
                    "STRING_PROMPT",
                ),
                "video_length": (
                    "INT", {"default": 49, "min": 5, "max": 49, "step": 4}
                ),
                "base_resolution": (
                    [
                        512,
                        768,
                        960,
                        1024,
                    ], {"default": 768}
                ),
                "seed": (
                    "INT", {"default": 43, "min": 0, "max": 0xffffffffffffffff}
                ),
                "steps": (
                    "INT", {"default": 50, "min": 1, "max": 200, "step": 1}
                ),
                "cfg": (
                    "FLOAT", {"default": 6.0, "min": 1.0, "max": 20.0, "step": 0.01}
                ),
                "denoise_strength": (
                    "FLOAT", {"default": 0.70, "min": 0.05, "max": 1.00, "step": 0.01}
                ),
                "scheduler": (
                    [
                        "Euler",
                        "Euler A",
                        "DPM++",
                        "PNDM",
                        "DDIM",
                    ],
                    {
                        "default": 'DDIM'
                    }
                ),
            },
            "optional": {
                "validation_video": ("IMAGE",),
                "control_video": ("IMAGE",),
            },
        }

    RETURN_TYPES = ("IMAGE",)
    RETURN_NAMES = ("images",)
    FUNCTION = "process"
    CATEGORY = "CogVideoXFUNWrapper"

    def process(self, cogvideoxfun_model, prompt, negative_prompt, video_length, base_resolution, seed, steps, cfg, denoise_strength, scheduler, validation_video=None, control_video=None):
        device = mm.get_torch_device()
        offload_device = mm.unet_offload_device()
        mm.soft_empty_cache()
        gc.collect()

        # Get Pipeline
        pipeline = cogvideoxfun_model['pipeline']
        model_path = cogvideoxfun_model['model_path']
        model_type = cogvideoxfun_model['model_type']

        # Pick the closest training bucket to the source aspect ratio, scaled
        # from the 512 base to the requested base_resolution.
        aspect_ratio_sample_size = {key: [x / 512 * base_resolution for x in ASPECT_RATIO_512[key]] for key in ASPECT_RATIO_512.keys()}
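
        # Only the first frame of the source video is inspected below, purely
        # to measure its resolution.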
        if model_type == "Inpaint":
            if type(validation_video) is str:
                original_width, original_height = Image.fromarray(cv2.VideoCapture(validation_video).read()[1]).size
            else:
                validation_video = np.array(validation_video.cpu().numpy() * 255, np.uint8)
                original_width, original_height = Image.fromarray(validation_video[0]).size
        else:
            if type(control_video) is str:
                original_width, original_height = Image.fromarray(cv2.VideoCapture(control_video).read()[1]).size
            else:
                control_video = np.array(control_video.cpu().numpy() * 255, np.uint8)
                original_width, original_height = Image.fromarray(control_video[0]).size
        closest_size, closest_ratio = get_closest_ratio(original_height, original_width, ratios=aspect_ratio_sample_size)
        height, width = [int(x / 16) * 16 for x in closest_size]

        # Load Sampler
        if scheduler == "DPM++":
            noise_scheduler = DPMSolverMultistepScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "Euler":
            noise_scheduler = EulerDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "Euler A":
            noise_scheduler = EulerAncestralDiscreteScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "PNDM":
            noise_scheduler = PNDMScheduler.from_pretrained(model_path, subfolder='scheduler')
        elif scheduler == "DDIM":
            noise_scheduler = DDIMScheduler.from_pretrained(model_path, subfolder='scheduler')
        pipeline.scheduler = noise_scheduler
        generator = torch.Generator(device).manual_seed(seed)

        with torch.no_grad():
            video_length = int((video_length - 1) // pipeline.vae.config.temporal_compression_ratio * pipeline.vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
            if model_type == "Inpaint":
                input_video, input_video_mask, clip_image = get_video_to_video_latent(validation_video, video_length=video_length, sample_size=(height, width), fps=8)
            else:
                input_video, input_video_mask, clip_image = get_video_to_video_latent(control_video, video_length=video_length, sample_size=(height, width), fps=8)

            for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
                pipeline = merge_lora(pipeline, _lora_path, _lora_weight)

            if model_type == "Inpaint":
                sample = pipeline(
                    prompt,
                    num_frames=video_length,
                    negative_prompt=negative_prompt,
                    height=height,
                    width=width,
                    generator=generator,
                    guidance_scale=cfg,
                    num_inference_steps=steps,
                    video=input_video,
                    mask_video=input_video_mask,
                    strength=float(denoise_strength),
                    comfyui_progressbar=True,
                ).videos
            else:
                sample = pipeline(
                    prompt,
                    num_frames=video_length,
                    negative_prompt=negative_prompt,
                    height=height,
                    width=width,
                    generator=generator,
                    guidance_scale=cfg,
                    num_inference_steps=steps,
                    control_video=input_video,
                    comfyui_progressbar=True,
                ).videos
            videos = rearrange(sample, "b c t h w -> (b t) h w c")

            for _lora_path, _lora_weight in zip(cogvideoxfun_model.get("loras", []), cogvideoxfun_model.get("strength_model", [])):
                pipeline = unmerge_lora(pipeline, _lora_path, _lora_weight)
        return (videos,)


NODE_CLASS_MAPPINGS = {
    "CogVideoX_FUN_TextBox": CogVideoX_FUN_TextBox,
    "LoadCogVideoX_Fun_Model": LoadCogVideoX_Fun_Model,
    "LoadCogVideoX_Fun_Lora": LoadCogVideoX_Fun_Lora,
    "CogVideoX_Fun_I2VSampler": CogVideoX_Fun_I2VSampler,
    "CogVideoX_Fun_T2VSampler": CogVideoX_Fun_T2VSampler,
    "CogVideoX_Fun_V2VSampler": CogVideoX_Fun_V2VSampler,
}
NODE_DISPLAY_NAME_MAPPINGS = {
    "CogVideoX_FUN_TextBox": "CogVideoX_FUN_TextBox",
    "LoadCogVideoX_Fun_Model": "Load CogVideoX-Fun Model",
    "LoadCogVideoX_Fun_Lora": "Load CogVideoX-Fun Lora",
    "CogVideoX_Fun_I2VSampler": "CogVideoX-Fun Sampler for Image to Video",
    "CogVideoX_Fun_T2VSampler": "CogVideoX-Fun Sampler for Text to Video",
    "CogVideoX_Fun_V2VSampler": "CogVideoX-Fun Sampler for Video to Video",
}
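
# A typical workflow chain (sketch, using the node names registered above):
# LoadCogVideoX_Fun_Model -> LoadCogVideoX_Fun_Lora (optional) ->
# CogVideoX_FUN_TextBox (prompt / negative prompt) -> one of the samplers;
# the returned IMAGE batch can be previewed or fed to any video-combine node.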