import streamlit as st
from datasets import load_dataset
import numpy as np
import os
import copy
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from transformers import AutoModelForTokenClassification, AutoTokenizer, DataCollatorForTokenClassification
from transformers import DebertaV2Config, DebertaV2ForTokenClassification

os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
# print parameter counts
def print_trainable_parameters(model):
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f'total params: {total_params}. tunable params: {trainable_params}')
device = torch.device('cpu')
print(f"Is CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(torch.cuda.current_device())}")
    device = torch.device('cuda')
# Load models
st.write('Loading the pretrained model ...')
teacher_model_name = "iiiorg/piiranha-v1-detect-personal-information"
teacher_model = AutoModelForTokenClassification.from_pretrained(teacher_model_name)
tokenizer = AutoTokenizer.from_pretrained(teacher_model_name)
print(teacher_model)
print_trainable_parameters(teacher_model)

label2id = teacher_model.config.label2id
id2label = teacher_model.config.id2label
st.write("id2label: ", id2label)
st.write("label2id: ", label2id)
dimension = len(id2label)
st.write("dimension", dimension)
# copy the teacher config so the edits below don't also mutate teacher_model.config
student_model_config = copy.deepcopy(teacher_model.config)
student_model_config.num_attention_heads = 8
student_model_config.num_hidden_layers = 4
student_model = DebertaV2ForTokenClassification.from_pretrained(
    "microsoft/mdeberta-v3-base",
    config=student_model_config)
    # ignore_mismatched_sizes=True)
print(student_model)
print_trainable_parameters(student_model)
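# The student keeps the teacher's label space (same id2label / label2id) but shrinks the
# encoder to 4 hidden layers and 8 attention heads. Its backbone weights come from
# mdeberta-v3-base (checkpoint layers beyond the 4th are simply dropped) and its
# token-classification head is randomly initialised, which is why it still needs to be
# distilled from the teacher before it is useful.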
if torch.cuda.is_available():
    teacher_model = teacher_model.to(device)
    student_model = student_model.to(device)
# Load data.
raw_dataset = load_dataset("ai4privacy/pii-masking-400k", split='train')
raw_dataset = raw_dataset.filter(lambda example: example["language"].startswith("en"))
# raw_dataset = raw_dataset.select(range(2000))
# keep roughly 1 in 11 examples to shrink the training set
raw_dataset = raw_dataset.filter(lambda example, idx: idx % 11 == 0, with_indices=True)
raw_dataset = raw_dataset.train_test_split(test_size=0.2)
print(raw_dataset)
print(raw_dataset.column_names)
# inputs = tokenizer(
#     raw_dataset['train'][0]['mbert_tokens'],
#     truncation=True,
#     is_split_into_words=True)
# print(inputs)
# print(inputs.tokens())
# print(inputs.word_ids())
# function to align word-level labels with the tokenizer's sub-tokens via word_ids
# --> special tokens and padding get the -100 label id (ignored by cross entropy)
# --> every sub-token of a word gets that word's label, with 'B-' normalised to 'I-'
#     before the label2id lookup, as in the original mapping
def align_labels_with_tokens(labels, word_ids):
    aligned_label_ids = []
    for word_id in word_ids:
        if word_id is None:
            # special tokens and padding
            aligned_label_ids.append(-100)
        else:
            label = labels[word_id]
            if label.startswith("B-"):
                label = label.replace("B-", "I-")
            aligned_label_ids.append(label2id[label])
    return aligned_label_ids
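# Hypothetical illustration (the words, labels and sub-token split are made up, not from the dataset):
#   words    = ["My", "name", "is", "Marisol"],  labels = ["O", "O", "O", "B-GIVENNAME"]
#   if the tokenizer splits "Marisol" into two sub-tokens, word_ids = [None, 0, 1, 2, 3, 3, None]
#   and the aligned ids correspond to [-100, O, O, O, I-GIVENNAME, I-GIVENNAME, -100]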
# create tokenize function
def tokenize_function(examples):
    # tokenize and truncate the pre-split words; examples holds a batch of rows
    # from whichever split (train or test) is being mapped
    inputs = tokenizer(
        examples['mbert_tokens'],
        is_split_into_words=True,
        padding=True,
        truncation=True,
        max_length=512)
    new_labels = []
    for i, labels in enumerate(examples['mbert_token_classes']):
        word_ids = inputs.word_ids(batch_index=i)
        new_labels.append(align_labels_with_tokens(labels, word_ids))
    inputs["labels"] = new_labels
    return inputs
# tokenize training and validation datasets
tokenized_data = raw_dataset.map(
    tokenize_function,
    batched=True)
tokenized_data.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
# data collator
data_collator = DataCollatorForTokenClassification(tokenizer)
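# The collator pads input_ids / attention_mask to the longest sequence in each batch and
# pads the labels with -100, so padded positions are ignored by the cross-entropy loss.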
st.write(tokenized_data["train"][:2]["labels"])
# Function to evaluate model performance
def evaluate_model(model, dataloader, device):
    model.eval()  # Set model to evaluation mode
    all_preds = []
    all_labels = []
    # Disable gradient calculations
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            # Forward pass to get logits
            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            # Get predictions
            preds = torch.argmax(logits, dim=-1)
            # Keep only positions with a real label; -100 marks special tokens and padding
            mask = labels != -100
            all_preds.append(preds[mask].cpu().numpy())
            all_labels.append(labels[mask].cpu().numpy())
    # Calculate evaluation metrics over the flattened token-level predictions
    all_preds = np.concatenate(all_preds)
    all_labels = np.concatenate(all_labels)
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='micro')
    return accuracy, precision, recall, f1
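# Note: the scores are micro-averaged over every labelled token, so precision, recall and
# F1 coincide with plain token accuracy and are dominated by the majority 'O' (non-PII) class.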
# Function to compute distillation and hard-label loss
def distillation_loss(student_logits, teacher_logits, true_labels, temperature, alpha):
    # print("Distillation loss sizes")
    # print(teacher_logits.size())
    # print(student_logits.size())
    # print(true_labels.size())
    # Compute soft targets from teacher logits
    soft_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    student_soft = nn.functional.log_softmax(student_logits / temperature, dim=-1)
    # KL Divergence loss for distillation
    distill_loss = nn.functional.kl_div(student_soft, soft_targets, reduction='batchmean') * (temperature ** 2)
    # Cross-entropy loss for hard labels
    student_logit_reshape = torch.transpose(student_logits, 1, 2)  # transpose to (batch, num_labels, seq_len) to match the labels dimension
    hard_loss = nn.CrossEntropyLoss()(student_logit_reshape, true_labels)
    # Combine losses
    loss = alpha * distill_loss + (1.0 - alpha) * hard_loss
    return loss
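# Per batch, the combined objective is
#   L = alpha * T^2 * KL( softmax(z_t / T) || softmax(z_s / T) ) + (1 - alpha) * CE(z_s, y)
# where z_s / z_t are the student / teacher logits, T the temperature and y the hard labels.
# kl_div takes the student's log-probabilities as input and the teacher's probabilities as
# target, and the T^2 factor keeps the soft-target gradients on a comparable scale as T varies.
# The two terms are normalised differently: 'batchmean' divides the summed KL by the batch
# size, while CrossEntropyLoss averages over labelled tokens and skips -100 positions by default.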
# hyperparameters
batch_size = 32
lr = 1e-4
num_epochs = 30
temperature = 2.0
alpha = 0.5

# define optimizer
optimizer = optim.Adam(student_model.parameters(), lr=lr)
# create training data loader
dataloader = DataLoader(tokenized_data['train'], batch_size=batch_size, collate_fn=data_collator)
# create testing data loader
test_dataloader = DataLoader(tokenized_data['test'], batch_size=batch_size, collate_fn=data_collator)

# put student model in train mode
student_model.train()
# train model
for epoch in range(num_epochs):
    for batch in dataloader:
        # Prepare inputs
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Disable gradient calculation for teacher model
        with torch.no_grad():
            teacher_outputs = teacher_model(input_ids, attention_mask=attention_mask)
            teacher_logits = teacher_outputs.logits

        # Forward pass through the student model
        student_outputs = student_model(input_ids, attention_mask=attention_mask)
        student_logits = student_outputs.logits

        # Compute the distillation loss
        loss = distillation_loss(student_logits, teacher_logits, labels, temperature, alpha)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1} completed with loss: {loss.item()}")

    # Evaluate the teacher model
    teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, test_dataloader, device)
    print(f"Teacher (test) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}")
    # Evaluate the student model
    student_accuracy, student_precision, student_recall, student_f1 = evaluate_model(student_model, test_dataloader, device)
    print(f"Student (test) - Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}")
    print("\n")
    # put student model back into train mode
    student_model.train()
# Compare the models after training
# create validation data loader (reuses the test split with a smaller batch size)
validation_dataloader = DataLoader(tokenized_data['test'], batch_size=8, collate_fn=data_collator)
# Evaluate the teacher model
teacher_accuracy, teacher_precision, teacher_recall, teacher_f1 = evaluate_model(teacher_model, validation_dataloader, device)
print(f"Teacher (validation) - Accuracy: {teacher_accuracy:.4f}, Precision: {teacher_precision:.4f}, Recall: {teacher_recall:.4f}, F1 Score: {teacher_f1:.4f}")
# Evaluate the student model
student_accuracy, student_precision, student_recall, student_f1 = evaluate_model(student_model, validation_dataloader, device)
print(f"Student (validation) - Accuracy: {student_accuracy:.4f}, Precision: {student_precision:.4f}, Recall: {student_recall:.4f}, F1 Score: {student_f1:.4f}")
st.write('Pushing model to huggingface')
# Push model to huggingface
hf_name = 'CarolXia'  # your hf username or org name
model_name = "pii-kd-deberta-v2"
model_id = hf_name + "/" + model_name
student_model.push_to_hub(model_id, token=st.secrets["HUGGINGFACE_TOKEN"])