import gradio as gr
import spaces
import os
import sys
import shutil
import uuid
import subprocess
from glob import glob
from huggingface_hub import snapshot_download

# Download models
os.makedirs("checkpoints", exist_ok=True)

snapshot_download(
    repo_id = "chunyu-li/LatentSync",
    local_dir = "./checkpoints"  
)
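
# The downloaded snapshot is expected to provide the files referenced later in this script,
# e.g. checkpoints/latentsync_unet.pt and the Whisper weights under checkpoints/whisper/.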

import tempfile
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

def process_video(input_video_path, temp_dir="temp_dir"):
    """
    Crop a given MP4 video to a maximum duration of 10 seconds if it is longer than 10 seconds.
    Save the new video in the specified folder (default is temp_dir).
    
    Args:
        input_video_path (str): Path to the input video file.
        temp_dir (str): Directory where the processed video will be saved.
        
    Returns:
        str: Path to the cropped video file.
    """
    # Ensure the temp_dir exists
    os.makedirs(temp_dir, exist_ok=True)
    
    # Load the video
    video = VideoFileClip(input_video_path)
    
    # Determine the output path
    input_file_name = os.path.basename(input_video_path)
    output_video_path = os.path.join(temp_dir, f"cropped_{input_file_name}")
    
    # Crop the video to 10 seconds if necessary
    if video.duration > 10:
        video = video.subclip(0, 10)
    
    # Write the cropped video to the output path
    video.write_videofile(output_video_path, codec="libx264", audio_codec="aac")
    
    # Return the path to the cropped video
    return output_video_path
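
# Standalone usage sketch (hypothetical path, outside the Gradio UI):
#     process_video("input.mp4")  # -> "temp_dir/cropped_input.mp4", at most 10 s long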

def process_audio(file_path, temp_dir):
    # Load the audio file
    audio = AudioSegment.from_file(file_path)
    
    # Trim the audio if it is longer than 8 seconds
    max_duration = 8 * 1000  # 8 seconds in milliseconds
    if len(audio) > max_duration:
        audio = audio[:max_duration]
    
    # Save the processed audio in the temporary directory
    output_path = os.path.join(temp_dir, "trimmed_audio.wav")
    audio.export(output_path, format="wav")
    
    # Return the path to the trimmed file
    print(f"Processed audio saved at: {output_path}")
    return output_path
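
# Standalone usage sketch (hypothetical path, outside the Gradio UI):
#     process_audio("speech.mp3", "temp_dir")  # -> "temp_dir/trimmed_audio.wav", at most 8 s long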

import argparse
from omegaconf import OmegaConf
import torch
from diffusers import AutoencoderKL, DDIMScheduler
from latentsync.models.unet import UNet3DConditionModel
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
from diffusers.utils.import_utils import is_xformers_available
from accelerate.utils import set_seed
from latentsync.whisper.audio2feature import Audio2Feature


@spaces.GPU(duration=180)
def main(video_path, audio_path, progress=gr.Progress(track_tqdm=True)):
    """
    Perform lip-sync video generation using an input video and a separate audio track.
    
    This function takes an input video (usually a person speaking) and an audio file,
    and synchronizes the video frames so that the lips of the speaker match the audio content.
    It uses a latent diffusion model-based pipeline (LatentSync) for audio-conditioned lip synchronization.
    
    Args:
        video_path (str): File path to the input video in MP4 format.
        audio_path (str): File path to the input audio file (e.g., WAV or MP3).
        progress (gr.Progress, optional): Gradio progress tracker for UI feedback (auto-injected).
        
    Returns:
        str: File path to the generated output video with lip synchronization applied.
    """
    
    inference_ckpt_path = "checkpoints/latentsync_unet.pt"
    unet_config_path = "configs/unet/second_stage.yaml"
    config = OmegaConf.load(unet_config_path)
    
    print(f"Input video path: {video_path}")
    print(f"Input audio path: {audio_path}")
    print(f"Loaded checkpoint path: {inference_ckpt_path}")

    is_shared_ui = "fffiloni/LatentSync" in os.environ.get("SPACE_ID", "")
    temp_dir = None
    if is_shared_ui:
        temp_dir = tempfile.mkdtemp()
        cropped_video_path = process_video(video_path, temp_dir)
        print(f"Cropped video saved to: {cropped_video_path}")
        video_path = cropped_video_path

        trimmed_audio_path = process_audio(audio_path, temp_dir)
        print(f"Processed file was stored temporarily at: {trimmed_audio_path}")
        audio_path = trimmed_audio_path

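    # DDIMScheduler.from_pretrained("configs") reads the scheduler settings
    # (scheduler_config.json, per the diffusers convention) from the local configs/ directory.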
    scheduler = DDIMScheduler.from_pretrained("configs")

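    # Pick the Whisper encoder whose embedding width matches the UNet's cross-attention
    # dimension: Whisper small produces 768-dim features, Whisper tiny produces 384-dim features.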
    if config.model.cross_attention_dim == 768:
        whisper_model_path = "checkpoints/whisper/small.pt"
    elif config.model.cross_attention_dim == 384:
        whisper_model_path = "checkpoints/whisper/tiny.pt"
    else:
        raise NotImplementedError("cross_attention_dim must be 768 or 384")

    audio_encoder = Audio2Feature(model_path=whisper_model_path, device="cuda", num_frames=config.data.num_frames)

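    # 0.18215 is the standard latent scaling factor for Stable Diffusion's VAE;
    # the shift factor is explicitly zeroed so latents are only scaled, not shifted.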
    vae = AutoencoderKL.from_pretrained("stabilityai/sd-vae-ft-mse", torch_dtype=torch.float16)
    vae.config.scaling_factor = 0.18215
    vae.config.shift_factor = 0

    unet, _ = UNet3DConditionModel.from_pretrained(
        OmegaConf.to_container(config.model),
        inference_ckpt_path,  # load checkpoint
        device="cpu",
    )

    unet = unet.to(dtype=torch.float16)

    # xformers memory-efficient attention (currently disabled):
    # if is_xformers_available():
    #     unet.enable_xformers_memory_efficient_attention()

    pipeline = LipsyncPipeline(
        vae=vae,
        audio_encoder=audio_encoder,
        unet=unet,
        scheduler=scheduler,
    ).to("cuda")

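    # seed is hard-coded to -1 here, so every run falls through to torch.seed() and uses
    # a fresh random seed; set it to a non-negative value for reproducible output.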
    seed = -1
    if seed != -1:
        set_seed(seed)
    else:
        torch.seed()

    print(f"Initial seed: {torch.initial_seed()}")

    unique_id = str(uuid.uuid4())
    video_out_path = f"video_out{unique_id}.mp4"

    pipeline(
        video_path=video_path,
        audio_path=audio_path,
        video_out_path=video_out_path,
        video_mask_path=video_out_path.replace(".mp4", "_mask.mp4"),
        num_frames=config.data.num_frames,
        num_inference_steps=config.run.inference_steps,
        guidance_scale=1.0,
        weight_dtype=torch.float16,
        width=config.data.resolution,
        height=config.data.resolution,
    )

    if is_shared_ui:
        # Clean up the temporary directory
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
            print(f"Temporary directory {temp_dir} deleted.")

    return video_out_path

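# Minimal sketch of invoking the pipeline directly, outside the Gradio UI
# (uses the bundled demo assets and assumes a CUDA GPU is available):
#
#     out = main("assets/demo1_video.mp4", "assets/demo1_audio.wav")
#     print(f"Lip-synced video written to {out}")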

css="""
div#col-container{
    margin: 0 auto;
    max-width: 982px;
}
"""
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# LatentSync: Audio Conditioned Latent Diffusion Models for Lip Sync")
        gr.Markdown("LatentSync, an end-to-end lip sync framework based on audio conditioned latent diffusion models without any intermediate motion representation, diverging from previous diffusion-based lip sync methods based on pixel space diffusion or two-stage generation.")
        gr.HTML("""
        <div style="display:flex;column-gap:4px;">
            <a href="https://github.com/bytedance/LatentSync">
                <img src='https://img.shields.io/badge/GitHub-Repo-blue'>
            </a> 
            <a href="https://arxiv.org/abs/2412.09262">
                <img src='https://img.shields.io/badge/ArXiv-Paper-red'>
            </a>
            <a href="https://huggingface.co/spaces/fffiloni/LatentSync?duplicate=true">
                <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
            </a>
            <a href="https://huggingface.co/fffiloni">
                <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/follow-me-on-HF-sm-dark.svg" alt="Follow me on HF">
            </a>
        </div>
        """)
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Video Control", format="mp4")
                audio_input = gr.Audio(label="Audio Input", type="filepath")
                submit_btn = gr.Button("Submit")
            with gr.Column():
                video_result = gr.Video(label="Result")

                gr.Examples(
                    examples = [
                        ["assets/demo1_video.mp4", "assets/demo1_audio.wav"],
                        ["assets/demo2_video.mp4", "assets/demo2_audio.wav"],
                        ["assets/demo3_video.mp4", "assets/demo3_audio.wav"],
                    ],
                    inputs = [video_input, audio_input]
                )

    submit_btn.click(
        fn = main,
        inputs = [video_input, audio_input],
        outputs = [video_result]
    )

demo.queue().launch(show_api=True, show_error=True, ssr_mode=False, mcp_server=True)