import os
import random
import sys
from typing import Sequence, Mapping, Any, Optional, Union
import torch
import gradio as gr
from huggingface_hub import hf_hub_download
import spaces
from comfy import model_management

# Fetch every checkpoint the workflow needs into the local `models/` tree
# before any loader node below tries to open them.
hf_hub_download(
    repo_id="black-forest-labs/FLUX.1-Redux-dev",
    filename="flux1-redux-dev.safetensors",
    local_dir="models/style_models",
)
hf_hub_download(
    repo_id="black-forest-labs/FLUX.1-Depth-dev",
    filename="flux1-depth-dev.safetensors",
    local_dir="models/diffusion_models",
)
hf_hub_download(
    repo_id="Comfy-Org/sigclip_vision_384",
    filename="sigclip_vision_patch14_384.safetensors",
    local_dir="models/clip_vision",
)
hf_hub_download(
    repo_id="Kijai/DepthAnythingV2-safetensors",
    filename="depth_anything_v2_vitl_fp32.safetensors",
    local_dir="models/depthanything",
)
hf_hub_download(
    repo_id="black-forest-labs/FLUX.1-dev",
    filename="ae.safetensors",
    local_dir="models/vae/FLUX1",
)
hf_hub_download(
    repo_id="comfyanonymous/flux_text_encoders",
    filename="clip_l.safetensors",
    local_dir="models/text_encoders",
)
hf_hub_download(
    repo_id="comfyanonymous/flux_text_encoders",
    filename="t5xxl_fp16.safetensors",
    local_dir="models/text_encoders/t5",
)


