File size: 2,362 Bytes
15bcc0e
 
bc2b550
 
 
 
 
 
 
15bcc0e
bc2b550
 
15bcc0e
bc2b550
 
15bcc0e
bc2b550
 
 
 
15bcc0e
bc2b550
 
 
 
 
6a24dfe
 
bc2b550
15bcc0e
bc2b550
 
 
 
 
15bcc0e
bc2b550
 
 
 
 
a07e9cb
 
bc2b550
15bcc0e
bc2b550
15bcc0e
bc2b550
 
 
 
a0194e7
e7fa7ee
a0194e7
bc2b550
 
 
 
21590fa
 
15bcc0e
2c16673
15bcc0e
2c16673
bc2b550
2c16673
bc2b550
 
 
 
 
 
aa0792f
 
 
 
 
 
e7fa7ee
aa0792f
 
 
 
 
 
 
15bcc0e
aa0792f
 
6a24dfe
15bcc0e
aa0792f
 
15bcc0e
aa0792f
bc2b550
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""For working with LynxKite workspaces."""

from typing import Optional
import dataclasses
import os
import pydantic
import tempfile
from . import ops


class BaseConfig(pydantic.BaseModel):
    model_config = pydantic.ConfigDict(
        extra="allow",
    )


class Position(BaseConfig):
    x: float
    y: float


class WorkspaceNodeData(BaseConfig):
    title: str
    params: dict
    display: Optional[object] = None
    error: Optional[str] = None
    # Also contains a "meta" field when going out.
    # This is ignored when coming back from the frontend.


class WorkspaceNode(BaseConfig):
    id: str
    type: str
    data: WorkspaceNodeData
    position: Position


class WorkspaceEdge(BaseConfig):
    id: str
    source: str
    target: str
    sourceHandle: str
    targetHandle: str


class Workspace(BaseConfig):
    env: str = ""
    nodes: list[WorkspaceNode] = dataclasses.field(default_factory=list)
    edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)


async def execute(ws: Workspace):
    if ws.env in ops.EXECUTORS:
        await ops.EXECUTORS[ws.env](ws)


def save(ws: Workspace, path: str):
    j = ws.model_dump_json(indent=2)
    dirname, basename = os.path.split(path)
    # Create temp file in the same directory to make sure it's on the same filesystem.
    with tempfile.NamedTemporaryFile(
        "w", prefix=f".{basename}.", dir=dirname, delete=False
    ) as f:
        temp_name = f.name
        f.write(j)
    os.replace(temp_name, path)


def load(path: str):
    with open(path) as f:
        j = f.read()
    ws = Workspace.model_validate_json(j)
    # Metadata is added after loading. This way code changes take effect on old boxes too.
    _update_metadata(ws)
    return ws


def _update_metadata(ws):
    catalog = ops.CATALOGS.get(ws.env, {})
    nodes = {node.id: node for node in ws.nodes}
    done = set()
    while len(done) < len(nodes):
        for node in ws.nodes:
            if node.id in done:
                continue
            data = node.data
            op = catalog.get(data.title)
            if op:
                data.meta = op
                node.type = op.type
                if data.error == "Unknown operation.":
                    data.error = None
            else:
                data.error = "Unknown operation."
            done.add(node.id)
    return ws