# Adapted from https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/evaluate_from_local.py
"""Few-shot chain-of-thought evaluation of a vLLM-served model on MMLU-Pro.

The public entry point is :func:`evaluate_mmlu_pro`, which loads the model and
the dataset, builds k-shot CoT prompts per subject, generates answers with
vLLM and reports per-subject and overall accuracy.
"""
import csv
import json
import argparse
import os
import random
import time
import re
import logging
import sys

# Third-party imports keep their original relative order.
import torch
import spaces
import transformers
from vllm import LLM, SamplingParams
from tqdm import tqdm
from datasets import load_dataset
import pandas as pd
import numpy as np  # was aliased as `mnp`, which made `np.mean` a NameError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Can be found at https://github.com/TIGER-AI-Lab/MMLU-Pro/blob/main/cot_prompt_lib/initial_prompt.txt
initial_prompt = (
    "The following are multiple choice questions (with answers) about {$}. "
    "Think step by step and then finish your answer with "
    "\"the answer is (X)\" where X is the correct letter choice."
)

choices = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P"]
max_model_length = 4096
max_new_tokens = 2048

# Answer-extraction patterns, tried in this order by extract_answer().
_ANSWER_IS_RE = re.compile(r"answer is \(?([A-J])\)?")
_ANSWER_COLON_RE = re.compile(r'.*[aA]nswer:\s*([A-J])')
_LAST_LETTER_RE = re.compile(r"\b[A-J]\b(?!.*\b[A-J]\b)", re.DOTALL)


def _as_records(rows):
    """Return *rows* as a list of per-question mappings.

    A pandas DataFrame is converted row by row into dicts; any other iterable
    is materialised as a list of its items (the items themselves are not
    copied).
    """
    if isinstance(rows, pd.DataFrame):
        return rows.to_dict("records")
    return list(rows)


def preprocess(test_df):
    """Strip the "N/A" placeholder options from every row.

    Args:
        test_df: Iterable of dict-like rows, each with an "options" list.

    Returns:
        A list of the same rows, each with its "options" entry replaced by a
        list without the "N/A" entries. Rows are modified in place.
    """
    res_df = []
    for each in test_df:
        each["options"] = [opt for opt in each["options"] if opt != "N/A"]
        res_df.append(each)
    return res_df


def load_mmlu_pro():
    """Load the MMLU-Pro test and validation splits and preprocess both.

    Returns:
        A ``(test_rows, val_rows)`` tuple of lists of row dicts.
    """
    dataset = load_dataset("TIGER-Lab/MMLU-Pro")
    test_df, val_df = dataset["test"], dataset["validation"]
    return preprocess(test_df), preprocess(val_df)


def load_model(model_name, gpu_utilization=0.8):
    """Create the vLLM engine, its sampling parameters and the tokenizer.

    Args:
        model_name: Model identifier passed to vLLM and to the tokenizer.
        gpu_utilization: Value forwarded as ``gpu_memory_utilization``.

    Returns:
        ``((llm, sampling_params), tokenizer)``.
    """
    device_count = torch.cuda.device_count()
    llm = LLM(
        model=model_name,
        gpu_memory_utilization=float(gpu_utilization),
        tensor_parallel_size=device_count,
        max_model_len=max_model_length,
        trust_remote_code=True,
    )
    logger.info(f"Torch Device CUDA Count: {device_count}")
    # Greedy decoding; stop once the model starts writing a new question.
    sampling_params = SamplingParams(
        temperature=0,
        max_tokens=max_new_tokens,
        stop=["Question:"],
    )
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_name, trust_remote_code=True
    )
    return (llm, sampling_params), tokenizer


def format_cot_example(example, including_answer=True):
    """Render one question as a prompt fragment.

    Args:
        example: Mapping with "question" and "options" (and "cot_content"
            when *including_answer* is true).
        including_answer: When true, append the worked solution followed by a
            blank line; otherwise end with the open "Answer:" cue.

    Returns:
        The formatted text.
    """
    parts = ["Question:\n", example["question"] + "\n", "Options:\n"]
    parts.extend(
        "{}. {}\n".format(choices[i], opt)
        for i, opt in enumerate(example["options"])
    )
    if including_answer:
        cot_content = example["cot_content"].replace(
            "A: Let's think step by step.",
            "Answer: Let's think step by step.",
        )
        parts.append(cot_content + "\n\n")
    else:
        parts.append("Answer: Let's think step by step.")
    return "".join(parts)