def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Return the value at the given index of a sequence or mapping.

    The object is first indexed directly with ``index``. If that raises
    ``KeyError`` (the object is a mapping without that key), the value is
    looked up under the ``"result"`` key instead and that entry is indexed.

    Args:
        obj (Union[Sequence, Mapping]): The object to retrieve the value from.
        index (int): The index of the value to retrieve.

    Returns:
        Any: The value at the given index.

    Raises:
        IndexError: If the index is out of bounds for the object and the
            object is not a mapping.
        KeyError: If the object is a mapping that has neither ``index`` nor
            ``"result"`` as a key.
    """
    try:
        return obj[index]
    except KeyError:
        return obj["result"][index]


def find_path(name: str, path: Optional[str] = None) -> Optional[str]:
    """Search the given directory and each of its parents for an entry called `name`.

    Args:
        name: File or directory name to look for.
        path: Directory to start from. Defaults to the current working
            directory.

    Returns:
        The joined path (a ``str``) of the first match found while walking
        upwards, or ``None`` if the filesystem root is reached without a match.
    """
    # If no path is given, use the current working directory
    current = os.getcwd() if path is None else path

    while True:
        # Check if the current directory contains the name
        if name in os.listdir(current):
            path_name = os.path.join(current, name)
            print(f"{name} found: {path_name}")
            return path_name

        parent_directory = os.path.dirname(current)
        # dirname() of the root is the root itself: nothing left to search
        if parent_directory == current:
            return None
        current = parent_directory


def add_comfyui_directory_to_sys_path() -> None:
    """Append the 'ComfyUI' directory to sys.path if `find_path` locates it."""
    comfyui_path = find_path("ComfyUI")
    if comfyui_path is not None and os.path.isdir(comfyui_path):
        sys.path.append(comfyui_path)
        print(f"'{comfyui_path}' added to sys.path")


def add_extra_model_paths() -> None:
    """Load the optional extra_model_paths.yaml file if one can be found.

    `load_extra_path_config` is imported from `main`, falling back to
    `utils.extra_config` when that import fails, and is called with the path
    of the config file located by `find_path`.
    """
    try:
        from main import load_extra_path_config
    except ImportError:
        print(
            "Could not import load_extra_path_config from main.py. Looking in utils.extra_config instead."
        )
        from utils.extra_config import load_extra_path_config

    extra_model_paths = find_path("extra_model_paths.yaml")

    if extra_model_paths is not None:
        load_extra_path_config(extra_model_paths)
    else:
        print("Could not find the extra_model_paths config file.")


add_comfyui_directory_to_sys_path()
add_extra_model_paths()


def import_custom_nodes() -> None:
    """Initialize ComfyUI's extra nodes so they appear in NODE_CLASS_MAPPINGS.

    This function sets up a new asyncio event loop, initializes the
    PromptServer, creates a PromptQueue, and calls `init_extra_nodes()`.
    """
    import asyncio
    import execution
    from nodes import init_extra_nodes
    import server

    # Creating a new event loop and setting it as the default loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Creating an instance of PromptServer with the loop
    server_instance = server.PromptServer(loop)
    execution.PromptQueue(server_instance)

    # Initializing custom nodes
    init_extra_nodes()


from nodes import NODE_CLASS_MAPPINGS

# NOTE(review): these lookups run at import time, before import_custom_nodes()
# has been called (it is only invoked inside generate_image). If any of the
# keys below are provided by custom node packs, they may not be registered
# yet at this point -- confirm.
intconstant = NODE_CLASS_MAPPINGS["INTConstant"]()
dualcliploader = NODE_CLASS_MAPPINGS["DualCLIPLoader"]()
# To be added to `model_loaders` as it loads a model
dualcliploader_357 = dualcliploader.load_clip(
    clip_name1="t5/t5xxl_fp16.safetensors",
    clip_name2="clip_l.safetensors",
    type="flux",
)
cr_clip_input_switch = NODE_CLASS_MAPPINGS["CR Clip Input Switch"]()
cliptextencode = NODE_CLASS_MAPPINGS["CLIPTextEncode"]()
loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
imageresize = NODE_CLASS_MAPPINGS["ImageResize+"]()
getimagesizeandcount = NODE_CLASS_MAPPINGS["GetImageSizeAndCount"]()
vaeloader = NODE_CLASS_MAPPINGS["VAELoader"]()
# To be added to `model_loaders` as it loads a model
vaeloader_359 = vaeloader.load_vae(vae_name="FLUX1/ae.safetensors")
vaeencode = NODE_CLASS_MAPPINGS["VAEEncode"]()
unetloader = NODE_CLASS_MAPPINGS["UNETLoader"]()
# To be added to `model_loaders` as it loads a model
unetloader_358 = unetloader.load_unet(
    unet_name="flux1-depth-dev.safetensors", weight_dtype="default"
)
ksamplerselect = NODE_CLASS_MAPPINGS["KSamplerSelect"]()
randomnoise = NODE_CLASS_MAPPINGS["RandomNoise"]()
fluxguidance = NODE_CLASS_MAPPINGS["FluxGuidance"]()
depthanything_v2 = NODE_CLASS_MAPPINGS["DepthAnything_V2"]()
downloadandloaddepthanythingv2model = NODE_CLASS_MAPPINGS[
    "DownloadAndLoadDepthAnythingV2Model"
]()
# To be added to `model_loaders` as it loads a model
downloadandloaddepthanythingv2model_437 = (
    downloadandloaddepthanythingv2model.loadmodel(
        model="depth_anything_v2_vitl_fp32.safetensors"
    )
)
instructpixtopixconditioning = NODE_CLASS_MAPPINGS[
    "InstructPixToPixConditioning"
]()
# The node must be instantiated before it is called on the next line;
# doing it the other way round raises NameError at import time.
text_multiline = NODE_CLASS_MAPPINGS["Text Multiline"]()
text_multiline_454 = text_multiline.text_multiline(text="FLUX_Redux")
clipvisionloader = NODE_CLASS_MAPPINGS["CLIPVisionLoader"]()
# To be added to `model_loaders` as it loads a model
clipvisionloader_438 = clipvisionloader.load_clip(
    clip_name="sigclip_vision_patch14_384.safetensors"
)
clipvisionencode = NODE_CLASS_MAPPINGS["CLIPVisionEncode"]()
stylemodelloader = NODE_CLASS_MAPPINGS["StyleModelLoader"]()
# To be added to `model_loaders` as it loads a model
stylemodelloader_441 = stylemodelloader.load_style_model(
    style_model_name="flux1-redux-dev.safetensors"
)
emptylatentimage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
cr_conditioning_input_switch = NODE_CLASS_MAPPINGS[
    "CR Conditioning Input Switch"
]()
cr_model_input_switch = NODE_CLASS_MAPPINGS["CR Model Input Switch"]()
stylemodelapplyadvanced = NODE_CLASS_MAPPINGS["StyleModelApplyAdvanced"]()
basicguider = NODE_CLASS_MAPPINGS["BasicGuider"]()
basicscheduler = NODE_CLASS_MAPPINGS["BasicScheduler"]()
samplercustomadvanced = NODE_CLASS_MAPPINGS["SamplerCustomAdvanced"]()
vaedecode = NODE_CLASS_MAPPINGS["VAEDecode"]()
saveimage = NODE_CLASS_MAPPINGS["SaveImage"]()
imagecrop = NODE_CLASS_MAPPINGS["ImageCrop+"]()

# Add all the models that load a safetensors file
model_loaders = [
    dualcliploader_357,
    vaeloader_359,
    unetloader_358,
    clipvisionloader_438,
    stylemodelloader_441,
    downloadandloaddepthanythingv2model_437,
]

# Check which models are valid and how to best load them: take the loader's
# first output, prefer its `patcher` attribute when it has one, and skip
# entries where either of those is a plain dict.
valid_models = [
    getattr(loader[0], 'patcher', loader[0])
    for loader in model_loaders
    if not isinstance(loader[0], dict)
    and not isinstance(getattr(loader[0], 'patcher', None), dict)
]

# Finally loads the models
model_management.load_models_gpu(valid_models)


@spaces.GPU(duration=60)
def generate_image(prompt, structure_image, style_image, depth_strength, style_strength):
    """Run the workflow once and return the path of the saved image.

    Args:
        prompt: Text passed to the CLIP text encoder as the positive prompt.
        structure_image: Image reference handed to the `LoadImage` node; its
            resized version is encoded and fed to the depth model (the UI
            passes a file path).
        style_image: Image reference handed to the `LoadImage` node and then
            to the CLIP vision encoder (the UI passes a file path).
        depth_strength: Guidance value applied to the positive conditioning
            via the `FluxGuidance` node.
        style_strength: `strength` passed to `StyleModelApplyAdvanced`.

    Returns:
        str: ``"output/<filename>"`` where ``<filename>`` is the first image
        reported by the `SaveImage` node.
    """
    import_custom_nodes()
    with torch.inference_mode():
        intconstant_83 = intconstant.get_value(value=1024)

        intconstant_84 = intconstant.get_value(value=1024)

        cr_clip_input_switch_319 = cr_clip_input_switch.switch(
            Input=1,
            clip1=get_value_at_index(dualcliploader_357, 0),
            clip2=get_value_at_index(dualcliploader_357, 0),
        )

        cliptextencode_174 = cliptextencode.encode(
            text=prompt,
            clip=get_value_at_index(cr_clip_input_switch_319, 0),
        )

        cliptextencode_175 = cliptextencode.encode(
            text="purple", clip=get_value_at_index(cr_clip_input_switch_319, 0)
        )

        loadimage_429 = loadimage.load_image(image=structure_image)

        imageresize_72 = imageresize.execute(
            width=get_value_at_index(intconstant_83, 0),
            height=get_value_at_index(intconstant_84, 0),
            interpolation="bicubic",
            method="keep proportion",
            condition="always",
            multiple_of=16,
            image=get_value_at_index(loadimage_429, 0),
        )

        getimagesizeandcount_360 = getimagesizeandcount.getsize(
            image=get_value_at_index(imageresize_72, 0)
        )

        vaeencode_197 = vaeencode.encode(
            pixels=get_value_at_index(getimagesizeandcount_360, 0),
            vae=get_value_at_index(vaeloader_359, 0),
        )

        ksamplerselect_363 = ksamplerselect.get_sampler(sampler_name="euler")

        # randint() is inclusive on both ends; 2**64 itself does not fit in
        # an unsigned 64-bit seed, so the upper bound is 2**64 - 1.
        randomnoise_365 = randomnoise.get_noise(
            noise_seed=random.randint(1, 2**64 - 1)
        )

        # Use the caller's depth strength here: this is the guidance that
        # actually reaches the sampler (via the conditioning chain below).
        fluxguidance_430 = fluxguidance.append(
            guidance=depth_strength,
            conditioning=get_value_at_index(cliptextencode_174, 0),
        )

        depthanything_v2_436 = depthanything_v2.process(
            da_model=get_value_at_index(downloadandloaddepthanythingv2model_437, 0),
            images=get_value_at_index(getimagesizeandcount_360, 0),
        )

        instructpixtopixconditioning_431 = instructpixtopixconditioning.encode(
            positive=get_value_at_index(fluxguidance_430, 0),
            negative=get_value_at_index(cliptextencode_175, 0),
            vae=get_value_at_index(vaeloader_359, 0),
            pixels=get_value_at_index(depthanything_v2_436, 0),
        )

        loadimage_440 = loadimage.load_image(image=style_image)

        clipvisionencode_439 = clipvisionencode.encode(
            crop="center",
            clip_vision=get_value_at_index(clipvisionloader_438, 0),
            image=get_value_at_index(loadimage_440, 0),
        )

        emptylatentimage_10 = emptylatentimage.generate(
            width=get_value_at_index(imageresize_72, 1),
            height=get_value_at_index(imageresize_72, 2),
            batch_size=1,
        )

        cr_conditioning_input_switch_271 = cr_conditioning_input_switch.switch(
            Input=1,
            conditioning1=get_value_at_index(instructpixtopixconditioning_431, 0),
            conditioning2=get_value_at_index(instructpixtopixconditioning_431, 0),
        )

        cr_conditioning_input_switch_272 = cr_conditioning_input_switch.switch(
            Input=1,
            conditioning1=get_value_at_index(instructpixtopixconditioning_431, 1),
            conditioning2=get_value_at_index(instructpixtopixconditioning_431, 1),
        )

        cr_model_input_switch_320 = cr_model_input_switch.switch(
            Input=1,
            model1=get_value_at_index(unetloader_358, 0),
            model2=get_value_at_index(unetloader_358, 0),
        )

        stylemodelapplyadvanced_442 = stylemodelapplyadvanced.apply_stylemodel(
            strength=style_strength,
            conditioning=get_value_at_index(instructpixtopixconditioning_431, 0),
            style_model=get_value_at_index(stylemodelloader_441, 0),
            clip_vision_output=get_value_at_index(clipvisionencode_439, 0),
        )

        basicguider_366 = basicguider.get_guider(
            model=get_value_at_index(cr_model_input_switch_320, 0),
            conditioning=get_value_at_index(stylemodelapplyadvanced_442, 0),
        )

        basicscheduler_364 = basicscheduler.get_sigmas(
            scheduler="simple",
            steps=28,
            denoise=1,
            model=get_value_at_index(cr_model_input_switch_320, 0),
        )

        samplercustomadvanced_362 = samplercustomadvanced.sample(
            noise=get_value_at_index(randomnoise_365, 0),
            guider=get_value_at_index(basicguider_366, 0),
            sampler=get_value_at_index(ksamplerselect_363, 0),
            sigmas=get_value_at_index(basicscheduler_364, 0),
            latent_image=get_value_at_index(emptylatentimage_10, 0),
        )

        vaedecode_321 = vaedecode.decode(
            samples=get_value_at_index(samplercustomadvanced_362, 0),
            vae=get_value_at_index(vaeloader_359, 0),
        )

        saveimage_327 = saveimage.save_images(
            filename_prefix=get_value_at_index(text_multiline_454, 0),
            images=get_value_at_index(vaedecode_321, 0),
        )

        # NOTE(review): the results of the two node calls below are never
        # read in this function; they look like leftovers from the exported
        # workflow. Kept as-is -- confirm they can be removed.
        fluxguidance_382 = fluxguidance.append(
            guidance=depth_strength,
            conditioning=get_value_at_index(cr_conditioning_input_switch_272, 0),
        )

        imagecrop_447 = imagecrop.execute(
            width=2000,
            height=2000,
            position="top-center",
            x_offset=0,
            y_offset=0,
            image=get_value_at_index(loadimage_440, 0),
        )

        saved_path = f"output/{saveimage_327['ui']['images'][0]['filename']}"
        return saved_path


if __name__ == "__main__":
    # Start your Gradio app
    with gr.Blocks() as app:
        # Add a title
        gr.Markdown("# FLUX Style Shaping")

        with gr.Row():
            with gr.Column():
                # Add an input
                prompt_input = gr.Textbox(label="Prompt", placeholder="Enter your prompt here...")
                # Add a `Row` to include the groups side by side
                with gr.Row():
                    # First group includes structure image and depth strength
                    with gr.Group():
                        structure_image = gr.Image(label="Structure Image", type="filepath")
                        depth_strength = gr.Slider(minimum=0, maximum=50, value=15, label="Depth Strength")
                    # Second group includes style image and style strength
                    with gr.Group():
                        style_image = gr.Image(label="Style Image", type="filepath")
                        style_strength = gr.Slider(minimum=0, maximum=1, value=0.5, label="Style Strength")

                # The generate button
                generate_btn = gr.Button("Generate")

            with gr.Column():
                # The output image
                output_image = gr.Image(label="Generated Image")

        # When clicking the button, it will trigger the `generate_image`
        # function with the respective inputs, and show the returned image
        generate_btn.click(
            fn=generate_image,
            inputs=[prompt_input, structure_image, style_image, depth_strength, style_strength],
            outputs=[output_image]
        )
    app.launch(share=True)