darabos committed on
Commit
e8a8341
·
1 Parent(s): 5826642

Split one-by-one executor into separate module.

Browse files
Files changed (2) hide show
  1. server/executors/one_by_one.py +125 -0
  2. server/llm_ops.py +6 -119
server/executors/one_by_one.py ADDED
@@ -0,0 +1,125 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from .. import ops
2
+ from .. import workspace
3
+ import fastapi
4
+ import json
5
+ import pandas as pd
6
+ import traceback
7
+ import inspect
8
+ import typing
9
+
10
class Context(ops.BaseConfig):
    '''Passed to operation functions as "_ctx" if they have such a parameter.

    Carries per-node execution state for the one-by-one executor.
    '''
    # The workspace node currently being executed.
    node: workspace.WorkspaceNode
    # The most recent value produced by this node, or None before the first run.
    last_result: typing.Any = None
14
+
15
class Output(ops.BaseConfig):
    '''Return this to send values to specific outputs of a node.'''
    # Name of the output handle the value is routed to.
    output_handle: str
    # The task payload forwarded along that handle.
    value: dict
19
+
20
+
21
def df_to_list(df):
    '''Converts a DataFrame into a list of one dict per row.

    Uses DataFrame.to_dict instead of iterating over ``df.values``:
    ``.values`` upcasts a mixed-dtype frame to a single common dtype
    (e.g. ints become floats when a float column is present), which would
    silently corrupt the task payloads.
    '''
    return df.to_dict(orient='records')
23
+
24
def has_ctx(op):
    '''True if the operation's function declares a "_ctx" parameter.'''
    return '_ctx' in inspect.signature(op.func).parameters
27
+
28
def register(env: str):
    '''Registers the one-by-one executor as the executor for the given environment.

    env: the environment name used as the key in ops.EXECUTORS.
    '''
    ops.EXECUTORS[env] = execute
31
+
32
def get_stages(ws, catalog):
    '''Inputs on top are batch inputs. We decompose the graph into a DAG of
    components along these edges.

    Returns a list of sets of node IDs. Each set is a "stage": every task in
    a stage must finish before later stages run, because a later stage
    consumes an earlier stage's complete output through a batch ("top")
    input. The final stage contains all nodes, so nodes outside any batch
    component still execute.
    '''
    nodes = {n.id: n for n in ws.nodes}
    batch_inputs = {}
    inputs = {}
    for edge in ws.edges:
        inputs.setdefault(edge.target, []).append(edge.source)
        node = nodes[edge.target]
        op = catalog[node.data.title]
        i = op.inputs[edge.targetHandle]
        if i.position == 'top':
            batch_inputs.setdefault(edge.target, []).append(edge.source)
    stages = []
    # (The batch target itself is not needed here, only its sources.)
    for bss in batch_inputs.values():
        # Collect the full upstream closure feeding this batch input.
        upstream = set(bss)
        new = set(bss)
        while new:
            n = new.pop()
            for i in inputs.get(n, []):
                if i not in upstream:
                    upstream.add(i)
                    new.add(i)
        stages.append(upstream)
    # Smaller (inner) components run first.
    stages.sort(key=len)
    stages.append(set(nodes))
    return stages
