# Copyright (c) ONNX Project Contributors

# SPDX-License-Identifier: Apache-2.0

import numpy as np

from onnx.reference.op_run import OpRun
from onnx.reference.ops._op_common_indices import _get_indices, _is_out


def _col2im_shape_check_2d(X, output_shape, kernel_shape, dilations, pads, strides):  # type: ignore
    """Validate a column matrix against a 2-D col2im configuration.

    Args:
        X: 2-D array, or 3-D array whose first axis is the batch axis.
            The last two axes are the "input plane" axis and the block axis.
        output_shape: ``(height, width)`` of the image to rebuild.
        kernel_shape: ``(kernel_height, kernel_width)``.
        dilations: ``(dilation_height, dilation_width)``.
        pads: 2-D array; row ``i`` is summed to obtain the total padding
            applied along spatial axis ``i``.
        strides: ``(stride_height, stride_width)``.

    Raises:
        ValueError: if ``X`` is not 2-D/3-D with non-zero trailing
            dimensions, if the plane axis is not a multiple of the kernel
            size, if the block axis does not match the computed number of
            sliding blocks, or if that number is not positive on both axes.
    """
    output_height, output_width = output_shape
    kernel_height, kernel_width = kernel_shape
    dilation_height, dilation_width = dilations
    stride_height, stride_width = strides

    ndim = len(X.shape)
    if not (
        (ndim == 2 and X.shape[0] != 0 and X.shape[1] != 0)
        or (ndim == 3 and X.shape[1] != 0 and X.shape[2] != 0)
    ):
        raise ValueError(
            "Expected 2D or 3D (batch mode) tensor for input with possibly 0 batch size and non-zero dimensions for input."
        )

    # With a batch axis the plane/block axes are 1 and 2; without it, 0 and 1.
    batch_dim = 0 if ndim == 3 else -1

    n_input_plane = X.shape[batch_dim + 1]
    if n_input_plane % (kernel_width * kernel_height) != 0:
        raise ValueError(
            f"Expected size of input's dimension 1 to be divisible by the "
            f"product of kernel_size, but got input.size(1)={n_input_plane} "
            f"and kernel_size={kernel_shape}."
        )

    input_length = X.shape[batch_dim + 2]
    n_blocks_height = (
        output_height + pads[0, :].sum() - dilation_height * (kernel_height - 1) - 1
    ) // stride_height + 1
    n_blocks_width = (
        output_width + pads[1, :].sum() - dilation_width * (kernel_width - 1) - 1
    ) // stride_width + 1

    if input_length != (n_blocks_height * n_blocks_width):
        # The message must be ONE implicitly-concatenated string: a comma
        # between the pieces would turn it into several exception arguments.
        raise ValueError(
            f"Given batch_dim={batch_dim}, n_input_plane={n_input_plane}, X.shape={X.shape}, "
            f"output_shape={output_shape}, kernel_shape={kernel_shape}, "
            f"dilations={dilations}, pads={pads}, strides={strides}, "
            f"expected size of input's dimension 2 to match the calculated number of "
            f"sliding blocks {n_blocks_height} * {n_blocks_width} = {n_blocks_height * n_blocks_width}, "
            f"but got input.size(2)={input_length}."
        )

    if not (n_blocks_height >= 1 and n_blocks_width >= 1):
        raise ValueError(
            f"Given batch_dim={batch_dim}, n_input_plane={n_input_plane}, X.shape={X.shape}, "
            f"output_shape={output_shape}, kernel_shape={kernel_shape}, "
            f"dilations={dilations}, pads={pads}, strides={strides}, "
            f"calculated shape of the array of sliding blocks as ({n_blocks_height}, {n_blocks_width}), "
            f"which is too small (non-positive)."
        )


