darabos commited on
Commit
074be96
·
1 Parent(s): e513a41

Remove duplicate inputs with CRDT.

Browse files
lynxkite-app/src/lynxkite_app/crdt.py CHANGED
@@ -197,7 +197,6 @@ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt
197
  getattr(change, "keys", {}).get("__execution_delay", {}).get("newValue", 0)
198
  for change in changes
199
  )
200
- print(f"Running {name} in {ws_pyd.env}...")
201
  if delay:
202
  task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
203
  delayed_executions[name] = task
@@ -221,10 +220,12 @@ async def execute(
221
  await asyncio.sleep(delay)
222
  except asyncio.CancelledError:
223
  return
 
224
  path = config.DATA_PATH / name
225
  assert path.is_relative_to(config.DATA_PATH), "Provided workspace path is invalid"
226
  # Save user changes before executing, in case the execution fails.
227
  workspace.save(ws_pyd, path)
 
228
  with ws_crdt.doc.transaction():
229
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
230
  if "data" not in nc:
@@ -234,6 +235,7 @@ async def execute(
234
  np._crdt = nc
235
  await workspace.execute(ws_pyd)
236
  workspace.save(ws_pyd, path)
 
237
 
238
 
239
  @contextlib.asynccontextmanager
 
197
  getattr(change, "keys", {}).get("__execution_delay", {}).get("newValue", 0)
198
  for change in changes
199
  )
 
200
  if delay:
201
  task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
202
  delayed_executions[name] = task
 
220
  await asyncio.sleep(delay)
221
  except asyncio.CancelledError:
222
  return
223
+ print(f"Running {name} in {ws_pyd.env}...")
224
  path = config.DATA_PATH / name
225
  assert path.is_relative_to(config.DATA_PATH), "Provided workspace path is invalid"
226
  # Save user changes before executing, in case the execution fails.
227
  workspace.save(ws_pyd, path)
228
+ ws_pyd._crdt = ws_crdt
229
  with ws_crdt.doc.transaction():
230
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
231
  if "data" not in nc:
 
235
  np._crdt = nc
236
  await workspace.execute(ws_pyd)
237
  workspace.save(ws_pyd, path)
238
+ print(f"Finished running {name} in {ws_pyd.env}.")
239
 
240
 
241
  @contextlib.asynccontextmanager
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -92,6 +92,7 @@ class Workspace(BaseConfig):
92
  env: str = ""
93
  nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
94
  edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
 
95
 
96
 
97
  async def execute(ws: Workspace):
 
92
  env: str = ""
93
  nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
94
  edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
95
+ _crdt: pycrdt.Map
96
 
97
 
98
  async def execute(ws: Workspace):
lynxkite-graph-analytics/src/lynxkite_graph_analytics/core.py CHANGED
@@ -135,12 +135,15 @@ def nx_node_attribute_func(name):
135
  return decorator
136
 
137
 
138
- def disambiguate_edges(ws):
139
  """If an input plug is connected to multiple edges, keep only the last edge."""
140
  seen = set()
141
  for edge in reversed(ws.edges):
142
  if (edge.target, edge.targetHandle) in seen:
143
- ws.edges.remove(edge)
 
 
 
144
  seen.add((edge.target, edge.targetHandle))
145
 
146
 
 
135
  return decorator
136
 
137
 
138
+ def disambiguate_edges(ws: workspace.Workspace):
139
  """If an input plug is connected to multiple edges, keep only the last edge."""
140
  seen = set()
141
  for edge in reversed(ws.edges):
142
  if (edge.target, edge.targetHandle) in seen:
143
+ i = ws.edges.index(edge)
144
+ del ws.edges[i]
145
+ if hasattr(ws, "_crdt"):
146
+ del ws._crdt["edges"][i]
147
  seen.add((edge.target, edge.targetHandle))
148
 
149