File size: 2,836 Bytes
2601533
 
 
 
 
fc43558
2601533
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import pandas as pd
import pytest
import networkx as nx

from lynxkite.core import workspace
from lynxkite_graph_analytics.lynxkite_ops import Bundle, execute, op


async def test_execute_operation_not_in_catalog():
    ws = workspace.Workspace(env="test")
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="1",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
            position=workspace.Position(x=0, y=0),
        )
    )
    await execute(ws)
    assert ws.nodes[0].data.error == "Operation not found in catalog"


async def test_execute_operation_inputs_correct_cast():
    # Test that the automatic casting of operation inputs works correctly.

    @op("Create Bundle")
    def create_bundle() -> Bundle:
        df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
        return Bundle(dfs={"edges": df})

    @op("Bundle to Graph")
    def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
        return graph

    @op("Graph to Bundle")
    def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
        return list(bundle.dfs.values())[0]

    @op("Dataframe to Bundle")
    def dataframe_to_bundle(bundle: Bundle) -> Bundle:
        return bundle

    ws = workspace.Workspace(env="test")
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="1",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Create Bundle", params={}),
            position=workspace.Position(x=0, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="2",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}),
            position=workspace.Position(x=100, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="3",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}),
            position=workspace.Position(x=200, y=0),
        )
    )
    ws.nodes.append(
        workspace.WorkspaceNode(
            id="4",
            type="node_type",
            data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}),
            position=workspace.Position(x=300, y=0),
        )
    )
    ws.edges = [
        workspace.WorkspaceEdge(
            id="1", source="1", target="2", sourceHandle="1", targetHandle="2"
        ),
        workspace.WorkspaceEdge(
            id="2", source="2", target="3", sourceHandle="2", targetHandle="3"
        ),
        workspace.WorkspaceEdge(
            id="3", source="3", target="4", sourceHandle="3", targetHandle="4"
        ),
    ]

    await execute(ws)

    assert all([node.data.error is None for node in ws.nodes])


if __name__ == "__main__":
    pytest.main()