darabos commited on
Commit
e513a41
·
1 Parent(s): 0280171

Proper error when an input is missing.

Browse files
lynxkite-graph-analytics/src/lynxkite_graph_analytics/core.py CHANGED
@@ -1,6 +1,7 @@
1
  """Graph analytics executor and data types."""
2
 
3
- from lynxkite.core import ops
 
4
  import dataclasses
5
  import functools
6
  import networkx as nx
@@ -144,44 +145,60 @@ def disambiguate_edges(ws):
144
 
145
 
146
  @ops.register_executor(ENV)
147
- async def execute(ws):
148
  catalog: dict[str, ops.Op] = ops.CATALOGS[ws.env]
149
  disambiguate_edges(ws)
150
  outputs = {}
151
- failed = 0
152
- while len(outputs) + failed < len(ws.nodes):
153
- for node in ws.nodes:
154
- if node.id in outputs:
155
- continue
156
- # TODO: Take the input/output handles into account.
157
- inputs = [edge.source for edge in ws.edges if edge.target == node.id]
158
- if all(input in outputs for input in inputs):
 
159
  # All inputs for this node are ready, we can compute the output.
160
- inputs = [outputs[input] for input in inputs]
161
- params = {**node.data.params}
162
- op = catalog.get(node.data.title)
163
- if not op:
164
- node.publish_error("Operation not found in catalog")
165
- failed += 1
166
- continue
167
- node.publish_started()
168
- try:
169
- # Convert inputs types to match operation signature.
170
- for i, (x, p) in enumerate(zip(inputs, op.inputs.values())):
171
- if p.type == nx.Graph and isinstance(x, Bundle):
172
- inputs[i] = x.to_nx()
173
- elif p.type == Bundle and isinstance(x, nx.Graph):
174
- inputs[i] = Bundle.from_nx(x)
175
- elif p.type == Bundle and isinstance(x, pd.DataFrame):
176
- inputs[i] = Bundle.from_df(x)
177
- result = op(*inputs, **params)
178
- except Exception as e:
179
- traceback.print_exc()
180
- node.publish_error(e)
181
- failed += 1
182
- continue
183
- outputs[node.id] = result.output
184
- node.publish_result(result)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
186
 
187
  def df_for_frontend(df: pd.DataFrame, limit: int) -> pd.DataFrame:
 
1
  """Graph analytics executor and data types."""
2
 
3
+ import os
4
+ from lynxkite.core import ops, workspace
5
  import dataclasses
6
  import functools
7
  import networkx as nx
 
145
 
146
 
147
  @ops.register_executor(ENV)
148
+ async def execute(ws: workspace.Workspace):
149
  catalog: dict[str, ops.Op] = ops.CATALOGS[ws.env]
150
  disambiguate_edges(ws)
151
  outputs = {}
152
+ nodes = {node.id: node for node in ws.nodes}
153
+ todo = set(nodes.keys())
154
+ progress = True
155
+ while progress:
156
+ progress = False
157
+ for id in list(todo):
158
+ node = nodes[id]
159
+ input_nodes = [edge.source for edge in ws.edges if edge.target == id]
160
+ if all(input in outputs for input in input_nodes):
161
  # All inputs for this node are ready, we can compute the output.
162
+ todo.remove(id)
163
+ progress = True
164
+ _execute_node(node, ws, catalog, outputs)
165
+
166
+
167
+ def _execute_node(node, ws, catalog, outputs):
168
+ params = {**node.data.params}
169
+ op = catalog.get(node.data.title)
170
+ if not op:
171
+ node.publish_error("Operation not found in catalog")
172
+ return
173
+ node.publish_started()
174
+ input_map = {
175
+ edge.targetHandle: outputs[edge.source]
176
+ for edge in ws.edges
177
+ if edge.target == node.id
178
+ }
179
+ try:
180
+ # Convert inputs types to match operation signature.
181
+ inputs = []
182
+ for p in op.inputs.values():
183
+ if p.name not in input_map:
184
+ node.publish_error(f"Missing input: {p.name}")
185
+ return
186
+ x = input_map[p.name]
187
+ if p.type == nx.Graph and isinstance(x, Bundle):
188
+ x = x.to_nx()
189
+ elif p.type == Bundle and isinstance(x, nx.Graph):
190
+ x = Bundle.from_nx(x)
191
+ elif p.type == Bundle and isinstance(x, pd.DataFrame):
192
+ x = Bundle.from_df(x)
193
+ inputs.append(x)
194
+ result = op(*inputs, **params)
195
+ except Exception as e:
196
+ if os.environ.get("LYNXKITE_LOG_OP_ERRORS"):
197
+ traceback.print_exc()
198
+ node.publish_error(e)
199
+ return
200
+ outputs[node.id] = result.output
201
+ node.publish_result(result)
202
 
203
 
204
  def df_for_frontend(df: pd.DataFrame, limit: int) -> pd.DataFrame: