File size: 2,927 Bytes
c7628b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from typing import Dict, List, Any
import torch
from base64 import b64decode
from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor


class EndpointHandler:
    def __init__(self, path=""):
        self.device = "cuda"
        self.dtype = torch.bfloat16
        self.vae = (
            AutoencoderKL.from_pretrained(path, torch_dtype=self.dtype)
            .to(self.device, self.dtype)
            .eval()
        )

        self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)

    @staticmethod
    def _unpack_latents(latents, height, width, vae_scale_factor):
        batch_size, num_patches, channels = latents.shape

        # VAE applies 8x compression on images but we must also account for packing which requires
        # latent height and width to be divisible by 2.
        height = 2 * (int(height) // (vae_scale_factor * 2))
        width = 2 * (int(width) // (vae_scale_factor * 2))

        latents = latents.view(batch_size, height // 2, width // 2, channels // 4, 2, 2)
        latents = latents.permute(0, 3, 1, 4, 2, 5)

        latents = latents.reshape(batch_size, channels // (2 * 2), height, width)

        return latents

    @torch.no_grad()
    def __call__(self, data: Any) -> List[List[Dict[str, float]]]:
        """
        Args:
            data (:obj:):
                includes the input data and the parameters for the inference.
        """
        tensor = data["inputs"]
        tensor = b64decode(tensor.encode("utf-8"))
        parameters = data.get("parameters", {})
        if "shape" not in parameters:
            raise ValueError("Expected `shape` in parameters.")
        if "dtype" not in parameters:
            raise ValueError("Expected `dtype` in parameters.")
        if "height" not in parameters:
            raise ValueError("Expected `height` in parameters.")
        if "width" not in parameters:
            raise ValueError("Expected `width` in parameters.")

        DTYPE_MAP = {
            "float16": torch.float16,
            "float32": torch.float32,
            "bfloat16": torch.bfloat16,
        }

        shape = parameters.get("shape")
        dtype = DTYPE_MAP.get(parameters.get("dtype"))
        height = parameters.get("height")
        width = parameters.get("width")

        tensor = torch.frombuffer(bytearray(tensor), dtype=dtype).reshape(shape)

        tensor = tensor.to(self.device, self.dtype)

        tensor = self._unpack_latents(tensor, height, width, self.vae_scale_factor)
        tensor = (
            tensor / self.vae.config.scaling_factor
        ) + self.vae.config.shift_factor

        with torch.no_grad():
            image = self.vae.decode(tensor, return_dict=False)[0]

        image = self.image_processor.postprocess(image, output_type="pil")

        return image[0]