# Copyright (c) Tencent Inc. All rights reserved.
import os
import argparse
import os.path as osp
from functools import partial
from io import BytesIO

import onnx
import onnxsim
import torch
import gradio as gr
import numpy as np
from PIL import Image
from torchvision.ops import nms
from mmengine.config import Config, ConfigDict, DictAction
from mmengine.runner import Runner
from mmengine.runner.amp import autocast
from mmengine.dataset import Compose
from mmdet.visualization import DetLocalVisualizer
from mmdet.datasets import CocoDataset
from mmyolo.registry import RUNNERS

from yolo_world.easydeploy.model import DeployModel, MMYOLOBackend


def parse_args():
    """Parse the command-line options of the YOLO-World demo.

    Returns:
        argparse.Namespace: Namespace holding the positional ``config`` and
        ``checkpoint`` paths plus the optional ``work_dir`` and
        ``cfg_options`` values.
    """
    cfg_options_help = (
        'override some settings in the used config, the key-value pair '
        'in xxx=yyy format will be merged into config file. If the value to '
        'be overwritten is a list, it should be like key="[a,b]" or key=a,b '
        'It also allows nested list/tuple values, e.g. key="[(a,b),(c,d)]" '
        'Note that the quotation marks are necessary and that no white space '
        'is allowed.')

    parser = argparse.ArgumentParser(description='YOLO-World Demo')

    # Required positional arguments.
    parser.add_argument('config', help='test config file path')
    parser.add_argument('checkpoint', help='checkpoint file')

    # Optional overrides.
    parser.add_argument(
        '--work-dir',
        help='the directory to save the file containing evaluation metrics')
    parser.add_argument(
        '--cfg-options',
        nargs='+',
        action=DictAction,
        help=cfg_options_help)

    return parser.parse_args()


