File size: 2,746 Bytes
bc2b550
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a24dfe
 
bc2b550
 
 
 
 
 
83cc307
bc2b550
 
 
 
 
a07e9cb
 
bc2b550
 
e7fa7ee
bc2b550
 
 
 
a0194e7
e7fa7ee
a0194e7
bc2b550
 
 
 
21590fa
 
5882a26
bc2b550
 
 
 
 
 
 
 
 
aa0792f
 
 
 
 
 
e7fa7ee
aa0792f
 
 
 
 
 
 
 
e7fa7ee
aa0792f
 
 
 
 
 
 
 
 
 
6a24dfe
aa0792f
 
 
 
 
bc2b550
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
'''For working with LynxKite workspaces.'''
from typing import Optional
import dataclasses
import os
import pydantic
import tempfile
from . import ops

class BaseConfig(pydantic.BaseModel):
    # Shared base for all workspace models. extra='allow' keeps fields that
    # are not declared on the subclass, so JSON coming from the frontend
    # round-trips without dropping unknown keys.
    model_config = pydantic.ConfigDict(
        extra='allow',
    )

class Position(BaseConfig):
    # 2D position of a node — presumably frontend canvas coordinates.
    x: float
    y: float

class WorkspaceNodeData(BaseConfig):
    # Per-node payload: the operation's title, its parameter values, and
    # optional display/error state shown by the frontend.
    title: str
    params: dict
    display: Optional[object] = None
    error: Optional[str] = None
    # Also contains a "meta" field when going out.
    # This is ignored when coming back from the frontend.

class WorkspaceNode(BaseConfig):
    # A box in the workspace graph. Field names are camelCase — they appear
    # to mirror the frontend's node shape (verify against the frontend code).
    id: str
    type: str
    data: WorkspaceNodeData
    position: Position
    # ID of the parent node, if any; _update_metadata resolves this node's
    # operation from the parent's sub_nodes when parentId is set.
    parentId: Optional[str] = None

class WorkspaceEdge(BaseConfig):
    # A connection between two node handles. camelCase field names appear to
    # mirror the frontend's edge shape (verify against the frontend code).
    id: str
    source: str
    target: str
    sourceHandle: str
    targetHandle: str

class Workspace(BaseConfig):
    '''The workspace: an environment name plus the nodes and edges of the graph.'''
    env: str = ''
    # Use pydantic.Field, not dataclasses.field: Field(default_factory=...) is
    # the supported way to give a pydantic model a fresh mutable default per
    # instance; dataclasses.field is the dataclass mechanism, not the
    # BaseModel one.
    nodes: list[WorkspaceNode] = pydantic.Field(default_factory=list)
    edges: list[WorkspaceEdge] = pydantic.Field(default_factory=list)


async def execute(ws: Workspace):
    '''Run the workspace with the executor registered for its environment.

    Environments without a registered executor are silently skipped.
    '''
    if ws.env not in ops.EXECUTORS:
        return
    await ops.EXECUTORS[ws.env](ws)


def save(ws: Workspace, path: str):
    '''Atomically write the workspace as pretty-printed JSON to ``path``.

    The JSON is written to a temporary file in the target directory and then
    moved over ``path`` with ``os.replace``, so readers never observe a
    partially written file. Requires Python 3.12+ (``delete_on_close``).
    '''
    j = ws.model_dump_json(indent=2)
    dirname, basename = os.path.split(path)
    # Create temp file in the same directory to make sure it's on the same filesystem.
    with tempfile.NamedTemporaryFile('w', prefix=f'.{basename}.', dir=dirname, delete_on_close=False) as f:
        f.write(j)
        # Force the data to disk before the rename: without fsync a crash
        # could publish `path` with contents that never hit the disk.
        f.flush()
        os.fsync(f.fileno())
        f.close()
        os.replace(f.name, path)


def load(path: str):
    '''Read the JSON file at ``path`` and return the validated Workspace.'''
    with open(path) as f:
        ws = Workspace.model_validate_json(f.read())
    # Metadata is added after loading. This way code changes take effect on old boxes too.
    _update_metadata(ws)
    return ws


def _update_metadata(ws):
    '''Attach operation metadata ("meta") to every node from the env's catalog.

    Top-level nodes look up their operation by title in the catalog; nested
    nodes (with a parentId) look it up in their parent's meta.sub_nodes, so a
    parent must be resolved before its children. Nodes whose operation or
    parent cannot be resolved get ``data.error`` set instead.
    Returns ``ws`` for convenience.
    '''
    catalog = ops.CATALOGS.get(ws.env, {})
    nodes = {node.id: node for node in ws.nodes}
    done = set()
    while len(done) < len(nodes):
        done_before = len(done)
        for node in ws.nodes:
            if node.id in done:
                continue
            data = node.data
            if node.parentId is None:
                op = catalog.get(data.title)
            elif node.parentId not in nodes:
                data.error = f'Parent not found: {node.parentId}'
                done.add(node.id)
                continue
            elif node.parentId in done:
                # The parent may have failed to resolve (then it has no "meta")
                # or may not define this title; degrade to "unknown operation"
                # instead of raising.
                try:
                    op = nodes[node.parentId].data.meta.sub_nodes[data.title]
                except (AttributeError, KeyError):
                    op = None
            else:
                # Parent not processed yet; retry on a later pass.
                continue
            if op:
                data.meta = op
                node.type = op.type
                if data.error == 'Unknown operation.':
                    data.error = None
            else:
                data.error = 'Unknown operation.'
            done.add(node.id)
        if len(done) == done_before:
            # A full pass made no progress: the remaining nodes form a
            # parentId cycle (or self-reference). The original loop would
            # spin forever here; flag them and stop instead.
            for node in ws.nodes:
                if node.id not in done:
                    node.data.error = f'Unresolvable parent: {node.parentId}'
                    done.add(node.id)
    return ws