File size: 8,259 Bytes
b273838
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import os
from functools import partial

import cv2
import gradio as gr
import spaces
from util.file import generate_binary_file, load_numpy_from_binary_bitwise
import torch
import yaml
from util.basicsr_img_util import img2tensor, tensor2img
from facexlib.utils.face_restoration_helper import FaceRestoreHelper
from torchvision.transforms.functional import resize

from guided_diffusion.gaussian_diffusion import create_sampler
from guided_diffusion.swinir import SwinIR
from guided_diffusion.unet import create_model


def create_swinir_model(ckpt_path):
    """Build the SwinIR network, load its checkpoint and freeze its weights.

    Args:
        ckpt_path: Path of the checkpoint file handed to ``torch.load``.

    Returns:
        The ``SwinIR`` instance with the checkpoint weights loaded and
        ``requires_grad`` switched off on every parameter.
    """
    num_stages = 8
    network = SwinIR(
        img_size=64,
        patch_size=1,
        in_chans=3,
        num_out_ch=3,
        embed_dim=180,
        depths=[6] * num_stages,
        num_heads=[6] * num_stages,
        window_size=8,
        mlp_ratio=2,
        sf=8,
        img_range=1.0,
        upsampler="nearest+conv",
        resi_connection='1conv',
        unshuffle=True,
        unshuffle_scale=8
    )

    # Always deserialize onto the CPU; callers move the model to a device later.
    checkpoint = torch.load(ckpt_path, map_location="cpu")

    if 'params_ema' in checkpoint:
        weights = checkpoint['params_ema']
    else:
        weights = checkpoint['state_dict']
        # Drop the 'model.' and then the 'module.' substrings from the layer
        # names, one pass per substring, so the keys match the bare network.
        for wrapper_prefix in ('model.', 'module.'):
            weights = {
                name.replace(wrapper_prefix, ''): tensor
                for name, tensor in weights.items()
            }
    network.load_state_dict(weights)

    # The network is only used for inference, so no gradients are needed.
    for parameter in network.parameters():
        parameter.requires_grad = False
    return network


# Local paths of the pretrained checkpoints used by this app.
ffhq_diffusion_model = "./guided_diffusion/iddpm_ffhq512_ema500000.pth"
mmse_model_ckpt = "./guided_diffusion/swinir_restoration512_L1.pth"

# Release page both checkpoints are downloaded from.
_DIFFACE_RELEASE_URL = "https://github.com/zsyOAOA/DifFace/releases/download/V1.0"


def _download_if_missing(url, dst):
    """Download ``url`` to ``dst`` unless a file already exists at ``dst``.

    Raises whatever ``torch.hub.download_url_to_file`` raises when the
    download fails, so a broken download stops start-up instead of being
    silently ignored.
    """
    if os.path.exists(dst):
        return
    # Unlike `os.system("wget ... -O dst")`, a failure here is reported as an
    # exception rather than an ignored exit status, and the data is staged in
    # a temporary file first, so an interrupted download should not leave a
    # truncated checkpoint at `dst` that would pass the existence check above.
    torch.hub.download_url_to_file(url, dst)


_download_if_missing(
    f"{_DIFFACE_RELEASE_URL}/iddpm_ffhq512_ema500000.pth", ffhq_diffusion_model
)
_download_if_missing(
    f"{_DIFFACE_RELEASE_URL}/swinir_restoration512_L1.pth", mmse_model_ckpt
)


def load_yaml(file_path: str) -> dict:
    """Parse the YAML file at ``file_path`` and return the loaded content.

    The file is read with ``yaml.FullLoader``.
    """
    with open(file_path) as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)


# Keyword arguments for `create_model` and `create_sampler`, read from the
# YAML files that ship next to the guided_diffusion package.
model_config = load_yaml('./guided_diffusion/ffhq512_model_config.yaml')
diffusion_config = load_yaml('./guided_diffusion/diffusion_config.yaml')

# Both networks are built once at import time and reused by every request.
models = {
    'main_model': create_model(**model_config),
    # Reuse the module-level checkpoint path instead of repeating the literal,
    # so the file that is downloaded and the file that is loaded cannot drift.
    'mmse_model': create_swinir_model(mmse_model_ckpt),
}
# Inference only: switch both networks to evaluation mode.
models['main_model'].eval()
models['mmse_model'].eval()


