Spaces:
Running
Running
# Copyright (c) ONNX Project Contributors | |
# SPDX-License-Identifier: Apache-2.0 | |
from __future__ import annotations | |
import typing | |
import numpy as np | |
import onnx | |
from onnx import TensorProto | |
from onnx.backend.test.case.base import Base | |
from onnx.backend.test.case.model import expect | |
def SequenceEmptyImpl() -> list[np.ndarray | None]: | |
return [] | |
def SequenceConstructImpl(*tensors: np.ndarray) -> list[np.ndarray]: | |
return list(tensors) | |
def SequenceInsertImpl( | |
sequence: list[np.ndarray], tensor: np.ndarray, position: int | None = None | |
) -> list[np.ndarray]: | |
if position is None: | |
position = len(sequence) | |
sequence.insert(position, tensor) | |
return sequence | |
def SequenceAtImpl(sequence: list[np.ndarray], position: int) -> np.ndarray: | |
return sequence[position] | |
def SequenceEraseImpl( | |
sequence: list[np.ndarray], position: int | None = None | |
) -> list[np.ndarray | None]: | |
if position is None: | |
position = -1 | |
del sequence[position] | |
return sequence | |
def SequenceLengthImpl(sequence: list[np.ndarray]) -> np.int64: | |
return np.int64(len(sequence)) | |
def SplitToSequenceImpl( | |
tensor: np.ndarray, | |
split: int | list[int] | None = None, | |
axis: int = 0, | |
keepdims: int = 1, | |
) -> list[np.ndarray]: | |
dim_size = tensor.shape[axis] | |
if split is None: | |
split = 1 | |
split_indices = [ | |
i * split + 1 for i in range(dim_size) if i * split + 1 < dim_size | |
] | |
if not keepdims: | |
results = np.array_split(tensor, split_indices, axis) | |
return [np.squeeze(res, axis) for res in results] | |
if np.isscalar(split): | |
split_indices = [i * split + 1 for i in range(dim_size) if i * split + 1 < dim_size] # type: ignore | |
else: | |
split_indices = np.cumsum(split) + 1 | |
return np.array_split(tensor, split_indices, axis) # type: ignore | |
def ConcatFromSequenceImpl( | |
sequence: list[np.ndarray], axis: int, new_axis: int | None = 0 | |
) -> np.ndarray: | |
if not new_axis: | |
return np.concatenate(sequence, axis) | |
return np.stack(sequence, axis) | |
class Sequence(Base): | |
def export() -> None: | |
def make_graph( | |
nodes: list[onnx.helper.NodeProto], | |
input_shapes: list[typing.Sequence[str | int] | None], | |
output_shapes: list[typing.Sequence[str | int] | None], | |
input_names: list[str], | |
output_names: list[str], | |
input_types: list[TensorProto.DataType], | |
output_types: list[TensorProto.DataType], | |
initializers: list[TensorProto] | None = None, | |
) -> onnx.helper.GraphProto: | |
graph = onnx.helper.make_graph( | |
nodes=nodes, | |
name="Sequence", | |
inputs=[ | |
onnx.helper.make_tensor_value_info(name, input_type, input_shape) | |
for name, input_type, input_shape in zip( | |
input_names, input_types, input_shapes | |
) | |
], | |
outputs=[ | |
onnx.helper.make_tensor_value_info(name, output_type, output_shape) | |
for name, output_type, output_shape in zip( | |
output_names, output_types, output_shapes | |
) | |
], | |
initializer=initializers, | |
) | |
return graph | |
# 1st testcase - insert and at. | |
# 1. SequenceEmpty: -> [] | |
# 2. SequenceInsert(x): -> [x] | |
# 3. SequenceInsert(y): -> [x, y] | |
# 4. SequenceInsert(z, 1): -> [x, z, y] | |
# 5. SequenceAt(2): -> y | |
seq_empty_node = onnx.helper.make_node("SequenceEmpty", [], ["Seq_empty"]) | |
seq_insert_node = onnx.helper.make_node( | |
"SequenceInsert", ["Seq_empty", "X"], ["Seq_1"] | |
) | |
seq_insert_node2 = onnx.helper.make_node( | |
"SequenceInsert", ["Seq_1", "Y"], ["Seq_2"] | |
) | |
seq_insert_node3 = onnx.helper.make_node( | |
"SequenceInsert", ["Seq_2", "Z", "pos"], ["Seq_3"] | |
) | |
seq_at_node = onnx.helper.make_node("SequenceAt", ["Seq_3", "pos_at"], ["out"]) | |
x_shape = [2, 3, 4] | |
y_shape = [1, 3, 4] | |
z_shape = [3, 3, 4] | |
out_shape = [None, 3, 4] | |
x = np.ones(x_shape, dtype=np.float32) | |
y = np.zeros(y_shape, dtype=np.float32) | |
z = np.ones(z_shape, dtype=np.float32) * 2 | |
pos_val = 1 | |
pos_at_val = 2 | |
out = SequenceEmptyImpl() | |
out = SequenceInsertImpl(out, x) | |
out = SequenceInsertImpl(out, y) | |
out = SequenceInsertImpl(out, z, pos_val) | |
out = SequenceAtImpl(out, pos_at_val) | |
assert np.array_equal(out, y) | |
pos = onnx.helper.make_tensor("pos", TensorProto.INT64, (), (pos_val,)) | |
pos_at = onnx.helper.make_tensor("pos_at", TensorProto.INT64, (), (pos_at_val,)) | |
graph = make_graph( | |
[ | |
seq_empty_node, | |
seq_insert_node, | |
seq_insert_node2, | |
seq_insert_node3, | |
seq_at_node, | |
], | |
[x_shape, y_shape, z_shape, [], []], # type: ignore | |
[out_shape], # type: ignore | |
["X", "Y", "Z", "pos", "pos_at"], | |
["out"], | |
[onnx.TensorProto.FLOAT] * 3 + [onnx.TensorProto.INT64] * 2, # type: ignore | |
[onnx.TensorProto.FLOAT], | |
[pos, pos_at], | |
) | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect(model, inputs=[x, y, z], outputs=[out], name="test_sequence_model1") | |
# 2nd testcase - erase and at. | |
# 1. SequenceConstruct(x, y, z): -> [x, y, z] | |
# 2. SequenceErase(1): -> [x, z] | |
# 3. SequenceAt(1): -> z | |
seq_construct_node = onnx.helper.make_node( | |
"SequenceConstruct", ["X", "Y", "Z"], ["seq_1"] | |
) | |
seq_erase_node = onnx.helper.make_node( | |
"SequenceErase", ["seq_1", "pos_erase"], ["seq_2"] | |
) | |
seq_at_node = onnx.helper.make_node("SequenceAt", ["seq_2", "pos_at"], ["out"]) | |
tensor_shape = [2, 3, 4] | |
x = np.ones(tensor_shape, dtype=np.float32) | |
y = np.zeros(tensor_shape, dtype=np.float32) | |
z = np.ones(tensor_shape, dtype=np.float32) * 2 | |
pos_erase_val = 1 | |
pos_at_val = 1 | |
out = SequenceConstructImpl(x, y, z) | |
out = SequenceEraseImpl(out, pos_erase_val) | |
out = SequenceAtImpl(out, pos_at_val) | |
assert np.array_equal(out, z) | |
pos_erase = onnx.helper.make_tensor( | |
"pos_erase", TensorProto.INT64, (), (pos_erase_val,) | |
) | |
pos_at = onnx.helper.make_tensor("pos_at", TensorProto.INT64, (), (pos_at_val,)) | |
graph = make_graph( | |
[seq_construct_node, seq_erase_node, seq_at_node], | |
[tensor_shape, tensor_shape, tensor_shape, [], []], # type: ignore | |
[tensor_shape], # type: ignore | |
["X", "Y", "Z", "pos_erase", "pos_at"], | |
["out"], | |
[onnx.TensorProto.FLOAT] * 3 + [onnx.TensorProto.INT64] * 2, # type: ignore | |
[onnx.TensorProto.FLOAT], | |
[pos_erase, pos_at], | |
) | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect(model, inputs=[x, y, z], outputs=[out], name="test_sequence_model2") | |
# 3rd testcase - erase, insert and at, with negative index value. | |
# 1. SequenceConstruct(x, y, z): -> [x, y, z] | |
# 2. SequenceErase(-3): -> [y, z] | |
# 3. SequenceInsert(x, -1): -> [y, x, z] | |
# 4. SequenceAt(-1): -> z | |
seq_construct_node = onnx.helper.make_node( | |
"SequenceConstruct", ["X", "Y", "Z"], ["seq_1"] | |
) | |
seq_erase_node = onnx.helper.make_node( | |
"SequenceErase", ["seq_1", "pos_erase"], ["seq_2"] | |
) | |
seq_insert_node = onnx.helper.make_node( | |
"SequenceInsert", ["seq_2", "X", "pos_insert"], ["seq_3"] | |
) | |
seq_at_node = onnx.helper.make_node("SequenceAt", ["seq_3", "pos_at"], ["out"]) | |
tensor_shape = [2, 3, 4] | |
x = np.ones(tensor_shape, dtype=np.float32) | |
y = np.zeros(tensor_shape, dtype=np.float32) | |
z = np.ones(tensor_shape, dtype=np.float32) * 2 | |
pos_erase_val = -3 | |
pos_insert_val = -1 | |
pos_at_val = -1 | |
out = SequenceConstructImpl(x, y, z) | |
out = SequenceEraseImpl(out, pos_erase_val) | |
out = SequenceInsertImpl(out, x, pos_insert_val) | |
out = SequenceAtImpl(out, pos_at_val) | |
assert np.array_equal(out, z) | |
pos_erase = onnx.helper.make_tensor( | |
"pos_erase", TensorProto.INT64, (), (pos_erase_val,) | |
) | |
pos_insert = onnx.helper.make_tensor( | |
"pos_insert", TensorProto.INT64, (), (pos_insert_val,) | |
) | |
pos_at = onnx.helper.make_tensor("pos_at", TensorProto.INT64, (), (pos_at_val,)) | |
graph = make_graph( | |
[seq_construct_node, seq_erase_node, seq_insert_node, seq_at_node], | |
[tensor_shape, tensor_shape, tensor_shape, [], [], []], # type: ignore | |
[tensor_shape], # type: ignore | |
["X", "Y", "Z", "pos_erase", "pos_insert", "pos_at"], | |
["out"], | |
[onnx.TensorProto.FLOAT] * 3 + [onnx.TensorProto.INT64] * 3, # type: ignore | |
[onnx.TensorProto.FLOAT], | |
[pos_erase, pos_insert, pos_at], | |
) | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect(model, inputs=[x, y, z], outputs=[out], name="test_sequence_model3") | |
# 4th testcase - concat | |
seq_construct_node = onnx.helper.make_node( | |
"SequenceConstruct", ["X", "Y", "Z"], ["seq_1"] | |
) | |
seq_concat_node = onnx.helper.make_node( | |
"ConcatFromSequence", ["seq_1"], ["out"], axis=1 | |
) | |
tensor_shape = [2, 3, 4] | |
concat_out_shape = [2, None, 4] | |
x = np.ones(tensor_shape, dtype=np.float32) | |
y = np.zeros(tensor_shape, dtype=np.float32) | |
z = np.ones(tensor_shape, dtype=np.float32) * 2 | |
out = SequenceConstructImpl(x, y, z) | |
concat_out = ConcatFromSequenceImpl(out, 1) | |
graph = make_graph( | |
[seq_construct_node, seq_concat_node], | |
[tensor_shape] * 3, # type: ignore | |
[concat_out_shape], # type: ignore | |
["X", "Y", "Z"], | |
["out"], | |
[onnx.TensorProto.FLOAT] * 3, # type: ignore | |
[onnx.TensorProto.FLOAT], | |
) | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect( | |
model, inputs=[x, y, z], outputs=[concat_out], name="test_sequence_model4" | |
) | |
# 5th testcase - concat with new_axis = 1 | |
seq_construct_node = onnx.helper.make_node( | |
"SequenceConstruct", ["X", "Y", "Z"], ["seq_1"] | |
) | |
seq_concat_node = onnx.helper.make_node( | |
"ConcatFromSequence", ["seq_1"], ["out"], axis=-1, new_axis=1 | |
) | |
tensor_shape = [2, 3, 4] | |
concat_out_shape = [2, 3, 4, 3] | |
x = np.ones(tensor_shape, dtype=np.float32) | |
y = np.zeros(tensor_shape, dtype=np.float32) | |
z = np.ones(tensor_shape, dtype=np.float32) * 2 | |
out = SequenceConstructImpl(x, y, z) | |
concat_out = ConcatFromSequenceImpl(out, -1, 1) | |
graph = make_graph( | |
[seq_construct_node, seq_concat_node], | |
[tensor_shape] * 3, # type: ignore | |
[concat_out_shape], # type: ignore | |
["X", "Y", "Z"], | |
["out"], | |
[onnx.TensorProto.FLOAT] * 3, # type: ignore | |
[onnx.TensorProto.FLOAT], | |
) | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect( | |
model, inputs=[x, y, z], outputs=[concat_out], name="test_sequence_model5" | |
) | |
# 6th testcase - split and len | |
seq_split_node = onnx.helper.make_node( | |
"SplitToSequence", ["X"], ["seq_1"], axis=-1 | |
) | |
seq_len_node = onnx.helper.make_node("SequenceLength", ["seq_1"], ["len"]) | |
tensor_shape = [2, 3, 4] | |
len_shape = [] # type: ignore | |
x = np.ones(tensor_shape, dtype=np.float32) | |
out = SplitToSequenceImpl(x, axis=-1) | |
out = SequenceLengthImpl(out) | |
assert np.array_equal(out, np.int64(4)) | |
graph = onnx.helper.make_graph( | |
nodes=[seq_split_node, seq_len_node], | |
name="Sequence", | |
inputs=[ | |
onnx.helper.make_tensor_value_info( | |
"X", onnx.TensorProto.FLOAT, tensor_shape | |
) | |
], | |
outputs=[ | |
onnx.helper.make_tensor_value_info( | |
"len", onnx.TensorProto.INT64, len_shape | |
) | |
], | |
) # type: ignore | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect(model, inputs=[x], outputs=[out], name="test_sequence_model6") | |
# 7th testcase - split with keepdims=0, and SequenceAt | |
seq_split_node = onnx.helper.make_node( | |
"SplitToSequence", ["X"], ["seq_1"], axis=0, keepdims=0 | |
) | |
seq_at_node = onnx.helper.make_node("SequenceAt", ["seq_1", "pos_at"], ["out"]) | |
tensor_shape = [2, 3, 4] | |
out_shape = [3, 4] | |
x = np.random.rand(*tensor_shape) | |
pos_at_val = 1 | |
out = SplitToSequenceImpl(x, axis=0, keepdims=0) | |
out = SequenceAtImpl(out, pos_at_val) | |
assert np.array_equal(out, x[pos_at_val]) | |
pos_at = onnx.helper.make_tensor("pos_at", TensorProto.INT64, (), (pos_at_val,)) | |
graph = make_graph( | |
[seq_split_node, seq_at_node], | |
[tensor_shape, []], # type: ignore | |
[out_shape], # type: ignore | |
["X", "pos_at"], | |
["out"], | |
[onnx.TensorProto.DOUBLE, onnx.TensorProto.INT64], | |
[onnx.TensorProto.DOUBLE], | |
[pos_at], | |
) | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect(model, inputs=[x], outputs=[out], name="test_sequence_model7") | |
# 8th testcase - split zero length | |
seq_split_node = onnx.helper.make_node( | |
"SplitToSequence", ["X", "Splits"], ["seq_1"] | |
) | |
seq_len_node = onnx.helper.make_node("SequenceLength", ["seq_1"], ["len"]) | |
tensor_shape = ["n"] # type: ignore | |
splits_shape = [3] # type: ignore | |
x = np.array([]).astype(np.float32) | |
splits = np.array([0, 0, 0]).astype(np.int64) | |
out_len = np.int64(3) | |
graph = onnx.helper.make_graph( | |
nodes=[seq_split_node, seq_len_node], | |
name="Sequence", | |
inputs=[ | |
onnx.helper.make_tensor_value_info( | |
"X", onnx.TensorProto.FLOAT, tensor_shape | |
), # type: ignore | |
onnx.helper.make_tensor_value_info( | |
"Splits", onnx.TensorProto.INT64, splits_shape | |
), | |
], # type: ignore | |
outputs=[ | |
onnx.helper.make_tensor_value_info( | |
"len", onnx.TensorProto.INT64, len_shape | |
) | |
], | |
) # type: ignore | |
model = onnx.helper.make_model_gen_version( | |
graph, | |
producer_name="backend-test", | |
opset_imports=[onnx.helper.make_opsetid("", 12)], | |
) | |
expect( | |
model, inputs=[x, splits], outputs=[out_len], name="test_sequence_model8" | |
) | |