# File size: 13,015 Bytes
# db5855f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import sys
import time
import os
import subprocess  # nosec - disable B404:import-subprocess check
import csv
import json
import shutil
import platform

from argparse import ArgumentParser
from pathlib import Path
from typing import Dict, List, Optional, Tuple, TypedDict


# Repository root: the directory one level above the folder that contains this script.
ROOT = Path(__file__).parents[1]

# Notebooks folder, expressed relative to the repository root (joined with ROOT where an absolute path is needed).
NOTEBOOKS_DIR = Path("notebooks")


class NotebookStatus:
    """String labels describing the outcome of a notebook in the test plan."""

    SUCCESS = "SUCCESS"  # run finished with a zero return code
    FAILED = "FAILED"  # run finished with a non-zero return code (other than the timeout marker)
    TIMEOUT = "TIMEOUT"  # run was interrupted by the per-notebook timeout
    SKIPPED = "SKIPPED"  # excluded from the run (absent from the test list, or present in the ignore list)
    NOT_RUN = "NOT_RUN"  # written to the CSV report for entries whose status was never set
    EMPTY = "EMPTY"  # run_test produced no result for the notebook


class NotebookReport(TypedDict):
    """Per-notebook entry of the test plan.

    Keys:
        status: one of the ``NotebookStatus`` labels, or ``""`` while the notebook has not been processed.
        path: location of the notebook file on disk.
        duration: run time in seconds (``0`` until the notebook has been run).
    """

    status: str
    path: Path
    # No default here: TypedDict bodies may only contain annotations (PEP 589). A `= 0` would
    # not act as a dict default anyway, so every construction site passes `duration` explicitly.
    duration: float


# Maps each notebook's path (relative to the scanned notebooks directory) to its report entry.
TestPlan = Dict[Path, NotebookReport]


