Meismaxandmaxisme committed on
Commit 236da4e · verified · 1 Parent(s): 0c4ab48

Upload 10 files
src/backend/__init__.py ADDED
File without changes
src/backend/base64_image.py ADDED
@@ -0,0 +1,21 @@
+ from io import BytesIO
+ from base64 import b64encode, b64decode
+ from PIL import Image
+
+
+ def pil_image_to_base64_str(
+     image: Image.Image,
+     format: str = "JPEG",
+ ) -> str:
+     buffer = BytesIO()
+     image.save(buffer, format=format)
+     buffer.seek(0)
+     img_base64 = b64encode(buffer.getvalue()).decode("utf-8")
+     return img_base64
+
+
+ def base64_image_to_pil(base64_str: str) -> Image.Image:
+     image_data = b64decode(base64_str)
+     image_buffer = BytesIO(image_data)
+     image = Image.open(image_buffer)
+     return image
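
A quick round-trip sketch of these helpers (hypothetical usage, not part of the commit; assumes the backend package is on the import path):

    from PIL import Image
    from backend.base64_image import pil_image_to_base64_str, base64_image_to_pil

    img = Image.new("RGB", (64, 64), (255, 0, 0))
    encoded = pil_image_to_base64_str(img)  # base64 string of the JPEG bytes
    decoded = base64_image_to_pil(encoded)  # back to a PIL image
    assert decoded.size == (64, 64)

Note that the default JPEG encoding is lossy, so round-tripped pixel values may differ slightly.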
src/backend/controlnet.py ADDED
@@ -0,0 +1,90 @@
+ import logging
+ from PIL import Image
+ from diffusers import ControlNetModel
+ from backend.models.lcmdiffusion_setting import (
+     DiffusionTask,
+     ControlNetSetting,
+ )
+
+
+ # Prepares ControlNet adapters for use with FastSD CPU
+ #
+ # This function loads the ControlNet adapters defined by the
+ # lcm_diffusion_setting.controlnet object and returns a dictionary
+ # with the pipeline arguments required to use the loaded adapters
+ def load_controlnet_adapters(lcm_diffusion_setting) -> dict:
+     controlnet_args = {}
+     if (
+         lcm_diffusion_setting.controlnet is None
+         or not lcm_diffusion_setting.controlnet.enabled
+     ):
+         return controlnet_args
+
+     logging.info("Loading ControlNet adapter")
+     controlnet_adapter = ControlNetModel.from_single_file(
+         lcm_diffusion_setting.controlnet.adapter_path,
+         # local_files_only=True,
+         use_safetensors=True,
+     )
+     controlnet_args["controlnet"] = controlnet_adapter
+     return controlnet_args
+
+
+ # Updates the ControlNet pipeline arguments to use for image generation
+ #
+ # This function uses the contents of the lcm_diffusion_setting.controlnet
+ # object to build a dictionary with the corresponding pipeline arguments
+ # for image generation; in particular, it sets the ControlNet control
+ # image and the conditioning scale
+ def update_controlnet_arguments(lcm_diffusion_setting) -> dict:
+     controlnet_args = {}
+     if (
+         lcm_diffusion_setting.controlnet is None
+         or not lcm_diffusion_setting.controlnet.enabled
+     ):
+         return controlnet_args
+
+     controlnet_args["controlnet_conditioning_scale"] = (
+         lcm_diffusion_setting.controlnet.conditioning_scale
+     )
+     if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
+         controlnet_args["image"] = lcm_diffusion_setting.controlnet._control_image
+     elif lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value:
+         controlnet_args["control_image"] = (
+             lcm_diffusion_setting.controlnet._control_image
+         )
+     return controlnet_args
+
+
+ # Helper function to adjust ControlNet settings from a dictionary
+ def controlnet_settings_from_dict(
+     lcm_diffusion_setting,
+     dictionary,
+ ) -> None:
+     if lcm_diffusion_setting is None or dictionary is None:
+         logging.error("Invalid arguments!")
+         return
+     if (
+         "controlnet" not in dictionary
+         or dictionary["controlnet"] is None
+         or len(dictionary["controlnet"]) == 0
+     ):
+         logging.warning("ControlNet settings not found, ControlNet will be disabled")
+         lcm_diffusion_setting.controlnet = None
+         return
+
+     controlnet = ControlNetSetting()
+     controlnet.enabled = dictionary["controlnet"][0]["enabled"]
+     controlnet.conditioning_scale = dictionary["controlnet"][0]["conditioning_scale"]
+     controlnet.adapter_path = dictionary["controlnet"][0]["adapter_path"]
+     controlnet._control_image = None
+     image_path = dictionary["controlnet"][0]["control_image"]
+     if controlnet.enabled:
+         try:
+             controlnet._control_image = Image.open(image_path)
+         except (AttributeError, FileNotFoundError) as err:
+             logging.error(err)
+         if controlnet._control_image is None:
+             logging.error("Invalid ControlNet control image; disabling ControlNet")
+             controlnet.enabled = False
+     lcm_diffusion_setting.controlnet = controlnet
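
A minimal usage sketch, assuming a populated lcm_diffusion_setting object and a settings dictionary shaped like the one these functions read (all paths below are illustrative):

    settings = {
        "controlnet": [
            {
                "enabled": True,
                "conditioning_scale": 0.5,
                "adapter_path": "controlnet_models/canny.safetensors",  # hypothetical
                "control_image": "inputs/canny_edges.png",  # hypothetical
            }
        ]
    }
    controlnet_settings_from_dict(lcm_diffusion_setting, settings)
    pipeline_args = load_controlnet_adapters(lcm_diffusion_setting)  # {"controlnet": adapter}
    generation_args = update_controlnet_arguments(lcm_diffusion_setting)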
src/backend/device.py ADDED
@@ -0,0 +1,23 @@
+ import platform
+ from constants import DEVICE
+ import torch
+ import openvino as ov
+
+ core = ov.Core()
+
+
+ def is_openvino_device() -> bool:
+     # OpenVINO serves the "cpu", "gpu*" and "npu*" device names;
+     # "cuda" and "mps" are handled by torch instead
+     return DEVICE.lower() == "cpu" or DEVICE.lower()[0] in ("g", "n")
+
+
+ def get_device_name() -> str:
+     if DEVICE == "cuda":
+         default_gpu_index = torch.cuda.current_device()
+         return torch.cuda.get_device_name(default_gpu_index)
+     elif platform.system().lower() == "darwin":
+         # covers "mps" devices on Apple silicon
+         return platform.processor()
+     elif is_openvino_device():
+         return core.get_property(DEVICE.upper(), "FULL_DEVICE_NAME")
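
For reference, a sketch of how these helpers behave for a couple of DEVICE values (return values are illustrative):

    # With constants.DEVICE == "cpu" on an Intel machine:
    is_openvino_device()  # True: "cpu" is served through OpenVINO
    get_device_name()     # e.g. "Intel(R) Core(TM) i7-1260P", via ov.Core()

    # With constants.DEVICE == "cuda" on an NVIDIA machine:
    is_openvino_device()  # False
    get_device_name()     # e.g. "NVIDIA GeForce RTX 3060", via torch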
src/backend/image_saver.py ADDED
@@ -0,0 +1,75 @@
+ import json
+ from os import path, mkdir
+ from typing import Any
+ from uuid import uuid4
+ from backend.models.lcmdiffusion_setting import LCMDiffusionSetting
+ from utils import get_image_file_extension
+
+
+ def get_exclude_keys():
+     exclude_keys = {
+         "init_image": True,
+         "generated_images": True,
+         "lora": {
+             "models_dir": True,
+             "path": True,
+         },
+         "dirs": True,
+         "controlnet": {
+             "adapter_path": True,
+         },
+     }
+     return exclude_keys
+
+
+ class ImageSaver:
+     @staticmethod
+     def save_images(
+         output_path: str,
+         images: Any,
+         folder_name: str = "",
+         format: str = "PNG",
+         jpeg_quality: int = 90,
+         lcm_diffusion_setting: LCMDiffusionSetting = None,
+     ) -> list[str]:
+         gen_id = uuid4()
+         image_ids = []
+
+         if images:
+             image_seeds = []
+
+             for index, image in enumerate(images):
+
+                 image_seed = image.info.get("image_seed")
+                 if image_seed is not None:
+                     image_seeds.append(image_seed)
+
+                 if not path.exists(output_path):
+                     mkdir(output_path)
+
+                 if folder_name:
+                     out_path = path.join(
+                         output_path,
+                         folder_name,
+                     )
+                 else:
+                     out_path = output_path
+
+                 if not path.exists(out_path):
+                     mkdir(out_path)
+                 image_extension = get_image_file_extension(format)
+                 image_file_name = f"{gen_id}-{index+1}{image_extension}"
+                 image_ids.append(image_file_name)
+                 image.save(path.join(out_path, image_file_name), quality=jpeg_quality)
+             if lcm_diffusion_setting:
+                 data = lcm_diffusion_setting.model_dump(exclude=get_exclude_keys())
+                 if image_seeds:
+                     data["image_seeds"] = image_seeds
+                 with open(path.join(out_path, f"{gen_id}.json"), "w") as json_file:
+                     json.dump(
+                         data,
+                         json_file,
+                         indent=4,
+                     )
+         return image_ids
+
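
A hedged usage sketch (the output directory, folder name, and settings object are illustrative):

    images = lcm_text_to_image.generate(settings)  # list of PIL images
    saved = ImageSaver.save_images(
        "results",
        images=images,
        folder_name="session-01",
        format="PNG",
        lcm_diffusion_setting=settings,  # also writes a <gen_id>.json settings dump
    )
    print(saved)  # e.g. ["<gen_id>-1.png", "<gen_id>-2.png"]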
src/backend/lcm_text_to_image.py ADDED
@@ -0,0 +1,597 @@
+ import gc
+ from math import ceil
+ from typing import Any, List
+ import random
+
+ import numpy as np
+ import torch
+ from backend.device import is_openvino_device
+ from backend.controlnet import (
+     load_controlnet_adapters,
+     update_controlnet_arguments,
+ )
+ from backend.models.lcmdiffusion_setting import (
+     DiffusionTask,
+     LCMDiffusionSetting,
+     LCMLora,
+ )
+ from backend.openvino.pipelines import (
+     get_ov_image_to_image_pipeline,
+     get_ov_text_to_image_pipeline,
+     ov_load_tiny_autoencoder,
+     get_ov_diffusion_pipeline,
+ )
+ from backend.pipelines.lcm import (
+     get_image_to_image_pipeline,
+     get_lcm_model_pipeline,
+     load_taesd,
+ )
+ from backend.pipelines.lcm_lora import get_lcm_lora_pipeline
+ from constants import DEVICE, GGUF_THREADS
+ from diffusers import LCMScheduler
+ from image_ops import resize_pil_image
+ from backend.openvino.ov_hc_stablediffusion_pipeline import OvHcLatentConsistency
+ from backend.gguf.gguf_diffusion import (
+     GGUFDiffusion,
+     ModelConfig,
+     Txt2ImgConfig,
+     SampleMethod,
+ )
+ from paths import get_app_path
+ from pprint import pprint
+
+ try:
+     # support for token merging; keeping it optional for now
+     import tomesd
+ except ImportError:
+     print("tomesd library unavailable; disabling token merging support")
+     tomesd = None
+
+
+ class LCMTextToImage:
+     def __init__(
+         self,
+         device: str = "cpu",
+     ) -> None:
+         self.pipeline = None
+         self.use_openvino = False
+         self.device = ""
+         self.previous_model_id = None
+         self.previous_use_tae_sd = False
+         self.previous_use_lcm_lora = False
+         self.previous_ov_model_id = ""
+         self.previous_token_merging = 0.0
+         self.previous_safety_checker = False
+         self.previous_use_openvino = False
+         self.img_to_img_pipeline = None
+         self.is_openvino_init = False
+         self.previous_lora = None
+         self.task_type = DiffusionTask.text_to_image
+         self.previous_use_gguf_model = False
+         self.previous_gguf_model = None
+         self.torch_data_type = (
+             torch.float32 if is_openvino_device() or DEVICE == "mps" else torch.float16
+         )
+         self.ov_model_id = None
+         print(f"Torch datatype : {self.torch_data_type}")
+
+     def _pipeline_to_device(self):
+         print(f"Pipeline device : {DEVICE}")
+         print(f"Pipeline dtype : {self.torch_data_type}")
+         self.pipeline.to(
+             torch_device=DEVICE,
+             torch_dtype=self.torch_data_type,
+         )
+
+     def _add_freeu(self):
+         pipeline_class = self.pipeline.__class__.__name__
+         if isinstance(self.pipeline.scheduler, LCMScheduler):
+             if pipeline_class == "StableDiffusionPipeline":
+                 print("Add FreeU - SD")
+                 self.pipeline.enable_freeu(
+                     s1=0.9,
+                     s2=0.2,
+                     b1=1.2,
+                     b2=1.4,
+                 )
+             elif pipeline_class == "StableDiffusionXLPipeline":
+                 print("Add FreeU - SDXL")
+                 self.pipeline.enable_freeu(
+                     s1=0.6,
+                     s2=0.4,
+                     b1=1.1,
+                     b2=1.2,
+                 )
+
+     def _enable_vae_tiling(self):
+         self.pipeline.vae.enable_tiling()
+
+     def _update_lcm_scheduler_params(self):
+         if isinstance(self.pipeline.scheduler, LCMScheduler):
+             self.pipeline.scheduler = LCMScheduler.from_config(
+                 self.pipeline.scheduler.config,
+                 beta_start=0.001,
+                 beta_end=0.01,
+             )
+
+     def _is_hetero_pipeline(self) -> bool:
+         return "square" in self.ov_model_id.lower()
+
+     def _load_ov_hetero_pipeline(self):
+         print("Loading Heterogeneous Compute pipeline")
+         if DEVICE.upper() == "NPU":
+             device = ["NPU", "NPU", "NPU"]
+             self.pipeline = OvHcLatentConsistency(self.ov_model_id, device)
+         else:
+             self.pipeline = OvHcLatentConsistency(self.ov_model_id)
+
+     def _generate_images_hetero_compute(
+         self,
+         lcm_diffusion_setting: LCMDiffusionSetting,
+     ):
+         print("Using OpenVINO")
+         if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
+             return [
+                 self.pipeline.generate(
+                     prompt=lcm_diffusion_setting.prompt,
+                     neg_prompt=lcm_diffusion_setting.negative_prompt,
+                     init_image=None,
+                     strength=1.0,
+                     num_inference_steps=lcm_diffusion_setting.inference_steps,
+                 )
+             ]
+         else:
+             return [
+                 self.pipeline.generate(
+                     prompt=lcm_diffusion_setting.prompt,
+                     neg_prompt=lcm_diffusion_setting.negative_prompt,
+                     init_image=lcm_diffusion_setting.init_image,
+                     strength=lcm_diffusion_setting.strength,
+                     num_inference_steps=lcm_diffusion_setting.inference_steps,
+                 )
+             ]
+
+     def _is_valid_mode(
+         self,
+         modes: List,
+     ) -> bool:
+         return modes.count(True) == 1 or modes.count(False) == 3
+
+     def _validate_mode(
+         self,
+         modes: List,
+     ) -> None:
+         if not self._is_valid_mode(modes):
+             raise ValueError("Invalid mode; delete configs/settings.yaml and retry!")
+
+     def _is_sana_model(self) -> bool:
+         return "sana" in self.ov_model_id.lower()
+
+     def init(
+         self,
+         device: str = "cpu",
+         lcm_diffusion_setting: LCMDiffusionSetting = LCMDiffusionSetting(),
+     ) -> None:
+         # Mode validation: at most one of LCM-LoRA, OpenVINO, or GGUF may be enabled
+
+         modes = [
+             lcm_diffusion_setting.use_gguf_model,
+             lcm_diffusion_setting.use_openvino,
+             lcm_diffusion_setting.use_lcm_lora,
+         ]
+         self._validate_mode(modes)
+         self.device = device
+         self.use_openvino = lcm_diffusion_setting.use_openvino
+         model_id = lcm_diffusion_setting.lcm_model_id
+         use_local_model = lcm_diffusion_setting.use_offline_model
+         use_tiny_auto_encoder = lcm_diffusion_setting.use_tiny_auto_encoder
+         use_lora = lcm_diffusion_setting.use_lcm_lora
+         lcm_lora: LCMLora = lcm_diffusion_setting.lcm_lora
+         token_merging = lcm_diffusion_setting.token_merging
+         self.ov_model_id = lcm_diffusion_setting.openvino_lcm_model_id
+
+         if lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value:
+             lcm_diffusion_setting.init_image = resize_pil_image(
+                 lcm_diffusion_setting.init_image,
+                 lcm_diffusion_setting.image_width,
+                 lcm_diffusion_setting.image_height,
+             )
+
+         if (
+             self.pipeline is None
+             or self.previous_model_id != model_id
+             or self.previous_use_tae_sd != use_tiny_auto_encoder
+             or self.previous_lcm_lora_base_id != lcm_lora.base_model_id
+             or self.previous_lcm_lora_id != lcm_lora.lcm_lora_id
+             or self.previous_use_lcm_lora != use_lora
+             or self.previous_ov_model_id != self.ov_model_id
+             or self.previous_token_merging != token_merging
+             or self.previous_safety_checker != lcm_diffusion_setting.use_safety_checker
+             or self.previous_use_openvino != lcm_diffusion_setting.use_openvino
+             or self.previous_use_gguf_model != lcm_diffusion_setting.use_gguf_model
+             or self.previous_gguf_model != lcm_diffusion_setting.gguf_model
+             or (
+                 self.use_openvino
+                 and (
+                     self.previous_task_type != lcm_diffusion_setting.diffusion_task
+                     or self.previous_lora != lcm_diffusion_setting.lora
+                 )
+             )
+             or lcm_diffusion_setting.rebuild_pipeline
+         ):
+             if self.use_openvino and is_openvino_device():
+                 if self.pipeline:
+                     del self.pipeline
+                     self.pipeline = None
+                     gc.collect()
+                 self.is_openvino_init = True
+                 if (
+                     lcm_diffusion_setting.diffusion_task
+                     == DiffusionTask.text_to_image.value
+                 ):
+                     print(
+                         f"***** Init Text to image (OpenVINO) - {self.ov_model_id} *****"
+                     )
+                     if "flux" in self.ov_model_id.lower() or self._is_sana_model():
+                         if self._is_sana_model():
+                             print("Loading OpenVINO SANA Sprint pipeline")
+                         else:
+                             print("Loading OpenVINO Flux pipeline")
+                         self.pipeline = get_ov_diffusion_pipeline(self.ov_model_id)
+                     elif self._is_hetero_pipeline():
+                         self._load_ov_hetero_pipeline()
+                     else:
+                         self.pipeline = get_ov_text_to_image_pipeline(
+                             self.ov_model_id,
+                             use_local_model,
+                         )
+                 elif (
+                     lcm_diffusion_setting.diffusion_task
+                     == DiffusionTask.image_to_image.value
+                 ):
+                     if not self.pipeline and self._is_hetero_pipeline():
+                         self._load_ov_hetero_pipeline()
+                     else:
+                         print(
+                             f"***** Image to image (OpenVINO) - {self.ov_model_id} *****"
+                         )
+                         self.pipeline = get_ov_image_to_image_pipeline(
+                             self.ov_model_id,
+                             use_local_model,
+                         )
+             elif lcm_diffusion_setting.use_gguf_model:
+                 model = lcm_diffusion_setting.gguf_model.diffusion_path
+                 print(f"***** Init Text to image (GGUF) - {model} *****")
+                 # if self.pipeline:
+                 #     self.pipeline.terminate()
+                 #     del self.pipeline
+                 #     self.pipeline = None
+                 self._init_gguf_diffusion(lcm_diffusion_setting)
+             else:
+                 if self.pipeline or self.img_to_img_pipeline:
+                     self.pipeline = None
+                     self.img_to_img_pipeline = None
+                     gc.collect()
+
+                 controlnet_args = load_controlnet_adapters(lcm_diffusion_setting)
+                 if use_lora:
+                     print(
+                         f"***** Init LCM-LoRA pipeline - {lcm_lora.base_model_id} *****"
+                     )
+                     self.pipeline = get_lcm_lora_pipeline(
+                         lcm_lora.base_model_id,
+                         lcm_lora.lcm_lora_id,
+                         use_local_model,
+                         torch_data_type=self.torch_data_type,
+                         pipeline_args=controlnet_args,
+                     )
+
+                 else:
+                     print(f"***** Init LCM Model pipeline - {model_id} *****")
+                     self.pipeline = get_lcm_model_pipeline(
+                         model_id,
+                         use_local_model,
+                         controlnet_args,
+                     )
+
+                 self.img_to_img_pipeline = get_image_to_image_pipeline(self.pipeline)
+
+                 if tomesd and token_merging > 0.001:
+                     print(f"***** Token Merging: {token_merging} *****")
+                     tomesd.apply_patch(self.pipeline, ratio=token_merging)
+                     tomesd.apply_patch(self.img_to_img_pipeline, ratio=token_merging)
+
+             if use_tiny_auto_encoder:
+                 if self.use_openvino and is_openvino_device():
+                     if not self._is_sana_model():
+                         print("Using Tiny AutoEncoder (OpenVINO)")
+                         ov_load_tiny_autoencoder(
+                             self.pipeline,
+                             use_local_model,
+                         )
+                 else:
+                     print("Using Tiny Auto Encoder")
+                     load_taesd(
+                         self.pipeline,
+                         use_local_model,
+                         self.torch_data_type,
+                     )
+                     load_taesd(
+                         self.img_to_img_pipeline,
+                         use_local_model,
+                         self.torch_data_type,
+                     )
+
+             if not self.use_openvino and not is_openvino_device():
+                 self._pipeline_to_device()
+
+             if not self._is_hetero_pipeline():
+                 if (
+                     lcm_diffusion_setting.diffusion_task
+                     == DiffusionTask.image_to_image.value
+                     and lcm_diffusion_setting.use_openvino
+                 ):
+                     self.pipeline.scheduler = LCMScheduler.from_config(
+                         self.pipeline.scheduler.config,
+                     )
+                 else:
+                     if not lcm_diffusion_setting.use_gguf_model:
+                         self._update_lcm_scheduler_params()
+
+             if use_lora:
+                 self._add_freeu()
+
+             self.previous_model_id = model_id
+             self.previous_ov_model_id = self.ov_model_id
+             self.previous_use_tae_sd = use_tiny_auto_encoder
+             self.previous_lcm_lora_base_id = lcm_lora.base_model_id
+             self.previous_lcm_lora_id = lcm_lora.lcm_lora_id
+             self.previous_use_lcm_lora = use_lora
+             self.previous_token_merging = lcm_diffusion_setting.token_merging
+             self.previous_safety_checker = lcm_diffusion_setting.use_safety_checker
+             self.previous_use_openvino = lcm_diffusion_setting.use_openvino
+             self.previous_task_type = lcm_diffusion_setting.diffusion_task
+             self.previous_lora = lcm_diffusion_setting.lora.model_copy(deep=True)
+             self.previous_use_gguf_model = lcm_diffusion_setting.use_gguf_model
+             self.previous_gguf_model = lcm_diffusion_setting.gguf_model.model_copy(
+                 deep=True
+             )
+             lcm_diffusion_setting.rebuild_pipeline = False
+             if (
+                 lcm_diffusion_setting.diffusion_task
+                 == DiffusionTask.text_to_image.value
+             ):
+                 print(f"Pipeline : {self.pipeline}")
+             elif (
+                 lcm_diffusion_setting.diffusion_task
+                 == DiffusionTask.image_to_image.value
+             ):
+                 if self.use_openvino and is_openvino_device():
+                     print(f"Pipeline : {self.pipeline}")
+                 else:
+                     print(f"Pipeline : {self.img_to_img_pipeline}")
+             if self.use_openvino:
+                 if lcm_diffusion_setting.lora.enabled:
+                     print("Warning: LoRA models are not supported in OpenVINO mode")
+             elif not lcm_diffusion_setting.use_gguf_model:
+                 adapters = self.pipeline.get_active_adapters()
+                 print(f"Active adapters : {adapters}")
+
+     def _get_timesteps(self):
+         time_steps = self.pipeline.scheduler.config.get("timesteps")
+         time_steps_value = [int(time_steps)] if time_steps else None
+         return time_steps_value
+
+     def _compile_ov_pipeline(
+         self,
+         lcm_diffusion_setting,
+     ):
+         self.pipeline.reshape(
+             batch_size=-1,
+             height=lcm_diffusion_setting.image_height,
+             width=lcm_diffusion_setting.image_width,
+             num_images_per_prompt=lcm_diffusion_setting.number_of_images,
+         )
+         self.pipeline.compile()
+
+     def generate(
+         self,
+         lcm_diffusion_setting: LCMDiffusionSetting,
+         reshape: bool = False,
+     ) -> Any:
+         guidance_scale = lcm_diffusion_setting.guidance_scale
+         img_to_img_inference_steps = lcm_diffusion_setting.inference_steps
+         check_step_value = int(
+             lcm_diffusion_setting.inference_steps * lcm_diffusion_setting.strength
+         )
+         if (
+             lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value
+             and check_step_value < 1
+         ):
+             img_to_img_inference_steps = ceil(1 / lcm_diffusion_setting.strength)
+             print(
+                 f"Strength: {lcm_diffusion_setting.strength},{img_to_img_inference_steps}"
+             )
+
+         pipeline_extra_args = {}
+
+         if lcm_diffusion_setting.use_seed:
+             cur_seed = lcm_diffusion_setting.seed
+             # for multiple images with a fixed seed, use sequential seeds
+             seeds = [
+                 (cur_seed + i) for i in range(lcm_diffusion_setting.number_of_images)
+             ]
+         else:
+             seeds = [
+                 random.randint(0, 999999999)
+                 for i in range(lcm_diffusion_setting.number_of_images)
+             ]
+
+         if self.use_openvino:
+             # no support for generators; try at least to ensure reproducible results for single images
+             np.random.seed(seeds[0])
+             if self._is_hetero_pipeline():
+                 torch.manual_seed(seeds[0])
+                 lcm_diffusion_setting.seed = seeds[0]
+         else:
+             pipeline_extra_args["generator"] = [
+                 torch.Generator(device=self.device).manual_seed(s) for s in seeds
+             ]
+
+         is_openvino_pipe = lcm_diffusion_setting.use_openvino and is_openvino_device()
+         if is_openvino_pipe and not self._is_hetero_pipeline():
+             print("Using OpenVINO")
+             if self.is_openvino_init and self._is_sana_model():
+                 self._compile_ov_pipeline(lcm_diffusion_setting)
+
+             if reshape and not self.is_openvino_init:
+                 print("Reshape and compile")
+                 self._compile_ov_pipeline(lcm_diffusion_setting)
+
+             if self.is_openvino_init:
+                 self.is_openvino_init = False
+
+         if is_openvino_pipe and self._is_hetero_pipeline():
+             return self._generate_images_hetero_compute(lcm_diffusion_setting)
+         elif lcm_diffusion_setting.use_gguf_model:
+             return self._generate_images_gguf(lcm_diffusion_setting)
+
+         if lcm_diffusion_setting.clip_skip > 1:
+             # We follow the convention that "CLIP Skip == 2" means "skip
+             # the last layer", so "CLIP Skip == 1" means "no skipping"
+             pipeline_extra_args["clip_skip"] = lcm_diffusion_setting.clip_skip - 1
+
+         self.pipeline.safety_checker = None
+         if (
+             lcm_diffusion_setting.diffusion_task == DiffusionTask.image_to_image.value
+             and not is_openvino_pipe
+         ):
+             self.img_to_img_pipeline.safety_checker = None
+
+         if (
+             not lcm_diffusion_setting.use_lcm_lora
+             and not lcm_diffusion_setting.use_openvino
+             and lcm_diffusion_setting.guidance_scale != 1.0
+         ):
+             print("Not using LCM-LoRA, so setting guidance_scale to 1.0")
+             guidance_scale = 1.0
+
+         controlnet_args = update_controlnet_arguments(lcm_diffusion_setting)
+         if lcm_diffusion_setting.use_openvino:
+             if (
+                 lcm_diffusion_setting.diffusion_task
+                 == DiffusionTask.text_to_image.value
+             ):
+                 if self._is_sana_model():
+                     result_images = self.pipeline(
+                         prompt=lcm_diffusion_setting.prompt,
+                         num_inference_steps=lcm_diffusion_setting.inference_steps,
+                         guidance_scale=guidance_scale,
+                         width=lcm_diffusion_setting.image_width,
+                         height=lcm_diffusion_setting.image_height,
+                         num_images_per_prompt=lcm_diffusion_setting.number_of_images,
+                     ).images
+                 else:
+                     result_images = self.pipeline(
+                         prompt=lcm_diffusion_setting.prompt,
+                         negative_prompt=lcm_diffusion_setting.negative_prompt,
+                         num_inference_steps=lcm_diffusion_setting.inference_steps,
+                         guidance_scale=guidance_scale,
+                         width=lcm_diffusion_setting.image_width,
+                         height=lcm_diffusion_setting.image_height,
+                         num_images_per_prompt=lcm_diffusion_setting.number_of_images,
+                     ).images
+             elif (
+                 lcm_diffusion_setting.diffusion_task
+                 == DiffusionTask.image_to_image.value
+             ):
+                 result_images = self.pipeline(
+                     image=lcm_diffusion_setting.init_image,
+                     strength=lcm_diffusion_setting.strength,
+                     prompt=lcm_diffusion_setting.prompt,
+                     negative_prompt=lcm_diffusion_setting.negative_prompt,
+                     num_inference_steps=img_to_img_inference_steps * 3,
+                     guidance_scale=guidance_scale,
+                     num_images_per_prompt=lcm_diffusion_setting.number_of_images,
+                 ).images
+
+         else:
+             if (
+                 lcm_diffusion_setting.diffusion_task
+                 == DiffusionTask.text_to_image.value
+             ):
+                 result_images = self.pipeline(
+                     prompt=lcm_diffusion_setting.prompt,
+                     negative_prompt=lcm_diffusion_setting.negative_prompt,
+                     num_inference_steps=lcm_diffusion_setting.inference_steps,
+                     guidance_scale=guidance_scale,
+                     width=lcm_diffusion_setting.image_width,
+                     height=lcm_diffusion_setting.image_height,
+                     num_images_per_prompt=lcm_diffusion_setting.number_of_images,
+                     timesteps=self._get_timesteps(),
+                     **pipeline_extra_args,
+                     **controlnet_args,
+                 ).images
+
+             elif (
+                 lcm_diffusion_setting.diffusion_task
+                 == DiffusionTask.image_to_image.value
+             ):
+                 result_images = self.img_to_img_pipeline(
+                     image=lcm_diffusion_setting.init_image,
+                     strength=lcm_diffusion_setting.strength,
+                     prompt=lcm_diffusion_setting.prompt,
+                     negative_prompt=lcm_diffusion_setting.negative_prompt,
+                     num_inference_steps=img_to_img_inference_steps,
+                     guidance_scale=guidance_scale,
+                     width=lcm_diffusion_setting.image_width,
+                     height=lcm_diffusion_setting.image_height,
+                     num_images_per_prompt=lcm_diffusion_setting.number_of_images,
+                     **pipeline_extra_args,
+                     **controlnet_args,
+                 ).images
+
+         for i, seed in enumerate(seeds):
+             result_images[i].info["image_seed"] = seed
+
+         return result_images
+
+     def _init_gguf_diffusion(
+         self,
+         lcm_diffusion_setting: LCMDiffusionSetting,
+     ):
+         config = ModelConfig()
+         config.model_path = lcm_diffusion_setting.gguf_model.diffusion_path
+         config.diffusion_model_path = lcm_diffusion_setting.gguf_model.diffusion_path
+         config.clip_l_path = lcm_diffusion_setting.gguf_model.clip_path
+         config.t5xxl_path = lcm_diffusion_setting.gguf_model.t5xxl_path
+         config.vae_path = lcm_diffusion_setting.gguf_model.vae_path
+         config.n_threads = GGUF_THREADS
+         print(f"GGUF Threads : {GGUF_THREADS}")
+         print("GGUF - Model config")
+         pprint(lcm_diffusion_setting.gguf_model.model_dump())
+         self.pipeline = GGUFDiffusion(
+             get_app_path(),  # Place DLL in fastsdcpu folder
+             config,
+             True,
+         )
+
+     def _generate_images_gguf(
+         self,
+         lcm_diffusion_setting: LCMDiffusionSetting,
+     ):
+         if lcm_diffusion_setting.diffusion_task == DiffusionTask.text_to_image.value:
+             t2iconfig = Txt2ImgConfig()
+             t2iconfig.prompt = lcm_diffusion_setting.prompt
+             t2iconfig.batch_count = lcm_diffusion_setting.number_of_images
+             t2iconfig.cfg_scale = lcm_diffusion_setting.guidance_scale
+             t2iconfig.height = lcm_diffusion_setting.image_height
+             t2iconfig.width = lcm_diffusion_setting.image_width
+             t2iconfig.sample_steps = lcm_diffusion_setting.inference_steps
+             t2iconfig.sample_method = SampleMethod.EULER
+             if lcm_diffusion_setting.use_seed:
+                 t2iconfig.seed = lcm_diffusion_setting.seed
+             else:
+                 t2iconfig.seed = -1
+
+             return self.pipeline.generate_text2mg(t2iconfig)
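
A condensed usage sketch of this class; the field values are illustrative, and the real defaults live in LCMDiffusionSetting:

    from backend.lcm_text_to_image import LCMTextToImage
    from backend.models.lcmdiffusion_setting import LCMDiffusionSetting

    settings = LCMDiffusionSetting()
    settings.prompt = "a cup of coffee on a wooden table"
    settings.inference_steps = 4
    settings.number_of_images = 1

    lcm = LCMTextToImage()
    lcm.init(device="cpu", lcm_diffusion_setting=settings)  # builds or rebuilds the pipeline
    images = lcm.generate(settings)  # each image stores its seed in .info["image_seed"]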
src/backend/lora.py ADDED
@@ -0,0 +1,136 @@
+ import glob
+ from os import path
+ from paths import get_file_name, FastStableDiffusionPaths
+ from pathlib import Path
+
+
+ # A basic class to keep track of the currently loaded LoRAs and
+ # their weights; the diffusers function get_active_adapters()
+ # returns a list of adapter names but not their weights, so we need
+ # a way to track the current LoRA weights and reapply them whenever
+ # a new LoRA is loaded
+ class _lora_info:
+     def __init__(
+         self,
+         path: str,
+         weight: float,
+     ):
+         self.path = path
+         self.adapter_name = get_file_name(path)
+         self.weight = weight
+
+     def __del__(self):
+         self.path = None
+         self.adapter_name = None
+
+
+ _loaded_loras = []
+ _current_pipeline = None
+
+
+ # This function loads a LoRA from the LoRA path setting, so it's
+ # possible to load multiple LoRAs by calling this function more than
+ # once with a different LoRA path setting; note that if you plan to
+ # load multiple LoRAs and dynamically change their weights, you
+ # might want to set the LoRA fuse option to False
+ def load_lora_weight(
+     pipeline,
+     lcm_diffusion_setting,
+ ):
+     if not lcm_diffusion_setting.lora.path:
+         raise Exception("Empty lora model path")
+
+     if not path.exists(lcm_diffusion_setting.lora.path):
+         raise Exception("Lora model path is invalid")
+
+     # If the pipeline has been rebuilt since the last call, remove all
+     # references to previously loaded LoRAs and store the new pipeline
+     global _loaded_loras
+     global _current_pipeline
+     if pipeline != _current_pipeline:
+         for lora in _loaded_loras:
+             del lora
+         del _loaded_loras
+         _loaded_loras = []
+         _current_pipeline = pipeline
+
+     current_lora = _lora_info(
+         lcm_diffusion_setting.lora.path,
+         lcm_diffusion_setting.lora.weight,
+     )
+     _loaded_loras.append(current_lora)
+
+     if lcm_diffusion_setting.lora.enabled:
+         print(f"LoRA adapter name : {current_lora.adapter_name}")
+         pipeline.load_lora_weights(
+             FastStableDiffusionPaths.get_lora_models_path(),
+             weight_name=Path(lcm_diffusion_setting.lora.path).name,
+             local_files_only=True,
+             adapter_name=current_lora.adapter_name,
+         )
+         update_lora_weights(
+             pipeline,
+             lcm_diffusion_setting,
+         )
+
+         if lcm_diffusion_setting.lora.fuse:
+             pipeline.fuse_lora()
+
+
+ def get_lora_models(root_dir: str):
+     lora_models = glob.glob(f"{root_dir}/**/*.safetensors", recursive=True)
+     lora_models_map = {}
+     for file_path in lora_models:
+         lora_name = get_file_name(file_path)
+         if lora_name is not None:
+             lora_models_map[lora_name] = file_path
+     return lora_models_map
+
+
+ # This function returns a list of (adapter_name, weight) tuples for the
+ # currently loaded LoRAs
+ def get_active_lora_weights():
+     active_loras = []
+     for lora_info in _loaded_loras:
+         active_loras.append(
+             (
+                 lora_info.adapter_name,
+                 lora_info.weight,
+             )
+         )
+     return active_loras
+
+
+ # This function receives a pipeline, an lcm_diffusion_setting object and
+ # an optional list of updated (adapter_name, weight) tuples
+ def update_lora_weights(
+     pipeline,
+     lcm_diffusion_setting,
+     lora_weights=None,
+ ):
+     global _loaded_loras
+     global _current_pipeline
+     if pipeline != _current_pipeline:
+         print("Wrong pipeline when trying to update LoRA weights")
+         return
+     if lora_weights:
+         for idx, lora in enumerate(lora_weights):
+             if _loaded_loras[idx].adapter_name != lora[0]:
+                 print("Wrong adapter name in LoRA enumeration!")
+                 continue
+             _loaded_loras[idx].weight = lora[1]
+
+     adapter_names = []
+     adapter_weights = []
+     if lcm_diffusion_setting.use_lcm_lora:
+         adapter_names.append("lcm")
+         adapter_weights.append(1.0)
+     for lora in _loaded_loras:
+         adapter_names.append(lora.adapter_name)
+         adapter_weights.append(lora.weight)
+     pipeline.set_adapters(
+         adapter_names,
+         adapter_weights=adapter_weights,
+     )
+     adapter_weights = zip(adapter_names, adapter_weights)
+     print(f"Adapters: {list(adapter_weights)}")
src/backend/safety_checker.py ADDED
@@ -0,0 +1,29 @@
+ from typing import Any
+
+ from transformers import pipeline
+
+ from constants import SAFETY_CHECKER_MODEL
+
+
+ class SafetyChecker:
+     """A class to check if an image is NSFW or not."""
+
+     def __init__(
+         self,
+         model_id: str = SAFETY_CHECKER_MODEL,
+     ):
+         self.classifier = pipeline(
+             "image-classification",
+             model=model_id,
+         )
+
+     def is_safe(
+         self,
+         image: Any,
+     ) -> bool:
+         pred = self.classifier(image)
+         scores = {label["label"]: label["score"] for label in pred}
+         nsfw_score = scores.get("nsfw", 0)
+         normal_score = scores.get("normal", 0)
+         print(f"NSFW score: {nsfw_score}, Normal score: {normal_score}")
+         return normal_score > nsfw_score
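
A usage sketch (the image path is hypothetical; the classifier model comes from constants.SAFETY_CHECKER_MODEL):

    from PIL import Image

    checker = SafetyChecker()
    img = Image.open("results/sample.png")
    if not checker.is_safe(img):
        print("Image flagged as NSFW, skipping save")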
src/backend/tiny_autoencoder.py ADDED
@@ -0,0 +1,40 @@
+ from constants import (
+     TAESD_MODEL,
+     TAESDXL_MODEL,
+     TAESD_MODEL_OPENVINO,
+     TAESDXL_MODEL_OPENVINO,
+     TAEF1_MODEL_OPENVINO,
+ )
+
+
+ def get_tiny_autoencoder_repo_id(pipeline_class) -> str:
+     print(f"Pipeline class : {pipeline_class}")
+     if pipeline_class in (
+         "LatentConsistencyModelPipeline",
+         "StableDiffusionPipeline",
+         "StableDiffusionImg2ImgPipeline",
+         "StableDiffusionControlNetPipeline",
+         "StableDiffusionControlNetImg2ImgPipeline",
+     ):
+         return TAESD_MODEL
+     elif pipeline_class in (
+         "StableDiffusionXLPipeline",
+         "StableDiffusionXLImg2ImgPipeline",
+     ):
+         return TAESDXL_MODEL
+     elif pipeline_class in (
+         "OVStableDiffusionPipeline",
+         "OVStableDiffusionImg2ImgPipeline",
+     ):
+         return TAESD_MODEL_OPENVINO
+     elif pipeline_class in (
+         "OVStableDiffusionXLPipeline",
+         "OVStableDiffusionXLImg2ImgPipeline",
+     ):
+         return TAESDXL_MODEL_OPENVINO
+     elif pipeline_class == "OVFluxPipeline":
+         return TAEF1_MODEL_OPENVINO
+     else:
+         raise ValueError(
+             f"Tiny autoencoder not available for the pipeline class {pipeline_class}!"
+         )
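
A quick illustration of the mapping, using class names taken from the branches above:

    repo_id = get_tiny_autoencoder_repo_id(pipe.__class__.__name__)
    # "StableDiffusionPipeline"   -> TAESD_MODEL
    # "StableDiffusionXLPipeline" -> TAESDXL_MODEL
    # "OVFluxPipeline"            -> TAEF1_MODEL_OPENVINO
    # any other class name        -> ValueError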
src/backend/utils.py ADDED
@@ -0,0 +1,18 @@
+ from PIL import Image
+
+
+ def get_blank_image(
+     width: int,
+     height: int,
+ ) -> Image.Image:
+     """
+     Create a blank image with the specified width and height.
+
+     Args:
+         width (int): The width of the image.
+         height (int): The height of the image.
+
+     Returns:
+         Image.Image: A blank image with the specified dimensions.
+     """
+     return Image.new("RGB", (width, height), (0, 0, 0))
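
For completeness, a one-line usage example:

    placeholder = get_blank_image(512, 512)  # 512x512 all-black RGB image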