|
import argparse
import json
import os
import sys

import torch
from datasets import Dataset, DatasetDict
from transformers import AutoModelForCausalLM, PreTrainedTokenizerFast
|
|
|
|
|
# Paths are relative to this script's working directory (../ = repo root).
MODEL_DIR = "../base_model"
TOKENIZER_JSON = "../tokenizer.json"
DATASET_DIR = "../datasets/"

# Shared run configuration, loaded once at import time.
# JSON must be decoded as UTF-8 explicitly; the platform default
# encoding is not guaranteed to be UTF-8 (e.g. Windows cp1252).
with open("../config.json", "r", encoding="utf-8") as f:
    config = json.load(f)
|
|
|
def load_model():
    """Load the causal LM and its fast tokenizer.

    Returns:
        tuple: ``(model, tokenizer)``. The model is loaded in bfloat16
        and placed by Accelerate via ``device_map="auto"``.

    Exits with status 1 if either artifact cannot be loaded.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    try:
        tokenizer = PreTrainedTokenizerFast(tokenizer_file=TOKENIZER_JSON)
        # Causal LMs often ship without a pad token; reuse EOS so that
        # padded batching in evaluate() works.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # BUGFIX: do NOT call .to(device) after loading with
        # device_map="auto". Accelerate has already dispatched the
        # weights (possibly across multiple devices); moving the model
        # again either raises a RuntimeError or undoes the placement.
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_DIR,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            low_cpu_mem_usage=True,
        )
        return model, tokenizer
    except Exception as e:
        # Broad catch is deliberate: any load failure is fatal for this
        # evaluation script. sys.exit (not the site-module exit builtin).
        print(f"Error loading model/tokenizer: {e}")
        sys.exit(1)
|
|
|
def load_custom_dataset(version):
    """Load the Eclipse Corpuz dataset for the given version.

    Args:
        version: Dataset version string, e.g. ``"1.1"``.

    Returns:
        DatasetDict with a single ``"test"`` split whose rows carry a
        ``"text"`` column.

    Exits with status 1 if the file is missing or the JSON layout is
    not a list.
    """
    # os.path.join instead of string concatenation (equivalent here,
    # but robust if DATASET_DIR ever loses its trailing slash).
    dataset_path = os.path.join(DATASET_DIR, f"eclipse_corpuz_{version}.json")
    if not os.path.exists(dataset_path):
        print(f"Error: Dataset {dataset_path} not found")
        sys.exit(1)

    try:
        with open(dataset_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        if isinstance(data, list):
            if data and isinstance(data[0], dict) and "text" in data[0]:
                # Already a list of {"text": ...} records.
                dataset = Dataset.from_list(data)
            else:
                # Assume a bare list of strings.
                dataset = Dataset.from_dict({"text": data})
        else:
            print(f"Error: Unsupported dataset format in {dataset_path}")
            sys.exit(1)

        return DatasetDict({"test": dataset})
    except Exception as e:
        print(f"Error loading dataset: {e}")
        sys.exit(1)
|
|
|
def evaluate(model, tokenizer, dataset, batch_size=8, max_samples=100):
    """Evaluate the model on the dataset's ``"test"`` split.

    Reports mean per-batch cross-entropy loss, perplexity derived from
    that mean, and next-token accuracy with pad positions excluded.

    Args:
        model: Causal LM whose forward returns ``.loss`` and ``.logits``.
        tokenizer: Tokenizer with ``pad_token`` set (see load_model).
        dataset: DatasetDict with a ``"test"`` split containing "text".
        batch_size: Examples per forward pass.
        max_samples: Cap on evaluated examples (previously a hard-coded
            100; the default preserves that behavior).

    Returns:
        dict with keys ``"accuracy"``, ``"loss"``, ``"perplexity"``.
    """
    split = dataset["test"]
    model.eval()
    losses = []
    total_tokens = 0
    correct_tokens = 0

    # Loop-invariant work hoisted out of the batch loop.
    max_length = config.get("max_length", 512)
    limit = min(len(split), max_samples)

    for start in range(0, limit, batch_size):
        batch = split[start:start + batch_size]
        inputs = tokenizer(
            batch["text"],
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=max_length,
        ).to(model.device)

        labels = inputs["input_ids"].clone()

        with torch.no_grad():
            outputs = model(**inputs, labels=labels)
            losses.append(outputs.loss.item())

            # Shift so that the logit at position t predicts token t+1.
            shift_logits = outputs.logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            predictions = torch.argmax(shift_logits, dim=-1)

            # NOTE(review): pad_token may alias eos_token (load_model
            # sets it that way), in which case genuine EOS targets are
            # excluded from accuracy too — confirm this is intended.
            mask = shift_labels != tokenizer.pad_token_id
            correct_tokens += (predictions == shift_labels).masked_select(mask).sum().item()
            total_tokens += mask.sum().item()

    # Mean of per-batch losses (last batch may be smaller, so this is a
    # slight approximation of the token-weighted mean).
    avg_loss = sum(losses) / len(losses) if losses else float("inf")
    perplexity = torch.exp(torch.tensor(avg_loss)).item()
    accuracy = correct_tokens / total_tokens if total_tokens > 0 else 0

    return {"accuracy": accuracy, "loss": avg_loss, "perplexity": perplexity}
|
|
|
def _main():
    """CLI entry point: parse args, load artifacts, run and report eval."""
    parser = argparse.ArgumentParser(description="Evaluate Charm 15 on Eclipse Corpuz dataset")
    parser.add_argument("--version", type=str, default="1.1", help="Dataset version (e.g., 1.1, 1.2)")
    args = parser.parse_args()

    model, tokenizer = load_model()
    dataset = load_custom_dataset(args.version)
    results = evaluate(model, tokenizer, dataset, batch_size=4)

    print(f"Evaluation Results (Eclipse Corpuz {args.version}):")
    print(f"Accuracy: {results['accuracy']:.4f}")
    print(f"Loss: {results['loss']:.4f}")
    print(f"Perplexity: {results['perplexity']:.4f}")

    # Drop the (potentially large) model before releasing cached GPU
    # memory; empty_cache is only meaningful when CUDA is available.
    del model
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


if __name__ == "__main__":
    _main()