|
import pandas as pd |
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
from torch.utils.data import DataLoader |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.metrics import accuracy_score, f1_score |
|
from datasets import Dataset |
|
|
|
import pytorch_lightning as pl |
|
from transformers import RobertaTokenizer, RobertaModel |
|
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping |
|
from pytorch_lightning.strategies import DDPStrategy |
|
|
|
|
|
class RoBERTaClassifier(pl.LightningModule): |
|
def __init__(self, num_labels=4, lr=2e-5, class_weights=None): |
|
super().__init__() |
|
self.save_hyperparameters() |
|
self.model = RobertaModel.from_pretrained("roberta-base", add_pooling_layer=False) |
|
self.dropout = nn.Dropout(0.3) |
|
self.classifier = nn.Linear(self.model.config.hidden_size, num_labels) |
|
|
|
if class_weights is not None: |
|
weights = torch.tensor(class_weights, dtype=torch.float32) |
|
self.loss_fn = nn.CrossEntropyLoss(weight=weights) |
|
else: |
|
self.loss_fn = nn.CrossEntropyLoss() |
|
|
|
def forward(self, input_ids, attention_mask): |
|
outputs = self.model(input_ids=input_ids, attention_mask=attention_mask) |
|
cls_output = outputs.last_hidden_state[:, 0, :] |
|
cls_output = self.dropout(cls_output) |
|
return self.classifier(cls_output) |
|
|
|
def training_step(self, batch, batch_idx): |
|
input_ids, attention_mask, labels = batch["input_ids"], batch["attention_mask"], batch["label"] |
|
logits = self(input_ids, attention_mask) |
|
loss = self.loss_fn(logits, labels) |
|
preds = torch.argmax(logits, dim=1) |
|
acc = accuracy_score(labels.cpu(), preds.cpu()) |
|
self.log("train_loss", loss, prog_bar=True) |
|
self.log("train_acc", acc, prog_bar=True) |
|
return loss |
|
|
|
def validation_step(self, batch, batch_idx): |
|
input_ids, attention_mask, labels = batch["input_ids"], batch["attention_mask"], batch["label"] |
|
logits = self(input_ids, attention_mask) |
|
loss = self.loss_fn(logits, labels) |
|
preds = torch.argmax(logits, dim=1) |
|
acc = accuracy_score(labels.cpu(), preds.cpu()) |
|
f1 = f1_score(labels.cpu(), preds.cpu(), average='weighted') |
|
self.log("val_loss", loss, prog_bar=True) |
|
self.log("val_acc", acc, prog_bar=True) |
|
self.log("val_f1", f1, prog_bar=True, sync_dist=True) |
|
|
|
def configure_optimizers(self): |
|
return torch.optim.AdamW(self.parameters(), lr=self.hparams.lr) |
|
|
|
|
|
if __name__ == "__main__": |
|
df = pd.read_csv("data_cleaned2.csv") |
|
|
|
class_counts = df["label"].value_counts().sort_index().tolist() |
|
class_weights = 1.0 / np.array(class_counts) |
|
class_weights = class_weights / class_weights.sum() |
|
|
|
train_df = df.sample(frac=0.8, random_state=42) |
|
val_df = df.drop(train_df.index) |
|
|
|
tokenizer = RobertaTokenizer.from_pretrained("roberta-base") |
|
|
|
def tokenize(batch): |
|
return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=64) |
|
|
|
train_dataset = Dataset.from_pandas(train_df).map(tokenize, batched=True) |
|
val_dataset = Dataset.from_pandas(val_df).map(tokenize, batched=True) |
|
|
|
train_dataset.set_format("torch", columns=["input_ids", "attention_mask", "label"]) |
|
val_dataset.set_format("torch", columns=["input_ids", "attention_mask", "label"]) |
|
|
|
train_loader = DataLoader(train_dataset, batch_size=16, num_workers=8, shuffle=True) |
|
val_loader = DataLoader(val_dataset, batch_size=16, num_workers=8) |
|
|
|
checkpoint_callback = ModelCheckpoint( |
|
dirpath="checkpoints/", |
|
filename="roberta-priority-{epoch:02d}-{val_f1:.2f}", |
|
save_top_k=3, |
|
monitor="val_f1", |
|
mode="max", |
|
save_weights_only=True, |
|
every_n_epochs=1 |
|
) |
|
|
|
early_stopping = EarlyStopping( |
|
monitor="val_f1", |
|
patience=2, |
|
mode="max", |
|
verbose=True, |
|
) |
|
|
|
trainer_kwargs = dict( |
|
accelerator="gpu", |
|
devices=2, |
|
strategy=DDPStrategy(find_unused_parameters=True), |
|
max_epochs=20, |
|
precision=16, |
|
log_every_n_steps=10, |
|
callbacks=[checkpoint_callback, early_stopping], |
|
) |
|
|
|
trainer = pl.Trainer(**trainer_kwargs) |
|
model = RoBERTaClassifier(class_weights=class_weights) |
|
|
|
trainer.fit(model, train_loader, val_loader) |
|
|