darabos committed
Commit d65daf1 · 1 Parent(s): 8abbae8

Progress indicator in one_by_one.py.

lynxkite-app/src/lynxkite_app/crdt.py CHANGED
@@ -197,6 +197,7 @@ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt
         getattr(change, "keys", {}).get("__execution_delay", {}).get("newValue", 0)
         for change in changes
     )
+    print(f"Running {name} in {ws_pyd.env}...")
     if delay:
         task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
         delayed_executions[name] = task
lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED
@@ -104,11 +104,11 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
     tasks = {}
     NO_INPUT = object()  # Marker for initial tasks.
     for node in ws.nodes:
-        node.data.error = None
         op = catalog.get(node.data.title)
         if op is None:
-            node.data.error = f'Operation "{node.data.title}" not found.'
+            node.publish_error(f'Operation "{node.data.title}" not found.')
             continue
+        node.publish_error(None)
         # Start tasks for nodes that have no non-batch inputs.
         if all([i.position in "top or bottom" for i in op.inputs.values()]):
             tasks[node.id] = [NO_INPUT]
@@ -123,12 +123,12 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                 next_stage.setdefault(n, []).extend(ts)
                 continue
             node = nodes[n]
-            data = node.data
-            op = catalog[data.title]
-            params = {**data.params}
+            op = catalog[node.data.title]
+            params = {**node.data.params}
             if has_ctx(op):
                 params["_ctx"] = contexts[node.id]
             results = []
+            node.publish_started()
             for task in ts:
                 try:
                     inputs = []
@@ -146,11 +146,12 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                             cache[key] = output
                         output = cache[key]
                     else:
+                        op.publish_started()
                         result = op(*inputs, **params)
                         output = await await_if_needed(result.output)
                 except Exception as e:
                     traceback.print_exc()
-                    data.error = str(e)
+                    node.publish_error(e)
                     break
                 contexts[node.id].last_result = output
                 # Returned lists and DataFrames are considered multiple tasks.
@@ -161,7 +162,7 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                     results.extend(output)
             else:  # Finished all tasks without errors.
                 if result.display:
-                    data.display = await await_if_needed(result.display)
+                    result.display = await await_if_needed(result.display)
                 for edge in edges[node.id]:
                     t = nodes[edge.target]
                     op = catalog[t.data.title]
@@ -172,5 +173,6 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
                         ).extend(results)
                     else:
                         tasks.setdefault(edge.target, []).extend(results)
+            op.publish_result(result)
         tasks = next_stage
     return contexts
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -9,6 +9,9 @@ import typing
 from dataclasses import dataclass
 from typing_extensions import Annotated
 
+if typing.TYPE_CHECKING:
+    from . import workspace
+
 CATALOGS = {}
 EXECUTORS = {}
 
@@ -233,9 +236,15 @@ def register_passive_op(env: str, name: str, inputs=[], outputs=["output"], para
 
 
 def register_executor(env: str):
-    """Decorator for registering an executor."""
+    """Decorator for registering an executor.
 
-    def decorator(func):
+    The executor is a function that takes a workspace and executes the operations in it.
+    When it starts executing an operation, it should call `node.publish_started()` to indicate
+    the status on the UI. When the execution is finished, it should call `node.publish_result()`.
+    This will update the UI with the result of the operation.
+    """
+
+    def decorator(func: typing.Callable[[workspace.Workspace], typing.Any]):
         EXECUTORS[env] = func
         return func
 
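For context, below is a minimal sketch of an executor written against the protocol described in the new `register_executor` docstring. It is not part of this commit: the environment name, the catalog lookup via `ops.CATALOGS`, and the simplified call `op(**node.data.params)` (no inputs, no caching, no async handling) are illustrative assumptions.

    from lynxkite.core import ops, workspace


    @ops.register_executor("example_env")  # Hypothetical environment name.
    def execute(ws: workspace.Workspace):
        # Assumes an operation catalog was registered for this environment under ops.CATALOGS.
        catalog = ops.CATALOGS["example_env"]
        for node in ws.nodes:
            op = catalog.get(node.data.title)
            if op is None:
                node.publish_error(f'Operation "{node.data.title}" not found.')
                continue
            node.publish_started()  # Marks the node active on the frontend and clears stale errors.
            try:
                result = op(**node.data.params)  # Simplified: real executors also pass node inputs.
            except Exception as e:
                node.publish_error(e)
                continue
            node.publish_result(result)  # Publishes output, display and error, and marks the node done.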
 
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -49,9 +49,12 @@ class WorkspaceNode(BaseConfig):
 
     def publish_started(self):
         """Notifies the frontend that work has started on this node."""
+        self.data.error = None
         self.data.status = NodeStatus.active
         if hasattr(self, "_crdt"):
-            self._crdt["data"]["status"] = NodeStatus.active
+            with self._crdt.doc.transaction():
+                self._crdt["data"]["error"] = None
+                self._crdt["data"]["status"] = NodeStatus.active
 
     def publish_result(self, result: ops.Result):
         """Sends the result to the frontend. Call this in an executor when the result is available."""
@@ -64,8 +67,10 @@ class WorkspaceNode(BaseConfig):
         self._crdt["data"]["error"] = result.error
         self._crdt["data"]["status"] = NodeStatus.done
 
-    def publish_error(self, error: Exception | str):
-        self.publish_result(ops.Result(error=str(error)))
+    def publish_error(self, error: Exception | str | None):
+        """Can be called with None to clear the error state."""
+        result = ops.Result(error=str(error) if error else None)
+        self.publish_result(result)
 
 
 class WorkspaceEdge(BaseConfig):
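As a usage note, the reworked `publish_error` now also accepts `None`, which the one_by_one executor above uses to clear a stale error before a node is processed again. A small illustration, with `node` standing for any WorkspaceNode and `op`, `inputs`, `params` as placeholders:

    # Clear any error left over from a previous run. publish_error(None) builds an
    # ops.Result with error=None and publishes it, resetting the error shown in the UI.
    node.publish_error(None)

    # Exceptions are accepted directly; they are converted with str() before publishing.
    try:
        result = op(*inputs, **params)
    except Exception as e:
        node.publish_error(e)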