import pdb
import threading
import os
import time
import numpy as np
import onnxruntime
import torch
from torch.cuda import nvtx
from collections import OrderedDict
import platform
import spaces

try:
    import tensorrt as trt
    import ctypes
except ModuleNotFoundError:
    print("No TensorRT Found")

numpy_to_torch_dtype_dict = {
    np.uint8: torch.uint8,
    np.int8: torch.int8,
    np.int16: torch.int16,
    np.int32: torch.int32,
    np.int64: torch.int64,
    np.float16: torch.float16,
    np.float32: torch.float32,
    np.float64: torch.float64,
    np.complex64: torch.complex64,
    np.complex128: torch.complex128,
}

# np.bool was removed in NumPy 1.24; use a real version comparison rather than a
# lexicographic string comparison.
if np.lib.NumpyVersion(np.__version__) >= "1.24.0":
    numpy_to_torch_dtype_dict[np.bool_] = torch.bool
else:
    numpy_to_torch_dtype_dict[np.bool] = torch.bool


class TensorRTPredictor:
    """
    Implements inference for a serialized TensorRT engine.
    """

    def __init__(self, **kwargs):
        """
        :param model_path: The path to the serialized engine to load from disk.
        :param debug: If True, print the input/output specs when they are queried.
        """
        # The grid_sample_3d plugin must be loaded before deserializing the engine.
        if platform.system().lower() == 'linux':
            ctypes.CDLL("./difpoint/checkpoints/liveportrait_onnx/libgrid_sample_3d_plugin.so", mode=ctypes.RTLD_GLOBAL)
        else:
            ctypes.CDLL("./difpoint/checkpoints/liveportrait_onnx/grid_sample_3d_plugin.dll", mode=ctypes.RTLD_GLOBAL)
        # Load the TRT engine
        self.logger = trt.Logger(trt.Logger.VERBOSE)
        trt.init_libnvinfer_plugins(self.logger, "")
        model_path = kwargs.get("model_path", None)
        assert model_path, "model_path must be provided!"
        engine_path = os.path.abspath(model_path)
        print('engine_path', engine_path)
        self.debug = kwargs.get("debug", False)
        print(f"loading trt model: {engine_path}")
        with open(engine_path, "rb") as f, trt.Runtime(self.logger) as runtime:
            assert runtime
            self.engine = runtime.deserialize_cuda_engine(f.read())
        print('self.engine', self.engine)
        assert self.engine
        self.context = self.engine.create_execution_context()
        assert self.context
        # Set up I/O bindings
        self.inputs = []
        self.outputs = []
        self.tensors = OrderedDict()
        # TODO: support dynamic input shapes
        for idx in range(self.engine.num_io_tensors):
            name = self.engine[idx]
            is_input = self.engine.get_tensor_mode(name).name == "INPUT"
            shape = self.engine.get_tensor_shape(name)
            dtype = trt.nptype(self.engine.get_tensor_dtype(name))
            binding = {
                "index": idx,
                "name": name,
                "dtype": dtype,
                "shape": list(shape),
            }
            if is_input:
                self.inputs.append(binding)
            else:
                self.outputs.append(binding)
        assert len(self.inputs) > 0
        assert len(self.outputs) > 0
        self.allocate_max_buffers()

    def allocate_max_buffers(self, device="cuda"):
        nvtx.range_push("allocate_max_buffers")
        # Only the batch dimension is currently handled dynamically
        batch_size = 1
        for idx in range(self.engine.num_io_tensors):
            binding = self.engine[idx]
            shape = self.engine.get_tensor_shape(binding)
            is_input = self.engine.get_tensor_mode(binding).name == "INPUT"
            if -1 in shape:
                if is_input:
                    # Use the maximum shape from optimization profile 0
                    shape = self.engine.get_tensor_profile_shape(binding, 0)[-1]
                    batch_size = shape[0]
                else:
                    shape[0] = batch_size
            dtype = trt.nptype(self.engine.get_tensor_dtype(binding))
            tensor = torch.empty(
                tuple(shape), dtype=numpy_to_torch_dtype_dict[dtype]
            ).to(device=device)
            self.tensors[binding] = tensor
        nvtx.range_pop()

    def input_spec(self):
        """
        Get the specs for the input tensors of the network. Useful to prepare memory allocations.
        :return: A list of (name, shape, (numpy) datatype) tuples, one per input tensor.
        """
        specs = []
        for i, o in enumerate(self.inputs):
            specs.append((o["name"], o["shape"], o["dtype"]))
            if self.debug:
                print(f"trt input {i} -> {o['name']} -> {o['shape']}")
        return specs

    def output_spec(self):
        """
        Get the specs for the output tensors of the network. Useful to prepare memory allocations.
        :return: A list of (name, shape, (numpy) datatype) tuples, one per output tensor.
        """
        specs = []
        for i, o in enumerate(self.outputs):
            specs.append((o["name"], o["shape"], o["dtype"]))
            if self.debug:
                print(f"trt output {i} -> {o['name']} -> {o['shape']}")
        return specs

    def adjust_buffer(self, feed_dict):
        nvtx.range_push("adjust_buffer")
        for name, buf in feed_dict.items():
            input_tensor = self.tensors[name]
            current_shape = list(buf.shape)
            slices = tuple(slice(0, dim) for dim in current_shape)
            input_tensor[slices].copy_(buf)
            self.context.set_input_shape(name, current_shape)
        nvtx.range_pop()

    def predict(self, feed_dict, stream):
        """
        Execute inference on a batch of inputs.
        :param feed_dict: A dict mapping input tensor names to torch tensors.
        :param stream: The CUDA stream handle to run inference on.
        :return: A dict mapping tensor names to the pre-allocated (device) output tensors.
        """
        nvtx.range_push("set_tensors")
        self.adjust_buffer(feed_dict)
        for name, tensor in self.tensors.items():
            self.context.set_tensor_address(name, tensor.data_ptr())
        nvtx.range_pop()
        nvtx.range_push("execute")
        noerror = self.context.execute_async_v3(stream)
        if not noerror:
            raise ValueError("ERROR: inference failed.")
        nvtx.range_pop()
        return self.tensors

    def __del__(self):
        del self.engine
        del self.context
        del self.inputs
        del self.outputs
        del self.tensors
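

# Illustrative usage sketch (not part of the original module): driving TensorRTPredictor.predict
# with a PyTorch CUDA stream. The engine path, input name "input", and shape below are
# assumptions for illustration only; the real names and shapes come from the exported engine.
#
#   predictor = TensorRTPredictor(model_path="./checkpoints/example.trt")
#   stream = torch.cuda.Stream()
#   feed = {"input": torch.randn(1, 3, 256, 256, device="cuda")}
#   with torch.cuda.stream(stream):
#       outputs = predictor.predict(feed, stream.cuda_stream)
#   stream.synchronize()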


class OnnxRuntimePredictor:
    """
    ONNX Runtime predictor.
    """

    def __init__(self, **kwargs):
        model_path = kwargs.get("model_path", "")  # the model path identifies the instance
        assert os.path.exists(model_path), "model path must exist!"
        # print("loading ort model:{}".format(model_path))
        self.debug = kwargs.get("debug", False)
        providers = ['CUDAExecutionProvider', 'CoreMLExecutionProvider', 'CPUExecutionProvider']
        print(f"OnnxRuntime use {providers}")
        opts = onnxruntime.SessionOptions()
        # opts.inter_op_num_threads = kwargs.get("num_threads", 4)
        # opts.intra_op_num_threads = kwargs.get("num_threads", 4)
        # opts.log_severity_level = 3
        self.onnx_model = onnxruntime.InferenceSession(model_path, providers=providers, sess_options=opts)
        self.inputs = self.onnx_model.get_inputs()
        self.outputs = self.onnx_model.get_outputs()

    def input_spec(self):
        """
        Get the specs for the input tensors of the network. Useful to prepare memory allocations.
        :return: A list of (name, shape, type) tuples, one per input tensor.
        """
        specs = []
        for i, o in enumerate(self.inputs):
            specs.append((o.name, o.shape, o.type))
            if self.debug:
                print(f"ort input {i} -> {o.name} -> {o.shape}")
        return specs

    def output_spec(self):
        """
        Get the specs for the output tensors of the network. Useful to prepare memory allocations.
        :return: A list of (name, shape, type) tuples, one per output tensor.
        """
        specs = []
        for i, o in enumerate(self.outputs):
            specs.append((o.name, o.shape, o.type))
            if self.debug:
                print(f"ort output {i} -> {o.name} -> {o.shape}")
        return specs

    def predict(self, *data):
        input_feeds = {}
        for i in range(len(data)):
            # Cast each input to the dtype the session expects.
            if self.inputs[i].type == 'tensor(float16)':
                input_feeds[self.inputs[i].name] = data[i].astype(np.float16)
            else:
                input_feeds[self.inputs[i].name] = data[i].astype(np.float32)
        results = self.onnx_model.run(None, input_feeds)
        return results

    def __del__(self):
        # Drop the session; the singleton checks for None to decide whether to reload.
        self.onnx_model = None


class OnnxRuntimePredictorSingleton(OnnxRuntimePredictor):
    """
    Singleton: prevents the same model from being loaded more than once.
    """
    _instance_lock = threading.Lock()
    _instance = {}

    def __new__(cls, *args, **kwargs):
        model_path = kwargs.get("model_path", "")  # the model path identifies the instance
        assert os.path.exists(model_path), "model path must exist!"
        # Singleton pattern: avoid loading the same model repeatedly
        with OnnxRuntimePredictorSingleton._instance_lock:
            if model_path not in OnnxRuntimePredictorSingleton._instance or \
                    OnnxRuntimePredictorSingleton._instance[model_path].onnx_model is None:
                OnnxRuntimePredictorSingleton._instance[model_path] = OnnxRuntimePredictor(**kwargs)
            return OnnxRuntimePredictorSingleton._instance[model_path]
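

# Illustrative note (the path "./checkpoints/example.onnx" is a placeholder): two
# constructions with the same model_path return the same underlying predictor.
#
#   a = OnnxRuntimePredictorSingleton(model_path="./checkpoints/example.onnx")
#   b = OnnxRuntimePredictorSingleton(model_path="./checkpoints/example.onnx")
#   assert a is b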


def get_predictor(**kwargs):
    predict_type = kwargs.get("predict_type", "trt")
    if predict_type == "ort":
        return OnnxRuntimePredictorSingleton(**kwargs)
    elif predict_type == "trt":
        return TensorRTPredictor(**kwargs)
    else:
        raise NotImplementedError(f"Unsupported predict_type: {predict_type}")
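

# Minimal factory-usage sketch (illustrative, not part of the original module). The model path
# and the dummy-input handling below are assumptions; real names, shapes, and dtypes depend on
# the exported model.
if __name__ == "__main__":
    # ONNX Runtime backend: the singleton returns one session per model path.
    ort_predictor = get_predictor(predict_type="ort", model_path="./checkpoints/example.onnx")
    # Replace symbolic/dynamic dims with 1 to build a dummy input of the right rank.
    shape = [d if isinstance(d, int) else 1 for d in ort_predictor.inputs[0].shape]
    dummy = np.zeros(shape, dtype=np.float32)
    outputs = ort_predictor.predict(dummy)
    print([o.shape for o in outputs])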