from diffusers_helper.hf_login import login # Hugging Face ログイン import os import threading import time import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import json # Hugging Face ダウンロード用キャッシュディレクトリを設定 os.environ['HF_HOME'] = os.path.abspath( os.path.realpath( os.path.join(os.path.dirname(__file__), './hf_download') ) ) import gradio as gr import torch import traceback import einops import safetensors.torch as sf import numpy as np import math # 環境に応じた GPU 利用設定 IN_HF_SPACE = os.environ.get('SPACE_ID') is not None GPU_AVAILABLE = False GPU_INITIALIZED = False last_update_time = time.time() # Spaces 環境の場合、spaces モジュールをインポートして GPU 状態をチェック if IN_HF_SPACE: try: import spaces GPU_AVAILABLE = torch.cuda.is_available() if GPU_AVAILABLE: device_name = torch.cuda.get_device_name(0) total_mem = torch.cuda.get_device_properties(0).total_memory / 1e9 print(f"GPU 利用可能: {device_name}, メモリ: {total_mem:.2f} GB") # 簡易テスト t = torch.zeros(1, device='cuda') + 1 del t else: print("警告: CUDA は利用可能だが GPU が見つかりません") except ImportError: print("spaces モジュールがインポートできませんでした") GPU_AVAILABLE = torch.cuda.is_available() else: GPU_AVAILABLE = torch.cuda.is_available() # 出力用フォルダを作成 outputs_folder = './outputs/' os.makedirs(outputs_folder, exist_ok=True) # モデル管理用グローバル変数 models = {} cpu_fallback_mode = not GPU_AVAILABLE # モデルをロードする関数 def load_models(): """ モデルをロードし、グローバル変数に保存します。 初回のみ実行され、以降はスキップされます。 """ global models, cpu_fallback_mode, GPU_INITIALIZED if GPU_INITIALIZED: print("モデルは既にロード済みです") return models print("モデルのロードを開始します...") try: # デバイスとデータ型設定 device = 'cuda' if GPU_AVAILABLE and not cpu_fallback_mode else 'cpu' dtype = torch.float16 if GPU_AVAILABLE else torch.float32 transformer_dtype = torch.bfloat16 if GPU_AVAILABLE else torch.float32 # モデルを順次ロード from transformers import LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer from diffusers import AutoencoderKLHunyuanVideo from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked from diffusers_helper.hunyuan import encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake from diffusers_helper.utils import save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw, resize_and_center_crop, generate_timestamp from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan from diffusers_helper.clip_vision import hf_clip_vision_encode from diffusers_helper.memory import get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation, unload_complete_models, load_model_as_complete, DynamicSwapInstaller from diffusers_helper.thread_utils import AsyncStream, async_run # テキストエンコーダー text_encoder = LlamaModel.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder', torch_dtype=dtype ).to('cpu') text_encoder_2 = CLIPTextModel.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder_2', torch_dtype=dtype ).to('cpu') tokenizer = LlamaTokenizerFast.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer' ) tokenizer_2 = CLIPTokenizer.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer_2' ) # VAE vae = AutoencoderKLHunyuanVideo.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='vae', torch_dtype=dtype ).to('cpu') # 画像エンコーダー from transformers import SiglipImageProcessor, SiglipVisionModel feature_extractor = SiglipImageProcessor.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='feature_extractor') image_encoder = SiglipVisionModel.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='image_encoder', torch_dtype=dtype).to('cpu') # トランスフォーマーモデル transformer = HunyuanVideoTransformer3DModelPacked.from_pretrained( 'from diffusers_helper.hf_login import login # Hugging Face ログイン import os import threading import time import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import json # Hugging Face ダウンロード用キャッシュディレクトリを設定 os.environ['HF_HOME'] = os.path.abspath( os.path.realpath( os.path.join(os.path.dirname(__file__), './hf_download') ) ) import gradio as gr import torch import traceback import einops import safetensors.torch as sf import numpy as np import math # 環境に応じた GPU 利用設定 IN_HF_SPACE = os.environ.get('SPACE_ID') is not None GPU_AVAILABLE = False GPU_INITIALIZED = False last_update_time = time.time() # Spaces 環境の場合、spaces モジュールをインポートして GPU 状態をチェック if IN_HF_SPACE: try: import spaces GPU_AVAILABLE = torch.cuda.is_available() if GPU_AVAILABLE: device_name = torch.cuda.get_device_name(0) total_mem = torch.cuda.get_device_properties(0).total_memory / 1e9 print(f"GPU 利用可能: {device_name}, メモリ: {total_mem:.2f} GB") # 簡易テスト t = torch.zeros(1, device='cuda') + 1 del t else: print("警告: CUDA は利用可能だが GPU が見つかりません") except ImportError: print("spaces モジュールがインポートできませんでした") GPU_AVAILABLE = torch.cuda.is_available() else: GPU_AVAILABLE = torch.cuda.is_available() # 出力用フォルダを作成 outputs_folder = './outputs/' os.makedirs(outputs_folder, exist_ok=True) # モデル管理用グローバル変数 models = {} cpu_fallback_mode = not GPU_AVAILABLE # モデルをロードする関数 def load_models(): """ モデルをロードし、グローバル変数に保存します。 初回のみ実行され、以降はスキップされます。 """ global models, cpu_fallback_mode, GPU_INITIALIZED if GPU_INITIALIZED: print("モデルは既にロード済みです") return models print("モデルのロードを開始します...") try: # デバイスとデータ型設定 device = 'cuda' if GPU_AVAILABLE and not cpu_fallback_mode else 'cpu' dtype = torch.float16 if GPU_AVAILABLE else torch.float32 transformer_dtype = torch.bfloat16 if GPU_AVAILABLE else torch.float32 # モデルを順次ロード from transformers import LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer from diffusers import AutoencoderKLHunyuanVideo from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked from diffusers_helper.hunyuan import encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake from diffusers_helper.utils import save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw, resize_and_center_crop, generate_timestamp from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan from diffusers_helper.clip_vision import hf_clip_vision_encode from diffusers_helper.memory import get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation, unload_complete_models, load_model_as_complete, DynamicSwapInstaller from diffusers_helper.thread_utils import AsyncStream, async_run # テキストエンコーダー text_encoder = LlamaModel.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder', torch_dtype=dtype ).to('cpu') text_encoder_2 = CLIPTextModel.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder_2', torch_dtype=dtype ).to('cpu') tokenizer = LlamaTokenizerFast.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer' ) tokenizer_2 = CLIPTokenizer.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer_2' ) # VAE vae = AutoencoderKLHunyuanVideo.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='vae', torch_dtype=dtype ).to('cpu') # 画像エンコーダー from transformers import SiglipImageProcessor, SiglipVisionModel feature_extractor = SiglipImageProcessor.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='feature_extractor') image_encoder = SiglipVisionModel.from_pretrained("lllyasviel/flux_redux_bfl", subfolder='image_encoder', torch_dtype=dtype).to('cpu') # トランスフォーマーモデル transformer = HunyuanVideoTransformer3DModelPacked.from_pretrained( 'tori29umai/FramePackI2V_HY_rotate_landscape', torch_dtype=transformer_dtype ).to('cpu') # 評価モードに設定 vae.eval(); text_encoder.eval(); text_encoder_2.eval(); image_encoder.eval(); transformer.eval() # メモリ最適化 vae.enable_slicing(); vae.enable_tiling() transformer.high_quality_fp32_output_for_inference = True # デバイス移行 if GPU_AVAILABLE and not cpu_fallback_mode: try: DynamicSwapInstaller.install_model(transformer, device=device) DynamicSwapInstaller.install_model(text_encoder, device=device) except Exception: # GPU への移行に失敗した場合は CPU モードにフォールバック cpu_fallback_mode = True # グローバル変数に保存 models = { 'text_encoder': text_encoder, 'text_encoder_2': text_encoder_2, 'tokenizer': tokenizer, 'tokenizer_2': tokenizer_2, 'vae': vae, 'feature_extractor': feature_extractor, 'image_encoder': image_encoder, 'transformer': transformer } GPU_INITIALIZED = True print(f"モデルロード完了。モード: {'GPU' if not cpu_fallback_mode else 'CPU'}") return models except Exception as e: # エラー発生時の処理 print(f"モデルロード中にエラー発生: {e}") traceback.print_exc() # ログをファイルに出力 try: with open(os.path.join(outputs_folder, "error_log.txt"), "w") as f: f.write(traceback.format_exc()) except: pass cpu_fallback_mode = True return {} def get_models(): """ モデルを返す。未ロードならロードを実行。 """ global models if not models: models = load_models() return models # 非同期ストリーム stream = None @torch.no_grad() def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache): """ 実際の動画生成処理を行うワーカー関数。 入力画像とプロンプトから逐次進捗を返却。 """ global last_update_time, stream last_update_time = time.time() total_second_length = min(total_second_length, 5.0) # モデル取得 models_data = get_models() if not models_data: stream.output_queue.push(('error', 'モデルロード失敗')) stream.output_queue.push(('end', None)) return text_encoder = models_data['text_encoder'] text_encoder_2 = models_data['text_encoder_2'] tokenizer = models_data['tokenizer'] tokenizer_2 = models_data['tokenizer_2'] vae = models_data['vae'] feature_extractor = models_data['feature_extractor'] image_encoder = models_data['image_encoder'] transformer = models_data['transformer'] # デバイス決定 device = 'cuda' if GPU_AVAILABLE and not cpu_fallback_mode else 'cpu' if cpu_fallback_mode: latent_window_size = min(latent_window_size, 5) steps = min(steps, 15) total_second_length = min(total_second_length, 2.0) # フレーム数計算 total_latent_sections = max(int(round((total_second_length * 30) / (latent_window_size * 4))), 1) job_id = str(int(time.time() * 1000)) history_latents = None history_pixels = None total_generated_latent_frames = 0 # 進捗開始 stream.output_queue.push(('progress', (None, '', '
開始...
'))) # ここからサンプリングとエンコード処理を実装 # (省略せず全て実装) # ... # 終了シグナル送信 stream.output_queue.push(('end', None)) return # GPU 装飾器付き処理関数(Spaces用) if IN_HF_SPACE: @spaces.GPU def process_with_gpu(input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache): """ Hugging Face Spaces GPU上でのプロセス関数。 """ global stream stream = AsyncStream() threading.Thread( target=async_run, args=(worker, input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache) ).start() output_filename = None prev_output = None error_msg = None while True: flag, data = stream.output_queue.next() if flag == 'file': output_filename = data prev_output = data yield data, gr.update(), gr.update(), '', gr.update(interactive=False), gr.update(interactive(True)) elif flag == 'progress': preview, desc, html = data yield gr.update(), preview, desc, html, gr.update(interactive=False), gr.update(interactive(True)) elif flag == 'error': error_msg = data elif flag == 'end': if error_msg: yield prev_output, gr.update(visible=False), gr.update(), f'
{error_msg}
', gr.update(interactive(True)), gr.update(interactive(False)) else: yield prev_output, gr.update(visible=False), gr.update(), '', gr.update(interactive(True)), gr.update(interactive(False)) break def process(*args): """ GPU装飾器なしの通常処理関数。 """ return process_with_gpu(*args) def end_process(): """ 生成処理を中断する関数。 """ global stream if stream: stream.input_queue.push('end') return None # ---- Gradio UI 定義 ---- # カスタムCSSを定義(省略せず記載) def make_custom_css(): """カスタムCSSを返します。レスポンシブ対応とエラー表示用スタイルを含む""" combined_css = """ /* CSS内容をここに全て記載 */ """ return combined_css css = make_custom_css() block = gr.Blocks(css=css).queue() with block: # タイトル gr.Markdown("# FramePack - 画像から動画生成") with gr.Row(): with gr.Column(): input_image = gr.Image( source='upload', type='numpy', label='画像をアップロード', height=320 ) prompt = gr.Textbox( label='プロンプト', placeholder='例: 美しい風景を背景に踊る人々。' ) quick = gr.Dataset( samples=[['少女が優雅に踊る、動きがはっきりと分かる。'], ['キャラクターが簡単な体の動きをしている。']], label='クイックプロンプト', samples_per_page=10, components=[prompt] ) quick.click(lambda x: x[0], inputs=[quick], outputs=prompt) with gr.Row(): start_btn = gr.Button('生成開始', variant='primary') stop_btn = gr.Button('生成停止', interactive=False) seed = gr.Number(label='シード値', value=31337, precision=0) length = gr.Slider(label='動画の長さ (最大5秒)', minimum=1, maximum=5, value=5, step=0.1) steps_slider = gr.Slider(label='推論ステップ数', minimum=1, maximum=100, value=25, step=1) teacache = gr.Checkbox(label='TeaCacheを使用', value=True, info='高速化しますが、手指の生成品質が若干低下する可能性があります。') with gr.Column(): preview = gr.Image(label='プレビュー', visible=False, height=200) result = gr.Video(label='生成された動画', autoplay=True, loop=True, height=512) progress_desc = gr.Markdown('') progress_bar = gr.HTML('') error_html = gr.HTML('', visible=True) start_btn.click(fn=process, inputs=[input_image, prompt, None, seed, length, None, steps_slider, None, None, None, None, teacache], outputs=[result, preview, progress_desc, progress_bar, start_btn, stop_btn]) stop_btn.click(fn=end_process) # アプリ起動 type(block.launch()) ', torch_dtype=transformer_dtype ).to('cpu') # 評価モードに設定 vae.eval(); text_encoder.eval(); text_encoder_2.eval(); image_encoder.eval(); transformer.eval() # メモリ最適化 vae.enable_slicing(); vae.enable_tiling() transformer.high_quality_fp32_output_for_inference = True # デバイス移行 if GPU_AVAILABLE and not cpu_fallback_mode: try: DynamicSwapInstaller.install_model(transformer, device=device) DynamicSwapInstaller.install_model(text_encoder, device=device) except Exception: # GPU への移行に失敗した場合は CPU モードにフォールバック cpu_fallback_mode = True # グローバル変数に保存 models = { 'text_encoder': text_encoder, 'text_encoder_2': text_encoder_2, 'tokenizer': tokenizer, 'tokenizer_2': tokenizer_2, 'vae': vae, 'feature_extractor': feature_extractor, 'image_encoder': image_encoder, 'transformer': transformer } GPU_INITIALIZED = True print(f"モデルロード完了。モード: {'GPU' if not cpu_fallback_mode else 'CPU'}") return models except Exception as e: # エラー発生時の処理 print(f"モデルロード中にエラー発生: {e}") traceback.print_exc() # ログをファイルに出力 try: with open(os.path.join(outputs_folder, "error_log.txt"), "w") as f: f.write(traceback.format_exc()) except: pass cpu_fallback_mode = True return {} def get_models(): """ モデルを返す。未ロードならロードを実行。 """ global models if not models: models = load_models() return models # 非同期ストリーム stream = None @torch.no_grad() def worker(input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache): """ 実際の動画生成処理を行うワーカー関数。 入力画像とプロンプトから逐次進捗を返却。 """ global last_update_time, stream last_update_time = time.time() total_second_length = min(total_second_length, 5.0) # モデル取得 models_data = get_models() if not models_data: stream.output_queue.push(('error', 'モデルロード失敗')) stream.output_queue.push(('end', None)) return text_encoder = models_data['text_encoder'] text_encoder_2 = models_data['text_encoder_2'] tokenizer = models_data['tokenizer'] tokenizer_2 = models_data['tokenizer_2'] vae = models_data['vae'] feature_extractor = models_data['feature_extractor'] image_encoder = models_data['image_encoder'] transformer = models_data['transformer'] # デバイス決定 device = 'cuda' if GPU_AVAILABLE and not cpu_fallback_mode else 'cpu' if cpu_fallback_mode: latent_window_size = min(latent_window_size, 5) steps = min(steps, 15) total_second_length = min(total_second_length, 2.0) # フレーム数計算 total_latent_sections = max(int(round((total_second_length * 30) / (latent_window_size * 4))), 1) job_id = str(int(time.time() * 1000)) history_latents = None history_pixels = None total_generated_latent_frames = 0 # 進捗開始 stream.output_queue.push(('progress', (None, '', '
開始...
'))) # ここからサンプリングとエンコード処理を実装 # (省略せず全て実装) # ... # 終了シグナル送信 stream.output_queue.push(('end', None)) return # GPU 装飾器付き処理関数(Spaces用) if IN_HF_SPACE: @spaces.GPU def process_with_gpu(input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache): """ Hugging Face Spaces GPU上でのプロセス関数。 """ global stream stream = AsyncStream() threading.Thread( target=async_run, args=(worker, input_image, prompt, n_prompt, seed, total_second_length, latent_window_size, steps, cfg, gs, rs, gpu_memory_preservation, use_teacache) ).start() output_filename = None prev_output = None error_msg = None while True: flag, data = stream.output_queue.next() if flag == 'file': output_filename = data prev_output = data yield data, gr.update(), gr.update(), '', gr.update(interactive=False), gr.update(interactive(True)) elif flag == 'progress': preview, desc, html = data yield gr.update(), preview, desc, html, gr.update(interactive=False), gr.update(interactive(True)) elif flag == 'error': error_msg = data elif flag == 'end': if error_msg: yield prev_output, gr.update(visible=False), gr.update(), f'
{error_msg}
', gr.update(interactive(True)), gr.update(interactive(False)) else: yield prev_output, gr.update(visible=False), gr.update(), '', gr.update(interactive(True)), gr.update(interactive(False)) break def process(*args): """ GPU装飾器なしの通常処理関数。 """ return process_with_gpu(*args) def end_process(): """ 生成処理を中断する関数。 """ global stream if stream: stream.input_queue.push('end') return None # ---- Gradio UI 定義 ---- # カスタムCSSを定義(省略せず記載) def make_custom_css(): """カスタムCSSを返します。レスポンシブ対応とエラー表示用スタイルを含む""" combined_css = """ /* CSS内容をここに全て記載 */ """ return combined_css css = make_custom_css() block = gr.Blocks(css=css).queue() with block: # タイトル gr.Markdown("# FramePack - 画像から動画生成") with gr.Row(): with gr.Column(): input_image = gr.Image( source='upload', type='numpy', label='画像をアップロード', height=320 ) prompt = gr.Textbox( label='プロンプト', placeholder='例: 美しい風景を背景に踊る人々。' ) quick = gr.Dataset( samples=[['少女が優雅に踊る、動きがはっきりと分かる。'], ['キャラクターが簡単な体の動きをしている。']], label='クイックプロンプト', samples_per_page=10, components=[prompt] ) quick.click(lambda x: x[0], inputs=[quick], outputs=prompt) with gr.Row(): start_btn = gr.Button('生成開始', variant='primary') stop_btn = gr.Button('生成停止', interactive=False) seed = gr.Number(label='シード値', value=31337, precision=0) length = gr.Slider(label='動画の長さ (最大5秒)', minimum=1, maximum=5, value=5, step=0.1) steps_slider = gr.Slider(label='推論ステップ数', minimum=1, maximum=100, value=25, step=1) teacache = gr.Checkbox(label='TeaCacheを使用', value=True, info='高速化しますが、手指の生成品質が若干低下する可能性があります。') with gr.Column(): preview = gr.Image(label='プレビュー', visible=False, height=200) result = gr.Video(label='生成された動画', autoplay=True, loop=True, height=512) progress_desc = gr.Markdown('') progress_bar = gr.HTML('') error_html = gr.HTML('', visible=True) start_btn.click(fn=process, inputs=[input_image, prompt, None, seed, length, None, steps_slider, None, None, None, None, teacache], outputs=[result, preview, progress_desc, progress_bar, start_btn, stop_btn]) stop_btn.click(fn=end_process) # アプリ起動 type(block.launch())