import random import tempfile import unittest import numpy as np import torch from PIL import Image from transformers import AutoTokenizer, CLIPTextConfig, CLIPTextModel, CLIPTokenizer, T5EncoderModel import diffusers from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler, FluxTransformer2DModel, VisualClozePipeline from diffusers.utils import logging from diffusers.utils.testing_utils import ( CaptureLogger, enable_full_determinism, floats_tensor, require_accelerator, torch_device, ) from ..test_pipelines_common import PipelineTesterMixin, to_np enable_full_determinism() class VisualClozePipelineFastTests(unittest.TestCase, PipelineTesterMixin): pipeline_class = VisualClozePipeline params = frozenset( [ "task_prompt", "content_prompt", "upsampling_height", "upsampling_width", "guidance_scale", "prompt_embeds", "pooled_prompt_embeds", "upsampling_strength", ] ) batch_params = frozenset(["task_prompt", "content_prompt", "image"]) test_xformers_attention = False test_layerwise_casting = True test_group_offloading = True supports_dduf = False def get_dummy_components(self): torch.manual_seed(0) transformer = FluxTransformer2DModel( patch_size=1, in_channels=12, out_channels=4, num_layers=1, num_single_layers=1, attention_head_dim=6, num_attention_heads=2, joint_attention_dim=32, pooled_projection_dim=32, axes_dims_rope=[2, 2, 2], ) clip_text_encoder_config = CLIPTextConfig( bos_token_id=0, eos_token_id=2, hidden_size=32, intermediate_size=37, layer_norm_eps=1e-05, num_attention_heads=4, num_hidden_layers=5, pad_token_id=1, vocab_size=1000, hidden_act="gelu", projection_dim=32, ) torch.manual_seed(0) text_encoder = CLIPTextModel(clip_text_encoder_config) torch.manual_seed(0) text_encoder_2 = T5EncoderModel.from_pretrained("hf-internal-testing/tiny-random-t5") tokenizer = CLIPTokenizer.from_pretrained("hf-internal-testing/tiny-random-clip") tokenizer_2 = AutoTokenizer.from_pretrained("hf-internal-testing/tiny-random-t5") torch.manual_seed(0) vae = AutoencoderKL( sample_size=32, in_channels=3, out_channels=3, block_out_channels=(4,), layers_per_block=1, latent_channels=1, norm_num_groups=1, use_quant_conv=False, use_post_quant_conv=False, shift_factor=0.0609, scaling_factor=1.5035, ) scheduler = FlowMatchEulerDiscreteScheduler() return { "scheduler": scheduler, "text_encoder": text_encoder, "text_encoder_2": text_encoder_2, "tokenizer": tokenizer, "tokenizer_2": tokenizer_2, "transformer": transformer, "vae": vae, "resolution": 32, } def get_dummy_inputs(self, device, seed=0): # Create example images to simulate the input format required by VisualCloze context_image = [ Image.fromarray(floats_tensor((32, 32, 3), rng=random.Random(seed), scale=255).numpy().astype(np.uint8)) for _ in range(2) ] query_image = [ Image.fromarray( floats_tensor((32, 32, 3), rng=random.Random(seed + 1), scale=255).numpy().astype(np.uint8) ), None, ] # Create an image list that conforms to the VisualCloze input format image = [ context_image, # In-Context example query_image, # Query image ] if str(device).startswith("mps"): generator = torch.manual_seed(seed) else: generator = torch.Generator(device="cpu").manual_seed(seed) inputs = { "task_prompt": "Each row outlines a logical process, starting from [IMAGE1] gray-based depth map with detailed object contours, to achieve [IMAGE2] an image with flawless clarity.", "content_prompt": "A beautiful landscape with mountains and a lake", "image": image, "generator": generator, "num_inference_steps": 2, "guidance_scale": 5.0, "upsampling_height": 32, "upsampling_width": 32, "max_sequence_length": 77, "output_type": "np", "upsampling_strength": 0.4, } return inputs def test_visualcloze_different_prompts(self): pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device) inputs = self.get_dummy_inputs(torch_device) output_same_prompt = pipe(**inputs).images[0] inputs = self.get_dummy_inputs(torch_device) inputs["task_prompt"] = "A different task to perform." output_different_prompts = pipe(**inputs).images[0] max_diff = np.abs(output_same_prompt - output_different_prompts).max() # Outputs should be different assert max_diff > 1e-6 def test_visualcloze_image_output_shape(self): pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device) inputs = self.get_dummy_inputs(torch_device) height_width_pairs = [(32, 32), (72, 57)] for height, width in height_width_pairs: expected_height = height - height % (pipe.generation_pipe.vae_scale_factor * 2) expected_width = width - width % (pipe.generation_pipe.vae_scale_factor * 2) inputs.update({"upsampling_height": height, "upsampling_width": width}) image = pipe(**inputs).images[0] output_height, output_width, _ = image.shape assert (output_height, output_width) == (expected_height, expected_width) def test_inference_batch_single_identical(self): self._test_inference_batch_single_identical(expected_max_diff=1e-3) def test_upsampling_strength(self, expected_min_diff=1e-1): pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device) inputs = self.get_dummy_inputs(torch_device) # Test different upsampling strengths inputs["upsampling_strength"] = 0.2 output_no_upsampling = pipe(**inputs).images[0] inputs["upsampling_strength"] = 0.8 output_full_upsampling = pipe(**inputs).images[0] # Different upsampling strengths should produce different outputs max_diff = np.abs(output_no_upsampling - output_full_upsampling).max() assert max_diff > expected_min_diff def test_different_task_prompts(self, expected_min_diff=1e-1): pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device) inputs = self.get_dummy_inputs(torch_device) output_original = pipe(**inputs).images[0] inputs["task_prompt"] = "A different task description for image generation" output_different_task = pipe(**inputs).images[0] # Different task prompts should produce different outputs max_diff = np.abs(output_original - output_different_task).max() assert max_diff > expected_min_diff @unittest.skip( "Test not applicable because the pipeline being tested is a wrapper pipeline. CFG tests should be done on the inner pipelines." ) def test_callback_cfg(self): pass def test_save_load_local(self, expected_max_difference=5e-4): components = self.get_dummy_components() pipe = self.pipeline_class(**components) for component in pipe.components.values(): if hasattr(component, "set_default_attn_processor"): component.set_default_attn_processor() pipe.to(torch_device) pipe.set_progress_bar_config(disable=None) inputs = self.get_dummy_inputs(torch_device) output = pipe(**inputs)[0] logger = logging.get_logger("diffusers.pipelines.pipeline_utils") logger.setLevel(diffusers.logging.INFO) with tempfile.TemporaryDirectory() as tmpdir: pipe.save_pretrained(tmpdir, safe_serialization=False) with CaptureLogger(logger) as cap_logger: # NOTE: Resolution must be set to 32 for loading otherwise will lead to OOM on CI hardware # This attribute is not serialized in the config of the pipeline pipe_loaded = self.pipeline_class.from_pretrained(tmpdir, resolution=32) for component in pipe_loaded.components.values(): if hasattr(component, "set_default_attn_processor"): component.set_default_attn_processor() for name in pipe_loaded.components.keys(): if name not in pipe_loaded._optional_components: assert name in str(cap_logger) pipe_loaded.to(torch_device) pipe_loaded.set_progress_bar_config(disable=None) inputs = self.get_dummy_inputs(torch_device) output_loaded = pipe_loaded(**inputs)[0] max_diff = np.abs(to_np(output) - to_np(output_loaded)).max() self.assertLess(max_diff, expected_max_difference) def test_save_load_optional_components(self, expected_max_difference=1e-4): if not hasattr(self.pipeline_class, "_optional_components"): return components = self.get_dummy_components() pipe = self.pipeline_class(**components) for component in pipe.components.values(): if hasattr(component, "set_default_attn_processor"): component.set_default_attn_processor() pipe.to(torch_device) pipe.set_progress_bar_config(disable=None) # set all optional components to None for optional_component in pipe._optional_components: setattr(pipe, optional_component, None) generator_device = "cpu" inputs = self.get_dummy_inputs(generator_device) torch.manual_seed(0) output = pipe(**inputs)[0] with tempfile.TemporaryDirectory() as tmpdir: pipe.save_pretrained(tmpdir, safe_serialization=False) # NOTE: Resolution must be set to 32 for loading otherwise will lead to OOM on CI hardware # This attribute is not serialized in the config of the pipeline pipe_loaded = self.pipeline_class.from_pretrained(tmpdir, resolution=32) for component in pipe_loaded.components.values(): if hasattr(component, "set_default_attn_processor"): component.set_default_attn_processor() pipe_loaded.to(torch_device) pipe_loaded.set_progress_bar_config(disable=None) for optional_component in pipe._optional_components: self.assertTrue( getattr(pipe_loaded, optional_component) is None, f"`{optional_component}` did not stay set to None after loading.", ) inputs = self.get_dummy_inputs(generator_device) torch.manual_seed(0) output_loaded = pipe_loaded(**inputs)[0] max_diff = np.abs(to_np(output) - to_np(output_loaded)).max() self.assertLess(max_diff, expected_max_difference) @unittest.skipIf(torch_device not in ["cuda", "xpu"], reason="float16 requires CUDA or XPU") @require_accelerator def test_save_load_float16(self, expected_max_diff=1e-2): components = self.get_dummy_components() for name, module in components.items(): if hasattr(module, "half"): components[name] = module.to(torch_device).half() pipe = self.pipeline_class(**components) for component in pipe.components.values(): if hasattr(component, "set_default_attn_processor"): component.set_default_attn_processor() pipe.to(torch_device) pipe.set_progress_bar_config(disable=None) inputs = self.get_dummy_inputs(torch_device) output = pipe(**inputs)[0] with tempfile.TemporaryDirectory() as tmpdir: pipe.save_pretrained(tmpdir) # NOTE: Resolution must be set to 32 for loading otherwise will lead to OOM on CI hardware # This attribute is not serialized in the config of the pipeline pipe_loaded = self.pipeline_class.from_pretrained(tmpdir, torch_dtype=torch.float16, resolution=32) for component in pipe_loaded.components.values(): if hasattr(component, "set_default_attn_processor"): component.set_default_attn_processor() pipe_loaded.to(torch_device) pipe_loaded.set_progress_bar_config(disable=None) for name, component in pipe_loaded.components.items(): if hasattr(component, "dtype"): self.assertTrue( component.dtype == torch.float16, f"`{name}.dtype` switched from `float16` to {component.dtype} after loading.", ) inputs = self.get_dummy_inputs(torch_device) output_loaded = pipe_loaded(**inputs)[0] max_diff = np.abs(to_np(output) - to_np(output_loaded)).max() self.assertLess( max_diff, expected_max_diff, "The output of the fp16 pipeline changed after saving and loading." )