JMLizano JMLizano committed on
Commit
10c9dc3
·
unverified ·
1 Parent(s): 560385e

Single import box for multiple file formats (#80)

Browse files

* Add the ability to override automatic parameter creation

* Add a new ParameterGroup class

* Correctly process kwargs in operations as parameters, not inputs

---------

Co-authored-by: JMLizano <[email protected]>

biome.json CHANGED
@@ -10,6 +10,9 @@
10
  "noExplicitAny": "off",
11
  "noArrayIndexKey": "off"
12
  },
 
 
 
13
  "style": {
14
  "noNonNullAssertion": "off"
15
  },
 
10
  "noExplicitAny": "off",
11
  "noArrayIndexKey": "off"
12
  },
13
+ "correctness": {
14
+ "useExhaustiveDependencies": "off"
15
+ },
16
  "style": {
17
  "noNonNullAssertion": "off"
18
  },
lynxkite-app/.gitignore CHANGED
@@ -2,3 +2,4 @@
2
  !/src/lynxkite_app/web_assets/__init__.py
3
  !/src/lynxkite_app/web_assets/assets/__init__.py
4
  data/
 
 
2
  !/src/lynxkite_app/web_assets/__init__.py
3
  !/src/lynxkite_app/web_assets/assets/__init__.py
4
  data/
5
+ !/web/tests/data
lynxkite-app/web/src/workspace/nodes/NodeGroupParameter.tsx ADDED
@@ -0,0 +1,67 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { useEffect, useState } from "react";
2
+ import NodeParameter from "./NodeParameter";
3
+
4
/** Metadata of the selector parameter whose value picks the active group. */
interface SelectorType {
  name: string;
  default: string;
  type: {
    enum: string[];
  };
}

/** Metadata of a single parameter that belongs to one of the groups. */
interface ParameterType {
  name: string;
  default: string;
  type: {
    type: string;
  };
}

/** Maps a selector value to the parameters that should exist while it is selected. */
interface GroupsType {
  [key: string]: ParameterType[];
}

interface NodeGroupParameterProps {
  meta: { selector: SelectorType; groups: GroupsType };
  value: any;
  setParam: (name: string, value: any, options?: { delay: number }) => void;
  deleteParam: (name: string, options?: { delay: number }) => void;
}

/**
 * Renders the selector of a parameter group as a `NodeParameter` and keeps the
 * group's parameters in sync with the selection.
 *
 * Whenever the selected value changes (including on mount), every parameter of
 * every group is deleted through `deleteParam`, and the parameters of the
 * selected group are then created with their defaults through `setParam`.
 *
 * @param meta - The selector metadata and the parameter list of each group.
 * @param value - The stored selector value; falls back to `selector.default` when falsy.
 * @param setParam - Callback that stores a parameter value on the node.
 * @param deleteParam - Callback that removes a parameter from the node.
 */
export default function NodeGroupParameter({
  meta,
  value,
  setParam,
  deleteParam,
}: NodeGroupParameterProps) {
  const selector = meta.selector;
  const groups = meta.groups;
  const [selectedValue, setSelectedValue] = useState<string>(
    value || selector.default,
  );

  const handleSelectorChange = (value: any, opts?: { delay: number }) => {
    setSelectedValue(value);
    setParam(selector.name, value, opts);
  };

  // Intentionally depends on the selection only: the parameters are rebuilt
  // when the user picks another group, not when the callbacks change identity.
  useEffect(() => {
    // Clean possible previous parameters first.
    for (const group of Object.values(groups)) {
      for (const entry of group) {
        deleteParam(entry.name);
      }
    }
    // A selection with no matching group contributes no parameters instead of
    // throwing while iterating `undefined`.
    for (const param of groups[selectedValue] ?? []) {
      setParam(param.name, param.default);
    }
  }, [selectedValue]);

  return (
    <NodeParameter
      name={selector.name}
      key={selector.name}
      value={selectedValue}
      meta={selector}
      onChange={handleSelectorChange}
    />
  );
}
lynxkite-app/web/src/workspace/nodes/NodeWithParams.tsx CHANGED
@@ -1,5 +1,6 @@
1
  import { useReactFlow } from "@xyflow/react";
