lynxkite / lynxkite-graph-analytics /tests /test_lynxkite_ops.py
darabos's picture
Split lynxkite_ops.py into two.
56cf2e9
raw
history blame
2.87 kB
import pandas as pd
import pytest
import networkx as nx
from lynxkite.core import workspace, ops
from lynxkite_graph_analytics.core import Bundle, execute, ENV
async def test_execute_operation_not_in_catalog():
    """Executing a node whose title is not a known op must flag that node.

    The workspace holds a single node titled "Non existing op"; after
    execution its ``data.error`` is expected to carry the catalog-miss message.
    """
    ws = workspace.Workspace(env=ENV)
    unknown_node = workspace.WorkspaceNode(
        id="1",
        type="node_type",
        data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
        position=workspace.Position(x=0, y=0),
    )
    ws.nodes.append(unknown_node)

    await execute(ws)

    reported_error = ws.nodes[0].data.error
    assert reported_error == "Operation not found in catalog"
async def test_execute_operation_inputs_correct_cast():
    """Check that the automatic casting of operation inputs works correctly.

    Four ops are registered in the "test" environment and chained 1 -> 2 -> 3 -> 4.
    Their annotated input/output types deliberately differ between neighbours
    (Bundle, nx.Graph, pd.DataFrame), and the test passes only when every node
    ends up with ``data.error`` equal to ``None``.
    """
    op = ops.op_registration("test")

    # Source op: produces a Bundle holding a single "edges" dataframe.
    @op("Create Bundle")
    def create_bundle() -> Bundle:
        edges = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
        return Bundle(dfs={"edges": edges})

    # Declares an nx.Graph input although the upstream op returns a Bundle.
    @op("Bundle to Graph")
    def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
        return graph

    # Declares a Bundle input although the upstream op returns an nx.Graph;
    # hands back the first dataframe stored in the bundle.
    @op("Graph to Bundle")
    def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
        frames = list(bundle.dfs.values())
        return frames[0]

    # Declares a Bundle input although the upstream op returns a DataFrame.
    @op("Dataframe to Bundle")
    def dataframe_to_bundle(bundle: Bundle) -> Bundle:
        return bundle

    ws = workspace.Workspace(env="test")

    # Nodes get ids "1".."4" and are laid out 100 units apart on the x axis.
    titles = (
        "Create Bundle",
        "Bundle to Graph",
        "Graph to Bundle",
        "Dataframe to Bundle",
    )
    for index, title in enumerate(titles):
        ws.nodes.append(
            workspace.WorkspaceNode(
                id=str(index + 1),
                type="node_type",
                data=workspace.WorkspaceNodeData(title=title, params={}),
                position=workspace.Position(x=100 * index, y=0),
            )
        )

    # Edge n links node n to node n + 1; the handles reuse the node ids.
    ws.edges = [
        workspace.WorkspaceEdge(
            id=str(n),
            source=str(n),
            target=str(n + 1),
            sourceHandle=str(n),
            targetHandle=str(n + 1),
        )
        for n in range(1, 4)
    ]

    await execute(ws)

    assert all(node.data.error is None for node in ws.nodes)
if __name__ == "__main__":
    # pytest.main() returns the exit code instead of terminating the process.
    # Propagate it so that running this file as a script exits non-zero when
    # a test fails, rather than always reporting success.
    raise SystemExit(pytest.main())