darabos commited on
Commit
d43f961
·
1 Parent(s): d7082b2

Backend can now put results in the CRDT.

Browse files
server/crdt.py CHANGED
@@ -1,4 +1,5 @@
1
  '''CRDT is used to synchronize workspace state for backend and frontend(s).'''
 
2
  import contextlib
3
  import fastapi
4
  import os.path
@@ -10,6 +11,7 @@ import pycrdt_websocket.ystore
10
  router = fastapi.APIRouter()
11
 
12
  def ws_exception_handler(exception, log):
 
13
  log.exception(exception)
14
  return True
15
 
@@ -18,34 +20,93 @@ class WebsocketServer(pycrdt_websocket.WebsocketServer):
18
  ystore = pycrdt_websocket.ystore.FileYStore(f'crdt_data/{name}.crdt')
19
  ydoc = pycrdt.Doc()
20
  ydoc['workspace'] = ws = pycrdt.Map()
21
- ws['nodes'] = pycrdt.Array()
22
- ws['edges'] = pycrdt.Array()
23
- ws['env'] = 'unset'
24
  # Replay updates from the store.
25
  try:
26
  for update, timestamp in [(item[0], item[-1]) async for item in ystore.read()]:
27
  ydoc.apply_update(update)
28
  except pycrdt_websocket.ystore.YDocNotFound:
29
  pass
30
- print('init_room', name, ws)
31
- _subscription_id = ws.observe_deep(handle_deep_changes)
32
- return pycrdt_websocket.YRoom(ystore=ystore, ydoc=ydoc)
 
 
 
 
 
 
 
 
 
33
 
34
  async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
35
- print('get_room', name, self.rooms)
36
  if name not in self.rooms:
37
  self.rooms[name] = await self.init_room(name)
38
- print('get_room2', name, self.rooms)
39
  room = self.rooms[name]
40
  await self.start_room(room)
41
  return room
42
 
43
- print('new WebsocketServer')
44
  websocket_server = WebsocketServer(exception_handler=ws_exception_handler, auto_clean_rooms=False)
45
  asgi_server = pycrdt_websocket.ASGIServer(websocket_server)
46
 
47
- def handle_deep_changes(events):
48
- print('events', events)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
 
50
  @contextlib.asynccontextmanager
51
  async def lifespan(app):
@@ -58,5 +119,4 @@ def sanitize_path(path):
58
  @router.websocket("/ws/crdt/{room_name}")
59
  async def crdt_websocket(websocket: fastapi.WebSocket, room_name: str):
60
  room_name = sanitize_path(room_name)
61
- print('room_name', room_name)
62
  await asgi_server({'path': room_name}, websocket._receive, websocket._send)
 
1
  '''CRDT is used to synchronize workspace state for backend and frontend(s).'''
2
+ import asyncio
3
  import contextlib
4
  import fastapi
5
  import os.path
 
11
  router = fastapi.APIRouter()
12
 
13
  def ws_exception_handler(exception, log):
14
+ print('exception', exception)
15
  log.exception(exception)
16
  return True
17
 
 
20
  ystore = pycrdt_websocket.ystore.FileYStore(f'crdt_data/{name}.crdt')
21
  ydoc = pycrdt.Doc()
22
  ydoc['workspace'] = ws = pycrdt.Map()
 
 
 
23
  # Replay updates from the store.
24
  try:
25
  for update, timestamp in [(item[0], item[-1]) async for item in ystore.read()]:
26
  ydoc.apply_update(update)
27
  except pycrdt_websocket.ystore.YDocNotFound:
28
  pass
29
+ if 'nodes' not in ws:
30
+ ws['nodes'] = pycrdt.Array()
31
+ if 'edges' not in ws:
32
+ ws['edges'] = pycrdt.Array()
33
+ if 'env' not in ws:
34
+ ws['env'] = 'unset'
35
+ room = pycrdt_websocket.YRoom(ystore=ystore, ydoc=ydoc)
36
+ room.ws = ws
37
+ def on_change(changes):
38
+ asyncio.create_task(workspace_changed(changes, ws))
39
+ ws.observe_deep(on_change)
40
+ return room
41
 
42
  async def get_room(self, name: str) -> pycrdt_websocket.YRoom:
 
43
  if name not in self.rooms:
44
  self.rooms[name] = await self.init_room(name)
 
45
  room = self.rooms[name]
46
  await self.start_room(room)
47
  return room
48
 
 
49
  websocket_server = WebsocketServer(exception_handler=ws_exception_handler, auto_clean_rooms=False)
50
  asgi_server = pycrdt_websocket.ASGIServer(websocket_server)
51
 
