# H2H-eval-comparator / mmlu_pro_eval_adapted.py
# rohansampath's picture
# Update mmlu_pro_eval_adapted.py
# 599b7a0 verified
# raw
# history blame
# 8.9 kB
# Adapted from https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/evaluate_from_local.py
import csv
import json
import argparse
import os
import torch
import spaces
import random
import transformers
import time
import re
from vllm import LLM, SamplingParams
from tqdm import tqdm
import logging
import sys
from datasets import load_dataset
import pandas as pd
import numpy as mnp
# Configure root logging at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prompt header template; the "{$}" placeholder is substituted with the
# subject name when a prompt is built.
# Can be found at https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/cot_prompt_lib/initial_prompt.txt
initial_prompt = (
    "The following are multiple choice questions (with answers) about {$}. "
    'Think step by step and then finish your answer with "the answer is (X)" '
    "where X is the correct letter choice."
)

# Option labels, looked up by option position when formatting a question.
choices = list("ABCDEFGHIJKLMNOP")

# Context length handed to the engine and the generation cap handed to the
# sampling parameters; their difference is the prompt token budget.
max_model_length = 4096
max_new_tokens = 2048
def preprocess(test_df):
    """Strip "N/A" placeholder entries from every record's option list.

    Args:
        test_df: Iterable of dict-like records, each with an "options" entry.

    Returns:
        A list holding the same record objects, in order, whose "options"
        entry has been replaced by a new list without the "N/A" values.
    """
    cleaned = []
    for record in test_df:
        # The record is updated in place and then collected.
        record["options"] = [opt for opt in record["options"] if opt != "N/A"]
        cleaned.append(record)
    return cleaned
def load_mmlu_pro():
    """Load the "TIGER-Lab/MMLU-Pro" dataset and clean both splits.

    Returns:
        A ``(test, validation)`` tuple; each element is the list produced by
        ``preprocess`` for the corresponding split.
    """
    splits = load_dataset("TIGER-Lab/MMLU-Pro")
    cleaned_test = preprocess(splits["test"])
    cleaned_val = preprocess(splits["validation"])
    return cleaned_test, cleaned_val
def load_model(model_name, gpu_utilization=0.8):
    """Create the vLLM engine, its sampling parameters and a tokenizer.

    Args:
        model_name: Model identifier passed to both ``LLM`` and
            ``AutoTokenizer.from_pretrained``.
        gpu_utilization: Value forwarded (as a float) to
            ``gpu_memory_utilization``.

    Returns:
        ``((llm, sampling_params), tokenizer)``.
    """
    engine_options = {
        "model": model_name,
        "gpu_memory_utilization": float(gpu_utilization),
        # One tensor-parallel shard per CUDA device reported by torch.
        "tensor_parallel_size": torch.cuda.device_count(),
        "max_model_len": max_model_length,
        "trust_remote_code": True,
    }
    llm = LLM(**engine_options)
    logger.info(f"Torch Device CUDA Count: {torch.cuda.device_count()}")

    # Temperature 0; generation stops at the next "Question:" marker or after
    # max_new_tokens tokens.
    sampling_params = SamplingParams(
        temperature=0,
        max_tokens=max_new_tokens,
        stop=["Question:"],
    )
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_name, trust_remote_code=True
    )
    return (llm, sampling_params), tokenizer
def format_cot_example(example, including_answer=True):
    """Render one question as a prompt fragment.

    Args:
        example: Mapping with "question" and "options" entries, plus
            "cot_content" when ``including_answer`` is true.
        including_answer: When true, append the worked solution followed by a
            blank line; otherwise end with the open answer cue.

    Returns:
        The formatted fragment as a string.
    """
    pieces = ["Question:\n", example["question"] + "\n", "Options:\n"]
    for idx, option in enumerate(example["options"]):
        pieces.append(f"{choices[idx]}. {option}\n")

    if not including_answer:
        pieces.append("Answer: Let's think step by step.")
        return "".join(pieces)

    # Worked solutions start with "A: ..."; rewrite that prefix to "Answer: ...".
    solution = example["cot_content"].replace(
        "A: Let's think step by step.",
        "Answer: Let's think step by step.",
    )
    pieces.append(solution + "\n\n")
    return "".join(pieces)