@torch.no_grad()
@spaces.GPU(duration=80)
def generate_reconstruction(degraded_face_img, K, T, iqa_metric, iqa_coef, loaded_indices):
    """Run the sampler's blind-restoration loop once and return its result.

    Args:
        degraded_face_img: Input passed to the MMSE model, or ``None`` to
            sample without an MMSE estimate (``mmse_img=None``), in which
            case the starting noise has shape ``(1, 3, 512, 512)``.
        K: Forwarded to the sampler as ``num_opt_noises``.
        T: Used as the sampler's ``timestep_respacing`` setting.
        iqa_metric: One of ``'niqe'``, ``'clipiqa+'`` or ``'topiq_nr-face'``.
        iqa_coef: Forwarded to the sampler as ``iqa_coef``.
        loaded_indices: Forwarded to the sampler as ``loaded_indices``.

    Returns:
        The ``(restored_face, indices)`` pair produced by
        ``sampler.p_sample_loop_blind_restoration``.

    Raises:
        ValueError: If ``iqa_metric`` is not one of the supported names.
    """
    # Explicit check instead of `assert`, which is removed under `python -O`.
    if iqa_metric not in ('niqe', 'clipiqa+', 'topiq_nr-face'):
        raise ValueError(f"Unsupported iqa_metric: {iqa_metric!r}")

    # Build the sampler settings for this call only. Writing T into the
    # module-level `diffusion_config` would leak one request's setting into
    # shared state that every other call reads.
    sampler = create_sampler(**{**diffusion_config, 'timestep_respacing': T})

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = models['main_model'].to(device)
    mmse_model = models['mmse_model'].to(device)

    sample_fn = partial(sampler.p_sample_loop_blind_restoration, model=model, num_opt_noises=K,
                        eta=1.0, iqa_metric=iqa_metric, iqa_coef=iqa_coef)

    if degraded_face_img is not None:
        # Clip the MMSE output to [0, 1], then rescale it to [-1, 1].
        mmse_img = mmse_model(degraded_face_img).clip(0, 1) * 2 - 1
        x_start = torch.randn(mmse_img.shape, device=device)
    else:
        mmse_img = None
        x_start = torch.randn(1, 3, 512, 512, device=device)
    restored_face, indices = sample_fn(x_start=x_start, mmse_img=mmse_img, loaded_indices=loaded_indices)

    return restored_face, indices


def resize(img, size):
    """Rescale ``img`` by ``size / min(height, width)`` using OpenCV.

    The scaled height and width are truncated to integers. ``cv2.INTER_AREA``
    is used when shrinking (scale below 1) and ``cv2.INTER_LINEAR`` otherwise.

    NOTE: this definition comes after, and therefore replaces, the ``resize``
    name imported from ``torchvision.transforms.functional`` at the top of
    this file.
    """
    # From https://github.com/sczhou/CodeFormer/blob/master/facelib/utils/face_restoration_helper.py
    height, width = img.shape[0:2]
    scale = size / min(height, width)
    # cv2.resize expects the target size as (width, height).
    target_size = (int(width * scale), int(height * scale))
    if scale < 1:
        interpolation = cv2.INTER_AREA
    else:
        interpolation = cv2.INTER_LINEAR
    return cv2.resize(img, target_size, interpolation=interpolation)


