# OpenCodeInterpreter/evaluation/evalplus/tools/tsr/minimization.py
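"""Greedy test-suite reduction (TSR) for EvalPlus.

Builds a reduced "plus" test set per task by solving a greedy min set
cover over three kinds of covering information (coverage, mutation,
sample killing) and merging the results. With --model ALL, the merged
cover is dumped as HumanEvalPlus-Mini.jsonl; for a concrete model, a
pass@1 report under the reduced test suite is written instead.
"""
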
import argparse
import json
import os
import pickle
from concurrent.futures import ProcessPoolExecutor, as_completed
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple

from rich.progress import track

from evalplus.data import write_jsonl
from tools.tsr.coverage_init import collect_coverage_info
from tools.tsr.mutation_init import collect_mutation_info
from tools.tsr.sample_init import collect_sample_info
from tools.tsr.utils import get_problems, get_task_ids, to_path


def global_util_init(dataset: str):
    # Module-level state shared by the helpers below (and inherited by
    # worker processes).
    global problems
    global task_ids
    global problem_count
    problems = get_problems(dataset)
    task_ids = get_task_ids(dataset)
    problem_count = len(problems)


###########################
# Greedy Min Set Covering #
###########################
def merge_set_cover(*args) -> Dict[str, List[str]]:
    # Union the per-task selections of several set covers, de-duplicating
    # while preserving insertion order.
    merged_set_cover = {task_id: [] for task_id in task_ids}
    for set_cover_dict in args:
        for task_id, plus_tests in set_cover_dict.items():
            for plus_test in plus_tests:
                if plus_test not in merged_set_cover[task_id]:
                    merged_set_cover[task_id].append(plus_test)
    return merged_set_cover
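

# Each "plus" test maps to the (model_path, sample_index) pairs recorded
# against it (e.g. wrong samples it rejects). The classic greedy set-cover
# approximation below repeatedly picks the test that covers the most
# still-uncovered pairs until every pair is covered.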
def greedy_cover(
    task_id: str, tests: Dict[str, List[Any]], exclude_model: str
) -> Tuple[str, List[str]]:
    # Build the universe U and the candidate cover sets, skipping entries
    # that belong to the excluded model.
    q, U = [], set()
    for test_name, test_cover in tests.items():
        cover_set = set()
        for model_path, i_code in test_cover:
            if exclude_model not in model_path:
                cover_set.add((model_path, i_code))
        q.append((test_name, cover_set))
        U = U.union(cover_set)

    # Greedy approximation for min set cover: repeatedly take the test
    # whose cover set removes the most elements still in U.
    min_cover = []
    while len(U) > 0:
        max_uncover_set, max_test_name = set(), ""
        for test_name, cover_set in q:
            if len(cover_set) > len(max_uncover_set):
                max_uncover_set = cover_set
                max_test_name = test_name
        min_cover.append(max_test_name)
        U = U - max_uncover_set
        # Re-intersect the remaining cover sets with the shrunken universe
        # and drop tests that no longer cover anything new.
        qq = []
        for test_name, cover_set in q:
            new_cover_set = U.intersection(cover_set)
            if len(new_cover_set) != 0:
                qq.append((test_name, new_cover_set))
        q = qq
    return task_id, min_cover


def parallel_greedy_cover(
    info_dict: Optional[Dict[str, Dict[str, List[Any]]]],
    exclude_model: str,
    cover_type: str,
    **kwargs,
) -> Dict[str, List[str]]:
    plus_tests = {task_id: [] for task_id in task_ids}

    with ProcessPoolExecutor(max_workers=32) as executor:
        futures = []
        for task_id in task_ids:
            if cover_type == "sample":
                # Sample-kill info is materialized as per-task pickles by
                # collect_sample_info; load each task's dict lazily here.
                path_task_id = to_path(task_id)
                sample_dir = kwargs["sample_dir"]
                with open(os.path.join(sample_dir, f"{path_task_id}.pkl"), "rb") as f:
                    td = pickle.load(f)
                args = (task_id, td, exclude_model)
            else:
                args = (task_id, info_dict[task_id], exclude_model)
            futures.append(executor.submit(greedy_cover, *args))

        for future in track(as_completed(futures), f"min set cover :: {cover_type}"):
            task_id, min_cover = future.result()
            plus_tests[task_id] = min_cover

    return plus_tests


#####################
# Collect Set Cover #
#####################
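# Three independent sources of covering information feed the reduction:
#   coverage -- which code each plus test exercises,
#   mutation -- which mutants each plus test kills,
#   sample   -- which incorrect model samples each plus test rejects.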
def get_coverage_set_cover(
    coverage_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    coverage_info_dict = collect_coverage_info(coverage_dir, dataset)
    return parallel_greedy_cover(coverage_info_dict, exclude_model, "coverage")


def get_mutation_set_cover(
    mutation_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    mutation_info_dict = collect_mutation_info(
        os.path.join(mutation_dir, "eval_results.json"), dataset
    )
    return parallel_greedy_cover(mutation_info_dict, exclude_model, "mutation")


def get_sample_set_cover(
    sample_dir: str, sample_eval_dir: str, exclude_model: str, dataset: str
) -> Dict[str, List[str]]:
    # Sample info lives in per-task pickles; pass info_dict=None and let
    # each worker load its task's dict from sample_dir.
    collect_sample_info(sample_dir, sample_eval_dir, dataset)
    return parallel_greedy_cover(None, exclude_model, "sample", sample_dir=sample_dir)


#################
# pass@1 greedy #
#################
def compute_avg_test(set_cover_info: Dict[str, List[str]]) -> float:
    # Average tests per task: all base tests plus the reduced plus tests.
    sum_tests = sum(
        len(problems[task_id]["base_input"]) + len(set_cover_info[task_id])
        for task_id in task_ids
    )
    return sum_tests / problem_count


def gen_report(set_cover_info: Dict[str, List[str]], sample_eval_dir: str, model: str):
    tsr_dict = {"ntests": compute_avg_test(set_cover_info), "pass@1": 0}
    model_path = os.path.join(sample_eval_dir, f"{model}_temp_0.0", "eval_results.json")
    with open(model_path, "r") as f:
        mdict = json.load(f)
    correct_cnt = 0
    for task_id in task_ids:
        # Older eval results may key tasks as "HumanEval_0" rather than "HumanEval/0".
        legacy_task_id = task_id
        if legacy_task_id not in mdict["eval"]:
            legacy_task_id = legacy_task_id.replace("/", "_")
        # A task counts as solved only if the base tests and every selected
        # plus test pass for the greedy (temperature 0.0) sample.
        if mdict["eval"][legacy_task_id]["base"][0][0] != "success":
            continue
        correct = True
        for plus_id in set_cover_info[task_id]:
            index = int(plus_id.split("_")[-1])
            if not mdict["eval"][legacy_task_id]["plus"][0][1][index]:
                correct = False
                break
        if correct:
            correct_cnt += 1
    tsr_dict["pass@1"] = correct_cnt / problem_count
    return tsr_dict


def dump_humaneval_plus_mini(set_cover_info: Dict[str, List[str]], mini_path: str):
    new_problems = []
    for task_id in task_ids:
        otask = problems[task_id]
        task = {
            "task_id": task_id,
            "prompt": otask["prompt"],
            "contract": otask["contract"],
            "canonical_solution": otask["canonical_solution"],
            "entry_point": otask["entry_point"],
            "base_input": otask["base_input"],
            "plus_input": [],
            "atol": otask["atol"],
        }
        # Keep only the plus inputs selected by the merged set cover; test
        # names are expected to end with "_<index>" into plus_input.
        for plus_test in set_cover_info[task_id]:
            index = int(plus_test.split("_")[-1])
            task["plus_input"].append(otask["plus_input"][index])
        new_problems.append(deepcopy(task))
    write_jsonl(os.path.join(mini_path, "HumanEvalPlus-Mini.jsonl"), new_problems)
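

# With a concrete --model, that model's own samples are excluded from every
# covering (leave-one-out), so the reported pass@1 is not biased by tests
# selected specifically to kill that model's wrong samples.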
def main(flags):
    coverage_dir = os.path.join(flags.report_dir, "coverage_cache")
    mutation_dir = os.path.join(flags.report_dir, "mutation_cache")
    sample_dir = os.path.join(flags.report_dir, "sample_cache")
    os.makedirs(flags.report_dir, exist_ok=True)

    exclude_model: str = flags.model
    if exclude_model.endswith("b"):  # format: model_name + parameter size
        # Drop the parameter-size suffix (note: "".join also removes hyphens).
        exclude_model = "".join(exclude_model.split("-")[:-1])

    coverage_set_cover = get_coverage_set_cover(
        coverage_dir, exclude_model, flags.dataset
    )
    mutation_set_cover = get_mutation_set_cover(
        mutation_dir, exclude_model, flags.dataset
    )
    sample_set_cover = get_sample_set_cover(
        sample_dir, flags.sample_eval_dir, exclude_model, flags.dataset
    )
    merged_set_cover = merge_set_cover(
        coverage_set_cover, mutation_set_cover, sample_set_cover
    )

    if flags.model != "ALL":
        final_report = dict()
        # Stage 1: Coverage min set cover
        final_report["coverage"] = gen_report(
            coverage_set_cover, flags.sample_eval_dir, flags.model
        )
        # Stage 2: Mutation min set cover
        final_report["mutation"] = gen_report(
            mutation_set_cover, flags.sample_eval_dir, flags.model
        )
        # Stage 3: Sampling min set cover
        final_report["sample"] = gen_report(
            sample_set_cover, flags.sample_eval_dir, flags.model
        )
        # Stage 4: All three covers merged
        final_report["full"] = gen_report(
            merged_set_cover, flags.sample_eval_dir, flags.model
        )
        with open(
            os.path.join(flags.report_dir, f"report_{flags.model}.json"), "w"
        ) as f:
            json.dump(final_report, f, indent=4)
    else:
        dump_humaneval_plus_mini(merged_set_cover, flags.mini_path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", required=True, type=str, help="Model for testing")
    parser.add_argument("--dataset", type=str, choices=["humaneval", "mbpp"])
    parser.add_argument(
        "--report_dir", type=str, help="Path to JSON report and cache files"
    )
    parser.add_argument(
        "--sample_eval_dir", type=str, help="Path to sample evaluation files"
    )
    parser.add_argument("--mini_path", type=str, help="Path to Mini Dataset")
    args = parser.parse_args()

    global_util_init(args.dataset)
    main(args)
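
# Example invocation (paths are illustrative):
#   python tools/tsr/minimization.py --model ALL --dataset humaneval \
#       --report_dir ./tsr_report --sample_eval_dir ./sample_eval --mini_path .
# With a concrete --model name, report_<model>.json is written to --report_dir;
# with ALL, HumanEvalPlus-Mini.jsonl is written to --mini_path.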