import pandas as pd import pytest import networkx as nx from lynxkite.core import workspace, ops from lynxkite_graph_analytics.core import Bundle, execute, ENV async def test_execute_operation_not_in_catalog(): ws = workspace.Workspace(env=ENV) ws.nodes.append( workspace.WorkspaceNode( id="1", type="node_type", data=workspace.WorkspaceNodeData(title="Non existing op", params={}), position=workspace.Position(x=0, y=0), ) ) await execute(ws) assert ws.nodes[0].data.error == "Operation not found in catalog" async def test_execute_operation_inputs_correct_cast(): # Test that the automatic casting of operation inputs works correctly. op = ops.op_registration("test") @op("Create Bundle") def create_bundle() -> Bundle: df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]}) return Bundle(dfs={"edges": df}) @op("Bundle to Graph") def bundle_to_graph(graph: nx.Graph) -> nx.Graph: return graph @op("Graph to Bundle") def graph_to_bundle(bundle: Bundle) -> pd.DataFrame: return list(bundle.dfs.values())[0] @op("Dataframe to Bundle") def dataframe_to_bundle(bundle: Bundle) -> Bundle: return bundle ws = workspace.Workspace(env="test") ws.nodes.append( workspace.WorkspaceNode( id="1", type="node_type", data=workspace.WorkspaceNodeData(title="Create Bundle", params={}), position=workspace.Position(x=0, y=0), ) ) ws.nodes.append( workspace.WorkspaceNode( id="2", type="node_type", data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}), position=workspace.Position(x=100, y=0), ) ) ws.nodes.append( workspace.WorkspaceNode( id="3", type="node_type", data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}), position=workspace.Position(x=200, y=0), ) ) ws.nodes.append( workspace.WorkspaceNode( id="4", type="node_type", data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}), position=workspace.Position(x=300, y=0), ) ) ws.edges = [ workspace.WorkspaceEdge( id="1", source="1", target="2", sourceHandle="output", targetHandle="graph" ), workspace.WorkspaceEdge( id="2", source="2", target="3", sourceHandle="output", targetHandle="bundle" ), workspace.WorkspaceEdge( id="3", source="3", target="4", sourceHandle="output", targetHandle="bundle" ), ] await execute(ws) assert all([node.data.error is None for node in ws.nodes]) async def test_multiple_inputs(): """Make sure each input goes to the right argument.""" op = ops.op_registration("test") @op("One") def one(): return 1 @op("Two") def two(): return 2 @op("Smaller?", view="visualization") def is_smaller(a, b): return a < b ws = workspace.Workspace(env="test") ws.nodes.append( workspace.WorkspaceNode( id="one", type="cool", data=workspace.WorkspaceNodeData(title="One", params={}), position=workspace.Position(x=0, y=0), ) ) ws.nodes.append( workspace.WorkspaceNode( id="two", type="cool", data=workspace.WorkspaceNodeData(title="Two", params={}), position=workspace.Position(x=100, y=0), ) ) ws.nodes.append( workspace.WorkspaceNode( id="smaller", type="cool", data=workspace.WorkspaceNodeData(title="Smaller?", params={}), position=workspace.Position(x=200, y=0), ) ) ws.edges = [ workspace.WorkspaceEdge( id="one", source="one", target="smaller", sourceHandle="output", targetHandle="a", ), workspace.WorkspaceEdge( id="two", source="two", target="smaller", sourceHandle="output", targetHandle="b", ), ] await execute(ws) assert ws.nodes[-1].data.display is True # Flip the inputs. ws.edges[0].targetHandle = "b" ws.edges[1].targetHandle = "a" await execute(ws) assert ws.nodes[-1].data.display is False if __name__ == "__main__": pytest.main()