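# Hugging Face Space: prompt expansion (Phi-3.5) -> Stable Diffusion 3.5 Medium -> SDXL refiner,
# with the resulting images uploaded to a remote server over SFTP.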
import spaces
import gradio as gr
import numpy as np
#import tensorrt as trt
import random
import torch
from diffusers import StableDiffusion3Pipeline, AutoencoderKL, StableDiffusionXLImg2ImgPipeline, EDMEulerScheduler, DPMSolverMultistepScheduler, EulerAncestralDiscreteScheduler
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread
from transformers import pipeline
from transformers import T5Tokenizer, T5ForConditionalGeneration
import re
import paramiko
import urllib
import time
import os
# Remote upload credentials. NOTE: these are hardcoded; moving them to environment
# variables / Space secrets (as done for HF_AUTH_TOKEN below) would be safer.
FTP_HOST = "1ink.us"
FTP_USER = "ford442"
FTP_PASS = "GoogleBez12!"
FTP_DIR = "1ink.us/stable_diff/"  # Remote directory on the server
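# Disable TF32 and reduced-precision matmul/cuDNN paths so the diffusion pipelines and the
# prompt-rewrite model run at the precision requested below; cuDNN benchmarking and
# determinism are both left off.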
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cuda.matmul.allow_bf16_reduced_precision_reduction = False
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = False
torch.backends.cudnn.allow_tf32 = False
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = False
# These are functions, not attributes; assigning a string to them would have no effect.
torch.backends.cuda.preferred_blas_library(backend="cublas")
torch.backends.cuda.preferred_linalg_library(backend="cusolver")
torch.set_float32_matmul_precision("highest")
hftoken = os.getenv("HF_AUTH_TOKEN")
def upload_to_ftp(filename):
    """Uploads a local file to the remote server (SFTP over port 22, despite the FTP_* naming)."""
    try:
        transport = paramiko.Transport((FTP_HOST, 22))
        destination_path = FTP_DIR + filename
        transport.connect(username=FTP_USER, password=FTP_PASS)
        sftp = paramiko.SFTPClient.from_transport(transport)
        sftp.put(filename, destination_path)
        sftp.close()
        transport.close()
        print(f"Uploaded {filename} to FTP server")
    except Exception as e:
        print(f"FTP upload error: {e}")
device = torch.device("cuda")
torch_dtype = torch.bfloat16
checkpoint = "microsoft/Phi-3.5-mini-instruct"
#vae = AutoencoderKL.from_pretrained("madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16)
vae = AutoencoderKL.from_pretrained("ford442/sdxl-vae-bf16", torch_dtype=torch.bfloat16, device_map='balanced')
pipe = StableDiffusion3Pipeline.from_pretrained("ford442/stable-diffusion-3.5-medium-bf16", torch_dtype=torch.bfloat16, device_map='balanced')
#pipe = StableDiffusion3Pipeline.from_pretrained("stabilityai/stable-diffusion-3.5-medium", token=hftoken, torch_dtype=torch.float32, device_map='balanced')
# pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config, use_karras_sigmas=True, algorithm_type="sde-dpmsolver++")
#pipe.scheduler.config.requires_aesthetics_score = False
#pipe.enable_model_cpu_offload()
#pipe.to(device)
#pipe = torch.compile(pipe)
# pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(pipe.scheduler.config, beta_schedule="scaled_linear")
refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained("ford442/stable-diffusion-xl-refiner-1.0-bf16", vae=vae, torch_dtype=torch.bfloat16, use_safetensors=True, requires_aesthetics_score=True, device_map='balanced')
#refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained("stabilityai/stable-diffusion-xl-refiner-1.0", vae=vae, torch_dtype=torch.float32, requires_aesthetics_score=True, device_map='balanced')
#refiner.enable_model_cpu_offload()
#refiner.scheduler.config.requires_aesthetics_score=False
#refiner.to(device)
#refiner = torch.compile(refiner)
refiner.scheduler = EulerAncestralDiscreteScheduler.from_config(refiner.scheduler.config, beta_schedule="scaled_linear")
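# The SD 3.5 Medium base output is handed to this SDXL img2img refiner inside infer() below.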
# Tokenizer and model for the Phi-3.5 prompt-rewrite step. Tokenizers live on the CPU and do
# not take a device_map; the legacy behaviour is disabled at load time rather than afterwards.
tokenizer = AutoTokenizer.from_pretrained(checkpoint, add_prefix_space=False, legacy=False)
model = AutoModelForCausalLM.from_pretrained(checkpoint, device_map='balanced')
#model = torch.compile(model)
def filter_text(text):
    """Filters out the text up to and including 'Rewritten Prompt:'."""
    pattern = r".*?Rewritten Prompt:\s*"  # Matches any characters up to 'Rewritten Prompt:'
    filtered_text = re.sub(pattern, "", text, flags=re.DOTALL)  # Removes the matched pattern from the text
    return filtered_text
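# e.g. filter_text("... Rewritten Prompt: a misty pine forest at dawn, volumetric light")
# returns "a misty pine forest at dawn, volumetric light" (the prompt text is illustrative).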
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 4096
@spaces.GPU  # assumption: `import spaces` implies a ZeroGPU Space; the decorator is a no-op elsewhere
def infer(
    prompt,
    negative_prompt,
    seed,
    randomize_seed,
    width,
    height,
    guidance_scale,
    num_inference_steps,
    progress=gr.Progress(track_tqdm=True),
):
    # Only draw a fresh seed when the user asked for one; otherwise honour the slider value.
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)
    generator = torch.Generator(device='cuda').manual_seed(seed)
    system_prompt_rewrite = (
        "You are an AI assistant that rewrites image prompts to be more descriptive and detailed."
    )
    user_prompt_rewrite = (
        "Rewrite this prompt to be more descriptive and detailed: "
    )
    input_text = f"{system_prompt_rewrite} {user_prompt_rewrite} {prompt}"
    print("-- got prompt --")
    # Encode the input text and include the attention mask
    encoded_inputs = tokenizer(
        input_text, return_tensors="pt", return_attention_mask=True
    )
    # Ensure all values are on the correct device
    input_ids = encoded_inputs["input_ids"].to(device)
    attention_mask = encoded_inputs["attention_mask"].to(device)
    print("-- tokenize prompt --")
    outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=65,
        temperature=0.2,
        top_p=0.9,
        do_sample=True,
    )
    # Decode the rewritten prompt, then strip everything up to and including "Rewritten Prompt:"
    enhanced_prompt = tokenizer.decode(outputs[0], skip_special_tokens=True)
    print('-- generated prompt --')
    print(enhanced_prompt)
    enhanced_prompt = filter_text(enhanced_prompt)
    print('-- filtered prompt --')
    print(enhanced_prompt)
    print('-- generating image --')
    sd_image = pipe(
        prompt=enhanced_prompt,
        negative_prompt=negative_prompt,
        guidance_scale=guidance_scale,
        num_inference_steps=num_inference_steps,
        width=width,
        height=height,
        generator=generator
    ).images[0]
    print('-- got image --')
    image_path = f"sd35m_{seed}.png"
    sd_image.save(image_path, optimize=False, compress_level=0)
    upload_to_ftp(image_path)
    # Run the SDXL refiner on the base image, using the original (unexpanded) prompt.
    refine = refiner(
        prompt=f"{prompt}, high quality masterpiece, complex details",
        negative_prompt=negative_prompt,
        guidance_scale=7.5,
        num_inference_steps=num_inference_steps,
        image=sd_image,
        generator=generator,
    ).images[0]
    refine_path = f"refine_{seed}.png"
    refine.save(refine_path, optimize=False, compress_level=0)
    upload_to_ftp(refine_path)
    return refine, seed, refine_path, enhanced_prompt
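# The return order (refined image, seed, saved path, expanded prompt) must match the Gradio
# outputs list wired up below: [result, seed, image_path_output, expanded_prompt_output].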
examples = [
    "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k",
    "An astronaut riding a green horse",
    "A delicious ceviche cheesecake slice",
]
css = """
#col-container {
    margin: 0 auto;
    max-width: 640px;
}
"""
def repeat_infer(
    prompt,
    negative_prompt,
    seed,
    randomize_seed,
    width,
    height,
    guidance_scale,
    num_inference_steps,
    num_iterations,  # Number of times to repeat the generation
):
    i = 0
    while i < num_iterations:
        time.sleep(700)  # Wait 700 seconds (roughly 11.7 minutes) between iterations
        result, seed, image_path, enhanced_prompt = infer(
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            width,
            height,
            guidance_scale,
            num_inference_steps,
        )
        # Optionally, add logic here to process each iteration's result
        # (e.g. display the image or save it under a different name).
        i += 1
    return result, seed, image_path, enhanced_prompt
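# Note: repeat_infer is defined but not wired to any UI event below; to use it, attach it to a
# button's .click() and include num_iterations in the inputs list.

# Gradio UI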
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        gr.Markdown("# Text-to-Text-to-Image Stable Diffusion 3.5 Medium (with refine)")
        expanded_prompt_output = gr.Textbox(label="Expanded Prompt", lines=5)
        gr.File(label="Latents File (optional)")  # File input for latents (not currently wired up)
        with gr.Row():
            prompt = gr.Text(
                label="Prompt",
                show_label=False,
                max_lines=1,
                placeholder="Enter your prompt",
                value="A captivating Christmas scene.",
                container=False,
            )
            run_button = gr.Button("Run", scale=0, variant="primary")
        result = gr.Image(label="Result", show_label=False)
        with gr.Accordion("Advanced Settings", open=False):
            negative_prompt = gr.Text(
                label="Negative prompt",
                max_lines=1,
                placeholder="Enter a negative prompt",
                visible=False,
            )
            num_iterations = gr.Number(
                value=1000,
                label="Number of Iterations")
            seed = gr.Slider(
                label="Seed",
                minimum=0,
                maximum=MAX_SEED,
                step=1,
                value=0,
            )
            randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
            with gr.Row():
                width = gr.Slider(
                    label="Width",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=768,  # Replace with defaults that work for your model
                )
                height = gr.Slider(
                    label="Height",
                    minimum=256,
                    maximum=MAX_IMAGE_SIZE,
                    step=32,
                    value=768,  # Replace with defaults that work for your model
                )
            guidance_scale = gr.Slider(
                label="Guidance scale",
                minimum=0.0,
                maximum=10.0,
                step=0.1,
                value=5.0,  # Replace with defaults that work for your model
            )
            num_inference_steps = gr.Slider(
                label="Number of inference steps",
                minimum=1,
                maximum=500,
                step=1,
                value=75,  # Replace with defaults that work for your model
            )
        save_button = gr.Button("Save Image")
        image_path_output = gr.Text(visible=False)  # Hidden component that holds the saved file path
        save_button.click(
            fn=lambda image_path: None,  # No-op: the image is already saved and uploaded in infer()
            inputs=[image_path_output],
            outputs=None,
        )
        gr.Examples(examples=examples, inputs=[prompt])
    gr.on(
        triggers=[run_button.click, prompt.submit],
        fn=infer,
        inputs=[
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            width,
            height,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[result, seed, image_path_output, expanded_prompt_output],
    )
if __name__ == "__main__":
    demo.launch()