Spaces:
Sleeping
Sleeping
import torch | |
import evaluate | |
from datasets import load_dataset | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
import spaces | |
import logging | |
import numpy as np | |
import pandas as pd | |
# Answer-choice labels, in choice-index order.
option_letters = ["A", "B", "C", "D"]

# Token budget for a prompt. Hard-coded for the moment; intended to be
# replaced later by a value supplied by the model.
MAX_CONTEXT_WINDOW = 4096

# Module-wide logging at INFO level, with a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Accuracy metric loaded through the `evaluate` library.
accuracy_metric = evaluate.load("accuracy")
def load_dataset_from_hf(verbose=False):
    """Load the "all" configuration of the ``cais/mmlu`` dataset.

    Args:
        verbose: When true, log for every split its row and column counts,
            the column names and types, and up to five sample rows.

    Returns:
        The object returned by ``load_dataset("cais/mmlu", "all")``, which is
        indexed by split name.
    """
    mmlu_dataset = load_dataset("cais/mmlu", "all")
    if not verbose:
        return mmlu_dataset

    for split, dataset in mmlu_dataset.items():
        column_names = dataset.column_names
        num_rows = len(dataset)

        # Size of the split.
        logger.info(f"Dataset Split: {split}")
        logger.info(f"Number of Rows: {num_rows}")
        logger.info(f"Number of Columns: {len(column_names)}")

        # Column names and types. Fall back to the feature's class name when
        # a feature object does not expose `.dtype`, so that diagnostic
        # logging can never abort the evaluation.
        column_types = {}
        for col in column_names:
            feature = dataset.features[col]
            column_types[col] = str(getattr(feature, "dtype", type(feature).__name__))
        logger.info(f"Column Names: {column_names}")
        logger.info(f"Column Types: {column_types}")

        # A small sample, capped so short splits are not over-indexed.
        logger.info("Sample Rows:")
        for row in dataset.select(range(min(5, num_rows))):
            logger.info(row)
        logger.info("=" * 50)  # Separator for readability

    return mmlu_dataset
def format_subject(subject):
    """Turn an underscore-separated subject id into space-separated words.

    Every word, including the first, is preceded by a single space, so
    ``"high_school_biology"`` becomes ``" high school biology"``. The leading
    space is kept on purpose so that text built from this value stays
    byte-for-byte unchanged.

    Args:
        subject: Subject identifier with words separated by underscores.

    Returns:
        The words of ``subject``, each prefixed with one space.
    """
    return "".join(f" {word}" for word in subject.split("_"))
def format_example(df, idx, include_answer=True):
    """Format one row of ``df`` as a multiple-choice prompt fragment.

    Columns are read by position:
    - Column 0: question text
    - Column 1: subject (not used here)
    - Column 2: choices as a sequence of strings
    - Column 3: answer as a numeric index into the choices

    Args:
        df: DataFrame laid out as described above.
        idx: Positional row index of the example to format.
        include_answer: When true, append the answer letter followed by a
            blank line (the form used for few-shot examples). When false the
            text ends at ``"Answer:"`` so a model can complete it.

    Returns:
        The question, one ``"<letter>. <choice>"`` line per choice, and the
        ``"Answer:"`` suffix (optionally with the answer letter).

    Raises:
        TypeError: If the choices cell is not a list, tuple or NumPy array.
        KeyError: If the answer index has no corresponding option letter.
    """
    question = df.iloc[idx, 0]
    options = df.iloc[idx, 2]
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    # Tuples and arrays are accepted as well because the choices cell may
    # arrive in those forms depending on how the DataFrame was built.
    if not isinstance(options, (list, tuple, np.ndarray)):
        raise TypeError(
            f"Expected a sequence of choices in column 2, got {type(options).__name__}"
        )

    lines = [question]
    lines.extend(
        f"{option_letters[j]}. {option}" for j, option in enumerate(options)
    )
    lines.append("Answer:")
    prompt = "\n".join(lines)

    if include_answer:
        # Derive the index -> letter mapping from the shared letter list so
        # choice labels and answer labels can never drift apart.
        answer_num = df.iloc[idx, 3]
        answer_letter = dict(enumerate(option_letters))[answer_num]
        prompt += f" {answer_letter}\n\n"
    return prompt
