lynxkite / lynxkite-graph-analytics /tests /test_lynxkite_ops.py
darabos's picture
Find plugins by module name instead of namespace module.
fc43558
raw
history blame
2.84 kB
import pandas as pd
import pytest
import networkx as nx
from lynxkite.core import workspace
from lynxkite_graph_analytics.lynxkite_ops import Bundle, execute, op
async def test_execute_operation_not_in_catalog():
    """A node titled "Non existing op" must end up with a catalog-lookup error."""
    unknown_node = workspace.WorkspaceNode(
        id="1",
        type="node_type",
        data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
        position=workspace.Position(x=0, y=0),
    )
    ws = workspace.Workspace(env="test")
    ws.nodes.append(unknown_node)

    await execute(ws)

    # The test expects the failure to be recorded on the node's data after
    # execution, so it is read back from the workspace rather than from the
    # local variable.
    executed_node = ws.nodes[0]
    assert executed_node.data.error == "Operation not found in catalog"
async def test_execute_operation_inputs_correct_cast():
    """Check that the automatic casting of operation inputs works correctly.

    Four ops are registered and wired into a chain 1 -> 2 -> 3 -> 4. The
    input annotation of every downstream op differs from the return
    annotation of the op feeding it (Bundle -> nx.Graph, nx.Graph -> Bundle,
    pd.DataFrame -> Bundle). The test expects every node to finish with no
    error set.
    """

    # Source op: produces a Bundle holding a single "edges" dataframe.
    @op("Create Bundle")
    def create_bundle() -> Bundle:
        df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
        return Bundle(dfs={"edges": df})

    # Declares an nx.Graph input although it is fed by the Bundle above.
    @op("Bundle to Graph")
    def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
        return graph

    # Declares a Bundle input although it is fed by an nx.Graph; hands back
    # the first dataframe stored in the bundle.
    @op("Graph to Bundle")
    def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
        frames = list(bundle.dfs.values())
        return frames[0]

    # Declares a Bundle input although it is fed by a pd.DataFrame.
    @op("Dataframe to Bundle")
    def dataframe_to_bundle(bundle: Bundle) -> Bundle:
        return bundle

    # Node ids are "1".."4"; nodes are laid out left to right, 100 units apart.
    chain_ids = [str(number) for number in range(1, 5)]
    chain_titles = [
        "Create Bundle",
        "Bundle to Graph",
        "Graph to Bundle",
        "Dataframe to Bundle",
    ]

    ws = workspace.Workspace(env="test")
    for slot, (node_id, title) in enumerate(zip(chain_ids, chain_titles)):
        ws.nodes.append(
            workspace.WorkspaceNode(
                id=node_id,
                type="node_type",
                data=workspace.WorkspaceNodeData(title=title, params={}),
                position=workspace.Position(x=100 * slot, y=0),
            )
        )

    # Connect each node to its successor. The edge id and source handle equal
    # the upstream node id; the target handle equals the downstream node id.
    ws.edges = [
        workspace.WorkspaceEdge(
            id=upstream,
            source=upstream,
            target=downstream,
            sourceHandle=upstream,
            targetHandle=downstream,
        )
        for upstream, downstream in zip(chain_ids, chain_ids[1:])
    ]

    await execute(ws)

    assert all(node.data.error is None for node in ws.nodes)
if __name__ == "__main__":
    # Allow running this file directly as a script. pytest.main() returns an
    # exit code; pass it to the interpreter so that a failing run produces a
    # non-zero process status instead of always exiting with 0.
    raise SystemExit(pytest.main())