58
+
59
# NOTE(review): not referenced in this module — presumably passed by callers
# as execute()'s "cache" argument to persist results across runs; confirm.
EXECUTOR_OUTPUT_CACHE = {}
60
+
61
def execute(ws, catalog, cache=None):
    '''Executes the workspace one task at a time.

    Each node's function is called once per incoming task. A result that is a
    list or a DataFrame fans out into multiple downstream tasks. Nodes run
    stage by stage (see get_stages) so that batch ("top") inputs receive the
    complete output of their upstream component before the consumer starts.

    ws: the workspace to run; errors are recorded on node.data.error.
    catalog: maps operation titles to operation definitions.
    cache: optional dict memoizing results keyed by the JSON-encoded
        (inputs, params). Pass a dict — even an empty one — to enable caching.
    '''
    nodes = {n.id: n for n in ws.nodes}
    contexts = {n.id: Context(node=n) for n in ws.nodes}
    edges = {n.id: [] for n in ws.nodes}
    for e in ws.edges:
        edges[e.source].append(e)
    tasks = {}
    NO_INPUT = object()  # Marker for initial tasks.
    for node in ws.nodes:
        node.data.error = None
        op = catalog[node.data.title]
        # Start tasks for nodes that have no inputs.
        if not op.inputs:
            tasks[node.id] = [NO_INPUT]
    batch_inputs = {}
    # Run the rest until we run out of tasks.
    for stage in get_stages(ws, catalog):
        next_stage = {}
        while tasks:
            n, ts = tasks.popitem()
            if n not in stage:
                # Not runnable in this stage; defer to a later one.
                next_stage.setdefault(n, []).extend(ts)
                continue
            node = nodes[n]
            data = node.data
            op = catalog[data.title]
            params = {**data.params}
            if has_ctx(op):
                params['_ctx'] = contexts[node.id]
            results = []
            for task in ts:
                try:
                    inputs = [
                        batch_inputs[(n, i.name)] if i.position == 'top' else task
                        for i in op.inputs.values()]
                    key = json.dumps(fastapi.encoders.jsonable_encoder((inputs, params)))
                    # Was "if cache:", which silently disabled caching when an
                    # empty dict was passed in.
                    if cache is not None:
                        if key not in cache:
                            cache[key] = op.func(*inputs, **params)
                        result = cache[key]
                    else:
                        result = op.func(*inputs, **params)
                except Exception as e:
                    traceback.print_exc()
                    data.error = str(e)
                    break
                contexts[node.id].last_result = result
                # Returned lists and DataFrames are considered multiple tasks.
                if isinstance(result, pd.DataFrame):
                    result = df_to_list(result)
                elif not isinstance(result, list):
                    result = [result]
                results.extend(result)
            else:  # Finished all tasks without errors.
                # Guard against empty results so an empty task list can't
                # crash the whole run with an IndexError.
                if results and op.type in ('visualization', 'table_view'):
                    data.display = results[0]
                for edge in edges[node.id]:
                    t = nodes[edge.target]
                    op = catalog[t.data.title]
                    i = op.inputs[edge.targetHandle]
                    if i.position == 'top':
                        batch_inputs.setdefault((edge.target, edge.targetHandle), []).extend(results)
                    else:
                        tasks.setdefault(edge.target, []).extend(results)
        tasks = next_stage
server/llm_ops.py CHANGED
@@ -1,33 +1,20 @@
1
  '''For specifying an LLM agent logic flow.'''
2
  from . import ops
3
  import chromadb
4
- import fastapi.encoders
5
- import inspect
6
  import jinja2
7
  import json
8
  import openai
9
  import pandas as pd
10
- import traceback
11
- import typing
12
- from . import workspace
13
 
14
  client = openai.OpenAI(base_url="http://localhost:11434/v1")
15
  jinja = jinja2.Environment()
16
  chroma_client = chromadb.Client()
17
  LLM_CACHE = {}
18
  ENV = 'LLM logic'
 
19
  op = ops.op_registration(ENV)
20
 
21
- class Context(ops.BaseConfig):
22
- '''Passed to operation functions as "_ctx" if they have such a parameter.'''
23
- node: workspace.WorkspaceNode
24
- last_result: typing.Any = None
25
-
26
- class Output(ops.BaseConfig):
27
- '''Return this to send values to specific outputs of a node.'''
28
- output_handle: str
29
- value: dict
30
-
31
  def chat(*args, **kwargs):
32
  key = json.dumps({'args': args, 'kwargs': kwargs})
33
  if key not in LLM_CACHE:
