# Adapted from https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/evaluate_from_local.py
import csv
import json
import argparse
import os
import torch
import spaces
import random
import transformers
import time
import re
from vllm import LLM, SamplingParams
from tqdm import tqdm
import logging
import sys
from datasets import load_dataset
import pandas as pd
import numpy as np

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Can be found at https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/cot_prompt_lib/initial_prompt.txt
initial_prompt = "The following are multiple choice questions (with answers) about {$}. Think step by step and then finish your answer with \"the answer is (X)\" where X is the correct letter choice."

choices = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P"]
max_model_length = 4096
max_new_tokens = 2048
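# Few-shot examples are trimmed at inference time until the tokenized prompt fits
# within max_model_length - max_new_tokens, leaving room for the model's generation.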


def preprocess(test_df):
    res_df = []
    for each in test_df:
        options = []
        for opt in each["options"]:
            if opt == "N/A":
                continue
            options.append(opt)
        each["options"] = options
        res_df.append(each)
    return res_df


def load_mmlu_pro():
    dataset = load_dataset("TIGER-Lab/MMLU-Pro")
    test_df, val_df = dataset["test"], dataset["validation"]
    test_df = preprocess(test_df)
    val_df = preprocess(val_df)
    return test_df, val_df


def load_model(model_name, gpu_utilization=0.8):
    llm = LLM(model=model_name, gpu_memory_utilization=float(gpu_utilization),
              tensor_parallel_size=torch.cuda.device_count(),
              max_model_len=max_model_length,
              trust_remote_code=True)
    logger.info(f"Torch Device CUDA Count: {torch.cuda.device_count()}")
    sampling_params = SamplingParams(temperature=0, max_tokens=max_new_tokens,
                                     stop=["Question:"])
    tokenizer = transformers.AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    return (llm, sampling_params), tokenizer


def format_cot_example(example, including_answer=True):
    prompt = "Question:\n"
    question = example["question"]
    options = example["options"]
    prompt += question + "\n"
    prompt += "Options:\n"
    for i, opt in enumerate(options):
        prompt += "{}. {}\n".format(choices[i], opt)
    if including_answer:
        cot_content = example["cot_content"].replace("A: Let's think step by step.",
                                                     "Answer: Let's think step by step.")
        prompt += cot_content + "\n\n"
    else:
        prompt += "Answer: Let's think step by step."
    return prompt


def generate_cot_prompt(val_df, curr, k):
    prompt = initial_prompt
    subject = curr["category"]
    # All few-shot examples must share the current question's category
    assert all(example["category"] == subject for example in val_df), \
        "Not all few-shot examples in val_df have the correct category"
    val_df = val_df[:k]
    prompt = prompt.replace("{$}", subject) + "\n"
    for example in val_df:
        prompt += format_cot_example(example, including_answer=True)
    prompt += format_cot_example(curr, including_answer=False)
    return prompt
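

# Answer extraction falls back through three patterns: the canonical
# "the answer is (X)" phrase, then a bare "Answer: X", and finally the last
# standalone letter A-J anywhere in the generated text.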
def extract_answer(text):
    pattern = r"answer is \(?([A-J])\)?"
    match = re.search(pattern, text)
    if match:
        return match.group(1)
    else:
        print("1st answer extract failed\n" + text)
        return extract_again(text)


def extract_again(text):
    match = re.search(r'.*[aA]nswer:\s*([A-J])', text)
    if match:
        return match.group(1)
    else:
        return extract_final(text)


def extract_final(text):
    pattern = r"\b[A-J]\b(?!.*\b[A-J]\b)"
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(0)
    else:
        return None


def batch_inference(llm, sampling_params, inference_batch):
    start = time.time()
    outputs = llm.generate(inference_batch, sampling_params)
    logging.info("Batch of size %d took %.2f seconds", len(inference_batch), time.time() - start)
    response_batch = []
    pred_batch = []
    for output in outputs:
        generated_text = output.outputs[0].text
        response_batch.append(generated_text)
        pred = extract_answer(generated_text)
        pred_batch.append(pred)
    logging.info("PRED BATCH: %s, RESPONSE BATCH: %s", pred_batch, response_batch)
    return pred_batch, response_batch


def calculate_accuracy(res):
    """
    Calculate accuracy and return an array of correctness (1 if correct, 0 if wrong)
    along with the overall accuracy.
    """
    correctness = []
    for each in res:
        if not each["pred"]:
            # If the prediction is None, fall back to a random choice with a fixed seed
            # so that handling of missing predictions is reproducible
            random.seed(12345)
            x = random.randint(0, len(each["options"]) - 1)
            is_correct = 1 if x == each["answer_index"] else 0
        else:
            is_correct = 1 if each["pred"] == each["answer"] else 0
        correctness.append(is_correct)
    # Calculate accuracy from the correctness array
    if len(correctness) == 0:
        return [], 0.0
    accuracy = sum(correctness) / len(correctness)
    return correctness, accuracy


def eval_cot(subject, model, tokenizer, val_df, test_df, num_shots=5):
    llm, sampling_params = model
    global choices
    logging.info("evaluating " + subject)
    inference_batches = []
    for i in tqdm(range(len(test_df))):
        curr = test_df[i]
        k = num_shots  # reset the shot count for every question
        prompt_length_ok = False
        prompt = None
        while not prompt_length_ok:
            prompt = generate_cot_prompt(val_df, curr, k)
            inputs = tokenizer(prompt, return_tensors="pt")
            inputs = {key: value.cuda() for key, value in inputs.items()}
            length = len(inputs["input_ids"][0])
            if length < max_model_length - max_new_tokens:
                prompt_length_ok = True
            else:
                # Drop one few-shot example and retry until the prompt fits
                k -= 1
        inference_batches.append(prompt)
    pred_batch, response_batch = batch_inference(llm, sampling_params, inference_batches)
    results = []
    for j, curr in enumerate(test_df):
        curr["pred"] = pred_batch[j]
        curr["model_outputs"] = response_batch[j]
        results.append(curr)
    # Get the array of correctness and the overall accuracy
    correctness, accuracy = calculate_accuracy(results)
    logging.info("This batch accuracy is: {}, correct samples: {}/{}\n".format(
        str(accuracy), str(sum(correctness)), str(len(correctness))))
    return correctness, accuracy


# The ZeroGPU allocation is extended to 3 minutes for larger evaluations
@spaces.GPU(duration=180)
def evaluate_mmlu_pro(model_name, num_subjects=-1, num_questions=10, num_shots=5):
    model, tokenizer = load_model(model_name, gpu_utilization=0.8)
    # vLLM manages inference mode internally, so no explicit eval() call is needed
    test_df, val_df = load_mmlu_pro()
    test_df = pd.DataFrame(test_df)
    val_df = pd.DataFrame(val_df)
    test_df = test_df.sort_values(['category', 'question_id'])
    val_df = val_df.sort_values(['category', 'question_id'])
    # Get all unique subjects
    all_subjects = sorted(test_df['category'].unique())
    # Select subjects based on the num_subjects parameter
    if num_subjects == -1 or num_subjects >= len(all_subjects):
        selected_subjects = all_subjects
    else:
        # Take the first num_subjects subjects
        selected_subjects = all_subjects[:num_subjects]
    logging.info("selected subjects:\n" + "\n".join(selected_subjects))
    results = {}
    all_correctness = []
    results_table = []
    for subject in tqdm(selected_subjects, desc="Processing Selected Categories"):
        test_samples = test_df[test_df['category'] == subject].head(num_questions)
        val_samples = val_df[val_df['category'] == subject].head(num_shots)
        # eval_cot works with lists of per-question dicts, so convert the DataFrame slices
        correctness, acc = eval_cot(subject, model, tokenizer,
                                    val_df=val_samples.to_dict('records'),
                                    test_df=test_samples.to_dict('records'),
                                    num_shots=num_shots)
        results[subject] = acc
        all_correctness.extend(correctness)
        results_table.append({
            'Subject': subject,
            'Num_samples': len(test_samples),
            'Num_correct': sum(correctness),
            'Accuracy': acc
        })
    weighted_acc = np.mean(all_correctness)
    min_acc_subject = min(results.items(), key=lambda x: x[1])[0]
    max_acc_subject = max(results.items(), key=lambda x: x[1])[0]
    return {
        "overall_accuracy": weighted_acc,
        "min_accuracy_subject": (min_acc_subject, results[min_acc_subject]),
        "max_accuracy_subject": (max_acc_subject, results[max_acc_subject]),
        "full_accuracy_table": results_table,
    }
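

# The script imports argparse and json but defines no entrypoint. The block below is a
# minimal, illustrative command-line wrapper around evaluate_mmlu_pro; it is a sketch,
# not part of the original Space, and the flag names are assumptions.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run a small MMLU-Pro CoT evaluation with vLLM")
    parser.add_argument("--model_name", type=str, required=True,
                        help="Hugging Face model id or local path to evaluate")
    parser.add_argument("--num_subjects", type=int, default=-1,
                        help="Number of subjects to evaluate (-1 for all)")
    parser.add_argument("--num_questions", type=int, default=10,
                        help="Number of questions per subject")
    parser.add_argument("--num_shots", type=int, default=5,
                        help="Number of few-shot CoT examples per prompt")
    args = parser.parse_args()
    summary = evaluate_mmlu_pro(args.model_name,
                                num_subjects=args.num_subjects,
                                num_questions=args.num_questions,
                                num_shots=args.num_shots)
    # default=str keeps numpy floats and tuples serializable
    print(json.dumps(summary, indent=2, default=str))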