def gen_prompt(df, subject, k=-1):
    """Build the few-shot preamble for a subject.

    Args:
        df: DataFrame of solved examples, in the layout expected by
            ``format_example``.
        subject: Underscore-separated subject id, rendered into the header
            via ``format_subject``.
        k: Number of leading rows of ``df`` to include as solved examples.
            ``-1`` means all rows. Values larger than the number of rows are
            clamped to it rather than indexing past the end of ``df``.

    Returns:
        The header sentence followed by the first ``k`` examples, each
        formatted with its answer.
    """
    header = "The following are multiple choice questions (with answers) about {}.\n\n".format(
        format_subject(subject)
    )
    available = df.shape[0]
    if k == -1 or k > available:
        k = available
    examples = (format_example(df, i, include_answer=True) for i in range(k))
    return header + "".join(examples)
def eval(subject, model, tokenizer, dev_df, test_df, num_questions_per_subject=5, train_shots=5):
    """Evaluate ``model`` on the questions of one subject.

    For every row of ``test_df`` a prompt is built from up to ``train_shots``
    solved examples taken from ``dev_df`` plus the unanswered question. The
    model's last-position logits for the tokens of the answer letters are
    turned into probabilities with a softmax, and the most probable letter is
    the prediction.

    Args:
        subject: Subject id; every row of ``dev_df`` and ``test_df`` must
            carry this value in its ``subject`` column.
        model: Model called as ``model(input_ids=...)`` whose output exposes
            ``.logits``; also provides ``.device``.
        tokenizer: Tokenizer called on prompt strings and on the single
            answer letters.
        dev_df: Solved examples used as few-shot demonstrations.
        test_df: Questions to evaluate.
        num_questions_per_subject: Currently unused; kept so existing callers
            keep working.
        train_shots: Requested number of few-shot examples. Negative values
            are treated as 0 and values above ``len(dev_df)`` are clamped.

    Returns:
        A tuple ``(cors, acc, all_probs)``: a NumPy array with one
        correctness flag per question, the mean of those flags, and a NumPy
        array with the per-question probabilities over the answer letters.
    """
    assert all(dev_df['subject'] == subject), f"Not all items in dev_df match subject {subject}"
    assert all(test_df['subject'] == subject), f"Not all items in test_df match subject {subject}"

    logger.info(f"Subject: {subject}")

    # Never ask for more demonstrations than exist, and never fewer than zero.
    max_shots = min(max(train_shots, 0), dev_df.shape[0])

    # Neither the letter -> token-id lookup nor the index -> letter mapping
    # depends on the question, so compute them once.
    index_to_letter = dict(enumerate(option_letters))
    choice_token_ids = [tokenizer(letter).input_ids[-1] for letter in option_letters]

    # Few-shot preambles keyed by shot count; they are identical for every
    # question of the subject.
    train_prompts = {}

    cors = []
    all_probs = []
    for i in range(test_df.shape[0]):
        prompt_end = format_example(test_df, i, include_answer=False)

        # Start from the full shot budget for every question, so that one
        # over-long question does not reduce the shots of all later ones.
        shots = max_shots
        while True:
            if shots not in train_prompts:
                train_prompts[shots] = gen_prompt(dev_df, subject, shots)
            prompt = train_prompts[shots] + prompt_end
            input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
            # Drop shots one at a time until the prompt fits the context
            # window; with zero shots the prompt is used as is.
            if shots == 0 or input_ids.shape[-1] <= MAX_CONTEXT_WINDOW:
                break
            shots -= 1

        logger.info(f"Sample: {i}")

        label = test_df.iloc[i, 3]
        label_letter = index_to_letter[label]

        # Inference only: skip autograd bookkeeping for the forward pass.
        with torch.no_grad():
            logits = model(input_ids=input_ids).logits[0, -1]

        # Softmax restricted to the answer-letter tokens.
        choice_logits = logits[choice_token_ids].float().cpu()
        probs = torch.nn.functional.softmax(choice_logits, dim=0).detach().numpy()

        pred = index_to_letter[int(np.argmax(probs))]
        cor = pred == label_letter

        if i == 0:
            # Detailed dump for the first question only.
            logger.info(f"Prompt: {prompt}")
            logger.info(f"Label_Letter: {label_letter}")
            logger.info(f"Logits: {logits}")
            logger.info(f"Probabilities: {probs}")
            logger.info(f"Prediction: {pred}")
            logger.info(f"Correct: {cor}")

        cors.append(cor)
        all_probs.append(probs)

    acc = np.mean(cors)
    cors = np.array(cors)
    all_probs = np.array(all_probs)
    print("Average accuracy {:.3f} - {}".format(acc, subject))

    return cors, acc, all_probs