52
+ last_ws_input = None
53
+ def clean_input(ws_pyd):
54
+ for node in ws_pyd.nodes:
55
+ node.data.display = None
56
+ node.position.x = 0
57
+ node.position.y = 0
58
+ if node.model_extra:
59
+ for key in list(node.model_extra.keys()):
60
+ delattr(node, key)
61
+
62
+ def crdt_update(crdt_obj, python_obj):
63
+ if isinstance(python_obj, dict):
64
+ for key, value in python_obj.items():
65
+ if isinstance(value, dict):
66
+ if crdt_obj.get(key) is None:
67
+ crdt_obj[key] = pycrdt.Map()
68
+ crdt_update(crdt_obj[key], value)
69
+ elif isinstance(value, list):
70
+ if crdt_obj.get(key) is None:
71
+ crdt_obj[key] = pycrdt.Array()
72
+ crdt_update(crdt_obj[key], value)
73
+ else:
74
+ print('set', key, value)
75
+ crdt_obj[key] = value
76
+ elif isinstance(python_obj, list):
77
+ for i, value in enumerate(python_obj):
78
+ if isinstance(value, dict):
79
+ if i >= len(crdt_obj):
80
+ crdt_obj.append(pycrdt.Map())
81
+ crdt_update(crdt_obj[i], value)
82
+ elif isinstance(value, list):
83
+ if i >= len(crdt_obj):
84
+ crdt_obj.append(pycrdt.Array())
85
+ crdt_update(crdt_obj[i], value)
86
+ else:
87
+ if i >= len(crdt_obj):
88
+ crdt_obj.append(value)
89
+ else:
90
+ print('set', i, value)
91
+ crdt_obj[i] = value
92
+ else:
93
+ raise ValueError('Invalid type:', python_obj)
94
+
95
+ async def workspace_changed(e, ws_crdt):
96
+ global last_ws_input
97
+ from . import workspace
98
+ ws_pyd = workspace.Workspace.model_validate(ws_crdt.to_py())
99
+ clean_input(ws_pyd)
100
+ if ws_pyd == last_ws_input:
101
+ return
102
+ print('ws changed')
103
+ last_ws_input = ws_pyd.model_copy(deep=True)
104
+ workspace.execute(ws_pyd)
105
+ for nc, np in zip(ws_crdt['nodes'], ws_pyd.nodes):
106
+ if 'data' not in nc:
107
+ nc['data'] = pycrdt.Map()
108
+ # Display is added as an opaque Box.
109
+ nc['data']['display'] = np.data.display
110
 
111
  @contextlib.asynccontextmanager
112
  async def lifespan(app):
 
119
  @router.websocket("/ws/crdt/{room_name}")
120
  async def crdt_websocket(websocket: fastapi.WebSocket, room_name: str):
121
  room_name = sanitize_path(room_name)
 
122
  await asgi_server({'path': room_name}, websocket._receive, websocket._send)
server/lynxkite_ops.py CHANGED
@@ -105,11 +105,6 @@ def execute(ws):
105
  data = node.data
106
  op = catalog[data.title]
107
  params = {**data.params}
108
- if op.sub_nodes:
109
- sub_nodes = children.get(node.id, [])
110
- sub_node_ids = [node.id for node in sub_nodes]
111
- sub_edges = [edge for edge in ws.edges if edge.source in sub_node_ids]
112
- params['sub_flow'] = {'nodes': sub_nodes, 'edges': sub_edges}
113
  # Convert inputs.
114
  for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
115
  if p.type == nx.Graph and isinstance(x, Bundle):
 
105
  data = node.data
106
  op = catalog[data.title]
107
  params = {**data.params}
 
 
 
 
 
108
  # Convert inputs.
109
  for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
110
  if p.type == nx.Graph and isinstance(x, Bundle):
web/src/LynxKiteFlow.svelte CHANGED
@@ -136,7 +136,9 @@
136
  const edge = {
137
  id: `${params.source} ${params.target}`,
138
  source: params.source,
 
139
  target: params.target,
 
140
  };
141
  $store.workspace.edges.push(edge);
142
  }
 
136
  const edge = {
137
  id: `${params.source} ${params.target}`,
138
  source: params.source,
139
+ sourceHandle: params.sourceHandle,
140
  target: params.target,
141
+ targetHandle: params.targetHandle,
142
  };
143
  $store.workspace.edges.push(edge);
144
  }
web/src/NodeWithParams.svelte CHANGED
@@ -15,10 +15,11 @@
15
  $store.workspace.nodes[i].data.params[name] = newValue;
16
  updateNodeInternals();
17
  }
 
18
  </script>
19
 
20
  <LynxKiteNode {...$$props}>
21
- {#each Object.entries(data.params) as [name, value]}
22
  <NodeParameter
23
  {name}
24
  {value}
 
15
  $store.workspace.nodes[i].data.params[name] = newValue;
16
  updateNodeInternals();
17
  }
18
+ $: params = data?.params ? Object.entries(data.params) : [];
19
  </script>
20
 
21
  <LynxKiteNode {...$$props}>
22
+ {#each params as [name, value]}
23
  <NodeParameter
24
  {name}
25
  {value}
web/src/NodeWithVisualization.svelte CHANGED
@@ -5,11 +5,12 @@
5
  import { init } from 'echarts';
6
  type $$Props = NodeProps;
7
  export let data: $$Props['data'];
 
8
  </script>
9
 
10
  <NodeWithParams {...$$props}>
11
  {#if data.display}
12
- <Chart {init} options={data.display} initOptions={{renderer: 'canvas', width: 250, height: 250}}/>
13
  {/if}
14
  </NodeWithParams>
15
  <style>
 
5
  import { init } from 'echarts';
6
  type $$Props = NodeProps;
7
  export let data: $$Props['data'];
8
+ $: display = JSON.parse(JSON.stringify(data?.display?.value));
9
  </script>
10
 
11
  <NodeWithParams {...$$props}>
12
  {#if data.display}
13
+ <Chart {init} options={display} initOptions={{renderer: 'canvas', width: 250, height: 250}}/>
14
  {/if}
15
  </NodeWithParams>
16
  <style>