Chema JMLizano darabos commited on
Commit
ba7246a
·
unverified ·
1 Parent(s): 850a895

Adding tests (#50)

Browse files

* Added test for lynxkite-core

* Added tests for lynxkite-app

* Add tests for lynxkite-graph-analytics

* Store the actual Enum value when creating the CRDT object

* Better error message when workspace path is outside data dir

* Use enum for operation Inputs & Outputs position values instead of hardcoded strings everywhere

* Make WorkspaceNode position an Enum

* Make Operation type an Enum, and rename it to view_type

* Remove register_area operation, it is not used anymore

* Add some missing docstrings and types

* update apiTypes.ts

---------

Co-authored-by: JMLizano <[email protected]>
Co-authored-by: Daniel Darabos <[email protected]>

lynxkite-app/src/lynxkite/app/crdt.py CHANGED
@@ -29,7 +29,11 @@ def ws_exception_handler(exception, log):
29
 
30
 
31
  class WebsocketServer(pycrdt_websocket.WebsocketServer):
32
- async def init_room(self, name):
 
 
 
 
33
  path = CRDT_PATH / f"{name}.crdt"
34
  assert path.is_relative_to(CRDT_PATH)
35
  ystore = pycrdt_websocket.ystore.FileYStore(path)
@@ -49,6 +53,8 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
49
  ws["edges"] = pycrdt.Array()
50
  if "env" not in ws:
51
  ws["env"] = "unset"
 
 
52
  try_to_load_workspace(ws, name)
53
  room = pycrdt_websocket.YRoom(
54
  ystore=ystore, ydoc=ydoc, exception_handler=ws_exception_handler
@@ -62,6 +68,12 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
62
  return room
63
 
64
  async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
 
 
 
 
 
 
65
  if name not in self.rooms:
66
  self.rooms[name] = await self.init_room(name)
67
  room = self.rooms[name]
@@ -72,7 +84,7 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
72
  last_ws_input = None
73
 
74
 
75
- def clean_input(ws_pyd):
76
  for node in ws_pyd.nodes:
77
  node.data.display = None
78
  node.data.error = None
@@ -83,21 +95,43 @@ def clean_input(ws_pyd):
83
  delattr(node, key)
84
 
85
 
86
- def crdt_update(crdt_obj, python_obj, boxes=set()):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  if isinstance(python_obj, dict):
88
  for key, value in python_obj.items():
89
- if key in boxes:
90
  crdt_obj[key] = value
91
  elif isinstance(value, dict):
92
  if crdt_obj.get(key) is None:
93
  crdt_obj[key] = pycrdt.Map()
94
- crdt_update(crdt_obj[key], value, boxes)
95
  elif isinstance(value, list):
96
  if crdt_obj.get(key) is None:
97
  crdt_obj[key] = pycrdt.Array()
98
- crdt_update(crdt_obj[key], value, boxes)
99
  elif isinstance(value, enum.Enum):
100
- crdt_obj[key] = str(value)
101
  else:
102
  crdt_obj[key] = value
103
  elif isinstance(python_obj, list):
@@ -105,12 +139,14 @@ def crdt_update(crdt_obj, python_obj, boxes=set()):
105
  if isinstance(value, dict):
106
  if i >= len(crdt_obj):
107
  crdt_obj.append(pycrdt.Map())
108
- crdt_update(crdt_obj[i], value, boxes)
109
  elif isinstance(value, list):
110
  if i >= len(crdt_obj):
111
  crdt_obj.append(pycrdt.Array())
112
- crdt_update(crdt_obj[i], value, boxes)
113
  else:
 
 
114
  if i >= len(crdt_obj):
115
  crdt_obj.append(value)
116
  else:
@@ -119,18 +155,34 @@ def crdt_update(crdt_obj, python_obj, boxes=set()):
119
  raise ValueError("Invalid type:", python_obj)
120
 
121
 
122
- def try_to_load_workspace(ws, name):
 
 
 
 
 
 
123
  json_path = f"data/{name}"
124
  if os.path.exists(json_path):
125
  ws_pyd = workspace.load(json_path)
126
- crdt_update(ws, ws_pyd.model_dump(), boxes={"display"})
 
 
127
 
128
 
129
  last_known_versions = {}
130
  delayed_executions = {}
131
 
132
 
133
- async def workspace_changed(name, changes, ws_crdt):
 
 
 
 
 
 
 
 
134
  ws_pyd = workspace.Workspace.model_validate(ws_crdt.to_py())
135
  # Do not trigger execution for superficial changes.
136
  # This is a quick solution until we build proper caching.
@@ -154,22 +206,35 @@ async def workspace_changed(name, changes, ws_crdt):
154
  await execute(name, ws_crdt, ws_pyd)
155
 
156
 
157
- async def execute(name, ws_crdt, ws_pyd, delay=0):
 
 
 
 
 
 
 
 
 
 
158
  if delay:
159
  try:
160
  await asyncio.sleep(delay)
161
  except asyncio.CancelledError:
162
  return
163
  path = DATA_PATH / name
164
- assert path.is_relative_to(DATA_PATH)
 
165
  workspace.save(ws_pyd, path)
166
  await workspace.execute(ws_pyd)
167
  workspace.save(ws_pyd, path)
 
 
168
  with ws_crdt.doc.transaction():
169
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
170
  if "data" not in nc:
171
  nc["data"] = pycrdt.Map()
172
- # Display is added as an opaque Box.
173
  nc["data"]["display"] = np.data.display
174
  nc["data"]["error"] = np.data.error
175
 
 
29
 
30
 
31
  class WebsocketServer(pycrdt_websocket.WebsocketServer):
32
+ async def init_room(self, name: str) -> pycrdt_websocket.YRoom:
33
+ """Initialize a room for the workspace with the given name.
34
+
35
+ The workspace is loaded from "crdt_data" if it exists there, or from "data", or a new workspace is created.
36
+ """
37
  path = CRDT_PATH / f"{name}.crdt"
38
  assert path.is_relative_to(CRDT_PATH)
39
  ystore = pycrdt_websocket.ystore.FileYStore(path)
 
53
  ws["edges"] = pycrdt.Array()
54
  if "env" not in ws:
55
  ws["env"] = "unset"
56
+ # We have two possible sources of truth for the workspaces, the YStore and the JSON files.
57
+ # In case we didn't find the workspace in the YStore, we try to load it from the JSON files.
58
  try_to_load_workspace(ws, name)
59
  room = pycrdt_websocket.YRoom(
60
  ystore=ystore, ydoc=ydoc, exception_handler=ws_exception_handler
 
68
  return room
69
 
70
  async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
71
+ """Get a room by name.
72
+
73
+ This method overrides the parent get_room method. The original creates an empty room,
74
+ with no associated Ydoc. Instead, we want to initialize the room with a Workspace
75
+ object.
76
+ """
77
  if name not in self.rooms:
78
  self.rooms[name] = await self.init_room(name)
79
  room = self.rooms[name]
 
84
  last_ws_input = None
85
 
86
 
87
+ def clean_input(ws_pyd: workspace.Workspace):
88
  for node in ws_pyd.nodes:
89
  node.data.display = None
90
  node.data.error = None
 
95
  delattr(node, key)
96
 
97
 
98
+ def crdt_update(
99
+ crdt_obj: pycrdt.Map | pycrdt.Array,
100
+ python_obj: dict | list,
101
+ non_collaborative_fields: set[str] = set(),
102
+ ):
103
+ """Update a CRDT object to match a Python object.
104
+
105
+ The types between the CRDT object and the Python object must match. If the Python object
106
+ is a dict, the CRDT object must be a Map. If the Python object is a list, the CRDT object
107
+ must be an Array.
108
+
109
+ Args:
110
+ crdt_obj: The CRDT object, that will be updated to match the Python object.
111
+ python_obj: The Python object to update with.
112
+ non_collaborative_fields: List of fields to treat as a black box. Black boxes are
113
+ updated as a whole, instead of having a fine-grained data structure to edit
114
+ collaboratively. Useful for complex fields that contain auto-generated data or
115
+ metadata.
116
+ The default is an empty set.
117
+
118
+ Raises:
119
+ ValueError: If the Python object provided is not a dict or list.
120
+ """
121
  if isinstance(python_obj, dict):
122
  for key, value in python_obj.items():
123
+ if key in non_collaborative_fields:
124
  crdt_obj[key] = value
125
  elif isinstance(value, dict):
126
  if crdt_obj.get(key) is None:
127
  crdt_obj[key] = pycrdt.Map()
128
+ crdt_update(crdt_obj[key], value, non_collaborative_fields)
129
  elif isinstance(value, list):
130
  if crdt_obj.get(key) is None:
131
  crdt_obj[key] = pycrdt.Array()
132
+ crdt_update(crdt_obj[key], value, non_collaborative_fields)
133
  elif isinstance(value, enum.Enum):
134
+ crdt_obj[key] = str(value.value)
135
  else:
136
  crdt_obj[key] = value
137
  elif isinstance(python_obj, list):
 
139
  if isinstance(value, dict):
140
  if i >= len(crdt_obj):
141
  crdt_obj.append(pycrdt.Map())
142
+ crdt_update(crdt_obj[i], value, non_collaborative_fields)
143
  elif isinstance(value, list):
144
  if i >= len(crdt_obj):
145
  crdt_obj.append(pycrdt.Array())
146
+ crdt_update(crdt_obj[i], value, non_collaborative_fields)
147
  else:
148
+ if isinstance(value, enum.Enum):
149
+ value = str(value.value)
150
  if i >= len(crdt_obj):
151
  crdt_obj.append(value)
152
  else:
 
155
  raise ValueError("Invalid type:", python_obj)
156
 
157
 
158
def try_to_load_workspace(ws: pycrdt.Map, name: str):
    """Load the workspace `name`, if it exists, and update the `ws` CRDT object to match its contents.

    Args:
        ws: CRDT object to update with the workspace contents.
        name: Name of the workspace to load.
    """
    json_path = f"data/{name}"
    if not os.path.exists(json_path):
        return
    ws_pyd = workspace.load(json_path)
    # The display field is treated as a black box, since it is a large
    # dictionary that is meant to change as a whole.
    crdt_update(ws, ws_pyd.model_dump(), non_collaborative_fields={"display"})
171
 
172
 
173
  last_known_versions = {}
174
  delayed_executions = {}
175
 
176
 
177
+ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt.Map):
178
+ """Callback to react to changes in the workspace.
179
+
180
+
181
+ Args:
182
+ name: Name of the workspace.
183
+ changes: Changes performed to the workspace.
184
+ ws_crdt: CRDT object representing the workspace.
185
+ """
186
  ws_pyd = workspace.Workspace.model_validate(ws_crdt.to_py())
187
  # Do not trigger execution for superficial changes.
188
  # This is a quick solution until we build proper caching.
 
206
  await execute(name, ws_crdt, ws_pyd)
207
 
208
 
209
+ async def execute(
210
+ name: str, ws_crdt: pycrdt.Map, ws_pyd: workspace.Workspace, delay: int = 0
211
+ ):
212
+ """Execute the workspace and update the CRDT object with the results.
213
+
214
+ Args:
215
+ name: Name of the workspace.
216
+ ws_crdt: CRDT object representing the workspace.
217
+ ws_pyd: Workspace object to execute.
218
+ delay: Wait time before executing the workspace. The default is 0.
219
+ """
220
  if delay:
221
  try:
222
  await asyncio.sleep(delay)
223
  except asyncio.CancelledError:
224
  return
225
  path = DATA_PATH / name
226
+ assert path.is_relative_to(DATA_PATH), "Provided workspace path is invalid"
227
+ # Save user changes before executing, in case the execution fails.
228
  workspace.save(ws_pyd, path)
229
  await workspace.execute(ws_pyd)
230
  workspace.save(ws_pyd, path)
231
+ # Execution happened on the Python object, we need to replicate
232
+ # the results to the CRDT object.
233
  with ws_crdt.doc.transaction():
234
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
235
  if "data" not in nc:
236
  nc["data"] = pycrdt.Map()
237
+ # Display is added as a non collaborative field.
238
  nc["data"]["display"] = np.data.display
239
  nc["data"]["error"] = np.data.error
240
 
lynxkite-app/tests/test_crdt.py ADDED
@@ -0,0 +1,72 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from enum import Enum
2
+ import pycrdt
3
+ import pytest
4
+ from lynxkite.app.crdt import crdt_update
5
+
6
+
7
@pytest.fixture
def empty_dict_workspace():
    """Yield an empty pycrdt Map rooted in a fresh document."""
    doc = pycrdt.Doc()
    ws = pycrdt.Map()
    doc["workspace"] = ws
    yield ws
12
+
13
+
14
@pytest.fixture
def empty_list_workspace():
    """Yield an empty pycrdt Array rooted in a fresh document."""
    doc = pycrdt.Doc()
    ws = pycrdt.Array()
    doc["workspace"] = ws
    yield ws
19
+
20
+
21
class MyEnum(Enum):
    # Single-member enum used to check that crdt_update stores enum members
    # as the string form of their value.
    VALUE = 1
23
+
24
+
25
_DICT_CASES = [
    (
        {
            "key1": "value1",
            "key2": {
                "nested_key1": "nested_value1",
                "nested_key2": ["nested_value2"],
                "nested_key3": MyEnum.VALUE,
            },
        },
        {
            "key1": "value1",
            "key2": {
                "nested_key1": "nested_value1",
                "nested_key2": ["nested_value2"],
                "nested_key3": "1",
            },
        },
    ),
]


@pytest.mark.parametrize("python_obj,expected", _DICT_CASES)
def test_crdt_update_with_dict(empty_dict_workspace, python_obj, expected):
    """crdt_update mirrors a nested dict into the Map; enum members become strings."""
    crdt_update(empty_dict_workspace, python_obj)
    assert empty_dict_workspace.to_py() == expected
51
+
52
+
53
_LIST_CASES = [
    (
        [
            "value1",
            {"nested_key1": "nested_value1", "nested_key2": ["nested_value2"]},
            MyEnum.VALUE,
        ],
        [
            "value1",
            {"nested_key1": "nested_value1", "nested_key2": ["nested_value2"]},
            "1",
        ],
    ),
]


@pytest.mark.parametrize("python_obj,expected", _LIST_CASES)
def test_crdt_update_with_list(empty_list_workspace, python_obj, expected):
    """crdt_update mirrors a list into the Array; enum members become strings."""
    crdt_update(empty_list_workspace, python_obj)
    assert empty_list_workspace.to_py() == expected
lynxkite-app/tests/test_main.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import uuid
2
+ from fastapi.testclient import TestClient
3
+ from lynxkite.app.main import app, detect_plugins, DATA_PATH
4
+ import os
5
+
6
+
7
+ client = TestClient(app)
8
+
9
+
10
def test_detect_plugins_with_plugins():
    """The expected plugin modules are all discovered."""
    # This test assumes that these plugins are installed as part of the testing process.
    detected = detect_plugins()
    expected = {
        "lynxkite_plugins.graph_analytics",
        "lynxkite_plugins.lynxscribe",
        "lynxkite_plugins.pillow_example",
    }
    assert expected.issubset(detected.keys())
21
+
22
+
23
def test_get_catalog():
    """The catalog endpoint responds successfully."""
    assert client.get("/api/catalog").status_code == 200
26
+
27
+
28
def test_save_and_load():
    """Round-trip a workspace through /api/save and /api/load."""
    ws = {
        "env": "test",
        "nodes": [
            {
                "id": "Node_1",
                "type": "basic",
                "data": {
                    "display": None,
                    "error": "Unknown operation.",
                    "title": "Test node",
                    "params": {"param1": "value"},
                },
                "position": {"x": -493.5496596237119, "y": 20.90123252513356},
            }
        ],
        "edges": [],
    }
    save_response = client.post("/api/save", json={"path": "test", "ws": ws})
    assert save_response.status_code == 200
    load_response = client.get("/api/load?path=test")
    assert load_response.status_code == 200
    assert save_response.json() == load_response.json()
55
+
56
+
57
def test_list_dir():
    """Listing a directory returns its entries with name and type.

    Fix: the original left the temp file and directory behind whenever an
    assertion failed; cleanup now runs in a finally block.
    """
    test_dir = str(uuid.uuid4())
    test_dir_full_path = DATA_PATH / test_dir
    test_dir_full_path.mkdir(exist_ok=True)
    test_file = test_dir_full_path / "test_file.txt"
    test_file.touch()
    try:
        response = client.get(f"/api/dir/list?path={test_dir}")
        assert response.status_code == 200
        entries = response.json()
        assert len(entries) == 1
        assert entries[0]["name"] == f"{test_dir}/test_file.txt"
        assert entries[0]["type"] == "workspace"
    finally:
        # Always remove the fixtures so repeated runs start clean.
        test_file.unlink()
        test_dir_full_path.rmdir()
70
+
71
+
72
def test_make_dir():
    """The mkdir endpoint creates a directory under DATA_PATH.

    Fix: the original skipped the os.rmdir cleanup when an assertion failed,
    leaving a stray directory behind; cleanup now runs in a finally block.
    """
    dir_name = str(uuid.uuid4())
    try:
        response = client.post("/api/dir/mkdir", json={"path": dir_name})
        assert response.status_code == 200
        assert os.path.exists(DATA_PATH / dir_name)
    finally:
        # Guard so a failed mkdir does not mask the real error with FileNotFoundError.
        if os.path.isdir(DATA_PATH / dir_name):
            os.rmdir(DATA_PATH / dir_name)
lynxkite-app/web/src/apiTypes.ts CHANGED
@@ -5,21 +5,21 @@
5
  /* Do not modify it by hand - just update the pydantic models and then re-run the script
6
  */
7
 
8
- /* eslint-disable */
9
- /**
10
- * This file was automatically generated by json-schema-to-typescript.
11
- * DO NOT MODIFY IT BY HAND. Instead, modify the source JSONSchema file,
12
- * and run json-schema-to-typescript to regenerate this file.
13
- */
14
-
15
  export interface BaseConfig {
16
  [k: string]: unknown;
17
  }
18
- export interface Position {
19
  x: number;
20
  y: number;
21
  [k: string]: unknown;
22
  }
 
 
 
 
 
 
 
23
  export interface Workspace {
24
  env?: string;
25
  nodes?: WorkspaceNode[];
@@ -30,7 +30,7 @@ export interface WorkspaceNode {
30
  id: string;
31
  type: string;
32
  data: WorkspaceNodeData;
33
- position: Position;
34
  [k: string]: unknown;
35
  }
36
  export interface WorkspaceNodeData {
 
5
  /* Do not modify it by hand - just update the pydantic models and then re-run the script
6
  */
7
 
 
 
 
 
 
 
 
8
  export interface BaseConfig {
9
  [k: string]: unknown;
10
  }
11
+ export interface NodePosition {
12
  x: number;
13
  y: number;
14
  [k: string]: unknown;
15
  }
16
+ /**
17
+ * A workspace is a representation of a computational graph that consists of nodes and edges.
18
+ *
19
+ * Each node represents an operation or task, and the edges represent the flow of data between
20
+ * the nodes. Each workspace is associated with an environment, which determines the operations
21
+ * that can be performed in the workspace and the execution method for the operations.
22
+ */
23
  export interface Workspace {
24
  env?: string;
25
  nodes?: WorkspaceNode[];
 
30
  id: string;
31
  type: string;
32
  data: WorkspaceNodeData;
33
+ position: NodePosition;
34
  [k: string]: unknown;
35
  }
36
  export interface WorkspaceNodeData {
lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED
@@ -46,17 +46,19 @@ def register(env: str, cache: bool = True):
46
  ops.EXECUTORS[env] = lambda ws: execute(ws, ops.CATALOGS[env], cache=cache)
47
 
48
 
49
- def get_stages(ws, catalog):
50
  """Inputs on top/bottom are batch inputs. We decompose the graph into a DAG of components along these edges."""
51
  nodes = {n.id: n for n in ws.nodes}
52
  batch_inputs = {}
53
  inputs = {}
 
 
54
  for edge in ws.edges:
55
  inputs.setdefault(edge.target, []).append(edge.source)
56
  node = nodes[edge.target]
57
  op = catalog[node.data.title]
58
  i = op.inputs[edge.targetHandle]
59
- if i.position in "top or bottom":
60
  batch_inputs.setdefault(edge.target, []).append(edge.source)
61
  stages = []
62
  for bt, bss in batch_inputs.items():
@@ -93,7 +95,7 @@ async def await_if_needed(obj):
93
  return obj
94
 
95
 
96
- async def execute(ws, catalog, cache=None):
97
  nodes = {n.id: n for n in ws.nodes}
98
  contexts = {n.id: Context(node=n) for n in ws.nodes}
99
  edges = {n.id: [] for n in ws.nodes}
@@ -108,7 +110,12 @@ async def execute(ws, catalog, cache=None):
108
  node.data.error = f'Operation "{node.data.title}" not found.'
109
  continue
110
  # Start tasks for nodes that have no non-batch inputs.
111
- if all([i.position in "top or bottom" for i in op.inputs.values()]):
 
 
 
 
 
112
  tasks[node.id] = [NO_INPUT]
113
  batch_inputs = {}
114
  # Run the rest until we run out of tasks.
@@ -131,7 +138,7 @@ async def execute(ws, catalog, cache=None):
131
  try:
132
  inputs = []
133
  for i in op.inputs.values():
134
- if i.position in "top or bottom":
135
  assert (n, i.name) in batch_inputs, f"{i.name} is missing"
136
  inputs.append(batch_inputs[(n, i.name)])
137
  else:
@@ -156,16 +163,16 @@ async def execute(ws, catalog, cache=None):
156
  results.extend(result)
157
  else: # Finished all tasks without errors.
158
  if (
159
- op.type == "visualization"
160
- or op.type == "table_view"
161
- or op.type == "image"
162
  ):
163
  data.display = results[0]
164
  for edge in edges[node.id]:
165
  t = nodes[edge.target]
166
  op = catalog[t.data.title]
167
  i = op.inputs[edge.targetHandle]
168
- if i.position in "top or bottom":
169
  batch_inputs.setdefault(
170
  (edge.target, edge.targetHandle), []
171
  ).extend(results)
 
46
  ops.EXECUTORS[env] = lambda ws: execute(ws, ops.CATALOGS[env], cache=cache)
47
 
48
 
49
+ def get_stages(ws: workspace.Workspace, catalog: dict[str, ops.Op]):
50
  """Inputs on top/bottom are batch inputs. We decompose the graph into a DAG of components along these edges."""
51
  nodes = {n.id: n for n in ws.nodes}
52
  batch_inputs = {}
53
  inputs = {}
54
+ # For each edge in the workspace, we record the inputs (sources)
55
+ # required for each node (target).
56
  for edge in ws.edges:
57
  inputs.setdefault(edge.target, []).append(edge.source)
58
  node = nodes[edge.target]
59
  op = catalog[node.data.title]
60
  i = op.inputs[edge.targetHandle]
61
+ if i.side in [ops.Side.TOP, ops.Side.BOTTOM]:
62
  batch_inputs.setdefault(edge.target, []).append(edge.source)
63
  stages = []
64
  for bt, bss in batch_inputs.items():
 
95
  return obj
96
 
97
 
98
+ async def execute(ws: workspace.Workspace, catalog: dict[str, ops.Op], cache=None):
99
  nodes = {n.id: n for n in ws.nodes}
100
  contexts = {n.id: Context(node=n) for n in ws.nodes}
101
  edges = {n.id: [] for n in ws.nodes}
 
110
  node.data.error = f'Operation "{node.data.title}" not found.'
111
  continue
112
  # Start tasks for nodes that have no non-batch inputs.
113
+ if all(
114
+ [
115
+ i.side in [ops.Side.TOP, ops.Side.BOTTOM]
116
+ for i in op.inputs.values()
117
+ ]
118
+ ):
119
  tasks[node.id] = [NO_INPUT]
120
  batch_inputs = {}
121
  # Run the rest until we run out of tasks.
 
138
  try:
139
  inputs = []
140
  for i in op.inputs.values():
141
+ if i.side in [ops.Side.TOP, ops.Side.BOTTOM]:
142
  assert (n, i.name) in batch_inputs, f"{i.name} is missing"
143
  inputs.append(batch_inputs[(n, i.name)])
144
  else:
 
163
  results.extend(result)
164
  else: # Finished all tasks without errors.
165
  if (
166
+ op.view_type == ops.ViewType.VISUALIZATION
167
+ or op.view_type == ops.ViewType.TABLE_VIEW
168
+ or op.view_type == ops.ViewType.IMAGE
169
  ):
170
  data.display = results[0]
171
  for edge in edges[node.id]:
172
  t = nodes[edge.target]
173
  op = catalog[t.data.title]
174
  i = op.inputs[edge.targetHandle]
175
+ if i.side in [ops.Side.TOP, ops.Side.BOTTOM]:
176
  batch_inputs.setdefault(
177
  (edge.target, edge.targetHandle), []
178
  ).extend(results)
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -61,16 +61,23 @@ class Parameter(BaseConfig):
61
  return Parameter(name=name, default=default, type=type)
62
 
63
 
 
 
 
 
 
 
 
64
  class Input(BaseConfig):
65
  name: str
66
  type: Type
67
- position: str = "left"
68
 
69
 
70
  class Output(BaseConfig):
71
  name: str
72
  type: Type
73
- position: str = "right"
74
 
75
 
76
  MULTI_INPUT = Input(name="multi", type="*")
@@ -84,13 +91,22 @@ def basic_outputs(*names):
84
  return {name: Output(name=name, type=None) for name in names}
85
 
86
 
 
 
 
 
 
 
 
 
 
87
  class Op(BaseConfig):
88
  func: typing.Callable = pydantic.Field(exclude=True)
89
  name: str
90
  params: dict[str, Parameter]
91
  inputs: dict[str, Input]
92
  outputs: dict[str, Output]
93
- type: str = "basic" # The UI to use for this operation.
94
 
95
  def __call__(self, *inputs, **params):
96
  # Convert parameters.
@@ -133,7 +149,7 @@ def op(env: str, name: str, *, view="basic", outputs=None):
133
  params=params,
134
  inputs=inputs,
135
  outputs=_outputs,
136
- type=view,
137
  )
138
  CATALOGS.setdefault(env, {})
139
  CATALOGS[env][name] = op
@@ -143,25 +159,25 @@ def op(env: str, name: str, *, view="basic", outputs=None):
143
  return decorator
144
 
145
 
146
- def input_position(**kwargs):
147
- """Decorator for specifying unusual positions for the inputs."""
148
 
149
  def decorator(func):
150
  op = func.__op__
151
  for k, v in kwargs.items():
152
- op.inputs[k].position = v
153
  return func
154
 
155
  return decorator
156
 
157
 
158
- def output_position(**kwargs):
159
- """Decorator for specifying unusual positions for the outputs."""
160
 
161
  def decorator(func):
162
  op = func.__op__
163
  for k, v in kwargs.items():
164
- op.outputs[k].position = v
165
  return func
166
 
167
  return decorator
@@ -173,7 +189,13 @@ def no_op(*args, **kwargs):
173
  return None
174
 
175
 
176
- def register_passive_op(env: str, name: str, inputs=[], outputs=["output"], params=[]):
 
 
 
 
 
 
177
  """A passive operation has no associated code."""
178
  op = Op(
179
  func=no_op,
@@ -209,16 +231,3 @@ def op_registration(env: str):
209
 
210
  def passive_op_registration(env: str):
211
  return functools.partial(register_passive_op, env)
212
-
213
-
214
- def register_area(env, name, params=[]):
215
- """A node that represents an area. It can contain other nodes, but does not restrict movement in any way."""
216
- op = Op(
217
- func=no_op,
218
- name=name,
219
- params={p.name: p for p in params},
220
- inputs={},
221
- outputs={},
222
- type="area",
223
- )
224
- CATALOGS[env][name] = op
 
61
  return Parameter(name=name, default=default, type=type)
62
 
63
 
64
+ class Side(enum.StrEnum):
65
+ LEFT = "left"
66
+ RIGHT = "right"
67
+ TOP = "top"
68
+ BOTTOM = "bottom"
69
+
70
+
71
class Input(BaseConfig):
    """Describes one input connector of an operation."""

    name: str
    type: Type
    # Which side of the node the connector is rendered on. Defaults to the left side.
    side: Side = Side.LEFT
75
 
76
 
77
class Output(BaseConfig):
    """Describes one output connector of an operation."""

    name: str
    type: Type
    # Which side of the node the connector is rendered on. Defaults to the right side.
    side: Side = Side.RIGHT
81
 
82
 
83
  MULTI_INPUT = Input(name="multi", type="*")
 
91
  return {name: Output(name=name, type=None) for name in names}
92
 
93
 
94
+ class ViewType(enum.StrEnum):
95
+ """Represents the visualization options for an operation."""
96
+
97
+ BASIC = "basic"
98
+ VISUALIZATION = "visualization"
99
+ IMAGE = "image"
100
+ TABLE_VIEW = "table_view"
101
+
102
+
103
  class Op(BaseConfig):
104
  func: typing.Callable = pydantic.Field(exclude=True)
105
  name: str
106
  params: dict[str, Parameter]
107
  inputs: dict[str, Input]
108
  outputs: dict[str, Output]
109
+ view_type: ViewType = ViewType.BASIC # The UI to use for this operation.
110
 
111
  def __call__(self, *inputs, **params):
112
  # Convert parameters.
 
149
  params=params,
150
  inputs=inputs,
151
  outputs=_outputs,
152
+ view_type=view,
153
  )
154
  CATALOGS.setdefault(env, {})
155
  CATALOGS[env][name] = op
 
159
  return decorator
160
 
161
 
162
def input_side(**kwargs):
    """Decorator for specifying unusual sides for the inputs."""

    def decorator(func):
        inputs = func.__op__.inputs
        for input_name, side in kwargs.items():
            inputs[input_name].side = side
        return func

    return decorator
172
 
173
 
174
def output_side(**kwargs):
    """Decorator for specifying unusual sides for the outputs."""

    def decorator(func):
        outputs = func.__op__.outputs
        for output_name, side in kwargs.items():
            outputs[output_name].side = side
        return func

    return decorator
 
189
  return None
190
 
191
 
192
+ def register_passive_op(
193
+ env: str,
194
+ name: str,
195
+ inputs: list[Input] = [],
196
+ outputs: list[Output] = ["output"],
197
+ params: list[Parameter] = [],
198
+ ):
199
  """A passive operation has no associated code."""
200
  op = Op(
201
  func=no_op,
 
231
 
232
  def passive_op_registration(env: str):
233
  return functools.partial(register_passive_op, env)
 
 
 
 
 
 
 
 
 
 
 
 
 
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -14,7 +14,8 @@ class BaseConfig(pydantic.BaseModel):
14
  )
15
 
16
 
17
- class Position(BaseConfig):
 
18
  x: float
19
  y: float
20
 
@@ -29,10 +30,12 @@ class WorkspaceNodeData(BaseConfig):
29
 
30
 
31
  class WorkspaceNode(BaseConfig):
 
 
32
  id: str
33
  type: str
34
  data: WorkspaceNodeData
35
- position: Position
36
 
37
 
38
  class WorkspaceEdge(BaseConfig):
@@ -44,6 +47,13 @@ class WorkspaceEdge(BaseConfig):
44
 
45
 
46
  class Workspace(BaseConfig):
 
 
 
 
 
 
 
47
  env: str = ""
48
  nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
49
  edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
@@ -55,6 +65,7 @@ async def execute(ws: Workspace):
55
 
56
 
57
  def save(ws: Workspace, path: str):
 
58
  j = ws.model_dump_json(indent=2)
59
  dirname, basename = os.path.split(path)
60
  # Create temp file in the same directory to make sure it's on the same filesystem.
@@ -66,7 +77,17 @@ def save(ws: Workspace, path: str):
66
  os.replace(temp_name, path)
67
 
68
 
69
- def load(path: str):
 
 
 
 
 
 
 
 
 
 
70
  with open(path) as f:
71
  j = f.read()
72
  ws = Workspace.model_validate_json(j)
@@ -75,19 +96,32 @@ def load(path: str):
75
  return ws
76
 
77
 
78
- def _update_metadata(ws):
79
- catalog = ops.CATALOGS.get(ws.env, {})
 
 
 
 
 
 
 
 
 
 
 
 
80
  nodes = {node.id: node for node in ws.nodes}
81
  done = set()
82
  while len(done) < len(nodes):
83
  for node in ws.nodes:
84
  if node.id in done:
 
85
  continue
86
  data = node.data
87
  op = catalog.get(data.title)
88
  if op:
89
  data.meta = op
90
- node.type = op.type
91
  if data.error == "Unknown operation.":
92
  data.error = None
93
  else:
 
14
  )
15
 
16
 
17
# TODO: Rename this to coordinates
class NodePosition(BaseConfig):
    """Position of a workspace node, as x/y coordinates."""

    x: float
    y: float
21
 
 
30
 
31
 
32
class WorkspaceNode(BaseConfig):
    """A single node of the workspace graph."""

    # The naming of these attributes matches the ones for the NodeBase type in React Flow;
    # modifying them will break the frontend.
    id: str
    type: str
    data: WorkspaceNodeData
    position: NodePosition
39
 
40
 
41
  class WorkspaceEdge(BaseConfig):
 
47
 
48
 
49
  class Workspace(BaseConfig):
50
+ """A workspace is a representation of a computational graph that consists of nodes and edges.
51
+
52
+ Each node represents an operation or task, and the edges represent the flow of data between
53
+ the nodes. Each workspace is associated with an environment, which determines the operations
54
+ that can be performed in the workspace and the execution method for the operations.
55
+ """
56
+
57
  env: str = ""
58
  nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
59
  edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
 
65
 
66
 
67
  def save(ws: Workspace, path: str):
68
+ """Persist a workspace to a local file in JSON format."""
69
  j = ws.model_dump_json(indent=2)
70
  dirname, basename = os.path.split(path)
71
  # Create temp file in the same directory to make sure it's on the same filesystem.
 
77
  os.replace(temp_name, path)
78
 
79
 
80
+ def load(path: str) -> Workspace:
81
+ """Load a workspace from a file.
82
+
83
+ After loading the workspace, the metadata of the workspace is updated.
84
+
85
+ Args:
86
+ path (str): The path to the file to load the workspace from.
87
+
88
+ Returns:
89
+ Workspace: The loaded workspace object, with updated metadata.
90
+ """
91
  with open(path) as f:
92
  j = f.read()
93
  ws = Workspace.model_validate_json(j)
 
96
  return ws
97
 
98
 
99
+ def _update_metadata(ws: Workspace) -> Workspace:
100
+ """Update the metadata of the given workspace object.
101
+
102
+ The metadata is the information about the operations that the nodes in the workspace represent,
103
+ like the parameters and their possible values.
104
+ This information comes from the catalog of operations for the environment of the workspace.
105
+
106
+ Args:
107
+ ws: The workspace object to update.
108
+
109
+ Returns:
110
+ Workspace: The updated workspace object.
111
+ """
112
+ catalog: dict[str, ops.Op] = ops.CATALOGS.get(ws.env, {})
113
  nodes = {node.id: node for node in ws.nodes}
114
  done = set()
115
  while len(done) < len(nodes):
116
  for node in ws.nodes:
117
  if node.id in done:
118
+ # TODO: Can nodes with the same ID reference different operations?
119
  continue
120
  data = node.data
121
  op = catalog.get(data.title)
122
  if op:
123
  data.meta = op
124
+ node.type = op.view_type
125
  if data.error == "Unknown operation.":
126
  data.error = None
127
  else:
lynxkite-core/tests/test_ops.py ADDED
@@ -0,0 +1,89 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import inspect
2
+ from lynxkite.core import ops
3
+ import enum
4
+
5
+
6
+ def test_op_decorator_no_params_no_types_default_sides():
7
+ @ops.op(env="test", name="add", view=ops.ViewType.BASIC, outputs=["result"])
8
+ def add(a, b):
9
+ return a + b
10
+
11
+ assert add.__op__.name == "add"
12
+ assert add.__op__.params == {}
13
+ assert add.__op__.inputs == {
14
+ "a": ops.Input(name="a", type=inspect._empty, side=ops.Side.LEFT),
15
+ "b": ops.Input(name="b", type=inspect._empty, side=ops.Side.LEFT),
16
+ }
17
+ assert add.__op__.outputs == {
18
+ "result": ops.Output(name="result", type=None, side=ops.Side.RIGHT)
19
+ }
20
+ assert add.__op__.view_type == ops.ViewType.BASIC
21
+ assert ops.CATALOGS["test"]["add"] == add.__op__
22
+
23
+
24
+ def test_op_decorator_custom_sides():
25
+ @ops.input_side(a=ops.Side.RIGHT, b=ops.Side.TOP)
26
+ @ops.output_side(result=ops.Side.BOTTOM)
27
+ @ops.op(env="test", name="add", view=ops.ViewType.BASIC, outputs=["result"])
28
+ def add(a, b):
29
+ return a + b
30
+
31
+ assert add.__op__.name == "add"
32
+ assert add.__op__.params == {}
33
+ assert add.__op__.inputs == {
34
+ "a": ops.Input(name="a", type=inspect._empty, side=ops.Side.RIGHT),
35
+ "b": ops.Input(name="b", type=inspect._empty, side=ops.Side.TOP),
36
+ }
37
+ assert add.__op__.outputs == {
38
+ "result": ops.Output(name="result", type=None, side=ops.Side.BOTTOM)
39
+ }
40
+ assert add.__op__.view_type == ops.ViewType.BASIC
41
+ assert ops.CATALOGS["test"]["add"] == add.__op__
42
+
43
+
44
+ def test_op_decorator_with_params_and_types_():
45
+ @ops.op(env="test", name="multiply", view=ops.ViewType.BASIC, outputs=["result"])
46
+ def multiply(a: int, b: float = 2.0, *, param: str = "param"):
47
+ return a * b
48
+
49
+ assert multiply.__op__.name == "multiply"
50
+ assert multiply.__op__.params == {
51
+ "param": ops.Parameter(name="param", default="param", type=str)
52
+ }
53
+ assert multiply.__op__.inputs == {
54
+ "a": ops.Input(name="a", type=int, side=ops.Side.LEFT),
55
+ "b": ops.Input(name="b", type=float, side=ops.Side.LEFT),
56
+ }
57
+ assert multiply.__op__.outputs == {
58
+ "result": ops.Output(name="result", type=None, side=ops.Side.RIGHT)
59
+ }
60
+ assert multiply.__op__.view_type == ops.ViewType.BASIC
61
+ assert ops.CATALOGS["test"]["multiply"] == multiply.__op__
62
+
63
+
64
+ def test_op_decorator_with_complex_types():
65
+ class Color(enum.Enum):
66
+ RED = 1
67
+ GREEN = 2
68
+ BLUE = 3
69
+
70
+ @ops.op(env="test", name="color_op", view=ops.ViewType.BASIC, outputs=["result"])
71
+ def complex_op(color: Color, color_list: list[Color], color_dict: dict[str, Color]):
72
+ return color.name
73
+
74
+ assert complex_op.__op__.name == "color_op"
75
+ assert complex_op.__op__.params == {}
76
+ assert complex_op.__op__.inputs == {
77
+ "color": ops.Input(name="color", type=Color, side=ops.Side.LEFT),
78
+ "color_list": ops.Input(
79
+ name="color_list", type=list[Color], side=ops.Side.LEFT
80
+ ),
81
+ "color_dict": ops.Input(
82
+ name="color_dict", type=dict[str, Color], side=ops.Side.LEFT
83
+ ),
84
+ }
85
+ assert complex_op.__op__.view_type == ops.ViewType.BASIC
86
+ assert complex_op.__op__.outputs == {
87
+ "result": ops.Output(name="result", type=None, side=ops.Side.RIGHT)
88
+ }
89
+ assert ops.CATALOGS["test"]["color_op"] == complex_op.__op__
lynxkite-core/tests/test_workspace.py ADDED
@@ -0,0 +1,115 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import pytest
3
+ import tempfile
4
+ from lynxkite.core import workspace
5
+ from lynxkite.core import ops
6
+
7
+
8
+ def test_save_load():
9
+ ws = workspace.Workspace(env="test")
10
+ ws.nodes.append(
11
+ workspace.WorkspaceNode(
12
+ id="1",
13
+ type="node_type",
14
+ data=workspace.WorkspaceNodeData(title="Node 1", params={}),
15
+ position=workspace.NodePosition(x=0, y=0),
16
+ )
17
+ )
18
+ ws.nodes.append(
19
+ workspace.WorkspaceNode(
20
+ id="2",
21
+ type="node_type",
22
+ data=workspace.WorkspaceNodeData(title="Node 2", params={}),
23
+ position=workspace.NodePosition(x=0, y=0),
24
+ )
25
+ )
26
+ ws.edges.append(
27
+ workspace.WorkspaceEdge(
28
+ id="edge1",
29
+ source="1",
30
+ target="2",
31
+ sourceHandle="",
32
+ targetHandle="",
33
+ )
34
+ )
35
+ path = os.path.join(tempfile.gettempdir(), "test_workspace.json")
36
+
37
+ try:
38
+ workspace.save(ws, path)
39
+ assert os.path.exists(path)
40
+ loaded_ws = workspace.load(path)
41
+ assert loaded_ws.env == ws.env
42
+ assert len(loaded_ws.nodes) == len(ws.nodes)
43
+ assert len(loaded_ws.edges) == len(ws.edges)
44
+ sorted_ws_nodes = sorted(ws.nodes, key=lambda x: x.id)
45
+ sorted_loaded_ws_nodes = sorted(loaded_ws.nodes, key=lambda x: x.id)
46
+ # We do manual assertion on each attribute because metadata is added at
47
+ # loading time, which makes the objects different.
48
+ for node, loaded_node in zip(sorted_ws_nodes, sorted_loaded_ws_nodes):
49
+ assert node.id == loaded_node.id
50
+ assert node.type == loaded_node.type
51
+ assert node.data.title == loaded_node.data.title
52
+ assert node.data.params == loaded_node.data.params
53
+ assert node.position.x == loaded_node.position.x
54
+ assert node.position.y == loaded_node.position.y
55
+ sorted_ws_edges = sorted(ws.edges, key=lambda x: x.id)
56
+ sorted_loaded_ws_edges = sorted(loaded_ws.edges, key=lambda x: x.id)
57
+ for edge, loaded_edge in zip(sorted_ws_edges, sorted_loaded_ws_edges):
58
+ assert edge.id == loaded_edge.id
59
+ assert edge.source == loaded_edge.source
60
+ assert edge.target == loaded_edge.target
61
+ assert edge.sourceHandle == loaded_edge.sourceHandle
62
+ assert edge.targetHandle == loaded_edge.targetHandle
63
+ finally:
64
+ os.remove(path)
65
+
66
+
67
+ @pytest.fixture(scope="session", autouse=True)
68
+ def populate_ops_catalog():
69
+ ops.register_passive_op(
70
+ env="test",
71
+ name="Test Operation",
72
+ inputs=[],
73
+ params=[
74
+ ops.Parameter(name="param_int", default=1),
75
+ ops.Parameter(name="param_str", default="test"),
76
+ ],
77
+ )
78
+
79
+
80
+ def test_update_metadata():
81
+ ws = workspace.Workspace(env="test")
82
+ ws.nodes.append(
83
+ workspace.WorkspaceNode(
84
+ id="1",
85
+ type="basic",
86
+ data=workspace.WorkspaceNodeData(
87
+ title="Test Operation", params={"param_int": 1}
88
+ ),
89
+ position=workspace.NodePosition(x=0, y=0),
90
+ )
91
+ )
92
+ ws.nodes.append(
93
+ workspace.WorkspaceNode(
94
+ id="2",
95
+ type="basic",
96
+ data=workspace.WorkspaceNodeData(title="Unknown Operation", params={}),
97
+ position=workspace.NodePosition(x=0, y=0),
98
+ )
99
+ )
100
+ updated_ws = workspace._update_metadata(ws)
101
+ assert updated_ws.nodes[0].data.meta.name == "Test Operation"
102
+ assert updated_ws.nodes[0].data.error is None
103
+ assert updated_ws.nodes[0].data.params == {"param_int": 1}
104
+ assert updated_ws.nodes[0].data.meta.params == {
105
+ "param_int": ops.Parameter(name="param_int", default=1),
106
+ "param_str": ops.Parameter(name="param_str", default="test"),
107
+ }
108
+ assert not hasattr(updated_ws.nodes[1].data, "meta")
109
+ assert updated_ws.nodes[1].data.error == "Unknown operation."
110
+
111
+
112
+ def test_update_metadata_with_empty_workspace():
113
+ ws = workspace.Workspace(env="test")
114
+ updated_ws = workspace._update_metadata(ws)
115
+ assert len(updated_ws.nodes) == 0
lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/lynxkite_ops.py CHANGED
@@ -1,7 +1,7 @@
1
  """Graph analytics operations. To be split into separate files when we have more."""
2
 
3
  import os
4
- from lynxkite.core import ops
5
  from collections import deque
6
  import dataclasses
7
  import functools
@@ -34,6 +34,7 @@ class RelationDefinition:
34
  target_key: str # The column in the target table that contains the node ID.
35
 
36
 
 
37
  @dataclasses.dataclass
38
  class Bundle:
39
  """A collection of DataFrames and other data.
@@ -115,32 +116,50 @@ def disambiguate_edges(ws):
115
 
116
 
117
  @ops.register_executor(ENV)
118
- async def execute(ws):
119
- catalog = ops.CATALOGS[ENV]
120
  disambiguate_edges(ws)
121
- outputs = {}
122
  failed = 0
123
- while len(outputs) + failed < len(ws.nodes):
124
  for node in ws.nodes:
125
- if node.id in outputs:
126
  continue
127
  # TODO: Take the input/output handles into account.
128
- inputs = [edge.source for edge in ws.edges if edge.target == node.id]
129
- if all(input in outputs for input in inputs):
130
- inputs = [outputs[input] for input in inputs]
 
 
 
 
 
131
  data = node.data
132
- op = catalog[data.title]
133
  params = {**data.params}
134
- # Convert inputs.
135
  try:
136
- for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
137
- if p.type == nx.Graph and isinstance(x, Bundle):
138
- inputs[i] = x.to_nx()
139
- elif p.type == Bundle and isinstance(x, nx.Graph):
140
- inputs[i] = Bundle.from_nx(x)
141
- elif p.type == Bundle and isinstance(x, pd.DataFrame):
142
- inputs[i] = Bundle.from_df(x)
143
- output = op(*inputs, **params)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
  except Exception as e:
145
  traceback.print_exc()
146
  data.error = str(e)
@@ -148,13 +167,16 @@ async def execute(ws):
148
  continue
149
  if len(op.inputs) == 1 and op.inputs.get("multi") == "*":
150
  # It's a flexible input. Create n+1 handles.
151
- data.inputs = {f"input{i}": None for i in range(len(inputs) + 1)}
 
 
 
152
  data.error = None
153
- outputs[node.id] = output
154
  if (
155
- op.type == "visualization"
156
- or op.type == "table_view"
157
- or op.type == "image"
158
  ):
159
  data.display = output
160
 
@@ -188,6 +210,7 @@ def create_scale_free_graph(*, nodes: int = 10):
188
  @op("Compute PageRank")
189
  @nx_node_attribute_func("pagerank")
190
  def compute_pagerank(graph: nx.Graph, *, damping=0.85, iterations=100):
 
191
  return nx.pagerank(graph, alpha=damping, max_iter=iterations)
192
 
193
 
@@ -281,7 +304,7 @@ def _map_color(value):
281
  ]
282
 
283
 
284
- @op("Visualize graph", view="visualization")
285
  def visualize_graph(graph: Bundle, *, color_nodes_by: ops.NodeAttribute = None):
286
  nodes = graph.dfs["nodes"].copy()
287
  if color_nodes_by:
@@ -335,7 +358,7 @@ def collect(df: pd.DataFrame):
335
  return df.values.tolist()
336
 
337
 
338
- @op("View tables", view="table_view")
339
  def view_tables(bundle: Bundle):
340
  v = {
341
  "dataframes": {
 
1
  """Graph analytics operations. To be split into separate files when we have more."""
2
 
3
  import os
4
+ from lynxkite.core import ops, workspace
5
  from collections import deque
6
  import dataclasses
7
  import functools
 
34
  target_key: str # The column in the target table that contains the node ID.
35
 
36
 
37
+ # TODO: Convert this to Pydantic
38
  @dataclasses.dataclass
39
  class Bundle:
40
  """A collection of DataFrames and other data.
 
116
 
117
 
118
  @ops.register_executor(ENV)
119
+ async def execute(ws: workspace.Workspace):
120
+ catalog: dict[str, ops.Op] = ops.CATALOGS[ENV]
121
  disambiguate_edges(ws)
122
+ computed_outputs = {}
123
  failed = 0
124
+ while len(computed_outputs) + failed < len(ws.nodes):
125
  for node in ws.nodes:
126
+ if node.id in computed_outputs:
127
  continue
128
  # TODO: Take the input/output handles into account.
129
+ operation_inputs = [
130
+ edge.source for edge in ws.edges if edge.target == node.id
131
+ ]
132
+ if all(input in computed_outputs for input in operation_inputs):
133
+ # All inputs for this node are ready, we can compute the output.
134
+ operation_inputs = [
135
+ computed_outputs[input] for input in operation_inputs
136
+ ]
137
  data = node.data
 
138
  params = {**data.params}
 
139
  try:
140
+ op = catalog[data.title]
141
+ # Convert inputs types to match operation signature.
142
+ for i, (input_value, input_signature) in enumerate(
143
+ zip(operation_inputs, op.inputs.values())
144
+ ):
145
+ if input_signature.type == nx.Graph and isinstance(
146
+ input_value, Bundle
147
+ ):
148
+ operation_inputs[i] = input_value.to_nx()
149
+ elif input_signature.type == Bundle and isinstance(
150
+ input_value, nx.Graph
151
+ ):
152
+ operation_inputs[i] = Bundle.from_nx(input_value)
153
+ elif input_signature.type == Bundle and isinstance(
154
+ input_value, pd.DataFrame
155
+ ):
156
+ operation_inputs[i] = Bundle.from_df(input_value)
157
+ output = op(*operation_inputs, **params)
158
+ except KeyError:
159
+ traceback.print_exc()
160
+ data.error = "Operation not found in catalog"
161
+ failed += 1
162
+ continue
163
  except Exception as e:
164
  traceback.print_exc()
165
  data.error = str(e)
 
167
  continue
168
  if len(op.inputs) == 1 and op.inputs.get("multi") == "*":
169
  # It's a flexible input. Create n+1 handles.
170
+ # TODO: How is this used? Why we define the inputs in the WorkspaceNodeData?
171
+ data.inputs = {
172
+ f"input{i}": None for i in range(len(operation_inputs) + 1)
173
+ }
174
  data.error = None
175
+ computed_outputs[node.id] = output
176
  if (
177
+ op.view_type == ops.ViewType.VISUALIZATION
178
+ or op.view_type == ops.ViewType.TABLE_VIEW
179
+ or op.view_type == ops.ViewType.IMAGE
180
  ):
181
  data.display = output
182
 
 
210
  @op("Compute PageRank")
211
  @nx_node_attribute_func("pagerank")
212
  def compute_pagerank(graph: nx.Graph, *, damping=0.85, iterations=100):
213
+ # TODO: This requires scipy to be installed.
214
  return nx.pagerank(graph, alpha=damping, max_iter=iterations)
215
 
216
 
 
304
  ]
305
 
306
 
307
+ @op("Visualize graph", view=ops.ViewType.VISUALIZATION)
308
  def visualize_graph(graph: Bundle, *, color_nodes_by: ops.NodeAttribute = None):
309
  nodes = graph.dfs["nodes"].copy()
310
  if color_nodes_by:
 
358
  return df.values.tolist()
359
 
360
 
361
+ @op("View tables", view=ops.ViewType.TABLE_VIEW)
362
  def view_tables(bundle: Bundle):
363
  v = {
364
  "dataframes": {
lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/networkx_ops.py CHANGED
@@ -54,7 +54,7 @@ def register_networkx(env: str):
54
  params=params,
55
  inputs=inputs,
56
  outputs={"output": ops.Output(name="output", type=nx.Graph)},
57
- type="basic",
58
  )
59
  cat[name] = op
60
 
 
54
  params=params,
55
  inputs=inputs,
56
  outputs={"output": ops.Output(name="output", type=nx.Graph)},
57
+ view_type=ops.ViewType.BASIC,
58
  )
59
  cat[name] = op
60
 
lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/pytorch_model_ops.py CHANGED
@@ -13,10 +13,10 @@ def reg(name, inputs=[], outputs=None, params=[]):
13
  ENV,
14
  name,
15
  inputs=[
16
- ops.Input(name=name, position="bottom", type="tensor") for name in inputs
17
  ],
18
  outputs=[
19
- ops.Output(name=name, position="top", type="tensor") for name in outputs
20
  ],
21
  params=params,
22
  )
@@ -64,6 +64,4 @@ reg(
64
  ),
65
  P.basic("lr", 0.001),
66
  ],
