File size: 4,612 Bytes
2601533
 
 
 
56cf2e9
 
2601533
 
 
56cf2e9
2601533
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56cf2e9
 
2601533
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8827b1e
2601533
 
8827b1e
2601533
 
8827b1e
2601533
 
 
 
 
 
 
 
ffaf155
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2601533
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import pandas as pd
import pytest
import networkx as nx

from lynxkite.core import workspace, ops
from lynxkite_graph_analytics.core import Bundle, execute, ENV


async def test_execute_operation_not_in_catalog():
    ws = workspace.Workspace(env=ENV)
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="1",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
            position=workspace.Position(x=0, y=0),
        )
    )
    await execute(ws)
    assert ws.nodes[0].data.error == "Operation not found in catalog"


async def test_execute_operation_inputs_correct_cast():
    # Test that the automatic casting of operation inputs works correctly.

    op = ops.op_registration("test")

    @op("Create Bundle")
    def create_bundle() -> Bundle:
        df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
        return Bundle(dfs={"edges": df})

    @op("Bundle to Graph")
    def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
        return graph

    @op("Graph to Bundle")
    def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
        return list(bundle.dfs.values())[0]

    @op("Dataframe to Bundle")
    def dataframe_to_bundle(bundle: Bundle) -> Bundle:
        return bundle

    ws = workspace.Workspace(env="test")
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="1",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Create Bundle", params={}),
            position=workspace.Position(x=0, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="2",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}),
            position=workspace.Position(x=100, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="3",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}),
            position=workspace.Position(x=200, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="4",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}),
            position=workspace.Position(x=300, y=0),
        )
    )
    ws.edges = [
        workspace.WorkspaceEdge(
            id="1", source="1", target="2", sourceHandle="output", targetHandle="graph"
        ),
        workspace.WorkspaceEdge(
            id="2", source="2", target="3", sourceHandle="output", targetHandle="bundle"
        ),
        workspace.WorkspaceEdge(
            id="3", source="3", target="4", sourceHandle="output", targetHandle="bundle"
        ),
    ]

    await execute(ws)

    assert all([node.data.error is None for node in ws.nodes])


async def test_multiple_inputs():
    """Make sure each input goes to the right argument."""
    op = ops.op_registration("test")

    @op("One")
    def one():
        return 1

    @op("Two")
    def two():
        return 2

    @op("Smaller?", view="visualization")
    def is_smaller(a, b):
        return a < b

    ws = workspace.Workspace(env="test")
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="one",
            type="cool",
            data=workspace.WorkspaceNodeData(title="One", params={}),
            position=workspace.Position(x=0, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="two",
            type="cool",
            data=workspace.WorkspaceNodeData(title="Two", params={}),
            position=workspace.Position(x=100, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="smaller",
            type="cool",
            data=workspace.WorkspaceNodeData(title="Smaller?", params={}),
            position=workspace.Position(x=200, y=0),
        )
    )
    ws.edges = [
        workspace.WorkspaceEdge(
            id="one",
            source="one",
            target="smaller",
            sourceHandle="output",
            targetHandle="a",
        ),
        workspace.WorkspaceEdge(
            id="two",
            source="two",
            target="smaller",
            sourceHandle="output",
            targetHandle="b",
        ),
    ]

    await execute(ws)

    assert ws.nodes[-1].data.display is True
    # Flip the inputs.
    ws.edges[0].targetHandle = "b"
    ws.edges[1].targetHandle = "a"
    await execute(ws)
    assert ws.nodes[-1].data.display is False


if __name__ == "__main__":
    pytest.main()