DiffusionText2WorldGeneration / text2world_hf.py
EthanZyh's picture
recover last commit
8316c81
raw
history blame
6.04 kB
import os
import argparse
import torch
from transformers import PreTrainedModel, PretrainedConfig
from .cosmos1.models.diffusion.inference.inference_utils import add_common_arguments, validate_args
from .cosmos1.models.diffusion.inference.world_generation_pipeline import DiffusionText2WorldGenerationPipeline
import .cosmos1.utils.log as log
import .cosmos1.utils.misc as misc
from .cosmos1.utils.io import read_prompts_from_file, save_video
class DiffusionText2WorldConfig(PretrainedConfig):
model_type = "DiffusionText2World"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.diffusion_transformer_dir = kwargs.get("diffusion_transformer_dir", "Cosmos-1.0-Diffusion-7B-Text2World")
self.prompt_upsampler_dir = kwargs.get("prompt_upsampler_dir", "Cosmos-1.0-Prompt-Upsampler-12B-Text2World")
self.word_limit_to_skip_upsampler = kwargs.get("word_limit_to_skip_upsampler", 250)
self.checkpoint_dir = kwargs.get("checkpoint_dir", "checkpoints")
self.tokenizer_dir = kwargs.get("tokenizer_dir", "Cosmos-1.0-Tokenizer-CV8x8x8")
self.video_save_name = kwargs.get("video_save_name", "output")
self.video_save_folder = kwargs.get("video_save_folder", "outputs/")
self.prompt = kwargs.get("prompt", None)
self.batch_input_path = kwargs.get("batch_input_path", None)
self.negative_prompt = kwargs.get("negative_prompt", None)
self.num_steps = kwargs.get("num_steps", 35)
self.guidance = kwargs.get("guidance", 7)
self.num_video_frames = kwargs.get("num_video_frames", 121)
self.height = kwargs.get("height", 704)
self.width = kwargs.get("width", 1280)
self.fps = kwargs.get("fps", 24)
self.seed = kwargs.get("seed", 1)
self.disable_prompt_upsampler = kwargs.get("disable_prompt_upsampler", False)
self.offload_diffusion_transformer = kwargs.get("offload_diffusion_transformer", False)
self.offload_tokenizer = kwargs.get("offload_tokenizer", False)
self.offload_text_encoder_model = kwargs.get("offload_text_encoder_model", False)
self.offload_prompt_upsampler = kwargs.get("offload_prompt_upsampler", False)
self.offload_guardrail_models = kwargs.get("offload_guardrail_models", False)
class DiffusionText2World(PreTrainedModel):
config_class = DiffusionText2WorldConfig
def __init__(self, config=DiffusionText2WorldConfig()):
super().__init__(config)
torch.enable_grad(False) # TODO: do we need this?
self.config = config
inference_type = "text2world"
config.prompt = 1 # TODO: this is to hack args validation, maybe find a better way
validate_args(config, inference_type)
del config.prompt
self.pipeline = DiffusionText2WorldGenerationPipeline(
inference_type=inference_type,
checkpoint_dir=config.checkpoint_dir,
checkpoint_name=config.diffusion_transformer_dir,
prompt_upsampler_dir=config.prompt_upsampler_dir,
enable_prompt_upsampler=not config.disable_prompt_upsampler,
offload_network=config.offload_diffusion_transformer,
offload_tokenizer=config.offload_tokenizer,
offload_text_encoder_model=config.offload_text_encoder_model,
offload_prompt_upsampler=config.offload_prompt_upsampler,
offload_guardrail_models=config.offload_guardrail_models,
guidance=config.guidance,
num_steps=config.num_steps,
height=config.height,
width=config.width,
fps=config.fps,
num_video_frames=config.num_video_frames,
seed=config.seed,
)
def forward(self, prompt):
cfg = self.config
# Handle multiple prompts if prompt file is provided
if cfg.batch_input_path:
log.info(f"Reading batch inputs from path: {cfg.batch_input_path}")
prompts = read_prompts_from_file(cfg.batch_input_path)
else:
# Single prompt case
prompts = [{"prompt": cfg.prompt}]
os.makedirs(cfg.video_save_folder, exist_ok=True)
for i, input_dict in enumerate(prompts):
current_prompt = input_dict.get("prompt", None)
if current_prompt is None:
log.critical("Prompt is missing, skipping world generation.")
continue
# Generate video
generated_output = self.pipeline.generate(current_prompt, cfg.negative_prompt, cfg.word_limit_to_skip_upsampler)
if generated_output is None:
log.critical("Guardrail blocked text2world generation.")
continue
video, prompt = generated_output
if cfg.batch_input_path:
video_save_path = os.path.join(cfg.video_save_folder, f"{i}.mp4")
prompt_save_path = os.path.join(cfg.video_save_folder, f"{i}.txt")
else:
video_save_path = os.path.join(cfg.video_save_folder, f"{cfg.video_save_name}.mp4")
prompt_save_path = os.path.join(cfg.video_save_folder, f"{cfg.video_save_name}.txt")
# Save video
save_video(
video=video,
fps=cfg.fps,
H=cfg.height,
W=cfg.width,
video_save_quality=5,
video_save_path=video_save_path,
)
# Save prompt to text file alongside video
with open(prompt_save_path, "wb") as f:
f.write(prompt.encode("utf-8"))
log.info(f"Saved video to {video_save_path}")
log.info(f"Saved prompt to {prompt_save_path}")
def save_pretrained(self, save_directory, **kwargs):
# We don't save anything
pass
@classmethod
def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
config = kwargs["config"]
model = cls(config)
return model