#img_gen_modal.py import modal import random import io from config.config import prompts, models_modal # Indirect import import os import gradio as gr #MOVED FROM IMAGE IMPORT LIST import torch import sentencepiece import torch from huggingface_hub import login from transformers import AutoTokenizer import random from datetime import datetime from diffusers.callbacks import SDXLCFGCutoffCallback from diffusers import DPMSolverMultistepScheduler, StableDiffusionPipeline, AutoencoderTiny, AutoencoderKL, DiffusionPipeline, FluxTransformer2DModel, GGUFQuantizationConfig from PIL import Image from src.check_dependecies import check_dependencies #import xformers from live_preview_helpers import flux_pipe_call_that_returns_an_iterable_of_images CACHE_DIR = "/model_cache" # Define the Modal image image = ( modal.Image.from_registry("nvidia/cuda:12.2.0-devel-ubuntu22.04", add_python="3.9") .pip_install_from_requirements("requirements.txt") #modal.Image.debian_slim(python_version="3.9") # Base image # .apt_install( # "git", # ) # .pip_install( # "diffusers", # f"git+https://github.com/huggingface/transformers.git" # ) .env( { "HF_HUB_ENABLE_HF_TRANSFER": "1", "HF_HOME": "HF_HOME", "HF_HUB_CACHE": CACHE_DIR } ) ) # Create a Modal app app = modal.App("img-gen-modal", image=image) with image.imports(): import os flux_model_vol = modal.Volume.from_name("flux-model-vol", create_if_missing=True) # Reference your volume # GPU FUNCTION @app.function(volumes={"/data": flux_model_vol}, secrets=[modal.Secret.from_name("huggingface-token")], gpu="L40S", timeout = 300 ) def generate_image_gpu(prompt_alias, team_color, model_alias, custom_prompt): image = generate_image(prompt_alias, team_color, model_alias, custom_prompt) return image, "Image generated successfully! Call the banners!" # CPU FUNCTION @app.function(volumes={"/data": flux_model_vol}, secrets=[modal.Secret.from_name("huggingface-token")], cpu = 1, timeout = 30000 ) def generate_image_cpu(prompt_alias, team_color, model_alias, custom_prompt): image = generate_image(prompt_alias, team_color, model_alias, custom_prompt) return image, "Image generated successfully! Call the banners!" # MAIN GENERATE IMAGE FUNCTION def generate_image( prompt_alias, team_color, model_alias, custom_prompt, height=360, width=640, num_inference_steps=20, guidance_scale=2.0, seed=-1, progress=gr.Progress(track_tqdm=True) # Add progress parameter ): with modal.enable_output(): print("Hello from ctb_modal!") check_dependencies() # Find the selected prompt and model try: prompt = next(p for p in prompts if p["alias"] == prompt_alias)["text"] model_name = next(m for m in models_modal if m["alias"] == model_alias)["name"] except StopIteration: return None, "ERROR: Invalid prompt or model selected." # Determine the enemy color enemy_color = "blue" if team_color.lower() == "red" else "red" # Print the original prompt and dynamic values for debugging print("Original Prompt:") print(prompt) print(f"Enemy Color: {enemy_color}") print(f"Team Color: {team_color.lower()}") prompt = prompt.format(team_color=team_color.lower(), enemy_color=enemy_color) # Print the formatted prompt for debugging print("\nFormatted Prompt:") print(prompt) # Append the custom prompt (if provided) if custom_prompt and len(custom_prompt.strip()) > 0: prompt += " " + custom_prompt.strip() # Randomize the seed if needed if seed == -1: seed = random.randint(0, 1000000) try: from diffusers import FluxPipeline print("Initializing HF TOKEN") hf_token = os.environ["HF_TOKEN"] print(hf_token) print("HF TOKEN:") login(token=hf_token) print("model_name:") print(model_name) # Use absolute path with leading slash local_path = f"/data/{model_name}" # Changed from "data/" to "/data/" print(f"Loading model from local path: {local_path}") # Debug: Check if the directory exists and list its contents if os.path.exists(local_path): print("Directory exists. Contents:") for item in os.listdir(local_path): print(f" - {item}") else: print(f"Directory does not exist: {local_path}") print("Contents of /data:") print(os.listdir("/data")) # CHECK FOR TORCH USING CUDA print("CHECK FOR TORCH USING CUDA") print(f"CUDA available: {torch.cuda.is_available()}") if torch.cuda.is_available(): print("inside if") print(f"CUDA device count: {torch.cuda.device_count()}") print(f"Current device: {torch.cuda.current_device()}") print(f"Device name: {torch.cuda.get_device_name(torch.cuda.current_device())}") ########## INITIALIZING CPU PIPE ########## # ########## LIVE PREVIEW FROM REPO DEMO PART 1 ########## # dtype = torch.bfloat16 # taef1 = AutoencoderTiny.from_pretrained("/data/taef1", torch_dtype=dtype).to(device) # good_vae = AutoencoderKL.from_pretrained(local_path, subfolder="vae", torch_dtype=dtype).to(device) # pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe) # ##################################################### print("-----LOADING QUANTA-----") ckpt_path = ( "/data/FLUX.1-dev-gguf/flux1-dev-Q8_0.gguf" ) transformer = FluxTransformer2DModel.from_single_file( ckpt_path, quantization_config=GGUFQuantizationConfig(compute_dtype=torch.bfloat16), torch_dtype=torch.bfloat16, )pip print("-----INITIALIZING PIPE-----") pipe = FluxPipeline.from_pretrained( local_path, torch_dtype=torch.bfloat16, transformer=transformer, #torch_dtype=torch.float16, #torch_dtype=torch.float32, #vae=taef1, local_files_only=True, ) #torch.cuda.empty_cache() if torch.cuda.is_available(): print("CUDA available") print("using gpu") pipe = pipe.to("cuda") pipe_message = "CUDA" pipe.enable_model_cpu_offload() # Use official recommended method else: print("CUDA not available") print("using cpu") pipe = pipe.to("cpu") pipe_message = "CPU" print(f"-----{pipe_message} PIPE INITIALIZED-----") print(f"Using device: {pipe.device}") except Exception as e: print(f"Detailed error: {str(e)}") return None, f"ERROR: Failed to initialize PIPE2. Details: {e}" try: print("-----SENDING IMG GEN TO PIPE-----") print("-----HOLD ON-----") # ################ LIVE PREVIEW FROM DEMO REPO PART2 #################### # seed = random.randint(0, MAX_SEED) # generator = torch.Generator().manual_seed(seed) # for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images( # prompt=prompt, # guidance_scale=guidance_scale, # num_inference_steps=num_inference_steps, # width=width, # height=height, # generator=generator, # output_type="pil", # good_vae=good_vae, # ): # yield img, seed # ############################################################ # ########## LATENTS ########## # # live preview function to get the latents # # official reference guideline # def latents_to_rgb(latents): # weights = ( # (60, -60, 25, -70), # (60, -5, 15, -50), # (60, 10, -5, -35), # ) # weights_tensor = torch.t(torch.tensor(weights, dtype=latents.dtype).to(latents.device)) # biases_tensor = torch.tensor((150, 140, 130), dtype=latents.dtype).to(latents.device) # rgb_tensor = torch.einsum("...lxy,lr -> ...rxy", latents, weights_tensor) + biases_tensor.unsqueeze(-1).unsqueeze(-1) # image_array = rgb_tensor.clamp(0, 255).byte().cpu().numpy().transpose(1, 2, 0) # return Image.fromarray(image_array) # def decode_tensors(pipe, step, timestep, callback_kwargs): # latents = callback_kwargs["latents"] # image = latents_to_rgb(latents[0]) # image.save(f"{step}.png") # return callback_kwargs # ############################################################ ########## SENDING IMG GEN TO PIPE - WORKING CODE ########## image = pipe( prompt, guidance_scale=guidance_scale, num_inference_steps=num_inference_steps, width=width, height=height, max_sequence_length=512, #callback_on_step_end=decode_tensors, #callback_on_step_end_tensor_inputs=["latents"], # seed=seed ).images[0] ############################################################# print("-----IMAGE GENERATED SUCCESSFULLY!-----") print(image) except Exception as e: return f"ERROR: Failed to initialize InferenceClient. Details: {e}" try: print("-----SAVING-----") # Save the image with a timestamped filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_filename = f"/data/images/{timestamp}_{model_alias.replace(' ', '_').lower()}_{prompt_alias.replace(' ', '_').lower()}_{team_color.lower()}.png" # Save the image using PIL's save method image.save(output_filename) print("-----DONE!-----") print("-----CALL THE BANNERS!-----") print(f"File path: {output_filename}") except Exception as e: print(f"ERROR: Failed to save image. Details: {e}") # Return the filename and success message return image