Chema JMLizano, darabos committed
Commit 2601533 · unverified · 1 Parent(s): 0087ef0

Adding backend tests (#58)


* Added tests for lynxkite core

* Added tests for lynxkite-app

* Added tests for lynxkite-graph-analytics

* Better error message when workspace path is outside data dir

* Remove register_area operation; it is not used anymore

* Add some types & docstrings

* Update README

---------

Co-authored-by: JMLizano <[email protected]>
Co-authored-by: Daniel Darabos <[email protected]>

README.md CHANGED
@@ -23,7 +23,8 @@ Install everything like this:
23
  ```bash
24
  uv venv
25
  source .venv/bin/activate
26
- uv pip install -e lynxkite-core/ lynxkite-app/ lynxkite-graph-analytics/ lynxkite-lynxscribe/ lynxkite-pillow-example/
 
27
  ```
28
 
29
  This also builds the frontend, hopefully very quickly. To run it:
@@ -40,6 +41,16 @@ cd lynxkite-app/web
40
  npm run dev
41
  ```
42
43
  ## Documentation
44
 
45
  To work on the documentation:
 
23
  ```bash
24
  uv venv
25
  source .venv/bin/activate
26
+ # The [dev] extra is only needed if you intend to run tests
27
+ uv pip install -e lynxkite-core/[dev] lynxkite-app/[dev] lynxkite-graph-analytics/[dev] lynxkite-lynxscribe/ lynxkite-pillow-example/
28
  ```
29
 
30
  This also builds the frontend, hopefully very quickly. To run it:
 
41
  npm run dev
42
  ```
43
 
44
+ ## Executing tests
45
+
46
+ Go into each package directory and run `pytest`.
47
+
48
+ ```bash
49
+ # The same works for lynxkite-core and lynxkite-graph-analytics
50
+ $ cd lynxkite-app
51
+ $ pytest
52
+ ```
53
+
54
  ## Documentation
55
 
56
  To work on the documentation:
lynxkite-app/pyproject.toml CHANGED
@@ -13,6 +13,11 @@ dependencies = [
13
  "sse-starlette>=2.2.1",
14
  ]
15
16
  [tool.uv.sources]
17
  lynxkite-core = { path = "../lynxkite-core" }
18
 
 
13
  "sse-starlette>=2.2.1",
14
  ]
15
 
16
+ [project.optional-dependencies]
17
+ dev = [
18
+ "pytest",
19
+ ]
20
+
21
  [tool.uv.sources]
22
  lynxkite-core = { path = "../lynxkite-core" }
23
 
lynxkite-app/src/lynxkite/app/crdt.py CHANGED
@@ -29,7 +29,11 @@ def ws_exception_handler(exception, log):
29
 
30
 
31
  class WebsocketServer(pycrdt_websocket.WebsocketServer):
32
- async def init_room(self, name):
 
 
 
 
33
  path = CRDT_PATH / f"{name}.crdt"
34
  assert path.is_relative_to(CRDT_PATH)
35
  ystore = pycrdt_websocket.ystore.FileYStore(path)
@@ -49,6 +53,8 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
49
  ws["edges"] = pycrdt.Array()
50
  if "env" not in ws:
51
  ws["env"] = "unset"
 
 
52
  try_to_load_workspace(ws, name)
53
  room = pycrdt_websocket.YRoom(
54
  ystore=ystore, ydoc=ydoc, exception_handler=ws_exception_handler
@@ -62,6 +68,12 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
62
  return room
63
 
64
  async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
65
  if name not in self.rooms:
66
  self.rooms[name] = await self.init_room(name)
67
  room = self.rooms[name]
@@ -83,21 +95,43 @@ def clean_input(ws_pyd):
83
  delattr(node, key)
84
 
85
 
86
- def crdt_update(crdt_obj, python_obj, boxes=set()):
87
  if isinstance(python_obj, dict):
88
  for key, value in python_obj.items():
89
- if key in boxes:
90
  crdt_obj[key] = value
91
  elif isinstance(value, dict):
92
  if crdt_obj.get(key) is None:
93
  crdt_obj[key] = pycrdt.Map()
94
- crdt_update(crdt_obj[key], value, boxes)
95
  elif isinstance(value, list):
96
  if crdt_obj.get(key) is None:
97
  crdt_obj[key] = pycrdt.Array()
98
- crdt_update(crdt_obj[key], value, boxes)
99
  elif isinstance(value, enum.Enum):
100
- crdt_obj[key] = str(value)
101
  else:
102
  crdt_obj[key] = value
103
  elif isinstance(python_obj, list):
@@ -105,12 +139,14 @@ def crdt_update(crdt_obj, python_obj, boxes=set()):
105
  if isinstance(value, dict):
106
  if i >= len(crdt_obj):
107
  crdt_obj.append(pycrdt.Map())
108
- crdt_update(crdt_obj[i], value, boxes)
109
  elif isinstance(value, list):
110
  if i >= len(crdt_obj):
111
  crdt_obj.append(pycrdt.Array())
112
- crdt_update(crdt_obj[i], value, boxes)
113
  else:
 
 
114
  if i >= len(crdt_obj):
115
  crdt_obj.append(value)
116
  else:
@@ -119,18 +155,34 @@ def crdt_update(crdt_obj, python_obj, boxes=set()):
119
  raise ValueError("Invalid type:", python_obj)
120
 
121
 
122
- def try_to_load_workspace(ws, name):
123
  json_path = f"data/{name}"
124
  if os.path.exists(json_path):
125
  ws_pyd = workspace.load(json_path)
126
- crdt_update(ws, ws_pyd.model_dump(), boxes={"display"})
 
 
127
 
128
 
129
  last_known_versions = {}
130
  delayed_executions = {}
131
 
132
 
133
- async def workspace_changed(name, changes, ws_crdt):
134
  ws_pyd = workspace.Workspace.model_validate(ws_crdt.to_py())
135
  # Do not trigger execution for superficial changes.
136
  # This is a quick solution until we build proper caching.
@@ -154,22 +206,35 @@ async def workspace_changed(name, changes, ws_crdt):
154
  await execute(name, ws_crdt, ws_pyd)
155
 
156
 
157
- async def execute(name, ws_crdt, ws_pyd, delay=0):
158
  if delay:
159
  try:
160
  await asyncio.sleep(delay)
161
  except asyncio.CancelledError:
162
  return
163
  path = DATA_PATH / name
164
- assert path.is_relative_to(DATA_PATH)
 
165
  workspace.save(ws_pyd, path)
166
  await workspace.execute(ws_pyd)
167
  workspace.save(ws_pyd, path)
 
 
168
  with ws_crdt.doc.transaction():
169
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
170
  if "data" not in nc:
171
  nc["data"] = pycrdt.Map()
172
- # Display is added as an opaque Box.
173
  nc["data"]["display"] = np.data.display
174
  nc["data"]["error"] = np.data.error
175
 
 
29
 
30
 
31
  class WebsocketServer(pycrdt_websocket.WebsocketServer):
32
+ async def init_room(self, name: str) -> pycrdt_websocket.YRoom:
33
+ """Initialize a room for the workspace with the given name.
34
+
35
+ The workspace is loaded from "crdt_data" if it exists there, or from "data", or a new workspace is created.
36
+ """
37
  path = CRDT_PATH / f"{name}.crdt"
38
  assert path.is_relative_to(CRDT_PATH)
39
  ystore = pycrdt_websocket.ystore.FileYStore(path)
 
53
  ws["edges"] = pycrdt.Array()
54
  if "env" not in ws:
55
  ws["env"] = "unset"
56
+ # We have two possible sources of truth for the workspaces: the YStore and the JSON files.
57
+ # In case we didn't find the workspace in the YStore, we try to load it from the JSON files.
58
  try_to_load_workspace(ws, name)
59
  room = pycrdt_websocket.YRoom(
60
  ystore=ystore, ydoc=ydoc, exception_handler=ws_exception_handler
 
68
  return room
69
 
70
  async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
71
+ """Get a room by name.
72
+
73
+ This method overrides the parent get_room method. The original creates an empty room,
74
+ with no associated Ydoc. Instead, we want to initialize the room with a Workspace
75
+ object.
76
+ """
77
  if name not in self.rooms:
78
  self.rooms[name] = await self.init_room(name)
79
  room = self.rooms[name]
 
95
  delattr(node, key)
96
 
97
 
98
+ def crdt_update(
99
+ crdt_obj: pycrdt.Map | pycrdt.Array,
100
+ python_obj: dict | list,
101
+ non_collaborative_fields: set[str] = set(),
102
+ ):
103
+ """Update a CRDT object to match a Python object.
104
+
105
+ The types between the CRDT object and the Python object must match. If the Python object
106
+ is a dict, the CRDT object must be a Map. If the Python object is a list, the CRDT object
107
+ must be an Array.
108
+
109
+ Args:
110
+ crdt_obj: The CRDT object that will be updated to match the Python object.
111
+ python_obj: The Python object to update with.
112
+ non_collaborative_fields: Set of fields to treat as black boxes. Black boxes are
113
+ updated as a whole, instead of having a fine-grained data structure to edit
114
+ collaboratively. Useful for complex fields that contain auto-generated data or
115
+ metadata.
116
+ The default is an empty set.
117
+
118
+ Raises:
119
+ ValueError: If the Python object provided is not a dict or list.
120
+ """
121
  if isinstance(python_obj, dict):
122
  for key, value in python_obj.items():
123
+ if key in non_collaborative_fields:
124
  crdt_obj[key] = value
125
  elif isinstance(value, dict):
126
  if crdt_obj.get(key) is None:
127
  crdt_obj[key] = pycrdt.Map()
128
+ crdt_update(crdt_obj[key], value, non_collaborative_fields)
129
  elif isinstance(value, list):
130
  if crdt_obj.get(key) is None:
131
  crdt_obj[key] = pycrdt.Array()
132
+ crdt_update(crdt_obj[key], value, non_collaborative_fields)
133
  elif isinstance(value, enum.Enum):
134
+ crdt_obj[key] = str(value.value)
135
  else:
136
  crdt_obj[key] = value
137
  elif isinstance(python_obj, list):
 
139
  if isinstance(value, dict):
140
  if i >= len(crdt_obj):
141
  crdt_obj.append(pycrdt.Map())
142
+ crdt_update(crdt_obj[i], value, non_collaborative_fields)
143
  elif isinstance(value, list):
144
  if i >= len(crdt_obj):
145
  crdt_obj.append(pycrdt.Array())
146
+ crdt_update(crdt_obj[i], value, non_collaborative_fields)
147
  else:
148
+ if isinstance(value, enum.Enum):
149
+ value = str(value.value)
150
  if i >= len(crdt_obj):
151
  crdt_obj.append(value)
152
  else:
 
155
  raise ValueError("Invalid type:", python_obj)
156
 
157
 
158
+ def try_to_load_workspace(ws: pycrdt.Map, name: str):
159
+ """Load the workspace `name`, if it exists, and update the `ws` CRDT object to match its contents.
160
+
161
+ Args:
162
+ ws: CRDT object to update with the workspace contents.
163
+ name: Name of the workspace to load.
164
+ """
165
  json_path = f"data/{name}"
166
  if os.path.exists(json_path):
167
  ws_pyd = workspace.load(json_path)
168
+ # We treat the display field as a black box, since it is a large
169
+ # dictionary that is meant to change as a whole.
170
+ crdt_update(ws, ws_pyd.model_dump(), non_collaborative_fields={"display"})
171
 
172
 
173
  last_known_versions = {}
174
  delayed_executions = {}
175
 
176
 
177
+ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt.Map):
178
+ """Callback to react to changes in the workspace.
179
+
180
+
181
+ Args:
182
+ name: Name of the workspace.
183
+ changes: Changes performed to the workspace.
184
+ ws_crdt: CRDT object representing the workspace.
185
+ """
186
  ws_pyd = workspace.Workspace.model_validate(ws_crdt.to_py())
187
  # Do not trigger execution for superficial changes.
188
  # This is a quick solution until we build proper caching.
 
206
  await execute(name, ws_crdt, ws_pyd)
207
 
208
 
209
+ async def execute(
210
+ name: str, ws_crdt: pycrdt.Map, ws_pyd: workspace.Workspace, delay: int = 0
211
+ ):
212
+ """Execute the workspace and update the CRDT object with the results.
213
+
214
+ Args:
215
+ name: Name of the workspace.
216
+ ws_crdt: CRDT object representing the workspace.
217
+ ws_pyd: Workspace object to execute.
218
+ delay: Wait time before executing the workspace. The default is 0.
219
+ """
220
  if delay:
221
  try:
222
  await asyncio.sleep(delay)
223
  except asyncio.CancelledError:
224
  return
225
  path = DATA_PATH / name
226
+ assert path.is_relative_to(DATA_PATH), "Provided workspace path is invalid"
227
+ # Save user changes before executing, in case the execution fails.
228
  workspace.save(ws_pyd, path)
229
  await workspace.execute(ws_pyd)
230
  workspace.save(ws_pyd, path)
231
+ # Execution happened on the Python object, so we need to replicate
232
+ # the results to the CRDT object.
233
  with ws_crdt.doc.transaction():
234
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
235
  if "data" not in nc:
236
  nc["data"] = pycrdt.Map()
237
+ # Display is added as a non-collaborative field.
238
  nc["data"]["display"] = np.data.display
239
  nc["data"]["error"] = np.data.error
240
 
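For readers new to `crdt_update`, here is a minimal usage sketch of the new signature (the field names are illustrative; the added test_crdt.py below exercises the same behavior):

```python
import pycrdt
from lynxkite.app.crdt import crdt_update

ydoc = pycrdt.Doc()
ydoc["workspace"] = ws = pycrdt.Map()

python_obj = {
    "env": "test",
    "nodes": [{"id": "Node_1"}],             # mirrored as nested pycrdt.Array / pycrdt.Map
    "display": {"large": "auto-generated"},  # non-collaborative: copied as one opaque value
}
crdt_update(ws, python_obj, non_collaborative_fields={"display"})
assert ws["env"] == "test"
```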
lynxkite-app/tests/test_crdt.py ADDED
@@ -0,0 +1,72 @@
1
+ from enum import Enum
2
+ import pycrdt
3
+ import pytest
4
+ from lynxkite.app.crdt import crdt_update
5
+
6
+
7
+ @pytest.fixture
8
+ def empty_dict_workspace():
9
+ ydoc = pycrdt.Doc()
10
+ ydoc["workspace"] = ws = pycrdt.Map()
11
+ yield ws
12
+
13
+
14
+ @pytest.fixture
15
+ def empty_list_workspace():
16
+ ydoc = pycrdt.Doc()
17
+ ydoc["workspace"] = ws = pycrdt.Array()
18
+ yield ws
19
+
20
+
21
+ class MyEnum(Enum):
22
+ VALUE = 1
23
+
24
+
25
+ @pytest.mark.parametrize(
26
+ "python_obj,expected",
27
+ [
28
+ (
29
+ {
30
+ "key1": "value1",
31
+ "key2": {
32
+ "nested_key1": "nested_value1",
33
+ "nested_key2": ["nested_value2"],
34
+ "nested_key3": MyEnum.VALUE,
35
+ },
36
+ },
37
+ {
38
+ "key1": "value1",
39
+ "key2": {
40
+ "nested_key1": "nested_value1",
41
+ "nested_key2": ["nested_value2"],
42
+ "nested_key3": "1",
43
+ },
44
+ },
45
+ )
46
+ ],
47
+ )
48
+ def test_crdt_update_with_dict(empty_dict_workspace, python_obj, expected):
49
+ crdt_update(empty_dict_workspace, python_obj)
50
+ assert empty_dict_workspace.to_py() == expected
51
+
52
+
53
+ @pytest.mark.parametrize(
54
+ "python_obj,expected",
55
+ [
56
+ (
57
+ [
58
+ "value1",
59
+ {"nested_key1": "nested_value1", "nested_key2": ["nested_value2"]},
60
+ MyEnum.VALUE,
61
+ ],
62
+ [
63
+ "value1",
64
+ {"nested_key1": "nested_value1", "nested_key2": ["nested_value2"]},
65
+ "1",
66
+ ],
67
+ ),
68
+ ],
69
+ )
70
+ def test_crdt_update_with_list(empty_list_workspace, python_obj, expected):
71
+ crdt_update(empty_list_workspace, python_obj)
72
+ assert empty_list_workspace.to_py() == expected
lynxkite-app/tests/test_main.py ADDED
@@ -0,0 +1,77 @@
1
+ import uuid
2
+ from fastapi.testclient import TestClient
3
+ from lynxkite.app.main import app, detect_plugins, DATA_PATH
4
+ import os
5
+
6
+
7
+ client = TestClient(app)
8
+
9
+
10
+ def test_detect_plugins_with_plugins():
11
+ # This test assumes that these plugins are installed as part of the testing process.
12
+ plugins = detect_plugins()
13
+ assert all(
14
+ plugin in plugins.keys()
15
+ for plugin in [
16
+ "lynxkite_plugins.graph_analytics",
17
+ "lynxkite_plugins.lynxscribe",
18
+ "lynxkite_plugins.pillow_example",
19
+ ]
20
+ )
21
+
22
+
23
+ def test_get_catalog():
24
+ response = client.get("/api/catalog")
25
+ assert response.status_code == 200
26
+
27
+
28
+ def test_save_and_load():
29
+ save_request = {
30
+ "path": "test",
31
+ "ws": {
32
+ "env": "test",
33
+ "nodes": [
34
+ {
35
+ "id": "Node_1",
36
+ "type": "basic",
37
+ "data": {
38
+ "display": None,
39
+ "error": "Unknown operation.",
40
+ "title": "Test node",
41
+ "params": {"param1": "value"},
42
+ },
43
+ "position": {"x": -493.5496596237119, "y": 20.90123252513356},
44
+ }
45
+ ],
46
+ "edges": [],
47
+ },
48
+ }
49
+ response = client.post("/api/save", json=save_request)
50
+ saved_ws = response.json()
51
+ assert response.status_code == 200
52
+ response = client.get("/api/load?path=test")
53
+ assert response.status_code == 200
54
+ assert saved_ws == response.json()
55
+
56
+
57
+ def test_list_dir():
58
+ test_dir = str(uuid.uuid4())
59
+ test_dir_full_path = DATA_PATH / test_dir
60
+ test_dir_full_path.mkdir(exist_ok=True)
61
+ test_file = test_dir_full_path / "test_file.txt"
62
+ test_file.touch()
63
+ response = client.get(f"/api/dir/list?path={str(test_dir)}")
64
+ assert response.status_code == 200
65
+ assert len(response.json()) == 1
66
+ assert response.json()[0]["name"] == f"{test_dir}/test_file.txt"
67
+ assert response.json()[0]["type"] == "workspace"
68
+ test_file.unlink()
69
+ test_dir_full_path.rmdir()
70
+
71
+
72
+ def test_make_dir():
73
+ dir_name = str(uuid.uuid4())
74
+ response = client.post("/api/dir/mkdir", json={"path": dir_name})
75
+ assert response.status_code == 200
76
+ assert os.path.exists(DATA_PATH / dir_name)
77
+ os.rmdir(DATA_PATH / dir_name)
lynxkite-core/pyproject.toml CHANGED
@@ -6,3 +6,8 @@ readme = "README.md"
6
  requires-python = ">=3.11"
7
  dependencies = [
8
  ]
6
  requires-python = ">=3.11"
7
  dependencies = [
8
  ]
9
+
10
+ [project.optional-dependencies]
11
+ dev = [
12
+ "pytest",
13
+ ]
lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED
@@ -51,6 +51,8 @@ def get_stages(ws, catalog):
51
  nodes = {n.id: n for n in ws.nodes}
52
  batch_inputs = {}
53
  inputs = {}
 
 
54
  for edge in ws.edges:
55
  inputs.setdefault(edge.target, []).append(edge.source)
56
  node = nodes[edge.target]
@@ -93,7 +95,7 @@ async def await_if_needed(obj):
93
  return obj
94
 
95
 
96
- async def execute(ws, catalog, cache=None):
97
  nodes = {n.id: n for n in ws.nodes}
98
  contexts = {n.id: Context(node=n) for n in ws.nodes}
99
  edges = {n.id: [] for n in ws.nodes}
 
51
  nodes = {n.id: n for n in ws.nodes}
52
  batch_inputs = {}
53
  inputs = {}
54
+ # For each edge in the workspace, we record the inputs (sources)
55
+ # required for each node (target).
56
  for edge in ws.edges:
57
  inputs.setdefault(edge.target, []).append(edge.source)
58
  node = nodes[edge.target]
 
95
  return obj
96
 
97
 
98
+ async def execute(ws: workspace.Workspace, catalog, cache=None):
99
  nodes = {n.id: n for n in ws.nodes}
100
  contexts = {n.id: Context(node=n) for n in ws.nodes}
101
  edges = {n.id: [] for n in ws.nodes}
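The comment added to `get_stages` describes a standard reverse-adjacency build. In isolation, the pattern looks roughly like this (toy data, not from the codebase):

```python
# Toy illustration of the input-collection pattern used in get_stages:
# map each target node to the list of source nodes it depends on.
edges = [("a", "c"), ("b", "c"), ("c", "d")]
inputs: dict[str, list[str]] = {}
for source, target in edges:
    inputs.setdefault(target, []).append(source)
assert inputs == {"c": ["a", "b"], "d": ["c"]}
```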
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -64,6 +64,7 @@ class Parameter(BaseConfig):
64
  class Input(BaseConfig):
65
  name: str
66
  type: Type
 
67
  position: str = "left"
68
 
69
 
@@ -90,6 +91,7 @@ class Op(BaseConfig):
90
  params: dict[str, Parameter]
91
  inputs: dict[str, Input]
92
  outputs: dict[str, Output]
 
93
  type: str = "basic" # The UI to use for this operation.
94
 
95
  def __call__(self, *inputs, **params):
@@ -209,16 +211,3 @@ def op_registration(env: str):
209
 
210
  def passive_op_registration(env: str):
211
  return functools.partial(register_passive_op, env)
212
-
213
-
214
- def register_area(env, name, params=[]):
215
- """A node that represents an area. It can contain other nodes, but does not restrict movement in any way."""
216
- op = Op(
217
- func=no_op,
218
- name=name,
219
- params={p.name: p for p in params},
220
- inputs={},
221
- outputs={},
222
- type="area",
223
- )
224
- CATALOGS[env][name] = op
 
64
  class Input(BaseConfig):
65
  name: str
66
  type: Type
67
+ # TODO: Make position an enum with the possible values.
68
  position: str = "left"
69
 
70
 
 
91
  params: dict[str, Parameter]
92
  inputs: dict[str, Input]
93
  outputs: dict[str, Output]
94
+ # TODO: Make type an enum with the possible values.
95
  type: str = "basic" # The UI to use for this operation.
96
 
97
  def __call__(self, *inputs, **params):
 
211
 
212
  def passive_op_registration(env: str):
213
  return functools.partial(register_passive_op, env)
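The two new TODOs suggest turning `position` and `type` into enums. One possible shape, purely hypothetical and not part of this commit, limited to the position values the tests already use:

```python
# Hypothetical sketch for the "position" TODO; not part of this commit.
import enum

class Position(str, enum.Enum):
    LEFT = "left"
    RIGHT = "right"
    TOP = "top"
    BOTTOM = "bottom"
```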
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -29,6 +29,8 @@ class WorkspaceNodeData(BaseConfig):
29
 
30
 
31
  class WorkspaceNode(BaseConfig):
 
 
32
  id: str
33
  type: str
34
  data: WorkspaceNodeData
@@ -44,6 +46,13 @@ class WorkspaceEdge(BaseConfig):
44
 
45
 
46
  class Workspace(BaseConfig):
47
  env: str = ""
48
  nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
49
  edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
@@ -55,6 +64,7 @@ async def execute(ws: Workspace):
55
 
56
 
57
  def save(ws: Workspace, path: str):
 
58
  j = ws.model_dump_json(indent=2)
59
  dirname, basename = os.path.split(path)
60
  # Create temp file in the same directory to make sure it's on the same filesystem.
@@ -66,7 +76,17 @@ def save(ws: Workspace, path: str):
66
  os.replace(temp_name, path)
67
 
68
 
69
- def load(path: str):
70
  with open(path) as f:
71
  j = f.read()
72
  ws = Workspace.model_validate_json(j)
@@ -75,13 +95,26 @@ def load(path: str):
75
  return ws
76
 
77
 
78
- def _update_metadata(ws):
79
  catalog = ops.CATALOGS.get(ws.env, {})
80
  nodes = {node.id: node for node in ws.nodes}
81
  done = set()
82
  while len(done) < len(nodes):
83
  for node in ws.nodes:
84
  if node.id in done:
 
85
  continue
86
  data = node.data
87
  op = catalog.get(data.title)
 
29
 
30
 
31
  class WorkspaceNode(BaseConfig):
32
+ # The naming of these attributes matches the ones for the NodeBase type in React Flow;
33
+ # modifying them will break the frontend.
34
  id: str
35
  type: str
36
  data: WorkspaceNodeData
 
46
 
47
 
48
  class Workspace(BaseConfig):
49
+ """A workspace is a representation of a computational graph that consists of nodes and edges.
50
+
51
+ Each node represents an operation or task, and the edges represent the flow of data between
52
+ the nodes. Each workspace is associated with an environment, which determines the operations
53
+ that can be performed in the workspace and the execution method for the operations.
54
+ """
55
+
56
  env: str = ""
57
  nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
58
  edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
 
64
 
65
 
66
  def save(ws: Workspace, path: str):
67
+ """Persist a workspace to a local file in JSON format."""
68
  j = ws.model_dump_json(indent=2)
69
  dirname, basename = os.path.split(path)
70
  # Create temp file in the same directory to make sure it's on the same filesystem.
 
76
  os.replace(temp_name, path)
77
 
78
 
79
+ def load(path: str) -> Workspace:
80
+ """Load a workspace from a file.
81
+
82
+ After loading the workspace, the metadata of the workspace is updated.
83
+
84
+ Args:
85
+ path (str): The path to the file to load the workspace from.
86
+
87
+ Returns:
88
+ Workspace: The loaded workspace object, with updated metadata.
89
+ """
90
  with open(path) as f:
91
  j = f.read()
92
  ws = Workspace.model_validate_json(j)
 
95
  return ws
96
 
97
 
98
+ def _update_metadata(ws: Workspace) -> Workspace:
99
+ """Update the metadata of the given workspace object.
100
+
101
+ The metadata is the information about the operations that the nodes in the workspace represent,
102
+ like the parameters and their possible values.
103
+ This information comes from the catalog of operations for the environment of the workspace.
104
+
105
+ Args:
106
+ ws: The workspace object to update.
107
+
108
+ Returns:
109
+ Workspace: The updated workspace object.
110
+ """
111
  catalog = ops.CATALOGS.get(ws.env, {})
112
  nodes = {node.id: node for node in ws.nodes}
113
  done = set()
114
  while len(done) < len(nodes):
115
  for node in ws.nodes:
116
  if node.id in done:
117
+ # TODO: Can nodes with the same ID reference different operations?
118
  continue
119
  data = node.data
120
  op = catalog.get(data.title)
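A minimal save/load round trip with the newly documented functions (a sketch; the environment name is arbitrary, and test_workspace.py below covers the same path in detail):

```python
import os
import tempfile
from lynxkite.core import workspace

ws = workspace.Workspace(env="test")
path = os.path.join(tempfile.gettempdir(), "example_workspace.json")
workspace.save(ws, path)       # atomic write via a temp file + os.replace
loaded = workspace.load(path)  # metadata is refreshed on load
assert loaded.env == "test"
os.remove(path)
```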
lynxkite-core/tests/test_ops.py ADDED
@@ -0,0 +1,85 @@
1
+ import inspect
2
+ from lynxkite.core import ops
3
+ import enum
4
+
5
+
6
+ def test_op_decorator_no_params_no_types_default_positions():
7
+ @ops.op(env="test", name="add", view="basic", outputs=["result"])
8
+ def add(a, b):
9
+ return a + b
10
+
11
+ assert add.__op__.name == "add"
12
+ assert add.__op__.params == {}
13
+ assert add.__op__.inputs == {
14
+ "a": ops.Input(name="a", type=inspect._empty, position="left"),
15
+ "b": ops.Input(name="b", type=inspect._empty, position="left"),
16
+ }
17
+ assert add.__op__.outputs == {
18
+ "result": ops.Output(name="result", type=None, position="right")
19
+ }
20
+ assert add.__op__.type == "basic"
21
+ assert ops.CATALOGS["test"]["add"] == add.__op__
22
+
23
+
24
+ def test_op_decorator_custom_positions():
25
+ @ops.input_position(a="right", b="top")
26
+ @ops.output_position(result="bottom")
27
+ @ops.op(env="test", name="add", view="basic", outputs=["result"])
28
+ def add(a, b):
29
+ return a + b
30
+
31
+ assert add.__op__.name == "add"
32
+ assert add.__op__.params == {}
33
+ assert add.__op__.inputs == {
34
+ "a": ops.Input(name="a", type=inspect._empty, position="right"),
35
+ "b": ops.Input(name="b", type=inspect._empty, position="top"),
36
+ }
37
+ assert add.__op__.outputs == {
38
+ "result": ops.Output(name="result", type=None, position="bottom")
39
+ }
40
+ assert add.__op__.type == "basic"
41
+ assert ops.CATALOGS["test"]["add"] == add.__op__
42
+
43
+
44
+ def test_op_decorator_with_params_and_types_():
45
+ @ops.op(env="test", name="multiply", view="basic", outputs=["result"])
46
+ def multiply(a: int, b: float = 2.0, *, param: str = "param"):
47
+ return a * b
48
+
49
+ assert multiply.__op__.name == "multiply"
50
+ assert multiply.__op__.params == {
51
+ "param": ops.Parameter(name="param", default="param", type=str)
52
+ }
53
+ assert multiply.__op__.inputs == {
54
+ "a": ops.Input(name="a", type=int, position="left"),
55
+ "b": ops.Input(name="b", type=float, position="left"),
56
+ }
57
+ assert multiply.__op__.outputs == {
58
+ "result": ops.Output(name="result", type=None, position="right")
59
+ }
60
+ assert multiply.__op__.type == "basic"
61
+ assert ops.CATALOGS["test"]["multiply"] == multiply.__op__
62
+
63
+
64
+ def test_op_decorator_with_complex_types():
65
+ class Color(enum.Enum):
66
+ RED = 1
67
+ GREEN = 2
68
+ BLUE = 3
69
+
70
+ @ops.op(env="test", name="color_op", view="basic", outputs=["result"])
71
+ def complex_op(color: Color, color_list: list[Color], color_dict: dict[str, Color]):
72
+ return color.name
73
+
74
+ assert complex_op.__op__.name == "color_op"
75
+ assert complex_op.__op__.params == {}
76
+ assert complex_op.__op__.inputs == {
77
+ "color": ops.Input(name="color", type=Color, position="left"),
78
+ "color_list": ops.Input(name="color_list", type=list[Color], position="left"),
79
+ "color_dict": ops.Input(name="color_dict", type=dict[str, Color], position="left"),
80
+ }
81
+ assert complex_op.__op__.type == "basic"
82
+ assert complex_op.__op__.outputs == {
83
+ "result": ops.Output(name="result", type=None, position="right")
84
+ }
85
+ assert ops.CATALOGS["test"]["color_op"] == complex_op.__op__
lynxkite-core/tests/test_workspace.py ADDED
@@ -0,0 +1,101 @@
1
+ import os
2
+ import pytest
3
+ import tempfile
4
+ from lynxkite.core import workspace
5
+
6
+
7
+ def test_save_load():
8
+ ws = workspace.Workspace(env="test")
9
+ ws.nodes.append(
10
+ workspace.WorkspaceNode(
11
+ id="1",
12
+ type="node_type",
13
+ data=workspace.WorkspaceNodeData(title="Node 1", params={}),
14
+ position=workspace.Position(x=0, y=0),
15
+ )
16
+ )
17
+ ws.nodes.append(
18
+ workspace.WorkspaceNode(
19
+ id="2",
20
+ type="node_type",
21
+ data=workspace.WorkspaceNodeData(title="Node 2", params={}),
22
+ position=workspace.Position(x=0, y=0),
23
+ )
24
+ )
25
+ ws.edges.append(
26
+ workspace.WorkspaceEdge(
27
+ id="edge1",
28
+ source="1",
29
+ target="2",
30
+ sourceHandle="",
31
+ targetHandle="",
32
+ )
33
+ )
34
+ path = os.path.join(tempfile.gettempdir(), "test_workspace.json")
35
+
36
+ try:
37
+ workspace.save(ws, path)
38
+ assert os.path.exists(path)
39
+ loaded_ws = workspace.load(path)
40
+ assert loaded_ws.env == ws.env
41
+ assert len(loaded_ws.nodes) == len(ws.nodes)
42
+ assert len(loaded_ws.edges) == len(ws.edges)
43
+ sorted_ws_nodes = sorted(ws.nodes, key=lambda x: x.id)
44
+ sorted_loaded_ws_nodes = sorted(loaded_ws.nodes, key=lambda x: x.id)
45
+ # We do manual assertion on each attribute because metadata is added at
46
+ # loading time, which makes the objects different.
47
+ for node, loaded_node in zip(sorted_ws_nodes, sorted_loaded_ws_nodes):
48
+ assert node.id == loaded_node.id
49
+ assert node.type == loaded_node.type
50
+ assert node.data.title == loaded_node.data.title
51
+ assert node.data.params == loaded_node.data.params
52
+ assert node.position.x == loaded_node.position.x
53
+ assert node.position.y == loaded_node.position.y
54
+ sorted_ws_edges = sorted(ws.edges, key=lambda x: x.id)
55
+ sorted_loaded_ws_edges = sorted(loaded_ws.edges, key=lambda x: x.id)
56
+ for edge, loaded_edge in zip(sorted_ws_edges, sorted_loaded_ws_edges):
57
+ assert edge.id == loaded_edge.id
58
+ assert edge.source == loaded_edge.source
59
+ assert edge.target == loaded_edge.target
60
+ assert edge.sourceHandle == loaded_edge.sourceHandle
61
+ assert edge.targetHandle == loaded_edge.targetHandle
62
+ finally:
63
+ os.remove(path)
64
+
65
+
66
+ @pytest.fixture(scope="session", autouse=True)
67
+ def populate_ops_catalog():
68
+ from lynxkite.core import ops
69
+
70
+ ops.register_passive_op("test", "Test Operation", [])
71
+
72
+
73
+ def test_update_metadata():
74
+ ws = workspace.Workspace(env="test")
75
+ ws.nodes.append(
76
+ workspace.WorkspaceNode(
77
+ id="1",
78
+ type="basic",
79
+ data=workspace.WorkspaceNodeData(title="Test Operation", params={}),
80
+ position=workspace.Position(x=0, y=0),
81
+ )
82
+ )
83
+ ws.nodes.append(
84
+ workspace.WorkspaceNode(
85
+ id="2",
86
+ type="basic",
87
+ data=workspace.WorkspaceNodeData(title="Unknown Operation", params={}),
88
+ position=workspace.Position(x=0, y=0),
89
+ )
90
+ )
91
+ updated_ws = workspace._update_metadata(ws)
92
+ assert updated_ws.nodes[0].data.meta.name == "Test Operation"
93
+ assert updated_ws.nodes[0].data.error is None
94
+ assert not hasattr(updated_ws.nodes[1].data, "meta")
95
+ assert updated_ws.nodes[1].data.error == "Unknown operation."
96
+
97
+
98
+ def test_update_metadata_with_empty_workspace():
99
+ ws = workspace.Workspace(env="test")
100
+ updated_ws = workspace._update_metadata(ws)
101
+ assert len(updated_ws.nodes) == 0
lynxkite-graph-analytics/pyproject.toml CHANGED
@@ -14,9 +14,16 @@ dependencies = [
14
  ]
15
 
16
  [project.optional-dependencies]
 
 
 
 
17
  gpu = [
18
  "nx-cugraph-cu12>=24.12.0",
19
  ]
20
 
21
  [tool.uv.sources]
22
  lynxkite-core = { path = "../lynxkite-core" }
 
 
 
 
14
  ]
15
 
16
  [project.optional-dependencies]
17
+ dev = [
18
+ "pytest",
19
+ "pytest-asyncio",
20
+ ]
21
  gpu = [
22
  "nx-cugraph-cu12>=24.12.0",
23
  ]
24
 
25
  [tool.uv.sources]
26
  lynxkite-core = { path = "../lynxkite-core" }
27
+
28
+ [tool.pytest.ini_options]
29
+ asyncio_mode = "auto"
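With `asyncio_mode = "auto"`, pytest-asyncio collects plain `async def` test functions without an explicit `@pytest.mark.asyncio` marker, which is what the new graph-analytics tests rely on. A minimal self-contained example:

```python
import asyncio

# Under asyncio_mode = "auto", pytest-asyncio runs this test without any marker.
async def test_sleep_returns_none():
    assert await asyncio.sleep(0) is None
```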
lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/lynxkite_ops.py CHANGED
@@ -1,7 +1,7 @@
1
  """Graph analytics operations. To be split into separate files when we have more."""
2
 
3
  import os
4
- from lynxkite.core import ops
5
  from collections import deque
6
  import dataclasses
7
  import functools
@@ -119,7 +119,7 @@ def disambiguate_edges(ws):
119
 
120
  @ops.register_executor(ENV)
121
  async def execute(ws):
122
- catalog = ops.CATALOGS[ENV]
123
  disambiguate_edges(ws)
124
  outputs = {}
125
  failed = 0
@@ -130,12 +130,17 @@ async def execute(ws):
130
  # TODO: Take the input/output handles into account.
131
  inputs = [edge.source for edge in ws.edges if edge.target == node.id]
132
  if all(input in outputs for input in inputs):
 
133
  inputs = [outputs[input] for input in inputs]
134
  data = node.data
135
- op = catalog[data.title]
136
  params = {**data.params}
137
- # Convert inputs.
 
 
 
 
138
  try:
 
139
  for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
140
  if p.type == nx.Graph and isinstance(x, Bundle):
141
  inputs[i] = x.to_nx()
@@ -191,6 +196,7 @@ def create_scale_free_graph(*, nodes: int = 10):
191
  @op("Compute PageRank")
192
  @nx_node_attribute_func("pagerank")
193
  def compute_pagerank(graph: nx.Graph, *, damping=0.85, iterations=100):
 
194
  return nx.pagerank(graph, alpha=damping, max_iter=iterations)
195
 
196
 
 
1
  """Graph analytics operations. To be split into separate files when we have more."""
2
 
3
  import os
4
+ from lynxkite.core import ops, workspace
5
  from collections import deque
6
  import dataclasses
7
  import functools
 
119
 
120
  @ops.register_executor(ENV)
121
  async def execute(ws):
122
+ catalog: dict[str, ops.Op] = ops.CATALOGS[ENV]
123
  disambiguate_edges(ws)
124
  outputs = {}
125
  failed = 0
 
130
  # TODO: Take the input/output handles into account.
131
  inputs = [edge.source for edge in ws.edges if edge.target == node.id]
132
  if all(input in outputs for input in inputs):
133
+ # All inputs for this node are ready, so we can compute the output.
134
  inputs = [outputs[input] for input in inputs]
135
  data = node.data
 
136
  params = {**data.params}
137
+ op = catalog.get(data.title)
138
+ if not op:
139
+ data.error = "Operation not found in catalog"
140
+ failed += 1
141
+ continue
142
  try:
143
+ # Convert input types to match the operation signature.
144
  for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
145
  if p.type == nx.Graph and isinstance(x, Bundle):
146
  inputs[i] = x.to_nx()
 
196
  @op("Compute PageRank")
197
  @nx_node_attribute_func("pagerank")
198
  def compute_pagerank(graph: nx.Graph, *, damping=0.85, iterations=100):
199
+ # TODO: This requires scipy to be installed.
200
  return nx.pagerank(graph, alpha=damping, max_iter=iterations)
201
 
202
 
lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/pytorch_model_ops.py CHANGED
@@ -65,5 +65,3 @@ reg(
65
  P.basic("lr", 0.001),
66
  ],
67
  )
68
-
69
- ops.register_area(ENV, "Repeat", params=[ops.Parameter.basic("times", 1, int)])
 
65
  P.basic("lr", 0.001),
66
  ],
67
  )
 
 
lynxkite-graph-analytics/tests/test_lynxkite_ops.py ADDED
@@ -0,0 +1,94 @@
1
+ import pandas as pd
2
+ import pytest
3
+ import networkx as nx
4
+
5
+ from lynxkite.core import workspace
6
+ from lynxkite_plugins.graph_analytics.lynxkite_ops import Bundle, execute, op
7
+
8
+
9
+ async def test_execute_operation_not_in_catalog():
10
+ ws = workspace.Workspace(env="test")
11
+ ws.nodes.append(
12
+ workspace.WorkspaceNode(
13
+ id="1",
14
+ type="node_type",
15
+ data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
16
+ position=workspace.Position(x=0, y=0),
17
+ )
18
+ )
19
+ await execute(ws)
20
+ assert ws.nodes[0].data.error == "Operation not found in catalog"
21
+
22
+
23
+ async def test_execute_operation_inputs_correct_cast():
24
+ # Test that the automatic casting of operation inputs works correctly.
25
+
26
+ @op("Create Bundle")
27
+ def create_bundle() -> Bundle:
28
+ df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
29
+ return Bundle(dfs={"edges": df})
30
+
31
+ @op("Bundle to Graph")
32
+ def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
33
+ return graph
34
+
35
+ @op("Graph to Bundle")
36
+ def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
37
+ return list(bundle.dfs.values())[0]
38
+
39
+ @op("Dataframe to Bundle")
40
+ def dataframe_to_bundle(bundle: Bundle) -> Bundle:
41
+ return bundle
42
+
43
+ ws = workspace.Workspace(env="test")
44
+ ws.nodes.append(
45
+ workspace.WorkspaceNode(
46
+ id="1",
47
+ type="node_type",
48
+ data=workspace.WorkspaceNodeData(title="Create Bundle", params={}),
49
+ position=workspace.Position(x=0, y=0),
50
+ )
51
+ )
52
+ ws.nodes.append(
53
+ workspace.WorkspaceNode(
54
+ id="2",
55
+ type="node_type",
56
+ data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}),
57
+ position=workspace.Position(x=100, y=0),
58
+ )
59
+ )
60
+ ws.nodes.append(
61
+ workspace.WorkspaceNode(
62
+ id="3",
63
+ type="node_type",
64
+ data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}),
65
+ position=workspace.Position(x=200, y=0),
66
+ )
67
+ )
68
+ ws.nodes.append(
69
+ workspace.WorkspaceNode(
70
+ id="4",
71
+ type="node_type",
72
+ data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}),
73
+ position=workspace.Position(x=300, y=0),
74
+ )
75
+ )
76
+ ws.edges = [
77
+ workspace.WorkspaceEdge(
78
+ id="1", source="1", target="2", sourceHandle="1", targetHandle="2"
79
+ ),
80
+ workspace.WorkspaceEdge(
81
+ id="2", source="2", target="3", sourceHandle="2", targetHandle="3"
82
+ ),
83
+ workspace.WorkspaceEdge(
84
+ id="3", source="3", target="4", sourceHandle="3", targetHandle="4"
85
+ ),
86
+ ]
87
+
88
+ await execute(ws)
89
+
90
+ assert all([node.data.error is None for node in ws.nodes])
91
+
92
+
93
+ if __name__ == "__main__":
94
+ pytest.main()