def generate_cot_prompt(val_df, curr, k):
    """Build a k-shot chain-of-thought prompt for the question *curr*.

    Args:
        val_df: Few-shot examples for the question's subject, either a pandas
            DataFrame or an iterable of row mappings.
        curr: The question to ask (mapping with "category", "question",
            "options").
        k: Number of leading examples to include; values below zero are
            treated as zero.

    Returns:
        The full prompt text.

    Raises:
        AssertionError: If any example belongs to a different category.
    """
    subject = curr["category"]
    # Iterating a DataFrame directly yields column labels, not rows, so the
    # examples are normalised to a list of records first.
    examples = _as_records(val_df)
    assert all(ex["category"] == subject for ex in examples), \
        "Not all rows in val_df have the correct category"
    shots = examples[:max(k, 0)]

    parts = [initial_prompt.replace("{$}", subject) + "\n"]
    parts.extend(
        format_cot_example(example, including_answer=True) for example in shots
    )
    parts.append(format_cot_example(curr, including_answer=False))
    return "".join(parts)


def extract_answer(text):
    """Extract the predicted option letter from a model response.

    Tries the "answer is (X)" form first and falls back to
    :func:`extract_again` when it is absent.

    Returns:
        A letter in A-J, or None if no pattern matches.
    """
    match = _ANSWER_IS_RE.search(text)
    if match:
        return match.group(1)
    logger.info("1st answer extract failed\n%s", text)
    return extract_again(text)


def extract_again(text):
    """Second attempt: look for "Answer: X" / "answer: X"; else fall back."""
    match = _ANSWER_COLON_RE.search(text)
    if match:
        return match.group(1)
    return extract_final(text)


def extract_final(text):
    """Last resort: return the last standalone A-J letter in *text*, or None."""
    match = _LAST_LETTER_RE.search(text)
    if match:
        return match.group(0)
    return None


def batch_inference(llm, sampling_params, inference_batch):
    """Generate a response for every prompt and extract each prediction.

    Args:
        llm: Object exposing ``generate(prompts, sampling_params)``.
        sampling_params: Passed through to ``llm.generate``.
        inference_batch: List of prompt strings.

    Returns:
        ``(pred_batch, response_batch)``, two lists with one entry per
        returned output: the extracted letter (or None) and the raw text.
    """
    start = time.time()
    outputs = llm.generate(inference_batch, sampling_params)
    logger.info(
        "%d size batch costing time: %s",
        len(inference_batch),
        time.time() - start,
    )
    response_batch = [output.outputs[0].text for output in outputs]
    pred_batch = [extract_answer(text) for text in response_batch]
    logger.info("PRED BATCH: %s, RESPONSE BATCH: %s", pred_batch, response_batch)
    return pred_batch, response_batch


def calculate_accuracy(res):
    """Score predictions.

    A row without a prediction is scored against a pseudo-random option drawn
    from a generator seeded with 12345 for that row, so the outcome is
    reproducible.

    Args:
        res: Iterable of mappings with "pred", "answer", "answer_index" and
            "options".

    Returns:
        ``(correctness, accuracy)``: a list of 1/0 flags per row and their
        mean (``([], 0.0)`` for empty input).
    """
    correctness = []
    for each in res:
        if not each["pred"]:
            # A private generator gives the same draw as reseeding the global
            # one, without disturbing other users of the `random` module.
            guess = random.Random(12345).randint(0, len(each["options"]) - 1)
            is_correct = 1 if guess == each["answer_index"] else 0
        else:
            is_correct = 1 if each["pred"] == each["answer"] else 0
        correctness.append(is_correct)

    if not correctness:
        return [], 0.0
    return correctness, sum(correctness) / len(correctness)


def _build_prompt_within_budget(tokenizer, val_records, curr, num_shots, budget):
    """Return the prompt with the most shots (<= num_shots) under *budget* tokens.

    Falls back to the zero-shot prompt, with a warning, when even that is too
    long.
    """
    k = max(num_shots, 0)
    while True:
        prompt = generate_cot_prompt(val_records, curr, k)
        # Only the token count is needed, so no tensors are created and
        # nothing is moved to the GPU.
        length = len(tokenizer(prompt)["input_ids"])
        if length < budget:
            return prompt
        if k == 0:
            logger.warning(
                "zero-shot prompt has %d tokens, exceeding the budget of %d",
                length,
                budget,
            )
            return prompt
        k -= 1


