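# Spaces: Running Running
# (The line above appears to be hosting-page header text captured together
# with the file; it is not part of the test module.)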
import pandas as pd | |
import pytest | |
import networkx as nx | |
from lynxkite.core import workspace, ops | |
from lynxkite_graph_analytics.core import Bundle, execute, ENV | |
async def test_execute_operation_not_in_catalog():
    """Executing a node whose title is not a known operation records an error.

    The workspace holds a single node titled "Non existing op"; after
    execution that node's ``data.error`` must carry the catalog-miss message.
    """
    unknown_node = workspace.WorkspaceNode(
        id="1",
        type="node_type",
        data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
        position=workspace.Position(x=0, y=0),
    )
    ws = workspace.Workspace(env=ENV)
    ws.nodes.append(unknown_node)

    await execute(ws)

    # Read the node back from the workspace rather than from the local
    # variable, exactly as the check is meant to observe the workspace state.
    reported_error = ws.nodes[0].data.error
    assert reported_error == "Operation not found in catalog"
async def test_execute_operation_inputs_correct_cast():
    """Check that operation inputs are cast automatically between connected ops.

    Four operations are registered in the "test" environment and wired in a
    chain: "Create Bundle" -> "Bundle to Graph" -> "Graph to Bundle" ->
    "Dataframe to Bundle". Along the chain the declared input type of an op
    differs from the declared return type of the op feeding it (``Bundle``
    into ``nx.Graph``, ``nx.Graph`` into ``Bundle``, ``pd.DataFrame`` into
    ``Bundle``). The test passes when no node reports an error after
    execution.
    """
    op = ops.op_registration("test")

    # Every function has to be registered under the exact title used by its
    # workspace node below; an unregistered title is reported as
    # "Operation not found in catalog" instead of being executed.
    @op("Create Bundle")
    def create_bundle() -> Bundle:
        df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
        return Bundle(dfs={"edges": df})

    @op("Bundle to Graph")
    def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
        return graph

    @op("Graph to Bundle")
    def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
        return list(bundle.dfs.values())[0]

    @op("Dataframe to Bundle")
    def dataframe_to_bundle(bundle: Bundle) -> Bundle:
        return bundle

    # Nodes get ids "1".."4" and are laid out left to right, 100 units apart.
    node_titles = (
        "Create Bundle",
        "Bundle to Graph",
        "Graph to Bundle",
        "Dataframe to Bundle",
    )
    ws = workspace.Workspace(env="test")
    for index, title in enumerate(node_titles):
        ws.nodes.append(
            workspace.WorkspaceNode(
                id=str(index + 1),
                type="node_type",
                data=workspace.WorkspaceNodeData(title=title, params={}),
                position=workspace.Position(x=100 * index, y=0),
            )
        )

    # Each targetHandle names the parameter of the receiving function.
    ws.edges = [
        workspace.WorkspaceEdge(
            id="1", source="1", target="2", sourceHandle="output", targetHandle="graph"
        ),
        workspace.WorkspaceEdge(
            id="2", source="2", target="3", sourceHandle="output", targetHandle="bundle"
        ),
        workspace.WorkspaceEdge(
            id="3", source="3", target="4", sourceHandle="output", targetHandle="bundle"
        ),
    ]

    await execute(ws)

    assert all(node.data.error is None for node in ws.nodes)
async def test_multiple_inputs():
    """Make sure each input goes to the right argument.

    "One" and "Two" feed the two parameters of "Smaller?". With One -> a and
    Two -> b the comparison ``a < b`` is true; after swapping the target
    handles it must become false.
    """
    op = ops.op_registration("test")

    # The functions must be registered under the titles the workspace nodes
    # use, otherwise the nodes cannot be resolved to operations.
    @op("One")
    def one():
        return 1

    @op("Two")
    def two():
        return 2

    # NOTE(review): view="visualization" is assumed to be what routes the
    # return value into node.data.display, which the assertions below read.
    # Confirm against the installed lynxkite-core version.
    @op("Smaller?", view="visualization")
    def is_smaller(a, b):
        return a < b

    node_specs = (
        ("one", "One", 0),
        ("two", "Two", 100),
        ("smaller", "Smaller?", 200),
    )
    ws = workspace.Workspace(env="test")
    for node_id, title, x in node_specs:
        ws.nodes.append(
            workspace.WorkspaceNode(
                id=node_id,
                type="cool",
                data=workspace.WorkspaceNodeData(title=title, params={}),
                position=workspace.Position(x=x, y=0),
            )
        )

    ws.edges = [
        workspace.WorkspaceEdge(
            id="one",
            source="one",
            target="smaller",
            sourceHandle="output",
            targetHandle="a",
        ),
        workspace.WorkspaceEdge(
            id="two",
            source="two",
            target="smaller",
            sourceHandle="output",
            targetHandle="b",
        ),
    ]

    await execute(ws)
    assert ws.nodes[-1].data.display is True

    # Flip the inputs.
    ws.edges[0].targetHandle = "b"
    ws.edges[1].targetHandle = "a"
    await execute(ws)
    assert ws.nodes[-1].data.display is False
# Allow running this test module directly as a script.
if __name__ == "__main__":
    pytest.main()