def generate_cot_prompt(val_df, curr, k):
    """Build a k-shot chain-of-thought prompt for one question.

    Args:
        val_df: Few-shot examples for the question's subject, either a pandas
            DataFrame (one row per example) or an iterable of dict-like
            records.
        curr: The record to be answered; must provide "category" in addition
            to the fields ``format_cot_example`` reads.
        k: Number of few-shot examples to include. Negative values are
            treated as 0.

    Returns:
        The full prompt: subject header, up to ``k`` solved examples, then the
        unanswered question.

    Raises:
        AssertionError: If any example's category differs from ``curr``'s.
    """
    subject = curr["category"]

    # Iterating a DataFrame yields column names rather than rows, so turn the
    # examples into a list of per-row dicts before using them.
    if isinstance(val_df, pd.DataFrame):
        examples = val_df.to_dict("records")
    else:
        examples = list(val_df)

    # Assert that all examples have 'category' equal to subject
    assert all(example["category"] == subject for example in examples), \
        "Not all rows in val_df have the correct category"

    # Clamp k: a negative slice bound would drop examples from the end
    # instead of selecting none.
    shots = examples[: max(k, 0)]

    parts = [initial_prompt.replace("{$}", subject) + "\n"]
    parts.extend(format_cot_example(example, including_answer=True) for example in shots)
    parts.append(format_cot_example(curr, including_answer=False))
    return "".join(parts)
def extract_answer(text):
    """Return the option letter from the first "answer is (X)" phrase.

    The parentheses around the letter are optional and only letters A-J are
    recognised. When the phrase is absent, the text is printed and the result
    of ``extract_again`` is returned instead.
    """
    found = re.search(r"answer is \(?([A-J])\)?", text)
    if found is None:
        print("1st answer extract failed\n" + text)
        return extract_again(text)
    return found.group(1)
def extract_again(text):
    """Second-chance extraction: look for "answer: X" / "Answer: X".

    Returns the captured letter (A-J) when the pattern matches, otherwise the
    result of ``extract_final``.
    """
    found = re.search(r'.*[aA]nswer:\s*([A-J])', text)
    return found.group(1) if found else extract_final(text)
def extract_final(text):
    """Last-resort extraction: the final standalone letter A-J in the text.

    Returns the letter, or ``None`` when the text contains no such letter.
    """
    # A standalone letter that is not followed, anywhere later in the text
    # (newlines included via DOTALL), by another standalone letter.
    last_letter = re.search(r"\b[A-J]\b(?!.*\b[A-J]\b)", text, re.DOTALL)
    if last_letter is None:
        return None
    return last_letter.group(0)
def batch_inference(llm, sampling_params, inference_batch):
    """Generate a completion for every prompt and extract each answer.

    Args:
        llm: Object whose ``generate(prompts, sampling_params)`` returns one
            result per prompt, each exposing ``.outputs[0].text``.
        sampling_params: Passed through to ``llm.generate``.
        inference_batch: List of prompt strings.

    Returns:
        ``(pred_batch, response_batch)``: extracted answers and raw generated
        texts, in prompt order.
    """
    started = time.time()
    outputs = llm.generate(inference_batch, sampling_params)
    logging.info(str(len(inference_batch)) + "size batch costing time: " + str(time.time() - started))

    response_batch = [result.outputs[0].text for result in outputs]
    pred_batch = [extract_answer(generated) for generated in response_batch]

    logging.info("PRED BATCH: %s, RESPONSE BATCH: %s", pred_batch, response_batch)
    return pred_batch, response_batch
def calculate_accuracy(res):
    """Score predictions and return per-item correctness plus accuracy.

    Args:
        res: Iterable of records with "pred", "answer", "answer_index" and
            "options" entries.

    Returns:
        ``(correctness, accuracy)`` where ``correctness`` is a list of 1/0
        flags in input order and ``accuracy`` is their mean (0.0 for empty
        input).
    """
    correctness = []
    for each in res:
        if not each["pred"]:
            # No prediction could be extracted: fall back to a seeded random
            # guess. A private RNG gives the same deterministic draw as
            # seeding the global one, without resetting the process-wide
            # random state as a side effect.
            guess = random.Random(12345).randint(0, len(each["options"]) - 1)
            is_correct = 1 if guess == each["answer_index"] else 0
        else:
            is_correct = 1 if each["pred"] == each["answer"] else 0
        correctness.append(is_correct)

    if not correctness:
        return [], 0.0

    return correctness, sum(correctness) / len(correctness)
