Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import sys | |
import random | |
from typing import Sequence, Mapping, Any, Union | |
import torch | |
import gradio as gr | |
from PIL import Image | |
from huggingface_hub import hf_hub_download | |
import spaces # Se estiver no Hugging Face Spaces. Se não, pode remover. | |
##################################### | |
# 1. Funções auxiliares de caminho e import | |
##################################### | |
def find_path(name: str, path: str = None) -> str: | |
"""Busca recursivamente por uma pasta/arquivo 'name' a partir de 'path'.""" | |
if path is None: | |
path = os.getcwd() | |
if name in os.listdir(path): | |
path_name = os.path.join(path, name) | |
print(f"{name} encontrado em: {path_name}") | |
return path_name | |
parent_directory = os.path.dirname(path) | |
if parent_directory == path: | |
return None | |
return find_path(name, parent_directory) | |
def add_comfyui_directory_to_sys_path() -> None: | |
"""Adiciona o diretório ComfyUI ao sys.path, caso encontrado.""" | |
comfyui_path = find_path("ComfyUI") | |
if comfyui_path is not None and os.path.isdir(comfyui_path): | |
sys.path.append(comfyui_path) | |
print(f"Diretório ComfyUI adicionado ao sys.path: {comfyui_path}") | |
else: | |
print("Não foi possível encontrar o diretório ComfyUI.") | |
def add_extra_model_paths() -> None: | |
""" | |
Carrega configurações extras de caminhos de modelos, se existir | |
um arquivo 'extra_model_paths.yaml'. | |
""" | |
try: | |
from main import load_extra_path_config | |
except ImportError: | |
# Dependendo da versão do ComfyUI, pode estar em 'utils.extra_config' | |
from utils.extra_config import load_extra_path_config | |
extra_model_paths = find_path("extra_model_paths.yaml") | |
if extra_model_paths is not None: | |
load_extra_path_config(extra_model_paths) | |
else: | |
print("Arquivo extra_model_paths.yaml não foi encontrado.") | |
def import_custom_nodes() -> None: | |
""" | |
Executa a inicialização de nós extras e o servidor do ComfyUI (caso necessário), | |
similar ao que ocorre no segundo script. | |
""" | |
import asyncio | |
import execution | |
from nodes import init_extra_nodes | |
import server | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
server_instance = server.PromptServer(loop) | |
execution.PromptQueue(server_instance) | |
init_extra_nodes() | |
##################################### | |
# 2. Ajustando o ambiente ComfyUI | |
##################################### | |
add_comfyui_directory_to_sys_path() | |
add_extra_model_paths() | |
import_custom_nodes() | |
##################################### | |
# 3. Importando nós do ComfyUI | |
##################################### | |
from comfy import model_management | |
from nodes import ( | |
NODE_CLASS_MAPPINGS, | |
DualCLIPLoader, | |
CLIPVisionLoader, | |
StyleModelLoader, | |
VAELoader, | |
CLIPTextEncode, | |
LoadImage, | |
EmptyLatentImage, | |
VAEDecode | |
) | |
##################################### | |
# 4. Download de modelos (ajuste conforme sua necessidade) | |
##################################### | |
# Exemplo de downloads (ajuste conforme seus modelos): | |
os.makedirs("models/text_encoders", exist_ok=True) | |
os.makedirs("models/style_models", exist_ok=True) | |
os.makedirs("models/diffusion_models", exist_ok=True) | |
os.makedirs("models/vae", exist_ok=True) | |
os.makedirs("models/clip_vision", exist_ok=True) | |
try: | |
print("Baixando modelo Style (flux1-redux-dev.safetensors)...") | |
hf_hub_download(repo_id="black-forest-labs/FLUX.1-Redux-dev", | |
filename="flux1-redux-dev.safetensors", | |
local_dir="models/style_models") | |
print("Baixando T5 (t5xxl_fp16.safetensors)...") | |
hf_hub_download(repo_id="comfyanonymous/flux_text_encoders", | |
filename="t5xxl_fp16.safetensors", | |
local_dir="models/text_encoders") | |
print("Baixando CLIP L (ViT-L-14) ...") | |
hf_hub_download(repo_id="zer0int/CLIP-GmP-ViT-L-14", | |
filename="ViT-L-14-TEXT-detail-improved-hiT-GmP-HF.safetensors", | |
local_dir="models/text_encoders") | |
print("Baixando VAE (ae.safetensors)...") | |
hf_hub_download(repo_id="black-forest-labs/FLUX.1-dev", | |
filename="ae.safetensors", | |
local_dir="models/vae") | |
print("Baixando flux1-dev.safetensors (modelo difusão)...") | |
hf_hub_download(repo_id="black-forest-labs/FLUX.1-dev", | |
filename="flux1-dev.safetensors", | |
local_dir="models/diffusion_models") | |
print("Baixando CLIP Vision (model.safetensors)...") | |
hf_hub_download(repo_id="google/siglip-so400m-patch14-384", | |
filename="model.safetensors", | |
local_dir="models/clip_vision") | |
except Exception as e: | |
print("Algum download falhou:", e) | |
##################################### | |
# 5. Carregar modelos via ComfyUI | |
##################################### | |
# Carregando CLIP (DualCLIPLoader) | |
dualcliploader = DualCLIPLoader() | |
clip_model = dualcliploader.load_clip( | |
clip_name1="t5xxl_fp16.safetensors", | |
clip_name2="ViT-L-14-TEXT-detail-improved-hiT-GmP-HF.safetensors", | |
type="flux" | |
) | |
# Carregando CLIP Vision | |
clipvisionloader = CLIPVisionLoader() | |
clip_vision_model = clipvisionloader.load_clip( | |
clip_name="model.safetensors" | |
) | |
# Carregando Style Model | |
stylemodelloader = StyleModelLoader() | |
style_model = stylemodelloader.load_style_model( | |
style_model_name="flux1-redux-dev.safetensors" | |
) | |
# Carregando VAE | |
vaeloader = VAELoader() | |
vae_model = vaeloader.load_vae( | |
vae_name="ae.safetensors" | |
) | |
# (Opcional) Se tiver um model UNet, faça UNETLoader, etc. | |
# Opcional: Carregar para GPU | |
model_management.load_models_gpu([ | |
loader[0] for loader in [clip_model, clip_vision_model, style_model, vae_model] | |
]) | |
##################################### | |
# 6. Funções auxiliares e placeholders | |
##################################### | |
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: | |
"""Retorna o 'index' de um objeto que pode ser um dict ou lista.""" | |
try: | |
return obj[index] | |
except KeyError: | |
return obj["result"][index] | |
##################################### | |
# 7. Definir workflow simplificado | |
##################################### | |
# Se estiver no Hugging Face Spaces. Senão, remova. | |
def generate_image( | |
prompt: str, | |
input_image_path: str, | |
lora_weight: float, | |
guidance: float, | |
downsampling_factor: float, | |
weight: float, | |
seed: int, | |
width: int, | |
height: int, | |
batch_size: int, | |
steps: int, | |
progress=gr.Progress(track_tqdm=True) | |
): | |
""" | |
Gera imagem usando um fluxo simplificado, similar ao primeiro script. | |
""" | |
try: | |
# Garantindo repetibilidade do seed | |
torch.manual_seed(seed) | |
random.seed(seed) | |
# 1) Encode Texto | |
cliptextencode = CLIPTextEncode() | |
encoded_text = cliptextencode.encode( | |
text=prompt, | |
clip=get_value_at_index(clip_model, 0) | |
) | |
# 2) Carregar imagem de entrada | |
loadimage = LoadImage() | |
loaded_image = loadimage.load_image(image=input_image_path) | |
# 3) Flux Guidance (se existir) | |
fluxguidance = NODE_CLASS_MAPPINGS["FluxGuidance"]() | |
flux_guided = fluxguidance.append( | |
guidance=guidance, | |
conditioning=get_value_at_index(encoded_text, 0) | |
) | |
# 4) Redux Advanced (aplicar style model) | |
reduxadvanced = NODE_CLASS_MAPPINGS["ReduxAdvanced"]() | |
redux_result = reduxadvanced.apply_stylemodel( | |
downsampling_factor=downsampling_factor, | |
downsampling_function="area", | |
mode="keep aspect ratio", | |
weight=weight, | |
conditioning=get_value_at_index(flux_guided, 0), | |
style_model=get_value_at_index(style_model, 0), | |
clip_vision=get_value_at_index(clip_vision_model, 0), | |
image=get_value_at_index(loaded_image, 0) | |
) | |
# 5) Empty Latent | |
emptylatent = EmptyLatentImage() | |
empty_latent = emptylatent.generate( | |
width=width, | |
height=height, | |
batch_size=batch_size | |
) | |
# 6) KSampler (no ComfyUI atual, há "KSamplerSelect" ou "KSampler") | |
ksampler = NODE_CLASS_MAPPINGS["KSampler"]() | |
sampled = ksampler.sample( | |
seed=seed, | |
steps=steps, | |
cfg=1, # Exemplo de CFG = 1 | |
sampler_name="euler", | |
scheduler="simple", | |
denoise=1, | |
model=get_value_at_index(style_model, 0), # Usa o style model como UNet? (depende da config) | |
positive=get_value_at_index(redux_result, 0), | |
negative=get_value_at_index(flux_guided, 0), | |
latent_image=get_value_at_index(empty_latent, 0) | |
) | |
# 7) Decodificar VAE | |
vaedecode = VAEDecode() | |
decoded = vaedecode.decode( | |
samples=get_value_at_index(sampled, 0), | |
vae=get_value_at_index(vae_model, 0) | |
) | |
# 8) Salvar imagem | |
output_dir = "output" | |
os.makedirs(output_dir, exist_ok=True) | |
temp_filename = f"Flux_{random.randint(0, 99999)}.png" | |
temp_path = os.path.join(output_dir, temp_filename) | |
# No ComfyUI, 'decoded[0]' pode ser um tensor [C,H,W] normalizado | |
# ou algo no formato [N,C,H,W]. Precisamos converter para PIL: | |
# Se for um batch, pegue o primeiro item. Ajuste se quiser batch maior. | |
image_data = get_value_at_index(decoded, 0) | |
# Normalmente, se for "float [0,1]" em C,H,W: | |
# Precisamos mover pro CPU e converter em numpy | |
if isinstance(image_data, torch.Tensor): | |
image_data = image_data.cpu().numpy() | |
# Se a imagem estiver em [C,H,W], transpor para [H,W,C] e escalar 0..255 | |
if len(image_data.shape) == 3: | |
image_data = image_data.transpose(1, 2, 0) | |
image_data = (image_data * 255).clip(0, 255).astype("uint8") | |
pil_image = Image.fromarray(image_data) | |
pil_image.save(temp_path) | |
return temp_path | |
except Exception as e: | |
print(f"Erro ao gerar imagem: {str(e)}") | |
return None | |
##################################### | |
# 8. Interface Gradio (similar ao primeiro snippet) | |
##################################### | |
with gr.Blocks() as app: | |
gr.Markdown("# FLUX Redux Image Generator (Simplificado)") | |
with gr.Row(): | |
with gr.Column(): | |
prompt_input = gr.Textbox( | |
label="Prompt", | |
placeholder="Escreva seu prompt...", | |
lines=5 | |
) | |
input_image = gr.Image( | |
label="Imagem de Entrada", | |
type="filepath" | |
) | |
with gr.Row(): | |
with gr.Column(): | |
lora_weight = gr.Slider( | |
minimum=0, | |
maximum=2, | |
step=0.1, | |
value=0.6, | |
label="LoRA Weight (não usado nesse fluxo)" | |
) | |
guidance = gr.Slider( | |
minimum=0, | |
maximum=20, | |
step=0.1, | |
value=3.5, | |
label="Guidance" | |
) | |
downsampling_factor = gr.Slider( | |
minimum=1, | |
maximum=8, | |
step=1, | |
value=3, | |
label="Downsampling Factor" | |
) | |
weight = gr.Slider( | |
minimum=0, | |
maximum=2, | |
step=0.1, | |
value=1.0, | |
label="Redux Model Weight" | |
) | |
with gr.Column(): | |
seed = gr.Number( | |
value=random.randint(1, 2**64), | |
label="Seed", | |
precision=0 | |
) | |
width = gr.Number( | |
value=512, | |
label="Width", | |
precision=0 | |
) | |
height = gr.Number( | |
value=512, | |
label="Height", | |
precision=0 | |
) | |
batch_size = gr.Number( | |
value=1, | |
label="Batch Size", | |
precision=0 | |
) | |
steps = gr.Number( | |
value=20, | |
label="Steps", | |
precision=0 | |
) | |
generate_btn = gr.Button("Generate Image") | |
with gr.Column(): | |
output_image = gr.Image(label="Generated Image", type="filepath") | |
generate_btn.click( | |
fn=generate_image, | |
inputs=[ | |
prompt_input, | |
input_image, | |
lora_weight, | |
guidance, | |
downsampling_factor, | |
weight, | |
seed, | |
width, | |
height, | |
batch_size, | |
steps | |
], | |
outputs=[output_image] | |
) | |
if __name__ == "__main__": | |
# Você pode usar app.launch(share=True) se quiser compartilhar via link. | |
app.launch() | |