import os
# os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
# os.environ['CUDA_VISIBLE_DEVICES'] = '2'
# os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "caching_allocator"
import gradio as gr
import numpy as np
from models import make_inpainting
import utils
from transformers import MaskFormerImageProcessor, MaskFormerForInstanceSegmentation
from PIL import Image
import requests
from transformers import pipeline
import torch
import random
import io
import base64
import json
from diffusers import DiffusionPipeline
from diffusers import StableDiffusionLatentUpscalePipeline, StableDiffusionPipeline
from diffusers import StableDiffusionUpscalePipeline
from diffusers import LDMSuperResolutionPipeline
import cv2
import onnxruntime
# import xformers
# from xformers.ops import MemoryEfficientAttentionFlashAttentionOp
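
# Gradio app exposing three tools: furniture removal via Stable Diffusion
# inpainting (make_inpainting from the local models module), semantic
# segmentation with MaskFormer (facebook/maskformer-swin-large-ade), and 4x
# upscaling (tiled Stable Diffusion x4 upscaler, with LDM and ONNX variants
# kept below as alternatives).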

def removeFurniture(input_img1,
                    input_img2,
                    positive_prompt,
                    negative_prompt,
                    num_of_images,
                    resolution
                    ):
    print("removeFurniture")
    # gr.Number inputs arrive as floats; cast before using them as pixel sizes / counts
    resolution = int(resolution)
    num_of_images = int(num_of_images)
    HEIGHT = resolution
    WIDTH = resolution
    input_img1 = input_img1.resize((resolution, resolution))
    input_img2 = input_img2.resize((resolution, resolution))
    canvas_mask = np.array(input_img2)
    mask = utils.get_mask(canvas_mask)
    print(input_img1, mask, positive_prompt, negative_prompt)
    retList = make_inpainting(positive_prompt=positive_prompt,
                              image=input_img1,
                              mask_image=mask,
                              negative_prompt=negative_prompt,
                              num_of_images=num_of_images,
                              resolution=resolution)
    # pad the result list up to 10 entries so it matches the 10 image outputs
    while len(retList) < 10:
        retList.append(None)
    return retList

def imageToString(img):
    output = io.BytesIO()
    img.save(output, format="png")
    return output.getvalue()

def segmentation(img):
    print("segmentation")
    # semantic_segmentation = pipeline("image-segmentation", "nvidia/segformer-b1-finetuned-cityscapes-1024-1024")
    pipe = pipeline("image-segmentation", "facebook/maskformer-swin-large-ade")
    results = pipe(img)
    for p in results:
        p['mask'] = utils.image_to_byte_array(p['mask'])
        p['mask'] = base64.b64encode(p['mask']).decode("utf-8")
    # print(results)
    return json.dumps(results)

def upscale1(image, prompt):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("upscale1", device, image, prompt)
    # image.thumbnail((512, 512))
    # print("resize", image)
    torch.backends.cuda.matmul.allow_tf32 = True
    pipe = StableDiffusionUpscalePipeline.from_pretrained("stabilityai/stable-diffusion-x4-upscaler",
                                                          torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                                                          use_safetensors=True)
    # pipe = StableDiffusionLatentUpscalePipeline.from_pretrained("stabilityai/sd-x2-latent-upscaler", torch_dtype=torch.float16)
    pipe = pipe.to(device)
    pipe.enable_attention_slicing()
    pipe.enable_xformers_memory_efficient_attention()
    # pipe.enable_xformers_memory_efficient_attention(attention_op=xformers.ops.MemoryEfficientAttentionFlashAttentionOp)
    # Workaround for not accepting attention shape using VAE for Flash Attention
    pipe.vae.enable_xformers_memory_efficient_attention()
    ret = pipe(prompt=prompt,
               image=image,
               num_inference_steps=10,
               guidance_scale=0)
    print("ret", ret)
    upscaled_image = ret.images[0]
    print("up", upscaled_image)
    return upscaled_image

def upscale2(image, prompt):
    print("upscale2", image, prompt)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print("device", device)
    pipe = LDMSuperResolutionPipeline.from_pretrained("CompVis/ldm-super-resolution-4x-openimages",
                                                      torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)
    pipe = pipe.to(device)
    pipe.enable_attention_slicing()
    # the xformers import at the top of the file is commented out, so the Flash
    # Attention op cannot be passed explicitly; fall back to the default op
    pipe.enable_xformers_memory_efficient_attention()
    # Workaround for not accepting attention shape using VAE for Flash Attention
    # (this pipeline exposes its autoencoder as `vqvae`, not `vae`)
    pipe.vqvae.enable_xformers_memory_efficient_attention(attention_op=None)
    upscaled_image = pipe(image, num_inference_steps=10, eta=1).images[0]
    return upscaled_image

def convert_pil_to_cv2(image):
    # pil_image = image.convert("RGB")
    open_cv_image = np.array(image)
    # RGB to BGR
    open_cv_image = open_cv_image[:, :, ::-1].copy()
    return open_cv_image

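
# ONNX super-resolution helpers used by upscale3(): pre_process() turns an
# H,W,C BGR uint8 image into the 1,C,H,W float32 tensor the .ort model expects,
# inference() runs it through onnxruntime, and post_process() converts the
# output back to an H,W,C uint8 image. Judging by its name, modelx4.ort is
# assumed to be a 4x upscaling model.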
def inference(model_path: str, img_array: np.array) -> np.array:
    options = onnxruntime.SessionOptions()
    options.intra_op_num_threads = 1
    options.inter_op_num_threads = 1
    ort_session = onnxruntime.InferenceSession(model_path, options)
    ort_inputs = {ort_session.get_inputs()[0].name: img_array}
    ort_outs = ort_session.run(None, ort_inputs)
    return ort_outs[0]

def post_process(img: np.array) -> np.array:
    # 1, C, H, W -> C, H, W
    img = np.squeeze(img)
    # C, H, W -> H, W, C
    img = np.transpose(img, (1, 2, 0))[:, :, ::-1].astype(np.uint8)
    return img


def pre_process(img: np.array) -> np.array:
    # H, W, C -> C, H, W
    img = np.transpose(img[:, :, 0:3], (2, 0, 1))
    # C, H, W -> 1, C, H, W
    img = np.expand_dims(img, axis=0).astype(np.float32)
    return img

def upscale3(image):
    print("upscale3", image)
    model_path = "up_models/modelx4.ort"
    img = convert_pil_to_cv2(image)
    # if img.ndim == 2:
    #     print("upscale3", "img.ndim == 2")
    #     img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    # if img.shape[2] == 4:
    #     print("upscale3", "img.shape[2] == 4")
    #     alpha = img[:, :, 3]  # GRAY
    #     alpha = cv2.cvtColor(alpha, cv2.COLOR_GRAY2BGR)  # BGR
    #     alpha_output = post_process(inference(model_path, pre_process(alpha)))  # BGR
    #     alpha_output = cv2.cvtColor(alpha_output, cv2.COLOR_BGR2GRAY)  # GRAY
    #     img = img[:, :, 0:3]  # BGR
    #     image_output = post_process(inference(model_path, pre_process(img)))  # BGR
    #     image_output = cv2.cvtColor(image_output, cv2.COLOR_BGR2BGRA)  # BGRA
    #     image_output[:, :, 3] = alpha_output
    # print("upscale3", "img.shape[2] == 3")
    image_output = post_process(inference(model_path, pre_process(img)))  # BGR
    return image_output

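
# Minimal stand-in for the background-color helper (an assumption, not the
# original implementation): split_image() below needs a padding color, and the
# original call referenced split.determine_bg_color(), but no `split` module is
# imported in this file. This version simply picks the most common corner pixel.
def determine_bg_color(im):
    rgb = im.convert("RGB")
    w, h = rgb.size
    corners = [rgb.getpixel((0, 0)), rgb.getpixel((w - 1, 0)),
               rgb.getpixel((0, h - 1)), rgb.getpixel((w - 1, h - 1))]
    # most frequent corner color wins; ties are resolved arbitrarily
    return max(set(corners), key=corners.count)
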
def split_image(im, rows, cols, should_square, should_quiet=False):
    im_width, im_height = im.size
    row_width = int(im_width / cols)
    row_height = int(im_height / rows)
    name = "image"
    ext = ".png"
    name = os.path.basename(name)
    images = []
    if should_square:
        min_dimension = min(im_width, im_height)
        max_dimension = max(im_width, im_height)
        if not should_quiet:
            print("Resizing image to a square...")
            print("Determining background color...")
        bg_color = determine_bg_color(im)  # see stand-in helper above
        if not should_quiet:
            print("Background color is... " + str(bg_color))
        im_r = Image.new("RGBA" if ext == "png" else "RGB",
                         (max_dimension, max_dimension), bg_color)
        offset = int((max_dimension - min_dimension) / 2)
        if im_width > im_height:
            im_r.paste(im, (0, offset))
        else:
            im_r.paste(im, (offset, 0))
        im = im_r
        row_width = int(max_dimension / cols)
        row_height = int(max_dimension / rows)
    n = 0
    for i in range(0, rows):
        for j in range(0, cols):
            box = (j * row_width, i * row_height,
                   j * row_width + row_width, i * row_height + row_height)
            outp = im.crop(box)
            outp_path = name + "_" + str(n) + ext
            if not should_quiet:
                print("Exporting image tile: " + outp_path)
            images.append(outp)
            n += 1
    return images

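
# Tile-based upscaling: pad the input to a square, split it into rows x cols
# tiles, upscale each tile with the Stable Diffusion x4 upscaler, re-assemble
# the tiles, then crop back to the original aspect ratio.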
def upscale_image(img, rows, cols, seed, prompt, negative_prompt, xformers, cpu_offload, attention_slicing,
                  enable_custom_sliders=False, guidance=7, iterations=50):
    model_id = "stabilityai/stable-diffusion-x4-upscaler"
    try:
        pipeline = StableDiffusionUpscalePipeline.from_pretrained(model_id, torch_dtype=torch.float16)
    except Exception:
        pipeline = StableDiffusionUpscalePipeline.from_pretrained(model_id, torch_dtype=torch.float16, local_files_only=True)
    pipeline = pipeline.to("cuda")
    if xformers:
        pipeline.enable_xformers_memory_efficient_attention()
    else:
        pipeline.disable_xformers_memory_efficient_attention()
    if cpu_offload:
        try:
            pipeline.enable_sequential_cpu_offload()
        except Exception:
            pass
    if attention_slicing:
        pipeline.enable_attention_slicing()
    else:
        pipeline.disable_attention_slicing()
    # the Gradio input is already a PIL image; only convert raw arrays
    if not isinstance(img, Image.Image):
        img = Image.fromarray(img)
    # load model and scheduler
    if seed == -1:
        generator = torch.manual_seed(random.randint(0, 9999999))
    else:
        generator = torch.manual_seed(seed)
    original_width, original_height = img.size
    max_dimension = max(original_width, original_height)
    tiles = split_image(img, rows, cols, True, False)
    ups_tiles = []
    i = 0
    for x in tiles:
        i = i + 1
        if enable_custom_sliders:
            ups_tile = pipeline(prompt=prompt, negative_prompt=negative_prompt, guidance_scale=guidance,
                                num_inference_steps=iterations, image=x.convert("RGB"), generator=generator).images[0]
        else:
            ups_tile = pipeline(prompt=prompt, negative_prompt=negative_prompt,
                                image=x.convert("RGB"), generator=generator).images[0]
        ups_tiles.append(ups_tile)
    # Determine the size of the merged upscaled image
    side = 0
    for ups_tile in ups_tiles:
        side = ups_tile.width
        break
    tsize = 0
    for x in tiles:
        tsize = x.width
        break
    ups_times = abs(side / tsize)
    new_size = (max_dimension * ups_times, max_dimension * ups_times)
    total_width = cols * side
    total_height = rows * side
    # Create a blank image with the calculated size
    merged_image = Image.new("RGB", (total_width, total_height))
    # Paste each upscaled tile into the blank image
    current_width = 0
    current_height = 0
    maximum_width = cols * side
    for ups_tile in ups_tiles:
        merged_image.paste(ups_tile, (current_width, current_height))
        current_width += ups_tile.width
        if current_width >= maximum_width:
            current_width = 0
            current_height = current_height + side
    # Using the center of the image as pivot, crop back to the original aspect
    # ratio scaled by the upscaling factor, so no content is lost.
    crop_left = (new_size[0] - original_width * ups_times) // 2
    crop_upper = (new_size[1] - original_height * ups_times) // 2
    crop_right = crop_left + original_width * ups_times
    crop_lower = crop_upper + original_height * ups_times
    final_img = merged_image.crop((crop_left, crop_upper, crop_right, crop_lower))
    return final_img

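
# Entry point for the Upscale button. The mode argument is currently ignored;
# the tiled Stable Diffusion x4 path is always used.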
def upscale(mode, image, prompt):
    print("upscale", mode, image, prompt)
    # return upscale1(image, prompt)
    return upscale_image(image, rows=3, cols=3, seed=-1, prompt=prompt,
                         negative_prompt="jpeg artifacts, lowres, bad quality, watermark",
                         xformers=True, cpu_offload=True, attention_slicing=True, iterations=10)

modes = {
    '1': '1',
    'img2img': 'Image to Image',
    'inpaint': 'Inpainting',
    'upscale4x': 'Upscale 4x',
}

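# UI: three side-by-side columns wired to removeFurniture, segmentation and upscale.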
with gr.Blocks() as app:
    gr.HTML(
        f"""
        <div>
        Running on <b>{"GPU 🔥" if torch.cuda.is_available() else "CPU 🥶"}</b>
        </div>
        """
    )
    with gr.Row():
        with gr.Column():
            gr.Button("FurnituRemove").click(removeFurniture,
                                             inputs=[gr.Image(label="img", type="pil"),
                                                     gr.Image(label="mask", type="pil"),
                                                     gr.Textbox(label="positive_prompt", value="empty room"),
                                                     gr.Textbox(label="negative_prompt", value=""),
                                                     gr.Number(label="num_of_images", value=2),
                                                     gr.Number(label="resolution", value=512)
                                                     ],
                                             outputs=[gr.Image() for _ in range(10)])
        with gr.Column():
            gr.Button("Segmentation").click(segmentation, inputs=gr.Image(type="pil"), outputs=gr.JSON())
        with gr.Column():
            gr.Button("Upscale").click(
                upscale,
                inputs=[
                    # note: the modes dict has no 'txt2img' key; default to the upscale mode
                    gr.Radio(label="Mode", choices=list(modes.values())[:4], value=modes['upscale4x']),
                    gr.Image(type="pil"),
                    gr.Textbox(label="prompt", value="empty room")
                ],
                outputs=gr.Image())
    # with gr.Row():
    #     with gr.Column(scale=55):
    #         with gr.Group():
    #             with gr.Row():
    #                 prompt = gr.Textbox(label="Prompt", show_label=False, max_lines=2, placeholder=f"Enter prompt")
    #                 generate = gr.Button(value="Generate")
    #             gallery = gr.Gallery(label="Generated images", show_label=False)
    #         state_info = gr.Textbox(label="State", show_label=False, max_lines=2)
    #         error_output = gr.Markdown(visible=False)
    #     with gr.Column(scale=45):
    #         inf_mode = gr.Radio(label="Inference Mode", choices=list(modes.values())[:4], value=modes['txt2img'])  # TODO remove [:3] limit
    #         with gr.Group(visible=False) as i2i_options:
    #             image = gr.Image(label="Image", height=128, type="pil")
    #             inpaint_info = gr.Markdown("Inpainting resizes and pads images to 512x512", visible=False)
    #             upscale_info = gr.Markdown("""Best for small images (128x128 or smaller).<br>
    #                 Bigger images will be sliced into 128x128 tiles which will be upscaled individually.<br>
    #                 This is done to avoid running out of GPU memory.""", visible=False)
    #             strength = gr.Slider(label="Transformation strength", minimum=0, maximum=1, step=0.01, value=0.5)
    #         with gr.Group():
    #             neg_prompt = gr.Textbox(label="Negative prompt", placeholder="What to exclude from the image")
    #             n_images = gr.Slider(label="Number of images", value=1, minimum=1, maximum=4, step=1)
    #             with gr.Row():
    #                 guidance = gr.Slider(label="Guidance scale", value=7.5, maximum=15)
    #                 steps = gr.Slider(label="Steps", value=current_steps, minimum=2, maximum=100, step=1)
    #             with gr.Row():
    #                 width = gr.Slider(label="Width", value=768, minimum=64, maximum=1024, step=8)
    #                 height = gr.Slider(label="Height", value=768, minimum=64, maximum=1024, step=8)
    #             seed = gr.Slider(0, 2147483647, label='Seed (0 = random)', value=0, step=1)
    #         with gr.Accordion("Memory optimization"):
    #             attn_slicing = gr.Checkbox(label="Attention slicing (a bit slower, but uses less memory)", value=attn_slicing_enabled)
    #             # mem_eff_attn = gr.Checkbox(label="Memory efficient attention (xformers)", value=mem_eff_attn_enabled)
    #     inf_mode.change(on_mode_change, inputs=[inf_mode], outputs=[i2i_options, inpaint_info, upscale_info, strength], queue=False)
    #     steps.change(on_steps_change, inputs=[steps], outputs=[], queue=False)
    #     attn_slicing.change(lambda x: switch_attention_slicing(x), inputs=[attn_slicing], queue=False)
    #     # mem_eff_attn.change(lambda x: switch_mem_eff_attn(x), inputs=[mem_eff_attn], queue=False)
    #     inputs = [inf_mode, prompt, n_images, guidance, steps, width, height, seed, image, strength, neg_prompt]
    #     outputs = [gallery, error_output]
    #     prompt.submit(inference, inputs=inputs, outputs=outputs)
    #     generate.click(inference, inputs=inputs, outputs=outputs)
    # app.load(update_state_info, inputs=state_info, outputs=state_info, every=0.5, show_progress=False)

app.queue()
app.launch(debug=True, share=True, height=768)

# UP 1