import json
import os

from rich.progress import track

from evalplus.data import get_human_eval_plus, get_human_eval_plus_inputs
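
# Minimize the HumanEval+ test inputs: for each task, greedily pick the fewest
# tests that still catch every failing LLM-generated solution seen in the
# collected eval results.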

# root directory holding one sub-directory of eval results per model
LLM_HOME_PATH = "/JawTitan/EvalPlus/humaneval"
model_paths = os.listdir(LLM_HOME_PATH)

problems = list(get_human_eval_plus().values())  # dict_values is not indexable
new_inputs = get_human_eval_plus_inputs()
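# cover_info[task_id][test_id] -> hashes of the generated solutions that this
# test catches failing (filled in by get_cover_info below)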
cover_info = {f"HumanEval_{i}": {} for i in range(164)}


# One combined dict would be huge, so each model keeps its own
# eval_results.json and we walk them one at a time
def get_cover_info():
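    """Record, for every task and test, which generated solutions that test catches failing."""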
    for model_path in track(model_paths, description="Collecting sets..."):
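        # result directories are expected to end in a digit; skip anything else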
        if not model_path[-1].isdigit():
            continue
        eval_json_path = os.path.join(LLM_HOME_PATH, model_path, "eval_results.json")
        if not os.path.exists(eval_json_path):
            continue
        with open(eval_json_path, "r") as f:
            eval_results = json.load(f)["eval"]
        for task_id, v in eval_results.items():
            # the base and plus suites share the same result shape; handle both uniformly
            for suite in ("base", "plus"):
                for i_code, (status, res_list) in enumerate(v[suite]):
                    # solutions that pass everything constrain nothing; skip them
                    if status == "success":
                        continue
                    code_id = hash(v["files"][i_code])
                    for i_test, passed in enumerate(res_list):
                        if not passed:
                            test_id = f"{suite}_{i_test}"
                            cover_info[task_id].setdefault(test_id, []).append(code_id)


if __name__ == "__main__":
    get_cover_info()
    for i in track(range(164), description="Solving set covering..."):
        task_id = f"HumanEval_{i}"
        tests = cover_info[task_id]
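        # Build the set-cover instance: q holds (test, covered-solutions) pairs
        # and U is the universe of all failing solutions detected for this task.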
        q, U = [], set()
        for test_name, test_cover in tests.items():
            cover_set = set(test_cover)
            q.append((test_name, cover_set))
            U = U.union(cover_set)
        # Greedy set cover: repeatedly take the test that covers the most
        # still-uncovered failing solutions until every one is covered.
        min_cover = []
        while len(U) > 0:
            # pick the test whose remaining cover set is largest
            max_uncover_set, max_test_name = set(), ""
            for test_name, cover_set in q:
                if len(cover_set) > len(max_uncover_set):
                    max_uncover_set = cover_set
                    max_test_name = test_name
            min_cover.append(max_test_name)
            U = U - max_uncover_set
            # shrink the remaining cover sets to the still-uncovered part and
            # drop any test that no longer covers anything new
            qq = []
            for test_name, cover_set in q:
                new_cover_set = U.intersection(cover_set)
                if len(new_cover_set) != 0:
                    qq.append((test_name, new_cover_set))
            q = qq

        d = {"task_id": task_id, "inputs": []}
        for test in min_cover:
            tmp = test.split("_")
            t, n = tmp[0], int(tmp[1])
            if t == "base":
                d["inputs"].append(problems[i]["base_input"][n])
            else:
                print(task_id, n)
                d["inputs"].append(new_inputs[task_id][n])
        with open("HumanEvalPlusInputsMin.jsonl", "a") as f:
            f.write(json.dumps(d) + "\n")