def parse_arguments():
    """Parse the command-line options of the notebook validation script.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    # (flag, add_argument keyword settings), registered in this exact order.
    option_specs = (
        ("--ignore_list", {"required": False, "nargs": "+"}),
        ("--test_list", {"required": False, "nargs": "+"}),
        ("--early_stop", {"action": "store_true"}),
        ("--report_dir", {"default": "report"}),
        ("--keep_artifacts", {"action": "store_true"}),
        ("--collect_reports", {"action": "store_true"}),
        ("--move_notebooks_dir", {}),
        ("--job_name", {}),
        ("--device_used", {}),
        ("--upload_to_db", {}),
        (
            "--timeout",
            {
                "type": int,
                "default": 7200,
                "help": "Timeout for running single notebook in seconds",
            },
        ),
    )
    cli = ArgumentParser()
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli.parse_args()


def move_notebooks(nb_dir):
    """Copy the repository's notebooks tree into ``nb_dir``.

    Despite the name, the source tree is copied with ``shutil.copytree`` and left in place.
    """
    shutil.copytree(ROOT / NOTEBOOKS_DIR, nb_dir)


def collect_python_packages(output_file: Path):
    """Write the ``pip freeze`` output of the current interpreter to ``output_file``."""
    freeze_command = [sys.executable, "-m", "pip", "freeze"]
    # The command goes through the shell on Windows only.
    run_in_shell = platform.system() == "Windows"
    frozen_packages = subprocess.check_output(freeze_command, shell=run_in_shell)
    output_file.write_bytes(frozen_packages)


def prepare_test_plan(test_list: Optional[List[str]], ignore_list: List[str], nb_dir: Optional[Path] = None) -> TestPlan:
    """Build the test plan: one report entry per notebook, with excluded notebooks marked as skipped.

    Notebooks are discovered recursively under ``nb_dir`` (or ``ROOT / NOTEBOOKS_DIR`` when it is not
    given); files whose name starts with ``test_`` are left out of the plan.

    Args:
        test_list: ``None``/empty to test every notebook, or a single-item list holding the path of a
            ``.txt`` file with changed files, one per line, relative to the repo root. A line pointing
            at the root ``requirements.txt`` selects every notebook; other non-``.ipynb`` lines are ignored.
        ignore_list: notebooks to skip. Each item is either a notebook path relative to the repo root
            or a ``.txt`` file listing such paths one per line (blank lines are ignored). May be ``None``.
        nb_dir: directory to scan for notebooks instead of the default one.

    Returns:
        Mapping from notebook path (relative to the scanned directory) to its report. Entries that
        are not selected, or that are ignored, carry ``NotebookStatus.SKIPPED``; the rest have ``""``.

    Raises:
        ValueError: if an ignored or tested notebook path is not located under ``notebooks/``, or if
            ``test_list`` is anything other than empty or a single ``.txt`` file.
    """
    notebooks_dir = nb_dir or (ROOT / NOTEBOOKS_DIR)
    notebooks: List[Path] = sorted(n for n in notebooks_dir.rglob("*.ipynb") if not n.name.startswith("test_"))

    test_plan: TestPlan = {notebook.relative_to(notebooks_dir): NotebookReport(status="", path=notebook, duration=0) for notebook in notebooks}

    ignored_notebooks: List[Path] = []
    for ignore_item in ignore_list or []:
        if ignore_item.endswith(".txt"):
            # Paths to ignore files are provided to `--ignore_list` argument
            with open(ignore_item, "r") as f:
                stripped_lines = (line.strip() for line in f)
                # A blank line would turn into Path(".") and fail the "relative to notebooks/" check below.
                ignored_notebooks.extend(Path(entry) for entry in stripped_lines if entry)
        else:
            # Ignored notebooks are provided as several items to `--ignore_list` argument
            ignored_notebooks.append(Path(ignore_item))
    try:
        ignored_notebooks = list({n.relative_to(NOTEBOOKS_DIR) for n in ignored_notebooks})
    except ValueError as err:
        raise ValueError(
            f"Ignore list items should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\nInvalid ignored notebooks: {ignored_notebooks}"
        ) from err
    print(f"Ignored notebooks: {ignored_notebooks}")

    testing_notebooks: List[Path] = []
    if not test_list:
        testing_notebooks = list(test_plan)
    elif len(test_list) == 1 and test_list[0].endswith(".txt"):
        requirements_file = (ROOT / "requirements.txt").resolve()
        with open(test_list[0], "r") as f:
            for line in f:
                changed_file_path = Path(line.strip())
                if changed_file_path.resolve() == requirements_file:
                    # A dependency change can affect any notebook, so everything is re-tested.
                    print("requirements.txt changed, check all notebooks")
                    testing_notebooks = list(test_plan)
                    break
                if changed_file_path.suffix != ".ipynb":
                    continue
                try:
                    testing_notebook_path = changed_file_path.relative_to(NOTEBOOKS_DIR)
                except ValueError as err:
                    raise ValueError(
                        "Items in test list file should be relative to repo root (e.g. 'notebooks/subdir/notebook.ipynb').\n"
                        f"Invalid line: {changed_file_path}"
                    ) from err
                testing_notebooks.append(testing_notebook_path)
    else:
        raise ValueError(
            "Testing notebooks should be provided to '--test_list' argument as a txt file or should be empty to test all notebooks.\n"
            f"Received test list: {test_list}"
        )
    testing_notebooks = list(set(testing_notebooks))
    print(f"Testing notebooks: {testing_notebooks}")

    # Sets keep the per-notebook membership checks constant-time.
    selected = set(testing_notebooks)
    ignored = set(ignored_notebooks)
    for notebook, report in test_plan.items():
        if notebook not in selected or notebook in ignored:
            report["status"] = NotebookStatus.SKIPPED
    return test_plan


def clean_test_artifacts(before_test_files: List[Path], after_test_files: List[Path]):
    """Remove files and directories that appeared during a notebook run.

    Args:
        before_test_files: paths that existed before the run; these are never removed.
        after_test_files: paths found after the run. Entries that are also in
            ``before_test_files`` or that no longer exist are skipped.

    Cleanup is best-effort: a file that cannot be removed is reported and left in place,
    and errors while removing a directory tree are ignored.
    """
    preexisting = set(before_test_files)
    for file_path in after_test_files:
        if file_path in preexisting or not file_path.exists():
            continue
        if file_path.is_file():
            try:
                file_path.unlink()
            except OSError as err:
                # Do not abort the whole run over a leftover file, but make the failure visible.
                print(f'Could not remove test artifact "{file_path}": {err}')
        else:
            shutil.rmtree(file_path, ignore_errors=True)


def get_openvino_version() -> str:
    """Return the version string reported by OpenVINO, or a placeholder when it cannot be imported."""
    try:
        import openvino

        return openvino.get_version()
    except ImportError:
        print("Openvino is missing in validation environment.")
        return "Openvino is missing"


def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, report_dir=".") -> Optional[Tuple[str, int, float, str, str]]:
    """Execute the patched copy of a notebook with ``treon`` and describe the outcome.

    The run happens inside the notebook's own directory and targets ``test_<notebook name>`` there.
    ``pip freeze`` snapshots are written to ``report_dir`` before and after the run, and
    ``HUGGINGFACE_HUB_CACHE`` is pointed at the notebook's directory.

    Args:
        notebook_path: path to the original ``.ipynb`` file.
        root: base directory used only to print the notebook path in relative form.
        timeout: maximum run time in seconds.
        keep_artifacts: when false, entries created in the notebook directory during the run are removed.
        report_dir: directory (str or Path) receiving the environment snapshots.

    Returns:
        ``None`` when ``notebook_path`` is a directory, is not an ``.ipynb`` file, or has no patched
        copy; otherwise ``(patched notebook name, return code, duration in seconds,
        OpenVINO version before, OpenVINO version after)``. The return code is ``-42`` on timeout.
    """
    # Normalise before changing directory: the default "." is a plain str (no "/" operator), and a
    # relative location must keep pointing at the same place after `cd`.
    report_dir = Path(report_dir).absolute()
    os.environ["HUGGINGFACE_HUB_CACHE"] = str(notebook_path.parent)
    print(f"RUN {notebook_path.relative_to(root)}", flush=True)
    result = None

    if notebook_path.is_dir():
        print(f'Notebook path "{notebook_path}" is a directory, but path to "*.ipynb" file was expected.')
        return result
    if notebook_path.suffix != ".ipynb":
        print(f'Notebook path "{notebook_path}" should have "*.ipynb" extension.')
        return result

    with cd(notebook_path.parent):
        # Snapshot of the directory content, used afterwards to tell artifacts from pre-existing files.
        files_before_test = sorted(Path(".").iterdir())
        ov_version_before = get_openvino_version()
        patched_notebook = Path(f"test_{notebook_path.name}")
        if not patched_notebook.exists():
            print(f'Patched notebook "{patched_notebook}" does not exist.')
            return result

        collect_python_packages(report_dir / (patched_notebook.stem + "_env_before.txt"))

        main_command = [sys.executable, "-m", "treon", str(patched_notebook)]
        start = time.perf_counter()
        try:
            retcode = subprocess.run(
                main_command,
                shell=(platform.system() == "Windows"),
                timeout=timeout,
            ).returncode
        except subprocess.TimeoutExpired:
            # Sentinel return code that callers interpret as "timed out".
            retcode = -42
        duration = time.perf_counter() - start
        ov_version_after = get_openvino_version()
        result = (str(patched_notebook), retcode, duration, ov_version_before, ov_version_after)

        if not keep_artifacts:
            clean_test_artifacts(files_before_test, sorted(Path(".").iterdir()))
        collect_python_packages(report_dir / (patched_notebook.stem + "_env_after.txt"))

    return result


def finalize_status(failed_notebooks: List[str], timeout_notebooks: List[str], test_plan: TestPlan, report_dir: Path, root: Path) -> int:
    """Print the failure summary, write ``test_report.csv`` and compute the exit status.

    Args:
        failed_notebooks: names of notebooks that finished with an error.
        timeout_notebooks: names of notebooks that hit the timeout.
        test_plan: plan whose entries are dumped to the CSV report; an empty status is
            written as ``NotebookStatus.NOT_RUN``.
        report_dir: directory in which ``test_report.csv`` is created.
        root: base directory against which each notebook's ``full_path`` is made relative.

    Returns:
        ``1`` when ``failed_notebooks`` is non-empty, otherwise ``0``.
    """
    return_status = 0
    if failed_notebooks:
        return_status = 1
        print("FAILED: \n{}".format("\n".join(failed_notebooks)))
    if timeout_notebooks:
        # NOTE(review): timeouts are reported but do not change the exit status - confirm this is intended.
        print("FAILED BY TIMEOUT: \n{}".format("\n".join(timeout_notebooks)))
    test_report = [
        {
            "name": notebook.as_posix(),
            "status": status["status"] or NotebookStatus.NOT_RUN,
            "full_path": str(status["path"].relative_to(root)),
            "duration": status["duration"],
        }
        for notebook, status in test_plan.items()
    ]
    # The csv module emits its own "\r\n" terminators; newline="" stops the text layer from
    # translating them again (which yields "\r\r\n" rows on Windows).
    with (report_dir / "test_report.csv").open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["name", "status", "full_path", "duration"])
        writer.writeheader()
        writer.writerows(test_report)
    return return_status


class cd:
    """Context manager that switches the working directory and switches back on exit."""

    def __init__(self, new_path) -> None:
        # A leading "~" in the target is expanded to the user's home directory.
        self.new_path = os.path.expanduser(new_path)

    def __enter__(self) -> None:
        # Remember where we were so that __exit__ can return there.
        self.saved_path = os.getcwd()
        os.chdir(self.new_path)

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # Always go back; exceptions raised inside the block are not suppressed.
        os.chdir(self.saved_path)


def write_single_notebook_report(
    base_version: str,
    notebook_name: str,
    status_code: int,
    duration: float,
    ov_version_before: str,
    ov_version_after: str,
    job_name: str,
    device_used: str,
    saving_dir: Path,
) -> Path:
    """Write a JSON report for one notebook run and return the path of the written file.

    Args:
        base_version: stored under the ``version`` key.
        notebook_name: file name of the patched notebook (``test_<name>.ipynb``). The report file
            takes this name with the ``.ipynb`` extension swapped for ``.json``; the
            ``notebook_name`` field stores it without the leading ``test_``.
        status_code: stored under ``status``.
        duration: stored under ``duration``.
        ov_version_before: stored under ``ov_version_before``.
        ov_version_after: stored under ``ov_version_after``.
        job_name: stored under ``job_name``.
        device_used: stored under ``device_used``.
        saving_dir: directory in which the report file is created.

    Returns:
        Path of the JSON file that was written.
    """
    patched_prefix = "test_"
    notebook_suffix = ".ipynb"

    # Swap only the trailing extension; ".ipynb" elsewhere in the name is left alone.
    if notebook_name.endswith(notebook_suffix):
        report_name = notebook_name[: -len(notebook_suffix)] + ".json"
    else:
        report_name = notebook_name
    report_file = saving_dir / report_name

    # Drop only the leading prefix: a blanket replace would also mangle names that merely
    # contain "test_", e.g. "test_latest_demo.ipynb" -> "lademo.ipynb".
    if notebook_name.startswith(patched_prefix):
        original_name = notebook_name[len(patched_prefix):]
    else:
        original_name = notebook_name

    report = {
        "version": base_version,
        "notebook_name": original_name,
        "status": status_code,
        "duration": duration,
        "ov_version_before": ov_version_before,
        "ov_version_after": ov_version_after,
        "job_name": job_name,
        "device_used": device_used,
    }
    with report_file.open("w") as f:
        json.dump(report, f)
    return report_file


def main():
    """Run the notebook validation described by the command-line options.

    Builds the test plan, runs every notebook that is not skipped, records its status and duration,
    optionally writes a JSON report per notebook and passes it to an upload script, and finally
    writes the CSV summary.

    Returns:
        The exit status computed by ``finalize_status``.
    """
    failed_notebooks = []
    timeout_notebooks = []
    args = parse_arguments()
    reports_dir = Path(args.report_dir)
    reports_dir.mkdir(exist_ok=True, parents=True)
    notebooks_moving_dir = args.move_notebooks_dir
    root = ROOT
    if notebooks_moving_dir is not None:
        # Run against a copy of the notebooks tree; paths are then reported relative to its parent.
        notebooks_moving_dir = Path(notebooks_moving_dir)
        root = notebooks_moving_dir.parent
        move_notebooks(notebooks_moving_dir)

    keep_artifacts = args.keep_artifacts
    base_version = get_openvino_version()

    test_plan = prepare_test_plan(args.test_list, args.ignore_list, notebooks_moving_dir)
    for notebook, report in test_plan.items():
        if report["status"] == NotebookStatus.SKIPPED:
            continue
        test_result = run_test(report["path"], root, args.timeout, keep_artifacts, reports_dir.absolute())
        if not test_result:
            print(f'Testing notebooks "{str(notebook)}" is not found.')
            report["status"] = NotebookStatus.EMPTY
            report["duration"] = 0
            continue

        patched_notebook, status_code, duration, ov_version_before, ov_version_after = test_result
        if status_code == -42:
            # -42 is the return code run_test uses for a timed-out notebook.
            report["status"] = NotebookStatus.TIMEOUT
            timeout_notebooks.append(patched_notebook)
        elif status_code:
            report["status"] = NotebookStatus.FAILED
            failed_notebooks.append(patched_notebook)
        elif report["status"] not in (NotebookStatus.TIMEOUT, NotebookStatus.FAILED):
            report["status"] = NotebookStatus.SUCCESS
        report["duration"] = duration

        if args.collect_reports:
            job_name = args.job_name or "Unknown"
            device_used = args.device_used or "Unknown"
            report_path = write_single_notebook_report(
                base_version, patched_notebook, status_code, duration, ov_version_before, ov_version_after, job_name, device_used, reports_dir
            )
            if args.upload_to_db:
                cmd = [sys.executable, args.upload_to_db, report_path]
                print(f"\nUploading {report_path} to database. CMD: {cmd}")
                try:
                    # The context manager closes the pipe and waits for the child, so no
                    # unreaped process or open file handle is left behind.
                    with subprocess.Popen(
                        cmd, shell=(platform.system() == "Windows"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
                    ) as dbprocess:
                        for line in dbprocess.stdout:
                            sys.stdout.write(line)
                            sys.stdout.flush()
                    if dbprocess.returncode:
                        print(f"Upload command exited with code {dbprocess.returncode}")
                except OSError as e:
                    # Popen raises OSError (not CalledProcessError) when the command cannot be
                    # started; the upload is best-effort and must not abort the test run.
                    print(f"Failed to run upload command: {e}")

        # NOTE(review): this stops after the first executed notebook regardless of its outcome -
        # confirm whether early stop was meant to trigger on failures only.
        if args.early_stop:
            break

    exit_status = finalize_status(failed_notebooks, timeout_notebooks, test_plan, reports_dir, root)
    return exit_status


if __name__ == "__main__":
    # Hand the run's status to the shell as the process exit code.
    sys.exit(main())