darabos committed
Commit 560385e · unverified · 2 parents: e9bf53e 0e76f97

Merge pull request #79 from biggraph/darabos-progress

.github/workflows/test.yaml CHANGED
@@ -56,6 +56,11 @@ jobs:
           cd lynxkite-graph-analytics
           pytest
 
+      - name: Run LynxScribe tests
+        run: |
+          cd lynxkite-lynxscribe
+          pytest
+
       - name: Try building the documentation
         run: |
           uv pip install mkdocs-material mkdocstrings[python]
lynxkite-app/src/lynxkite_app/crdt.py CHANGED
@@ -86,6 +86,7 @@ def clean_input(ws_pyd):
     for node in ws_pyd.nodes:
         node.data.display = None
         node.data.error = None
+        node.data.status = workspace.NodeStatus.done
         node.position.x = 0
         node.position.y = 0
         if node.model_extra:
@@ -175,7 +176,6 @@ delayed_executions = {}
 async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt.Map):
     """Callback to react to changes in the workspace.
 
-
     Args:
         name: Name of the workspace.
         changes: Changes performed to the workspace.
@@ -197,6 +197,7 @@ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt
         getattr(change, "keys", {}).get("__execution_delay", {}).get("newValue", 0)
         for change in changes
     )
+    print(f"Running {name} in {ws_pyd.env}...")
     if delay:
         task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
         delayed_executions[name] = task
@@ -224,17 +225,15 @@ async def execute(
     assert path.is_relative_to(config.DATA_PATH), "Provided workspace path is invalid"
     # Save user changes before executing, in case the execution fails.
     workspace.save(ws_pyd, path)
-    await workspace.execute(ws_pyd)
-    workspace.save(ws_pyd, path)
-    # Execution happened on the Python object, we need to replicate
-    # the results to the CRDT object.
     with ws_crdt.doc.transaction():
         for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
             if "data" not in nc:
                 nc["data"] = pycrdt.Map()
-            # Display is added as a non collaborative field.
-            nc["data"]["display"] = np.data.display
-            nc["data"]["error"] = np.data.error
+            nc["data"]["status"] = "planned"
+            # Nodes get a reference to their CRDT maps, so they can update them as the results come in.
+            np._crdt = nc
+    await workspace.execute(ws_pyd)
+    workspace.save(ws_pyd, path)
 
 
 @contextlib.asynccontextmanager
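The reordering in execute() above is the heart of the change: every node is pre-marked "planned" and handed a reference to its own CRDT map before workspace.execute() runs, so executors can stream per-node status to connected clients mid-run instead of syncing results once at the end. A minimal sketch of the resulting lifecycle, using only the NodeStatus enum added in workspace.py further down (the LIFECYCLE name is illustrative):

from lynxkite.core.workspace import NodeStatus

# Status values written to a node's CRDT map, in order:
LIFECYCLE = [
    NodeStatus.planned,  # set by crdt.execute() before the executor starts
    NodeStatus.active,   # set by WorkspaceNode.publish_started()
    NodeStatus.done,     # set by publish_result() / publish_error()
]
assert [s.value for s in LIFECYCLE] == ["planned", "active", "done"]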
lynxkite-app/src/lynxkite_app/main.py CHANGED
@@ -1,6 +1,5 @@
 """The FastAPI server for serving the LynxKite application."""
 
-import os
 import shutil
 import pydantic
 import fastapi
@@ -13,11 +12,6 @@ from lynxkite.core import ops
 from lynxkite.core import workspace
 from . import crdt, config
 
-if os.environ.get("NX_CUGRAPH_AUTOCONFIG", "").strip().lower() == "true":
-    import cudf.pandas
-
-    cudf.pandas.install()
-
 
 def detect_plugins():
     plugins = {}
lynxkite-app/web/src/apiTypes.ts CHANGED
@@ -41,6 +41,7 @@ export interface WorkspaceNodeData {
   };
   display?: unknown;
   error?: string | null;
+  in_progress?: boolean;
   [k: string]: unknown;
 }
 export interface Position {
lynxkite-app/web/src/index.css CHANGED
@@ -90,9 +90,33 @@ body {
 }
 
 .lynxkite-node .title {
-  /* background: oklch(75% 0.2 55); */
   font-weight: bold;
   padding: 8px;
+  background-image: linear-gradient(
+    to right,
+    var(--status-color-1),
+    var(--status-color-2),
+    var(--status-color-3)
+  );
+  background-size: 180% 180%;
+  --status-color-1: oklch(75% 0.2 55);
+  --status-color-2: oklch(75% 0.2 55);
+  --status-color-3: oklch(75% 0.2 55);
+  transition: --status-color-1 0.3s, --status-color-2 0.3s, --status-color-3
+    0.3s;
+}
+
+.lynxkite-node .title.active {
+  --status-color-1: oklch(75% 0.2 55);
+  --status-color-2: oklch(90% 0.2 55);
+  --status-color-3: oklch(75% 0.1 55);
+  /* animation: active-node-gradient-animation 2s ease-in-out infinite; */
+}
+
+.lynxkite-node .title.planned {
+  --status-color-1: oklch(75% 0.1 55);
+  --status-color-2: oklch(75% 0.1 55);
+  --status-color-3: oklch(75% 0.1 55);
 }
 
 .handle-name {
@@ -322,6 +346,38 @@ body {
   }
 }
 
+@keyframes active-node-gradient-animation {
+  0% {
+    background-position-x: 100%;
+  }
+
+  50% {
+    background-position-x: 0%;
+  }
+
+  100% {
+    background-position-x: 100%;
+  }
+}
+
+@property --status-color-1 {
+  syntax: "<color>";
+  initial-value: red;
+  inherits: false;
+}
+
+@property --status-color-2 {
+  syntax: "<color>";
+  initial-value: red;
+  inherits: false;
+}
+
+@property --status-color-3 {
+  syntax: "<color>";
+  initial-value: red;
+  inherits: false;
+}
+
 .react-flow__edge.selected path.react-flow__edge-path {
   outline: var(--xy-selection-border, var(--xy-selection-border-default));
   outline-offset: 10px;
@@ -379,13 +435,15 @@ body {
   display: flex;
   justify-content: space-between;
   padding: 8px 12px;
-  border-bottom: 1px solid #ccc; /* Adds a separator between rows */
+  /* Adds a separator between rows */
+  border-bottom: 1px solid #ccc;
 }
 
 /* Alternating background colors for table-like effect */
 .graph-creation-view .df-head:nth-child(odd) {
   background-color: #f9f9f9;
 }
+
 .graph-creation-view .df-head:nth-child(even) {
   background-color: #e0e0e0;
 }
@@ -393,7 +451,8 @@ body {
 .graph-relation-attributes {
   display: flex;
   flex-direction: column;
-  gap: 10px; /* Adds space between each label-input pair */
+  /* Adds space between each label-input pair */
+  gap: 10px;
   width: 100%;
 }
@@ -402,7 +461,8 @@ body {
   font-weight: bold;
   display: block;
   margin-bottom: 2px;
-  color: #666; /* Lighter text for labels */
+  /* Lighter text for labels */
+  color: #666;
 }
 
 .graph-relation-attributes input {
@@ -415,7 +475,8 @@ body {
 }
 
 .graph-relation-attributes input:focus {
-  border-color: #007bff; /* Highlight input on focus */
+  /* Highlight input on focus */
+  border-color: #007bff;
 }
 
 .add-relationship-button {
lynxkite-app/web/src/workspace/nodes/LynxKiteNode.tsx CHANGED
@@ -71,7 +71,10 @@ export default function LynxKiteNode(props: LynxKiteNodeProps) {
       }}
     >
       <div className="lynxkite-node" style={props.nodeStyle}>
-        <div className="title bg-primary" onClick={titleClicked}>
+        <div
+          className={`title bg-primary ${data.status}`}
+          onClick={titleClicked}
+        >
           {data.title}
           {data.error && <span className="title-icon">⚠️</span>}
           {expanded || <span className="title-icon">⋯</span>}
lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED
@@ -104,11 +104,11 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
     tasks = {}
     NO_INPUT = object()  # Marker for initial tasks.
     for node in ws.nodes:
-        node.data.error = None
         op = catalog.get(node.data.title)
         if op is None:
-            node.data.error = f'Operation "{node.data.title}" not found.'
+            node.publish_error(f'Operation "{node.data.title}" not found.')
             continue
+        node.publish_error(None)
        # Start tasks for nodes that have no non-batch inputs.
         if all([i.position in "top or bottom" for i in op.inputs.values()]):
             tasks[node.id] = [NO_INPUT]
@@ -123,12 +123,12 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                next_stage.setdefault(n, []).extend(ts)
                continue
            node = nodes[n]
-            data = node.data
-            op = catalog[data.title]
-            params = {**data.params}
+            op = catalog[node.data.title]
+            params = {**node.data.params}
            if has_ctx(op):
                params["_ctx"] = contexts[node.id]
            results = []
+            node.publish_started()
            for task in ts:
                try:
                    inputs = []
@@ -150,7 +150,7 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                    output = await await_if_needed(result.output)
                except Exception as e:
                    traceback.print_exc()
-                    data.error = str(e)
+                    node.publish_error(e)
                    break
                contexts[node.id].last_result = output
                # Returned lists and DataFrames are considered multiple tasks.
@@ -161,7 +161,7 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                    results.extend(output)
            else:  # Finished all tasks without errors.
                if result.display:
-                    data.display = await await_if_needed(result.display)
+                    result.display = await await_if_needed(result.display)
                for edge in edges[node.id]:
                    t = nodes[edge.target]
                    op = catalog[t.data.title]
@@ -172,5 +172,6 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                    ).extend(results)
                else:
                    tasks.setdefault(edge.target, []).extend(results)
+            node.publish_result(result)
        tasks = next_stage
    return contexts
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -9,6 +9,9 @@ import typing
 from dataclasses import dataclass
 from typing_extensions import Annotated
 
+if typing.TYPE_CHECKING:
+    from . import workspace
+
 CATALOGS = {}
 EXECUTORS = {}
 
@@ -94,8 +97,9 @@ class Result:
     JSON-serializable.
     """
 
-    output: typing.Any
+    output: typing.Any = None
     display: ReadOnlyJSON | None = None
+    error: str | None = None
 
 
 MULTI_INPUT = Input(name="multi", type="*")
@@ -232,9 +236,15 @@ def register_passive_op(env: str, name: str, inputs=[], outputs=["output"], para
 
 
 def register_executor(env: str):
-    """Decorator for registering an executor."""
+    """Decorator for registering an executor.
 
-    def decorator(func):
+    The executor is a function that takes a workspace and executes the operations in it.
+    When it starts executing an operation, it should call `node.publish_started()` to indicate
+    the status on the UI. When the execution is finished, it should call `node.publish_result()`.
+    This will update the UI with the result of the operation.
+    """
+
+    def decorator(func: typing.Callable[[workspace.Workspace], typing.Any]):
         EXECUTORS[env] = func
         return func
 
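To make the contract in the new register_executor docstring concrete, here is a minimal sketch of a conforming executor. The "my_env" name and the straight-line scheduling (no edges, no input passing) are simplifications for illustration; the real executors appear in one_by_one.py above and core.py below.

from lynxkite.core import ops

@ops.register_executor("my_env")
async def execute(ws):
    catalog = ops.CATALOGS[ws.env]
    for node in ws.nodes:
        op = catalog.get(node.data.title)
        if op is None:
            node.publish_error(f'Operation "{node.data.title}" not found.')
            continue
        node.publish_started()           # frontend shows the node as "active"
        result = op(**node.data.params)  # inputs/edges are omitted in this sketch
        node.publish_result(result)      # frontend gets display/error, status "done"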
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -3,7 +3,9 @@
 import json
 from typing import Optional
 import dataclasses
+import enum
 import os
+import pycrdt
 import pydantic
 import tempfile
 from . import ops
@@ -20,11 +22,18 @@ class Position(BaseConfig):
     y: float
 
 
+class NodeStatus(str, enum.Enum):
+    planned = "planned"
+    active = "active"
+    done = "done"
+
+
 class WorkspaceNodeData(BaseConfig):
     title: str
     params: dict
     display: Optional[object] = None
     error: Optional[str] = None
+    status: NodeStatus = NodeStatus.done
     # Also contains a "meta" field when going out.
     # This is ignored when coming back from the frontend.
@@ -36,6 +45,32 @@ class WorkspaceNode(BaseConfig):
     type: str
     data: WorkspaceNodeData
     position: Position
+    _crdt: pycrdt.Map
+
+    def publish_started(self):
+        """Notifies the frontend that work has started on this node."""
+        self.data.error = None
+        self.data.status = NodeStatus.active
+        if hasattr(self, "_crdt"):
+            with self._crdt.doc.transaction():
+                self._crdt["data"]["error"] = None
+                self._crdt["data"]["status"] = NodeStatus.active
+
+    def publish_result(self, result: ops.Result):
+        """Sends the result to the frontend. Call this in an executor when the result is available."""
+        self.data.display = result.display
+        self.data.error = result.error
+        self.data.status = NodeStatus.done
+        if hasattr(self, "_crdt"):
+            with self._crdt.doc.transaction():
+                self._crdt["data"]["display"] = result.display
+                self._crdt["data"]["error"] = result.error
+                self._crdt["data"]["status"] = NodeStatus.done
+
+    def publish_error(self, error: Exception | str | None):
+        """Can be called with None to clear the error state."""
+        result = ops.Result(error=str(error) if error else None)
+        self.publish_result(result)
 
 
 class WorkspaceEdge(BaseConfig):
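Since publish_started() and publish_result() guard the CRDT writes with hasattr(self, "_crdt"), a node behaves the same with no CRDT document attached, which keeps executors unit-testable. A minimal sketch (the field values are illustrative):

from lynxkite.core import ops, workspace

node = workspace.WorkspaceNode(
    id="1",
    type="basic",
    data=workspace.WorkspaceNodeData(title="Example", params={}),
    position=workspace.Position(x=0, y=0),
)
node.publish_started()                      # status -> active, error cleared
assert node.data.status == workspace.NodeStatus.active
node.publish_result(ops.Result(output=42))  # status -> done
assert node.data.status == workspace.NodeStatus.done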
lynxkite-graph-analytics/src/lynxkite_graph_analytics/__init__.py CHANGED
@@ -1,3 +1,16 @@
-from .lynxkite_ops import *  # noqa (imported to trigger registration)
+"""Graph analytics environment for LynxKite. The core types and functions are imported here for easy access."""
+
+import os
+import pandas as pd
+
+if os.environ.get("NX_CUGRAPH_AUTOCONFIG", "").strip().lower() == "true":
+    import cudf.pandas
+
+    cudf.pandas.install()
+
+pd.options.mode.copy_on_write = True  # Prepare for Pandas 3.0.
+
+from .core import *  # noqa (easier access for core classes)
+from . import lynxkite_ops  # noqa (imported to trigger registration)
 from . import networkx_ops  # noqa (imported to trigger registration)
 from . import pytorch_model_ops  # noqa (imported to trigger registration)
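The cuGraph bootstrap moved here from the app server (main.py above), so it now applies to any consumer of the package, not just the FastAPI app. Opting in is a matter of setting the environment variable before the first import; a sketch, assuming cuDF/cuGraph are installed:

import os

os.environ["NX_CUGRAPH_AUTOCONFIG"] = "true"  # must be set before the import below
import lynxkite_graph_analytics  # noqa: E402  (installs cudf.pandas at import time)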
lynxkite-graph-analytics/src/lynxkite_graph_analytics/core.py ADDED
@@ -0,0 +1,198 @@
+"""Graph analytics executor and data types."""
+
+from lynxkite.core import ops
+import dataclasses
+import functools
+import networkx as nx
+import pandas as pd
+import polars as pl
+import traceback
+import typing
+
+
+ENV = "LynxKite Graph Analytics"
+
+
+@dataclasses.dataclass
+class RelationDefinition:
+    """Defines a set of edges."""
+
+    df: str  # The DataFrame that contains the edges.
+    source_column: (
+        str  # The column in the edge DataFrame that contains the source node ID.
+    )
+    target_column: (
+        str  # The column in the edge DataFrame that contains the target node ID.
+    )
+    source_table: str  # The DataFrame that contains the source nodes.
+    target_table: str  # The DataFrame that contains the target nodes.
+    source_key: str  # The column in the source table that contains the node ID.
+    target_key: str  # The column in the target table that contains the node ID.
+    name: str | None = None  # Descriptive name for the relation.
+
+
+@dataclasses.dataclass
+class Bundle:
+    """A collection of DataFrames and other data.
+
+    Can efficiently represent a knowledge graph (homogeneous or heterogeneous) or tabular data.
+    It can also carry other data, such as a trained model.
+    """
+
+    dfs: dict[str, pd.DataFrame] = dataclasses.field(default_factory=dict)
+    relations: list[RelationDefinition] = dataclasses.field(default_factory=list)
+    other: dict[str, typing.Any] = None
+
+    @classmethod
+    def from_nx(cls, graph: nx.Graph):
+        edges = nx.to_pandas_edgelist(graph)
+        d = dict(graph.nodes(data=True))
+        nodes = pd.DataFrame(d.values(), index=d.keys())
+        nodes["id"] = nodes.index
+        if "index" in nodes.columns:
+            nodes.drop(columns=["index"], inplace=True)
+        return cls(
+            dfs={"edges": edges, "nodes": nodes},
+            relations=[
+                RelationDefinition(
+                    df="edges",
+                    source_column="source",
+                    target_column="target",
+                    source_table="nodes",
+                    target_table="nodes",
+                    source_key="id",
+                    target_key="id",
+                )
+            ],
+        )
+
+    @classmethod
+    def from_df(cls, df: pd.DataFrame):
+        return cls(dfs={"df": df})
+
+    def to_nx(self):
+        # TODO: Use relations.
+        graph = nx.DiGraph()
+        if "nodes" in self.dfs:
+            df = self.dfs["nodes"]
+            if df.index.name != "id":
+                df = df.set_index("id")
+            graph.add_nodes_from(df.to_dict("index").items())
+        if "edges" in self.dfs:
+            edges = self.dfs["edges"]
+            graph.add_edges_from(
+                [
+                    (
+                        e["source"],
+                        e["target"],
+                        {
+                            k: e[k]
+                            for k in edges.columns
+                            if k not in ["source", "target"]
+                        },
+                    )
+                    for e in edges.to_records()
+                ]
+            )
+        return graph
+
+    def copy(self):
+        """Returns a medium depth copy of the bundle. The Bundle is completely new, but the DataFrames and RelationDefinitions are shared."""
+        return Bundle(
+            dfs=dict(self.dfs),
+            relations=list(self.relations),
+            other=dict(self.other) if self.other else None,
+        )
+
+    def to_dict(self, limit: int = 100):
+        return {
+            "dataframes": {
+                name: {
+                    "columns": [str(c) for c in df.columns],
+                    "data": df_for_frontend(df, limit).values.tolist(),
+                }
+                for name, df in self.dfs.items()
+            },
+            "relations": [dataclasses.asdict(relation) for relation in self.relations],
+            "other": self.other,
+        }
+
+
+def nx_node_attribute_func(name):
+    """Decorator for wrapping a function that adds a NetworkX node attribute."""
+
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(graph: nx.Graph, **kwargs):
+            graph = graph.copy()
+            attr = func(graph, **kwargs)
+            nx.set_node_attributes(graph, attr, name)
+            return graph
+
+        return wrapper
+
+    return decorator
+
+
+def disambiguate_edges(ws):
+    """If an input plug is connected to multiple edges, keep only the last edge."""
+    seen = set()
+    for edge in reversed(ws.edges):
+        if (edge.target, edge.targetHandle) in seen:
+            ws.edges.remove(edge)
+        seen.add((edge.target, edge.targetHandle))
+
+
+@ops.register_executor(ENV)
+async def execute(ws):
+    catalog: dict[str, ops.Op] = ops.CATALOGS[ws.env]
+    disambiguate_edges(ws)
+    outputs = {}
+    failed = 0
+    while len(outputs) + failed < len(ws.nodes):
+        for node in ws.nodes:
+            if node.id in outputs:
+                continue
+            # TODO: Take the input/output handles into account.
+            inputs = [edge.source for edge in ws.edges if edge.target == node.id]
+            if all(input in outputs for input in inputs):
+                # All inputs for this node are ready, we can compute the output.
+                inputs = [outputs[input] for input in inputs]
+                params = {**node.data.params}
+                op = catalog.get(node.data.title)
+                if not op:
+                    node.publish_error("Operation not found in catalog")
+                    failed += 1
+                    continue
+                node.publish_started()
+                try:
+                    # Convert inputs types to match operation signature.
+                    for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
+                        if p.type == nx.Graph and isinstance(x, Bundle):
+                            inputs[i] = x.to_nx()
+                        elif p.type == Bundle and isinstance(x, nx.Graph):
+                            inputs[i] = Bundle.from_nx(x)
+                        elif p.type == Bundle and isinstance(x, pd.DataFrame):
+                            inputs[i] = Bundle.from_df(x)
+                    result = op(*inputs, **params)
+                except Exception as e:
+                    traceback.print_exc()
+                    node.publish_error(e)
+                    failed += 1
+                    continue
+                outputs[node.id] = result.output
+                node.publish_result(result)
+
+
+def df_for_frontend(df: pd.DataFrame, limit: int) -> pd.DataFrame:
+    """Returns a DataFrame with values that are safe to send to the frontend."""
+    df = df[:limit]
+    if isinstance(df, pl.LazyFrame):
+        df = df.collect()
+    if isinstance(df, pl.DataFrame):
+        df = df.to_pandas()
+    # Convert non-numeric columns to strings.
+    for c in df.columns:
+        if not pd.api.types.is_numeric_dtype(df[c]):
+            df[c] = df[c].astype(str)
+    return df
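A quick round trip through the Bundle type defined above, using only constructors from this file (the toy graph is arbitrary):

import networkx as nx
from lynxkite_graph_analytics.core import Bundle

g = nx.karate_club_graph()          # any NetworkX graph works here
bundle = Bundle.from_nx(g)          # graph -> DataFrames in bundle.dfs
print(bundle.dfs["nodes"].head())   # node attributes plus an "id" column
print(bundle.dfs["edges"].head())   # "source"/"target" columns
g2 = bundle.to_nx()                 # back to a (directed) NetworkX graph
assert g2.number_of_nodes() == g.number_of_nodes()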
lynxkite-graph-analytics/src/lynxkite_graph_analytics/lynxkite_ops.py CHANGED
@@ -1,201 +1,21 @@
-"""Graph analytics operations. To be split into separate files when we have more."""
+"""Graph analytics operations."""
 
 import os
 import fsspec
 from lynxkite.core import ops
 from collections import deque
-import dataclasses
-import functools
+from . import core
 import grandcypher
 import joblib
 import matplotlib
 import networkx as nx
 import pandas as pd
 import polars as pl
-import traceback
-import typing
 import json
 
 
 mem = joblib.Memory("../joblib-cache")
-ENV = "LynxKite Graph Analytics"
-op = ops.op_registration(ENV)
-
-
-@dataclasses.dataclass
-class RelationDefinition:
-    """Defines a set of edges."""
-
-    df: str  # The DataFrame that contains the edges.
-    source_column: (
-        str  # The column in the edge DataFrame that contains the source node ID.
-    )
-    target_column: (
-        str  # The column in the edge DataFrame that contains the target node ID.
-    )
-    source_table: str  # The DataFrame that contains the source nodes.
-    target_table: str  # The DataFrame that contains the target nodes.
-    source_key: str  # The column in the source table that contains the node ID.
-    target_key: str  # The column in the target table that contains the node ID.
-    name: str | None = None  # Descriptive name for the relation.
-
-
-@dataclasses.dataclass
-class Bundle:
-    """A collection of DataFrames and other data.
-
-    Can efficiently represent a knowledge graph (homogeneous or heterogeneous) or tabular data.
-    It can also carry other data, such as a trained model.
-    """
-
-    dfs: dict[str, pd.DataFrame] = dataclasses.field(default_factory=dict)
-    relations: list[RelationDefinition] = dataclasses.field(default_factory=list)
-    other: dict[str, typing.Any] = None
-
-    @classmethod
-    def from_nx(cls, graph: nx.Graph):
-        edges = nx.to_pandas_edgelist(graph)
-        d = dict(graph.nodes(data=True))
-        nodes = pd.DataFrame(d.values(), index=d.keys())
-        nodes["id"] = nodes.index
-        if "index" in nodes.columns:
-            nodes.drop(columns=["index"], inplace=True)
-        return cls(
-            dfs={"edges": edges, "nodes": nodes},
-            relations=[
-                RelationDefinition(
-                    df="edges",
-                    source_column="source",
-                    target_column="target",
-                    source_table="nodes",
-                    target_table="nodes",
-                    source_key="id",
-                    target_key="id",
-                )
-            ],
-        )
-
-    @classmethod
-    def from_df(cls, df: pd.DataFrame):
-        return cls(dfs={"df": df})
-
-    def to_nx(self):
-        # TODO: Use relations.
-        graph = nx.DiGraph()
-        if "nodes" in self.dfs:
-            df = self.dfs["nodes"]
-            if df.index.name != "id":
-                df = df.set_index("id")
-            graph.add_nodes_from(df.to_dict("index").items())
-        if "edges" in self.dfs:
-            edges = self.dfs["edges"]
-            graph.add_edges_from(
-                [
-                    (
-                        e["source"],
-                        e["target"],
-                        {
-                            k: e[k]
-                            for k in edges.columns
-                            if k not in ["source", "target"]
-                        },
-                    )
-                    for e in edges.to_records()
-                ]
-            )
-        return graph
-
-    def copy(self):
-        """Returns a medium depth copy of the bundle. The Bundle is completely new, but the DataFrames and RelationDefinitions are shared."""
-        return Bundle(
-            dfs=dict(self.dfs),
-            relations=list(self.relations),
-            other=dict(self.other) if self.other else None,
-        )
-
-    def to_dict(self, limit: int = 100):
-        return {
-            "dataframes": {
-                name: {
-                    "columns": [str(c) for c in df.columns],
-                    "data": df_for_frontend(df, limit).values.tolist(),
-                }
-                for name, df in self.dfs.items()
-            },
-            "relations": [dataclasses.asdict(relation) for relation in self.relations],
-            "other": self.other,
-        }
-
-
-def nx_node_attribute_func(name):
-    """Decorator for wrapping a function that adds a NetworkX node attribute."""
-
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(graph: nx.Graph, **kwargs):
-            graph = graph.copy()
-            attr = func(graph, **kwargs)
-            nx.set_node_attributes(graph, attr, name)
-            return graph
-
-        return wrapper
-
-    return decorator
-
-
-def disambiguate_edges(ws):
-    """If an input plug is connected to multiple edges, keep only the last edge."""
-    seen = set()
-    for edge in reversed(ws.edges):
-        if (edge.target, edge.targetHandle) in seen:
-            ws.edges.remove(edge)
-        seen.add((edge.target, edge.targetHandle))
-
-
-@ops.register_executor(ENV)
-async def execute(ws):
-    catalog: dict[str, ops.Op] = ops.CATALOGS[ENV]
-    disambiguate_edges(ws)
-    outputs = {}
-    failed = 0
-    while len(outputs) + failed < len(ws.nodes):
-        for node in ws.nodes:
-            if node.id in outputs:
-                continue
-            # TODO: Take the input/output handles into account.
-            inputs = [edge.source for edge in ws.edges if edge.target == node.id]
-            if all(input in outputs for input in inputs):
-                # All inputs for this node are ready, we can compute the output.
-                inputs = [outputs[input] for input in inputs]
-                data = node.data
-                params = {**data.params}
-                op = catalog.get(data.title)
-                if not op:
-                    data.error = "Operation not found in catalog"
-                    failed += 1
-                    continue
-                try:
-                    # Convert inputs types to match operation signature.
-                    for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
-                        if p.type == nx.Graph and isinstance(x, Bundle):
-                            inputs[i] = x.to_nx()
-                        elif p.type == Bundle and isinstance(x, nx.Graph):
-                            inputs[i] = Bundle.from_nx(x)
-                        elif p.type == Bundle and isinstance(x, pd.DataFrame):
-                            inputs[i] = Bundle.from_df(x)
-                    result = op(*inputs, **params)
-                except Exception as e:
-                    traceback.print_exc()
-                    data.error = str(e)
-                    failed += 1
-                    continue
-                if len(op.inputs) == 1 and op.inputs.get("multi") == "*":
-                    # It's a flexible input. Create n+1 handles.
-                    data.inputs = {f"input{i}": None for i in range(len(inputs) + 1)}
-                data.error = None
-                outputs[node.id] = result.output
-                if result.display:
-                    data.display = result.display
+op = ops.op_registration(core.ENV)
 
 
 @op("Import Parquet")
@@ -246,14 +66,14 @@ def create_scale_free_graph(*, nodes: int = 10):
 
 
 @op("Compute PageRank")
-@nx_node_attribute_func("pagerank")
+@core.nx_node_attribute_func("pagerank")
 def compute_pagerank(graph: nx.Graph, *, damping=0.85, iterations=100):
     # TODO: This requires scipy to be installed.
     return nx.pagerank(graph, alpha=damping, max_iter=iterations)
 
 
 @op("Compute betweenness centrality")
-@nx_node_attribute_func("betweenness_centrality")
+@core.nx_node_attribute_func("betweenness_centrality")
 def compute_betweenness_centrality(graph: nx.Graph, *, k=10):
     return nx.betweenness_centrality(graph, k=k)
 
@@ -271,7 +91,7 @@ def discard_parallel_edges(graph: nx.Graph):
 
 
 @op("SQL")
-def sql(bundle: Bundle, *, query: ops.LongStr, save_as: str = "result"):
+def sql(bundle: core.Bundle, *, query: ops.LongStr, save_as: str = "result"):
     """Run a SQL query on the DataFrames in the bundle. Save the results as a new DataFrame."""
     bundle = bundle.copy()
     if os.environ.get("NX_CUGRAPH_AUTOCONFIG", "").strip().lower() == "true":
@@ -292,7 +112,7 @@ def sql(bundle: Bundle, *, query: ops.LongStr, save_as: str = "result"):
 
 
 @op("Cypher")
-def cypher(bundle: Bundle, *, query: ops.LongStr, save_as: str = "result"):
+def cypher(bundle: core.Bundle, *, query: ops.LongStr, save_as: str = "result"):
     """Run a Cypher query on the graph in the bundle. Save the results as a new DataFrame."""
     bundle = bundle.copy()
     graph = bundle.to_nx()
@@ -302,7 +122,7 @@ def cypher(bundle: Bundle, *, query: ops.LongStr, save_as: str = "result"):
 
 
 @op("Organize bundle")
-def organize_bundle(bundle: Bundle, *, code: ops.LongStr):
+def organize_bundle(bundle: core.Bundle, *, code: ops.LongStr):
     """Lets you rename/copy/delete DataFrames, and modify relations.
 
     TODO: Use a declarative solution instead of Python code. Add UI.
@@ -332,7 +152,7 @@ def _map_color(value):
     if pd.api.types.is_numeric_dtype(value):
         cmap = matplotlib.cm.get_cmap("viridis")
         value = (value - value.min()) / (value.max() - value.min())
-        rgba = cmap(value)
+        rgba = cmap(value.values)
     return [
         "#{:02x}{:02x}{:02x}".format(int(r * 255), int(g * 255), int(b * 255))
        for r, g, b in rgba[:, :3]
@@ -351,13 +171,13 @@ def _map_color(value):
 
 @op("Visualize graph", view="visualization")
 def visualize_graph(
-    graph: Bundle,
+    graph: core.Bundle,
     *,
     color_nodes_by: ops.NodeAttribute = None,
     label_by: ops.NodeAttribute = None,
     color_edges_by: ops.EdgeAttribute = None,
 ):
-    nodes = df_for_frontend(graph.dfs["nodes"], 10_000)
+    nodes = core.df_for_frontend(graph.dfs["nodes"], 10_000)
     if color_nodes_by:
         nodes["color"] = _map_color(nodes[color_nodes_by])
     for cols in ["x y", "long lat"]:
@@ -387,7 +207,7 @@ def visualize_graph(
     )
     curveness = 0.3
     nodes = nodes.to_records()
-    edges = df_for_frontend(
+    edges = core.df_for_frontend(
         graph.dfs["edges"].drop_duplicates(["source", "target"]), 10_000
     )
     if color_edges_by:
@@ -446,22 +266,8 @@ def visualize_graph(
     return v
 
 
-def df_for_frontend(df: pd.DataFrame, limit: int) -> pd.DataFrame:
-    """Returns a DataFrame with values that are safe to send to the frontend."""
-    df = df[:limit]
-    if isinstance(df, pl.LazyFrame):
-        df = df.collect()
-    if isinstance(df, pl.DataFrame):
-        df = df.to_pandas()
-    # Convert non-numeric columns to strings.
-    for c in df.columns:
-        if not pd.api.types.is_numeric_dtype(df[c]):
-            df[c] = df[c].astype(str)
-    return df
-
-
 @op("View tables", view="table_view")
-def view_tables(bundle: Bundle, *, limit: int = 100):
+def view_tables(bundle: core.Bundle, *, limit: int = 100):
     return bundle.to_dict(limit=limit)
 
 
@@ -470,7 +276,7 @@ def view_tables(bundle: Bundle, *, limit: int = 100):
     view="graph_creation_view",
     outputs=["output"],
 )
-def create_graph(bundle: Bundle, *, relations: str = None) -> Bundle:
+def create_graph(bundle: core.Bundle, *, relations: str = None) -> core.Bundle:
     """Replace relations of the given bundle
 
     relations is a stringified JSON, instead of a dict, because complex Yjs types (arrays, maps)
@@ -489,6 +295,6 @@ def create_graph(bundle: Bundle, *, relations: str = None) -> Bundle:
     bundle = bundle.copy()
     if not (relations is None or relations.strip() == ""):
         bundle.relations = [
-            RelationDefinition(**r) for r in json.loads(relations).values()
+            core.RelationDefinition(**r) for r in json.loads(relations).values()
         ]
     return ops.Result(output=bundle, display=bundle.to_dict(limit=100))
lynxkite-graph-analytics/tests/test_lynxkite_ops.py CHANGED
@@ -2,12 +2,12 @@ import pandas as pd
 import pytest
 import networkx as nx
 
-from lynxkite.core import workspace
-from lynxkite_graph_analytics.lynxkite_ops import Bundle, execute, op
+from lynxkite.core import workspace, ops
+from lynxkite_graph_analytics.core import Bundle, execute, ENV
 
 
 async def test_execute_operation_not_in_catalog():
-    ws = workspace.Workspace(env="test")
+    ws = workspace.Workspace(env=ENV)
     ws.nodes.append(
         workspace.WorkspaceNode(
             id="1",
@@ -23,6 +23,8 @@ async def test_execute_operation_not_in_catalog():
 async def test_execute_operation_inputs_correct_cast():
     # Test that the automatic casting of operation inputs works correctly.
 
+    op = ops.op_registration("test")
+
     @op("Create Bundle")
     def create_bundle() -> Bundle:
         df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
lynxkite-lynxscribe/tests/test_llm_ops.py CHANGED
@@ -1,5 +1,5 @@
 import unittest
-from lynxscribe.lynxkite import llm_ops  # noqa: F401
+from lynxkite_lynxscribe import llm_ops  # noqa: F401
 from lynxkite.core.executors import one_by_one
 from lynxkite.core import ops, workspace
 