Spaces:
Running
on
Zero
Running
on
Zero
""" | |
Copyright NewGenAI | |
Code can't be included in commercial app used for monetary gain. No derivative code allowed. | |
""" | |
import json | |
import torch | |
import gradio as gr | |
import random | |
import time | |
from datetime import datetime | |
import os | |
from diffusers.utils import export_to_video | |
from diffusers import LTXImageToVideoPipeline | |
from transformers import T5EncoderModel, T5Tokenizer | |
from pathlib import Path | |
from datetime import datetime | |
from huggingface_hub import hf_hub_download | |
STATE_FILE = "LTX091_I2V_state.json" | |
queue = [] | |
def load_state(): | |
if os.path.exists(STATE_FILE): | |
with open(STATE_FILE, "r") as file: | |
return json.load(file) | |
return {} | |
def save_state(state): | |
with open(STATE_FILE, "w") as file: | |
json.dump(state, file) | |
initial_state = load_state() | |
def add_to_queue(image, prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed): | |
task = { | |
"image": image, | |
"prompt": prompt, | |
"negative_prompt": negative_prompt, | |
"height": height, | |
"width": width, | |
"num_frames": num_frames, | |
"num_inference_steps": num_inference_steps, | |
"fps": fps, | |
"seed": seed, | |
} | |
queue.append(task) | |
return f"Task added to queue. Current queue length: {len(queue)}" | |
def clear_queue(): | |
queue.clear() | |
return "Queue cleared." | |
def process_queue(): | |
if not queue: | |
return "Queue is empty." | |
for i, task in enumerate(queue): | |
generate_video(**task) | |
time.sleep(1) | |
queue.clear() | |
return "All tasks in the queue have been processed." | |
def save_ui_state(prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed): | |
state = { | |
"prompt": prompt, | |
"negative_prompt": negative_prompt, | |
"height": height, | |
"width": width, | |
"num_frames": num_frames, | |
"num_inference_steps": num_inference_steps, | |
"fps": fps, | |
"seed": seed, | |
} | |
save_state(state) | |
return "State saved!" | |
# [Previous model loading code remains the same...] | |
repo_id = "a-r-r-o-w/LTX-Video-0.9.1-diffusers" | |
base_path = repo_id | |
files_to_download = [ | |
"model_index.json", | |
"scheduler/scheduler_config.json", | |
"text_encoder/config.json", | |
"text_encoder/model-00001-of-00004.safetensors", | |
"text_encoder/model-00002-of-00004.safetensors", | |
"text_encoder/model-00003-of-00004.safetensors", | |
"text_encoder/model-00004-of-00004.safetensors", | |
"text_encoder/model.safetensors.index.json", | |
"tokenizer/added_tokens.json", | |
"tokenizer/special_tokens_map.json", | |
"tokenizer/spiece.model", | |
"tokenizer/tokenizer_config.json", | |
"transformer/config.json", | |
"transformer/diffusion_pytorch_model.safetensors", | |
"vae/config.json", | |
"vae/diffusion_pytorch_model.safetensors", | |
] | |
os.makedirs(base_path, exist_ok=True) | |
for file_path in files_to_download: | |
try: | |
full_dir = os.path.join(base_path, os.path.dirname(file_path)) | |
os.makedirs(full_dir, exist_ok=True) | |
downloaded_path = hf_hub_download( | |
repo_id=repo_id, | |
filename=file_path, | |
local_dir=base_path, | |
) | |
print(f"Successfully downloaded: {file_path}") | |
except Exception as e: | |
print(f"Error downloading {file_path}: {str(e)}") | |
raise | |
try: | |
full_dir = os.path.join(base_path, os.path.dirname(file_path)) | |
os.makedirs(full_dir, exist_ok=True) | |
downloaded_path = hf_hub_download( | |
repo_id="Lightricks/LTX-Video", | |
filename="ltx-video-2b-v0.9.1.safetensors", | |
local_dir=repo_id, | |
) | |
print(f"Successfully downloaded: ltx-video-2b-v0.9.1.safetensors") | |
except Exception as e: | |
print(f"Error downloading 0.9.1 model: {str(e)}") | |
raise | |
single_file_url = repo_id+"/ltx-video-2b-v0.9.1.safetensors" | |
text_encoder = T5EncoderModel.from_pretrained( | |
repo_id, subfolder="text_encoder", torch_dtype=torch.bfloat16 | |
) | |
tokenizer = T5Tokenizer.from_pretrained( | |
repo_id, subfolder="tokenizer", torch_dtype=torch.bfloat16 | |
) | |
pipe = LTXImageToVideoPipeline.from_single_file( | |
single_file_url, | |
text_encoder=text_encoder, | |
tokenizer=tokenizer, | |
torch_dtype=torch.bfloat16 | |
) | |
pipe.enable_model_cpu_offload() | |
def generate_video(image, prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed): | |
if seed == 0: | |
seed = random.randint(0, 999999) | |
video = pipe( | |
image=image, | |
prompt=prompt, | |
negative_prompt=negative_prompt, | |
width=width, | |
height=height, | |
num_frames=num_frames, | |
num_inference_steps=num_inference_steps, | |
generator=torch.Generator(device='cuda').manual_seed(seed), | |
).frames[0] | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
filename = f"{prompt[:10]}_{timestamp}.mp4" | |
os.makedirs("output_LTX091_i2v", exist_ok=True) | |
output_path = f"./output_LTX091_i2v/{filename}" | |
export_to_video(video, output_path, fps=fps) | |
return output_path | |
def randomize_seed(): | |
return random.randint(0, 999999) | |
with gr.Blocks() as demo: | |
with gr.Tabs(): | |
with gr.Tab("Generate Video"): | |
with gr.Row(): | |
input_image = gr.Image(label="Input Image", type="pil") | |
with gr.Row(): | |
prompt = gr.Textbox(label="Prompt", lines=3, value=initial_state.get("prompt", "A dramatic view of the pyramids at Giza during sunset.")) | |
negative_prompt = gr.Textbox(label="Negative Prompt", lines=3, value=initial_state.get("negative_prompt", "worst quality, blurry, distorted")) | |
with gr.Row(): | |
height = gr.Slider(label="Height", minimum=240, maximum=1080, step=1, value=initial_state.get("height", 480)) | |
width = gr.Slider(label="Width", minimum=320, maximum=1920, step=1, value=initial_state.get("width", 704)) | |
with gr.Row(): | |
num_frames = gr.Slider(label="Number of Frames", minimum=1, maximum=500, step=1, value=initial_state.get("num_frames", 161)) | |
num_inference_steps = gr.Slider(label="Number of Inference Steps", minimum=1, maximum=100, step=1, value=initial_state.get("num_inference_steps", 50)) | |
with gr.Row(): | |
fps = gr.Slider(label="FPS", minimum=1, maximum=60, step=1, value=initial_state.get("fps", 24)) | |
seed = gr.Number(label="Seed", value=initial_state.get("seed", 0)) | |
random_seed_button = gr.Button("Randomize Seed") | |
output_video = gr.Video(label="Generated Video", show_label=True) | |
generate_button = gr.Button("Generate Video") | |
save_state_button = gr.Button("Save State") | |
random_seed_button.click(lambda: random.randint(0, 999999), outputs=seed) | |
generate_button.click( | |
generate_video, | |
inputs=[input_image, prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed], | |
outputs=output_video | |
) | |
save_state_button.click( | |
save_ui_state, | |
inputs=[prompt, negative_prompt, height, width, num_frames, num_inference_steps, fps, seed], | |
outputs=gr.Text(label="State Status") | |
) | |
with gr.Tab("Batch Processing"): | |
with gr.Row(): | |
batch_input_image = gr.Image(label="Input Image", type="pil") | |
with gr.Row(): | |
batch_prompt = gr.Textbox(label="Prompt", lines=3, value="A batch of videos depicting different landscapes.") | |
batch_negative_prompt = gr.Textbox(label="Negative Prompt", lines=3, value="low quality, inconsistent, jittery") | |
with gr.Row(): | |
batch_height = gr.Slider(label="Height", minimum=240, maximum=1080, step=1, value=480) | |
batch_width = gr.Slider(label="Width", minimum=320, maximum=1920, step=1, value=704) | |
with gr.Row(): | |
batch_num_frames = gr.Slider(label="Number of Frames", minimum=1, maximum=500, step=1, value=161) | |
batch_num_inference_steps = gr.Slider(label="Number of Inference Steps", minimum=1, maximum=100, step=1, value=50) | |
with gr.Row(): | |
batch_fps = gr.Slider(label="FPS", minimum=1, maximum=60, step=1, value=24) | |
batch_seed = gr.Number(label="Seed", value=0) | |
random_seed_batch_button = gr.Button("Randomize Seed") | |
add_to_queue_button = gr.Button("Add to Queue") | |
clear_queue_button = gr.Button("Clear Queue") | |
process_queue_button = gr.Button("Process Queue") | |
queue_status = gr.Text(label="Queue Status") | |
random_seed_batch_button.click(lambda: random.randint(0, 999999), outputs=batch_seed) | |
add_to_queue_button.click( | |
add_to_queue, | |
inputs=[batch_input_image, batch_prompt, batch_negative_prompt, batch_height, batch_width, batch_num_frames, batch_num_inference_steps, batch_fps, batch_seed], | |
outputs=queue_status | |
) | |
clear_queue_button.click(clear_queue, outputs=queue_status) | |
process_queue_button.click(process_queue, outputs=queue_status) | |
demo.launch() |