# Description: Classification models
from transformers import AutoModel, AutoTokenizer, BatchEncoding, TrainingArguments, Trainer
from functools import partial
from huggingface_hub import snapshot_download
from huggingface_hub.constants import HF_HUB_CACHE
from accelerate import Accelerator
from accelerate.utils import find_executable_batch_size as auto_find_batch_size
from datasets import load_dataset, Dataset
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import json
import os
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    recall_score,
)

BASE_PATH = os.path.dirname(os.path.abspath(__file__))

class MultiHeadClassification(nn.Module):
    """
    MultiHeadClassification

    An easy-to-use multi-head classification model. It takes a backbone model and a dictionary of head configurations.
    It can be used to train multiple classification tasks at once using a single backbone model.

    Apart from joint training, it also supports training individual heads separately, providing a simple way to freeze
    and unfreeze heads.

    Example:
        >>> from transformers import AutoModel, AutoTokenizer
        >>> from torch.optim import AdamW
        >>> import torch
        >>> import torch.nn as nn
        >>>
        >>> # Manually load backbone model to create model
        >>> backbone = AutoModel.from_pretrained('BAAI/bge-m3')
        >>> model = MultiHeadClassification(backbone, {'binary': 2, 'sentiment': 3, 'something': 4}).to('cuda')
        >>> print(model)
        >>> # Load tokenizer for data preprocessing
        >>> tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-m3')
        >>> optimizer = AdamW(model.parameters(), lr=5e-4)
        >>> # Some training data
        >>> samples = tokenizer(["Hello, my dog is cute", "Hello, my dog is cute", "I like turtles"], return_tensors="pt", padding=True, truncation=True).to('cuda')
        >>> labels = {'binary': torch.tensor([0, 0, 1]), 'sentiment': torch.tensor([0, 1, 2]), 'something': torch.tensor([0, 1, 2])}
        >>> model.freeze_backbone()
        >>> model.train(True)
        >>> for i in range(10):
        ...     optimizer.zero_grad()
        ...     outputs = model(samples)
        ...     loss = sum([nn.CrossEntropyLoss()(outputs[name].cpu(), labels[name]) for name in model.heads.keys()])
        ...     loss.backward()
        ...     optimizer.step()
        ...     print(loss.item())
        >>> print(model(samples))
        >>> # Save full model
        >>> model.save('model.pth')
        >>> # Save head only
        >>> model.save_head('binary', 'binary.pth')
        >>> # Load full model (head configuration must match the saved state dict)
        >>> model = MultiHeadClassification(backbone, {'binary': 2, 'sentiment': 3, 'something': 4}).to('cuda')
        >>> model.load('model.pth')
        >>> # Load head only
        >>> model = MultiHeadClassification(backbone, {}).to('cuda')
        >>> model.load_head('binary', 'binary.pth')
        >>> # Adding new head
        >>> model.add_head('new_head', 3)
        >>> print(model)
        >>> # Rebuild the optimizer so it also updates the new head's parameters
        >>> optimizer = AdamW(model.parameters(), lr=5e-4)
        >>> # Extend dataset with data for new head
        >>> labels['new_head'] = torch.tensor([0, 1, 2])
        >>> # Freeze all heads and backbone
        >>> model.freeze_all()
        >>> # Only unfreeze new head
        >>> model.unfreeze_head('new_head')
        >>> model.train(True)
        >>> for i in range(10):
        ...     optimizer.zero_grad()
        ...     outputs = model(samples)
        ...     loss = sum([nn.CrossEntropyLoss()(outputs[name].cpu(), labels[name]) for name in model.heads.keys()])
        ...     loss.backward()
        ...     optimizer.step()
        ...     print(loss.item())
        >>> print(model(samples))

    Args:
        backbone (transformers.PreTrainedModel): A pretrained transformer model
        head_config (dict): A dictionary with head configurations. The key is the head name and the value is the number
            of classes for that head.
    """

    def __init__(self, backbone, head_config, dropout=0.1, l2_reg=0.01):
        super().__init__()
        self.backbone = backbone
        self.num_heads = len(head_config)
        self.heads = nn.ModuleDict({
            name: nn.Linear(backbone.config.hidden_size, num_classes)
            for name, num_classes in head_config.items()
        })
        self.do = nn.Dropout(dropout)
        self.l2_reg = l2_reg
        self.device = 'cpu'
        self.torch_dtype = torch.float16
        self.head_config = head_config

    def forward(self, x, head_names=None) -> dict:
        """
        Forward pass of the model.

        Requires tokenizer output as input. The input should be a dictionary with keys 'input_ids', 'attention_mask'.

        Args:
            x (dict): Tokenizer output
            head_names (list): (optional) List of head names to return logits for. If None, returns logits for all heads.

        Returns:
            dict: A dictionary with head names as keys and logits as values
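
        Example (illustrative; ``tokenizer`` is the backbone's tokenizer as in the class-level example):
            >>> batch = tokenizer(["some text"], return_tensors="pt", padding=True, truncation=True).to(model.device)
            >>> logits = model(batch)  # e.g. {'binary': tensor of shape (1, 2), 'sentiment': tensor of shape (1, 3), ...}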
""" | |
x = self.backbone(**x, return_dict=True, output_hidden_states=True).last_hidden_state[:, 0, :] | |
x = self.do(x) | |
if head_names is None: | |
return {name: head(x) for name, head in self.heads.items()} | |
return {name: head(x) for name, head in self.heads.items() if name in head_names} | |

    def get_l2_loss(self):
        """
        Getter for L2 regularization loss

        Returns:
            torch.Tensor: L2 regularization loss
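
        Example (illustrative; ``task_loss`` stands in for whatever task loss was computed):
            >>> loss = task_loss + model.get_l2_loss()  # add the penalty before calling backward()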
""" | |
l2_loss = torch.tensor(0.).to(self.device) | |
for param in self.parameters(): | |
if param.requires_grad: | |
l2_loss += torch.norm(param, 2) | |
return (self.l2_reg * l2_loss).to(self.device) | |

    def to(self, *args, **kwargs):
        """ Move/cast the model while keeping track of the target device and dtype """
        super().to(*args, **kwargs)
        for arg in args:
            if isinstance(arg, torch.dtype):
                self.torch_dtype = arg
            elif isinstance(arg, (str, torch.device)):
                self.device = str(arg)
        return self

    def load_head(self, head_name, path):
        """
        Load head from a file

        Args:
            head_name (str): Name of the head
            path (str): Path to the file

        Returns:
            None
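
        Example (illustrative; assumes 'binary.pth' was produced by ``save_head``):
            >>> model.load_head('binary', 'binary.pth')  # creates the head first if it does not exist yet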
""" | |
model = torch.load(path, map_location=self.device) | |
if head_name in self.heads: | |
num_classes = model['weight'].shape[0] | |
self.heads[head_name].load_state_dict(model) | |
self.to(self.torch_dtype).to(self.device) | |
self.head_config[head_name] = num_classes | |
return | |
assert model['weight'].shape[1] == self.backbone.config.hidden_size | |
num_classes = model['weight'].shape[0] | |
self.heads[head_name] = nn.Linear(self.backbone.config.hidden_size, num_classes) | |
self.heads[head_name].load_state_dict(model) | |
self.head_config[head_name] = num_classes | |
self.to(self.torch_dtype).to(self.device) | |

    def save_head(self, head_name, path):
        """
        Save head to a file

        Args:
            head_name (str): Name of the head
            path (str): Path to the file
        """
        torch.save(self.heads[head_name].state_dict(), path)

    def save(self, path):
        """
        Save the full model to a file

        Args:
            path (str): Path to the file
        """
        torch.save(self.state_dict(), path)

    def load(self, path):
        """
        Load the full model from a file

        Args:
            path (str): Path to the file
        """
        self.load_state_dict(torch.load(path, map_location=self.device))
        self.to(self.torch_dtype).to(self.device)

    def save_backbone(self, path):
        """
        Save the backbone to a file

        Args:
            path (str): Path to the file
        """
        self.backbone.save_pretrained(path)

    def load_backbone(self, path):
        """
        Load the backbone from a file

        Args:
            path (str): Path to the file
        """
        self.backbone = AutoModel.from_pretrained(path)
        self.to(self.torch_dtype).to(self.device)

    def freeze_backbone(self):
        """ Freeze the backbone """
        for param in self.backbone.parameters():
            param.requires_grad = False

    def unfreeze_backbone(self):
        """ Unfreeze the backbone """
        for param in self.backbone.parameters():
            param.requires_grad = True

    def freeze_head(self, head_name):
        """
        Freeze a head by name

        Args:
            head_name (str): Name of the head
        """
        for param in self.heads[head_name].parameters():
            param.requires_grad = False

    def unfreeze_head(self, head_name):
        """
        Unfreeze a head by name

        Args:
            head_name (str): Name of the head
        """
        for param in self.heads[head_name].parameters():
            param.requires_grad = True

    def freeze_all_heads(self):
        """ Freeze all heads """
        for head_name in self.heads.keys():
            self.freeze_head(head_name)

    def unfreeze_all_heads(self):
        """ Unfreeze all heads """
        for head_name in self.heads.keys():
            self.unfreeze_head(head_name)

    def freeze_all(self):
        """ Freeze all """
        self.freeze_backbone()
        self.freeze_all_heads()

    def unfreeze_all(self):
        """ Unfreeze all """
        self.unfreeze_backbone()
        self.unfreeze_all_heads()

    def add_head(self, head_name, num_classes):
        """
        Add a new head to the model

        Args:
            head_name (str): Name of the head
            num_classes (int): Number of classes for the head
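
        Note:
            Optimizers created before this call do not track the new head's parameters;
            re-create the optimizer (e.g. ``AdamW(model.parameters())``) if the new head should be trained.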
""" | |
self.heads[head_name] = nn.Linear(self.backbone.config.hidden_size, num_classes) | |
self.heads[head_name].to(self.torch_dtype).to(self.device) | |
self.head_config[head_name] = num_classes | |

    def remove_head(self, head_name):
        """
        Remove a head from the model

        Args:
            head_name (str): Name of the head
        """
        if head_name not in self.heads:
            raise ValueError(f'Head {head_name} not found')
        del self.heads[head_name]
        del self.head_config[head_name]

    @classmethod
    def from_pretrained(cls, model_name, head_config=None, dropout=0.1, l2_reg=0.01):
        """
        Load a pretrained model from the Huggingface model hub

        Args:
            model_name (str): Name of the model
            head_config (dict): Head configuration
            dropout (float): Dropout rate
            l2_reg (float): L2 regularization rate
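
        Example (illustrative; 'your-org/your-model' is a placeholder repo that must contain the
        directory layout expected by ``_from_directory``):
            >>> model = MultiHeadClassification.from_pretrained('your-org/your-model')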
""" | |
if head_config is None: | |
head_config = {} | |
# check if model exists locally | |
hf_cache_dir = HF_HUB_CACHE | |
model_path = os.path.join(hf_cache_dir, model_name) | |
if os.path.exists(model_path): | |
return cls._from_directory(model_path, head_config, dropout, l2_reg) | |
model_path = snapshot_download(repo_id=model_name, cache_dir=hf_cache_dir) | |
return cls._from_directory(model_path, head_config, dropout, l2_reg) | |

    @classmethod
    def _from_directory(cls, model_path, head_config, dropout=0.1, l2_reg=0.01):
        """
        Load a model from a directory

        Args:
            model_path (str): Path to the model directory
            head_config (dict): Head configuration
            dropout (float): Dropout rate
            l2_reg (float): L2 regularization rate
        """
        backbone = AutoModel.from_pretrained(os.path.join(model_path, 'pretrained/backbone'))
        instance = cls(backbone, head_config, dropout, l2_reg)
        instance.load(os.path.join(model_path, 'multi-head-sequence-classification-model-model.pth'))
        # nn.Linear weights have shape (num_classes, hidden_size), so the class count is dimension 0
        instance.head_config = {k: v.weight.shape[0] for k, v in instance.heads.items()}
        return instance
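
if __name__ == '__main__':
    # Illustrative smoke test, not part of the original API: builds a small multi-head classifier on top
    # of the 'BAAI/bge-m3' backbone used in the class docstring and runs a single forward pass.
    # Adjust the backbone name, head configuration, and device to match your setup.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    backbone = AutoModel.from_pretrained('BAAI/bge-m3')
    tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-m3')
    model = MultiHeadClassification(backbone, {'binary': 2, 'sentiment': 3}).to(device)
    model.eval()
    batch = tokenizer(["Hello, my dog is cute"], return_tensors='pt', padding=True, truncation=True).to(device)
    with torch.no_grad():
        logits = model(batch)
    for head_name, head_logits in logits.items():
        print(head_name, tuple(head_logits.shape))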