def _col2im_naive_implementation_2d(res, image_shape, kernel_shape, dilations, pads, strides):  # type: ignore
    """Rebuild a 2-D image from its column matrix by accumulation.

    Args:
        res: column matrix; it is validated by ``_col2im_shape_check_2d``
            and then read through a flattened view.
        image_shape: ``(height, width)`` of the image to produce.
        kernel_shape: ``(kernel_h, kernel_w)``.
        dilations: ``(dilation_h, dilation_w)``.
        pads: flat sequence of length ``2 * n_dims``; entries ``i`` and
            ``i + n_dims`` are paired as the two paddings of axis ``i``.
        strides: ``(stride_h, stride_w)``.

    Returns:
        Array of shape ``image_shape`` and dtype ``res.dtype`` in which every
        column entry was added to the pixel it maps to.

    Raises:
        ValueError: propagated from ``_col2im_shape_check_2d``.
    """
    # source: https://github.com/pytorch/pytorch/blob/master/aten/src/ATen/native/im2col.h

    n_dims = len(pads) // 2
    new_pads = np.array([(pads[i], pads[i + n_dims]) for i in range(n_dims)])
    _col2im_shape_check_2d(res, image_shape, kernel_shape, dilations, new_pads, strides)

    data_col = res.ravel()
    data_im = np.zeros(image_shape, dtype=res.dtype).flatten()

    kernel_h, kernel_w = kernel_shape
    channels_col = kernel_h * kernel_w
    stride_h, stride_w = strides
    dilation_h, dilation_w = dilations
    # Only the first padding of each axis shifts the coordinates.
    pad_h, pad_w = new_pads[:, 0]
    height, width = image_shape

    height_col = (
        height + new_pads[0, :].sum() - (dilation_h * (kernel_h - 1) + 1)
    ) // stride_h + 1
    width_col = (
        width + new_pads[1, :].sum() - (dilation_w * (kernel_w - 1) + 1)
    ) // stride_w + 1

    for c_col in range(channels_col):
        w_offset = c_col % kernel_w
        h_offset = (c_col // kernel_w) % kernel_h
        # Always 0 here since c_col < kernel_h * kernel_w; kept so the index
        # arithmetic mirrors the referenced formula.
        c_im = c_col // (kernel_h * kernel_w)

        for h_col in range(height_col):
            h_im = h_col * stride_h - pad_h + h_offset * dilation_h
            for w_col in range(width_col):
                w_im = w_col * stride_w - pad_w + w_offset * dilation_w
                # Positions that fall into the padding area are skipped.
                if 0 <= h_im < height and 0 <= w_im < width:
                    i_im = (c_im * height + h_im) * width + w_im
                    i_col = (c_col * height_col + h_col) * width_col + w_col
                    if 0 <= i_col < data_col.shape[0]:
                        data_im[i_im] += data_col[i_col]

    return data_im.reshape(image_shape)


def _col2im_shape_check(X, output_shape, kernel_shape, dilations, pads, strides):  # type: ignore
    """Validate a 2-D column matrix against an N-D col2im configuration.

    Args:
        X: 2-D array; axis 0 is the "input plane" axis, axis 1 the block axis.
        output_shape: spatial shape of the image to rebuild.
        kernel_shape: kernel extent per spatial axis.
        dilations: dilation per spatial axis.
        pads: 2-D array; row ``i`` is summed to obtain the total padding
            applied along spatial axis ``i``.
        strides: stride per spatial axis.

    Raises:
        ValueError: if ``X.shape[0]`` is not a multiple of the kernel size or
            ``X.shape[1]`` differs from the computed number of sliding blocks.
    """
    n_input_plane = X.shape[0]

    kernel_size = np.prod(kernel_shape)

    if n_input_plane % kernel_size != 0:
        raise ValueError(
            f"Expected size of input's dimension 1 to be divisible by the "
            f"product of kernel_size={kernel_size}, "
            f"but got input.size(1)={n_input_plane} "
            f"and kernel_shape={kernel_shape}, X.shape={X.shape}, output_shape={output_shape}."
        )

    input_length = X.shape[1]
    n_dims = len(output_shape)
    # Number of sliding-block positions along each spatial axis.
    n_blocks = [
        (
            output_shape[i]
            + pads[i, :].sum()
            - dilations[i] * (kernel_shape[i] - 1)
            - 1
        )
        // strides[i]
        + 1
        for i in range(n_dims)
    ]

    block_size = np.prod(n_blocks)
    if input_length != block_size:
        raise ValueError(
            f"Given n_input_plane={n_input_plane}, X.shape={X.shape}, "
            f"output_shape={output_shape}, kernel_shape={kernel_shape}, "
            f"dilations={dilations}, pads={pads}, strides={strides}, "
            f"expected size of input's dimension 2 to match the calculated number of "
            f"sliding blocks {n_blocks} = {block_size}, "
            f"but got input.size(2)={input_length}."
        )


def col2im_naive_implementation(data, image_shape, kernel_shape, dilations, pads, strides):  # type: ignore
    """Naive implementation for `col2im`.

    Args:
        data: 2-D column matrix indexed as ``data[kernel_position, block]``.
        image_shape: spatial shape of the image to produce.
        kernel_shape: kernel extent per spatial axis.
        dilations: dilation per spatial axis.
        pads: flat sequence of length ``2 * n_dims``; entries ``i`` and
            ``i + n_dims`` are paired as the two paddings of axis ``i``.
        strides: stride per spatial axis.

    Returns:
        Array of shape ``image_shape`` and dtype ``data.dtype`` in which every
        column entry was added to the pixel it maps to.

    Raises:
        ValueError: propagated from ``_col2im_shape_check``.
    """
    n_dims = len(pads) // 2
    new_pads = np.array([(pads[i], pads[i + n_dims]) for i in range(n_dims)])
    _col2im_shape_check(data, image_shape, kernel_shape, dilations, new_pads, strides)

    data_col = data
    data_im = np.zeros(image_shape, dtype=data.dtype)

    # Number of sliding-block positions along each spatial axis.
    dim_col = [
        (
            image_shape[i]
            + new_pads[i, :].sum()
            - (dilations[i] * (kernel_shape[i] - 1) + 1)
        )
        // strides[i]
        + 1
        for i in range(n_dims)
    ]

    kernel_size = np.prod(kernel_shape)
    col_size = np.prod(dim_col)
    for c_col in range(kernel_size):
        # Multi-dimensional position inside the kernel.
        offset = _get_indices(c_col, kernel_shape)

        for col in range(col_size):
            # Multi-dimensional position of the sliding block.
            ind_col = _get_indices(col, dim_col)
            ind_im = [
                ind_col[i] * strides[i] - new_pads[i, 0] + offset[i] * dilations[i]
                for i in range(n_dims)
            ]
            # Positions that fall into the padding area are skipped.
            if not _is_out(ind_im, data_im.shape):
                data_im[tuple(ind_im)] += data_col[c_col, col]

    return data_im


class Col2Im(OpRun):
    """Reference runtime for the Col2Im operator."""

    def _run(self, data, image_shape, block_shape, dilations=None, pads=None, strides=None):  # type: ignore
        """Rebuild a batch of images from their column representation.

        Args:
            data: array whose axis 1 has size ``C * prod(block_shape)``; it is
                reshaped so that the channel and kernel-position axes are
                separate.
            image_shape: spatial shape of every output image.
            block_shape: kernel extent per spatial axis.
            dilations: dilation per spatial axis; defaults to all ones.
            pads: flat sequence of length ``2 * len(image_shape)``; defaults
                to all zeros.
            strides: stride per spatial axis; defaults to all ones.

        Returns:
            One-element tuple holding an array of shape
            ``(N, C) + image_shape`` with the dtype of ``data``.
        """
        n_spatial = len(image_shape)
        if dilations is None:
            dilations = [1] * n_spatial
        if pads is None:
            pads = [0] * (2 * n_spatial)
        if strides is None:
            strides = [1] * n_spatial

        bl = np.prod(block_shape)
        C = data.shape[1] // bl
        data = data.reshape(data.shape[:1] + (C,) + (bl,) + data.shape[2:])

        ks = tuple(block_shape)
        # Allocate up front: each per-channel result has shape image_shape, and
        # allocating lazily inside the loops would leave the output undefined
        # when the batch or channel axis is empty.
        out_shape = tuple(int(d) for d in image_shape)
        res = np.empty(data.shape[:2] + out_shape, dtype=data.dtype)
        for n in range(data.shape[0]):
            for c in range(data.shape[1]):
                res[n, c, ...] = col2im_naive_implementation(
                    data[n, c, ...], image_shape, ks, dilations, pads, strides
                )
        return (res,)  # type: ignore