Commit da1ea6b committed by darabos
Parent(s): db496eb

Revert "Adding tests (#50)"

This reverts commit ba7246a4c8b941b76d158eef41fc4616b609524a.
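(A revert commit like this one is normally produced with `git revert ba7246a4c8b941b76d158eef41fc4616b609524a`, which applies the inverse of the reverted commit's diff as a new commit.)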

lynxkite-app/src/lynxkite/app/crdt.py CHANGED

@@ -29,11 +29,7 @@ def ws_exception_handler(exception, log):
 
 
 class WebsocketServer(pycrdt_websocket.WebsocketServer):
-    async def init_room(self, name: str) -> pycrdt_websocket.YRoom:
-        """Initialize a room for the workspace with the given name.
-
-        The workspace is loaded from "crdt_data" if it exists there, or from "data", or a new workspace is created.
-        """
+    async def init_room(self, name):
         path = CRDT_PATH / f"{name}.crdt"
         assert path.is_relative_to(CRDT_PATH)
         ystore = pycrdt_websocket.ystore.FileYStore(path)
@@ -53,8 +49,6 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
                 ws["edges"] = pycrdt.Array()
             if "env" not in ws:
                 ws["env"] = "unset"
-            # We have two possible sources of truth for the workspaces, the YStore and the JSON files.
-            # In case we didn't find the workspace in the YStore, we try to load it from the JSON files.
             try_to_load_workspace(ws, name)
         room = pycrdt_websocket.YRoom(
             ystore=ystore, ydoc=ydoc, exception_handler=ws_exception_handler
@@ -68,12 +62,6 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
         return room
 
     async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
-        """Get a room by name.
-
-        This method overrides the parent get_room method. The original creates an empty room,
-        with no associated Ydoc. Instead, we want to initialize the the room with a Workspace
-        object.
-        """
         if name not in self.rooms:
             self.rooms[name] = await self.init_room(name)
         room = self.rooms[name]
@@ -84,7 +72,7 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
 last_ws_input = None
 
 
-def clean_input(ws_pyd: workspace.Workspace):
+def clean_input(ws_pyd):
     for node in ws_pyd.nodes:
         node.data.display = None
         node.data.error = None
@@ -95,43 +83,21 @@ def clean_input(ws_pyd: workspace.Workspace):
             delattr(node, key)
 
 
-def crdt_update(
-    crdt_obj: pycrdt.Map | pycrdt.Array,
-    python_obj: dict | list,
-    non_collaborative_fields: set[str] = set(),
-):
-    """Update a CRDT object to match a Python object.
-
-    The types between the CRDT object and the Python object must match. If the Python object
-    is a dict, the CRDT object must be a Map. If the Python object is a list, the CRDT object
-    must be an Array.
-
-    Args:
-        crdt_obj: The CRDT object, that will be updated to match the Python object.
-        python_obj: The Python object to update with.
-        non_collaborative_fields: List of fields to treat as a black box. Black boxes are
-            updated as a whole, instead of having a fine-grained data structure to edit
-            collaboratively. Useful for complex fields that contain auto-generated data or
-            metadata.
-            The default is an empty set.
-
-    Raises:
-        ValueError: If the Python object provided is not a dict or list.
-    """
+def crdt_update(crdt_obj, python_obj, boxes=set()):
     if isinstance(python_obj, dict):
         for key, value in python_obj.items():
-            if key in non_collaborative_fields:
+            if key in boxes:
                 crdt_obj[key] = value
             elif isinstance(value, dict):
                 if crdt_obj.get(key) is None:
                     crdt_obj[key] = pycrdt.Map()
-                crdt_update(crdt_obj[key], value, non_collaborative_fields)
+                crdt_update(crdt_obj[key], value, boxes)
             elif isinstance(value, list):
                 if crdt_obj.get(key) is None:
                     crdt_obj[key] = pycrdt.Array()
-                crdt_update(crdt_obj[key], value, non_collaborative_fields)
+                crdt_update(crdt_obj[key], value, boxes)
             elif isinstance(value, enum.Enum):
-                crdt_obj[key] = str(value.value)
+                crdt_obj[key] = str(value)
             else:
                 crdt_obj[key] = value
     elif isinstance(python_obj, list):
@@ -139,14 +105,12 @@ def crdt_update(
             if isinstance(value, dict):
                 if i >= len(crdt_obj):
                     crdt_obj.append(pycrdt.Map())
-                crdt_update(crdt_obj[i], value, non_collaborative_fields)
+                crdt_update(crdt_obj[i], value, boxes)
             elif isinstance(value, list):
                 if i >= len(crdt_obj):
                     crdt_obj.append(pycrdt.Array())
-                crdt_update(crdt_obj[i], value, non_collaborative_fields)
+                crdt_update(crdt_obj[i], value, boxes)
             else:
-                if isinstance(value, enum.Enum):
-                    value = str(value.value)
                 if i >= len(crdt_obj):
                     crdt_obj.append(value)
                 else:
@@ -155,34 +119,18 @@ def crdt_update(
         raise ValueError("Invalid type:", python_obj)
 
 
-def try_to_load_workspace(ws: pycrdt.Map, name: str):
-    """Load the workspace `name`, if it exists, and update the `ws` CRDT object to match its contents.
-
-    Args:
-        ws: CRDT object to udpate with the workspace contents.
-        name: Name of the workspace to load.
-    """
+def try_to_load_workspace(ws, name):
     json_path = f"data/{name}"
     if os.path.exists(json_path):
         ws_pyd = workspace.load(json_path)
-        # We treat the display field as a black box, since it is a large
-        # dictionary that is meant to change as a whole.
-        crdt_update(ws, ws_pyd.model_dump(), non_collaborative_fields={"display"})
+        crdt_update(ws, ws_pyd.model_dump(), boxes={"display"})
 
 
 last_known_versions = {}
 delayed_executions = {}
 
 
-async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt.Map):
-    """Callback to react to changes in the workspace.
-
-
-    Args:
-        name: Name of the workspace.
-        changes: Changes performed to the workspace.
-        ws_crdt: CRDT object representing the workspace.
-    """
+async def workspace_changed(name, changes, ws_crdt):
     ws_pyd = workspace.Workspace.model_validate(ws_crdt.to_py())
     # Do not trigger execution for superficial changes.
     # This is a quick solution until we build proper caching.
@@ -206,35 +154,22 @@ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt.Map):
     await execute(name, ws_crdt, ws_pyd)
 
 
-async def execute(
-    name: str, ws_crdt: pycrdt.Map, ws_pyd: workspace.Workspace, delay: int = 0
-):
-    """Execute the workspace and update the CRDT object with the results.
-
-    Args:
-        name: Name of the workspace.
-        ws_crdt: CRDT object representing the workspace.
-        ws_pyd: Workspace object to execute.
-        delay: Wait time before executing the workspace. The default is 0.
-    """
+async def execute(name, ws_crdt, ws_pyd, delay=0):
     if delay:
         try:
             await asyncio.sleep(delay)
         except asyncio.CancelledError:
             return
     path = DATA_PATH / name
-    assert path.is_relative_to(DATA_PATH), "Provided workspace path is invalid"
-    # Save user changes before executing, in case the execution fails.
+    assert path.is_relative_to(DATA_PATH)
     workspace.save(ws_pyd, path)
     await workspace.execute(ws_pyd)
     workspace.save(ws_pyd, path)
-    # Execution happened on the Python object, we need to replicate
-    # the results to the CRDT object.
     with ws_crdt.doc.transaction():
         for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
             if "data" not in nc:
                 nc["data"] = pycrdt.Map()
-            # Display is added as a non collaborative field.
+            # Display is added as an opaque Box.
             nc["data"]["display"] = np.data.display
             nc["data"]["error"] = np.data.error
 
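Note: the revert restores `crdt_update` with the shorter `boxes` keyword in place of `non_collaborative_fields`. A minimal usage sketch (it mirrors the deleted test_crdt.py below, so the names are real; only the sample data is invented):

    import pycrdt
    from lynxkite.app.crdt import crdt_update

    ydoc = pycrdt.Doc()
    ydoc["workspace"] = ws = pycrdt.Map()
    # Nested dicts become pycrdt.Map, nested lists become pycrdt.Array;
    # keys listed in `boxes` would be written as opaque values instead.
    crdt_update(ws, {"env": "test", "nodes": [{"id": "n1"}]})
    assert ws.to_py() == {"env": "test", "nodes": [{"id": "n1"}]}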
lynxkite-app/tests/test_crdt.py DELETED

@@ -1,72 +0,0 @@
-from enum import Enum
-import pycrdt
-import pytest
-from lynxkite.app.crdt import crdt_update
-
-
-@pytest.fixture
-def empty_dict_workspace():
-    ydoc = pycrdt.Doc()
-    ydoc["workspace"] = ws = pycrdt.Map()
-    yield ws
-
-
-@pytest.fixture
-def empty_list_workspace():
-    ydoc = pycrdt.Doc()
-    ydoc["workspace"] = ws = pycrdt.Array()
-    yield ws
-
-
-class MyEnum(Enum):
-    VALUE = 1
-
-
-@pytest.mark.parametrize(
-    "python_obj,expected",
-    [
-        (
-            {
-                "key1": "value1",
-                "key2": {
-                    "nested_key1": "nested_value1",
-                    "nested_key2": ["nested_value2"],
-                    "nested_key3": MyEnum.VALUE,
-                },
-            },
-            {
-                "key1": "value1",
-                "key2": {
-                    "nested_key1": "nested_value1",
-                    "nested_key2": ["nested_value2"],
-                    "nested_key3": "1",
-                },
-            },
-        )
-    ],
-)
-def test_crdt_update_with_dict(empty_dict_workspace, python_obj, expected):
-    crdt_update(empty_dict_workspace, python_obj)
-    assert empty_dict_workspace.to_py() == expected
-
-
-@pytest.mark.parametrize(
-    "python_obj,expected",
-    [
-        (
-            [
-                "value1",
-                {"nested_key1": "nested_value1", "nested_key2": ["nested_value2"]},
-                MyEnum.VALUE,
-            ],
-            [
-                "value1",
-                {"nested_key1": "nested_value1", "nested_key2": ["nested_value2"]},
-                "1",
-            ],
-        ),
-    ],
-)
-def test_crdt_update_with_list(empty_list_workspace, python_obj, expected):
-    crdt_update(empty_list_workspace, python_obj)
-    assert empty_list_workspace.to_py() == expected

lynxkite-app/tests/test_main.py DELETED

@@ -1,77 +0,0 @@
-import uuid
-from fastapi.testclient import TestClient
-from lynxkite.app.main import app, detect_plugins, DATA_PATH
-import os
-
-
-client = TestClient(app)
-
-
-def test_detect_plugins_with_plugins():
-    # This test assumes that these plugins are installed as part of the testing process.
-    plugins = detect_plugins()
-    assert all(
-        plugin in plugins.keys()
-        for plugin in [
-            "lynxkite_plugins.graph_analytics",
-            "lynxkite_plugins.lynxscribe",
-            "lynxkite_plugins.pillow_example",
-        ]
-    )
-
-
-def test_get_catalog():
-    response = client.get("/api/catalog")
-    assert response.status_code == 200
-
-
-def test_save_and_load():
-    save_request = {
-        "path": "test",
-        "ws": {
-            "env": "test",
-            "nodes": [
-                {
-                    "id": "Node_1",
-                    "type": "basic",
-                    "data": {
-                        "display": None,
-                        "error": "Unknown operation.",
-                        "title": "Test node",
-                        "params": {"param1": "value"},
-                    },
-                    "position": {"x": -493.5496596237119, "y": 20.90123252513356},
-                }
-            ],
-            "edges": [],
-        },
-    }
-    response = client.post("/api/save", json=save_request)
-    saved_ws = response.json()
-    assert response.status_code == 200
-    response = client.get("/api/load?path=test")
-    assert response.status_code == 200
-    assert saved_ws == response.json()
-
-
-def test_list_dir():
-    test_dir = str(uuid.uuid4())
-    test_dir_full_path = DATA_PATH / test_dir
-    test_dir_full_path.mkdir(exist_ok=True)
-    test_file = test_dir_full_path / "test_file.txt"
-    test_file.touch()
-    response = client.get(f"/api/dir/list?path={str(test_dir)}")
-    assert response.status_code == 200
-    assert len(response.json()) == 1
-    assert response.json()[0]["name"] == f"{test_dir}/test_file.txt"
-    assert response.json()[0]["type"] == "workspace"
-    test_file.unlink()
-    test_dir_full_path.rmdir()
-
-
-def test_make_dir():
-    dir_name = str(uuid.uuid4())
-    response = client.post("/api/dir/mkdir", json={"path": dir_name})
-    assert response.status_code == 200
-    assert os.path.exists(DATA_PATH / dir_name)
-    os.rmdir(DATA_PATH / dir_name)

lynxkite-app/web/src/apiTypes.ts CHANGED

@@ -5,21 +5,21 @@
 /* Do not modify it by hand - just update the pydantic models and then re-run the script
  */
 
+/* eslint-disable */
+/**
+ * This file was automatically generated by json-schema-to-typescript.
+ * DO NOT MODIFY IT BY HAND. Instead, modify the source JSONSchema file,
+ * and run json-schema-to-typescript to regenerate this file.
+ */
+
 export interface BaseConfig {
   [k: string]: unknown;
 }
-export interface NodePosition {
+export interface Position {
   x: number;
   y: number;
   [k: string]: unknown;
 }
-/**
- * A workspace is a representation of a computational graph that consists of nodes and edges.
- *
- * Each node represents an operation or task, and the edges represent the flow of data between
- * the nodes. Each workspace is associated with an environment, which determines the operations
- * that can be performed in the workspace and the execution method for the operations.
- */
 export interface Workspace {
   env?: string;
   nodes?: WorkspaceNode[];
@@ -30,7 +30,7 @@ export interface WorkspaceNode {
   id: string;
   type: string;
   data: WorkspaceNodeData;
-  position: NodePosition;
+  position: Position;
   [k: string]: unknown;
 }
 export interface WorkspaceNodeData {

lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED

@@ -46,19 +46,17 @@ def register(env: str, cache: bool = True):
     ops.EXECUTORS[env] = lambda ws: execute(ws, ops.CATALOGS[env], cache=cache)
 
 
-def get_stages(ws: workspace.Workspace, catalog: dict[str, ops.Op]):
+def get_stages(ws, catalog):
     """Inputs on top/bottom are batch inputs. We decompose the graph into a DAG of components along these edges."""
     nodes = {n.id: n for n in ws.nodes}
     batch_inputs = {}
     inputs = {}
-    # For each edge in the workspacce, we record the inputs (sources)
-    # required for each node (target).
     for edge in ws.edges:
         inputs.setdefault(edge.target, []).append(edge.source)
         node = nodes[edge.target]
         op = catalog[node.data.title]
         i = op.inputs[edge.targetHandle]
-        if i.side in [ops.Side.TOP, ops.Side.BOTTOM]:
+        if i.position in "top or bottom":
             batch_inputs.setdefault(edge.target, []).append(edge.source)
     stages = []
     for bt, bss in batch_inputs.items():
@@ -95,7 +93,7 @@ async def await_if_needed(obj):
     return obj
 
 
-async def execute(ws: workspace.Workspace, catalog: dict[str, ops.Op], cache=None):
+async def execute(ws, catalog, cache=None):
     nodes = {n.id: n for n in ws.nodes}
     contexts = {n.id: Context(node=n) for n in ws.nodes}
     edges = {n.id: [] for n in ws.nodes}
@@ -110,12 +108,7 @@ async def execute(ws: workspace.Workspace, catalog: dict[str, ops.Op], cache=None):
             node.data.error = f'Operation "{node.data.title}" not found.'
             continue
         # Start tasks for nodes that have no non-batch inputs.
-        if all(
-            [
-                i.side in [ops.Side.TOP, ops.Side.BOTTOM]
-                for i in op.inputs.values()
-            ]
-        ):
+        if all([i.position in "top or bottom" for i in op.inputs.values()]):
             tasks[node.id] = [NO_INPUT]
     batch_inputs = {}
     # Run the rest until we run out of tasks.
@@ -138,7 +131,7 @@ async def execute(ws: workspace.Workspace, catalog: dict[str, ops.Op], cache=None):
             try:
                 inputs = []
                 for i in op.inputs.values():
-                    if i.side in [ops.Side.TOP, ops.Side.BOTTOM]:
+                    if i.position in "top or bottom":
                         assert (n, i.name) in batch_inputs, f"{i.name} is missing"
                         inputs.append(batch_inputs[(n, i.name)])
                     else:
@@ -163,16 +156,16 @@ async def execute(ws: workspace.Workspace, catalog: dict[str, ops.Op], cache=None):
                     results.extend(result)
                 else:  # Finished all tasks without errors.
                     if (
-                        op.view_type == ops.ViewType.VISUALIZATION
-                        or op.view_type == ops.ViewType.TABLE_VIEW
-                        or op.view_type == ops.ViewType.IMAGE
+                        op.type == "visualization"
+                        or op.type == "table_view"
+                        or op.type == "image"
                     ):
                         data.display = results[0]
                     for edge in edges[node.id]:
                         t = nodes[edge.target]
                         op = catalog[t.data.title]
                         i = op.inputs[edge.targetHandle]
-                        if i.side in [ops.Side.TOP, ops.Side.BOTTOM]:
+                        if i.position in "top or bottom":
                             batch_inputs.setdefault(
                                 (edge.target, edge.targetHandle), []
                             ).extend(results)
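
Worth noting about the restored condition: `i.position in "top or bottom"` is a substring test on the literal string "top or bottom", not membership in a list. It happens to work because the valid position values behave correctly as substrings (plain Python semantics, shown here for clarity):

    "top" in "top or bottom"     # True
    "bottom" in "top or bottom"  # True
    "left" in "top or bottom"    # False
    "right" in "top or bottom"   # False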
lynxkite-core/src/lynxkite/core/ops.py CHANGED

@@ -61,23 +61,16 @@ class Parameter(BaseConfig):
         return Parameter(name=name, default=default, type=type)
 
 
-class Side(enum.StrEnum):
-    LEFT = "left"
-    RIGHT = "right"
-    TOP = "top"
-    BOTTOM = "bottom"
-
-
 class Input(BaseConfig):
     name: str
     type: Type
-    side: Side = Side.LEFT
+    position: str = "left"
 
 
 class Output(BaseConfig):
     name: str
     type: Type
-    side: Side = Side.RIGHT
+    position: str = "right"
 
 
 MULTI_INPUT = Input(name="multi", type="*")
@@ -91,22 +84,13 @@ def basic_outputs(*names):
     return {name: Output(name=name, type=None) for name in names}
 
 
-class ViewType(enum.StrEnum):
-    """Represents the visualization options for an operation."""
-
-    BASIC = "basic"
-    VISUALIZATION = "visualization"
-    IMAGE = "image"
-    TABLE_VIEW = "table_view"
-
-
 class Op(BaseConfig):
     func: typing.Callable = pydantic.Field(exclude=True)
     name: str
     params: dict[str, Parameter]
     inputs: dict[str, Input]
     outputs: dict[str, Output]
-    view_type: ViewType = ViewType.BASIC  # The UI to use for this operation.
+    type: str = "basic"  # The UI to use for this operation.
 
     def __call__(self, *inputs, **params):
         # Convert parameters.
@@ -149,7 +133,7 @@ def op(env: str, name: str, *, view="basic", outputs=None):
             params=params,
             inputs=inputs,
             outputs=_outputs,
-            view_type=view,
+            type=view,
         )
         CATALOGS.setdefault(env, {})
         CATALOGS[env][name] = op
@@ -159,25 +143,25 @@ def op(env: str, name: str, *, view="basic", outputs=None):
     return decorator
 
 
-def input_side(**kwargs):
-    """Decorator for specifying unusual sides for the inputs."""
+def input_position(**kwargs):
+    """Decorator for specifying unusual positions for the inputs."""
 
     def decorator(func):
         op = func.__op__
         for k, v in kwargs.items():
-            op.inputs[k].side = v
+            op.inputs[k].position = v
         return func
 
     return decorator
 
 
-def output_side(**kwargs):
-    """Decorator for specifying unusual sides for the outputs."""
+def output_position(**kwargs):
+    """Decorator for specifying unusual positions for the outputs."""
 
     def decorator(func):
         op = func.__op__
         for k, v in kwargs.items():
-            op.outputs[k].side = v
+            op.outputs[k].position = v
         return func
 
     return decorator
@@ -189,13 +173,7 @@ def no_op(*args, **kwargs):
     return None
 
 
-def register_passive_op(
-    env: str,
-    name: str,
-    inputs: list[Input] = [],
-    outputs: list[Output] = ["output"],
-    params: list[Parameter] = [],
-):
+def register_passive_op(env: str, name: str, inputs=[], outputs=["output"], params=[]):
     """A passive operation has no associated code."""
     op = Op(
         func=no_op,
@@ -231,3 +209,16 @@ def op_registration(env: str):
 
 def passive_op_registration(env: str):
     return functools.partial(register_passive_op, env)
+
+
+def register_area(env, name, params=[]):
+    """A node that represents an area. It can contain other nodes, but does not restrict movement in any way."""
+    op = Op(
+        func=no_op,
+        name=name,
+        params={p.name: p for p in params},
+        inputs={},
+        outputs={},
+        type="area",
+    )
+    CATALOGS[env][name] = op
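
With the Side enum removed, handle placement reverts to plain strings set through the restored decorators. A short sketch of the restored API (usage taken from the llm_ops.py changes further below):

    @ops.input_position(nodes="top", edges="top")
    @op("Predict links")
    def predict_links(nodes, edges):
        ...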
lynxkite-core/src/lynxkite/core/workspace.py CHANGED

@@ -14,8 +14,7 @@ class BaseConfig(pydantic.BaseModel):
     )
 
 
-# TODO: Rename this to coordinates
-class NodePosition(BaseConfig):
+class Position(BaseConfig):
     x: float
     y: float
 
@@ -30,12 +29,10 @@ class WorkspaceNodeData(BaseConfig):
 
 
 class WorkspaceNode(BaseConfig):
-    # The naming of these attributes matches the ones for the NodeBase type in React flow
-    # modyfing them will break the frontend.
     id: str
     type: str
     data: WorkspaceNodeData
-    position: NodePosition
+    position: Position
 
 
 class WorkspaceEdge(BaseConfig):
@@ -47,13 +44,6 @@ class WorkspaceEdge(BaseConfig):
 
 
 class Workspace(BaseConfig):
-    """A workspace is a representation of a computational graph that consists of nodes and edges.
-
-    Each node represents an operation or task, and the edges represent the flow of data between
-    the nodes. Each workspace is associated with an environment, which determines the operations
-    that can be performed in the workspace and the execution method for the operations.
-    """
-
     env: str = ""
     nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
     edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
@@ -65,7 +55,6 @@ async def execute(ws: Workspace):
 
 
 def save(ws: Workspace, path: str):
-    """Persist a workspace to a local file in JSON format."""
     j = ws.model_dump_json(indent=2)
     dirname, basename = os.path.split(path)
     # Create temp file in the same directory to make sure it's on the same filesystem.
@@ -77,17 +66,7 @@ def save(ws: Workspace, path: str):
     os.replace(temp_name, path)
 
 
-def load(path: str) -> Workspace:
-    """Load a workspace from a file.
-
-    After loading the workspace, the metadata of the workspace is updated.
-
-    Args:
-        path (str): The path to the file to load the workspace from.
-
-    Returns:
-        Workspace: The loaded workspace object, with updated metadata.
-    """
+def load(path: str):
     with open(path) as f:
         j = f.read()
     ws = Workspace.model_validate_json(j)
@@ -96,32 +75,19 @@ def load(path: str) -> Workspace:
     return ws
 
 
-def _update_metadata(ws: Workspace) -> Workspace:
-    """Update the metadata of the given workspace object.
-
-    The metadata is the information about the operations that the nodes in the workspace represent,
-    like the parameters and their possible values.
-    This information comes from the catalog of operations for the environment of the workspace.
-
-    Args:
-        ws: The workspace object to update.
-
-    Returns:
-        Workspace: The updated workspace object.
-    """
-    catalog: dict[str, ops.Op] = ops.CATALOGS.get(ws.env, {})
+def _update_metadata(ws):
+    catalog = ops.CATALOGS.get(ws.env, {})
     nodes = {node.id: node for node in ws.nodes}
     done = set()
     while len(done) < len(nodes):
         for node in ws.nodes:
             if node.id in done:
-                # TODO: Can nodes with the same ID reference different operations?
                 continue
             data = node.data
             op = catalog.get(data.title)
             if op:
                 data.meta = op
-                node.type = op.view_type
+                node.type = op.type
                 if data.error == "Unknown operation.":
                     data.error = None
             else:

lynxkite-core/tests/test_ops.py DELETED

@@ -1,89 +0,0 @@
-import inspect
-from lynxkite.core import ops
-import enum
-
-
-def test_op_decorator_no_params_no_types_default_sides():
-    @ops.op(env="test", name="add", view=ops.ViewType.BASIC, outputs=["result"])
-    def add(a, b):
-        return a + b
-
-    assert add.__op__.name == "add"
-    assert add.__op__.params == {}
-    assert add.__op__.inputs == {
-        "a": ops.Input(name="a", type=inspect._empty, side=ops.Side.LEFT),
-        "b": ops.Input(name="b", type=inspect._empty, side=ops.Side.LEFT),
-    }
-    assert add.__op__.outputs == {
-        "result": ops.Output(name="result", type=None, side=ops.Side.RIGHT)
-    }
-    assert add.__op__.view_type == ops.ViewType.BASIC
-    assert ops.CATALOGS["test"]["add"] == add.__op__
-
-
-def test_op_decorator_custom_sides():
-    @ops.input_side(a=ops.Side.RIGHT, b=ops.Side.TOP)
-    @ops.output_side(result=ops.Side.BOTTOM)
-    @ops.op(env="test", name="add", view=ops.ViewType.BASIC, outputs=["result"])
-    def add(a, b):
-        return a + b
-
-    assert add.__op__.name == "add"
-    assert add.__op__.params == {}
-    assert add.__op__.inputs == {
-        "a": ops.Input(name="a", type=inspect._empty, side=ops.Side.RIGHT),
-        "b": ops.Input(name="b", type=inspect._empty, side=ops.Side.TOP),
-    }
-    assert add.__op__.outputs == {
-        "result": ops.Output(name="result", type=None, side=ops.Side.BOTTOM)
-    }
-    assert add.__op__.view_type == ops.ViewType.BASIC
-    assert ops.CATALOGS["test"]["add"] == add.__op__
-
-
-def test_op_decorator_with_params_and_types_():
-    @ops.op(env="test", name="multiply", view=ops.ViewType.BASIC, outputs=["result"])
-    def multiply(a: int, b: float = 2.0, *, param: str = "param"):
-        return a * b
-
-    assert multiply.__op__.name == "multiply"
-    assert multiply.__op__.params == {
-        "param": ops.Parameter(name="param", default="param", type=str)
-    }
-    assert multiply.__op__.inputs == {
-        "a": ops.Input(name="a", type=int, side=ops.Side.LEFT),
-        "b": ops.Input(name="b", type=float, side=ops.Side.LEFT),
-    }
-    assert multiply.__op__.outputs == {
-        "result": ops.Output(name="result", type=None, side=ops.Side.RIGHT)
-    }
-    assert multiply.__op__.view_type == ops.ViewType.BASIC
-    assert ops.CATALOGS["test"]["multiply"] == multiply.__op__
-
-
-def test_op_decorator_with_complex_types():
-    class Color(enum.Enum):
-        RED = 1
-        GREEN = 2
-        BLUE = 3
-
-    @ops.op(env="test", name="color_op", view=ops.ViewType.BASIC, outputs=["result"])
-    def complex_op(color: Color, color_list: list[Color], color_dict: dict[str, Color]):
-        return color.name
-
-    assert complex_op.__op__.name == "color_op"
-    assert complex_op.__op__.params == {}
-    assert complex_op.__op__.inputs == {
-        "color": ops.Input(name="color", type=Color, side=ops.Side.LEFT),
-        "color_list": ops.Input(
-            name="color_list", type=list[Color], side=ops.Side.LEFT
-        ),
-        "color_dict": ops.Input(
-            name="color_dict", type=dict[str, Color], side=ops.Side.LEFT
-        ),
-    }
-    assert complex_op.__op__.view_type == ops.ViewType.BASIC
-    assert complex_op.__op__.outputs == {
-        "result": ops.Output(name="result", type=None, side=ops.Side.RIGHT)
-    }
-    assert ops.CATALOGS["test"]["color_op"] == complex_op.__op__

lynxkite-core/tests/test_workspace.py DELETED

@@ -1,115 +0,0 @@
-import os
-import pytest
-import tempfile
-from lynxkite.core import workspace
-from lynxkite.core import ops
-
-
-def test_save_load():
-    ws = workspace.Workspace(env="test")
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="1",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Node 1", params={}),
-            position=workspace.NodePosition(x=0, y=0),
-        )
-    )
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="2",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Node 2", params={}),
-            position=workspace.NodePosition(x=0, y=0),
-        )
-    )
-    ws.edges.append(
-        workspace.WorkspaceEdge(
-            id="edge1",
-            source="1",
-            target="2",
-            sourceHandle="",
-            targetHandle="",
-        )
-    )
-    path = os.path.join(tempfile.gettempdir(), "test_workspace.json")
-
-    try:
-        workspace.save(ws, path)
-        assert os.path.exists(path)
-        loaded_ws = workspace.load(path)
-        assert loaded_ws.env == ws.env
-        assert len(loaded_ws.nodes) == len(ws.nodes)
-        assert len(loaded_ws.edges) == len(ws.edges)
-        sorted_ws_nodes = sorted(ws.nodes, key=lambda x: x.id)
-        sorted_loaded_ws_nodes = sorted(loaded_ws.nodes, key=lambda x: x.id)
-        # We do manual assertion on each attribute because metadata is added at
-        # loading time, which makes the objects different.
-        for node, loaded_node in zip(sorted_ws_nodes, sorted_loaded_ws_nodes):
-            assert node.id == loaded_node.id
-            assert node.type == loaded_node.type
-            assert node.data.title == loaded_node.data.title
-            assert node.data.params == loaded_node.data.params
-            assert node.position.x == loaded_node.position.x
-            assert node.position.y == loaded_node.position.y
-        sorted_ws_edges = sorted(ws.edges, key=lambda x: x.id)
-        sorted_loaded_ws_edges = sorted(loaded_ws.edges, key=lambda x: x.id)
-        for edge, loaded_edge in zip(sorted_ws_edges, sorted_loaded_ws_edges):
-            assert edge.id == loaded_edge.id
-            assert edge.source == loaded_edge.source
-            assert edge.target == loaded_edge.target
-            assert edge.sourceHandle == loaded_edge.sourceHandle
-            assert edge.targetHandle == loaded_edge.targetHandle
-    finally:
-        os.remove(path)
-
-
-@pytest.fixture(scope="session", autouse=True)
-def populate_ops_catalog():
-    ops.register_passive_op(
-        env="test",
-        name="Test Operation",
-        inputs=[],
-        params=[
-            ops.Parameter(name="param_int", default=1),
-            ops.Parameter(name="param_str", default="test"),
-        ],
-    )
-
-
-def test_update_metadata():
-    ws = workspace.Workspace(env="test")
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="1",
-            type="basic",
-            data=workspace.WorkspaceNodeData(
-                title="Test Operation", params={"param_int": 1}
-            ),
-            position=workspace.NodePosition(x=0, y=0),
-        )
-    )
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="2",
-            type="basic",
-            data=workspace.WorkspaceNodeData(title="Unknown Operation", params={}),
-            position=workspace.NodePosition(x=0, y=0),
-        )
-    )
-    updated_ws = workspace._update_metadata(ws)
-    assert updated_ws.nodes[0].data.meta.name == "Test Operation"
-    assert updated_ws.nodes[0].data.error is None
-    assert updated_ws.nodes[0].data.params == {"param_int": 1}
-    assert updated_ws.nodes[0].data.meta.params == {
-        "param_int": ops.Parameter(name="param_int", default=1),
-        "param_str": ops.Parameter(name="param_str", default="test"),
-    }
-    assert not hasattr(updated_ws.nodes[1].data, "meta")
-    assert updated_ws.nodes[1].data.error == "Unknown operation."
-
-
-def test_update_metadata_with_empty_workspace():
-    ws = workspace.Workspace(env="test")
-    updated_ws = workspace._update_metadata(ws)
-    assert len(updated_ws.nodes) == 0

lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/lynxkite_ops.py CHANGED

@@ -1,7 +1,7 @@
 """Graph analytics operations. To be split into separate files when we have more."""
 
 import os
-from lynxkite.core import ops, workspace
+from lynxkite.core import ops
 from collections import deque
 import dataclasses
 import functools
@@ -34,7 +34,6 @@ class RelationDefinition:
     target_key: str  # The column in the target table that contains the node ID.
 
 
-# TODO: Convert this to Pydantic
 @dataclasses.dataclass
 class Bundle:
     """A collection of DataFrames and other data.
@@ -116,50 +115,32 @@ def disambiguate_edges(ws):
 
 
 @ops.register_executor(ENV)
-async def execute(ws: workspace.Workspace):
-    catalog: dict[str, ops.Op] = ops.CATALOGS[ENV]
+async def execute(ws):
+    catalog = ops.CATALOGS[ENV]
     disambiguate_edges(ws)
-    computed_outputs = {}
+    outputs = {}
    failed = 0
-    while len(computed_outputs) + failed < len(ws.nodes):
+    while len(outputs) + failed < len(ws.nodes):
        for node in ws.nodes:
-            if node.id in computed_outputs:
+            if node.id in outputs:
                continue
            # TODO: Take the input/output handles into account.
-            operation_inputs = [
-                edge.source for edge in ws.edges if edge.target == node.id
-            ]
-            if all(input in computed_outputs for input in operation_inputs):
-                # All inputs for this node are ready, we can compute the output.
-                operation_inputs = [
-                    computed_outputs[input] for input in operation_inputs
-                ]
+            inputs = [edge.source for edge in ws.edges if edge.target == node.id]
+            if all(input in outputs for input in inputs):
+                inputs = [outputs[input] for input in inputs]
                data = node.data
+                op = catalog[data.title]
                params = {**data.params}
+                # Convert inputs.
                try:
-                    op = catalog[data.title]
-                    # Convert inputs types to match operation signature.
-                    for i, (input_value, input_signature) in enumerate(
-                        zip(operation_inputs, op.inputs.values())
-                    ):
-                        if input_signature.type == nx.Graph and isinstance(
-                            input_value, Bundle
-                        ):
-                            operation_inputs[i] = input_value.to_nx()
-                        elif input_signature.type == Bundle and isinstance(
-                            input_value, nx.Graph
-                        ):
-                            operation_inputs[i] = Bundle.from_nx(input_value)
-                        elif input_signature.type == Bundle and isinstance(
-                            input_value, pd.DataFrame
-                        ):
-                            operation_inputs[i] = Bundle.from_df(input_value)
-                    output = op(*operation_inputs, **params)
-                except KeyError:
-                    traceback.print_exc()
-                    data.error = "Operation not found in catalog"
-                    failed += 1
-                    continue
+                    for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
+                        if p.type == nx.Graph and isinstance(x, Bundle):
+                            inputs[i] = x.to_nx()
+                        elif p.type == Bundle and isinstance(x, nx.Graph):
+                            inputs[i] = Bundle.from_nx(x)
+                        elif p.type == Bundle and isinstance(x, pd.DataFrame):
+                            inputs[i] = Bundle.from_df(x)
+                    output = op(*inputs, **params)
                except Exception as e:
                    traceback.print_exc()
                    data.error = str(e)
@@ -167,16 +148,13 @@ async def execute(ws: workspace.Workspace):
                    continue
                if len(op.inputs) == 1 and op.inputs.get("multi") == "*":
                    # It's a flexible input. Create n+1 handles.
-                    # TODO: How is this used? Why we define the inputs in the WorkspaceNodeData?
-                    data.inputs = {
-                        f"input{i}": None for i in range(len(operation_inputs) + 1)
-                    }
+                    data.inputs = {f"input{i}": None for i in range(len(inputs) + 1)}
                data.error = None
-                computed_outputs[node.id] = output
+                outputs[node.id] = output
                if (
-                    op.view_type == ops.ViewType.VISUALIZATION
-                    or op.view_type == ops.ViewType.TABLE_VIEW
-                    or op.view_type == ops.ViewType.IMAGE
+                    op.type == "visualization"
+                    or op.type == "table_view"
+                    or op.type == "image"
                ):
                    data.display = output
 
@@ -210,7 +188,6 @@ def create_scale_free_graph(*, nodes: int = 10):
 @op("Compute PageRank")
 @nx_node_attribute_func("pagerank")
 def compute_pagerank(graph: nx.Graph, *, damping=0.85, iterations=100):
-    # TODO: This requires scipy to be installed.
     return nx.pagerank(graph, alpha=damping, max_iter=iterations)
 
 
@@ -304,7 +281,7 @@ def _map_color(value):
 ]
 
 
-@op("Visualize graph", view=ops.ViewType.VISUALIZATION)
+@op("Visualize graph", view="visualization")
 def visualize_graph(graph: Bundle, *, color_nodes_by: ops.NodeAttribute = None):
     nodes = graph.dfs["nodes"].copy()
     if color_nodes_by:
@@ -358,7 +335,7 @@ def collect(df: pd.DataFrame):
     return df.values.tolist()
 
 
-@op("View tables", view=ops.ViewType.TABLE_VIEW)
+@op("View tables", view="table_view")
 def view_tables(bundle: Bundle):
     v = {
         "dataframes": {

lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/networkx_ops.py CHANGED

@@ -54,7 +54,7 @@ def register_networkx(env: str):
             params=params,
             inputs=inputs,
             outputs={"output": ops.Output(name="output", type=nx.Graph)},
-            view_type=ops.ViewType.BASIC,
+            type="basic",
         )
         cat[name] = op
 
lynxkite-graph-analytics/src/lynxkite_plugins/graph_analytics/pytorch_model_ops.py CHANGED

@@ -13,10 +13,10 @@ def reg(name, inputs=[], outputs=None, params=[]):
         ENV,
         name,
         inputs=[
-            ops.Input(name=name, side=ops.Side.BOTTOM, type="tensor") for name in inputs
+            ops.Input(name=name, position="bottom", type="tensor") for name in inputs
         ],
         outputs=[
-            ops.Output(name=name, side=ops.Side.TOP, type="tensor") for name in outputs
+            ops.Output(name=name, position="top", type="tensor") for name in outputs
        ],
        params=params,
    )
@@ -64,4 +64,6 @@ reg(
        ),
        P.basic("lr", 0.001),
    ],
-)
+)
+
+ops.register_area(ENV, "Repeat", params=[ops.Parameter.basic("times", 1, int)])
lynxkite-graph-analytics/tests/test_lynxkite_ops.py DELETED

@@ -1,100 +0,0 @@
-import sys
-
-# Add the project root to sys.path
-sys.path.insert(0, "/home/chema/work/lynxkite-2024/lynxkite-graph-analytics")
-import pandas as pd
-import pytest
-import networkx as nx
-
-from lynxkite.core import workspace
-
-# from lynxkite_plugins.graph_analytics.lynxkite_ops import execute
-from src.lynxkite_plugins.graph_analytics.lynxkite_ops import Bundle, execute, op
-
-
-async def test_execute_operation_not_in_catalog():
-    ws = workspace.Workspace(env="test")
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="1",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
-            position=workspace.NodePosition(x=0, y=0),
-        )
-    )
-    await execute(ws)
-    assert ws.nodes[0].data.error == "Operation not found in catalog"
-
-
-async def test_execute_operation_inputs_correct_cast():
-    # Test that the automatic casting of operation inputs works correctly.
-
-    @op("Create Bundle")
-    def create_bundle() -> Bundle:
-        df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
-        return Bundle(dfs={"edges": df})
-
-    @op("Bundle to Graph")
-    def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
-        return graph
-
-    @op("Graph to Bundle")
-    def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
-        return list(bundle.dfs.values())[0]
-
-    @op("Dataframe to Bundle")
-    def dataframe_to_bundle(bundle: Bundle) -> Bundle:
-        return bundle
-
-    ws = workspace.Workspace(env="test")
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="1",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Create Bundle", params={}),
-            position=workspace.NodePosition(x=0, y=0),
-        )
-    )
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="2",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}),
-            position=workspace.NodePosition(x=100, y=0),
-        )
-    )
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="3",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}),
-            position=workspace.NodePosition(x=200, y=0),
-        )
-    )
-    ws.nodes.append(
-        workspace.WorkspaceNode(
-            id="4",
-            type="node_type",
-            data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}),
-            position=workspace.NodePosition(x=300, y=0),
-        )
-    )
-    ws.edges = [
-        workspace.WorkspaceEdge(
-            id="1", source="1", target="2", sourceHandle="1", targetHandle="2"
-        ),
-        workspace.WorkspaceEdge(
-            id="2", source="2", target="3", sourceHandle="2", targetHandle="3"
-        ),
-        workspace.WorkspaceEdge(
-            id="3", source="3", target="4", sourceHandle="3", targetHandle="4"
-        ),
-    ]
-
-    await execute(ws)
-
-    assert all([node.data.error is None for node in ws.nodes])
-
-
-if __name__ == "__main__":
-    pytest.main()

lynxkite-lynxscribe/src/lynxkite_plugins/lynxscribe/llm_ops.py CHANGED

@@ -68,13 +68,13 @@ def split_document(input, *, delimiter: str = "\\n\\n"):
     return pd.DataFrame(chunks, columns=["text"])
 
 
-@ops.input_side(input=ops.Side.TOP)
+@ops.input_position(input="top")
 @op("Build document graph")
 def build_document_graph(input):
     return [{"source": i, "target": i + 1} for i in range(len(input) - 1)]
 
 
-@ops.input_side(nodes=ops.Side.TOP, edges=ops.Side.TOP)
+@ops.input_position(nodes="top", edges="top")
 @op("Predict links")
 def predict_links(nodes, edges):
     """A placeholder for a real algorithm. For now just adds 2-hop neighbors."""
@@ -89,7 +89,7 @@ def predict_links(nodes, edges):
     return edges + new_edges
 
 
-@ops.input_side(nodes=ops.Side.TOP, edges=ops.Side.TOP)
+@ops.input_position(nodes="top", edges="top")
 @op("Add neighbors")
 def add_neighbors(nodes, edges, item):
     nodes = pd.DataFrame(nodes)
@@ -133,7 +133,7 @@ def ask_llm(input, *, model: str, accepted_regex: str = None, max_tokens: int =
     return [{**input, "response": r} for r in results]
 
 
-@op("View", view=ops.ViewType.TABLE_VIEW)
+@op("View", view="table_view")
 def view(input, *, _ctx: one_by_one.Context):
     v = _ctx.last_result
     if v:
@@ -152,8 +152,8 @@ def view(input, *, _ctx: one_by_one.Context):
     return v
 
 
-@ops.input_side(input=ops.Side.RIGHT)
-@ops.output_side(output=ops.Side.LEFT)
+@ops.input_position(input="right")
+@ops.output_position(output="left")
 @op("Loop")
 def loop(input, *, max_iterations: int = 3, _ctx: one_by_one.Context):
     """Data can flow back here max_iterations-1 times."""
@@ -174,7 +174,7 @@ class RagEngine(enum.Enum):
     Custom = "Custom"
 
 
-@ops.input_side(db=ops.Side.TOP)
+@ops.input_position(db="top")
 @op("RAG")
 def rag(
     input,

lynxkite-lynxscribe/src/lynxkite_plugins/lynxscribe/lynxscribe_ops.py CHANGED

@@ -24,7 +24,7 @@ from lynxkite.core.executors import one_by_one
 ENV = "LynxScribe"
 one_by_one.register(ENV)
 op = ops.op_registration(ENV)
-output_on_top = ops.output_side(output=ops.Side.TOP)
+output_on_top = ops.output_position(output="top")
 
 
 @output_on_top
@@ -42,7 +42,7 @@ def llm(*, name="openai"):
 
 
 @output_on_top
-@ops.input_side(llm=ops.Side.BOTTOM)
+@ops.input_position(llm="bottom")
 @op("Text embedder")
 def text_embedder(llm, *, model="text-embedding-ada-002"):
     llm = llm[0]["llm"]
@@ -51,7 +51,7 @@ def text_embedder(llm, *, model="text-embedding-ada-002"):
 
 
 @output_on_top
-@ops.input_side(vector_store=ops.Side.BOTTOM, text_embedder=ops.Side.BOTTOM)
+@ops.input_position(vector_store="bottom", text_embedder="bottom")
 @op("RAG graph")
 def rag_graph(vector_store, text_embedder):
     vector_store = vector_store[0]["vector_store"]
@@ -78,7 +78,7 @@ DEFAULT_NEGATIVE_ANSWER = "I'm sorry, but the data I've been trained on does not
 
 
 @output_on_top
-@ops.input_side(rag_graph=ops.Side.BOTTOM, scenario_selector=ops.Side.BOTTOM, llm=ops.Side.BOTTOM)
+@ops.input_position(rag_graph="bottom", scenario_selector="bottom", llm="bottom")
 @op("RAG chatbot")
 def rag_chatbot(
     rag_graph,
@@ -107,7 +107,7 @@ def rag_chatbot(
 
 
 @output_on_top
-@ops.input_side(processor=ops.Side.BOTTOM)
+@ops.input_position(processor="bottom")
 @op("Chat processor")
 def chat_processor(processor, *, _ctx: one_by_one.Context):
     cfg = _ctx.last_result or {
@@ -152,7 +152,7 @@ def mask(*, name="", regex="", exceptions="", mask_pattern=""):
     }
 
 
-@ops.input_side(chat_api=ops.Side.BOTTOM)
+@ops.input_position(chat_api="bottom")
 @op("Test Chat API")
 async def test_chat_api(message, chat_api, *, show_details=False):
     chat_api = chat_api[0]["chat_api"]
@@ -173,7 +173,7 @@ def input_chat(*, chat: str):
 
 
 @output_on_top
-@ops.input_side(chatbot=ops.Side.BOTTOM, chat_processor=ops.Side.BOTTOM, knowledge_base=ops.Side.BOTTOM)
+@ops.input_position(chatbot="bottom", chat_processor="bottom", knowledge_base="bottom")
 @op("Chat API")
 def chat_api(chatbot, chat_processor, knowledge_base, *, model="gpt-4o-mini"):
     chatbot = chatbot[0]["chatbot"]
@@ -205,7 +205,7 @@ def knowledge_base(
     }
 
 
-@op("View", view=ops.ViewType.TABLE_VIEW)
+@op("View", view="table_view")
 def view(input):
     columns = [str(c) for c in input.keys() if not str(c).startswith("_")]
     v = {

lynxkite-lynxscribe/tests/test_llm_ops.py CHANGED

@@ -8,7 +8,7 @@ def make_node(id, op, type="basic", **params):
     return workspace.WorkspaceNode(
         id=id,
         type=type,
-        position=workspace.NodePosition(x=0, y=0),
+        position=workspace.Position(x=0, y=0),
         data=workspace.WorkspaceNodeData(title=op, params=params),
     )
 
@@ -43,7 +43,7 @@ class LLMOpsTest(unittest.IsolatedAsyncioTestCase):
                 filename="/Users/danieldarabos/Downloads/aimo-train.csv",
                 key="problem",
             ),
-            make_node("1", "View", type=ops.ViewType.TABLE_VIEW),
+            make_node("1", "View", type="table_view"),
         ],
         edges=[make_edge("0", "1")],
     )

lynxkite-pillow-example/src/lynxkite_plugins/pillow_example/__init__.py CHANGED

@@ -56,7 +56,7 @@ def to_grayscale(image: Image):
     return image.convert("L")
 
 
-@op("View image", view=ops.ViewType.IMAGE)
+@op("View image", view="image")
 def view_image(image: Image):
     buffered = io.BytesIO()
     image.save(buffered, format="JPEG")