@@ -66,7 +53,7 @@ def ask_llm(input, *, model: str, accepted_regex: str = None, max_tokens: int =
66
  return [{**input, 'response': r} for r in results]
67
 
68
  @op("View", view="table_view")
69
- def view(input, *, _ctx: Context):
70
  v = _ctx.last_result
71
  if v:
72
  columns = v['dataframes']['df']['columns']
@@ -84,7 +71,7 @@ def view(input, *, _ctx: Context):
84
  @ops.input_position(input="right")
85
  @ops.output_position(output="left")
86
  @op("Loop")
87
- def loop(input, *, max_iterations: int = 3, _ctx: Context):
88
  '''Data can flow back here max_iterations-1 times.'''
89
  key = f'iterations-{_ctx.node.id}'
90
  input[key] = input.get(key, 0) + 1
@@ -94,11 +81,11 @@ def loop(input, *, max_iterations: int = 3, _ctx: Context):
94
  @op('Branch', outputs=['true', 'false'])
95
  def branch(input, *, expression: str):
96
  res = eval(expression, input)
97
- return Output(output_handle=str(bool(res)).lower(), value=input)
98
 
99
  @ops.input_position(db="top")
100
  @op('RAG')
101
- def rag(input, db, *, input_field='text', db_field='text', num_matches: int=10, _ctx: Context):
102
  last = _ctx.last_result
103
  if last:
104
  collection = last['_collection']
@@ -127,104 +114,4 @@ def run_python(input, *, template: str):
127
  p = p.replace(k.upper(), str(v))
128
  return p
129
 
130
- EXECUTOR_OUTPUT_CACHE = {}
131
-
132
- @ops.register_executor(ENV)
133
- def execute(ws):
134
- catalog = ops.CATALOGS[ENV]
135
- nodes = {n.id: n for n in ws.nodes}
136
- contexts = {n.id: Context(node=n) for n in ws.nodes}
137
- edges = {n.id: [] for n in ws.nodes}
138
- for e in ws.edges:
139
- edges[e.source].append(e)
140
- tasks = {}
141
- NO_INPUT = object() # Marker for initial tasks.
142
- for node in ws.nodes:
143
- node.data.error = None
144
- op = catalog[node.data.title]
145
- # Start tasks for nodes that have no inputs.
146
- if not op.inputs:
147
- tasks[node.id] = [NO_INPUT]
148
- batch_inputs = {}
149
- # Run the rest until we run out of tasks.
150
- for stage in get_stages(ws):
151
- next_stage = {}
152
- while tasks:
153
- n, ts = tasks.popitem()
154
- if n not in stage:
155
- next_stage.setdefault(n, []).extend(ts)
156
- continue
157
- node = nodes[n]
158
- data = node.data
159
- op = catalog[data.title]
160
- params = {**data.params}
161
- if has_ctx(op):
162
- params['_ctx'] = contexts[node.id]
163
- results = []
164
- for task in ts:
165
- try:
166
- inputs = [
167
- batch_inputs[(n, i.name)] if i.position == 'top' else task
168
- for i in op.inputs.values()]
169
- key = json.dumps(fastapi.encoders.jsonable_encoder((inputs, params)))
170
- if key not in EXECUTOR_OUTPUT_CACHE:
171
- EXECUTOR_OUTPUT_CACHE[key] = op.func(*inputs, **params)
172
- result = EXECUTOR_OUTPUT_CACHE[key]
173
- except Exception as e:
174
- traceback.print_exc()
175
- data.error = str(e)
176
- break
177
- contexts[node.id].last_result = result
178
- # Returned lists and DataFrames are considered multiple tasks.
179
- if isinstance(result, pd.DataFrame):
180
- result = df_to_list(result)
181
- elif not isinstance(result, list):
182
- result = [result]
183
- results.extend(result)
184
- else: # Finished all tasks without errors.
185
- if op.type == 'visualization' or op.type == 'table_view':
186
- data.display = results[0]
187
- for edge in edges[node.id]:
188
- t = nodes[edge.target]
189
- op = catalog[t.data.title]
190
- i = op.inputs[edge.targetHandle]
191
- if i.position == 'top':
192
- batch_inputs.setdefault((edge.target, edge.targetHandle), []).extend(results)
193
- else:
194
- tasks.setdefault(edge.target, []).extend(results)
195
- tasks = next_stage
196
-
197
- def df_to_list(df):
198
- return [dict(zip(df.columns, row)) for row in df.values]
199
-
200
- def has_ctx(op):
201
- sig = inspect.signature(op.func)
202
- return '_ctx' in sig.parameters
203
 
204
- def get_stages(ws):
205
- '''Inputs on top are batch inputs. We decompose the graph into a DAG of components along these edges.'''
206
- catalog = ops.CATALOGS[ENV]
207
- nodes = {n.id: n for n in ws.nodes}
208
- batch_inputs = {}
209
- inputs = {}
210
- for edge in ws.edges:
211
- inputs.setdefault(edge.target, []).append(edge.source)
212
- node = nodes[edge.target]
213
- op = catalog[node.data.title]
214
- i = op.inputs[edge.targetHandle]
215
- if i.position == 'top':
216
- batch_inputs.setdefault(edge.target, []).append(edge.source)
217
- stages = []
218
- for bt, bss in batch_inputs.items():
219
- upstream = set(bss)
220
- new = set(bss)
221
- while new:
222
- n = new.pop()
223
- for i in inputs.get(n, []):
224
- if i not in upstream:
225
- upstream.add(i)
226
- new.add(i)
227
- stages.append(upstream)
228
- stages.sort(key=lambda s: len(s))
229
- stages.append(set(nodes))
230
- return stages
 
1
  '''For specifying an LLM agent logic flow.'''
2
  from . import ops
3
  import chromadb
 
 
4
  import jinja2
5
  import json
6
  import openai
7
  import pandas as pd
8
+ from .executors import one_by_one
 
 
9
 
10
  client = openai.OpenAI(base_url="http://localhost:11434/v1")
11
  jinja = jinja2.Environment()
12
  chroma_client = chromadb.Client()
13
  LLM_CACHE = {}
14
  ENV = 'LLM logic'
15
+ one_by_one.register(ENV)
16
  op = ops.op_registration(ENV)
17
 
 
 
 
 
 
 
 
 
 
 
18
  def chat(*args, **kwargs):
19
  key = json.dumps({'args': args, 'kwargs': kwargs})
20
  if key not in LLM_CACHE:
 
53
  return [{**input, 'response': r} for r in results]
54
 
55
  @op("View", view="table_view")
56
+ def view(input, *, _ctx: one_by_one.Context):
57
  v = _ctx.last_result
58
  if v:
59
  columns = v['dataframes']['df']['columns']
 
71
  @ops.input_position(input="right")
72
  @ops.output_position(output="left")
73
  @op("Loop")
74
+ def loop(input, *, max_iterations: int = 3, _ctx: one_by_one.Context):
75
  '''Data can flow back here max_iterations-1 times.'''
76
  key = f'iterations-{_ctx.node.id}'
77
  input[key] = input.get(key, 0) + 1
 
81
@op('Branch', outputs=['true', 'false'])
def branch(input, *, expression: str):
    '''Routes the input to the "true" or "false" output based on the expression.'''
    # SECURITY: eval() executes arbitrary code with the task dict as its
    # globals — only safe if "expression" comes from a trusted workspace author.
    res = eval(expression, input)
    return one_by_one.Output(output_handle=str(bool(res)).lower(), value=input)
85
 
86
  @ops.input_position(db="top")
87
  @op('RAG')
88
+ def rag(input, db, *, input_field='text', db_field='text', num_matches: int=10, _ctx: one_by_one.Context):
89
  last = _ctx.last_result
90
  if last:
91
  collection = last['_collection']
 
114
  p = p.replace(k.upper(), str(v))
115
  return p
116
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117