@torch.no_grad()
def eval_cot(subject, model, tokenizer, val_df, test_df, num_shots=5):
    """Evaluate one subject with few-shot chain-of-thought prompting.

    Args:
        subject: Subject name (used for logging).
        model: ``(llm, sampling_params)`` tuple.
        tokenizer: Callable tokenizer used to measure prompt length.
        val_df: Few-shot examples, passed unchanged to
            ``generate_cot_prompt``.
        test_df: Questions to answer, either a pandas DataFrame or an
            iterable of dict-like records.
        num_shots: Maximum number of few-shot examples per prompt.

    Returns:
        ``(correctness, accuracy)`` as produced by ``calculate_accuracy``.
    """
    llm, sampling_params = model
    logging.info("evaluating " + subject)

    # Integer indexing and plain iteration address a DataFrame's columns, not
    # its rows, so work on a list of per-row records instead.
    if isinstance(test_df, pd.DataFrame):
        test_rows = test_df.to_dict("records")
    else:
        test_rows = list(test_df)

    if not test_rows:
        return calculate_accuracy([])

    # Tokens left for the prompt once room for generation is reserved.
    prompt_budget = max_model_length - max_new_tokens

    inference_batches = []
    for curr in tqdm(test_rows):
        prompt = None
        # Start every question at the full shot count and drop one example at
        # a time until the prompt fits; never go below zero shots.
        for k in range(max(num_shots, 0), -1, -1):
            prompt = generate_cot_prompt(val_df, curr, k)
            # Only the token count is needed, so the ids stay on the CPU.
            length = len(tokenizer(prompt, return_tensors="pt")["input_ids"][0])
            if length < prompt_budget:
                break
        else:
            logging.warning(
                "zero-shot prompt still exceeds %d tokens; using it anyway",
                prompt_budget,
            )
        inference_batches.append(prompt)

    pred_batch, response_batch = batch_inference(llm, sampling_params, inference_batches)

    results = []
    for curr, pred, response in zip(test_rows, pred_batch, response_batch):
        curr["pred"] = pred
        curr["model_outputs"] = response
        results.append(curr)

    # Get array of correctness and overall accuracy
    correctness, accuracy = calculate_accuracy(results)
    logging.info("This batch accuracy is: {}, correct samples: {}/{}\n".format(
        str(accuracy), str(sum(correctness)), str(len(correctness))))
    return correctness, accuracy
@spaces.GPU(duration=240)  # duration=240 (4 minutes if the unit is seconds) for larger evaluations
def evaluate_mmlu_pro(model_name, num_subjects=-1, num_questions=10, num_shots=5):
    """Run the MMLU-Pro evaluation for a model and summarise the results.

    Args:
        model_name: Model identifier passed to ``load_model``.
        num_subjects: Number of subjects (in alphabetical order) to evaluate;
            -1, or any value at least the number of subjects, selects all.
        num_questions: Questions taken per subject (first rows after sorting
            by category and question_id).
        num_shots: Few-shot examples taken per subject.

    Returns:
        Dict with "overall_accuracy" (mean over every evaluated question),
        "min_accuracy_subject" and "max_accuracy_subject" (each a
        ``(subject, accuracy)`` tuple) and "full_accuracy_table" (one dict per
        subject).

    Raises:
        ValueError: If no subject is selected.
    """
    model, tokenizer = load_model(model_name, gpu_utilization=0.8)
    # NOTE(review): the former `model[0].model.eval()` call was removed. The
    # vLLM `LLM` wrapper does not appear to expose a `.model` attribute, so
    # the call would raise before any evaluation ran; confirm against the
    # vLLM API reference.

    test_df, val_df = load_mmlu_pro()
    test_df = pd.DataFrame(test_df).sort_values(['category', 'question_id'])
    val_df = pd.DataFrame(val_df).sort_values(['category', 'question_id'])

    # Get all unique subjects
    all_subjects = sorted(test_df['category'].unique())

    # Select subjects based on num_subjects parameter
    if num_subjects == -1 or num_subjects >= len(all_subjects):
        selected_subjects = all_subjects
    else:
        # Take the first num_subjects subjects
        selected_subjects = all_subjects[:num_subjects]
    if not selected_subjects:
        raise ValueError("No subjects selected for evaluation")
    logging.info("selected subjects:\n" + "\n".join(selected_subjects))

    results = {}
    all_correctness = []
    results_table = []
    for subject in tqdm(selected_subjects, desc="Processing Selected Categories"):
        test_samples = test_df[test_df['category'] == subject].head(num_questions)
        val_samples = val_df[val_df['category'] == subject].head(num_shots)
        correctness, acc = eval_cot(subject, model, tokenizer, val_df=val_samples, test_df=test_samples, num_shots=num_shots)
        results[subject] = acc
        all_correctness.extend(correctness)
        results_table.append({
            'Subject': subject,
            'Num_samples': len(test_samples),
            'Num_correct': sum(correctness),
            'Accuracy': acc
        })

    # Mean over every evaluated question. Computed in plain Python because
    # the file binds numpy as `mnp`, so the `np` name used before was
    # undefined.
    if all_correctness:
        weighted_acc = sum(all_correctness) / len(all_correctness)
    else:
        weighted_acc = 0.0

    min_acc_subject = min(results, key=results.get)
    max_acc_subject = max(results, key=results.get)
    return {
        "overall_accuracy": weighted_acc,
        "min_accuracy_subject": (min_acc_subject, results[min_acc_subject]),
        "max_accuracy_subject": (max_acc_subject, results[max_acc_subject]),
        "full_accuracy_table": results_table,
    }