# Copyright (c) ONNX Project Contributors | |
# SPDX-License-Identifier: Apache-2.0 | |
# type: ignore | |
"""You can run a specific test by using the following syntax. | |
:: | |
python onnx/test/reference_evaluator_test.py TestReferenceEvaluator.test_function_attribute_nested_graph | |
""" | |
import itertools | |
import math | |
import sys | |
import unittest | |
from contextlib import redirect_stdout | |
from functools import wraps | |
from io import StringIO | |
from os import getenv | |
from textwrap import dedent | |
from typing import Sequence, Tuple | |
import numpy as np | |
import parameterized | |
import version_utils | |
from numpy.testing import assert_allclose | |
import onnx.reference.custom_element_types as custom | |
from onnx import ( | |
AttributeProto, | |
FunctionProto, | |
ModelProto, | |
TensorProto, | |
checker, | |
parser, | |
subbyte, | |
) | |
from onnx.backend.test.case.node.roialign import get_roi_align_input_values | |
from onnx.checker import check_model | |
from onnx.defs import onnx_opset_version | |
from onnx.helper import ( | |
float32_to_bfloat16, | |
float32_to_float8e4m3, | |
float32_to_float8e5m2, | |
make_function, | |
make_graph, | |
make_model, | |
make_model_gen_version, | |
make_node, | |
make_operatorsetid, | |
make_opsetid, | |
make_sequence_type_proto, | |
make_tensor, | |
make_tensor_sequence_value_info, | |
make_tensor_value_info, | |
make_value_info, | |
) | |
from onnx.numpy_helper import float8e4m3_to_float32, float8e5m2_to_float32, from_array | |
from onnx.reference import ReferenceEvaluator | |
from onnx.reference.op_run import OpRun, OpRunExpand | |
from onnx.reference.ops import load_op | |
from onnx.reference.ops._op_common_indices import _get_indices, _is_out | |
from onnx.reference.ops._op_list import Cast_19, Celu | |
from onnx.reference.ops.aionnx_preview_training._op_list import Adam | |
from onnx.reference.ops.op_celu import _vcelu1 | |
from onnx.reference.ops.op_col2im import ( | |
_col2im_naive_implementation_2d, | |
col2im_naive_implementation, | |
) | |
from onnx.reference.ops.op_conv import Conv, _conv_implementation | |
from onnx.reference.ops_optimized import Conv as ConvOptimized | |
from onnx.reference.ops_optimized.op_conv_optimized import _conv_implementation_im2col | |
# TODO (https://github.com/microsoft/onnxruntime/issues/14932): Get max supported version from onnxruntime directly | |
# For now, bump the version in CIs whenever there is a new onnxruntime release | |
ORT_MAX_IR_SUPPORTED_VERSION = int(getenv("ORT_MAX_IR_SUPPORTED_VERSION", "8")) | |
ORT_MAX_ONNX_OPSET_SUPPORTED_VERSION = int( | |
getenv("ORT_MAX_ONNX_OPSET_SUPPORTED_VERSION", "18") | |
) | |
def skip_if_no_onnxruntime(fn): | |
    @wraps(fn)
    def wrapper(*args, **kwargs):
try: | |
import onnxruntime | |
del onnxruntime | |
except ImportError: | |
raise unittest.SkipTest("onnxruntime not installed") from None | |
fn(*args, **kwargs) | |
return wrapper | |
def skip_if_no_torch(fn): | |
    @wraps(fn)
    def wrapper(*args, **kwargs):
try: | |
import torch | |
del torch | |
except ImportError: | |
raise unittest.SkipTest("torch not installed") from None | |
fn(*args, **kwargs) | |
return wrapper | |
def skip_if_no_torchvision(fn): | |
    @wraps(fn)
    def wrapper(*args, **kwargs):
try: | |
import torchvision | |
del torchvision | |
except ImportError: | |
raise unittest.SkipTest("torchvision not installed") from None | |
fn(*args, **kwargs) | |
return wrapper | |
def make_sequence_value_info(name, elem_type, shape): | |
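    """Build a sequence value info, accepting either an elementary type (an
    int from TensorProto) or an already-built TypeProto as `elem_type`."""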
if isinstance(elem_type, int): | |
return make_tensor_sequence_value_info(name, elem_type, shape) | |
s_type = make_sequence_type_proto(elem_type) | |
return make_value_info(name, s_type, shape) | |
def run_ort_inference(onnx_model): | |
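    """Create an onnxruntime InferenceSession for the model, or return None
    when the model's IR version or default-domain opset is newer than what
    the installed onnxruntime release is known to support."""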
import onnxruntime as ort | |
onnx_domain_opset = ORT_MAX_ONNX_OPSET_SUPPORTED_VERSION | |
for opset in onnx_model.opset_import: | |
if opset.domain in ("", "ai.onnx"): | |
onnx_domain_opset = opset.version | |
break | |
# The new IR or opset version is not supported by onnxruntime yet | |
if ( | |
onnx_model.ir_version > ORT_MAX_IR_SUPPORTED_VERSION | |
or onnx_domain_opset > ORT_MAX_ONNX_OPSET_SUPPORTED_VERSION | |
): | |
return None | |
return ort.InferenceSession( | |
onnx_model.SerializeToString(), providers=["CPUExecutionProvider"] | |
) | |
def im2col_naive_implementation(data, kernel_shape, dilations, pads, strides): # type: ignore | |
"""Naive implementation for `im2col`. | |
Args: | |
data: image (float) | |
kernel_shape: kernel shape | |
dilations: dilations | |
pads: pads | |
strides: strides | |
Returns: | |
result | |
""" | |
if not isinstance(kernel_shape, tuple): | |
raise TypeError(f"Unexpected type {type(kernel_shape)!r} for kernel_shape.") | |
if len(data.shape) != len(kernel_shape): | |
raise ValueError(f"Shape mismatch {data.shape!r} and {kernel_shape!r}.") | |
n_dims = len(pads) // 2 | |
new_pads = np.array([(pads[i], pads[i + n_dims]) for i in range(n_dims)]) | |
list_output_shape = list(data.shape + kernel_shape) | |
for d in range(n_dims): | |
kd = kernel_shape[d] + (kernel_shape[d] - 1) * (dilations[d] - 1) | |
nd = int( | |
((list_output_shape[d] - kd + new_pads[d][0] + new_pads[d][1]) / strides[d]) | |
+ 1 | |
) | |
list_output_shape[d] = nd | |
output_shape = tuple(list_output_shape) | |
res = np.zeros(output_shape, dtype=data.dtype) | |
kernel_size = np.prod(kernel_shape) | |
res_size = np.prod(res.shape[:-n_dims]) | |
for i in range(res_size): | |
i_res = _get_indices(i, res.shape[:-n_dims]) | |
t_res = tuple(i_res) | |
for j in range(kernel_size): | |
i_kernel = _get_indices(j, kernel_shape) | |
t_kernel = tuple(i_kernel) | |
i_img = i_res * strides - new_pads[:, 0] + i_kernel * dilations | |
t_img = tuple(i_img) | |
if _is_out(t_img, data.shape): | |
res[t_res + t_kernel] = 0 | |
else: | |
res[t_res + t_kernel] = data[tuple(t_img)] | |
return res | |
def im2col( | |
img: np.ndarray, | |
kernel_shape: Tuple[int, ...], | |
dilations: Sequence[int], | |
pads: Sequence[int], | |
strides: Sequence[int], | |
) -> np.ndarray: | |
res = None | |
for n in range(img.shape[0]): | |
for c in range(img.shape[1]): | |
out = im2col_naive_implementation( | |
img[n, c, ...], kernel_shape, dilations, pads, strides | |
) | |
if res is None: | |
new_shape = img.shape[:2] + out.shape | |
res = np.empty(new_shape, dtype=img.dtype) | |
res[n, c, ...] = out | |
new_shape = res.shape[: -len(kernel_shape)] + (-1,) # type: ignore | |
return res.reshape(new_shape) # type: ignore | |
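# A minimal, hypothetical sketch (not part of the original tests) showing how
# the im2col helpers above turn a 2D convolution into a single matrix product:
# contracting the im2col matrix with the flattened kernel reproduces a
# stride-1, unpadded convolution for a single (N=1, C=1) image. The function
# name and values below are illustrative only.
def _im2col_matmul_example():  # pragma: no cover
    img = np.arange(16, dtype=np.float32).reshape((1, 1, 4, 4))
    kernel = np.ones((1, 1, 2, 2), dtype=np.float32)
    cols = im2col(img, (2, 2), dilations=[1, 1], pads=[0, 0, 0, 0], strides=[1, 1])
    # cols has shape (1, 1, 3, 3, 4): one row of 4 kernel-window values per
    # output position; contracting that last axis with the flattened kernel
    # gives the same values as the corresponding 2D convolution.
    return cols @ kernel.reshape((-1,))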
class TestReferenceEvaluator(unittest.TestCase): | |
m2_def = """ | |
< | |
ir_version: 7, | |
opset_import: [ "": 10, "com.microsoft": 1] | |
> | |
agraph (float[N, M] B01, float[N, M] B11, float[N, M] B21) => (float[N, M] D0) | |
{ | |
C0 = Add(B01, B11) | |
C1 = Sub(B11, B21) | |
D0 = Mul(C0, C1) | |
} | |
""" | |
    @staticmethod
    def _load_model(m_def: str) -> ModelProto:
"""Parses a model from a string representation, including checking | |
the model for correctness | |
""" | |
m = parser.parse_model(m_def) | |
checker.check_model(m) | |
return m | |
    @staticmethod
    def _linear_regression(clip=False, opset=None, min_value=-1.0, max_value=1.0):
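        """Build a small MatMul + Add (+ optional Clip) model and return it
        together with a numpy lambda used to compute the expected result."""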
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.FLOAT, [None, None]) | |
B = make_tensor_value_info("B", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("MatMul", ["X", "A"], ["XA"]) | |
if clip: | |
node2 = make_node("Add", ["XA", "B"], ["Y_clip"]) | |
if opset is not None and opset < 11: | |
if min_value: | |
if max_value: | |
node3 = make_node( | |
"Clip", ["Y_clip"], ["Y"], min=min_value, max=max_value | |
) | |
else: | |
node3 = make_node("Clip", ["Y_clip"], ["Y"], min=min_value) | |
elif max_value: | |
node3 = make_node("Clip", ["Y_clip"], ["Y"], max=max_value) | |
else: | |
node3 = make_node("Clip", ["Y_clip"], ["Y"]) | |
graph = make_graph([node1, node2, node3], "lr", [X, A, B], [Y]) | |
else: | |
mi = ( | |
from_array(np.array([min_value], dtype=np.float32), name="mi") | |
if min_value | |
else None | |
) | |
ma = ( | |
from_array(np.array([max_value], dtype=np.float32), name="ma") | |
if max_value | |
else None | |
) | |
inputs = ["Y_clip", "mi" if mi else "", "ma" if ma else ""] | |
node3 = make_node("Clip", inputs, ["Y"]) | |
initializer = [_ for _ in [mi, ma] if _] | |
graph = make_graph( | |
[node1, node2, node3], "lr", [X, A, B], [Y], initializer=initializer | |
) | |
f = lambda x, a, b: np.clip(a @ a + b, min_value, max_value) # noqa: E731 | |
else: | |
node2 = make_node("Add", ["XA", "B"], ["Y"]) | |
graph = make_graph([node1, node2], "lr", [X, A, B], [Y]) | |
f = lambda x, a, b: a @ a + b # noqa: E731 | |
if opset is None: | |
onnx_model = make_model(graph) | |
else: | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", opset)]) | |
try: | |
check_model(onnx_model) | |
except Exception as e: | |
raise AssertionError(f"checker fails for\n{onnx_model}") from e | |
return onnx_model, f | |
def test_reference_evaluator_exceptions(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
with self.assertRaises(TypeError): | |
ReferenceEvaluator(X) | |
def test_reference_evaluator_no_attribute(self): | |
m = TestReferenceEvaluator._load_model(TestReferenceEvaluator.m2_def) | |
checker.check_model(m) | |
sess = ReferenceEvaluator(m) | |
self.assertEqual(sess.input_names, ["B01", "B11", "B21"]) | |
self.assertEqual(sess.output_names, ["D0"]) | |
self.assertEqual(sess.opsets, {"": 10, "com.microsoft": 1}) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
y = np.array([[4, 5], [6, 7]], dtype=np.float32) | |
z = np.array([[-4, -5], [-6, -7]], dtype=np.float32) | |
res = sess.run(None, {"B01": x, "B11": y, "B21": z})[0] | |
expected = (x + y) * (y - z) | |
assert_allclose(expected, res) | |
def test_reference_evaluator_no_attribute_bytes(self): | |
m = TestReferenceEvaluator._load_model(TestReferenceEvaluator.m2_def) | |
checker.check_model(m) | |
sess = ReferenceEvaluator(m.SerializeToString()) | |
self.assertEqual(sess.input_names, ["B01", "B11", "B21"]) | |
self.assertEqual(sess.output_names, ["D0"]) | |
self.assertEqual(sess.opsets, {"": 10, "com.microsoft": 1}) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
y = np.array([[4, 5], [6, 7]], dtype=np.float32) | |
z = np.array([[-4, -5], [-6, -7]], dtype=np.float32) | |
res = sess.run(None, {"B01": x, "B11": y, "B21": z})[0] | |
expected = (x + y) * (y - z) | |
assert_allclose(expected, res) | |
def test_reference_evaluator_no_attribute_verbose(self): | |
m = TestReferenceEvaluator._load_model(TestReferenceEvaluator.m2_def) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
y = np.array([[4, 5], [6, 7]], dtype=np.float32) | |
z = np.array([[-4, -5], [-6, -7]], dtype=np.float32) | |
with self.subTest(level=2): | |
sess = ReferenceEvaluator(m, verbose=2) | |
stdout = StringIO() | |
with redirect_stdout(stdout): | |
sess.run(None, {"B01": x, "B11": y, "B21": z}) | |
out = stdout.getvalue() | |
log = "Add(B01, B11) -> C0\nSub(B11, B21) -> C1\nMul(C0, C1) -> D0\n" | |
self.assertEqual(log, out) | |
with self.subTest(level=3): | |
sess = ReferenceEvaluator(m, verbose=3) | |
stdout = StringIO() | |
with redirect_stdout(stdout): | |
sess.run(None, {"B01": x, "B11": y, "B21": z}) | |
out = stdout.getvalue() | |
log = dedent( | |
""" | |
+I B01: float32:(2, 2) in [0.0, 3.0] | |
+I B11: float32:(2, 2) in [4.0, 7.0] | |
+I B21: float32:(2, 2) in [-7.0, -4.0] | |
Add(B01, B11) -> C0 | |
+ C0: float32:(2, 2) in [4.0, 10.0] | |
Sub(B11, B21) -> C1 | |
+ C1: float32:(2, 2) in [8.0, 14.0] | |
Mul(C0, C1) -> D0 | |
+ D0: float32:(2, 2) in [32.0, 140.0] | |
""" | |
).lstrip("\n") | |
self.assertEqual(log, out) | |
with self.subTest(level=4): | |
sess = ReferenceEvaluator(m, verbose=4) | |
stdout = StringIO() | |
with redirect_stdout(stdout): | |
sess.run(None, {"B01": x, "B11": y, "B21": z}) | |
out = stdout.getvalue() | |
log = dedent( | |
""" | |
+I B01: float32:(2, 2):[0.0, 1.0, 2.0, 3.0] | |
+I B11: float32:(2, 2):[4.0, 5.0, 6.0, 7.0] | |
+I B21: float32:(2, 2):[-4.0, -5.0, -6.0, -7.0] | |
Add(B01, B11) -> C0 | |
+ C0: float32:(2, 2):[4.0, 6.0, 8.0, 10.0] | |
Sub(B11, B21) -> C1 | |
+ C1: float32:(2, 2):[8.0, 10.0, 12.0, 14.0] | |
Mul(C0, C1) -> D0 | |
+ D0: float32:(2, 2):[32.0, 60.0, 96.0, 140.0] | |
""" | |
).lstrip("\n") | |
self.assertEqual(log, out) | |
with self.subTest(level=15): | |
sess = ReferenceEvaluator(m, verbose=15) | |
stdout = StringIO() | |
with redirect_stdout(stdout): | |
sess.run(None, {"B01": x, "B11": y, "B21": z}) | |
out = stdout.getvalue() | |
log = dedent( | |
""" | |
+I B01: float32:(2, 2):[0.0, 1.0, 2.0, 3.0] | |
+I B11: float32:(2, 2):[4.0, 5.0, 6.0, 7.0] | |
+I B21: float32:(2, 2):[-4.0, -5.0, -6.0, -7.0] | |
Add(B01, B11) -> C0 | |
-- begin Add.run(2 inputs) | |
-- done Add.run -> 1 outputs | |
+ C0: float32:(2, 2):[4.0, 6.0, 8.0, 10.0] | |
Sub(B11, B21) -> C1 | |
-- begin Sub.run(2 inputs) | |
-- done Sub.run -> 1 outputs | |
+ C1: float32:(2, 2):[8.0, 10.0, 12.0, 14.0] | |
Mul(C0, C1) -> D0 | |
-- begin Mul.run(2 inputs) | |
-- done Mul.run -> 1 outputs | |
+ D0: float32:(2, 2):[32.0, 60.0, 96.0, 140.0] | |
""" | |
).lstrip("\n") | |
self.assertEqual(log, out) | |
def test_reference_evaluator_lr(self): | |
lr, f = TestReferenceEvaluator._linear_regression() | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
def test_reference_evaluator_lr_clip(self): | |
with self.subTest(opt="min+max"): | |
lr, f = TestReferenceEvaluator._linear_regression(clip=True) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
last_node = sess.rt_nodes_[-1] | |
self.assertEqual(last_node.__class__.__name__, "Clip_11") | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
with self.subTest(opt="max"): | |
lr, f = TestReferenceEvaluator._linear_regression(clip=True, min_value=None) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
last_node = sess.rt_nodes_[-1] | |
self.assertEqual(last_node.__class__.__name__, "Clip_11") | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
with self.subTest(opt="min"): | |
lr, f = TestReferenceEvaluator._linear_regression(clip=True, max_value=None) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
last_node = sess.rt_nodes_[-1] | |
self.assertEqual(last_node.__class__.__name__, "Clip_11") | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
def test_reference_evaluator_lr_clip_6(self): | |
with self.subTest(opt="min+max"): | |
lr, f = TestReferenceEvaluator._linear_regression(clip=True, opset=10) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
last_node = sess.rt_nodes_[-1] | |
self.assertEqual(last_node.__class__.__name__, "Clip_6") | |
self.assertEqual(last_node.min, -1) | |
self.assertEqual(last_node.max, 1) | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
with self.subTest(opt="max"): | |
lr, f = TestReferenceEvaluator._linear_regression( | |
clip=True, opset=10, min_value=None | |
) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
last_node = sess.rt_nodes_[-1] | |
self.assertEqual(last_node.__class__.__name__, "Clip_6") | |
self.assertEqual(last_node.max, 1) | |
self.assertEqual(last_node.min, -3.4028234663852886e38) | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
with self.subTest(opt="min"): | |
lr, f = TestReferenceEvaluator._linear_regression( | |
clip=True, opset=10, max_value=None | |
) | |
x = np.array([[0, 1], [2, 3]], dtype=np.float32) | |
a = np.array([1, 1], dtype=np.float32) | |
b = np.array([11], dtype=np.float32) | |
expected = f(x, a, b) | |
sess = ReferenceEvaluator(lr) | |
last_node = sess.rt_nodes_[-1] | |
self.assertEqual(last_node.__class__.__name__, "Clip_6") | |
self.assertEqual(last_node.min, -1) | |
self.assertEqual(last_node.max, 3.4028234663852886e38) | |
got = sess.run(None, {"X": a, "A": a, "B": b})[0] | |
assert_allclose(expected, got) | |
def test_nested_local_functions(self): | |
m = parser.parse_model( | |
""" | |
< | |
ir_version: 8, | |
opset_import: [ "" : 14, "local" : 1], | |
producer_name: "test", | |
producer_version: "1.0", | |
model_version: 1, | |
doc_string: "Test preprocessing model" | |
> | |
agraph (uint8[H, W, C] x) => (uint8[H, W, C] x_processed) | |
{ | |
x_processed = local.func(x) | |
} | |
< | |
opset_import: [ "" : 14 ], | |
domain: "local", | |
doc_string: "function 1" | |
> | |
f1 (x) => (y) { | |
y = Identity(x) | |
} | |
< | |
opset_import: [ "" : 14 ], | |
domain: "local", | |
doc_string: "function 2" | |
> | |
f2 (x) => (y) { | |
y = Identity(x) | |
} | |
< | |
opset_import: [ "" : 14, "local" : 1 ], | |
domain: "local", | |
doc_string: "Preprocessing function." | |
> | |
func (x) => (y) { | |
x1 = local.f1(x) | |
y = local.f2(x1) | |
} | |
""" | |
) | |
sess = ReferenceEvaluator(m) | |
x = np.array([0, 1, 3], dtype=np.uint8).reshape((1, 1, 3)) | |
result = sess.run(None, {"x": x})[0] | |
expected = x | |
assert_allclose(expected, result) | |
def test_reduce_sum_11(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSum", ["X"], ["Y"], axes=[1], keepdims=1) | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 11)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
expected = x.sum(axis=1, keepdims=1) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x})[0] | |
assert_allclose(expected, got) | |
def test_reduce_sum_square_11(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSumSquare", ["X"], ["Y"], axes=[1], keepdims=1) | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 11)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
expected = (x * x).sum(axis=1, keepdims=1) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x})[0] | |
assert_allclose(expected, got) | |
def test_reduce_sum_13(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.INT64, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSum", ["X", "A"], ["Y"], keepdims=1) | |
graph = make_graph([node1], "rs", [X, A], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 13)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
a = np.array([1], dtype=np.int64) | |
expected = x.sum(axis=1, keepdims=1) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x, "A": a})[0] | |
assert_allclose(expected, got) | |
def test_reduce_sum_attribute(self): | |
opset = onnx_opset_version() | |
new_domain = "custom" | |
opset_imports = [make_opsetid("", opset), make_opsetid(new_domain, 1)] | |
node = make_node("ReduceSum", ["X", "axis"], ["Y"]) | |
att = AttributeProto() | |
att.name = "keepdims" | |
att.ref_attr_name = "keepdims" | |
att.type = AttributeProto.INT | |
node.attribute.append(att) | |
my_reduce_sum = make_function( | |
new_domain, | |
"MyReduceSum", | |
["X", "axis"], | |
["Y"], | |
[node], | |
opset_imports, | |
["keepdims"], | |
) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
axis = make_tensor_value_info("axis", TensorProto.INT64, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
graph = make_graph( | |
[ | |
make_node( | |
"MyReduceSum", | |
["X", "axis"], | |
["Y"], | |
domain=new_domain, | |
keepdims=1, | |
), | |
], | |
"example", | |
[X, axis], | |
[Y], | |
) | |
onnx_model = make_model( | |
graph, opset_imports=opset_imports, functions=[my_reduce_sum] | |
) | |
sess = ReferenceEvaluator(onnx_model) | |
x = np.arange(6).reshape((3, 2)).astype(np.float32) | |
a = np.array([-1], dtype=np.int64) | |
result = sess.run(None, {"X": x, "axis": a})[0] | |
expected = x.sum(axis=-1, keepdims=1) | |
assert_allclose(expected, result) | |
def test_reduce_sum_square_18(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.INT64, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSumSquare", ["X", "A"], ["Y"], keepdims=1) | |
graph = make_graph([node1], "rs", [X, A], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
a = np.array([1], dtype=np.int64) | |
expected = (x * x).sum(axis=1, keepdims=1) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x, "A": a})[0] | |
assert_allclose(expected, got) | |
def test_reduce_sum_13_empty_axes(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.INT64, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSum", ["X", "A"], ["Y"], keepdims=1) | |
graph = make_graph([node1], "rs", [X, A], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 13)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
a = np.array([], dtype=np.int64) | |
expected = x.sum(keepdims=1) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x, "A": a})[0] | |
assert_allclose(expected, got) | |
def test_reduce_sum_square_18_empty_axes(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.INT64, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSumSquare", ["X", "A"], ["Y"], keepdims=1) | |
graph = make_graph([node1], "rs", [X, A], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
a = np.array([], dtype=np.int64) | |
expected = (x * x).sum(keepdims=1) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x, "A": a})[0] | |
assert_allclose(expected, got) | |
def test_reduce_sum_13_empty_axes_noop(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("ReduceSum", ["X"], ["Y"], keepdims=1, noop_with_empty_axes=1) | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 13)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x})[0] | |
assert_allclose(x, got) | |
def test_reduce_sum_square_18_empty_axes_noop(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node( | |
"ReduceSumSquare", ["X"], ["Y"], keepdims=1, noop_with_empty_axes=1 | |
) | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
check_model(onnx_model) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x})[0] | |
assert_allclose(x * x, got) | |
def test_greater(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
Z = make_tensor_value_info("Z", TensorProto.FLOAT, [None]) | |
node1 = make_node("Greater", ["X", "Y"], ["Z"]) | |
graph = make_graph([node1], "g", [X, Y], [Z]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 13)]) | |
check_model(onnx_model) | |
x = np.arange(4).reshape((2, 2)).astype(np.float32) | |
y = np.array([2], dtype=np.float32) | |
expected = x > y | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x, "Y": y})[0] | |
assert_allclose(expected, got) | |
def test_node_proto(self): | |
node1 = make_node("Greater", ["X", "Y"], ["Z"]) | |
x = np.arange(4).reshape((2, 2)).astype(np.float32) | |
y = np.array([2], dtype=np.float32) | |
expected = x > y | |
sess = ReferenceEvaluator(node1) | |
got = sess.run(None, {"X": x, "Y": y})[0] | |
assert_allclose(expected, got) | |
def test_greater_or_equal(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
Z = make_tensor_value_info("Z", TensorProto.FLOAT, [None]) | |
node1 = make_node("GreaterOrEqual", ["X", "Y"], ["Z"]) | |
graph = make_graph([node1], "g", [X, Y], [Z]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 13)]) | |
check_model(onnx_model) | |
x = np.arange(4).reshape((2, 2)).astype(np.float32) | |
y = np.array([2], dtype=np.float32) | |
expected = x >= y | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": x, "Y": y})[0] | |
assert_allclose(expected, got) | |
def test_if(self): | |
C = make_tensor_value_info("C", TensorProto.FLOAT, [None]) | |
bthen = make_node( | |
"Constant", | |
[], | |
["C"], | |
value_floats=from_array(np.array([1], dtype=np.float32)), | |
) | |
bthen_body = make_graph([bthen], "gthen", [], [C]) | |
C = make_tensor_value_info("C", TensorProto.FLOAT, [None]) | |
belse = make_node( | |
"Constant", | |
[], | |
["C"], | |
value_floats=from_array(np.array([0], dtype=np.float32)), | |
) | |
belse_body = make_graph([belse], "gelse", [], [C]) | |
zero = from_array(np.array([0], dtype=np.float32), name="zero") | |
greater = make_node("Greater", ["X", "zero"], ["G"]) | |
node_if = make_node( | |
"If", | |
["G"], | |
["Z"], | |
then_branch=bthen_body, | |
else_branch=belse_body, | |
) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Z = make_tensor_value_info("Z", TensorProto.FLOAT, [None]) | |
graph = make_graph([greater, node_if], "g", [X], [Z], initializer=[zero]) | |
model_def = make_model(graph) | |
sess = ReferenceEvaluator(model_def) | |
self.assertEqual(str(sess), "ReferenceEvaluator(X) -> Z") | |
x = np.array([1], dtype=np.float32) | |
got = sess.run(None, {"X": x})[0] | |
assert_allclose(np.array([1], dtype=np.float32), got) | |
x = np.array([-1], dtype=np.float32) | |
got = sess.run(None, {"X": x})[0] | |
assert_allclose(np.array([0], dtype=np.float32), got) | |
def test_if_function(self): | |
then_out = make_tensor_value_info("then_out", TensorProto.FLOAT, [5]) | |
else_out = make_tensor_value_info("else_out", TensorProto.FLOAT, [5]) | |
x = np.array([1, 2, 3, 4, 5]).astype(np.float32) | |
y = np.array([5, 4, 3, 2, 1]).astype(np.float32) | |
then_const_node = make_node( | |
"Constant", inputs=[], outputs=["then_out"], value=from_array(x) | |
) | |
else_const_node = make_node( | |
"Constant", inputs=[], outputs=["else_out"], value=from_array(y) | |
) | |
then_body = make_graph([then_const_node], "then_body", [], [then_out]) | |
else_body = make_graph([else_const_node], "else_body", [], [else_out]) | |
if_node = make_node( | |
"If", | |
inputs=["f_cond"], | |
outputs=["f_res"], | |
then_branch=then_body, | |
else_branch=else_body, | |
) | |
f = FunctionProto() | |
f.domain = "custom" | |
f.name = "fn" | |
f.input.extend(["f_cond"]) | |
f.output.extend(["f_res"]) | |
f.node.extend([if_node]) | |
opset = onnx_opset_version() | |
f.opset_import.extend([make_opsetid("", opset)]) | |
graph = make_graph( | |
nodes=[make_node("fn", domain="custom", inputs=["cond"], outputs=["res"])], | |
name="graph", | |
inputs=[make_tensor_value_info("cond", TensorProto.BOOL, [])], | |
outputs=[make_tensor_value_info("res", TensorProto.FLOAT, [5])], | |
) | |
m = make_model( | |
graph, | |
producer_name="test", | |
opset_imports=[make_opsetid("", opset), make_opsetid("custom", 1)], | |
) | |
m.functions.extend([f]) | |
sess = ReferenceEvaluator(m) | |
result = sess.run(None, {"cond": np.array(True)}) | |
expected = np.array([1, 2, 3, 4, 5], dtype=np.float32) | |
assert_allclose(expected, result[0]) | |
def test_function_attribute(self): | |
opset = onnx_opset_version() | |
new_domain = "custom" | |
opset_imports = [make_opsetid("", opset), make_opsetid(new_domain, 1)] | |
cst = make_node("Constant", [], ["B"]) | |
att = AttributeProto() | |
att.name = "value" | |
att.ref_attr_name = "bias" | |
att.type = AttributeProto.TENSOR | |
cst.attribute.append(att) | |
node1 = make_node("MatMul", ["X", "A"], ["XA"]) | |
node2 = make_node("Add", ["XA", "B"], ["Y"]) | |
linear_regression = make_function( | |
new_domain, | |
"LinearRegression", | |
["X", "A"], | |
["Y"], | |
[cst, node1, node2], | |
opset_imports, | |
["bias"], | |
) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
graph = make_graph( | |
[ | |
make_node( | |
"LinearRegression", | |
["X", "A"], | |
["Y1"], | |
domain=new_domain, | |
bias=make_tensor("former_B", TensorProto.FLOAT, [1], [0.67]), | |
), | |
make_node("Abs", ["Y1"], ["Y"]), | |
], | |
"example", | |
[X, A], | |
[Y], | |
) | |
onnx_model = make_model( | |
graph, opset_imports=opset_imports, functions=[linear_regression] | |
) | |
sess = ReferenceEvaluator(onnx_model) | |
x = np.arange(6).reshape((3, 2)).astype(np.float32) | |
a = np.array([1, -1], dtype=np.float32) | |
result = sess.run(None, {"X": x, "A": a})[0] | |
expected = np.abs(x @ a + 0.67) | |
assert_allclose(expected, result) | |
def test_function_attribute_nested_graph(self): | |
opset = onnx_opset_version() | |
new_domain = "custom" | |
opset_imports = [make_opsetid("", opset), make_opsetid(new_domain, 1)] | |
cst1 = make_node("Constant", [], ["B1"]) | |
att = AttributeProto() | |
att.name = "value" | |
att.ref_attr_name = "bias1" | |
att.type = AttributeProto.TENSOR | |
cst1.attribute.append(att) | |
cst2 = make_node("Constant", [], ["B2"]) | |
att = AttributeProto() | |
att.name = "value" | |
att.ref_attr_name = "bias2" | |
att.type = AttributeProto.TENSOR | |
cst2.attribute.append(att) | |
then_out = make_tensor_value_info("B1", TensorProto.FLOAT, [None]) | |
else_out = make_tensor_value_info("B2", TensorProto.FLOAT, [None]) | |
then_body = make_graph([cst1], "then_body", [], [then_out]) | |
else_body = make_graph([cst2], "else_body", [], [else_out]) | |
zero = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["zero"], | |
value=from_array(np.array([0], dtype=np.float32)), | |
) | |
mini = make_node("ReduceMin", ["X"], ["Xmin"]) | |
f_cond = make_node("Greater", ["Xmin", "zero"], ["f_cond"]) | |
if_node = make_node( | |
"If", | |
inputs=["f_cond"], | |
outputs=["B"], | |
then_branch=then_body, | |
else_branch=else_body, | |
) | |
node1 = make_node("MatMul", ["X", "A"], ["XA"]) | |
node2 = make_node("Add", ["XA", "B"], ["Y"]) | |
linear_regression = make_function( | |
new_domain, | |
"LinearRegression", | |
["X", "A"], | |
["Y"], | |
[zero, mini, f_cond, if_node, node1, node2], | |
opset_imports, | |
["bias1", "bias2"], | |
) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
graph = make_graph( | |
[ | |
make_node( | |
"LinearRegression", | |
["X", "A"], | |
["Y1"], | |
domain=new_domain, | |
bias1=make_tensor("former_B1", TensorProto.FLOAT, [1], [0.67]), | |
bias2=make_tensor("former_B2", TensorProto.FLOAT, [1], [777]), | |
), | |
make_node("Abs", ["Y1"], ["Y"]), | |
], | |
"example", | |
[X, A], | |
[Y], | |
) | |
onnx_model = make_model( | |
graph, opset_imports=opset_imports, functions=[linear_regression] | |
) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
self.assertEqual(sess.rt_nodes_[0].__class__.__name__, "OpFunction") | |
self.assertEqual( | |
sess.rt_nodes_[0].impl_.__class__.__name__, "ReferenceEvaluator" | |
) | |
fct = sess.rt_nodes_[0].impl_ | |
checked = False | |
for node in fct.rt_nodes_: | |
if node.__class__.__name__.startswith("If"): | |
if not node.has_linked_attribute: | |
raise AssertionError( | |
f"Nested node {type(node)} declares no linked attribute " | |
f"but a subgraph does." | |
) | |
checked = True | |
if not checked: | |
raise AssertionError( | |
"No node 'If' was found, has_linked_attribute could not be checked." | |
) | |
x = np.arange(6).reshape((3, 2)).astype(np.float32) | |
a = np.array([1, -1], dtype=np.float32) | |
result = sess.run(None, {"X": x + 1, "A": a})[0] | |
expected = np.abs(x @ a + 0.67) | |
assert_allclose(expected, result) | |
result = sess.run(None, {"X": x - 10, "A": a})[0] | |
expected = np.abs(x @ a + 777) | |
assert_allclose(expected, result) | |
def test_function_attribute_nested_nested_graph(self): | |
opset = onnx_opset_version() | |
new_domain = "custom" | |
opset_imports = [make_opsetid("", opset), make_opsetid(new_domain, 1)] | |
# first If | |
cst1 = make_node("Constant", [], ["B1"]) | |
att = AttributeProto() | |
att.name = "value" | |
att.ref_attr_name = "bias1" | |
att.type = AttributeProto.TENSOR | |
cst1.attribute.append(att) | |
cst2 = make_node("Constant", [], ["B2"]) | |
att = AttributeProto() | |
att.name = "value" | |
att.ref_attr_name = "bias2" | |
att.type = AttributeProto.TENSOR | |
cst2.attribute.append(att) | |
then_out = make_tensor_value_info("B1", TensorProto.FLOAT, [None]) | |
else_out = make_tensor_value_info("B2", TensorProto.FLOAT, [None]) | |
then_body1 = make_graph([cst1], "then_body", [], [then_out]) | |
else_body1 = make_graph([cst2], "else_body", [], [else_out]) | |
# sub graph 2 | |
c100 = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["c100"], | |
value=from_array(np.array([100], dtype=np.float32)), | |
) | |
f_cond = make_node("Greater", ["Xmin", "c100"], ["f_cond_100"]) | |
if_node = make_node( | |
"If", | |
inputs=["f_cond_100"], | |
outputs=["B4"], | |
then_branch=then_body1, | |
else_branch=else_body1, | |
) | |
# second If | |
cst3 = make_node("Constant", [], ["B3"]) | |
att = AttributeProto() | |
att.name = "value" | |
att.ref_attr_name = "bias3" | |
att.type = AttributeProto.TENSOR | |
cst3.attribute.append(att) | |
then_out = make_tensor_value_info("B3", TensorProto.FLOAT, [None]) | |
then_body2 = make_graph([cst3], "then_body", [], [then_out]) | |
else_out = make_tensor_value_info("B4", TensorProto.FLOAT, [None]) | |
else_body2 = make_graph([c100, f_cond, if_node], "else_body", [], [else_out]) | |
# function | |
zero = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["zero"], | |
value=from_array(np.array([0], dtype=np.float32)), | |
) | |
mini = make_node("ReduceMin", ["X"], ["Xmin"]) | |
f_cond = make_node("Less", ["Xmin", "zero"], ["f_cond_zero"]) | |
if_node = make_node( | |
"If", | |
inputs=["f_cond_zero"], | |
outputs=["B"], | |
then_branch=then_body2, | |
else_branch=else_body2, | |
) | |
node1 = make_node("MatMul", ["X", "A"], ["XA"]) | |
node2 = make_node("Add", ["XA", "B"], ["Y"]) | |
linear_regression = make_function( | |
new_domain, | |
"LinearRegression", | |
["X", "A"], | |
["Y"], | |
[zero, mini, f_cond, if_node, node1, node2], | |
opset_imports, | |
["bias1", "bias2", "bias3"], | |
) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
A = make_tensor_value_info("A", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
graph = make_graph( | |
[ | |
make_node( | |
"LinearRegression", | |
["X", "A"], | |
["Y1"], | |
domain=new_domain, | |
bias1=make_tensor("former_B1", TensorProto.FLOAT, [1], [0.67]), | |
bias2=make_tensor("former_B2", TensorProto.FLOAT, [1], [777]), | |
bias3=make_tensor("former_B3", TensorProto.FLOAT, [1], [-888]), | |
), | |
make_node("Abs", ["Y1"], ["Y"]), | |
], | |
"example", | |
[X, A], | |
[Y], | |
) | |
onnx_model = make_model( | |
graph, opset_imports=opset_imports, functions=[linear_regression] | |
) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
x = np.arange(6).reshape((3, 2)).astype(np.float32) | |
a = np.array([1, -1], dtype=np.float32) | |
result = sess.run(None, {"X": x + 1, "A": a})[0] | |
expected = np.abs(x @ a + 777) | |
assert_allclose(expected, result) | |
result = sess.run(None, {"X": x - 10, "A": a})[0] | |
expected = np.abs(x @ a - 888) | |
assert_allclose(expected, result) | |
result = sess.run(None, {"X": x + 1000, "A": a})[0] | |
expected = np.abs(x @ a + 0.67) | |
assert_allclose(expected, result) | |
def test_custom_node(self): | |
class _InvAlpha: | |
op_domain = "custom" | |
def __init__(self, onnx_node, run_params): # type: ignore | |
self.onnx_node = onnx_node | |
self.run_params = run_params | |
def _run(self, x): # type: ignore | |
return (1 / (x + self.alpha),) | |
class InvAlpha2(OpRun): | |
def _run(self, x): # type: ignore | |
return (1 / (x + self.alpha),) | |
class InvAlpha(OpRun): | |
op_domain = "custom" | |
def _run(self, x, alpha=None): # type: ignore | |
alpha = alpha or self.alpha # type: ignore | |
return (1 / (x + alpha),) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) + 1 | |
with self.assertRaises(NotImplementedError): | |
ReferenceEvaluator(onnx_model) | |
node1 = make_node("_InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
with self.assertRaises(TypeError): | |
ReferenceEvaluator(onnx_model, new_ops=[_InvAlpha]) | |
node1 = make_node("InvAlpha2", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
with self.assertRaises(NotImplementedError): | |
ReferenceEvaluator(onnx_model, new_ops=[InvAlpha2]) | |
node1 = make_node("InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
sess = ReferenceEvaluator(onnx_model, new_ops=[InvAlpha, InvAlpha]) | |
got = sess.run(None, {"X": x})[0] | |
expected = 1 / (x + 0.5) | |
assert_allclose(expected, got) | |
def test_custom_no_output_tuple(self): | |
class InvAlpha(OpRun): | |
op_domain = "custom" | |
def _run(self, x, alpha=None): # type: ignore | |
alpha = alpha or self.alpha # type: ignore | |
return 1 / (x + alpha) | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) + 1 | |
ref = ReferenceEvaluator(onnx_model, new_ops=[InvAlpha]) | |
with self.assertRaises(TypeError): | |
ref.run(None, {"X": x}) | |
def test_custom_empty_output(self): | |
class InvAlpha(OpRun): | |
op_domain = "custom" | |
def _run(self, x, alpha=None): # type: ignore | |
return tuple() | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) + 1 | |
ref = ReferenceEvaluator(onnx_model, new_ops=[InvAlpha]) | |
with self.assertRaises(ValueError): | |
ref.run(None, {"X": x}) | |
def test_custom_tuple_tuple(self): | |
class InvAlpha(OpRun): | |
op_domain = "custom" | |
def _run(self, x, alpha=None): # type: ignore | |
alpha = alpha or self.alpha # type: ignore | |
res = tuple([tuple([1 / (x + alpha)])]) # noqa: C409 | |
assert isinstance(res, tuple) | |
assert isinstance(res[0], tuple) | |
return res | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) + 1 | |
ref = ReferenceEvaluator(onnx_model, new_ops=[InvAlpha]) | |
with self.assertRaises(TypeError): | |
ref.run(None, {"X": x}) | |
def test_custom_tuple_unexpected_type(self): | |
class CustomType: | |
pass | |
class InvAlpha(OpRun): | |
op_domain = "custom" | |
def _run(self, x, alpha=None): # type: ignore | |
res = tuple([CustomType()]) # noqa: C409 | |
assert isinstance(res, tuple) | |
assert isinstance(res[0], CustomType) | |
return res | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("InvAlpha", ["X"], ["Y"], alpha=0.5, domain="custom") | |
graph = make_graph([node1], "rs", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("custom", 1)]) | |
x = np.arange(60).reshape((3, 4, 5)).astype(np.float32) + 1 | |
ref = ReferenceEvaluator(onnx_model, new_ops=[InvAlpha]) | |
with self.assertRaises(TypeError): | |
ref.run(None, {"X": x}) | |
def test_loop(self): | |
# Given a tensor x of values [x1, ..., xN], | |
# Return a sequence of tensors of | |
# [[x1], [x1, x2], ..., [x1, ..., xN]] | |
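        # For x = [1, 2, 3, 4, 5] the Loop body yields the sequence
        # [[1], [1, 2], [1, 2, 3], [1, 2, 3, 4], [1, 2, 3, 4, 5]]; the
        # ConcatFromSequence node below flattens it into the `expected`
        # array checked at the end of this test.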
cond_in = make_tensor_value_info("cond_in", TensorProto.BOOL, []) | |
cond_out = make_tensor_value_info("cond_out", TensorProto.BOOL, []) | |
iter_count = make_tensor_value_info("iter_count", TensorProto.INT64, []) | |
seq_in = make_tensor_sequence_value_info("seq_in", TensorProto.FLOAT, None) | |
seq_out = make_tensor_sequence_value_info("seq_out", TensorProto.FLOAT, None) | |
x = np.array([1, 2, 3, 4, 5]).astype(np.float32) | |
x_const_node = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["x"], | |
value=make_tensor( | |
name="const_tensor_x", | |
data_type=TensorProto.FLOAT, | |
dims=x.shape, | |
vals=x.flatten().astype(float), | |
), | |
) | |
one_const_node = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["one"], | |
value=make_tensor( | |
name="const_tensor_one", | |
data_type=TensorProto.INT64, | |
dims=(), | |
vals=[1], | |
), | |
) | |
zero_const_node = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["slice_start"], | |
value=make_tensor( | |
name="const_tensor_zero", | |
data_type=TensorProto.INT64, | |
dims=(1,), | |
vals=[0], | |
), | |
) | |
axes_node = make_node( | |
"Constant", | |
inputs=[], | |
outputs=["axes"], | |
value=make_tensor( | |
name="const_tensor_axes", | |
data_type=TensorProto.INT64, | |
dims=(), | |
vals=[0], | |
), | |
) | |
add_node = make_node("Add", inputs=["iter_count", "one"], outputs=["end"]) | |
end_unsqueeze_node = make_node( | |
"Unsqueeze", inputs=["end", "axes"], outputs=["slice_end"] | |
) | |
slice_node = make_node( | |
"Slice", inputs=["x", "slice_start", "slice_end"], outputs=["slice_out"] | |
) | |
insert_node = make_node( | |
"SequenceInsert", inputs=["seq_in", "slice_out"], outputs=["seq_out"] | |
) | |
identity_node = make_node("Identity", inputs=["cond_in"], outputs=["cond_out"]) | |
loop_body = make_graph( | |
[ | |
identity_node, | |
x_const_node, | |
one_const_node, | |
zero_const_node, | |
add_node, | |
axes_node, | |
end_unsqueeze_node, | |
slice_node, | |
insert_node, | |
], | |
"loop_body", | |
[iter_count, cond_in, seq_in], | |
[cond_out, seq_out], | |
) | |
node = make_node( | |
"Loop", | |
inputs=["trip_count", "cond", "seq_empty"], | |
outputs=["seq_res"], | |
body=loop_body, | |
) | |
node_concat = make_node( | |
"ConcatFromSequence", | |
inputs=["seq_res"], | |
outputs=["res"], | |
axis=0, | |
new_axis=0, | |
) | |
trip_count = np.array(5).astype(np.int64) | |
seq_empty = [] # type: List[Any] | |
# seq_res = [x[:int(i)] for i in x] | |
cond = np.array(1).astype(np.bool_) | |
model_def = make_model( | |
graph=make_graph( | |
name="loop_test", | |
inputs=[ | |
make_tensor_value_info( | |
"trip_count", TensorProto.INT64, trip_count.shape | |
), | |
make_tensor_value_info("cond", TensorProto.BOOL, cond.shape), | |
make_sequence_value_info("seq_empty", TensorProto.FLOAT, []), | |
], | |
outputs=[make_tensor_value_info("res", TensorProto.FLOAT, None)], | |
nodes=[node, node_concat], | |
) | |
) | |
expected = np.array( | |
[1.0, 1.0, 2.0, 1.0, 2.0, 3.0, 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0], | |
dtype=np.float32, | |
) | |
oinf = ReferenceEvaluator(model_def) | |
inputs = {"trip_count": trip_count, "cond": cond, "seq_empty": seq_empty} | |
got = oinf.run(None, inputs) | |
assert_allclose(expected, got[0]) | |
def test_onnxt_runtime_bernoulli(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("Bernoulli", ["X"], ["Y"], seed=0.0) | |
graph = make_graph([node1], "g", [X], [Y]) | |
onnx_model = make_model(graph) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": np.zeros((2, 4), dtype=np.float32) + 0.5})[0] | |
self.assertEqual(got.shape, (2, 4)) | |
self.assertEqual(got.dtype, np.float32) | |
self.assertGreater(got.min(), -1e-5) | |
self.assertLess(got.max(), 1 + 1e-5) | |
def test_onnxt_runtime_random_uniform(self): | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("RandomUniform", [], ["Y"], seed=0.0, shape=[2, 4]) | |
graph = make_graph([node1], "g", [], [Y]) | |
onnx_model = make_model(graph) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {})[0] | |
self.assertEqual(got.shape, (2, 4)) | |
self.assertEqual(got.dtype, np.float32) | |
self.assertGreater(got.min(), 0) | |
self.assertLess(got.max(), 1) | |
def test_onnxt_runtime_random_uniform_like(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("RandomUniformLike", ["X"], ["Y"], seed=0.0) | |
graph = make_graph([node1], "g", [X], [Y]) | |
onnx_model = make_model(graph) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": np.zeros((2, 4), dtype=np.float32)})[0] | |
self.assertEqual(got.shape, (2, 4)) | |
self.assertEqual(got.dtype, np.float32) | |
self.assertGreater(got.min(), 0) | |
self.assertLess(got.max(), 1) | |
def test_onnxt_runtime_random_normal(self): | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("RandomNormal", [], ["Y"], seed=0.0, shape=[2, 4]) | |
graph = make_graph([node1], "g", [], [Y]) | |
onnx_model = make_model(graph) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {})[0] | |
self.assertEqual(got.shape, (2, 4)) | |
self.assertEqual(got.dtype, np.float32) | |
def test_onnxt_runtime_random_normal_like(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("RandomNormalLike", ["X"], ["Y"], seed=0.0) | |
graph = make_graph([node1], "g", [X], [Y]) | |
onnx_model = make_model(graph) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
got = sess.run(None, {"X": np.zeros((2, 4), dtype=np.float32)})[0] | |
self.assertEqual(got.shape, (2, 4)) | |
self.assertEqual(got.dtype, np.float32) | |
def test_eval_celu(self): | |
inst = Celu.create(alpha=0.5) | |
self.assertEqual(inst.alpha, 0.5) | |
x = np.array([[0, 1], [-1, 2]], dtype=np.float32) | |
y = Celu.eval(x, alpha=0.5) | |
expected = _vcelu1(x, alpha=0.5) | |
assert_allclose(expected, y) | |
def test_eval_cast(self): | |
x = np.array([[0, 1], [-1, 2]], dtype=np.float32) | |
y = Cast_19.eval(x, to=TensorProto.FLOAT8E4M3FN) | |
dy = Cast_19.eval(y, to=TensorProto.FLOAT) | |
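        # 0, 1, -1 and 2 are exactly representable in float8 e4m3, so the
        # round trip through Cast is lossless for this input.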
expected = x | |
assert_allclose(expected, dy) | |
def test_eval_celu_load_op(self): | |
celu = load_op("", "Celu") | |
self.assertEqual(celu.op_domain, "") | |
inst = celu.create(alpha=0.5) | |
self.assertEqual(inst.alpha, 0.5) | |
x = np.array([[0, 1], [-1, 2]], dtype=np.float32) | |
y = celu.eval(x, alpha=0.5) | |
expected = _vcelu1(x, alpha=0.5) | |
assert_allclose(expected, y) | |
def test_create_adam(self): | |
inst = Adam.create(alpha=0.5) | |
self.assertEqual(inst.alpha, 0.5) | |
def test_conv(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
B = make_tensor_value_info("B", TensorProto.FLOAT, [None, None, None, None]) | |
W = make_tensor_value_info("W", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"Conv", | |
["X", "W", "B"], | |
["Y"], | |
pads=[1, 1, 1, 1], | |
dilations=[1, 1], | |
strides=[2, 2], | |
) | |
graph = make_graph([node], "g", [X, W, B], [Y]) | |
onnx_model = make_model_gen_version(graph, opset_imports=[make_opsetid("", 16)]) | |
sess1 = run_ort_inference(onnx_model) | |
if sess1 is None: | |
return | |
sess2 = ReferenceEvaluator(onnx_model, optimized=False) | |
self.assertIsInstance(sess2.rt_nodes_[0], Conv) | |
sess3 = ReferenceEvaluator(onnx_model, new_ops=[ConvOptimized], optimized=False) | |
self.assertIsInstance(sess3.rt_nodes_[0], ConvOptimized) | |
sess4 = ReferenceEvaluator(onnx_model, optimized=True) | |
self.assertIsInstance(sess4.rt_nodes_[0], ConvOptimized) | |
sH, sW = 5, 6 | |
for i in range(sH): | |
for j in range(sW): | |
X = np.zeros((1, 1, sH, sW), dtype=np.float32) | |
X[0, 0, i, j] = 1.0 | |
W = np.zeros((1, 1, 3, 3), dtype=np.float32) | |
W[0, 0, :, :] = np.minimum(2 ** np.arange(9).reshape((3, -1)), 256) | |
B = np.array([[[[0]]]], dtype=np.float32) | |
expected = sess1.run(None, {"X": X, "W": W, "B": B})[0] | |
got = sess2.run(None, {"X": X, "W": W, "B": B})[0] | |
assert_allclose(expected, got) | |
got3 = sess3.run(None, {"X": X, "W": W, "B": B})[0] | |
assert_allclose(expected, got3) | |
got4 = sess4.run(None, {"X": X, "W": W, "B": B})[0] | |
assert_allclose(expected, got4) | |
def test_qlinearconv(self): | |
x = make_tensor_value_info("x", TensorProto.UINT8, [None, None, None, None]) | |
w = make_tensor_value_info("w", TensorProto.UINT8, [None, None, None, None]) | |
y = make_tensor_value_info("y", TensorProto.UINT8, [None, None, None, None]) | |
x_scale = make_tensor_value_info("x_scale", TensorProto.FLOAT, [None]) | |
w_scale = make_tensor_value_info("w_scale", TensorProto.FLOAT, [None]) | |
y_scale = make_tensor_value_info("y_scale", TensorProto.FLOAT, [None]) | |
x_zero_point = make_tensor_value_info("x_zero_point", TensorProto.UINT8, [None]) | |
w_zero_point = make_tensor_value_info("w_zero_point", TensorProto.UINT8, [None]) | |
y_zero_point = make_tensor_value_info("y_zero_point", TensorProto.UINT8, [None]) | |
node = make_node( | |
"QLinearConv", | |
[ | |
"x", | |
"x_scale", | |
"x_zero_point", | |
"w", | |
"w_scale", | |
"w_zero_point", | |
"y_scale", | |
"y_zero_point", | |
], | |
["y"], | |
) | |
graph = make_graph( | |
[node], | |
"g", | |
[x, x_scale, x_zero_point, w, w_scale, w_zero_point, y_scale, y_zero_point], | |
[y], | |
) | |
onnx_model = make_model_gen_version(graph, opset_imports=[make_opsetid("", 16)]) | |
sess1 = run_ort_inference(onnx_model) | |
if sess1 is None: | |
return | |
sess2 = ReferenceEvaluator(onnx_model) | |
sH, sW = 3, 3 | |
for i in range(sH): | |
for j in range(sW): | |
x = np.zeros((1, 1, sH, sW), dtype=np.uint8) | |
x[0, 0, i, j] = 1.0 | |
with self.subTest(w="1x1", i=i, j=j): | |
w = np.zeros((1, 1, 1, 1), dtype=np.uint8) | |
w[0, 0, :, :] = 1 | |
feeds = { | |
"x": x, | |
"x_scale": np.array([1], dtype=np.float32), | |
"x_zero_point": np.array([0], dtype=np.uint8), | |
"w": w, | |
"w_scale": np.array([1], dtype=np.float32), | |
"w_zero_point": np.array([0], dtype=np.uint8), | |
"y_scale": np.array([1], dtype=np.float32), | |
"y_zero_point": np.array([0], np.uint8), | |
} | |
expected = sess1.run(None, feeds)[0] | |
got = sess2.run(None, feeds)[0] | |
                    assert_allclose(expected, got)
with self.subTest(w="3x3", i=i, j=j): | |
w = np.zeros((1, 1, 3, 3), dtype=np.uint8) | |
w[0, 0, :, :] = np.minimum(2 ** np.arange(9).reshape((3, -1)), 128) | |
feeds = { | |
"x": x, | |
"x_scale": np.array([1], dtype=np.float32), | |
"x_zero_point": np.array([0], dtype=np.uint8), | |
"w": w, | |
"w_scale": np.array([1], dtype=np.float32), | |
"w_zero_point": np.array([0], dtype=np.uint8), | |
"y_scale": np.array([1], dtype=np.float32), | |
"y_zero_point": np.array([0], np.uint8), | |
} | |
expected = sess1.run(None, feeds)[0] | |
got = sess2.run(None, feeds)[0] | |
assert_allclose(expected, got) | |
with self.subTest(w="1x1", i=i, j=j): | |
w = np.zeros((1, 1, 1, 1), dtype=np.uint8) | |
w[0, 0, :, :] = 0 | |
feeds = { | |
"x": x, | |
"x_scale": np.array([0.00369204697], dtype=np.float32), | |
"x_zero_point": np.array([132], dtype=np.uint8), | |
"w": w, | |
"w_scale": np.array([100.001727945750], dtype=np.float32), | |
"w_zero_point": np.array([255], dtype=np.uint8), | |
"y_scale": np.array([0.00162681262], dtype=np.float32), | |
"y_zero_point": np.array([132], np.uint8), | |
} | |
expected = sess1.run(None, feeds)[0] | |
got = sess2.run(None, feeds)[0] | |
assert_allclose(expected, got) | |
x = np.array( | |
[ | |
[255, 174, 162, 25, 203, 168, 58], | |
[15, 59, 237, 95, 129, 0, 64], | |
[56, 242, 153, 221, 168, 12, 166], | |
[232, 178, 186, 195, 237, 162, 237], | |
[188, 39, 124, 77, 80, 102, 43], | |
[127, 230, 21, 83, 41, 40, 134], | |
[255, 154, 92, 141, 42, 148, 247], | |
], | |
dtype=np.uint8, | |
).reshape((1, 1, 7, 7)) | |
x_scale = np.array([0.00369204697], dtype=np.float32) | |
x_zero_point = np.array([132], dtype=np.uint8) | |
w = np.array([0], dtype=np.uint8).reshape((1, 1, 1, 1)) | |
w_scale = np.array([0.00172794575], dtype=np.float32) | |
w_zero_point = np.array([255], dtype=np.uint8) | |
y_scale = np.array([0.00162681262], dtype=np.float32) | |
y_zero_point = np.array([123], dtype=np.uint8) | |
feeds = { | |
"x": x, | |
"x_scale": x_scale, | |
"x_zero_point": x_zero_point, | |
"w": w, | |
"w_scale": w_scale, | |
"w_zero_point": w_zero_point, | |
"y_scale": y_scale, | |
"y_zero_point": y_zero_point, | |
} | |
expected = sess1.run(None, feeds)[0] | |
got = sess2.run(None, feeds)[0] | |
assert_allclose(expected, got) | |
def common_test_im2col(self, kernel_shape, pads, strides, dilations): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
Y1 = make_tensor_value_info("Y1", TensorProto.FLOAT, [None, None, None, None]) | |
Y2 = make_tensor_value_info("Y2", TensorProto.FLOAT, [None, None, None, None]) | |
W = make_tensor_value_info("W", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"Conv", ["X", "W"], ["Y1"], pads=pads, strides=strides, dilations=dilations | |
) | |
node_shape = make_node("Shape", ["W"], ["shape"]) | |
node_im = make_node( | |
"Im2Col", | |
["X", "shape"], | |
["xim"], | |
pads=pads, | |
strides=strides, | |
dilations=dilations, | |
domain="experimental", | |
) | |
node_flat = make_node("Flatten", ["W"], ["wflat"]) | |
node_gem = make_node("MatMul", ["wflat", "xim"], ["Y2"]) | |
graph = make_graph( | |
[node, node_shape, node_im, node_flat, node_gem], | |
"g", | |
[X, W], | |
[Y1, Y2], | |
) | |
onnx_model = make_model( | |
graph, opset_imports=[make_opsetid("", 16), make_opsetid("experimental", 1)] | |
) | |
graph_conv = make_graph([node], "g", [X, W], [Y1]) | |
onnx_model_conv = make_model_gen_version( | |
graph_conv, opset_imports=[make_opsetid("", 16)] | |
) | |
sess = ReferenceEvaluator(onnx_model) | |
try: | |
sess_conv = run_ort_inference(onnx_model_conv) | |
if sess_conv is None: | |
return | |
except ImportError: | |
sess_conv = None | |
sH, sW = 7, 7 | |
nker = np.prod(kernel_shape) | |
for i in range(sH): | |
for j in range(sW): | |
X = np.zeros((1, 1, sH, sW), dtype=np.float32) | |
X[0, 0, i, j] = 1.0 | |
W = np.zeros( | |
(1, 1, *kernel_shape), | |
dtype=np.float32, | |
) | |
W[0, 0, :, :] = np.minimum( | |
2 ** np.arange(nker).reshape((kernel_shape[0], -1)), 256 | |
) | |
got = sess.run(None, {"X": X, "W": W}) | |
if sess_conv is not None: | |
ort_res = sess_conv.run(None, {"X": X, "W": W})[0] | |
assert_allclose(got[1].ravel(), ort_res.ravel()) | |
try: | |
assert_allclose(got[0].ravel(), got[1].ravel()) | |
except AssertionError as e: | |
raise AssertionError( | |
f"Discrepancies: pads={pads}, dilations={dilations}, strides={strides}, " | |
f"kernel_shape={kernel_shape}" | |
f"\n{got[0]}\n!=\n{got[1]}" | |
) from e | |
def test_im2col_1x1(self): | |
self.common_test_im2col( | |
(1, 1), pads=[1, 1, 1, 2], strides=[1, 1], dilations=[1, 1] | |
) | |
def test_im2col_2x2(self): | |
self.common_test_im2col( | |
(2, 2), pads=[1, 1, 1, 2], strides=[1, 1], dilations=[1, 1] | |
) | |
def test_im2col_3x3(self): | |
self.common_test_im2col( | |
(3, 3), pads=[1, 1, 1, 2], strides=[1, 1], dilations=[1, 1] | |
) | |
def test_im2col_3x3_pads(self): | |
self.common_test_im2col( | |
(3, 3), pads=[0, 1, 2, 3], strides=[1, 1], dilations=[1, 1] | |
) | |
def test_im2col_3x3_strides(self): | |
self.common_test_im2col( | |
(3, 3), pads=[0, 1, 1, 1], strides=[1, 2], dilations=[1, 1] | |
) | |
def test_im2col_5x5(self): | |
self.common_test_im2col( | |
(5, 5), pads=[1, 1, 1, 2], strides=[1, 1], dilations=[1, 1] | |
) | |
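    # Col2Im is the inverse of the Im2Col/Unfold transform: it sums column patches
    # back into an image. torch.nn.Fold serves as the reference implementation.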
def test_col2im(self): | |
import torch | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None]) | |
IS = make_tensor_value_info("I", TensorProto.INT64, [None]) | |
BS = make_tensor_value_info("B", TensorProto.INT64, [None]) | |
node = make_node( | |
"Col2Im", | |
["X", "I", "B"], | |
["Y"], | |
pads=[0, 0, 0, 0], | |
strides=[1, 1], | |
dilations=[1, 1], | |
) | |
graph = make_graph([node], "g", [X, IS, BS], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
sess = ReferenceEvaluator(onnx_model) | |
X = np.array( | |
[ | |
[ | |
[1.0, 6.0, 11.0, 16.0, 21.0], | |
[2.0, 7.0, 12.0, 17.0, 22.0], | |
[3.0, 8.0, 13.0, 18.0, 23.0], | |
[4.0, 9.0, 14.0, 19.0, 24.0], | |
[5.0, 0.0, 15.0, 20.0, 25.0], | |
] | |
] | |
).astype(np.float32) | |
image_shape = np.array([5, 5]).astype(np.int64) | |
block_shape = np.array([1, 5]).astype(np.int64) | |
fold = torch.nn.Fold(output_size=tuple(image_shape), kernel_size=block_shape) | |
got = sess.run(None, {"X": X, "B": block_shape, "I": image_shape}) | |
output = fold(torch.from_numpy(X)).numpy() | |
assert_allclose(output, got[0]) | |
def common_test_col2im( | |
self, size, image_shape, block_shape, pads, strides, dilations | |
): | |
import torch | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None]) | |
IS = make_tensor_value_info("I", TensorProto.INT64, [None]) | |
BS = make_tensor_value_info("B", TensorProto.INT64, [None]) | |
node = make_node( | |
"Col2Im", | |
["X", "I", "B"], | |
["Y"], | |
pads=pads, | |
strides=strides, | |
dilations=dilations, | |
) | |
graph = make_graph([node], "g", [X, IS, BS], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
sess = ReferenceEvaluator(onnx_model) | |
fold = torch.nn.Fold( | |
output_size=tuple(image_shape), | |
kernel_size=tuple(block_shape), | |
dilation=tuple(dilations), | |
padding=min(pads), | |
stride=tuple(strides), | |
) | |
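        # torch.nn.Fold takes a single symmetric padding value, hence min(pads);
        # the test cases below use equal pads on all sides so both operators agree.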
nker = np.prod(block_shape) | |
for i in range(nker): | |
for j in range(size): | |
X = np.zeros((1, nker, size), dtype=np.float32) | |
X[0, i, j] = 1.0 | |
i_shape = np.array(image_shape, dtype=np.int64) | |
b_shape = np.array(block_shape, dtype=np.int64) | |
output = fold(torch.from_numpy(X)).numpy() | |
got = sess.run(None, {"X": X, "B": b_shape, "I": i_shape}) | |
# print(output) | |
# print(got) | |
assert_allclose(output, got[0]) | |
def test_col2im_2x3(self): | |
self.common_test_col2im( | |
10, (6, 4), (2, 3), pads=[0, 0, 0, 0], strides=[1, 1], dilations=[1, 1] | |
) | |
def test_col2im_2x3_pads(self): | |
self.common_test_col2im( | |
28, (6, 4), (2, 3), pads=[1, 1, 1, 1], strides=[1, 1], dilations=[1, 1] | |
) | |
def test_col2im_2d(self): | |
data = np.zeros([6, 28], dtype=np.float32) | |
data[0][0] = 1.0 | |
image_shape, kernel_shape, dilations, pads, stride = ( | |
np.array([6, 4]), | |
(2, 3), | |
np.array([1, 1]), | |
np.array([1, 1, 1, 1]), | |
np.array([1, 1]), | |
) | |
r1 = _col2im_naive_implementation_2d( | |
data, image_shape, kernel_shape, dilations, pads, stride | |
) | |
r2 = col2im_naive_implementation( | |
data, image_shape, kernel_shape, dilations, pads, stride | |
) | |
assert_allclose(r1, r2) | |
def test_conv_im2col_group4(self): | |
# model 1 | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [2, 4, 6, 6]) | |
W = make_tensor_value_info("W", TensorProto.FLOAT, [4, 1, 3, 3]) | |
B = make_tensor_value_info("B", TensorProto.FLOAT, [4]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [2, 4, 6, 6]) | |
node = make_node( | |
"Conv", | |
["X", "W", "B"], | |
["Y"], | |
group=4, | |
dilations=[1, 1], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[1, 1], | |
) | |
graph = make_graph([node], "g", [X, W, B], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.arange(2 * 4 * 6 * 6).reshape((2, 4, 6, 6)).astype(np.float32), | |
"W": np.array( | |
[ | |
[ | |
[ | |
[ | |
-0.026239916682243347, | |
0.07565222680568695, | |
-0.03209298849105835, | |
], | |
[ | |
-0.08708783239126205, | |
0.0961190015077591, | |
0.13418219983577728, | |
], | |
[ | |
0.1598859578371048, | |
0.03840477764606476, | |
-0.13170936703681946, | |
], | |
] | |
], | |
[ | |
[ | |
[ | |
-0.0689004510641098, | |
0.1408083587884903, | |
-0.03717087209224701, | |
], | |
[ | |
0.030967697501182556, | |
0.0263785719871521, | |
-0.0899493545293808, | |
], | |
[ | |
0.07828782498836517, | |
-0.06266771256923676, | |
0.10750330984592438, | |
], | |
] | |
], | |
[ | |
[ | |
[ | |
0.020227551460266113, | |
-0.04353883117437363, | |
-0.10938453674316406, | |
], | |
[ | |
-0.14101561903953552, | |
-0.03393106162548065, | |
0.12139306962490082, | |
], | |
[ | |
0.02838282287120819, | |
0.13864465057849884, | |
-0.06065710633993149, | |
], | |
] | |
], | |
[ | |
[ | |
[ | |
-0.06511610746383667, | |
-0.05987360328435898, | |
-0.008047685027122498, | |
], | |
[ | |
0.07340313494205475, | |
0.0326494425535202, | |
0.012516498565673828, | |
], | |
[ | |
0.13260947167873383, | |
-0.022225692868232727, | |
-0.11167611926794052, | |
], | |
] | |
], | |
], | |
dtype=np.float32, | |
), | |
"B": np.array( | |
[ | |
-0.1457933485507965, | |
-0.07481209933757782, | |
-0.05890338122844696, | |
-0.11964251846075058, | |
], | |
dtype=np.float32, | |
), | |
} | |
feeds["B"][:] = 0 | |
# model 2 | |
X = feeds["X"] | |
W = feeds["W"] | |
B = feeds["B"] | |
Y = np.empty((2, 4, 6, 6), dtype=X.dtype) | |
for b in range(X.shape[0]): | |
for g in range(4): | |
x = X[b : b + 1, g : g + 1] | |
w = W[g] | |
c2 = im2col(x, (3, 3), [1, 1], [1, 1, 1, 1], [1, 1]) | |
mul = np.matmul(c2, w.flatten()) | |
mul = mul + B[g] | |
Y[b, g, :, :] = mul | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
assert_allclose(Y, got1[0], atol=1e-5) | |
def test_conv_strides(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [1, 3, 6, 6]) | |
W = make_tensor_value_info("W", TensorProto.FLOAT, [2, 3, 3, 3]) | |
B = make_tensor_value_info("B", TensorProto.FLOAT, [2]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"Conv", | |
["X", "W", "B"], | |
["Y"], | |
group=1, | |
dilations=[1, 1], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[2, 2], | |
) | |
graph = make_graph([node], "g", [X, W, B], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.arange(1 * 3 * 6 * 6).reshape((1, 3, 6, 6)).astype(np.float32) + 1, | |
"W": np.zeros((2, 3, 3, 3), dtype=np.float32), | |
"B": np.zeros((2,), dtype=np.float32), | |
} | |
feeds["W"][0, 0, 0, 1] = 1 | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array( | |
[ | |
[ | |
[[0.0, 0.0, 0.0], [7.0, 9.0, 11.0], [19.0, 21.0, 23.0]], | |
[[0.0, 0.0, 0.0], [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]], | |
] | |
], | |
dtype=np.float32, | |
) | |
assert_allclose(expected, got1[0]) | |
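    # With kernel 3, stride 2 and pads [1, 1, 1, 1], a 7x7 input gives
    # floor((7 + 2 - 3) / 2) + 1 = 4 values per spatial dimension.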
def test_max_pool_2d_1(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"MaxPool", | |
["X"], | |
["Y"], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[2, 2], | |
) | |
graph = make_graph([node], "g", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = {"X": np.arange(49)[::-1].reshape((1, 1, 7, 7)).astype(np.float32)} | |
expected = np.array( | |
[ | |
[ | |
[ | |
[48.0, 47.0, 45.0, 43.0], | |
[41.0, 40.0, 38.0, 36.0], | |
[27.0, 26.0, 24.0, 22.0], | |
[13.0, 12.0, 10.0, 8.0], | |
] | |
] | |
], | |
dtype=np.float32, | |
) | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
assert_allclose(expected, got1[0]) | |
def test_max_pool_2d_2(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"MaxPool", | |
["X"], | |
["Y"], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[2, 2], | |
) | |
graph = make_graph([node], "g", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.array( | |
[ | |
[ | |
[ | |
[683, 358, 726, 578, 650, 946, 200], | |
[679, 260, 264, 5, 240, 255, 582], | |
[322, 66, 687, 632, 852, 698, 428], | |
[111, 452, 627, 332, 751, 842, 685], | |
[472, 52, 956, 81, 807, 827, 360], | |
[972, 574, 81, 799, 646, 499, 486], | |
[892, 758, 75, 833, 972, 415, 736], | |
] | |
] | |
], | |
dtype=np.float32, | |
) | |
} | |
expected = np.array( | |
[ | |
[ | |
[ | |
[683.0, 726.0, 946.0, 946.0], | |
[679.0, 687.0, 852.0, 842.0], | |
[972.0, 956.0, 842.0, 842.0], | |
[972.0, 833.0, 972.0, 736.0], | |
] | |
] | |
], | |
dtype=np.float32, | |
) | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
assert_allclose(expected, got1[0]) | |
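    # ScatterElements with reduction="min" keeps, at each targeted position, the minimum
    # of the original value and every update scattered there: min(2.0, 1.1, 2.1) = 1.1.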
def test_scatter_elements(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Ind = make_tensor_value_info("I", TensorProto.INT64, [None, None]) | |
U = make_tensor_value_info("U", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None]) | |
node = make_node( | |
"ScatterElements", | |
["X", "I", "U"], | |
["Y"], | |
axis=1, | |
reduction="min", | |
) | |
graph = make_graph([node], "g", [X, Ind, U], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.array([[1.0, 2.0, 3.0, 4.0, 5.0]], dtype=np.float32), | |
"I": np.array([[1, 1]]), | |
"U": np.array([[1.1, 2.1]], dtype=np.float32), | |
} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array([[1.0, 1.1, 3.0, 4.0, 5.0]], dtype=np.float32) | |
assert_allclose(expected, got1[0]) | |
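    # ScatterND writes each update at the index tuple given by a row of I:
    # here X[0, 0] is replaced by 3.0.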
def test_scatternd(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Ind = make_tensor_value_info("I", TensorProto.INT64, [None, None]) | |
U = make_tensor_value_info("U", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None]) | |
node = make_node( | |
"ScatterND", | |
["X", "I", "U"], | |
["Y"], | |
) | |
graph = make_graph([node], "g", [X, Ind, U], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.array([[1.0, 2.0]], dtype=np.float32), | |
"I": np.array([[0, 0]]), | |
"U": np.array([3.0], dtype=np.float32), | |
} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array([[3.0, 2.0]], dtype=np.float32) | |
assert_allclose(expected, got1[0]) | |
def test_col2im_impl(self): | |
def get_im2col_indices( | |
x_shape, field_height, field_width, padding=None, stride=1 | |
): | |
# source: https://stackoverflow.com/questions/51703367/col2im-implementation-in-convnet | |
N, C, H, W = x_shape | |
del N # Unused | |
assert (H + padding[0] + padding[2] - field_height) % stride == 0 | |
            assert (W + padding[1] + padding[3] - field_width) % stride == 0
out_height = (H + padding[0] + padding[2] - field_height) // stride + 1 | |
out_width = (W + padding[1] + padding[3] - field_width) // stride + 1 | |
i0 = np.repeat(np.arange(field_height), field_width) | |
i0 = np.tile(i0, C) | |
i1 = stride * np.repeat(np.arange(out_height), out_width) | |
j0 = np.tile(np.arange(field_width), field_height * C) | |
j1 = stride * np.tile(np.arange(out_width), out_height) | |
i = i0.reshape(-1, 1) + i1.reshape(1, -1) | |
j = j0.reshape(-1, 1) + j1.reshape(1, -1) | |
k = np.repeat(np.arange(C), field_height * field_width).reshape(-1, 1) | |
return (k, i, j) | |
def col2im_indices( | |
cols, x_shape, field_height=3, field_width=3, padding=None, stride=1 | |
): | |
# source: https://stackoverflow.com/questions/51703367/col2im-implementation-in-convnet | |
N, C, H, W = x_shape | |
H_padded, W_padded = ( | |
H + padding[0] + padding[2], | |
W + padding[1] + padding[3], | |
) | |
x_padded = np.zeros((N, C, H_padded, W_padded), dtype=cols.dtype) | |
k, i, j = get_im2col_indices( | |
x_shape, field_height, field_width, padding, stride | |
) | |
cols_reshaped = cols.reshape(C * field_height * field_width, -1, N) | |
cols_reshaped = cols_reshaped.transpose(2, 0, 1) | |
np.add.at(x_padded, (slice(None), k, i, j), cols_reshaped) | |
padding = padding.copy() | |
if padding[2] == 0: | |
padding[2] += x_padded.shape[2] | |
elif padding[2] > 0: | |
padding[2] *= -1 | |
if padding[3] == 0: | |
padding[3] += x_padded.shape[3] | |
elif padding[3] > 0: | |
padding[3] *= -1 | |
res = x_padded[:, :, padding[0] : padding[2], padding[1] : padding[3]] | |
return res | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None]) | |
IS = make_tensor_value_info("IS", TensorProto.INT64, [None]) | |
BS = make_tensor_value_info("BS", TensorProto.INT64, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node("Col2Im", ["X", "IS", "BS"], ["Y"], pads=[0, 1, 0, 1]) | |
graph = make_graph([node], "g", [X, IS, BS], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.arange(5 * 15).astype(np.float32).reshape((1, 5, 15)), | |
"IS": np.array([5, 5]), | |
"BS": np.array([1, 5]), | |
} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = col2im_indices( | |
feeds["X"], | |
(1, 1, 5, 5), | |
field_height=1, | |
field_width=5, | |
padding=[0, 1, 0, 1], | |
) | |
assert_allclose(expected, got1[0]) | |
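    # With dilation 1, the ConvTranspose output size is (in - 1) * stride + kernel
    # - pad_begin - pad_end + output_padding, so the 5x4 input stays 5x4 here.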
def test_conv_transpose_2d(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
W = make_tensor_value_info("W", TensorProto.FLOAT, [None, None, None, None]) | |
B = make_tensor_value_info("B", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"ConvTranspose", | |
["X", "W", "B"], | |
["Y"], | |
dilations=[1, 1], | |
kernel_shape=[3, 3], | |
output_padding=[0, 0], | |
pads=[1, 1, 1, 1], | |
strides=[1, 1], | |
) | |
graph = make_graph([node], "g", [X, W, B], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.arange(1 * 3 * 5 * 4).reshape((1, 3, 5, 4)).astype(np.float32), | |
"W": np.arange(3 * 1 * 3 * 3).reshape((3, 1, 3, 3)).astype(np.float32), | |
"B": np.array([0, 0, 0, 0], dtype=np.float32), | |
} | |
# import torch | |
# ex = torch.nn.functional.conv_transpose2d( | |
# torch.Tensor(feeds["X"]), torch.Tensor(feeds["W"]), | |
# bias=None, stride=1, padding=1, output_padding=0, groups=1, dilation=1) | |
# print(ex) | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array( | |
[ | |
[ | |
[ | |
[4371, 6855, 7062, 4929], | |
[7524, 11781, 12132, 8451], | |
[8424, 13185, 13536, 9423], | |
[9324, 14589, 14940, 10395], | |
[7197, 11229, 11490, 7971], | |
], | |
] | |
], | |
dtype=np.float32, | |
) | |
assert_allclose(expected, got1[0]) | |
feeds["X"] *= 0 | |
feeds["X"][0, 0, 0, 0] = 1 | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array( | |
[ | |
[ | |
[ | |
[4, 5, 0, 0], | |
[7, 8, 0, 0], | |
[0, 0, 0, 0], | |
[0, 0, 0, 0], | |
[0, 0, 0, 0], | |
] | |
] | |
], | |
dtype=np.float32, | |
) | |
assert_allclose(expected, got1[0]) | |
def test_conv_transpose_2d_upper(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
W = make_tensor_value_info("W", TensorProto.FLOAT, [None, None, None, None]) | |
B = make_tensor_value_info("B", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"ConvTranspose", | |
["X", "W", "B"], | |
["Y"], | |
auto_pad="SAME_UPPER", | |
strides=[2, 2], | |
# output_shape=[6, 6], | |
) | |
graph = make_graph([node], "g", [X, W, B], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 16)]) | |
feeds = { | |
"X": np.arange(1 * 1 * 3 * 3).reshape((1, 1, 3, 3)).astype(np.float32), | |
"W": np.arange(1 * 2 * 3 * 3).reshape((1, 2, 3, 3)).astype(np.float32), | |
"B": np.array([0, 0, 0, 0], dtype=np.float32), | |
} | |
expected = np.array( | |
[ | |
[ | |
[ | |
[0, 0, 0, 1, 2, 2], | |
[0, 0, 3, 4, 11, 8], | |
[0, 3, 12, 11, 28, 19], | |
[9, 12, 27, 16, 35, 20], | |
[18, 27, 60, 35, 76, 43], | |
[18, 24, 51, 28, 59, 32], | |
], | |
[ | |
[0, 0, 9, 10, 29, 20], | |
[0, 0, 12, 13, 38, 26], | |
[27, 30, 84, 56, 136, 82], | |
[36, 39, 90, 52, 116, 65], | |
[99, 108, 240, 134, 292, 160], | |
[72, 78, 168, 91, 194, 104], | |
], | |
] | |
], | |
dtype=np.float32, | |
) | |
# import onnxruntime | |
# ref0 = onnxruntime.InferenceSession(onnx_model.SerializeToString(), providers=["CPUExecutionProvider"]) | |
# got0 = ref0.run(None, feeds) | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
assert_allclose(expected, got1[0]) | |
def test_stft(self): | |
signal = make_tensor_value_info("signal", TensorProto.FLOAT, [None, None, None]) | |
frame_step = make_tensor_value_info("frame_step", TensorProto.INT64, [None]) | |
frame_length = make_tensor_value_info("frame_length", TensorProto.INT64, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"STFT", | |
["signal", "frame_step", "", "frame_length"], | |
["Y"], | |
) | |
graph = make_graph([node], "g", [signal, frame_step, frame_length], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 17)]) | |
feeds = { | |
"signal": np.arange(128).reshape((1, 128, 1)).astype(np.float32), | |
"frame_step": np.array(8, dtype=np.int64), | |
"frame_length": np.array(16, dtype=np.int64), | |
} | |
signal = feeds["signal"] | |
frame_length = int(feeds["frame_length"]) | |
frame_step = int(feeds["frame_step"]) | |
onesided_length = (frame_length // 2) + 1 | |
nstfts = ((feeds["signal"].shape[1] - frame_length) // frame_step) + 1 | |
# [batch_size][frames][frame_length][2] | |
expected = np.empty([1, nstfts, onesided_length, 2], dtype=np.float32) | |
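        # Each frame is the DFT of frame_length consecutive samples; only the onesided
        # half (frame_length // 2 + 1 bins) is kept, stored as (real, imag) pairs.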
for i in range(nstfts): | |
start = i * frame_step | |
stop = i * frame_step + frame_length | |
complex_out = np.fft.fft(signal[0, start:stop, 0]) | |
c_out = complex_out[0:onesided_length] | |
expected[0, i] = np.stack((c_out.real, c_out.imag), axis=1) | |
# import torch | |
        # correspondence with torch
# hop_length = frame_step | |
# window = np.ones((frame_length,), dtype=np.float32) | |
# ex = torch.stft( | |
# torch.Tensor(feeds["signal"][:, :, 0]), | |
# n_fft=frame_length, window=torch.Tensor(window), | |
# hop_length=hop_length, win_length=frame_length, | |
# onesided=True, return_complex=True, center=False, | |
# normalized=False) | |
# ex = np.transpose(ex.numpy(), [0, 2, 1]) | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
assert_allclose(expected, got1[0]) | |
def test_stft_with_window(self): | |
signal = make_tensor_value_info("signal", TensorProto.FLOAT, [None, None, None]) | |
frame_step = make_tensor_value_info("frame_step", TensorProto.INT64, [None]) | |
window = make_tensor_value_info("window", TensorProto.FLOAT, [None]) | |
frame_length = make_tensor_value_info("frame_length", TensorProto.INT64, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
node = make_node( | |
"STFT", | |
["signal", "frame_step", "window", "frame_length"], | |
["Y"], | |
) | |
graph = make_graph([node], "g", [signal, frame_step, window, frame_length], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 17)]) | |
feeds = { | |
"signal": np.arange(128).reshape((1, 128, 1)).astype(np.float32), | |
"frame_step": np.array(8, dtype=np.int64), | |
"window": 0.5 | |
+ 0.5 * np.cos(2 * np.pi * np.arange(0, 16, 1, dtype=np.float32) / 16), | |
"frame_length": np.array(16, dtype=np.int64), | |
} | |
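        # The window is a raised-cosine taper of length 16 applied to every frame
        # before the DFT; otherwise the computation matches test_stft.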
signal = feeds["signal"] | |
frame_length = int(feeds["frame_length"]) | |
window = feeds["window"] | |
frame_step = int(feeds["frame_step"]) | |
onesided_length = (frame_length // 2) + 1 | |
        nstfts = 1 + (signal.shape[1] - window.shape[0]) // frame_step
# [batch_size][frames][frame_length][2] | |
expected = np.empty([1, nstfts, onesided_length, 2], dtype=np.float32) | |
for i in range(nstfts): | |
start = i * frame_step | |
stop = i * frame_step + frame_length | |
            complex_out = np.fft.fft(signal[0, start:stop, 0] * window)
            c_out = complex_out[0:onesided_length]
expected[0, i] = np.stack((c_out.real, c_out.imag), axis=1) | |
# import torch | |
# hop_length = frame_step | |
# ex = torch.stft( | |
# torch.Tensor(feeds["signal"][:, :, 0]), | |
# n_fft=frame_length, window=torch.Tensor(window), | |
# hop_length=hop_length, win_length=frame_length, | |
# onesided=True, return_complex=True, center=False, | |
# normalized=False) | |
# ex = np.transpose(ex.numpy(), [0, 2, 1]) | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
assert_allclose(expected, got1[0]) | |
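    # RoiAlign is checked twice: against onnxruntime (common_test_roi_align) and against
    # torchvision.ops.RoIAlign (common_test_roi_align_torch, average pooling only).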
def get_roi_align_model(self, mode): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
rois = make_tensor_value_info("rois", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
IS = make_tensor_value_info("I", TensorProto.INT64, [None]) | |
node = make_node( | |
"RoiAlign", | |
["X", "rois", "I"], | |
["Y"], | |
output_height=5, | |
output_width=5, | |
sampling_ratio=2, | |
spatial_scale=1.0, | |
coordinate_transformation_mode="output_half_pixel", | |
mode=mode, | |
) | |
graph = make_graph([node], "g", [X, rois, IS], [Y]) | |
return make_model_gen_version(graph, opset_imports=[make_opsetid("", 17)]) | |
def common_test_roi_align(self, mode): | |
onnx_model = self.get_roi_align_model(mode) | |
X, batch_indices, rois = get_roi_align_input_values() | |
feeds = {"X": X, "rois": rois, "I": batch_indices} | |
sess = run_ort_inference(onnx_model) | |
if sess is None: | |
return | |
expected = sess.run(None, feeds) | |
ref = ReferenceEvaluator(onnx_model) | |
got = ref.run(None, feeds) | |
assert_allclose(expected[0], got[0], atol=1e-5) | |
def test_roi_align(self): | |
with self.subTest(mode="avg"): | |
self.common_test_roi_align("avg") | |
        # the "max" mode has no example in the backend tests
with self.subTest(mode="max"): | |
self.common_test_roi_align("max") | |
def common_test_roi_align_torch(self, mode): | |
import torch | |
from torchvision.ops import RoIAlign | |
onnx_model = self.get_roi_align_model(mode) | |
sess = ReferenceEvaluator(onnx_model) | |
X, batch_indices, rois = get_roi_align_input_values() | |
got = sess.run(None, {"X": X, "rois": rois, "I": batch_indices}) | |
a = RoIAlign((5, 5), spatial_scale=1.0, sampling_ratio=2) | |
expected = a(torch.from_numpy(X), [torch.from_numpy(rois)]) | |
assert_allclose(expected, got[0], atol=1e-5) | |
def test_roi_align_torch(self): | |
with self.subTest(mode="avg"): | |
self.common_test_roi_align_torch("avg") | |
# not implemented in torch | |
# with self.subTest(mode="max"): | |
# self.common_test_roi_align_torch("max") | |
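    # Split with num_outputs=N produces chunks of size ceil(dim / N) with a smaller
    # last chunk, so 10 elements split into 4 outputs gives sizes [3, 3, 3, 1].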
def test_split(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y1 = make_tensor_value_info("Y1", TensorProto.FLOAT, [None]) | |
Y2 = make_tensor_value_info("Y2", TensorProto.FLOAT, [None]) | |
Y3 = make_tensor_value_info("Y3", TensorProto.FLOAT, [None]) | |
Y4 = make_tensor_value_info("Y4", TensorProto.FLOAT, [None]) | |
node = make_node("Split", ["X"], ["Y1", "Y2", "Y3", "Y4"], num_outputs=4) | |
graph = make_graph([node], "g", [X], [Y1, Y2, Y3, Y4]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
feeds = {"X": np.arange(10).astype(np.float32)} | |
expected = [ | |
np.array([0, 1, 2], dtype=np.float32), | |
np.array([3, 4, 5], dtype=np.float32), | |
np.array([6, 7, 8], dtype=np.float32), | |
np.array([9], dtype=np.float32), | |
] | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
for i in range(4): | |
assert_allclose(expected[i], got1[i]) | |
def test_split_2(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y1 = make_tensor_value_info("Y1", TensorProto.FLOAT, [None]) | |
Y2 = make_tensor_value_info("Y2", TensorProto.FLOAT, [None]) | |
Y3 = make_tensor_value_info("Y3", TensorProto.FLOAT, [None]) | |
Y4 = make_tensor_value_info("Y4", TensorProto.FLOAT, [None]) | |
node = make_node("Split", ["X", "split"], ["Y1", "Y2", "Y3", "Y4"]) | |
graph = make_graph([node], "g", [X], [Y1, Y2, Y3, Y4]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
feeds = { | |
"X": np.arange(10).astype(np.float32), | |
"split": np.array([3, 3, 2, 2], dtype=np.int64), | |
} | |
expected = [ | |
np.array([0, 1, 2], dtype=np.float32), | |
np.array([3, 4, 5], dtype=np.float32), | |
np.array([6, 7], dtype=np.float32), | |
np.array([8, 9], dtype=np.float32), | |
] | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
for i in range(4): | |
assert_allclose(expected[i], got1[i]) | |
def test_split_num_outputs_4(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y1 = make_tensor_value_info("Y1", TensorProto.FLOAT, [None]) | |
Y2 = make_tensor_value_info("Y2", TensorProto.FLOAT, [None]) | |
Y3 = make_tensor_value_info("Y3", TensorProto.FLOAT, [None]) | |
Y4 = make_tensor_value_info("Y4", TensorProto.FLOAT, [None]) | |
node = make_node("Split", ["X"], ["Y1", "Y2", "Y3", "Y4"], num_outputs=4) | |
graph = make_graph([node], "g", [X], [Y1, Y2, Y3, Y4]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
# case 1 | |
feeds = {"X": np.arange(10).astype(np.float32)} | |
expected = [ | |
np.array([0, 1, 2], dtype=np.float32), | |
np.array([3, 4, 5], dtype=np.float32), | |
np.array([6, 7, 8], dtype=np.float32), | |
np.array([9], dtype=np.float32), | |
] | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
for i in range(4): | |
assert_allclose(expected[i], got1[i]) | |
# case 2 | |
feeds = {"X": np.arange(9).astype(np.float32)} | |
expected = [ | |
np.array([0, 1, 2], dtype=np.float32), | |
np.array([3, 4, 5], dtype=np.float32), | |
np.array([6, 7, 8], dtype=np.float32), | |
np.array([], dtype=np.float32), | |
] | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
for i in range(4): | |
assert_allclose(expected[i], got1[i]) | |
def test_argmin(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.INT64, [None]) | |
node = make_node("ArgMin", ["X"], ["Y"], axis=1) | |
graph = make_graph([node], "g", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
feeds = {"X": np.arange(12).reshape((3, 4)).astype(np.float32)} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array([0, 0, 0], dtype=np.int64).reshape((-1, 1)) | |
self.assertEqual(expected.tolist(), got1[0].tolist()) | |
def test_argmax(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.INT64, [None]) | |
node = make_node("ArgMax", ["X"], ["Y"], axis=1) | |
graph = make_graph([node], "g", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
feeds = {"X": np.arange(12).reshape((3, 4)).astype(np.float32)} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array([3, 3, 3], dtype=np.int64).reshape((-1, 1)) | |
self.assertEqual(expected.tolist(), got1[0].tolist()) | |
def test_slice_squeeze(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
starts = make_tensor_value_info("starts", TensorProto.INT64, [None]) | |
ends = make_tensor_value_info("ends", TensorProto.INT64, [None]) | |
axes = make_tensor_value_info("axes", TensorProto.INT64, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.INT64, [None]) | |
nodes = [ | |
make_node("Slice", ["X", "starts", "ends", "axes"], ["T"]), | |
make_node("Squeeze", ["T", "axes"], ["Y"]), | |
] | |
graph = make_graph(nodes, "g", [X, starts, ends, axes], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
feeds = { | |
"X": np.array([[0]], dtype=np.int64), | |
"starts": np.array([0], dtype=np.int64), | |
"ends": np.array([1], dtype=np.int64), | |
"axes": np.array([0], dtype=np.int64), | |
} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array([0], dtype=np.int64) | |
self.assertEqual(expected.tolist(), got1[0].tolist()) | |
def test_slice_squeeze_6(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.INT64, [None]) | |
nodes = [ | |
make_node("Slice", ["X"], ["T"], axes=[0], starts=[0], ends=[1]), | |
make_node("Squeeze", ["T"], ["Y"], axes=[0]), | |
] | |
graph = make_graph(nodes, "g", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 6)]) | |
feeds = {"X": np.array([[0]], dtype=np.int64)} | |
ref1 = ReferenceEvaluator(onnx_model) | |
got1 = ref1.run(None, feeds) | |
expected = np.array([0], dtype=np.int64) | |
self.assertEqual(expected.tolist(), got1[0].tolist()) | |
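    # ReduceMean takes its axes as an attribute up to opset 17 and as an optional input
    # from opset 18 on, so the evaluator selects a different kernel class per opset.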
def test_onnxrt_reduce_mean(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None]) | |
node1 = make_node("ReduceMean", ["X"], ["Y"]) | |
graph = make_graph([node1], "g", [X], [Y]) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 17)]) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
cls = sess.rt_nodes_[0] | |
self.assertEqual(cls.__class__.__name__, "ReduceMean_1") | |
got = sess.run(None, {"X": np.ones((2, 4), dtype=np.float32)})[0] | |
self.assertEqual(got.shape, (1, 1)) | |
self.assertEqual(got[0, 0], 1) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", 18)]) | |
check_model(onnx_model) | |
sess = ReferenceEvaluator(onnx_model) | |
cls = sess.rt_nodes_[0] | |
self.assertEqual(cls.__class__.__name__, "ReduceMean_18") | |
got = sess.run(None, {"X": np.ones((2, 4), dtype=np.float32)})[0] | |
self.assertEqual(got.shape, (1, 1)) | |
self.assertEqual(got[0, 0], 1) | |
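    # Builds a Scan-based pairwise-distance model (Sub + reduce + Sqrt + TopK); the
    # reduce node switches from an `axes` attribute to an `axes` input at opset 18.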
    def _cdist_model(self, opset, reduce_op="ReduceSumSquare"):
# subgraph | |
initializers = [] | |
inputs = [ | |
make_tensor_value_info("next_in", TensorProto.FLOAT, [None, 4]), | |
make_tensor_value_info("next", TensorProto.FLOAT, [None]), | |
] | |
outputs = [ | |
make_tensor_value_info("next_out", TensorProto.FLOAT, [None, None]), | |
make_tensor_value_info("scan_out", TensorProto.FLOAT, [None]), | |
] | |
if opset >= 18: | |
initializers.append( | |
from_array(np.array([1], dtype=np.int64), name="axis_red") | |
) | |
node_reduce = make_node( | |
reduce_op, | |
["cdistdf_17_C0", "axis_red"], | |
["cdistdf_17_reduced0"], | |
name="cdistdf_17_ReduceSumSquare", | |
keepdims=0, | |
) | |
else: | |
node_reduce = make_node( | |
reduce_op, | |
["cdistdf_17_C0"], | |
["cdistdf_17_reduced0"], | |
name="cdistdf_17_ReduceSumSquare", | |
axes=[1], | |
keepdims=0, | |
) | |
nodes = [ | |
make_node("Identity", ["next_in"], ["next_out"], name="cdistd_17_Identity"), | |
make_node( | |
"Sub", ["next_in", "next"], ["cdistdf_17_C0"], name="cdistdf_17_Sub" | |
), | |
node_reduce, | |
make_node( | |
"Identity", | |
["cdistdf_17_reduced0"], | |
["scan_out"], | |
name="cdistdf_17_Identity", | |
), | |
] | |
graph = make_graph(nodes, "OnnxIdentity", inputs, outputs, initializers) | |
# main graph | |
initializers = [] | |
list_value = [ | |
1.1394007205963135, | |
-0.6848101019859314, | |
-1.234825849533081, | |
0.4023416340351105, | |
0.17742614448070526, | |
0.46278226375579834, | |
-0.4017809331417084, | |
-1.630198359489441, | |
-0.5096521973609924, | |
0.7774903774261475, | |
-0.4380742907524109, | |
-1.2527953386306763, | |
-1.0485529899597168, | |
1.950775384902954, | |
-1.420017957687378, | |
-1.7062702178955078, | |
1.8675580024719238, | |
-0.15135720372200012, | |
-0.9772778749465942, | |
0.9500884413719177, | |
-2.5529897212982178, | |
-0.7421650290489197, | |
0.653618574142456, | |
0.8644362092018127, | |
1.5327792167663574, | |
0.37816253304481506, | |
1.4693588018417358, | |
0.154947429895401, | |
-0.6724604368209839, | |
-1.7262825965881348, | |
-0.35955315828323364, | |
-0.8131462931632996, | |
-0.8707971572875977, | |
0.056165341287851334, | |
-0.5788496732711792, | |
-0.3115525245666504, | |
1.2302906513214111, | |
-0.302302747964859, | |
1.202379822731018, | |
-0.38732680678367615, | |
2.269754648208618, | |
-0.18718385696411133, | |
-1.4543657302856445, | |
0.04575851559638977, | |
-0.9072983860969543, | |
0.12898291647434235, | |
0.05194539576768875, | |
0.7290905714035034, | |
1.4940791130065918, | |
-0.8540957570075989, | |
-0.2051582634449005, | |
0.3130677044391632, | |
1.764052391052246, | |
2.2408931255340576, | |
0.40015721321105957, | |
0.978738009929657, | |
0.06651721894741058, | |
-0.3627411723136902, | |
0.30247190594673157, | |
-0.6343221068382263, | |
-0.5108051300048828, | |
0.4283318817615509, | |
-1.18063223361969, | |
-0.02818222902715206, | |
-1.6138978004455566, | |
0.38690251111984253, | |
-0.21274028718471527, | |
-0.8954665660858154, | |
0.7610377073287964, | |
0.3336743414402008, | |
0.12167501449584961, | |
0.44386324286460876, | |
-0.10321885347366333, | |
1.4542734622955322, | |
0.4105985164642334, | |
0.14404356479644775, | |
-0.8877857327461243, | |
0.15634897351264954, | |
-1.980796456336975, | |
-0.34791216254234314, | |
] | |
initializers.append( | |
from_array( | |
np.array(list_value, dtype=np.float32).reshape((20, 4)), | |
name="Sc_Scancst", | |
) | |
) | |
initializers.append( | |
from_array(np.array([2], dtype=np.int64), name="To_TopKcst") | |
) | |
inputs = [make_tensor_value_info("input", TensorProto.FLOAT, [None, 4])] | |
outputs = [ | |
make_tensor_value_info("values", TensorProto.FLOAT, [None, 2]), | |
make_tensor_value_info("indices", TensorProto.INT64, [None, 2]), | |
] | |
# nodes | |
nodes = [ | |
make_node( | |
"Scan", | |
["input", "Sc_Scancst"], | |
["UU032UU", "UU033UU"], | |
name="Sc_Scan", | |
body=graph, | |
num_scan_inputs=1, | |
), | |
make_node( | |
"Transpose", | |
["UU033UU"], | |
["Tr_transposed0"], | |
name="Tr_Transpose", | |
perm=[1, 0], | |
), | |
make_node("Sqrt", ["Tr_transposed0"], ["Sq_Y0"], name="Sq_Sqrt"), | |
make_node( | |
"TopK", | |
["Sq_Y0", "To_TopKcst"], | |
["values", "indices"], | |
name="To_TopK", | |
largest=0, | |
sorted=1, | |
), | |
] | |
graph = make_graph(nodes, "dummy", inputs, outputs, initializers) | |
# model | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", opset)]) | |
return onnx_model | |
def test_op_reduce(self, reduce_op_expected, opset: int): | |
reduce_op, expected = reduce_op_expected | |
X = np.arange(8).reshape((-1, 4)).astype(np.float32) | |
results = {} | |
model = self._cdist_model(opset, reduce_op) | |
sess = ReferenceEvaluator(model) | |
got = sess.run(None, {"input": X}) | |
results["ref", opset] = got | |
cl = [ | |
n | |
for n in sess.rt_nodes_[0].body.rt_nodes_ | |
if n.__class__.__name__.startswith(reduce_op) | |
] | |
schema = cl[0]._schema | |
new_cl = type(reduce_op, (cl[0].__class__,), {"op_schema": schema}) | |
sess = ReferenceEvaluator(model, new_ops=[new_cl]) | |
got = sess.run(None, {"input": X}) | |
results["ref_cl", opset] = got | |
baseline = "constant" | |
for k, v in results.items(): | |
for a, b in zip(reversed(expected), reversed(v)): | |
if a.shape != b.shape: | |
raise AssertionError( | |
f"Shape mismatch for {reduce_op!r}, {baseline}:{a.shape} != {k}:{b.shape}." | |
) | |
diff = np.abs(a - b).max() | |
if diff > 1e-6: | |
raise AssertionError( | |
f"Discrepancies (max={diff}) for {reduce_op!r}, {baseline} != {k}\n{a}\n!=\n{b}" | |
) | |
def test_mvn(self, opset: int, ref_opset: int = 13): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None, None, None, None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None, None, None, None]) | |
nodes = [ | |
make_node("MeanVarianceNormalization", ["X"], ["Y"]), | |
] | |
graph = make_graph(nodes, "g", [X], [Y]) | |
x = np.random.rand(3, 3, 3, 1).astype(np.float32) | |
onnx_model = make_model(graph, opset_imports=[make_opsetid("", opset)]) | |
ref = ReferenceEvaluator(onnx_model) | |
got = ref.run(None, {"X": x})[0] | |
ref_onnx_model = make_model(graph, opset_imports=[make_opsetid("", ref_opset)]) | |
ref_expected = ReferenceEvaluator(ref_onnx_model) | |
expected = ref_expected.run(None, {"X": x})[0] | |
self.assertEqual(expected.shape, got.shape) | |
assert_allclose(expected, got) | |
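    # A Concat wrapped in a local function (custom_domain.concat_2) and applied twice
    # should behave like np.vstack on the three inputs.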
def test_concat_in_a_function(self): | |
def create_model(): | |
nodes = [] | |
inputs = [] | |
outputs = [] | |
functions = [] | |
opsets = {"": onnx_opset_version(), "custom_domain": 1} | |
nodes_fct = [] | |
node = make_node("Concat", ["x:0", "x:1"], ["r__0"], axis=0, domain="") | |
nodes_fct.append(node) | |
opset_imports_fct = [ | |
make_opsetid(domain, 1 if version is None else version) | |
for domain, version in opsets.items() | |
] | |
fct = make_function( | |
"custom_domain", | |
"concat_2", | |
["x:0", "x:1"], | |
["r__0"], | |
nodes_fct, | |
opset_imports_fct, | |
) | |
functions.append(fct) | |
inputs.append(make_tensor_value_info("I__0", TensorProto.DOUBLE, [])) | |
inputs.append(make_tensor_value_info("I__1", TensorProto.DOUBLE, [])) | |
inputs.append(make_tensor_value_info("I__2", TensorProto.DOUBLE, [])) | |
outputs.append(make_tensor_value_info("r__4", TensorProto.DOUBLE, [])) | |
node = make_node( | |
"concat_2", ["I__0", "I__1"], ["r__3"], axis=0, domain="custom_domain" | |
) | |
nodes.append(node) | |
node = make_node( | |
"concat_2", ["I__2", "r__3"], ["r__4"], axis=0, domain="custom_domain" | |
) | |
nodes.append(node) | |
opset_imports = [ | |
make_opsetid(domain, 1 if version is None else version) | |
for domain, version in opsets.items() | |
] | |
graph = make_graph(nodes, "numpyx", inputs, outputs) | |
onnx_model = make_model( | |
graph, opset_imports=opset_imports, functions=functions | |
) | |
return onnx_model | |
onnx_model = create_model() | |
x1 = np.array([[-5, 6], [15, 3]], dtype=np.float64) | |
x2 = np.array([[1, 2]], dtype=np.float64) | |
x3 = np.array([[-1, -2]], dtype=np.float64) | |
z = np.vstack([x1, x2, x3]) | |
ref = ReferenceEvaluator(onnx_model) | |
feeds = {"I__2": x1, "I__0": x2, "I__1": x3} | |
got = ref.run(None, feeds) | |
assert_allclose(z, got[0]) | |
def test_cast_float_to_string(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.STRING, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["Y"], to=TensorProto.STRING), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([1.152512, -0.152612, 0.0, np.nan]) | |
got = ref.run(None, {"X": data})[0] | |
self.assertTrue( | |
(got == np.array([1.152512, -0.152612, 0.0, np.nan]).astype(np.str_)).all() | |
) | |
def test_cast_float_to_string_and_back(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["Z"], to=TensorProto.STRING), | |
make_node("Cast", ["Z"], ["Y"], to=TensorProto.FLOAT), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([1.152512, -0.152612, 0.0, np.nan]) | |
got = ref.run(None, {"X": data})[0] | |
assert_allclose(got, np.array([1.152512, -0.152612, 0.0, np.nan])) | |
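    # SplitToSequence with a scalar split of 2 along axis 2 turns the (1, 3, 6) input
    # into a sequence of three (1, 3, 2) tensors.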
def test_split_to_sequence(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.INT64, None) | |
Z = make_tensor_value_info("Z", TensorProto.UNDEFINED, None) | |
nodes = [make_node("SplitToSequence", ["X", "Y"], ["Z"], axis=2)] | |
model = make_model(make_graph(nodes, "g", [X, Y], [Z])) | |
ref = ReferenceEvaluator(model) | |
data = np.arange(18).reshape((1, 3, 6)).astype(np.float32) | |
indices = np.array(2, dtype=np.int64) | |
got = ref.run(None, {"X": data, "Y": indices}) | |
expected = [ | |
[ | |
np.array([[[0.0, 1.0], [6.0, 7.0], [12.0, 13.0]]], dtype=np.float32), | |
np.array([[[2.0, 3.0], [8.0, 9.0], [14.0, 15.0]]], dtype=np.float32), | |
np.array([[[4.0, 5.0], [10.0, 11.0], [16.0, 17.0]]], dtype=np.float32), | |
] | |
] | |
self.assertEqual(len(expected[0]), len(got[0])) | |
for a, b in zip(expected[0], got[0]): | |
assert_allclose(a, b) | |
def test_split_to_sequence_1d(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.INT64, None) | |
Z = make_tensor_value_info("Z", TensorProto.UNDEFINED, None) | |
nodes = [make_node("SplitToSequence", ["X", "Y"], ["Z"], axis=2)] | |
model = make_model(make_graph(nodes, "g", [X, Y], [Z])) | |
ref = ReferenceEvaluator(model) | |
data = np.arange(18).reshape((1, 3, 6)).astype(np.float32) | |
indices = np.array([2, 2, 2], dtype=np.int64) | |
got = ref.run(None, {"X": data, "Y": indices}) | |
expected = [ | |
[ | |
np.array([[[0.0, 1.0], [6.0, 7.0], [12.0, 13.0]]], dtype=np.float32), | |
np.array([[[2.0, 3.0], [8.0, 9.0], [14.0, 15.0]]], dtype=np.float32), | |
np.array([[[4.0, 5.0], [10.0, 11.0], [16.0, 17.0]]], dtype=np.float32), | |
] | |
] | |
self.assertEqual(len(expected[0]), len(got[0])) | |
for a, b in zip(expected[0], got[0]): | |
assert_allclose(a, b) | |
def test_split_to_sequence_nokeepdims_noinput(self): | |
        # without the optional 'split' input, every chunk has size 1 along the axis
        # and keepdims=0 removes that axis from each chunk
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Z = make_tensor_value_info("Z", TensorProto.UNDEFINED, None) | |
nodes = [make_node("SplitToSequence", ["X"], ["Z"], axis=2, keepdims=0)] | |
model = make_model(make_graph(nodes, "g", [X], [Z])) | |
ref = ReferenceEvaluator(model) | |
data = np.arange(18).reshape((1, 3, 6)).astype(np.float32) | |
got = ref.run(None, {"X": data}) | |
expected = [[data[:, :, i] for i in range(data.shape[2])]] | |
self.assertEqual(len(expected[0]), len(got[0])) | |
for a, b in zip(expected[0], got[0]): | |
assert_allclose(a, b) | |
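    # Casting to a float8 type and back to float32 must match the reference helpers
    # float32_to_float8e4m3/float32_to_float8e5m2 followed by the *_to_float32 inverses.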
def test_cast_float8(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
F1 = make_tensor_value_info("F1", TensorProto.FLOAT, [None]) | |
F2 = make_tensor_value_info("F2", TensorProto.FLOAT, [None]) | |
F3 = make_tensor_value_info("F3", TensorProto.FLOAT, [None]) | |
F4 = make_tensor_value_info("F4", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["f81"], to=TensorProto.FLOAT8E4M3FN), | |
make_node("Cast", ["X"], ["f82"], to=TensorProto.FLOAT8E5M2), | |
make_node( | |
"Constant", | |
[], | |
["C1"], | |
value=make_tensor( | |
"C1", TensorProto.FLOAT8E4M3FN, [5], [0, 1, 2, 5e-2, 200] | |
), | |
), | |
make_node( | |
"Constant", | |
[], | |
["C2"], | |
value=make_tensor( | |
"C2", TensorProto.FLOAT8E5M2, [5], [0, 1, 2, 5e-2, 200] | |
), | |
), | |
make_node("Cast", ["f81"], ["F1"], to=TensorProto.FLOAT), | |
make_node("Cast", ["f82"], ["F2"], to=TensorProto.FLOAT), | |
make_node("Cast", ["C1"], ["F3"], to=TensorProto.FLOAT), | |
make_node("Cast", ["C2"], ["F4"], to=TensorProto.FLOAT), | |
], | |
"g", | |
[X], | |
[F1, F2, F3, F4], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2, 5e-2, 200], dtype=np.float32) | |
expected1 = np.array( | |
[float8e4m3_to_float32(float32_to_float8e4m3(x)) for x in data] | |
) | |
expected2 = np.array( | |
[float8e5m2_to_float32(float32_to_float8e5m2(x)) for x in data] | |
) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(got[0], expected1) | |
assert_allclose(got[1], expected2) | |
assert_allclose(got[2], expected1) | |
assert_allclose(got[3], expected2) | |
def test_cast_like_float8(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["f8"], to=TensorProto.FLOAT8E4M3FNUZ), | |
make_node("CastLike", ["X", "f8"], ["f32"], saturate=0), | |
make_node("Cast", ["f32"], ["Y"], to=TensorProto.FLOAT), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
data = np.array([0, 1e7], dtype=np.float32) | |
expected = np.array( | |
[ | |
float8e4m3_to_float32( | |
float32_to_float8e4m3(x, uz=True, saturate=False), uz=True | |
) | |
for x in data | |
] | |
) | |
ref = ReferenceEvaluator(model) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(got[0], expected) | |
# Forces ReferenceEvaluator to not use the associated implementation for CastLike | |
# but its implementation as a function instead. | |
class CastLike(OpRunExpand): | |
op_domain = "" | |
ref = ReferenceEvaluator(model, new_ops=[CastLike]) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(got[0], expected) | |
def test_cast_float8_output(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
F1 = make_tensor_value_info("F1", TensorProto.FLOAT8E4M3FN, [None]) | |
F2 = make_tensor_value_info("F2", TensorProto.FLOAT8E5M2, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["F1"], to=TensorProto.FLOAT8E4M3FN), | |
make_node("Cast", ["X"], ["F2"], to=TensorProto.FLOAT8E5M2), | |
], | |
"g", | |
[X], | |
[F1, F2], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2, 5e-2, 200], dtype=np.float32) | |
expected1 = np.array([float32_to_float8e4m3(x) for x in data]) | |
expected2 = np.array([float32_to_float8e5m2(x) for x in data]) | |
got = ref.run(None, {"X": data}) | |
self.assertEqual(expected1.tolist(), got[0].tolist()) | |
self.assertEqual(expected2.tolist(), got[1].tolist()) | |
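    # The four float8 variants mostly differ in range: E4M3FN saturates at +/-448,
    # E4M3FNUZ at +/-240, E5M2 and E5M2FNUZ at +/-57344 (see the expected values below).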
def test_float8_4_types(self): | |
x = np.array( | |
[ | |
0.4068359375, | |
352, | |
416, | |
336, | |
304, | |
272, | |
-248, | |
-100, | |
1e-4, | |
1e-2, | |
416, | |
432, | |
1e5, | |
np.inf, | |
-np.inf, | |
np.nan, | |
], | |
dtype=np.float32, | |
) | |
expected = { | |
TensorProto.FLOAT8E4M3FN: np.array( | |
[ | |
0.40625, | |
352.0, | |
416.0, | |
320.0, | |
320.0, | |
256.0, | |
-256.0, | |
-96.0, | |
0.0, | |
0.009765625, | |
416.0, | |
448.0, | |
448.0, | |
448.0, | |
-448.0, | |
np.nan, | |
], | |
dtype=np.float32, | |
), | |
TensorProto.FLOAT8E4M3FNUZ: np.array( | |
[ | |
0.40625, | |
240.0, | |
240.0, | |
240.0, | |
240.0, | |
240.0, | |
-240.0, | |
-96.0, | |
0.0, | |
0.009765625, | |
240.0, | |
240.0, | |
240.0, | |
240.0, | |
-240.0, | |
np.nan, | |
], | |
dtype=np.float32, | |
), | |
TensorProto.FLOAT8E5M2: np.array( | |
[ | |
0.4375, | |
384.0, | |
384.0, | |
320.0, | |
320.0, | |
256.0, | |
-256.0, | |
-96.0, | |
0.0001068115234375, | |
0.009765625, | |
384.0, | |
448.0, | |
57344.0, | |
57344.0, | |
-57344.0, | |
np.nan, | |
], | |
dtype=np.float32, | |
), | |
TensorProto.FLOAT8E5M2FNUZ: np.array( | |
[ | |
4.3750000e-01, | |
3.8400000e02, | |
3.8400000e02, | |
3.2000000e02, | |
3.2000000e02, | |
2.5600000e02, | |
-2.5600000e02, | |
-9.6000000e01, | |
1.0681152e-04, | |
9.7656250e-03, | |
3.8400000e02, | |
4.4800000e02, | |
5.7344000e04, | |
5.7344000e04, | |
-5.7344000e04, | |
np.nan, | |
], | |
dtype=np.float32, | |
), | |
} | |
def model_cast_cast(to): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
node1 = make_node("Cast", ["X"], ["T"], to=to) | |
node2 = make_node("Cast", ["T"], ["Y"], to=TensorProto.FLOAT) | |
graph = make_graph([node1, node2], "lr", [X], [Y]) | |
onnx_model = make_model(graph) | |
check_model(onnx_model) | |
return onnx_model | |
for to, expect in expected.items(): | |
with self.subTest(to=to): | |
onnx_model = model_cast_cast(to) | |
ref = ReferenceEvaluator(onnx_model) | |
y = ref.run(None, {"X": x})[0] | |
assert_allclose(expect, y) | |
self.assertEqual(expect.shape, y.shape) | |
self.assertEqual(expect.dtype, y.dtype) | |
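    # bfloat16 keeps the float32 sign and 8-bit exponent with a 7-bit mantissa, so the
    # cast only drops or rounds away the low 16 mantissa bits.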
def test_cast_bfloat16_output(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.BFLOAT16, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["Y"], to=TensorProto.BFLOAT16), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2, 1e5, 200], dtype=np.float32) | |
expected1 = np.array([float32_to_bfloat16(x) for x in data]) | |
got = ref.run(None, {"X": data}) | |
self.assertEqual(expected1.tolist(), got[0].tolist()) | |
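    # QuantizeLinear computes saturate(round(x / scale) + zero_point); with scale 2,
    # 1e5 / 2 saturates to the E4M3FN maximum 448, hence 896 after DequantizeLinear.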
def test_quantize_linear_e4m3(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"Constant", | |
[], | |
["scale"], | |
value=make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
), | |
make_node( | |
"Constant", | |
[], | |
["zero"], | |
value=make_tensor("zero", TensorProto.FLOAT8E4M3FN, [1], [0.0]), | |
), | |
make_node("QuantizeLinear", ["X", "scale", "zero"], ["T"]), | |
make_node("DequantizeLinear", ["T", "scale"], ["Y"], axis=0), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2, 1e5, 200], dtype=np.float32) | |
expected = np.array([0, 1, 2, 896, 192], dtype=np.float32) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
def test_quantize_linear_e4m3_initializer(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("QuantizeLinear", ["X", "scale", "zero"], ["T"]), | |
make_node("DequantizeLinear", ["T", "scale"], ["Y"], axis=0), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
make_tensor("zero", TensorProto.FLOAT8E4M3FN, [1], [0.0]), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2, 1e5, 200], dtype=np.float32) | |
expected = np.array([0, 1, 2, 896, 192], dtype=np.float32) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
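    # Same round trip with E5M2: 1e5 / 2 = 50000 rounds to the representable 49152,
    # giving 98304 after dequantization.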
def test_quantize_linear_e5m2(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"Constant", | |
[], | |
["scale"], | |
value=make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
), | |
make_node( | |
"Constant", | |
[], | |
["zero"], | |
value=make_tensor("zero", TensorProto.FLOAT8E5M2, [1], [0.0]), | |
), | |
make_node("QuantizeLinear", ["X", "scale", "zero"], ["T"]), | |
make_node("DequantizeLinear", ["T", "scale"], ["Y"], axis=0), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2, 1e5, 200], dtype=np.float32) | |
expected = np.array([0, 1, 2, 98304, 192], dtype=np.float32) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
def test_quantize_linear_uint16(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.UINT16, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("QuantizeLinear", ["X", "scale", "zero"], ["Y"]), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
make_tensor("zero", TensorProto.UINT16, [1], [32767]), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array( | |
[ | |
# rounding half to even | |
0.0, | |
-128.0, | |
3.0, | |
-3.0, | |
# round < .5 | |
2.9, | |
-2.9, | |
# round > .5 | |
3.1, | |
-3.1, | |
# critical point | |
65536.0, | |
-65534.0, | |
# saturate case | |
70000.0, | |
-70000.0, | |
], | |
dtype=np.float32, | |
) | |
expected = np.array( | |
[ | |
32767, | |
32703, | |
32769, | |
32765, | |
32768, | |
32766, | |
32769, | |
32765, | |
65535, | |
0, | |
65535, | |
0, | |
], | |
dtype=np.uint16, | |
) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
def test_quantize_linear_int16(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.INT16, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("QuantizeLinear", ["X", "scale", "zero"], ["Y"]), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
make_tensor("zero", TensorProto.INT16, [1], [256]), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array( | |
[ | |
# rounding half to even | |
0.0, | |
-514.0, | |
3.0, | |
-3.0, | |
# round < .5 | |
2.9, | |
-2.9, | |
# round > .5 | |
3.1, | |
-3.1, | |
# critical point | |
65022.0, | |
-66046.0, | |
65023.0, | |
-66047.0, | |
65024.0, | |
-66048.0, | |
# saturate case | |
70000.0, | |
-70000.0, | |
], | |
dtype=np.float32, | |
) | |
expected = np.array( | |
[ | |
256, | |
-1, | |
258, | |
254, | |
257, | |
255, | |
258, | |
254, | |
32767, | |
-32767, | |
32767, | |
-32768, | |
32767, | |
-32768, | |
32767, | |
-32768, | |
], | |
dtype=np.int16, | |
) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
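    # DequantizeLinear computes (x - zero_point) * scale, e.g. (30000 - 32767) * 2 = -5534.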
def test_dequantize_linear_uint16(self): | |
X = make_tensor_value_info("X", TensorProto.UINT16, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"DequantizeLinear", ["X", "scale", "zero"], ["Y"], axis=0 | |
), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
make_tensor("zero", TensorProto.UINT16, [1], [32767]), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([30000, 31000, 32768, 33000], dtype=np.uint16) | |
expected = np.array([-5534.0, -3534.0, 2.0, 466.0], dtype=np.float32) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
def test_dequantize_linear_int16(self): | |
X = make_tensor_value_info("X", TensorProto.INT16, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"DequantizeLinear", ["X", "scale", "zero"], ["Y"], axis=0 | |
), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
make_tensor("zero", TensorProto.INT16, [1], [-1024]), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([-300, -30, -1025, 1270], dtype=np.int16) | |
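        # Expected output is (x - (-1024)) * 2.0, e.g. -300 -> 1448.0.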
expected = np.array([1448.0, 1988.0, -2.0, 4588.0], dtype=np.float32) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
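    # The blocked quantization tests below expect their arguments from a
    # parameterized decorator that is missing here. The values in this
    # decorator are only an illustrative assumption (scale=1, zero_point=0,
    # two blocks of size 2 along axis 0); the original cases may differ, and
    # cases with expected=None would exercise the ValueError branch instead.
    @parameterized.parameterized.expand(
        [
            (
                [[0.0, 1.0], [2.0, 3.0], [4.0, 5.0], [6.0, 7.0]],
                [[1.0, 1.0], [1.0, 1.0]],
                [[0, 0], [0, 0]],
                0,
                2,
                [[0, 1], [2, 3], [4, 5], [6, 7]],
            ),
        ]
    )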
def test_blocked_quantize_linear( | |
self, x, scale, zero_point, axis, block_size, expected | |
): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.INT8, [None]) | |
scale_data = np.array(scale, dtype=np.float32) | |
zp_data = np.array(zero_point, dtype=np.int8) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"QuantizeLinear", | |
["X", "scale", "zero"], | |
["Y"], | |
axis=axis, | |
block_size=block_size, | |
), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor( | |
"scale", TensorProto.FLOAT, scale_data.shape, scale_data | |
), | |
make_tensor("zero", TensorProto.INT8, scale_data.shape, zp_data), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array(x, dtype=np.float32) | |
if expected is not None: | |
expected = np.array(expected, dtype=np.int8) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
else: | |
with self.assertRaises(ValueError): | |
ref.run(None, {"X": data}) | |
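    # Same caveat as above: the decorator is missing from this excerpt, so the
    # parameter values here are an assumed example (block_size=2 along axis 0,
    # zero_point=0, scale=2), not the original cases.
    @parameterized.parameterized.expand(
        [
            (
                [[0, 1], [2, 3], [4, 5], [6, 7]],
                [[2.0, 2.0], [2.0, 2.0]],
                [[0, 0], [0, 0]],
                0,
                2,
                [[0.0, 2.0], [4.0, 6.0], [8.0, 10.0], [12.0, 14.0]],
            ),
        ]
    )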
def test_blocked_dequantize_linear( | |
self, x, scale, zero_point, axis, block_size, expected | |
): | |
X = make_tensor_value_info("X", TensorProto.INT8, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
scale_data = np.array(scale, dtype=np.float32) | |
zp_data = np.array(zero_point, dtype=np.int8) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"DequantizeLinear", | |
["X", "scale", "zero"], | |
["Y"], | |
axis=axis, | |
block_size=block_size, | |
), | |
], | |
"g", | |
[X], | |
[Y], | |
[ | |
make_tensor( | |
"scale", TensorProto.FLOAT, scale_data.shape, scale_data | |
), | |
make_tensor("zero", TensorProto.INT8, scale_data.shape, zp_data), | |
], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array(x, dtype=np.int8) | |
if expected is not None: | |
expected = np.array(expected, dtype=np.float32) | |
got = ref.run(None, {"X": data}) | |
assert_allclose(expected, got[0]) | |
else: | |
with self.assertRaises(ValueError): | |
ref.run(None, {"X": data}) | |
def test_lrn(self): | |
def _expected(x, alpha, beta, bias, size): | |
square_sum = np.zeros((5, 5, 5, 5)).astype(np.float32) | |
for n, c, h, w in np.ndindex(x.shape): | |
square_sum[n, c, h, w] = sum( | |
x[ | |
n, | |
max(0, c - int(math.floor((size - 1) / 2))) : min( | |
5, c + int(math.ceil((size - 1) / 2)) + 1 | |
), | |
h, | |
w, | |
] | |
** 2 | |
) | |
y = x / ((bias + (alpha / size) * square_sum) ** beta) | |
return y | |
        # LRN attributes shared by the node and the numpy reference above
alpha = 0.0002 | |
beta = 0.5 | |
bias = 2.0 | |
size = 3 | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [5, 5, 50, 50]) | |
Z = make_tensor_value_info("Z", TensorProto.UNDEFINED, None) | |
nodes = [ | |
make_node("LRN", ["X"], ["Z"], alpha=alpha, beta=beta, bias=bias, size=size) | |
] | |
model = make_model(make_graph(nodes, "g", [X], [Z])) | |
ref = ReferenceEvaluator(model) | |
data = np.random.rand(5, 5, 5, 5).astype(np.float32) | |
got = ref.run(None, {"X": data}) | |
expected = _expected(data, alpha, beta, bias, size) | |
        self.assertEqual(len(expected), len(got[0]))
        assert_allclose(expected, got[0], atol=1e-6)
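    # The following tests cross-check the naive convolution in
    # _conv_implementation against the im2col-based implementation used by the
    # optimized Conv kernel, over 1-D, 2-D and 3-D inputs with various
    # paddings, strides and dilations.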
def test_conv_im2col_1d(self): | |
feeds = { | |
"X": np.arange(1 * 1 * 11).reshape((1, 1, 11)).astype(np.float32) + 1, | |
"W": np.arange(3).reshape((1, 1, 3)).astype(np.float32), | |
"B": np.zeros((1,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1], | |
kernel_shape=[3], | |
pads=[1, 1], | |
strides=[1], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_1d_pad0(self): | |
feeds = { | |
"X": np.arange(2 * 4 * 3).reshape((2, 4, -1)).astype(np.float32) + 1, | |
"W": np.arange(2 * 4 * 3).reshape((-1, 4, 3)).astype(np.float32), | |
"B": np.zeros((1,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1], | |
kernel_shape=[3], | |
pads=[0, 0], | |
strides=[1], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_2d(self): | |
feeds = { | |
"X": np.arange(1 * 1 * 11 * 23).reshape((1, 1, 11, 23)).astype(np.float32) | |
+ 1, | |
"W": np.arange(9).reshape((1, 1, 3, 3)).astype(np.float32), | |
"B": np.zeros((1,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1, 1], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[1, 1], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_2d_pad0(self): | |
feeds = { | |
"X": np.arange(2 * 3 * 5 * 2).reshape((2, 3, 5, -1)).astype(np.float32) + 1, | |
"W": 2 | |
** np.arange(3 * 3 * 1 * 2).reshape((-1, 3, 1, 2)).astype(np.float32), | |
"B": np.zeros((1,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1, 1], | |
kernel_shape=[1, 2], | |
pads=[0, 0, 0, 0], | |
strides=[1, 1], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_2d_autopad(self): | |
feeds = { | |
"X": np.arange(5 * 5).reshape((1, 1, 5, -1)).astype(np.float32) + 1, | |
"W": 2 ** np.arange(3 * 3).reshape((1, 1, 3, 3)).astype(np.float32), | |
"B": np.zeros((1,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1, 1], | |
kernel_shape=[3, 3], | |
strides=[2, 2], | |
pads=None, | |
auto_pad="SAME_LOWER", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_3d(self): | |
feeds = { | |
"X": np.arange(1 * 1 * 11 * 5 * 13) | |
.reshape((1, 1, 11, 5, 13)) | |
.astype(np.float32) | |
+ 1, | |
"W": np.arange(27).reshape((1, 1, 3, 3, 3)).astype(np.float32), | |
"B": np.zeros((1,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1, 1, 1], | |
kernel_shape=[3, 3, 3], | |
pads=[1, 1, 1, 1, 1, 1], | |
strides=[1, 1, 1], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_2d_strides(self): | |
feeds = { | |
"X": np.arange(1 * 3 * 6 * 6).reshape((1, 3, 6, 6)).astype(np.float32) + 1, | |
"W": np.arange(2 * 3 * 3 * 3).reshape((2, 3, 3, 3)).astype(np.float32), | |
"B": np.zeros((2,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[1, 1], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[2, 2], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
def test_conv_im2col_2d_dilations(self): | |
feeds = { | |
"X": np.arange(1 * 3 * 6 * 6).reshape((1, 3, 6, 6)).astype(np.float32) + 1, | |
"W": np.arange(2 * 3 * 3 * 3).reshape((2, 3, 3, 3)).astype(np.float32), | |
"B": np.zeros((2,), dtype=np.float32), | |
} | |
kwargs = dict( | |
group=1, | |
dilations=[2, 1], | |
kernel_shape=[3, 3], | |
pads=[1, 1, 1, 1], | |
strides=[2, 2], | |
auto_pad="NOTSET", | |
) | |
expected = _conv_implementation(**feeds, **kwargs) | |
got = _conv_implementation_im2col(**feeds, **kwargs) | |
assert_allclose(expected, got) | |
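    # Assumed parameterization (the original decorator is not shown here): any
    # reduce operator that collapses all axes when no axes input is given and
    # keepdims=0 fits this test.
    @parameterized.parameterized.expand(
        [
            ("ReduceSum",),
            ("ReduceMean",),
            ("ReduceMax",),
            ("ReduceMin",),
            ("ReduceProd",),
        ]
    )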
def test_reduce_op_no_axis(self, op): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, None) | |
data = np.arange(6).reshape((1, 3, 2)).astype(np.float32) | |
nodes = [make_node(op, ["X"], ["Y"], keepdims=0)] | |
model = make_model(make_graph(nodes, "g", [X], [Y])) | |
ref = ReferenceEvaluator(model) | |
got = ref.run(None, {"X": data}) | |
r = got[0] | |
self.assertIsInstance(r, np.ndarray) | |
self.assertEqual(r.shape, ()) | |
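    # Assumed parameterization over the input rank; the test only checks
    # output shapes, so any small positive rank works.
    @parameterized.parameterized.expand([(1,), (2,), (3,), (4,)])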
def test_pad(self, dim): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
P = make_tensor_value_info("P", TensorProto.INT64, None) | |
V = make_tensor_value_info("V", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, None) | |
value = np.array([-5], dtype=np.float32) | |
node = make_node("Pad", inputs=["X", "P", "V"], outputs=["Y"], mode="constant") | |
model = make_model(make_graph([node], "g", [X, P, V], [Y])) | |
ref = ReferenceEvaluator(model) | |
x = np.array([1], dtype=np.float32).reshape((1,) * dim) | |
p = np.array([1, 1] * dim, dtype=np.int64) | |
got = ref.run(None, {"X": x, "P": p, "V": value})[0] | |
self.assertEqual(got.shape, (3,) * dim) | |
self.assertEqual(got.dtype, np.float32) | |
p = np.repeat([7, 3], dim).astype(np.int64) | |
got = ref.run(None, {"X": x, "P": p, "V": value})[0] | |
self.assertEqual(got.shape, (11,) * dim) | |
self.assertEqual(got.dtype, np.float32) | |
def test_constant_of_shape(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, None) | |
nodes = [ | |
make_node("Shape", inputs=["X"], outputs=["shape"]), | |
make_node( | |
"ConstantOfShape", | |
inputs=["shape"], | |
outputs=["Y"], | |
value=make_tensor("value", TensorProto.UINT16, [1], [1]), | |
), | |
] | |
model = make_model(make_graph(nodes, "g", [X], [Y])) | |
ref = ReferenceEvaluator(model) | |
x = np.array(1, dtype=np.float32) | |
got = ref.run(None, {"X": x})[0] | |
self.assertEqual(got.shape, tuple()) | |
self.assertEqual(got.dtype, np.uint16) | |
assert_allclose(np.array(1, dtype=np.uint16), got) | |
def test_constant_of_shape_castlike(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, None) | |
nodes = [ | |
make_node( | |
"Constant", | |
[], | |
["like"], | |
value=make_tensor("c", TensorProto.UINT16, [1], [2]), | |
), | |
make_node("Shape", inputs=["X"], outputs=["shape"]), | |
make_node( | |
"ConstantOfShape", | |
inputs=["shape"], | |
outputs=["cst"], | |
value=make_tensor("value", TensorProto.INT64, [1], [1]), | |
), | |
make_node("CastLike", ["cst", "like"], ["Y"]), | |
] | |
model = make_model(make_graph(nodes, "g", [X], [Y])) | |
ref = ReferenceEvaluator(model) | |
x = np.array(1, dtype=np.float32) | |
got = ref.run(None, {"X": x})[0] | |
self.assertEqual(got.shape, tuple()) | |
self.assertEqual(got.dtype, np.uint16) | |
assert_allclose(np.array(1, dtype=np.uint16), got) | |
def test_dynamic_quantize_linear(self): | |
feeds = { | |
"X": np.array( | |
[ | |
[ | |
-7.80749545e-02, | |
-3.80597055e-01, | |
1.33831516e-01, | |
-8.20474699e-02, | |
7.56645501e-02, | |
5.65112457e-02, | |
2.56818235e-01, | |
9.42316353e-02, | |
1.88027292e-01, | |
1.44878656e-01, | |
1.34825557e-01, | |
-2.04576910e-01, | |
1.68852255e-01, | |
6.23253360e-02, | |
4.30482924e-01, | |
-5.50433956e-02, | |
9.10681635e-02, | |
1.55332625e-01, | |
-4.53630984e-02, | |
3.99910688e-01, | |
-1.28678545e-01, | |
3.77916731e-02, | |
1.29872710e-01, | |
-1.12420328e-01, | |
-2.97306702e-02, | |
2.20508516e-01, | |
-5.88933006e-03, | |
4.81076002e-01, | |
-1.18835129e-01, | |
-4.45004404e-02, | |
-7.53675848e-02, | |
1.41112670e-01, | |
1.97793499e-01, | |
-7.71476865e-01, | |
8.64694864e-02, | |
1.73293594e-02, | |
1.28247693e-01, | |
7.58144110e-02, | |
-2.71435380e-01, | |
1.75212905e-01, | |
-2.47283235e-01, | |
-3.02810557e-02, | |
8.45039487e-02, | |
6.02229357e-01, | |
-1.04913145e-01, | |
-2.46705681e-01, | |
2.92073280e-01, | |
-3.88464853e-02, | |
4.26557302e-01, | |
-3.71325493e-01, | |
-3.11283618e-01, | |
7.85303488e-02, | |
3.18069518e-01, | |
-1.51467413e-01, | |
-1.02828763e-01, | |
9.29131880e-02, | |
2.55233884e-01, | |
5.00160515e-01, | |
-1.49993747e-01, | |
4.29408073e-01, | |
-1.91787735e-01, | |
3.16187665e-02, | |
-1.84284449e-02, | |
-1.62873864e-01, | |
-2.73632705e-01, | |
2.84725696e-01, | |
-2.87029266e-01, | |
-7.15534389e-02, | |
2.24836454e-01, | |
-1.70527741e-01, | |
-2.65601039e-01, | |
-2.68008932e-03, | |
1.44260898e-01, | |
7.80707747e-02, | |
2.73875445e-02, | |
-1.18391573e-01, | |
-6.44972250e-02, | |
-5.22445887e-03, | |
-2.96754301e-01, | |
1.05800219e-01, | |
2.62558222e-01, | |
3.62841524e-02, | |
9.44730639e-03, | |
1.75837606e-01, | |
2.69956529e-01, | |
3.02758247e-01, | |
-1.13724738e-01, | |
2.98936248e-01, | |
8.54668319e-02, | |
-6.74555600e-01, | |
4.38643873e-01, | |
1.27896462e-02, | |
9.20789093e-02, | |
1.93946883e-01, | |
1.97548166e-01, | |
2.82558739e-01, | |
-2.48879120e-01, | |
-3.93428057e-01, | |
6.45540953e-02, | |
-9.66283306e-03, | |
], | |
[ | |
-2.42438495e-01, | |
-3.58334243e-01, | |
1.22619808e-01, | |
-1.21529415e-01, | |
-5.23974374e-02, | |
-6.74922541e-02, | |
1.09727375e-01, | |
-3.56835872e-03, | |
1.51029706e-01, | |
1.18043356e-01, | |
8.16475376e-02, | |
-2.36466587e-01, | |
9.44180191e-02, | |
5.61679937e-02, | |
3.67988586e-01, | |
1.29441261e-01, | |
1.15772486e-01, | |
5.30351102e-02, | |
-1.04345076e-01, | |
1.29062623e-01, | |
-2.17205048e-01, | |
2.58089490e-02, | |
2.18848974e-01, | |
-8.36039707e-02, | |
5.43577969e-03, | |
7.87076280e-02, | |
1.19966723e-01, | |
2.81631678e-01, | |
-3.58020402e-02, | |
-9.65647772e-02, | |
3.17915753e-02, | |
2.49396205e-01, | |
2.23600790e-01, | |
-3.82718384e-01, | |
1.16506606e-01, | |
-3.19400802e-02, | |
1.60812005e-01, | |
9.81735960e-02, | |
-1.99046463e-01, | |
8.81427377e-02, | |
-1.65171087e-01, | |
-1.10251531e-01, | |
1.15387037e-01, | |
3.19312930e-01, | |
-1.14400804e-01, | |
-2.02447772e-01, | |
2.33669549e-01, | |
9.20853689e-02, | |
3.91551465e-01, | |
-3.58036369e-01, | |
-2.35071778e-01, | |
-5.00670634e-02, | |
1.65313914e-01, | |
-1.60922945e-01, | |
-1.95520848e-01, | |
1.82456985e-01, | |
3.80704433e-01, | |
4.19890225e-01, | |
-2.98131555e-02, | |
3.66110623e-01, | |
-2.81170905e-01, | |
-1.23942450e-01, | |
9.58625227e-02, | |
-5.90450205e-02, | |
-1.56236291e-01, | |
2.11865619e-01, | |
-1.75286919e-01, | |
-8.36539492e-02, | |
1.29381597e-01, | |
-1.66115880e-01, | |
-1.66922957e-01, | |
1.65688396e-01, | |
8.41224194e-02, | |
1.67839468e-01, | |
-4.03967649e-02, | |
-8.34071711e-02, | |
-5.65301552e-02, | |
1.67074010e-01, | |
-3.19734544e-01, | |
7.71123618e-02, | |
5.57036921e-02, | |
1.20709330e-01, | |
-6.63790107e-02, | |
1.36002287e-01, | |
3.42324018e-01, | |
3.42968374e-01, | |
-1.42380476e-01, | |
2.89662749e-01, | |
1.82179566e-02, | |
-4.96056974e-01, | |
2.64364302e-01, | |
7.38918930e-02, | |
1.11150607e-01, | |
-5.95749579e-02, | |
1.18562862e-01, | |
6.90007359e-02, | |
-2.08283514e-01, | |
-1.70682222e-01, | |
7.82715827e-02, | |
1.35489792e-01, | |
], | |
[ | |
-6.35757595e-02, | |
-3.20629895e-01, | |
9.38569903e-02, | |
-1.15190029e-01, | |
1.03646070e-02, | |
-4.31734361e-02, | |
2.31717676e-01, | |
4.01005745e-02, | |
1.18915148e-01, | |
2.10071519e-01, | |
-2.99234912e-02, | |
-2.93135524e-01, | |
-2.39588290e-01, | |
6.71441257e-02, | |
3.15238893e-01, | |
-5.08778207e-02, | |
1.16147280e-01, | |
-6.72954097e-02, | |
-1.63787514e-01, | |
1.60288364e-01, | |
-2.30847582e-01, | |
-2.22037435e-01, | |
4.50191796e-02, | |
-1.09636143e-01, | |
-6.00997508e-02, | |
2.14693844e-01, | |
-8.51289369e-03, | |
3.88416052e-01, | |
-1.18085533e-01, | |
-8.57385695e-02, | |
-1.18666515e-02, | |
3.29741478e-01, | |
2.03779504e-01, | |
-1.69492334e-01, | |
1.94324687e-01, | |
4.16374728e-02, | |
6.83876276e-02, | |
-1.85160581e-02, | |
-3.73600274e-02, | |
7.14804307e-02, | |
-3.01446438e-01, | |
1.74035393e-02, | |
3.35123807e-01, | |
4.17102218e-01, | |
-1.58562332e-01, | |
-2.54483074e-02, | |
1.99573949e-01, | |
7.95029774e-02, | |
4.82958555e-01, | |
-4.58213627e-01, | |
-2.67229170e-01, | |
1.27542794e-01, | |
4.47799414e-02, | |
-1.68686539e-01, | |
-8.92183557e-02, | |
9.79782715e-02, | |
2.77656261e-02, | |
2.96871901e-01, | |
6.29860088e-02, | |
1.77246213e-01, | |
-3.15523535e-01, | |
-1.74582571e-01, | |
1.25724282e-02, | |
-4.54988703e-02, | |
-1.29154682e-01, | |
2.13568255e-01, | |
-1.40891463e-01, | |
-2.66211092e-01, | |
2.62144595e-01, | |
-1.42889306e-01, | |
-6.67349845e-02, | |
1.63380653e-02, | |
-1.92995071e-02, | |
1.14811368e-01, | |
1.43584609e-01, | |
-2.65347548e-02, | |
9.32592154e-02, | |
2.23325342e-01, | |
-1.87100068e-01, | |
5.71197420e-02, | |
2.71253467e-01, | |
1.02890521e-01, | |
-3.04941833e-03, | |
1.10537663e-01, | |
2.75728375e-01, | |
2.11693868e-01, | |
-1.80009842e-01, | |
2.99300496e-02, | |
1.77923322e-01, | |
-4.53491032e-01, | |
1.94149211e-01, | |
2.47100577e-01, | |
-9.95091349e-03, | |
1.99301094e-01, | |
2.06564650e-01, | |
-1.99648179e-02, | |
-1.89629450e-01, | |
1.61689930e-02, | |
1.04817756e-01, | |
2.06400946e-01, | |
], | |
[ | |
-3.86251360e-02, | |
-3.07115853e-01, | |
3.74236181e-02, | |
-1.71237886e-01, | |
-2.77359486e-02, | |
-4.08068746e-02, | |
6.91853091e-02, | |
2.65322998e-03, | |
1.23958819e-01, | |
2.20951259e-01, | |
8.36500078e-02, | |
-3.14413190e-01, | |
1.34745941e-01, | |
-8.25274512e-02, | |
3.36270213e-01, | |
-2.49634441e-02, | |
-1.06189884e-02, | |
7.18819201e-02, | |
-1.73392966e-01, | |
2.98084319e-01, | |
-1.25626653e-01, | |
-2.17043106e-02, | |
2.87982523e-01, | |
-3.41932476e-03, | |
-6.89984411e-02, | |
-7.14176893e-03, | |
4.49542440e-02, | |
5.16424477e-01, | |
-1.02622584e-01, | |
2.02640779e-02, | |
2.30106711e-02, | |
1.93037808e-01, | |
2.03393996e-01, | |
-4.34632808e-01, | |
5.74068353e-02, | |
9.66466218e-02, | |
-7.19890296e-02, | |
4.23505977e-02, | |
-1.60067812e-01, | |
3.88947129e-02, | |
-3.32329512e-01, | |
1.91072702e-01, | |
3.79437394e-02, | |
4.33839470e-01, | |
-1.29231960e-01, | |
-1.46881789e-01, | |
2.47269243e-01, | |
2.86379829e-02, | |
2.92926908e-01, | |
-2.97341049e-01, | |
-3.40239167e-01, | |
1.52589783e-01, | |
8.81168991e-02, | |
1.40633471e-02, | |
-8.83188471e-02, | |
2.48367310e-01, | |
3.41982782e-01, | |
3.18781316e-01, | |
-1.15148552e-01, | |
2.54325420e-01, | |
-1.82771236e-01, | |
5.33889830e-02, | |
2.12034155e-02, | |
-9.78844613e-02, | |
-1.61611915e-01, | |
3.54084134e-01, | |
-1.25332132e-01, | |
-2.07989410e-01, | |
2.35610008e-02, | |
-1.35993093e-01, | |
-1.97377697e-01, | |
-1.21356212e-02, | |
7.86775351e-03, | |
4.71337497e-01, | |
9.49376822e-03, | |
4.25345525e-02, | |
1.14162050e-01, | |
6.27847165e-02, | |
-2.31957644e-01, | |
-8.33211765e-02, | |
2.02719584e-01, | |
-4.64919358e-02, | |
7.57966787e-02, | |
1.01521172e-01, | |
3.16580981e-01, | |
1.49488643e-01, | |
-1.20770879e-01, | |
2.56563038e-01, | |
1.66572407e-01, | |
-6.11343801e-01, | |
2.09183827e-01, | |
6.66101649e-02, | |
1.77328646e-01, | |
1.77777156e-01, | |
2.03266457e-01, | |
1.37545317e-01, | |
-1.38004154e-01, | |
-2.57656008e-01, | |
1.83920860e-01, | |
2.87696868e-02, | |
], | |
[ | |
5.14136627e-04, | |
-2.88997203e-01, | |
-3.34554128e-02, | |
-6.80408552e-02, | |
-4.61972654e-02, | |
1.75428241e-01, | |
1.86710209e-01, | |
-1.51355267e-01, | |
1.28381401e-01, | |
2.87129283e-01, | |
5.22154570e-03, | |
-3.53413224e-01, | |
-3.87947261e-02, | |
5.81918843e-02, | |
3.17016482e-01, | |
-2.51671404e-01, | |
7.01867491e-02, | |
-4.92945537e-02, | |
-2.73323953e-01, | |
1.27938241e-01, | |
-4.11552131e-01, | |
-2.23789401e-02, | |
1.95418939e-01, | |
-3.25946212e-01, | |
4.60528135e-02, | |
1.86884090e-01, | |
6.98191971e-02, | |
2.95638293e-01, | |
-1.80322871e-01, | |
-2.98620313e-02, | |
2.11789399e-01, | |
3.15145910e-01, | |
3.11763227e-01, | |
-5.78147054e-01, | |
6.43244758e-02, | |
-3.14367823e-02, | |
1.86190963e-01, | |
1.71108633e-01, | |
-6.29745722e-02, | |
7.48428777e-02, | |
-4.58003521e-01, | |
-3.01471800e-01, | |
2.17973694e-01, | |
5.73273778e-01, | |
-1.01379365e-01, | |
-2.46951967e-01, | |
1.58989042e-01, | |
-1.79126799e-01, | |
5.24153829e-01, | |
-4.64852840e-01, | |
-2.94867605e-01, | |
1.83558539e-01, | |
2.50552952e-01, | |
-8.56962949e-02, | |
-2.57554710e-01, | |
2.30709136e-01, | |
3.53280157e-01, | |
3.20184112e-01, | |
2.99184099e-02, | |
3.09098989e-01, | |
-2.02217728e-01, | |
-9.29201543e-02, | |
-1.20569356e-02, | |
-1.37087986e-01, | |
-2.16524690e-01, | |
1.39787242e-01, | |
-1.27902150e-01, | |
-2.64347821e-01, | |
9.29943919e-02, | |
-1.57217175e-01, | |
-3.86638314e-01, | |
7.90465698e-02, | |
4.07211930e-02, | |
-3.07695866e-02, | |
1.27393469e-01, | |
-1.18648581e-01, | |
-7.21216127e-02, | |
-3.71141285e-02, | |
-3.37082207e-01, | |
-6.23112544e-02, | |
3.52166295e-01, | |
1.51260465e-01, | |
5.03427610e-02, | |
1.90433189e-01, | |
5.21304548e-01, | |
3.85341585e-01, | |
-1.26457050e-01, | |
1.54961571e-01, | |
5.29025272e-02, | |
-5.06486952e-01, | |
2.28533953e-01, | |
1.78438187e-01, | |
-4.14765030e-02, | |
2.01903239e-01, | |
-3.89365852e-02, | |
8.84043872e-02, | |
-1.55351788e-01, | |
-9.06582028e-02, | |
9.95599255e-02, | |
-4.79989760e-02, | |
], | |
], | |
dtype=np.float32, | |
) | |
} | |
expected = [ | |
np.array( | |
[ | |
[ | |
129, | |
72, | |
168, | |
128, | |
157, | |
153, | |
191, | |
160, | |
178, | |
170, | |
168, | |
105, | |
174, | |
155, | |
223, | |
133, | |
160, | |
172, | |
135, | |
217, | |
119, | |
150, | |
167, | |
122, | |
137, | |
184, | |
142, | |
232, | |
121, | |
135, | |
129, | |
169, | |
180, | |
0, | |
159, | |
146, | |
167, | |
157, | |
93, | |
176, | |
97, | |
137, | |
159, | |
255, | |
124, | |
97, | |
197, | |
136, | |
222, | |
74, | |
85, | |
158, | |
202, | |
115, | |
124, | |
160, | |
190, | |
236, | |
115, | |
223, | |
107, | |
149, | |
140, | |
113, | |
92, | |
196, | |
90, | |
130, | |
185, | |
111, | |
94, | |
143, | |
170, | |
157, | |
148, | |
121, | |
131, | |
142, | |
88, | |
163, | |
192, | |
150, | |
145, | |
176, | |
193, | |
199, | |
122, | |
198, | |
159, | |
18, | |
224, | |
145, | |
160, | |
179, | |
180, | |
195, | |
97, | |
70, | |
155, | |
141, | |
], | |
[ | |
98, | |
76, | |
166, | |
120, | |
133, | |
130, | |
163, | |
142, | |
171, | |
165, | |
158, | |
99, | |
161, | |
153, | |
211, | |
167, | |
164, | |
153, | |
124, | |
167, | |
103, | |
148, | |
184, | |
127, | |
144, | |
158, | |
165, | |
195, | |
136, | |
125, | |
149, | |
189, | |
185, | |
72, | |
165, | |
137, | |
173, | |
161, | |
106, | |
159, | |
112, | |
123, | |
164, | |
202, | |
122, | |
105, | |
186, | |
160, | |
216, | |
77, | |
99, | |
134, | |
174, | |
113, | |
107, | |
177, | |
214, | |
221, | |
137, | |
211, | |
91, | |
120, | |
161, | |
132, | |
114, | |
182, | |
110, | |
127, | |
167, | |
112, | |
112, | |
174, | |
159, | |
174, | |
136, | |
128, | |
133, | |
174, | |
84, | |
157, | |
153, | |
165, | |
131, | |
168, | |
207, | |
207, | |
117, | |
197, | |
146, | |
51, | |
192, | |
157, | |
164, | |
132, | |
165, | |
156, | |
104, | |
111, | |
158, | |
168, | |
], | |
[ | |
131, | |
83, | |
160, | |
122, | |
145, | |
135, | |
186, | |
150, | |
165, | |
182, | |
137, | |
89, | |
99, | |
155, | |
202, | |
134, | |
165, | |
131, | |
113, | |
173, | |
100, | |
102, | |
151, | |
123, | |
132, | |
183, | |
141, | |
215, | |
121, | |
127, | |
141, | |
204, | |
181, | |
112, | |
179, | |
151, | |
156, | |
140, | |
136, | |
156, | |
87, | |
146, | |
205, | |
220, | |
114, | |
138, | |
180, | |
158, | |
233, | |
58, | |
93, | |
167, | |
151, | |
112, | |
126, | |
161, | |
148, | |
198, | |
155, | |
176, | |
84, | |
111, | |
145, | |
135, | |
119, | |
183, | |
117, | |
94, | |
192, | |
116, | |
131, | |
146, | |
139, | |
164, | |
170, | |
138, | |
160, | |
184, | |
108, | |
154, | |
193, | |
162, | |
142, | |
164, | |
194, | |
182, | |
110, | |
149, | |
176, | |
59, | |
179, | |
189, | |
141, | |
180, | |
181, | |
139, | |
108, | |
146, | |
162, | |
181, | |
], | |
[ | |
136, | |
86, | |
150, | |
111, | |
138, | |
135, | |
156, | |
143, | |
166, | |
184, | |
159, | |
85, | |
168, | |
128, | |
205, | |
138, | |
141, | |
156, | |
111, | |
198, | |
120, | |
139, | |
196, | |
142, | |
130, | |
142, | |
151, | |
239, | |
124, | |
147, | |
147, | |
179, | |
181, | |
62, | |
154, | |
161, | |
130, | |
151, | |
113, | |
150, | |
81, | |
178, | |
150, | |
224, | |
119, | |
116, | |
189, | |
148, | |
197, | |
88, | |
80, | |
171, | |
159, | |
146, | |
127, | |
189, | |
206, | |
202, | |
122, | |
190, | |
109, | |
153, | |
147, | |
125, | |
113, | |
209, | |
120, | |
104, | |
147, | |
118, | |
106, | |
141, | |
144, | |
230, | |
145, | |
151, | |
164, | |
155, | |
100, | |
128, | |
181, | |
134, | |
157, | |
162, | |
202, | |
171, | |
121, | |
191, | |
174, | |
30, | |
182, | |
155, | |
176, | |
176, | |
181, | |
169, | |
117, | |
95, | |
177, | |
148, | |
], | |
[ | |
143, | |
89, | |
137, | |
130, | |
134, | |
176, | |
178, | |
115, | |
167, | |
196, | |
144, | |
77, | |
136, | |
154, | |
202, | |
96, | |
156, | |
134, | |
92, | |
167, | |
67, | |
139, | |
179, | |
82, | |
152, | |
178, | |
156, | |
198, | |
110, | |
137, | |
182, | |
202, | |
201, | |
36, | |
155, | |
137, | |
178, | |
175, | |
131, | |
157, | |
58, | |
87, | |
183, | |
249, | |
124, | |
97, | |
173, | |
110, | |
240, | |
57, | |
88, | |
177, | |
190, | |
127, | |
95, | |
186, | |
209, | |
202, | |
149, | |
200, | |
105, | |
126, | |
141, | |
118, | |
103, | |
169, | |
119, | |
94, | |
160, | |
114, | |
71, | |
158, | |
151, | |
137, | |
167, | |
121, | |
130, | |
136, | |
80, | |
131, | |
208, | |
171, | |
152, | |
178, | |
240, | |
215, | |
120, | |
172, | |
153, | |
49, | |
185, | |
176, | |
135, | |
180, | |
136, | |
159, | |
114, | |
126, | |
161, | |
134, | |
], | |
], | |
dtype=np.uint8, | |
), | |
np.array(0.005387083161622286, dtype=np.float32), | |
np.array(143, dtype=np.uint8), | |
] | |
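        # DynamicQuantizeLinear derives scale and zero point from the input
        # range extended to include 0: here min(X) ~= -0.77148 and
        # max(X) ~= 0.60223, so scale = (0.60223 + 0.77148) / 255 ~= 0.0053871
        # and zero_point = round(-min(X) / scale) = 143.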
X = make_tensor_value_info("X", TensorProto.FLOAT, None) | |
Y = make_tensor_value_info("Y", TensorProto.UINT8, None) | |
Scale = make_tensor_value_info("scale", TensorProto.FLOAT, None) | |
Zp = make_tensor_value_info("zp", TensorProto.UINT8, None) | |
nodes = [ | |
make_node( | |
"DynamicQuantizeLinear", | |
["X"], | |
["Y", "scale", "zp"], | |
), | |
] | |
model = make_model( | |
make_graph(nodes, "g", [X], [Y, Scale, Zp]), | |
opset_imports=[make_opsetid("", onnx_opset_version() - 1)], | |
) | |
ref = ReferenceEvaluator(model) | |
got = ref.run(None, feeds) | |
self.assertEqual(len(got), 3) | |
for i in range(2, -1, -1): | |
assert_allclose(expected[i], got[i]) | |
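    # Assumed example cases for StringConcat (the original decorator is not
    # shown here): element-wise concatenation of two string tensors.
    @parameterized.parameterized.expand(
        [
            (["abc", "def"], [".com", ".net"], ["abc.com", "def.net"], (2,)),
            (
                [["a", "b"], ["c", "d"]],
                [["e", "f"], ["g", "h"]],
                [["ae", "bf"], ["cg", "dh"]],
                (2, 2),
            ),
        ]
    )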
def test_string_concat(self, a, b, expected, expected_shape): | |
A = make_tensor_value_info("A", TensorProto.STRING, None) | |
B = make_tensor_value_info("B", TensorProto.STRING, None) | |
Y = make_tensor_value_info("Y", TensorProto.STRING, None) | |
node = make_node("StringConcat", inputs=["A", "B"], outputs=["Y"]) | |
model = make_model(make_graph([node], "g", [A, B], [Y])) | |
ref = ReferenceEvaluator(model) | |
result, *_ = ref.run(None, {"A": np.array(a), "B": np.array(b)}) | |
np.testing.assert_array_equal(result, expected) | |
self.assertEqual(result.dtype.kind, "O") | |
self.assertEqual(result.shape, expected_shape) | |
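    # Assumed example for StringSplit: the first output is right-padded with
    # empty strings so every row has the same number of columns, the second
    # output is the actual number of substrings per element. The original
    # parameter list is not shown here.
    @parameterized.parameterized.expand(
        [
            (
                ["a,b,c", "d,e", "f"],
                ",",
                None,
                [["a", "b", "c"], ["d", "e", ""], ["f", "", ""]],
                [3, 2, 1],
            ),
        ]
    )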
def test_string_split( | |
self, | |
x, | |
delimiter, | |
maxsplit, | |
expected_split, | |
expected_num_splits, | |
): | |
X = make_tensor_value_info("X", TensorProto.STRING, (None)) | |
Splits = make_tensor_value_info("Splits", TensorProto.STRING, (None)) | |
MaxSplits = make_tensor_value_info("MaxSplits", TensorProto.INT32, (None)) | |
node = make_node( | |
"StringSplit", | |
inputs=["X"], | |
outputs=["Splits", "MaxSplits"], | |
delimiter=delimiter, | |
maxsplit=maxsplit, | |
) | |
model = make_model(make_graph([node], "g", [X], [Splits, MaxSplits])) | |
ref = ReferenceEvaluator(model) | |
x = np.array(x, dtype=object) | |
result, num_splits, *_ = ref.run(None, {"X": x}) | |
np.testing.assert_array_equal(result, np.array(expected_split, dtype=object)) | |
np.testing.assert_array_equal( | |
num_splits, np.array(expected_num_splits, dtype=np.int64) | |
) | |
    def test_qlinearmatmul_int8(self):
node = make_node( | |
"QLinearMatMul", | |
inputs=[ | |
"a", | |
"a_scale", | |
"a_zero_point", | |
"b", | |
"b_scale", | |
"b_zero_point", | |
"y_scale", | |
"y_zero_point", | |
], | |
outputs=["y"], | |
) | |
graph = make_graph( | |
[node], | |
"g", | |
[ | |
make_tensor_value_info("a", TensorProto.FLOAT, [None, None]), | |
make_tensor_value_info("a_scale", TensorProto.FLOAT, [1]), | |
make_tensor_value_info("a_zero_point", TensorProto.INT8, [1]), | |
make_tensor_value_info("b", TensorProto.FLOAT, [None, None]), | |
make_tensor_value_info("b_scale", TensorProto.FLOAT, [1]), | |
make_tensor_value_info("b_zero_point", TensorProto.INT8, [1]), | |
make_tensor_value_info("y_scale", TensorProto.FLOAT, [1]), | |
make_tensor_value_info("y_zero_point", TensorProto.INT8, [1]), | |
], | |
[make_tensor_value_info("y", TensorProto.FLOAT, [None, None])], | |
) | |
onnx_model = make_model( | |
graph, opset_imports=[make_opsetid("", 20)], ir_version=9 | |
) | |
sess = ReferenceEvaluator(onnx_model) | |
a = np.array([[208, 236, 0, 238], [3, 214, 255, 29]]) | |
a -= 127 | |
a = a.astype(np.int8) | |
a_scale = np.array([0.0066], dtype=np.float32) | |
a_zero_point = np.array([113 - 127], dtype=np.int8) | |
b = np.array([[152, 51, 244], [60, 26, 255], [0, 127, 246], [127, 254, 247]]) | |
b -= 127 | |
b = b.astype(np.int8) | |
b_scale = np.array([0.00705], dtype=np.float32) | |
b_zero_point = np.array([114 - 127], dtype=np.int8) | |
y_scale = np.array([0.0107], dtype=np.float32) | |
y_zero_point = np.array([118 - 127], dtype=np.int8) | |
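        # QLinearMatMul computes the int8 product so that
        # y_scale * (y - y_zero_point) ~= (a_scale * (a - a_zero_point))
        #                                 @ (b_scale * (b - b_zero_point)),
        # with rounding and saturation to the int8 range.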
got = sess.run( | |
None, | |
dict( | |
a=a, | |
a_scale=a_scale, | |
a_zero_point=a_zero_point, | |
b=b, | |
b_scale=b_scale, | |
b_zero_point=b_zero_point, | |
y_scale=y_scale, | |
y_zero_point=y_zero_point, | |
), | |
) | |
np.testing.assert_array_equal( | |
np.array([[41, -12, -9], [1, -75, 20]], dtype=np.int8), got[0] | |
) | |
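    # Assumed example patterns for RegexFullMatch (the original decorator is
    # not shown here); the operator returns True only when the whole string
    # matches the pattern.
    @parameterized.parameterized.expand(
        [
            (
                ["www.google.com", "www.facebook.com", "onnx.ai"],
                r"www\.[\w.-]+\.\w+",
                [True, True, False],
                (3,),
            ),
            (["account@example.com"], r"\w+@\w+\.\w+", [True], (1,)),
        ]
    )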
def test_regex_full_match(self, x, pattern, expected, expected_shape): | |
X = make_tensor_value_info("X", TensorProto.STRING, None) | |
Y = make_tensor_value_info("Y", TensorProto.BOOL, None) | |
node = make_node("RegexFullMatch", inputs=["X"], outputs=["Y"], pattern=pattern) | |
model = make_model(make_graph([node], "g", [X], [Y])) | |
ref = ReferenceEvaluator(model) | |
result, *_ = ref.run(None, {"X": np.array(x)}) | |
np.testing.assert_array_equal(result, expected) | |
self.assertEqual(result.dtype.kind, "b") | |
self.assertEqual(result.shape, expected_shape) | |
def test_regex_invalid_pattern(self): | |
X = make_tensor_value_info("X", TensorProto.STRING, None) | |
Y = make_tensor_value_info("Y", TensorProto.BOOL, None) | |
node = make_node("RegexFullMatch", inputs=["X"], outputs=["Y"], pattern="x)") | |
model = make_model(make_graph([node], "g", [X], [Y])) | |
ref = ReferenceEvaluator(model) | |
with self.assertRaises(ValueError): | |
ref.run(None, {"X": np.array(["x"])}) | |
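    # Assumed INT4/UINT4 cases (the original decorator is not shown here):
    # with scale=2 and zero_point=0 the quantize/dequantize round trip gives
    # clip(round_half_to_even(x / 2), qmin, qmax) * 2.
    @parameterized.parameterized.expand(
        [
            (
                TensorProto.INT4,
                [0.0, 1.0, 2.0, 3.0, -4.0, 16.0],
                [0.0, 0.0, 2.0, 4.0, -4.0, 14.0],
            ),
            (
                TensorProto.UINT4,
                [0.0, 1.0, 2.0, 3.0, -4.0, 16.0],
                [0.0, 0.0, 2.0, 4.0, 0.0, 16.0],
            ),
        ]
    )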
def test_quantize_linear_int4(self, qtype, data, expected): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, [None]) | |
Y = make_tensor_value_info("Y", TensorProto.FLOAT, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node( | |
"Constant", | |
[], | |
["scale"], | |
value=make_tensor("scale", TensorProto.FLOAT, [1], [2.0]), | |
), | |
make_node( | |
"Constant", | |
[], | |
["zero"], | |
value=make_tensor("zero", qtype, [1], [0]), | |
), | |
make_node("QuantizeLinear", ["X", "scale", "zero"], ["T"]), | |
make_node("DequantizeLinear", ["T", "scale"], ["Y"], axis=0), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
got = ref.run(None, {"X": np.asarray(data)}) | |
assert_allclose(expected, got[0]) | |
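    # Assumed parameterization: cast from float32 to both 4-bit types.
    @parameterized.parameterized.expand(
        [
            (TensorProto.FLOAT, TensorProto.INT4),
            (TensorProto.FLOAT, TensorProto.UINT4),
        ]
    )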
def test_cast_int4_output(self, cast_from, cast_to): | |
X = make_tensor_value_info("X", cast_from, [None]) | |
Y = make_tensor_value_info("Y", cast_to, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["Y"], to=cast_to), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array([0, 1, 2.4, 2.6, 4, 10], dtype=np.float32) | |
signed = cast_to == TensorProto.INT4 | |
expected1 = np.array( | |
[subbyte.float32_to_4bit_unpacked(x, signed=signed) for x in data] | |
) | |
got = ref.run(None, {"X": data}) | |
self.assertEqual(expected1.tolist(), got[0].tolist()) | |
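    # Assumed parameterization: cast from both 4-bit types back to float32.
    @parameterized.parameterized.expand(
        [
            (TensorProto.UINT4, TensorProto.FLOAT),
            (TensorProto.INT4, TensorProto.FLOAT),
        ]
    )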
def test_cast_int4_input(self, cast_from, cast_to): | |
X = make_tensor_value_info("X", cast_from, [None]) | |
Y = make_tensor_value_info("Y", cast_to, [None]) | |
model = make_model( | |
make_graph( | |
[ | |
make_node("Cast", ["X"], ["Y"], to=TensorProto.FLOAT), | |
], | |
"g", | |
[X], | |
[Y], | |
) | |
) | |
ref = ReferenceEvaluator(model) | |
data = np.array(range(0, 7), dtype=np.float32) | |
cast_from_np = custom.uint4 if cast_from == TensorProto.UINT4 else custom.int4 | |
data = data.astype(cast_from_np) | |
expected1 = np.array( | |
            [
                subbyte.float32_to_4bit_unpacked(
                    x, signed=cast_from == TensorProto.INT4
                )
                for x in data
            ]
) | |
got = ref.run(None, {"X": data}) | |
self.assertEqual(expected1.tolist(), got[0].tolist()) | |
def test_a_function_calling_a_function_once(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, ["N"]) | |
output = make_tensor_value_info("output", TensorProto.FLOAT, ["N"]) | |
Z = make_tensor_value_info("output", TensorProto.FLOAT, ["N"]) | |
func_def_add = make_function( | |
"this", | |
"fctadd", | |
["input2"], | |
["output"], | |
[ | |
make_node("Constant", [], ["one"], value_floats=[1.0], name="CC0"), | |
make_node("Add", ["input2", "one"], ["output"], name="A1"), | |
], | |
opset_imports=[make_operatorsetid("", 15)], | |
) | |
func_def = make_function( | |
"this", | |
"fct", | |
["input"], | |
["output"], | |
[ | |
make_node("Constant", [], ["one"], value_floats=[1.0], name="CC"), | |
make_node("Greater", ["input", "one"], ["cond"]), | |
make_node( | |
"If", | |
["cond"], | |
["output"], | |
then_branch=make_graph( | |
[make_node("fctadd", ["input"], ["output"], domain="this")], | |
"gthen", | |
[], | |
[output], | |
), | |
else_branch=make_graph( | |
[make_node("Add", ["input", "one"], ["output"], domain="")], | |
"gelse", | |
[], | |
[output], | |
), | |
name=":IF", | |
), | |
], | |
opset_imports=[ | |
make_operatorsetid("", 15), | |
make_operatorsetid("this", 1), | |
], | |
) | |
model_def = make_model( | |
make_graph( | |
[ | |
make_node("fct", ["X"], ["output"], domain="this"), | |
], | |
"test", | |
[X], | |
[Z], | |
), | |
ir_version=7, | |
opset_imports=[ | |
make_operatorsetid("", 15), | |
make_operatorsetid("this", 1), | |
], | |
functions=[func_def_add, func_def], | |
) | |
feeds = {"X": np.array([-5], dtype=np.float32)} | |
oinf = ReferenceEvaluator(model_def) | |
expected = oinf.run(None, feeds) | |
# inlining does not work here | |
# inlined = inline_local_functions(model_def) | |
# oinf = ReferenceEvaluator(inlined) | |
# goti = oinf.run(None, feeds) | |
# self.assertEqual(expected[0].tolist(), goti[0].tolist()) | |
self.assertEqual(expected[0], np.array([-4], dtype=np.float32)) | |
def test_a_function_calling_a_function_double(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, ["N"]) | |
output = make_tensor_value_info("output", TensorProto.FLOAT, ["N"]) | |
Z = make_tensor_value_info("output", TensorProto.FLOAT, ["N"]) | |
func_def_add = make_function( | |
"this", | |
"fctadd", | |
["input2"], | |
["output"], | |
[ | |
make_node("Constant", [], ["one"], value_floats=[1.0], name="CC0"), | |
make_node("Add", ["input2", "one"], ["output"], name="A1"), | |
], | |
opset_imports=[make_operatorsetid("", 15)], | |
) | |
func_def = make_function( | |
"this", | |
"fct", | |
["input"], | |
["output"], | |
[ | |
make_node("Constant", [], ["one"], value_floats=[1.0], name="CC"), | |
make_node("Greater", ["input", "one"], ["cond"]), | |
make_node( | |
"If", | |
["cond"], | |
["output"], | |
then_branch=make_graph( | |
[make_node("fctadd", ["input"], ["output"], domain="this")], | |
"gthen", | |
[], | |
[output], | |
), | |
else_branch=make_graph( | |
[make_node("Add", ["input", "one"], ["output"], domain="")], | |
"gelse", | |
[], | |
[output], | |
), | |
name=":IF", | |
), | |
], | |
opset_imports=[ | |
make_operatorsetid("", 15), | |
make_operatorsetid("this", 1), | |
], | |
) | |
model_def = make_model( | |
make_graph( | |
[ | |
make_node("fct", ["X"], ["ztmp"], domain="this"), | |
make_node("fct", ["ztmp"], ["output"], domain="this"), | |
], | |
"test", | |
[X], | |
[Z], | |
), | |
ir_version=7, | |
opset_imports=[ | |
make_operatorsetid("", 15), | |
make_operatorsetid("this", 1), | |
], | |
functions=[func_def_add, func_def], | |
) | |
feeds = {"X": np.array([-5], dtype=np.float32)} | |
oinf = ReferenceEvaluator(model_def) | |
expected = oinf.run(None, feeds) | |
# inlining does not work here | |
# inlined = inline_local_functions(model_def) | |
# oinf = ReferenceEvaluator(inlined) | |
# goti = oinf.run(None, feeds) | |
# self.assertEqual(expected[0].tolist(), goti[0].tolist()) | |
self.assertEqual(expected[0], np.array([-3], dtype=np.float32)) | |
def test_overload_reference_implementation(self): | |
X = make_tensor_value_info("X", TensorProto.FLOAT, ["N"]) | |
output = make_tensor_value_info("output", TensorProto.FLOAT, ["N"]) | |
Z = make_tensor_value_info("output", TensorProto.FLOAT, ["N"]) | |
func_def_add = make_function( | |
"this", | |
"fctadd", | |
["input2"], | |
["output"], | |
[ | |
make_node("Constant", [], ["one"], value_floats=[1.0], name="CC0"), | |
make_node("Add", ["input2", "one"], ["output"], name="A1"), | |
], | |
opset_imports=[make_operatorsetid("", 15)], | |
) | |
func_def = make_function( | |
"this", | |
"fct", | |
["input"], | |
["output"], | |
[ | |
make_node("Constant", [], ["one"], value_floats=[1.0], name="CC"), | |
make_node("Greater", ["input", "one"], ["cond"]), | |
make_node( | |
"If", | |
["cond"], | |
["output"], | |
then_branch=make_graph( | |
[make_node("fctadd", ["input"], ["output"], domain="this")], | |
"gthen", | |
[], | |
[output], | |
), | |
else_branch=make_graph( | |
[make_node("Add", ["input", "one"], ["output"], domain="")], | |
"gelse", | |
[], | |
[output], | |
), | |
name=":IF", | |
), | |
], | |
opset_imports=[ | |
make_operatorsetid("", 15), | |
make_operatorsetid("this", 1), | |
], | |
) | |
model_def = make_model( | |
make_graph( | |
[ | |
make_node("fct", ["X"], ["ztmp"], domain="this"), | |
make_node("fct", ["ztmp"], ["output"], domain="this"), | |
], | |
"test", | |
[X], | |
[Z], | |
), | |
ir_version=7, | |
opset_imports=[ | |
make_operatorsetid("", 15), | |
make_operatorsetid("this", 1), | |
], | |
functions=[func_def_add, func_def], | |
) | |
class MyReferenceEvaluator(ReferenceEvaluator): | |
pass | |
oinf = MyReferenceEvaluator(model_def) | |
for v in oinf.functions_.values(): | |
self.assertIsInstance(v, MyReferenceEvaluator) | |
if __name__ == "__main__": | |
unittest.main(verbosity=2) | |