67
- )
68
-
69
- ops.register_area(ENV, "Repeat", params=[ops.Parameter.basic("times", 1, int)])
 
13
  ENV,
14
  name,
15
  inputs=[
16
+ ops.Input(name=name, side=ops.Side.BOTTOM, type="tensor") for name in inputs
17
  ],
18
  outputs=[
19
+ ops.Output(name=name, side=ops.Side.TOP, type="tensor") for name in outputs
20
  ],
21
  params=params,
22
  )
 
64
  ),
65
  P.basic("lr", 0.001),
66
  ],
67
+ )
 
 
lynxkite-graph-analytics/tests/test_lynxkite_ops.py ADDED
@@ -0,0 +1,100 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+
3
+ # Add the project root to sys.path
4
+ sys.path.insert(0, "/home/chema/work/lynxkite-2024/lynxkite-graph-analytics")
5
+ import pandas as pd
6
+ import pytest
7
+ import networkx as nx
8
+
9
+ from lynxkite.core import workspace
10
+
11
+ # from lynxkite_plugins.graph_analytics.lynxkite_ops import execute
12
+ from src.lynxkite_plugins.graph_analytics.lynxkite_ops import Bundle, execute, op
13
+
14
+
15
+ async def test_execute_operation_not_in_catalog():
16
+ ws = workspace.Workspace(env="test")
17
+ ws.nodes.append(
18
+ workspace.WorkspaceNode(
19
+ id="1",
20
+ type="node_type",
21
+ data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
22
+ position=workspace.NodePosition(x=0, y=0),
23
+ )
24
+ )
25
+ await execute(ws)
26
+ assert ws.nodes[0].data.error == "Operation not found in catalog"
27
+
28
+
29
+ async def test_execute_operation_inputs_correct_cast():
30
+ # Test that the automatic casting of operation inputs works correctly.
31
+
32
+ @op("Create Bundle")
33
+ def create_bundle() -> Bundle:
34
+ df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
35
+ return Bundle(dfs={"edges": df})
36
+
37
+ @op("Bundle to Graph")
38
+ def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
39
+ return graph
40
+
41
+ @op("Graph to Bundle")
42
+ def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
43
+ return list(bundle.dfs.values())[0]
44
+
45
+ @op("Dataframe to Bundle")
46
+ def dataframe_to_bundle(bundle: Bundle) -> Bundle:
47
+ return bundle
48
+
49
+ ws = workspace.Workspace(env="test")
50
+ ws.nodes.append(
51
+ workspace.WorkspaceNode(
52
+ id="1",
53
+ type="node_type",
54
+ data=workspace.WorkspaceNodeData(title="Create Bundle", params={}),
55
+ position=workspace.NodePosition(x=0, y=0),
56
+ )
57
+ )
58
+ ws.nodes.append(
59
+ workspace.WorkspaceNode(
60
+ id="2",
61
+ type="node_type",
62
+ data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}),
63
+ position=workspace.NodePosition(x=100, y=0),
64
+ )
65
+ )
66
+ ws.nodes.append(
67
+ workspace.WorkspaceNode(
68
+ id="3",
69
+ type="node_type",
70
+ data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}),
71
+ position=workspace.NodePosition(x=200, y=0),
72
+ )
73
+ )
74
+ ws.nodes.append(
75
+ workspace.WorkspaceNode(
76
+ id="4",
77
+ type="node_type",
78
+ data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}),
79
+ position=workspace.NodePosition(x=300, y=0),
80
+ )
81
+ )
82
+ ws.edges = [
83
+ workspace.WorkspaceEdge(
84
+ id="1", source="1", target="2", sourceHandle="1", targetHandle="2"
85
+ ),
86
+ workspace.WorkspaceEdge(
87
+ id="2", source="2", target="3", sourceHandle="2", targetHandle="3"
88
+ ),
89
+ workspace.WorkspaceEdge(
90
+ id="3", source="3", target="4", sourceHandle="3", targetHandle="4"
91
+ ),
92
+ ]
93
+
94
+ await execute(ws)
95
+
96
+ assert all([node.data.error is None for node in ws.nodes])
97
+
98
+
99
+ if __name__ == "__main__":
100
+ pytest.main()
lynxkite-lynxscribe/src/lynxkite_plugins/lynxscribe/llm_ops.py CHANGED
@@ -68,13 +68,13 @@ def split_document(input, *, delimiter: str = "\\n\\n"):
68
  return pd.DataFrame(chunks, columns=["text"])
