Spaces:
Running
Running
from typing import Optional | |
import dataclasses | |
import fastapi | |
import os | |
import pathlib | |
import pydantic | |
import tempfile | |
import traceback | |
from . import ops | |
from . import basic_ops | |
from . import networkx_ops | |
class BaseConfig(pydantic.BaseModel): | |
model_config = pydantic.ConfigDict( | |
extra='allow', | |
) | |
class Position(BaseConfig): | |
x: float | |
y: float | |
class WorkspaceNodeData(BaseConfig): | |
title: str | |
params: dict | |
display: Optional[object] = None | |
error: Optional[str] = None | |
class WorkspaceNode(BaseConfig): | |
id: str | |
type: str | |
data: WorkspaceNodeData | |
position: Position | |
class WorkspaceEdge(BaseConfig): | |
id: str | |
source: str | |
target: str | |
class Workspace(BaseConfig): | |
nodes: list[WorkspaceNode] | |
edges: list[WorkspaceEdge] | |
app = fastapi.FastAPI() | |
def get_catalog(): | |
return [ | |
{ | |
'type': op.type, | |
'data': { 'title': op.name, 'params': op.params }, | |
'targetPosition': 'left' if op.inputs else None, | |
'sourcePosition': 'right' if op.outputs else None, | |
} | |
for op in ops.ALL_OPS.values()] | |
def execute(ws): | |
nodes = ws.nodes | |
outputs = {} | |
failed = 0 | |
while len(outputs) + failed < len(nodes): | |
for node in nodes: | |
if node.id in outputs: | |
continue | |
inputs = [edge.source for edge in ws.edges if edge.target == node.id] | |
if all(input in outputs for input in inputs): | |
inputs = [outputs[input] for input in inputs] | |
data = node.data | |
op = ops.ALL_OPS[data.title] | |
try: | |
output = op(*inputs, **data.params) | |
except Exception as e: | |
traceback.print_exc() | |
data.error = str(e) | |
failed += 1 | |
continue | |
data.error = None | |
outputs[node.id] = output | |
if op.type == 'graph_view' or op.type == 'table_view': | |
data.view = output | |
class SaveRequest(BaseConfig): | |
path: str | |
ws: Workspace | |
def save(req: SaveRequest): | |
path = DATA_PATH / req.path | |
assert path.is_relative_to(DATA_PATH) | |
j = req.ws.model_dump_json(indent=2) | |
with tempfile.NamedTemporaryFile('w', delete_on_close=False) as f: | |
f.write(j) | |
f.close() | |
os.replace(f.name, path) | |
def save_and_execute(req: SaveRequest): | |
save(req) | |
execute(req.ws) | |
save(req) | |
return req.ws | |
def load(path: str): | |
path = DATA_PATH / path | |
assert path.is_relative_to(DATA_PATH) | |
if not path.exists(): | |
return Workspace(nodes=[], edges=[]) | |
with open(path) as f: | |
j = f.read() | |
ws = Workspace.model_validate_json(j) | |
return ws | |
DATA_PATH = pathlib.Path.cwd() / 'data' | |
class DirectoryEntry: | |
name: str | |
type: str | |
def list_dir(path: str): | |
path = DATA_PATH / path | |
assert path.is_relative_to(DATA_PATH) | |
return sorted([ | |
DirectoryEntry(p.relative_to(DATA_PATH), 'directory' if p.is_dir() else 'workspace') | |
for p in path.iterdir()]) | |
def make_dir(req: dict): | |
path = DATA_PATH / req['path'] | |
assert path.is_relative_to(DATA_PATH) | |
assert not path.exists() | |
path.mkdir() | |
return list_dir(path.parent) | |