Spaces:
Running
Running
import gradio as gr | |
import requests | |
from PIL import Image | |
from transformers import BlipProcessor, BlipForConditionalGeneration | |
import torch | |
from diffusers import AnimateDiffPipeline, LCMScheduler, MotionAdapter | |
from diffusers.utils import export_to_gif | |
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips | |
from transformers import AutoProcessor, MusicgenForConditionalGeneration | |
import scipy.io.wavfile | |
import re | |
import glob | |
import os | |
# 定义图像到文本函数 | |
def img2text(image): | |
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large") | |
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large") | |
inputs = processor(image, return_tensors="pt") | |
out = model.generate(**inputs) | |
caption = processor.decode(out[0], skip_special_tokens=True) | |
return caption | |
# 定义文本生成函数 | |
def text2text(user_input): | |
api_key = "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" | |
base_url = "https://openrouter.ai/api/v1" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
data = { | |
"model": "openai/gpt-3.5-turbo", | |
"messages": [ | |
{ | |
"role": "system", | |
"content": ( | |
"You are an expert who is very good at writing stories. Please expand it into a continuous story based on the input, and logically cut the story into sentences. Each sentence is a scene (as many sentences and scenes as possible, and at least 10 sentences). Each sentence is required The content of the sentence description should be detailed and do not use rhetorical techniques, and no ambiguous words such as pronouns should appear in the sentence. Be as detailed as possible to accurately describe who is doing what, and the scene descriptions before and after should have a certain correlation. In addition, I require your answer to follow a certain format. Let me give you an example. For example, I enter: a dolphin jumping out of the water at sunset. " | |
"Your answer format: " | |
""" | |
[1] The sun nears the horizon, illuminating the calm sea surface with a warm glow. | |
[2] A dolphin swims swiftly below the calm sea surface, moving closer to the top. | |
[3] The dolphin uses its powerful tail fin to prepare for a leap out of the water. | |
[4] The dolphin's body starts to emerge from the water, exposing itself above the surface. | |
[5] The dolphin leaps completely out of the water, creating an arch with its body in the air. | |
[6] The dolphin rotates its body in the air before it begins its descent back to the water. | |
[7] The dolphin's head and back make the first contact with the water, creating a splash. | |
[8] The dolphin fully submerges under the water, causing the splashes around it to slowly disperse. | |
[9] The dolphin moves forward underwater, gradually disappearing into the dimming light of the sunset. | |
""" | |
"My input is as follows, please answer me in the format without adding any other words." | |
) | |
}, | |
{ "role": "user", "content": user_input } | |
] | |
} | |
response = requests.post(f"{base_url}/chat/completions", headers=headers, json=data) | |
response.raise_for_status() | |
completion = response.json() | |
return completion['choices'][0]['message']['content'] | |
# 定义文本到视频函数 | |
def text2vid(input_text): | |
sentences = re.findall(r'\[\d+\] (.+?)(?:\n|\Z)', input_text) | |
adapter = MotionAdapter.from_pretrained("wangfuyun/AnimateLCM", config_file="wangfuyun/AnimateLCM/config.json", torch_dtype=torch.float16) | |
pipe = AnimateDiffPipeline.from_pretrained("emilianJR/epiCRealism", motion_adapter=adapter, torch_dtype=torch.float16) | |
pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config, beta_schedule="linear") | |
pipe.load_lora_weights("wangfuyun/AnimateLCM", weight_name="AnimateLCM_sd15_t2v_lora.safetensors", adapter_name="lcm-lora") | |
try: | |
pipe.set_adapters(["lcm-lora"], [0.8]) | |
except ValueError as e: | |
print("Ignoring the error:", str(e)) | |
pipe.enable_vae_slicing() | |
pipe.enable_model_cpu_offload() | |
for index, sentence in enumerate(sentences): | |
output = pipe( | |
prompt=sentence + ", 4k, high resolution", | |
negative_prompt="bad quality, worse quality, low resolution", | |
num_frames=24, | |
guidance_scale=2.0, | |
num_inference_steps=6, | |
generator=torch.Generator("cpu").manual_seed(0) | |
) | |
frames = output.frames[0] | |
export_to_gif(frames, f"./{index+1}.gif") | |
# 定义生成最终视频的函数 | |
def video_generate(): | |
frame_rate = 24 | |
gif_files = sorted(glob.glob('./*.gif')) | |
clips = [VideoFileClip(gif) for gif in gif_files] | |
final_clip = concatenate_videoclips(clips, method="compose") | |
final_clip.write_videofile('output_video.mp4', codec='libx264') | |
# 定义文本到音频函数 | |
def text2audio(text_input, duration_seconds): | |
processor = AutoProcessor.from_pretrained("facebook/musicgen-small") | |
model = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small") | |
inputs = processor(text=[text_input], padding=True, return_tensors="pt") | |
max_new_tokens = int((duration_seconds / 5) * 256) | |
audio_values = model.generate(**inputs, max_new_tokens=max_new_tokens) | |
scipy.io.wavfile.write("bgm.wav", rate=model.config.audio_encoder.sampling_rate, data=audio_values[0, 0].numpy()) | |
# 定义生成结果视频的函数 | |
def result_generate(): | |
video = VideoFileClip("output_video.mp4") | |
audio = AudioFileClip("bgm.wav") | |
video = video.set_audio(audio) | |
video.write_videofile("result.mp4", codec="libx264", audio_codec="aac") | |
# 定义删除所有文件的函数 | |
def delete_all_files(directory): | |
for filename in os.listdir(directory): | |
file_path = os.path.join(directory, filename) | |
try: | |
if os.path.isfile(file_path): | |
os.remove(file_path) | |
print(f"Deleted {filename}") | |
elif os.path.isdir(file_path): | |
os.rmdir(file_path) | |
print(f"Deleted empty directory {filename}") | |
except Exception as e: | |
print(f"Failed to delete {filename}. Reason: {e}") | |
# 整合所有步骤到主函数 | |
def generate_video(image): | |
delete_all_files("data") | |
text = img2text(image) | |
sentences = text2text(text) | |
text2vid(sentences) | |
video_generate() | |
video = VideoFileClip("output_video.mp4") | |
duration = video.duration | |
audio_text = text2text(text) | |
text2audio(audio_text, duration) | |
result_generate() | |
return "result.mp4" | |
# 定义 Gradio 接口 | |
# interface = gr.Interface( | |
# fn=generate_video, | |
# inputs=gr.Image(type="pil"), | |
# outputs=gr.Video(), | |
# title="Custom Video Generation", | |
# description="Upload an image to generate a video using a custom model", | |
# theme="soft" | |
# ) | |
interface = gr.Interface( | |
fn=lambda img: generate_video(img), | |
inputs=gr.Image(type="pil"), | |
outputs=gr.Video(), | |
title="Custom Video Generation", | |
description="Upload an image to generate a video", | |
theme="soft" | |
) | |
# 启动 Gradio 应用 | |
interface.launch() | |