2
  import LynxKiteNode from "./LynxKiteNode";
 
3
  import NodeParameter from "./NodeParameter";
4
 
5
  export type UpdateOptions = { delay?: number };
@@ -7,9 +8,22 @@ export type UpdateOptions = { delay?: number };
7
  function NodeWithParams(props: any) {
8
  const reactFlow = useReactFlow();
9
  const metaParams = props.data.meta?.params;
 
10
  function setParam(name: string, newValue: any, opts: UpdateOptions) {
 
 
 
 
 
 
 
 
 
 
 
 
11
  reactFlow.updateNodeData(props.id, {
12
- params: { ...props.data.params, [name]: newValue },
13
  __execution_delay: opts.delay || 0,
14
  });
15
  }
@@ -17,17 +31,31 @@ function NodeWithParams(props: any) {
17
 
18
  return (
19
  <LynxKiteNode {...props}>
20
- {params.map(([name, value]) => (
21
- <NodeParameter
22
- name={name}
23
- key={name}
24
- value={value}
25
- meta={metaParams?.[name]}
26
- onChange={(value: any, opts?: UpdateOptions) =>
27
- setParam(name, value, opts || {})
28
- }
29
- />
30
- ))}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
  {props.children}
32
  </LynxKiteNode>
33
  );
 
1
  import { useReactFlow } from "@xyflow/react";
2
  import LynxKiteNode from "./LynxKiteNode";
3
+ import NodeGroupParameter from "./NodeGroupParameter";
4
  import NodeParameter from "./NodeParameter";
5
 
6
  export type UpdateOptions = { delay?: number };
 
8
  function NodeWithParams(props: any) {
9
  const reactFlow = useReactFlow();
10
  const metaParams = props.data.meta?.params;
11
+
12
  function setParam(name: string, newValue: any, opts: UpdateOptions) {
13
+ reactFlow.updateNodeData(props.id, (prevData: any) => ({
14
+ ...prevData,
15
+ params: { ...prevData.data.params, [name]: newValue },
16
+ __execution_delay: opts.delay || 0,
17
+ }));
18
+ }
19
+
20
+ function deleteParam(name: string, opts: UpdateOptions) {
21
+ if (props.data.params[name] === undefined) {
22
+ return;
23
+ }
24
+ delete props.data.params[name];
25
  reactFlow.updateNodeData(props.id, {
26
+ params: { ...props.data.params },
27
  __execution_delay: opts.delay || 0,
28
  });
29
  }
 
31
 
32
  return (
33
  <LynxKiteNode {...props}>
34
+ {params.map(([name, value]) =>
35
+ metaParams?.[name]?.type === "group" ? (
36
+ <NodeGroupParameter
37
+ key={name}
38
+ value={value}
39
+ meta={metaParams?.[name]}
40
+ setParam={(name: string, value: any, opts?: UpdateOptions) =>
41
+ setParam(name, value, opts || {})
42
+ }
43
+ deleteParam={(name: string, opts?: UpdateOptions) =>
44
+ deleteParam(name, opts || {})
45
+ }
46
+ />
47
+ ) : (
48
+ <NodeParameter
49
+ name={name}
50
+ key={name}
51
+ value={value}
52
+ meta={metaParams?.[name]}
53
+ onChange={(value: any, opts?: UpdateOptions) =>
54
+ setParam(name, value, opts || {})
55
+ }
56
+ />
57
+ ),
58
+ )}
59
  {props.children}
60
  </LynxKiteNode>
61
  );
lynxkite-app/web/tests/data/{upload_test.csv → import_test.csv} RENAMED
File without changes
lynxkite-app/web/tests/data/import_test.json ADDED
@@ -0,0 +1 @@
 
 
1
+ { "name": { "0": "Adam", "1": "Eve", "2": "Bob", "3": "Isolated Joe" } }
lynxkite-app/web/tests/data/import_test.parquet ADDED
Binary file (1.13 kB). View file
 
lynxkite-app/web/tests/data/import_test.xlsx ADDED
Binary file (4.96 kB). View file
 
lynxkite-app/web/tests/{upload.spec.ts → import.spec.ts} RENAMED
@@ -9,7 +9,7 @@ let workspace: Workspace;
9
  test.beforeEach(async ({ browser }) => {
10
  workspace = await Workspace.empty(
11
  await browser.newPage(),
12
- "upload_spec_test",
13
  );
14
  });
15
 
@@ -19,25 +19,57 @@ test.afterEach(async () => {
19
  splash.page.on("dialog", async (dialog) => {
20
  await dialog.accept();
21
  });
22
- await splash.deleteEntry("upload_spec_test");
23
  });
24
 
25
- test("can upload and import a simple CSV", async () => {
 
 
 
 
26
  const __filename = fileURLToPath(import.meta.url);
27
  const __dirname = dirname(__filename);
28
- const csvPath = join(__dirname, "data", "upload_test.csv");
29
 
30
- await workspace.addBox("Import CSV");
31
- const csvBox = workspace.getBox("Import CSV 1");
32
- const filenameInput = csvBox.locator("input.input-bordered").nth(0);
33
- await filenameInput.click();
34
- await filenameInput.fill(csvPath);
35
- await filenameInput.press("Enter");
 
 
 
 
 
 
 
 
 
 
 
 
36
 
37
  await workspace.addBox("View tables");
38
  const tableBox = workspace.getBox("View tables 1");
39
- await workspace.connectBoxes("Import CSV 1", "View tables 1");
40
 
41
  const tableRows = tableBox.locator("table tbody tr");
42
  await expect(tableRows).toHaveCount(4);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  });
 
9
  test.beforeEach(async ({ browser }) => {
10
  workspace = await Workspace.empty(
11
  await browser.newPage(),
12
+ "import_spec_test",
13
  );
14
  });
15
 
 
19
  splash.page.on("dialog", async (dialog) => {
20
  await dialog.accept();
21
  });
22
+ await splash.deleteEntry("import_spec_test");
23
  });
24
 
25
/**
 * Imports a file from the `data` directory next to this spec through an
 * "Import file" box and verifies the result in a connected "View tables" box.
 *
 * @param workspace - The workspace in which the boxes are created.
 * @param fileName - Name of the file inside the `data` directory.
 * @param fileFormat - Option to pick in the "file format" select.
 */
async function validateImport(
  workspace: Workspace,
  fileName: string,
  fileFormat: string,
) {
  const specDir = dirname(fileURLToPath(import.meta.url));
  const filePath = join(specDir, "data", fileName);

  await workspace.addBox("Import file");
  const importBox = workspace.getBox("Import file 1");

  // Locates the form control inside the parameter label with the given text.
  const paramControl = (labelText: string, control: string) =>
    importBox.locator("label.param", { hasText: labelText }).locator(control);

  await paramControl("file format", "select").selectOption(fileFormat);

  // Both text parameters are filled in the same way: focus, type, confirm.
  const textParams: [string, string][] = [
    ["file path", filePath],
    ["table name", "table"],
  ];
  for (const [labelText, text] of textParams) {
    const input = paramControl(labelText, "input");
    await input.click();
    await input.fill(text);
    await input.press("Enter");
  }

  await workspace.addBox("View tables");
  const tableBox = workspace.getBox("View tables 1");
  await workspace.connectBoxes("Import file 1", "View tables 1");

  // The imported table is expected to show four rows.
  await expect(tableBox.locator("table tbody tr")).toHaveCount(4);
}
60
+
61
// One test per file format: [test title, data file name, "file format" option].
const importCases = [
  ["Can import a CSV file", "import_test.csv", "csv"],
  ["Can import a parquet file", "import_test.parquet", "parquet"],
  ["Can import a JSON file", "import_test.json", "json"],
  ["Can import an Excel file", "import_test.xlsx", "excel"],
] as const;

for (const [title, fileName, fileFormat] of importCases) {
  test(title, async () => {
    await validateImport(workspace, fileName, fileFormat);
  });
}
lynxkite-app/web/tests/lynxkite.ts CHANGED
@@ -155,7 +155,7 @@ export class Workspace {
155
  await new Promise((resolve) =>
156
  setTimeout(resolve, executionWaitTime ? executionWaitTime : 500),
157
  );
158
- await expect(this.getBoxes().locator(".error")).not.toBeVisible();
159
  }
160
 
161
  async close() {
 
155
  await new Promise((resolve) =>
156
  setTimeout(resolve, executionWaitTime ? executionWaitTime : 500),
157
  );
158
+ await expect(this.getBoxes().locator(".error").first()).not.toBeVisible();
159
  }
160
 
161
  async close() {
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -75,6 +75,16 @@ class Parameter(BaseConfig):
75
  return Parameter(name=name, default=default, type=type)
76
 
77
 
 
 
 
 
 
 
 
 
 
 
78
  class Input(BaseConfig):
79
  name: str
80
  type: Type
@@ -116,7 +126,7 @@ def basic_outputs(*names):
116
  class Op(BaseConfig):
117
  func: typing.Callable = pydantic.Field(exclude=True)
118
  name: str
119
- params: dict[str, Parameter]
120
  inputs: dict[str, Input]
121
  outputs: dict[str, Output]
122
  # TODO: Make type an enum with the possible values.
@@ -148,7 +158,7 @@ class Op(BaseConfig):
148
  return res
149
 
150
 
151
- def op(env: str, name: str, *, view="basic", outputs=None):
152
  """Decorator for defining an operation."""
153
 
154
  def decorator(func):
@@ -157,12 +167,14 @@ def op(env: str, name: str, *, view="basic", outputs=None):
157
  inputs = {
158
  name: Input(name=name, type=param.annotation)
159
  for name, param in sig.parameters.items()
160
- if param.kind != param.KEYWORD_ONLY
161
  }
162
- params = {}
163
  for n, param in sig.parameters.items():
164
  if param.kind == param.KEYWORD_ONLY and not n.startswith("_"):
165
- params[n] = Parameter.basic(n, param.default, param.annotation)
 
 
166
  if outputs:
167
  _outputs = {name: Output(name=name, type=None) for name in outputs}
168
  else:
@@ -172,7 +184,7 @@ def op(env: str, name: str, *, view="basic", outputs=None):
172
  op = Op(
173
  func=func,
174
  name=name,
175
- params=params,
176
  inputs=inputs,
177
  outputs=_outputs,
178
  type=view,
 
75
  return Parameter(name=name, default=default, type=type)
76
 
77
 
78
class ParameterGroup(BaseConfig):
    """Defines a group of parameters for an operation.

    A group consists of a selector parameter plus one list of parameters for
    each key of `groups`.
    """

    name: str
    # The parameter that is shown for choosing between the groups.
    selector: Parameter
    default: typing.Any
    # Presumably keyed by the possible selector values; the parameters listed
    # under a key are the ones that belong to that choice.
    groups: dict[str, list[Parameter]]
    # Constant tag marking this entry as a group.
    type: str = "group"
86
+
87
+
88
  class Input(BaseConfig):
89
  name: str
90
  type: Type
 
126
  class Op(BaseConfig):
127
  func: typing.Callable = pydantic.Field(exclude=True)
128
  name: str
129
+ params: dict[str, Parameter | ParameterGroup]
130
  inputs: dict[str, Input]
131
  outputs: dict[str, Output]
132
  # TODO: Make type an enum with the possible values.
 
158
  return res
159
 
160
 
161
+ def op(env: str, name: str, *, view="basic", outputs=None, params=None):
162
  """Decorator for defining an operation."""
163
 
164
  def decorator(func):
 
167
  inputs = {
168
  name: Input(name=name, type=param.annotation)
169
  for name, param in sig.parameters.items()
170
+ if param.kind not in (param.KEYWORD_ONLY, param.VAR_KEYWORD)
171
  }
172
+ _params = {}
173
  for n, param in sig.parameters.items():
174
  if param.kind == param.KEYWORD_ONLY and not n.startswith("_"):
175
+ _params[n] = Parameter.basic(n, param.default, param.annotation)
176
+ if params:
177
+ _params.update(params)
178
  if outputs:
179
  _outputs = {name: Output(name=name, type=None) for name in outputs}
180
  else:
 
184
  op = Op(
185
  func=func,
186
  name=name,
187
+ params=_params,
188
  inputs=inputs,
189
  outputs=_outputs,
190
  type=view,
lynxkite-graph-analytics/src/lynxkite_graph_analytics/lynxkite_ops.py CHANGED
@@ -1,5 +1,6 @@
1
  """Graph analytics operations."""
2
 
 
3
  import os
4
  import fsspec
5
  from lynxkite.core import ops
@@ -18,6 +19,68 @@ mem = joblib.Memory("../joblib-cache")
18
  op = ops.op_registration(core.ENV)
19
 
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  @op("Import Parquet")
22
  def import_parquet(*, filename: str):
23
  """Imports a Parquet file."""
 
1
  """Graph analytics operations."""
2
 
3
+ import enum
4
  import os
5
  import fsspec
6
  from lynxkite.core import ops
 
19
  op = ops.op_registration(core.ENV)
20
 
21
 
22
+ class FileFormat(enum.StrEnum):
23
+ csv = "csv"
24
+ parquet = "parquet"
25
+ json = "json"
26
+ excel = "excel"
27
+
28
+
29
@op(
    "Import file",
    params={
        "file_format": ops.ParameterGroup(
            name="file_format",
            selector=ops.Parameter(
                name="file_format", type=FileFormat, default=FileFormat.csv
            ),
            groups={
                "csv": [
                    ops.Parameter.basic("columns", type=str, default="<from file>"),
                    ops.Parameter.basic("separator", type=str, default="<auto>"),
                ],
                "parquet": [],
                "json": [],
                "excel": [
                    ops.Parameter.basic("sheet_name", type=str, default="Sheet1")
                ],
            },
            default=FileFormat.csv,
        ),
    },
)
def import_file(
    *, file_path: str, table_name: str, file_format: FileFormat, **kwargs
) -> core.Bundle:
    """Read the contents of a file into a `Bundle`.

    Args:
        file_path: Path to the file to import.
        table_name: Name to use for identifying the table in the bundle.
        file_format: Format of the file. Has to be one of the values in the `FileFormat` enum.
        **kwargs: Format-specific options. For CSV, `columns` (comma-separated
            column names) and `separator` are translated to the `names` and
            `sep` arguments of `pd.read_csv`. Everything else is passed
            unchanged to the pandas reader of the chosen format.

    Returns:
        Bundle: Bundle with a single table with the contents of the file.

    Raises:
        ValueError: If `file_format` is not one of the supported formats.
    """
    if file_format == "csv":
        # The placeholder defaults mean "not specified": pass the pandas
        # no_default sentinel so that pandas falls back to its own defaults.
        names = kwargs.pop("columns", "<from file>")
        names = (
            pd.api.extensions.no_default if names == "<from file>" else names.split(",")
        )
        sep = kwargs.pop("separator", "<auto>")
        sep = pd.api.extensions.no_default if sep == "<auto>" else sep
        df = pd.read_csv(file_path, names=names, sep=sep, **kwargs)
    elif file_format == "json":
        df = pd.read_json(file_path, **kwargs)
    elif file_format == "parquet":
        df = pd.read_parquet(file_path, **kwargs)
    elif file_format == "excel":
        df = pd.read_excel(file_path, **kwargs)
    else:
        # Fail loudly instead of storing the exception object as a table.
        raise ValueError(f"Unsupported file format: {file_format}")
    return core.Bundle(dfs={table_name: df})
82
+
83
+
84
  @op("Import Parquet")
85
  def import_parquet(*, filename: str):
86
  """Imports a Parquet file."""