Spaces:
Running
Running
Proper error when an input is missing.
Browse files
lynxkite-graph-analytics/src/lynxkite_graph_analytics/core.py
CHANGED
|
@@ -1,6 +1,7 @@
|
|
| 1 |
"""Graph analytics executor and data types."""
|
| 2 |
|
| 3 |
-
|
|
|
|
| 4 |
import dataclasses
|
| 5 |
import functools
|
| 6 |
import networkx as nx
|
|
@@ -144,44 +145,60 @@ def disambiguate_edges(ws):
|
|
| 144 |
|
| 145 |
|
| 146 |
@ops.register_executor(ENV)
|
| 147 |
-
async def execute(ws):
|
| 148 |
catalog: dict[str, ops.Op] = ops.CATALOGS[ws.env]
|
| 149 |
disambiguate_edges(ws)
|
| 150 |
outputs = {}
|
| 151 |
-
|
| 152 |
-
|
| 153 |
-
|
| 154 |
-
|
| 155 |
-
|
| 156 |
-
|
| 157 |
-
|
| 158 |
-
|
|
|
|
| 159 |
# All inputs for this node are ready, we can compute the output.
|
| 160 |
-
|
| 161 |
-
|
| 162 |
-
|
| 163 |
-
|
| 164 |
-
|
| 165 |
-
|
| 166 |
-
|
| 167 |
-
|
| 168 |
-
|
| 169 |
-
|
| 170 |
-
|
| 171 |
-
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
|
| 175 |
-
|
| 176 |
-
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
| 180 |
-
|
| 181 |
-
|
| 182 |
-
|
| 183 |
-
|
| 184 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 185 |
|
| 186 |
|
| 187 |
def df_for_frontend(df: pd.DataFrame, limit: int) -> pd.DataFrame:
|
|
|
|
| 1 |
"""Graph analytics executor and data types."""
|
| 2 |
|
| 3 |
+
import os
|
| 4 |
+
from lynxkite.core import ops, workspace
|
| 5 |
import dataclasses
|
| 6 |
import functools
|
| 7 |
import networkx as nx
|
|
|
|
| 145 |
|
| 146 |
|
| 147 |
@ops.register_executor(ENV)
|
| 148 |
+
async def execute(ws: workspace.Workspace):
|
| 149 |
catalog: dict[str, ops.Op] = ops.CATALOGS[ws.env]
|
| 150 |
disambiguate_edges(ws)
|
| 151 |
outputs = {}
|
| 152 |
+
nodes = {node.id: node for node in ws.nodes}
|
| 153 |
+
todo = set(nodes.keys())
|
| 154 |
+
progress = True
|
| 155 |
+
while progress:
|
| 156 |
+
progress = False
|
| 157 |
+
for id in list(todo):
|
| 158 |
+
node = nodes[id]
|
| 159 |
+
input_nodes = [edge.source for edge in ws.edges if edge.target == id]
|
| 160 |
+
if all(input in outputs for input in input_nodes):
|
| 161 |
# All inputs for this node are ready, we can compute the output.
|
| 162 |
+
todo.remove(id)
|
| 163 |
+
progress = True
|
| 164 |
+
_execute_node(node, ws, catalog, outputs)
|
| 165 |
+
|
| 166 |
+
|
| 167 |
+
def _execute_node(node, ws, catalog, outputs):
|
| 168 |
+
params = {**node.data.params}
|
| 169 |
+
op = catalog.get(node.data.title)
|
| 170 |
+
if not op:
|
| 171 |
+
node.publish_error("Operation not found in catalog")
|
| 172 |
+
return
|
| 173 |
+
node.publish_started()
|
| 174 |
+
input_map = {
|
| 175 |
+
edge.targetHandle: outputs[edge.source]
|
| 176 |
+
for edge in ws.edges
|
| 177 |
+
if edge.target == node.id
|
| 178 |
+
}
|
| 179 |
+
try:
|
| 180 |
+
# Convert inputs types to match operation signature.
|
| 181 |
+
inputs = []
|
| 182 |
+
for p in op.inputs.values():
|
| 183 |
+
if p.name not in input_map:
|
| 184 |
+
node.publish_error(f"Missing input: {p.name}")
|
| 185 |
+
return
|
| 186 |
+
x = input_map[p.name]
|
| 187 |
+
if p.type == nx.Graph and isinstance(x, Bundle):
|
| 188 |
+
x = x.to_nx()
|
| 189 |
+
elif p.type == Bundle and isinstance(x, nx.Graph):
|
| 190 |
+
x = Bundle.from_nx(x)
|
| 191 |
+
elif p.type == Bundle and isinstance(x, pd.DataFrame):
|
| 192 |
+
x = Bundle.from_df(x)
|
| 193 |
+
inputs.append(x)
|
| 194 |
+
result = op(*inputs, **params)
|
| 195 |
+
except Exception as e:
|
| 196 |
+
if os.environ.get("LYNXKITE_LOG_OP_ERRORS"):
|
| 197 |
+
traceback.print_exc()
|
| 198 |
+
node.publish_error(e)
|
| 199 |
+
return
|
| 200 |
+
outputs[node.id] = result.output
|
| 201 |
+
node.publish_result(result)
|
| 202 |
|
| 203 |
|
| 204 |
def df_for_frontend(df: pd.DataFrame, limit: int) -> pd.DataFrame:
|