@torch.no_grad()
def eval_cot(subject, model, tokenizer, val_df, test_df, num_shots=5):
    """Evaluate one subject with few-shot chain-of-thought prompting.

    Args:
        subject: Subject name (used for logging).
        model: ``(llm, sampling_params)`` tuple as returned by load_model.
        tokenizer: Callable tokenizer used to measure prompt length.
        val_df: Few-shot examples (DataFrame or iterable of row mappings).
        test_df: Questions to answer (DataFrame or iterable of row mappings).
        num_shots: Maximum number of examples per prompt; the count is
            reduced per question until the prompt fits in
            ``max_model_length - max_new_tokens`` tokens.

    Returns:
        ``(correctness, accuracy)`` as produced by calculate_accuracy.
    """
    llm, sampling_params = model
    logger.info("evaluating %s", subject)

    # Integer indexing or iterating a DataFrame addresses columns, not rows,
    # so both inputs are normalised to lists of records.
    test_records = _as_records(test_df)
    val_records = _as_records(val_df)
    if not test_records:
        return calculate_accuracy([])

    budget = max_model_length - max_new_tokens
    # The shot count restarts at num_shots for every question.
    inference_batches = [
        _build_prompt_within_budget(tokenizer, val_records, curr, num_shots, budget)
        for curr in tqdm(test_records)
    ]

    pred_batch, response_batch = batch_inference(llm, sampling_params, inference_batches)

    results = []
    for curr, pred, response in zip(test_records, pred_batch, response_batch):
        curr["pred"] = pred
        curr["model_outputs"] = response
        results.append(curr)

    correctness, accuracy = calculate_accuracy(results)
    logger.info("This batch accuracy is: {}, correct samples: {}/{}\n".format(
        str(accuracy), str(sum(correctness)), str(len(correctness))))
    return correctness, accuracy


@spaces.GPU(duration=240)  # 240 seconds (4 minutes) for larger evaluations
def evaluate_mmlu_pro(model_name, num_subjects=-1, num_questions=10, num_shots=5):
    """Evaluate *model_name* on MMLU-Pro.

    Args:
        model_name: Model identifier passed to load_model.
        num_subjects: Number of subjects (in sorted order) to evaluate; -1 or
            any value >= the number of subjects selects all of them.
        num_questions: Maximum number of test questions per subject.
        num_shots: Maximum number of few-shot examples per prompt.

    Returns:
        A dict with "overall_accuracy" (mean over all evaluated questions),
        "min_accuracy_subject" and "max_accuracy_subject" (``(subject,
        accuracy)`` tuples) and "full_accuracy_table" (list of per-subject
        dicts).

    Raises:
        ValueError: If no subject is selected.
    """
    model, tokenizer = load_model(model_name, gpu_utilization=0.8)

    # NOTE(review): the original called `model[0].model.eval()` unconditionally.
    # vLLM's `LLM` does not appear to document a `.model` attribute, so the
    # call is kept only when that attribute exists - confirm and drop if unused.
    inner_model = getattr(model[0], "model", None)
    if inner_model is not None:
        inner_model.eval()

    test_rows, val_rows = load_mmlu_pro()
    test_df = pd.DataFrame(test_rows).sort_values(['category', 'question_id'])
    val_df = pd.DataFrame(val_rows).sort_values(['category', 'question_id'])

    all_subjects = sorted(test_df['category'].unique())
    if num_subjects == -1 or num_subjects >= len(all_subjects):
        selected_subjects = all_subjects
    else:
        selected_subjects = all_subjects[:num_subjects]
    if not selected_subjects:
        raise ValueError(
            "No subjects selected for evaluation (num_subjects={!r})".format(num_subjects)
        )

    logger.info("selected subjects:\n" + "\n".join(selected_subjects))

    results = {}
    all_correctness = []
    results_table = []

    for subject in tqdm(selected_subjects, desc="Processing Selected Categories"):
        test_samples = test_df[test_df['category'] == subject].head(num_questions)
        val_samples = val_df[val_df['category'] == subject].head(num_shots)

        correctness, acc = eval_cot(
            subject,
            model,
            tokenizer,
            val_df=val_samples,
            test_df=test_samples,
            num_shots=num_shots,
        )
        results[subject] = acc
        all_correctness.extend(correctness)
        results_table.append({
            'Subject': subject,
            'Num_samples': len(test_samples),
            'Num_correct': sum(correctness),
            'Accuracy': acc,
        })

    # Question-weighted accuracy; 0.0 when no question was evaluated at all.
    weighted_acc = np.mean(all_correctness) if all_correctness else 0.0

    min_acc_subject = min(results.items(), key=lambda x: x[1])[0]
    max_acc_subject = max(results.items(), key=lambda x: x[1])[0]

    return {
        "overall_accuracy": weighted_acc,
        "min_accuracy_subject": (min_acc_subject, results[min_acc_subject]),
        "max_accuracy_subject": (max_acc_subject, results[max_acc_subject]),
        "full_accuracy_table": results_table,
    }