Spaces:
Running
Running
| import pandas as pd | |
| import pytest | |
| import networkx as nx | |
| from lynxkite.core import workspace | |
| from lynxkite_graph_analytics.lynxkite_ops import Bundle, execute, op | |
| async def test_execute_operation_not_in_catalog(): | |
| ws = workspace.Workspace(env="test") | |
| ws.nodes.append( | |
| workspace.WorkspaceNode( | |
| id="1", | |
| type="node_type", | |
| data=workspace.WorkspaceNodeData(title="Non existing op", params={}), | |
| position=workspace.Position(x=0, y=0), | |
| ) | |
| ) | |
| await execute(ws) | |
| assert ws.nodes[0].data.error == "Operation not found in catalog" | |
| async def test_execute_operation_inputs_correct_cast(): | |
| # Test that the automatic casting of operation inputs works correctly. | |
| def create_bundle() -> Bundle: | |
| df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]}) | |
| return Bundle(dfs={"edges": df}) | |
| def bundle_to_graph(graph: nx.Graph) -> nx.Graph: | |
| return graph | |
| def graph_to_bundle(bundle: Bundle) -> pd.DataFrame: | |
| return list(bundle.dfs.values())[0] | |
| def dataframe_to_bundle(bundle: Bundle) -> Bundle: | |
| return bundle | |
| ws = workspace.Workspace(env="test") | |
| ws.nodes.append( | |
| workspace.WorkspaceNode( | |
| id="1", | |
| type="node_type", | |
| data=workspace.WorkspaceNodeData(title="Create Bundle", params={}), | |
| position=workspace.Position(x=0, y=0), | |
| ) | |
| ) | |
| ws.nodes.append( | |
| workspace.WorkspaceNode( | |
| id="2", | |
| type="node_type", | |
| data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}), | |
| position=workspace.Position(x=100, y=0), | |
| ) | |
| ) | |
| ws.nodes.append( | |
| workspace.WorkspaceNode( | |
| id="3", | |
| type="node_type", | |
| data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}), | |
| position=workspace.Position(x=200, y=0), | |
| ) | |
| ) | |
| ws.nodes.append( | |
| workspace.WorkspaceNode( | |
| id="4", | |
| type="node_type", | |
| data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}), | |
| position=workspace.Position(x=300, y=0), | |
| ) | |
| ) | |
| ws.edges = [ | |
| workspace.WorkspaceEdge( | |
| id="1", source="1", target="2", sourceHandle="1", targetHandle="2" | |
| ), | |
| workspace.WorkspaceEdge( | |
| id="2", source="2", target="3", sourceHandle="2", targetHandle="3" | |
| ), | |
| workspace.WorkspaceEdge( | |
| id="3", source="3", target="4", sourceHandle="3", targetHandle="4" | |
| ), | |
| ] | |
| await execute(ws) | |
| assert all([node.data.error is None for node in ws.nodes]) | |
| if __name__ == "__main__": | |
| pytest.main() | |