69
 
70
 
71
- @ops.input_position(input="top")
72
  @op("Build document graph")
73
  def build_document_graph(input):
74
  return [{"source": i, "target": i + 1} for i in range(len(input) - 1)]
75
 
76
 
77
- @ops.input_position(nodes="top", edges="top")
78
  @op("Predict links")
79
  def predict_links(nodes, edges):
80
  """A placeholder for a real algorithm. For now just adds 2-hop neighbors."""
@@ -89,7 +89,7 @@ def predict_links(nodes, edges):
89
  return edges + new_edges
90
 
91
 
92
- @ops.input_position(nodes="top", edges="top")
93
  @op("Add neighbors")
94
  def add_neighbors(nodes, edges, item):
95
  nodes = pd.DataFrame(nodes)
@@ -133,7 +133,7 @@ def ask_llm(input, *, model: str, accepted_regex: str = None, max_tokens: int =
133
  return [{**input, "response": r} for r in results]
134
 
135
 
136
- @op("View", view="table_view")
137
  def view(input, *, _ctx: one_by_one.Context):
138
  v = _ctx.last_result
139
  if v:
@@ -152,8 +152,8 @@ def view(input, *, _ctx: one_by_one.Context):
152
  return v
153
 
154
 
155
- @ops.input_position(input="right")
156
- @ops.output_position(output="left")
157
  @op("Loop")
158
  def loop(input, *, max_iterations: int = 3, _ctx: one_by_one.Context):
159
  """Data can flow back here max_iterations-1 times."""
@@ -174,7 +174,7 @@ class RagEngine(enum.Enum):
174
  Custom = "Custom"
175
 
176
 
177
- @ops.input_position(db="top")
178
  @op("RAG")
179
  def rag(
180
  input,
 
68
  return pd.DataFrame(chunks, columns=["text"])
69
 
70
 
71
+ @ops.input_side(input=ops.Side.TOP)
72
  @op("Build document graph")
73
  def build_document_graph(input):
74
  return [{"source": i, "target": i + 1} for i in range(len(input) - 1)]
75
 
76
 
77
+ @ops.input_side(nodes=ops.Side.TOP, edges=ops.Side.TOP)
78
  @op("Predict links")
79
  def predict_links(nodes, edges):
80
  """A placeholder for a real algorithm. For now just adds 2-hop neighbors."""
 
89
  return edges + new_edges
90
 
91
 
92
+ @ops.input_side(nodes=ops.Side.TOP, edges=ops.Side.TOP)
93
  @op("Add neighbors")
94
  def add_neighbors(nodes, edges, item):
95
  nodes = pd.DataFrame(nodes)
 
133
  return [{**input, "response": r} for r in results]
134
 
135
 
136
+ @op("View", view=ops.ViewType.TABLE_VIEW)
137
  def view(input, *, _ctx: one_by_one.Context):
138
  v = _ctx.last_result
139
  if v:
 
152
  return v
153
 
154
 
155
+ @ops.input_side(input=ops.Side.RIGHT)
156
+ @ops.output_side(output=ops.Side.LEFT)
157
  @op("Loop")
158
  def loop(input, *, max_iterations: int = 3, _ctx: one_by_one.Context):
159
  """Data can flow back here max_iterations-1 times."""
 
174
  Custom = "Custom"
175
 
176
 
177
+ @ops.input_side(db=ops.Side.TOP)
178
  @op("RAG")
179
  def rag(
180
  input,
lynxkite-lynxscribe/src/lynxkite_plugins/lynxscribe/lynxscribe_ops.py CHANGED
@@ -24,7 +24,7 @@ from lynxkite.core.executors import one_by_one
24
  ENV = "LynxScribe"
25
  one_by_one.register(ENV)
26
  op = ops.op_registration(ENV)
27
- output_on_top = ops.output_position(output="top")
28
 
29
 
30
  @output_on_top
@@ -42,7 +42,7 @@ def llm(*, name="openai"):
42
 
43
 
44
  @output_on_top
45
- @ops.input_position(llm="bottom")
46
  @op("Text embedder")
47
  def text_embedder(llm, *, model="text-embedding-ada-002"):
48
  llm = llm[0]["llm"]
@@ -51,7 +51,7 @@ def text_embedder(llm, *, model="text-embedding-ada-002"):
51
 
52
 
53
  @output_on_top
54
- @ops.input_position(vector_store="bottom", text_embedder="bottom")
55
  @op("RAG graph")
56
  def rag_graph(vector_store, text_embedder):
57
  vector_store = vector_store[0]["vector_store"]
@@ -78,7 +78,7 @@ DEFAULT_NEGATIVE_ANSWER = "I'm sorry, but the data I've been trained on does not
78
 
79
 
80
  @output_on_top
81
- @ops.input_position(rag_graph="bottom", scenario_selector="bottom", llm="bottom")
82
  @op("RAG chatbot")
83
  def rag_chatbot(
84
  rag_graph,
@@ -107,7 +107,7 @@ def rag_chatbot(
107
 
108
 
109
  @output_on_top
110
- @ops.input_position(processor="bottom")
111
  @op("Chat processor")
112
  def chat_processor(processor, *, _ctx: one_by_one.Context):
113
  cfg = _ctx.last_result or {
@@ -152,7 +152,7 @@ def mask(*, name="", regex="", exceptions="", mask_pattern=""):
152
  }
153
 
154
 
155
- @ops.input_position(chat_api="bottom")
156
  @op("Test Chat API")
157
  async def test_chat_api(message, chat_api, *, show_details=False):
158
  chat_api = chat_api[0]["chat_api"]
@@ -173,7 +173,7 @@ def input_chat(*, chat: str):
173
 
174
 
175
  @output_on_top
176
- @ops.input_position(chatbot="bottom", chat_processor="bottom", knowledge_base="bottom")
177
  @op("Chat API")
178
  def chat_api(chatbot, chat_processor, knowledge_base, *, model="gpt-4o-mini"):
179
  chatbot = chatbot[0]["chatbot"]
@@ -205,7 +205,7 @@ def knowledge_base(
205
  }
206
 
207
 
208
- @op("View", view="table_view")
209
  def view(input):
210
  columns = [str(c) for c in input.keys() if not str(c).startswith("_")]
211
  v = {
 
24
  ENV = "LynxScribe"
25
  one_by_one.register(ENV)
26
  op = ops.op_registration(ENV)
27
+ output_on_top = ops.output_side(output=ops.Side.TOP)
28
 
29
 
30
  @output_on_top
 
42
 
43
 
44
  @output_on_top
45
+ @ops.input_side(llm=ops.Side.BOTTOM)
46
  @op("Text embedder")
47
  def text_embedder(llm, *, model="text-embedding-ada-002"):
48
  llm = llm[0]["llm"]
 
51
 
52
 
53
  @output_on_top
54
+ @ops.input_side(vector_store=ops.Side.BOTTOM, text_embedder=ops.Side.BOTTOM)
55
  @op("RAG graph")
56
  def rag_graph(vector_store, text_embedder):
57
  vector_store = vector_store[0]["vector_store"]
 
78
 
79
 
80
  @output_on_top
81
+ @ops.input_side(rag_graph=ops.Side.BOTTOM, scenario_selector=ops.Side.BOTTOM, llm=ops.Side.BOTTOM)
82
  @op("RAG chatbot")
83
  def rag_chatbot(
84
  rag_graph,
 
107
 
108
 
109
  @output_on_top
110
+ @ops.input_side(processor=ops.Side.BOTTOM)
111
  @op("Chat processor")
112
  def chat_processor(processor, *, _ctx: one_by_one.Context):
113
  cfg = _ctx.last_result or {
 
152
  }
153
 
154
 
155
+ @ops.input_side(chat_api=ops.Side.BOTTOM)
156
  @op("Test Chat API")
157
  async def test_chat_api(message, chat_api, *, show_details=False):
158
  chat_api = chat_api[0]["chat_api"]
 
173
 
174
 
175
  @output_on_top
176
+ @ops.input_side(chatbot=ops.Side.BOTTOM, chat_processor=ops.Side.BOTTOM, knowledge_base=ops.Side.BOTTOM)
177
  @op("Chat API")
178
  def chat_api(chatbot, chat_processor, knowledge_base, *, model="gpt-4o-mini"):
179
  chatbot = chatbot[0]["chatbot"]
 
205
  }
206
 
207
 
208
+ @op("View", view=ops.ViewType.TABLE_VIEW)
209
  def view(input):
210
  columns = [str(c) for c in input.keys() if not str(c).startswith("_")]
211
  v = {
lynxkite-lynxscribe/tests/test_llm_ops.py CHANGED
@@ -8,7 +8,7 @@ def make_node(id, op, type="basic", **params):
8
  return workspace.WorkspaceNode(
9
  id=id,
10
  type=type,
11
- position=workspace.Position(x=0, y=0),
12
  data=workspace.WorkspaceNodeData(title=op, params=params),
13
  )
14
 
@@ -43,7 +43,7 @@ class LLMOpsTest(unittest.IsolatedAsyncioTestCase):
43
  filename="/Users/danieldarabos/Downloads/aimo-train.csv",
44
  key="problem",
45
  ),
46
- make_node("1", "View", type="table_view"),
47
  ],
48
  edges=[make_edge("0", "1")],
49
  )
 
8
  return workspace.WorkspaceNode(
9
  id=id,
10
  type=type,
11
+ position=workspace.NodePosition(x=0, y=0),
12
  data=workspace.WorkspaceNodeData(title=op, params=params),
13
  )
14
 
 
43
  filename="/Users/danieldarabos/Downloads/aimo-train.csv",
44
  key="problem",
45
  ),
46
+ make_node("1", "View", type=ops.ViewType.TABLE_VIEW),
47
  ],
48
  edges=[make_edge("0", "1")],
49
  )
lynxkite-pillow-example/src/lynxkite_plugins/pillow_example/__init__.py CHANGED
@@ -56,7 +56,7 @@ def to_grayscale(image: Image):
56
  return image.convert("L")
57
 
58
 
59
- @op("View image", view="image")
60
  def view_image(image: Image):
61
  buffered = io.BytesIO()
62
  image.save(buffered, format="JPEG")
 
56
  return image.convert("L")
57
 
58
 
59
+ @op("View image", view=ops.ViewType.IMAGE)
60
  def view_image(image: Image):
61
  buffered = io.BytesIO()
62
  image.save(buffered, format="JPEG")