@torch.no_grad()
@spaces.GPU(duration=80)
def enhance_faces(img, face_helper, has_aligned, K, T, iqa_metric, iqa_coef, loaded_indices):
    """Restore every face found in ``img`` and collect the sampler indices.

    Args:
        img: Image array as accepted by ``cv2.resize`` and
            ``face_helper.read_image`` (presumably BGR, since the crops are
            converted with ``bgr2rgb=True`` - confirm against the caller).
        face_helper: Face helper used for detection and alignment. Its state
            is reset with ``clean_all()`` at the start of the call.
        has_aligned: If true, ``img`` is treated as one already-aligned face
            and is simply resized to 512x512. Otherwise faces are detected
            and aligned by ``face_helper``.
        K, T, iqa_metric, iqa_coef: Forwarded to ``generate_reconstruction``.
        loaded_indices: ``None``, or an indexable object whose ``i``-th entry
            is forwarded for the ``i``-th face.

    Returns:
        ``(restored_faces, generated_indices)``: a list of ``uint8`` images
        and a list of the indices returned for each face, in crop order.

    Raises:
        gr.Error: If no face crop is available.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    face_helper.clean_all()
    if has_aligned:  # The inputs are already aligned
        # Exactly one crop is produced on this path, so a separate
        # "multiple faces were detected" check could never trigger here.
        img = cv2.resize(img, (512, 512), interpolation=cv2.INTER_LINEAR)
        face_helper.cropped_faces = [img]
    else:
        face_helper.read_image(img)
        face_helper.input_img = resize(face_helper.input_img, 640)
        face_helper.get_face_landmarks_5(only_center_face=False, eye_dist_threshold=5)
        face_helper.align_warp_face()
    if len(face_helper.cropped_faces) == 0:
        raise gr.Error("Could not identify any face in the image.")

    restored_faces = []
    generated_indices = []
    for i, cropped_face in enumerate(face_helper.cropped_faces):
        # Scale pixel values by 1/255 and add a leading batch dimension.
        cropped_face_t = img2tensor(cropped_face / 255.0, bgr2rgb=True, float32=True)
        cropped_face_t = cropped_face_t.unsqueeze(0).to(device)
        cur_loaded_indices = loaded_indices[i] if loaded_indices is not None else None

        output, indices = generate_reconstruction(
            cropped_face_t,
            K,
            T,
            iqa_metric,
            iqa_coef,
            cur_loaded_indices
        )

        # The sampler output is read as lying in [-1, 1] (see min_max).
        restored_face = tensor2img(
            output.to(torch.float32).squeeze(0), rgb2bgr=False, min_max=(-1, 1)
        )

        restored_faces.append(restored_face.astype("uint8"))
        generated_indices.append(indices)
    return restored_faces, generated_indices


@torch.no_grad()
@spaces.GPU()
def decompress_face(K, T, iqa_metric, iqa_coef, loaded_indices):
    """Reconstruct a face from previously stored indices, without an input image.

    Args:
        K, T, iqa_metric, iqa_coef: Forwarded to ``generate_reconstruction``.
        loaded_indices: Indices to decode; must not be ``None``.

    Returns:
        ``(restored_face, loaded_indices)``: the reconstructed ``uint8`` image
        and the same ``loaded_indices`` object that was passed in.

    Raises:
        gr.Error: If ``loaded_indices`` is ``None``.
    """
    # Raise a user-facing error rather than `assert`: the assertion would be
    # stripped under `python -O` and gives no explanation in the UI.
    if loaded_indices is None:
        raise gr.Error("Cannot decompress a face: no loaded indices were provided.")

    # No degraded image is passed, so the reconstruction is driven by the
    # loaded indices alone; the indices returned by the sampler are not needed.
    output, _ = generate_reconstruction(
        None,
        K,
        T,
        iqa_metric,
        iqa_coef,
        loaded_indices
    )

    restored_face = tensor2img(
        output.to(torch.float32).squeeze(0), rgb2bgr=False, min_max=(-1, 1)
    ).astype("uint8")

    return restored_face, loaded_indices

@torch.no_grad()
@spaces.GPU(duration=80)
def inference(
        img,
        T,
        K,
        iqa_metric,
        iqa_coef,
        aligned,
        bitstream=None,
        progress=gr.Progress(track_tqdm=True),
):
    """Entry point: restore faces from an image or decode one from a bitstream.

    Args:
        img: Path of the input image read with ``cv2.imread``, or ``None`` to
            decode a face purely from ``bitstream``.
        T, K, iqa_coef: Forwarded to the restoration / decompression helpers.
        iqa_metric: Display name of the metric: ``'NIQE'``, ``'TOPIQ'`` or
            ``'CLIP-IQA'``.
        aligned: Forwarded to ``enhance_faces`` as ``has_aligned``.
        bitstream: Optional input handed to ``load_numpy_from_binary_bitwise``.
        progress: Not used in the body; presumably present so that Gradio
            tracks tqdm progress for this call - confirm against Gradio docs.

    Returns:
        ``(x, files)`` when ``bitstream`` is ``None``, where ``files`` holds
        one ``generate_binary_file`` result per entry of the produced
        indices; otherwise just ``x``.

    Raises:
        gr.Error: If the image cannot be read or is larger than 4500 pixels
            in either dimension.
        KeyError: If ``iqa_metric`` is not a known display name.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Translate the UI label into the metric name used by the sampler.
    iqa_metric_to_pyiqa_name = {
        'NIQE': 'niqe',
        'TOPIQ': 'topiq_nr-face',
        'CLIP-IQA': 'clipiqa+'
    }
    iqa_metric = iqa_metric_to_pyiqa_name[iqa_metric]
    indices = load_numpy_from_binary_bitwise(bitstream, K, T, 'ffhq', T)
    if indices is not None:
        indices = indices.to(device)

    if img is not None:
        img = cv2.imread(img, cv2.IMREAD_COLOR)
        # cv2.imread signals an unreadable or missing file by returning None.
        if img is None:
            raise gr.Error("Could not read the input image.")
        h, w = img.shape[0:2]
        if h > 4500 or w > 4500:
            raise gr.Error("Image size too large.")

        face_helper = FaceRestoreHelper(
            1,
            face_size=512,
            crop_ratio=(1, 1),
            det_model="retinaface_resnet50",
            save_ext="png",
            use_parse=True,
            device=device,
            model_rootpath=None,
        )

        x, indices = enhance_faces(
            img, face_helper, aligned, K=K, T=T, iqa_metric=iqa_metric, iqa_coef=iqa_coef,
            loaded_indices=indices,
        )
    else:
        x, indices = decompress_face(
            K=K, T=T, iqa_metric=iqa_metric, iqa_coef=iqa_coef, loaded_indices=indices,
        )

    torch.cuda.empty_cache()

    if bitstream is None:
        # `.cpu()` is a no-op for CPU tensors and avoids the failure that
        # `.numpy()` raises for tensors living on an accelerator.
        indices = [generate_binary_file(index.cpu().numpy(), K, T, 'ffhq') for index in indices]
        return x, indices
    return x