import os
import threading
import platform
from collections import OrderedDict

import numpy as np
import onnxruntime
import torch
from torch.cuda import nvtx
import spaces

try:
    import tensorrt as trt
    import ctypes
except ModuleNotFoundError:
    print("No TensorRT Found")

numpy_to_torch_dtype_dict = {
    np.uint8: torch.uint8,
    np.int8: torch.int8,
    np.int16: torch.int16,
    np.int32: torch.int32,
    np.int64: torch.int64,
    np.float16: torch.float16,
    np.float32: torch.float32,
    np.float64: torch.float64,
    np.complex64: torch.complex64,
    np.complex128: torch.complex128,
}
if np.version.full_version >= "1.24.0":
    numpy_to_torch_dtype_dict[np.bool_] = torch.bool
else:
    numpy_to_torch_dtype_dict[np.bool] = torch.bool


class TensorRTPredictor:
    """
    Implements inference for a TensorRT engine.
    """

    @spaces.GPU
    def __init__(self, **kwargs):
        """
        :param model_path: The path to the serialized engine to load from disk.
        """
        # Load the grid_sample_3d plugin before deserializing the engine.
        if platform.system().lower() == 'linux':
            ctypes.CDLL("./difpoint/checkpoints/liveportrait_onnx/libgrid_sample_3d_plugin.so",
                        mode=ctypes.RTLD_GLOBAL)
        else:
            ctypes.CDLL("./difpoint/checkpoints/liveportrait_onnx/grid_sample_3d_plugin.dll",
                        mode=ctypes.RTLD_GLOBAL)

        # Load the TRT engine
        self.logger = trt.Logger(trt.Logger.VERBOSE)
        trt.init_libnvinfer_plugins(self.logger, "")
        engine_path = os.path.abspath(kwargs.get("model_path", None))
        print('engine_path', engine_path)
        self.debug = kwargs.get("debug", False)
        assert engine_path, f"model:{engine_path} must exist!"
        print(f"loading trt model:{engine_path}")
        with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
            assert runtime
            self.engine = runtime.deserialize_cuda_engine(f.read())
        print('self.engine', self.engine)
        assert self.engine
        self.context = self.engine.create_execution_context()
        assert self.context

        # Set up I/O bindings
        self.inputs = []
        self.outputs = []
        self.tensors = OrderedDict()

        # TODO: support dynamic-shape inputs
        for idx in range(self.engine.num_io_tensors):
            name = self.engine[idx]
            is_input = self.engine.get_tensor_mode(name).name == "INPUT"
            shape = self.engine.get_tensor_shape(name)
            dtype = trt.nptype(self.engine.get_tensor_dtype(name))
            binding = {
                "index": idx,
                "name": name,
                "dtype": dtype,
                "shape": list(shape),
            }
            if is_input:
                self.inputs.append(binding)
            else:
                self.outputs.append(binding)

        assert len(self.inputs) > 0
        assert len(self.outputs) > 0
        self.allocate_max_buffers()

    def allocate_max_buffers(self, device="cuda"):
        nvtx.range_push("allocate_max_buffers")
        # For now, only the batch dimension is handled dynamically.
        batch_size = 1
        for idx in range(self.engine.num_io_tensors):
            binding = self.engine[idx]
            shape = self.engine.get_tensor_shape(binding)
            is_input = self.engine.get_tensor_mode(binding).name == "INPUT"
            if -1 in shape:
                if is_input:
                    # Allocate for the maximum shape of optimization profile 0.
                    shape = self.engine.get_tensor_profile_shape(binding, 0)[-1]
                    batch_size = shape[0]
                else:
                    shape[0] = batch_size
            dtype = trt.nptype(self.engine.get_tensor_dtype(binding))
            tensor = torch.empty(
                tuple(shape), dtype=numpy_to_torch_dtype_dict[dtype]
            ).to(device=device)
            self.tensors[binding] = tensor
        nvtx.range_pop()

    def input_spec(self):
        """
        Get the specs for the input tensors of the network. Useful to prepare memory allocations.
        :return: A list with one item per input: its name, shape and (numpy) datatype.
        """
        specs = []
        for i, o in enumerate(self.inputs):
            specs.append((o["name"], o["shape"], o["dtype"]))
            if self.debug:
                print(f"trt input {i} -> {o['name']} -> {o['shape']}")
        return specs

    def output_spec(self):
        """
        Get the specs for the output tensors of the network. Useful to prepare memory allocations.
        :return: A list with one item per output: its name, shape and (numpy) datatype.
        """
        specs = []
        for i, o in enumerate(self.outputs):
            specs.append((o["name"], o["shape"], o["dtype"]))
            if self.debug:
                print(f"trt output {i} -> {o['name']} -> {o['shape']}")
        return specs

    def adjust_buffer(self, feed_dict):
        nvtx.range_push("adjust_buffer")
        for name, buf in feed_dict.items():
            input_tensor = self.tensors[name]
            current_shape = list(buf.shape)
            slices = tuple(slice(0, dim) for dim in current_shape)
            input_tensor[slices].copy_(buf)
            self.context.set_input_shape(name, current_shape)
        nvtx.range_pop()

    def predict(self, feed_dict, stream):
        """
        Execute inference on a batch of inputs.
        :param feed_dict: A dict mapping input tensor names to torch tensors.
        :param stream: The CUDA stream handle to execute on.
        :return: An OrderedDict mapping tensor names to the engine's I/O torch tensors.
        """
        nvtx.range_push("set_tensors")
        self.adjust_buffer(feed_dict)
        for name, tensor in self.tensors.items():
            self.context.set_tensor_address(name, tensor.data_ptr())
        nvtx.range_pop()
        nvtx.range_push("execute")
        noerror = self.context.execute_async_v3(stream)
        if not noerror:
            raise ValueError("ERROR: inference failed.")
        nvtx.range_pop()
        return self.tensors

    def __del__(self):
        del self.engine
        del self.context
        del self.inputs
        del self.outputs
        del self.tensors
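

# Illustrative sketch (not part of the original code): one way to drive
# TensorRTPredictor.predict with a torch CUDA stream. The engine path, the
# input name "feature_3d" and the dummy shape are placeholders, not names or
# shapes guaranteed by this repo; substitute the engine's real input spec.
def _example_trt_usage():
    predictor = TensorRTPredictor(model_path="./path/to/model.trt")  # placeholder path
    print(predictor.input_spec())

    stream = torch.cuda.Stream()
    dummy_input = torch.randn(1, 32, 16, 64, 64, device="cuda")  # placeholder shape
    with torch.cuda.stream(stream):
        # execute_async_v3 expects the raw CUDA stream handle.
        outputs = predictor.predict({"feature_3d": dummy_input}, stream.cuda_stream)
    stream.synchronize()

    for name, tensor in outputs.items():
        print(name, tuple(tensor.shape), tensor.dtype)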


class OnnxRuntimePredictor:
    """
    OnnxRuntime Prediction
    """

    def __init__(self, **kwargs):
        model_path = kwargs.get("model_path", "")  # the model path identifies the instance
        assert os.path.exists(model_path), "model path must exist!"
        # print("loading ort model:{}".format(model_path))
        self.debug = kwargs.get("debug", False)
        providers = ['CUDAExecutionProvider', 'CoreMLExecutionProvider', 'CPUExecutionProvider']
        print(f"OnnxRuntime use {providers}")
        opts = onnxruntime.SessionOptions()
        # opts.inter_op_num_threads = kwargs.get("num_threads", 4)
        # opts.intra_op_num_threads = kwargs.get("num_threads", 4)
        # opts.log_severity_level = 3
        self.onnx_model = onnxruntime.InferenceSession(model_path, providers=providers, sess_options=opts)
        self.inputs = self.onnx_model.get_inputs()
        self.outputs = self.onnx_model.get_outputs()

    def input_spec(self):
        """
        Get the specs for the input tensors of the network. Useful to prepare memory allocations.
        :return: A list with one item per input: its name, shape and (ONNX) datatype.
        """
        specs = []
        for i, o in enumerate(self.inputs):
            specs.append((o.name, o.shape, o.type))
            if self.debug:
                print(f"ort input {i} -> {o.name} -> {o.shape}")
        return specs

    def output_spec(self):
        """
        Get the specs for the output tensors of the network. Useful to prepare memory allocations.
        :return: A list with one item per output: its name, shape and (ONNX) datatype.
        """
        specs = []
        for i, o in enumerate(self.outputs):
            specs.append((o.name, o.shape, o.type))
            if self.debug:
                print(f"ort output {i} -> {o.name} -> {o.shape}")
        return specs

    def predict(self, *data):
        input_feeds = {}
        for i in range(len(data)):
            if self.inputs[i].type == 'tensor(float16)':
                input_feeds[self.inputs[i].name] = data[i].astype(np.float16)
            else:
                input_feeds[self.inputs[i].name] = data[i].astype(np.float32)
        results = self.onnx_model.run(None, input_feeds)
        return results

    def __del__(self):
        del self.onnx_model
        self.onnx_model = None
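

# Illustrative sketch (not part of the original code): minimal usage of
# OnnxRuntimePredictor. The model path and the input shape are placeholders;
# predict() takes positional numpy arrays in the order of the model's inputs
# and casts them to float16/float32 to match each input's declared type.
def _example_ort_usage():
    predictor = OnnxRuntimePredictor(model_path="./path/to/model.onnx", debug=True)  # placeholder path
    print(predictor.input_spec())
    print(predictor.output_spec())

    dummy = np.random.rand(1, 3, 256, 256).astype(np.float32)  # placeholder shape
    results = predictor.predict(dummy)
    for r in results:
        print(r.shape, r.dtype)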


class OnnxRuntimePredictorSingleton(OnnxRuntimePredictor):
    """
    Singleton wrapper that prevents the same model from being loaded more than once.
    """

    _instance_lock = threading.Lock()
    _instance = {}

    def __new__(cls, *args, **kwargs):
        model_path = kwargs.get("model_path", "")  # the model path identifies the instance
        assert os.path.exists(model_path), "model path must exist!"
        # Singleton: avoid reloading an already-loaded model.
        with OnnxRuntimePredictorSingleton._instance_lock:
            if model_path not in OnnxRuntimePredictorSingleton._instance or \
                    OnnxRuntimePredictorSingleton._instance[model_path].onnx_model is None:
                OnnxRuntimePredictorSingleton._instance[model_path] = OnnxRuntimePredictor(**kwargs)

        return OnnxRuntimePredictorSingleton._instance[model_path]


def get_predictor(**kwargs):
    predict_type = kwargs.get("predict_type", "trt")
    if predict_type == "ort":
        return OnnxRuntimePredictorSingleton(**kwargs)
    elif predict_type == "trt":
        return TensorRTPredictor(**kwargs)
    else:
        raise NotImplementedError
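

# Minimal smoke-test sketch, assuming a serialized TensorRT engine exists at
# the placeholder path below; pass predict_type="ort" and an .onnx path to use
# the ONNX Runtime backend instead.
if __name__ == "__main__":
    predictor = get_predictor(predict_type="trt", model_path="./path/to/model.trt")  # placeholder path
    print(predictor.input_spec())
    print(predictor.output_spec())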