def evaluate_mmlu(model, tokenizer, num_subjects=-1, num_questions=5, num_shots=5):
    """
    Evaluates the model on MMLU across specified number of subjects.

    Args:
        model: The model to evaluate
        tokenizer: The tokenizer to use
        num_subjects (int): Number of subjects to evaluate, taken in
            alphabetical order. Any negative value (conventionally -1), or a
            value at least as large as the number of subjects, evaluates all
            subjects. Must not be 0.
        num_questions (int): Number of questions per subject
        num_shots (int): Number of few-shot examples to use

    Returns:
        dict with the keys
        - "overall_accuracy": mean correctness over all evaluated questions
        - "min_accuracy_subject": (subject, accuracy) of the weakest subject
        - "max_accuracy_subject": (subject, accuracy) of the strongest subject
        - "full_accuracy_table": one dict per subject with the keys
          'Subject', 'Num_samples', 'Num_correct' and 'Accuracy'

    Raises:
        ValueError: If ``num_subjects`` is 0, since there would be nothing to
            aggregate.
    """
    if num_subjects == 0:
        raise ValueError("num_subjects must be -1 (all subjects) or a positive integer")

    model.eval()  # Ensure Dropout and BatchNorm behave appropriately for inference

    dataset = load_dataset_from_hf(verbose=True)

    # Work on pandas copies of the two partitions, ordered deterministically
    # by subject and then question text.
    sort_keys = ['subject', 'question']
    test_df = pd.DataFrame(dataset['test']).sort_values(sort_keys)
    dev_df = pd.DataFrame(dataset['dev']).sort_values(sort_keys)

    all_subjects = sorted(test_df['subject'].unique())

    # A negative count means "everything". Slicing with it would instead
    # silently drop subjects from the end of the list.
    if num_subjects < 0 or num_subjects >= len(all_subjects):
        subjects = all_subjects
    else:
        subjects = all_subjects[:num_subjects]

    results = {}
    all_cors = []
    results_table = []

    for subject in subjects:
        test_samples = test_df[test_df['subject'] == subject].head(num_questions)
        dev_samples = dev_df[dev_df['subject'] == subject].head(num_shots)

        # Log subject and sample counts
        logger.info(f"Subject: {subject}, Test Samples: {len(test_samples)}, Dev Samples: {len(dev_samples)}")

        cors, acc, probs = eval(
            subject,
            model,
            tokenizer,
            dev_samples,
            test_samples,
            num_questions_per_subject=num_questions,
            train_shots=num_shots
        )

        results[subject] = acc
        all_cors.append(cors)
        results_table.append({
            'Subject': subject,
            'Num_samples': len(test_samples),
            'Num_correct': int(np.sum(cors)),
            'Accuracy': acc
        })

    # Question-weighted accuracy: pool every question before averaging.
    weighted_acc = np.mean(np.concatenate(all_cors))

    min_acc_subject = min(results, key=results.get)
    max_acc_subject = max(results, key=results.get)

    return {
        "overall_accuracy": weighted_acc,
        "min_accuracy_subject": (min_acc_subject, results[min_acc_subject]),
        "max_accuracy_subject": (max_acc_subject, results[max_acc_subject]),
        "full_accuracy_table": results_table,
    }