def run_image(runner,
              image,
              text,
              max_num_boxes,
              score_thr,
              nms_thr,
              image_path='./work_dirs/demo.png'):
    """Run open-vocabulary detection on one image and return the rendering.

    The image is written to ``image_path`` so the test pipeline can load it
    from disk; the visualised result overwrites the same file and is then
    re-opened and returned.

    Args:
        runner: Runner exposing ``pipeline`` (the composed test pipeline) and
            ``model``.
        image (PIL.Image.Image | None): Input image. ``None`` (nothing
            uploaded in the UI) yields ``None``.
        text (str): Comma-separated class names to detect.
        max_num_boxes (int): Maximum number of boxes kept after filtering.
        score_thr (float): Detections scoring at or below this are dropped.
        nms_thr (float): IoU threshold passed to NMS.
        image_path (str): Scratch file used for both the input and the
            rendered output.

    Returns:
        PIL.Image.Image | None: The rendered detections, or ``None`` when no
        input image was supplied.
    """
    # Submitting without an uploaded image hands us ``None``; there is
    # nothing to detect, so leave the output empty instead of crashing.
    if image is None:
        return None

    # Normalise to 3-channel RGB once, so both the file read by the pipeline
    # and the array handed to the visualizer are plain RGB.
    image = image.convert('RGB')

    # The scratch directory is not guaranteed to exist (the configured work
    # dir may live elsewhere), and ``Image.save`` does not create it.
    out_dir = os.path.dirname(image_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    image.save(image_path)

    # One single-element list per class name, followed by a blank ' ' entry.
    texts = [[t.strip()] for t in text.split(',')] + [[' ']]
    data_info = dict(img_id=0, img_path=image_path, texts=texts)
    data_info = runner.pipeline(data_info)
    data_batch = dict(inputs=data_info['inputs'].unsqueeze(0),
                      data_samples=[data_info['data_samples']])

    with autocast(enabled=False), torch.no_grad():
        output = runner.model.test_step(data_batch)[0]
        pred_instances = output.pred_instances

    # Post-filter the raw predictions: NMS over all boxes, then the score
    # threshold, then keep at most ``max_num_boxes`` highest-scoring boxes.
    keep_idxs = nms(pred_instances.bboxes,
                    pred_instances.scores,
                    iou_threshold=nms_thr)
    pred_instances = pred_instances[keep_idxs]
    pred_instances = pred_instances[
        pred_instances.scores.float() > score_thr]
    if len(pred_instances.scores) > max_num_boxes:
        indices = pred_instances.scores.float().topk(max_num_boxes)[1]
        pred_instances = pred_instances[indices]
    output.pred_instances = pred_instances

    visualizer = DetLocalVisualizer()
    visualizer.dataset_meta['classes'] = [t[0] for t in texts]
    # ``out_file`` makes the visualizer write the drawing to disk; it is
    # read back below so the caller receives a PIL image.
    visualizer.add_datasample('image',
                              np.array(image),
                              output,
                              draw_gt=False,
                              out_file=image_path,
                              pred_score_thr=score_thr)
    return Image.open(image_path)


def export_model(runner,
                 checkpoint,
                 text,
                 max_num_boxes,
                 score_thr,
                 nms_thr):
    """Re-parameterize the model for ``text`` and export it to ONNX.

    Args:
        runner: Runner exposing ``model`` and ``work_dir``.
        checkpoint (str): Path of the loaded checkpoint; only its base name
            is used, to name the exported file.
        text (str): Comma-separated class names baked into the export.
        max_num_boxes (int): ``keep_top_k`` of the deploy post-processing;
            ``pre_top_k`` is set to ten times this value.
        score_thr (float): Score threshold of the deploy post-processing.
        nms_thr (float): IoU threshold of the deploy post-processing.

    Returns:
        tuple: ``(gr.update(visible=True), save_onnx_path)`` — an update
        that reveals the download widget and the path of the written file.
    """
    backend = MMYOLOBackend.ONNXRUNTIME
    postprocess_cfg = ConfigDict(
        pre_top_k=10 * max_num_boxes,
        keep_top_k=max_num_boxes,
        iou_threshold=nms_thr,
        score_threshold=score_thr)

    base_model = runner.model
    # A single list holding every class name followed by a blank ' ' entry.
    texts = [[t.strip() for t in text.split(',')] + [' ']]
    base_model.reparameterize(texts)
    deploy_model = DeployModel(
        baseModel=base_model,
        backend=backend,
        postprocess_cfg=postprocess_cfg)
    deploy_model.eval()

    device = next(iter(base_model.parameters())).device
    fake_input = torch.ones([1, 3, 640, 640], device=device)
    # dry run
    deploy_model(fake_input)

    # Derive the output location from the function's own inputs rather than
    # a module-level ``args``: the CLI ``--work-dir`` may be unset (None),
    # whereas the runner always carries the resolved work directory.
    work_dir = runner.work_dir
    os.makedirs(work_dir, exist_ok=True)
    # Swap only the extension; a plain ``replace('pth', 'onnx')`` would also
    # rewrite 'pth' inside the stem and miss other suffixes such as '.pt'.
    stem = os.path.splitext(os.path.basename(checkpoint))[0]
    save_onnx_path = os.path.join(work_dir, stem + '.onnx')

    # export onnx
    with BytesIO() as f:
        output_names = ['num_dets', 'boxes', 'scores', 'labels']
        torch.onnx.export(
            deploy_model,
            fake_input,
            f,
            input_names=['images'],
            output_names=output_names,
            opset_version=12)
        f.seek(0)
        onnx_model = onnx.load(f)
        onnx.checker.check_model(onnx_model)

    # Only adopt the simplified graph when onnxsim validated it; otherwise
    # keep the checker-approved unsimplified model.
    simplified_model, simplify_ok = onnxsim.simplify(onnx_model)
    if simplify_ok:
        onnx_model = simplified_model
    onnx.save(onnx_model, save_onnx_path)
    return gr.update(visible=True), save_onnx_path


def demo(runner, args):
    """Build the Gradio interface and launch it.

    Args:
        runner: Runner handed to ``run_image`` and ``export_model`` as their
            first argument.
        args: Parsed command-line arguments; only ``args.checkpoint`` is
            read here, and it is forwarded to ``export_model``.

    The call blocks in ``launch``, which serves on host ``0.0.0.0``.
    """
    # Named ``app`` so the Blocks object does not shadow this function.
    with gr.Blocks(title="YOLO-World") as app:
        with gr.Row():
            gr.Markdown('<h1><center>YOLO-World: Real-Time Open-Vocabulary '
                        'Object Detector</center></h1>')
        with gr.Row():
            # Integer scales with the same 3:7 ratio as the former 0.3/0.7;
            # Gradio documents ``scale`` as an integer.
            with gr.Column(scale=3):
                with gr.Row():
                    image = gr.Image(type='pil', label='input image')
                input_text = gr.Textbox(
                    lines=7,
                    label='Enter the classes to be detected, '
                          'separated by comma',
                    value=', '.join(CocoDataset.METAINFO['classes']),
                    elem_id='textbox')
                with gr.Row():
                    submit = gr.Button('Submit')
                    clear = gr.Button('Clear')
                with gr.Row():
                    export = gr.Button('Deploy and Export ONNX Model')
                # Hidden until an export finishes. ``lines`` is a Textbox
                # option and is not passed to File/Image components.
                out_download = gr.File(visible=False)
                max_num_boxes = gr.Slider(
                    minimum=1,
                    maximum=300,
                    value=100,
                    step=1,
                    interactive=True,
                    label='Maximum Number Boxes')
                score_thr = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.05,
                    step=0.001,
                    interactive=True,
                    label='Score Threshold')
                nms_thr = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.7,
                    step=0.001,
                    interactive=True,
                    label='NMS Threshold')
            with gr.Column(scale=7):
                output_image = gr.Image(
                    type='pil',
                    label='output image')

        submit.click(partial(run_image, runner),
                     [image, input_text, max_num_boxes,
                      score_thr, nms_thr],
                     [output_image])
        # Image components are cleared with ``None``; ``[]`` and ``''`` are
        # not valid image values.
        clear.click(lambda: [None, '', None], None,
                    [image, input_text, output_image])
        # ``export_model`` returns two values (a visibility update and the
        # file path), both routed to the download widget.
        export.click(partial(export_model, runner, args.checkpoint),
                     [input_text, max_num_boxes, score_thr, nms_thr],
                     [out_download, out_download])
        app.launch(server_name='0.0.0.0')


if __name__ == '__main__':
    args = parse_args()

    # Load the config file and layer any command-line overrides on top.
    cfg = Config.fromfile(args.config)
    overrides = args.cfg_options
    if overrides is not None:
        cfg.merge_from_dict(overrides)

    # Work dir priority: CLI option > value already in the config >
    # ./work_dirs/<config file name without extension>.
    if args.work_dir is None:
        if cfg.get('work_dir', None) is None:
            config_stem = osp.splitext(osp.basename(args.config))[0]
            cfg.work_dir = osp.join('./work_dirs', config_stem)
    else:
        cfg.work_dir = args.work_dir

    cfg.load_from = args.checkpoint

    # A config that names a ``runner_type`` is built through the registry;
    # otherwise the default Runner is used.
    if 'runner_type' in cfg:
        runner = RUNNERS.build(cfg)
    else:
        runner = Runner.from_cfg(cfg)

    runner.call_hook('before_run')
    runner.load_or_resume()

    # Attach the composed test pipeline to the runner and put the model in
    # eval mode before starting the UI.
    runner.pipeline = Compose(cfg.test_dataloader.dataset.pipeline)
    runner.model.eval()
    demo(runner, args)