Spaces:
Running
Running
Keep updating JSON data file even if we are using CRDT.
Browse files- server/crdt.py +13 -4
server/crdt.py
CHANGED
|
@@ -3,6 +3,7 @@
|
|
| 3 |
import asyncio
|
| 4 |
import contextlib
|
| 5 |
import enum
|
|
|
|
| 6 |
import fastapi
|
| 7 |
import os.path
|
| 8 |
import pycrdt
|
|
@@ -12,6 +13,8 @@ import uvicorn
|
|
| 12 |
import builtins
|
| 13 |
|
| 14 |
router = fastapi.APIRouter()
|
|
|
|
|
|
|
| 15 |
|
| 16 |
|
| 17 |
def ws_exception_handler(exception, log):
|
|
@@ -26,7 +29,9 @@ def ws_exception_handler(exception, log):
|
|
| 26 |
|
| 27 |
class WebsocketServer(pycrdt_websocket.WebsocketServer):
|
| 28 |
async def init_room(self, name):
|
| 29 |
-
|
|
|
|
|
|
|
| 30 |
ydoc = pycrdt.Doc()
|
| 31 |
ydoc["workspace"] = ws = pycrdt.Map()
|
| 32 |
# Replay updates from the store.
|
|
@@ -145,13 +150,13 @@ async def workspace_changed(name, changes, ws_crdt):
|
|
| 145 |
for change in changes
|
| 146 |
)
|
| 147 |
if delay:
|
| 148 |
-
task = asyncio.create_task(execute(ws_crdt, ws_pyd, delay))
|
| 149 |
delayed_executions[name] = task
|
| 150 |
else:
|
| 151 |
-
await execute(ws_crdt, ws_pyd)
|
| 152 |
|
| 153 |
|
| 154 |
-
async def execute(ws_crdt, ws_pyd, delay=0):
|
| 155 |
from . import workspace
|
| 156 |
|
| 157 |
if delay:
|
|
@@ -159,7 +164,11 @@ async def execute(ws_crdt, ws_pyd, delay=0):
|
|
| 159 |
await asyncio.sleep(delay)
|
| 160 |
except asyncio.CancelledError:
|
| 161 |
return
|
|
|
|
|
|
|
|
|
|
| 162 |
await workspace.execute(ws_pyd)
|
|
|
|
| 163 |
with ws_crdt.doc.transaction():
|
| 164 |
for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
|
| 165 |
if "data" not in nc:
|
|
|
|
| 3 |
import asyncio
|
| 4 |
import contextlib
|
| 5 |
import enum
|
| 6 |
+
import pathlib
|
| 7 |
import fastapi
|
| 8 |
import os.path
|
| 9 |
import pycrdt
|
|
|
|
| 13 |
import builtins
|
| 14 |
|
| 15 |
router = fastapi.APIRouter()
|
| 16 |
+
DATA_PATH = pathlib.Path.cwd() / "data"
|
| 17 |
+
CRDT_PATH = pathlib.Path.cwd() / "crdt_data"
|
| 18 |
|
| 19 |
|
| 20 |
def ws_exception_handler(exception, log):
|
|
|
|
| 29 |
|
| 30 |
class WebsocketServer(pycrdt_websocket.WebsocketServer):
|
| 31 |
async def init_room(self, name):
|
| 32 |
+
path = CRDT_PATH / f"{name}.crdt"
|
| 33 |
+
assert path.is_relative_to(CRDT_PATH)
|
| 34 |
+
ystore = pycrdt_websocket.ystore.FileYStore(path)
|
| 35 |
ydoc = pycrdt.Doc()
|
| 36 |
ydoc["workspace"] = ws = pycrdt.Map()
|
| 37 |
# Replay updates from the store.
|
|
|
|
| 150 |
for change in changes
|
| 151 |
)
|
| 152 |
if delay:
|
| 153 |
+
task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
|
| 154 |
delayed_executions[name] = task
|
| 155 |
else:
|
| 156 |
+
await execute(name, ws_crdt, ws_pyd)
|
| 157 |
|
| 158 |
|
| 159 |
+
async def execute(name, ws_crdt, ws_pyd, delay=0):
|
| 160 |
from . import workspace
|
| 161 |
|
| 162 |
if delay:
|
|
|
|
| 164 |
await asyncio.sleep(delay)
|
| 165 |
except asyncio.CancelledError:
|
| 166 |
return
|
| 167 |
+
path = DATA_PATH / name
|
| 168 |
+
assert path.is_relative_to(DATA_PATH)
|
| 169 |
+
workspace.save(ws_pyd, path)
|
| 170 |
await workspace.execute(ws_pyd)
|
| 171 |
+
workspace.save(ws_pyd, path)
|
| 172 |
with ws_crdt.doc.transaction():
|
| 173 |
for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
|
| 174 |
if "data" not in nc:
|