File size: 3,812 Bytes
c87c295
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import argparse
import json
import os
from typing import Any, Dict, List

from rich.progress import track

from evalplus.eval.utils import swallow_io
from evalplus.evaluate import evaluate
from tools.tsr.utils import (
    clean,
    execute_cmd,
    get_cmd_output,
    get_problems,
    get_task_ids,
    to_path,
)


def prepare_mutants(mutation_dir: str, dataset: str):
    """Generate mutmut mutants for every task's groundtruth solution.

    For each task in *dataset*, writes the groundtruth code to
    ``<mutation_dir>/<task>/gt.py``, runs mutmut over it (with a dummy
    pytest file so mutmut has a test suite to run), and saves each mutant
    as ``m<i>.py`` in the task directory. Task directories that already
    contain ``m*`` files are skipped.

    Args:
        mutation_dir: Directory under which per-task mutant folders live.
        dataset: Dataset name understood by ``get_task_ids``/``get_problems``.
    """
    pwd = os.getcwd()
    task_ids = get_task_ids(dataset)
    problems = get_problems(dataset)
    os.makedirs(mutation_dir, exist_ok=True)
    for task_id in track(task_ids, "Generating mutants"):
        task_dir = os.path.join(mutation_dir, to_path(task_id))
        os.makedirs(task_dir, exist_ok=True)
        if any(filename.startswith("m") for filename in os.listdir(task_dir)):
            # Mutants were already generated for this task -- skip it.
            continue
        # Write the groundtruth implementation that mutmut will mutate.
        groundtruth_code = (
            problems[task_id]["prompt"] + problems[task_id]["canonical_solution"]
        )
        with open(os.path.join(task_dir, "gt.py"), "w") as f:
            f.write(groundtruth_code)
        # mutmut needs a test suite; give it a trivially passing one.
        with open(os.path.join(task_dir, "test_dummy.py"), "w") as f:
            f.write("def test_dummy():\n    pass")
        os.chdir(task_dir)
        try:
            clean(".mutmut-cache")
            # NOTE(review): "mutmut run" as a single argv element plus the
            # "1>/dev/null" redirect suggests execute_cmd joins and runs via
            # a shell -- confirm against tools.tsr.utils before changing.
            execute_cmd(["mutmut run", "--paths-to-mutate=gt.py", "1>/dev/null"])
            try:
                # The last populated line of `mutmut results` ends in
                # "...-<N>"; parse N as the total mutant count.
                total_mutants = int(
                    get_cmd_output(["mutmut", "results"]).split("\n")[-2].split("-")[-1]
                )
            except Exception:
                # No parsable results -> treat as "no mutants generated".
                # (Was a bare `except:`; Exception still covers parse and
                # subprocess failures without eating KeyboardInterrupt.)
                total_mutants = 0
            # Apply each mutant to gt.py in turn, stash it as m<i>.py, and
            # restore the pristine groundtruth from the copy each time.
            for i in range(1, total_mutants + 1):
                execute_cmd(["cp", "gt.py", "gt_copy.py"])
                execute_cmd(["mutmut", "apply", str(i)])
                execute_cmd(["mv", "gt.py", f"m{i}.py"])
                execute_cmd(["mv", "gt_copy.py", "gt.py"])
            # Only the mutants should remain in the task directory.
            execute_cmd(["rm", "gt.py"])
            execute_cmd(["rm", "test_dummy.py"])
            clean(".mutmut-cache")
        finally:
            # Always return to the original working directory, even if a
            # command above raises (the original could leave the process
            # stranded inside task_dir).
            os.chdir(pwd)


def mutants_eval(mutation_dir: str, dataset: str):
    """Run the evalplus evaluator over the generated mutants.

    Builds the same namespace the evaluate() CLI would produce, pointing
    ``samples`` at *mutation_dir*, and runs it with all output suppressed.

    Args:
        mutation_dir: Directory containing the per-task mutant files.
        dataset: Dataset name passed through to the evaluator.
    """
    eval_config = {
        "dataset": dataset,
        "samples": mutation_dir,
        "base_only": False,
        "parallel": None,
        "i_just_wanna_run": False,
        "test_details": True,
        "min_time_limit": 0.2,
        "gt_time_limit_factor": 4.0,
        "mini": False,
    }
    print("Evaluating mutants... ", end="", flush=True)
    with swallow_io():
        # evaluate() expects the argparse.Namespace its own CLI builds.
        evaluate(argparse.Namespace(**eval_config))
    print("Done")


def collect_mutation_info(
    eval_path: str, dataset: str
) -> Dict[str, Dict[str, List[Any]]]:
    """Collect which "plus" tests kill which mutants.

    Args:
        eval_path: Path to the ``eval_results.json`` written by evaluate().
        dataset: Dataset name understood by ``get_task_ids``.

    Returns:
        Mapping ``task_id -> test_id ("plus_<i>") -> [("mutant", i_code), ...]``
        listing, for each plus-test, the indices of the mutant codes that
        fail it (i.e. the mutants that test kills).
    """
    mutation_info = {task_id: {} for task_id in get_task_ids(dataset)}
    assert os.path.isfile(
        eval_path
    ), f"mutation testing result file {eval_path} missing!"
    # Use a context manager; the original `json.load(open(...))` leaked
    # the file handle.
    with open(eval_path, "r") as f:
        eval_res = json.load(f)["eval"]
    for task_id, v in eval_res.items():
        for i_code, (status, res_list) in enumerate(v["plus"]):
            if status == "success":
                # Mutant passed every test -> it survived; nothing to record.
                continue
            for i_test, res in enumerate(res_list):
                test_id = f"plus_{i_test}"
                # NOTE(review): `res == False` also matches 0/0.0; kept as-is
                # in case result entries are numeric rather than strict bools.
                if res == False:  # noqa: E712
                    mutation_info[task_id].setdefault(test_id, []).append(
                        ("mutant", i_code)
                    )
    return mutation_info


if __name__ == "__main__":
    # CLI entry point: generate mutants, evaluate them against the plus
    # tests, then collect the mutation-kill information.
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", type=str, choices=["humaneval", "mbpp"])
    parser.add_argument("--report_dir", required=True, type=str)
    cli_args = parser.parse_args()

    # All mutant artifacts live under <report_dir>/mutation_cache.
    cache_dir = os.path.join(cli_args.report_dir, "mutation_cache")
    prepare_mutants(cache_dir, cli_args.dataset)
    mutants_eval(cache_dir, cli_args.dataset)
    collect_mutation_info(
        os.path.join(cache_dir, "eval_results.json"), cli_args.dataset
    )