!pip install -q -U watermark |
!pip install -qq transformers |
import transformers |
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup |
import torch |
import numpy as np |
import pandas as pd |
import seaborn as sns |
from pylab import rcParams |
import matplotlib.pyplot as plt |
from matplotlib import rc |
from sklearn.model_selection import train_test_split |
from sklearn.metrics import confusion_matrix, classification_report |
from collections import defaultdict |
from textwrap import wrap |
from torch import nn, optim |
from torch.utils.data import Dataset, DataLoader |
import torch.nn.functional as F |
sns.set(style='whitegrid', palette='muted', font_scale=1.2) |
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"] |
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE)) |
rcParams['figure.figsize'] = 12, 8 |
np.random.seed(RANDOM_SEED) |
torch.manual_seed(RANDOM_SEED) |
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") |
!gdown --id 1S6qMioqPJjyBLpLVz4gmRTnJHnjitnuV |
!gdown --id 1zdmewp7ayS4js4VtrJEHzAheSW-5NBZv |
df = pd.read_csv("reviews.csv") |
sns.countplot(x='score', data = df) |
plt.xlabel('review score'); |
def to_sentiment(rating):
    """Map a review score to a sentiment class index.

    The score is converted with ``int`` first. Scores of 2 or lower map to
    0, a score of exactly 3 maps to 1, and anything higher maps to 2.
    """
    score = int(rating)
    if score > 3:
        return 2
    if score == 3:
        return 1
    return 0
# Collapse the review scores into three sentiment classes and plot the result.
df['sentiment'] = df.score.apply(to_sentiment)
class_names = ['negative', 'neutral', 'positive']
print(df.sentiment)
ax = sns.countplot(x='sentiment', data = df)
plt.xlabel('review sentiment')
ax.set_xticklabels(class_names);

PRE_TRAINED_MODEL_NAME = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

# Tokenizer walkthrough on a single sample sentence.
sample_txt = 'When was I last outside? I am stuck at home for 2 weeks.'
tokens = tokenizer.tokenize(sample_txt)
token_ids = tokenizer.convert_tokens_to_ids(tokens)
print(f' Sentence: {sample_txt}')
print(f' Tokens: {tokens}')
print(f'Token IDs: {token_ids}')

# Special tokens (bare expressions are only displayed when run as notebook cells).
tokenizer.sep_token, tokenizer.sep_token_id
tokenizer.cls_token, tokenizer.cls_token_id
tokenizer.pad_token, tokenizer.pad_token_id
tokenizer.unk_token, tokenizer.unk_token_id

# `pad_to_max_length=True` is deprecated; `padding='max_length'` is the
# documented replacement. Truncation is requested explicitly instead of
# relying on the implicit fallback that only emits a warning.
encoding = tokenizer.encode_plus(
    sample_txt,
    max_length=32,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)
encoding.keys()
print(len(encoding['input_ids'][0]))
encoding['input_ids'][0]
print(len(encoding['attention_mask'][0]))
encoding['attention_mask']
tokenizer.convert_ids_to_tokens(encoding['input_ids'][0])

# Token count per review, capped at 512 tokens, used to pick MAX_LEN below.
token_lens = []
for txt in df.content:
    tokens = tokenizer.encode(txt, max_length=512, truncation=True)
    token_lens.append(len(tokens))
sns.distplot(token_lens)
plt.xlim([0, 256]);
plt.xlabel('Token count');

# Sequence length used for every encoded review from here on.
MAX_LEN = 160
class GPReviewDataset(Dataset):
    """Torch dataset that tokenizes review texts on demand.

    Args:
        reviews: Indexable collection of review texts; each entry is passed
            through ``str`` before encoding.
        targets: Indexable collection of integer class labels aligned with
            ``reviews``.
        tokenizer: Object exposing ``encode_plus`` (e.g. a Hugging Face
            tokenizer).
        max_len: Length every encoded sequence is padded or truncated to.
    """

    def __init__(self, reviews, targets, tokenizer, max_len):
        self.reviews = reviews
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of reviews."""
        return len(self.reviews)

    def __getitem__(self, item):
        """Return the encoded sample at position ``item``.

        Returns:
            dict with ``review_text`` (str), ``input_ids`` and
            ``attention_mask`` (flattened tensors) and ``targets`` (a
            ``torch.long`` scalar tensor).
        """
        review = str(self.reviews[item])
        target = self.targets[item]
        # `pad_to_max_length` is deprecated in favour of `padding='max_length'`.
        # Truncation is requested explicitly so every item has the same
        # length, which the default batch collation requires.
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'review_text': review,
            # return_tensors='pt' yields a leading batch axis; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long),
        }
# Hold out 10% of the rows, then halve the hold-out into validation and test.
df_train, df_holdout = train_test_split(df, test_size=0.1, random_state=RANDOM_SEED)
df_val, df_test = train_test_split(df_holdout, test_size=0.5, random_state=RANDOM_SEED)
tuple(frame.shape for frame in (df_train, df_val, df_test))
def create_data_loader(df, tokenizer, max_len, batch_size, *, num_workers=4, shuffle=False):
    """Wrap a review DataFrame in a ``DataLoader`` over ``GPReviewDataset``.

    Args:
        df: DataFrame with ``content`` (review text) and ``sentiment``
            (label) columns.
        tokenizer: Tokenizer forwarded to ``GPReviewDataset``.
        max_len: Sequence length forwarded to ``GPReviewDataset``.
        batch_size: Number of samples per batch.
        num_workers: Loader worker processes. The default of 4 is the value
            that used to be hard-coded.
        shuffle: Whether to reshuffle the data every epoch. The default of
            False keeps the previous behaviour.

    Returns:
        A ``torch.utils.data.DataLoader`` over the encoded reviews.
    """
    ds = GPReviewDataset(
        reviews=df.content.to_numpy(),
        targets=df.sentiment.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,
    )
    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )
# BATCH_SIZE was used below without ever being assigned (NameError).
# NOTE(review): 16 is an assumed value - adjust to the available memory.
BATCH_SIZE = 16

train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)

# Pull one training batch and inspect its tensor shapes.
data = next(iter(train_data_loader))
data.keys()
print(data['input_ids'].shape)
print(data['attention_mask'].shape)
print(data['targets'].shape)

# Run the bare pre-trained encoder on the sample encoding built earlier.
# return_dict=False makes the model return a plain tuple to unpack.
bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
last_hidden_state, pooled_output = bert_model(
    input_ids=encoding['input_ids'],
    attention_mask=encoding['attention_mask'],
    return_dict = False
)
last_hidden_state.shape
bert_model.config.hidden_size
pooled_output.shape
class SentimentClassifier(nn.Module):
    """Pre-trained BERT encoder followed by dropout and a linear output layer.

    Args:
        n_classes: Number of output classes (width of the final layer).
    """

    def __init__(self, n_classes):
        super().__init__()
        self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        hidden_size = self.bert.config.hidden_size
        self.out = nn.Linear(hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the output-layer activations (no softmax applied) for a batch."""
        encoder_output = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        pooled = encoder_output["pooler_output"]
        return self.out(self.drop(pooled))
# Build the classifier and place it on the selected device.
model = SentimentClassifier(len(class_names)).to(device)

# Smoke test: push the sample batch through the untrained head.
input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)
print(input_ids.shape)
print(attention_mask.shape)
F.softmax(model(input_ids, attention_mask), dim=1)

EPOCHS = 6
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)

# The scheduler is stepped once per batch, so it spans batches-per-epoch * epochs.
total_steps = EPOCHS * len(train_data_loader)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)
loss_fn = nn.CrossEntropyLoss().to(device)
def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples
):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches (dicts with ``input_ids``,
            ``attention_mask`` and ``targets`` tensors).
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        n_examples: Denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``. Accuracy is a double tensor equal to the
        number of correct predictions divided by ``n_examples``. The mean
        loss is NaN when the loader yields no batches.
    """
    model = model.train()
    losses = []
    # A tensor accumulator (rather than the int 0) keeps `.double()` below
    # valid even when the loader yields no batches.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)
    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)
        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())
        loss.backward()
        # Cap the gradient norm at 1.0 before the parameter update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
    # Avoid numpy's empty-slice warning when nothing was processed.
    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss
def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` on ``data_loader`` without computing gradients.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches (dicts with ``input_ids``,
            ``attention_mask`` and ``targets`` tensors).
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        device: Device the batch tensors are moved to.
        n_examples: Denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``. Accuracy is a double tensor equal to the
        number of correct predictions divided by ``n_examples``. The mean
        loss is NaN when the loader yields no batches.
    """
    model = model.eval()
    losses = []
    # A tensor accumulator (rather than the int 0) keeps `.double()` below
    # valid even when the loader yields no batches.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)
    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)
            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())
    # Avoid numpy's empty-slice warning when nothing was processed.
    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss
%%time |
history = defaultdict(list) |
best_accuracy = 0 |
for epoch in range(EPOCHS): |
print(f'Epoch {epoch + 1}/{EPOCHS}') |
print('-' * 10) |
train_acc, train_loss = train_epoch( |
model, |
train_data_loader, |
loss_fn, |
optimizer, |
device, |
scheduler, |
len(df_train) |
) |
print(f'Train loss {train_loss} accuracy {train_acc}') |
val_acc, val_loss = eval_model( |
model, |
val_data_loader, |
loss_fn, |
device, |
len(df_val) |
) |
print(f'Val loss {val_loss} accuracy {val_acc}') |
print() |
history['train_acc'].append(train_acc) |
history['train_loss'].append(train_loss) |
history['val_acc'].append(val_acc) |
history['val_loss'].append(val_loss) |
if val_acc > best_accuracy: |
torch.save(model.state_dict(), 'best_model_state.bin') |
best_accuracy = val_acc |
print(history['train_acc']) |
list_of_train_accuracy= [t.cpu().numpy() for t in history['train_acc']] |
list_of_train_accuracy |
print(history['val_acc']) |
list_of_val_accuracy= [t.cpu().numpy() for t in history['val_acc']] |
list_of_val_accuracy |
plt.plot(list_of_train_accuracy, label='train accuracy') |
plt.plot(list_of_val_accuracy, label='validation accuracy') |
plt.title('Training history') |
plt.ylabel('Accuracy') |
plt.xlabel('Epoch') |
plt.legend() |
plt.ylim([0, 1]); |
test_acc, _ = eval_model( |
model, |
test_data_loader, |
loss_fn, |
device, |
len(df_test) |
) |
print(('\n')) |
print('Test Accuracy : ', test_acc.item()) |
def get_predictions(model, data_loader, device=None):
    """Run ``model`` over ``data_loader`` and collect texts, predictions and labels.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches (dicts with ``review_text``,
            ``input_ids``, ``attention_mask`` and ``targets``).
        device: Device the batch tensors are moved to. When omitted, the
            device of the model's first parameter is used, so the function
            no longer depends on a module-level ``device`` variable.

    Returns:
        ``(review_texts, predictions, prediction_probs, real_values)``:
        a list of texts plus CPU tensors holding the predicted class index,
        the softmax probabilities and the true labels for every sample.
    """
    model = model.eval()
    if device is None:
        device = next(model.parameters()).device
    review_texts = []
    pred_batches = []
    prob_batches = []
    target_batches = []
    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            _, preds = torch.max(outputs, dim=1)
            probs = F.softmax(outputs, dim=1)
            review_texts.extend(d["review_text"])
            pred_batches.append(preds)
            prob_batches.append(probs)
            target_batches.append(targets)
    # Concatenate whole batches rather than stacking one tensor per sample;
    # the resulting tensors are the same.
    predictions = torch.cat(pred_batches).cpu()
    prediction_probs = torch.cat(prob_batches).cpu()
    real_values = torch.cat(target_batches).cpu()
    return review_texts, predictions, prediction_probs, real_values
# Collect predictions for the test split and print per-class metrics.
y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(model, test_data_loader)
report = classification_report(y_test, y_pred, target_names=class_names)
print(report)
def show_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated heatmap on the current figure.

    Cells are annotated with integer counts. The y tick labels are kept
    horizontal, the x tick labels are rotated 30 degrees, and the axes are
    labelled as true versus predicted sentiment.
    """
    heatmap = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
    y_axis = heatmap.yaxis
    x_axis = heatmap.xaxis
    y_axis.set_ticklabels(y_axis.get_ticklabels(), rotation=0, ha='right')
    x_axis.set_ticklabels(x_axis.get_ticklabels(), rotation=30, ha='right')
    plt.ylabel('True sentiment')
    plt.xlabel('Predicted sentiment')
# Confusion matrix over the test split, labelled with the class names.
cm = confusion_matrix(y_test, y_pred)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
show_confusion_matrix(df_cm)

# Inspect the class probabilities predicted for one test review.
idx = 2
review_text = y_review_texts[idx]
true_sentiment = y_test[idx]
pred_df = pd.DataFrame({
    'class_names': class_names,
    'values': y_pred_probs[idx]
})
print("\n".join(wrap(review_text)))
print()
print(f'True sentiment: {class_names[true_sentiment]}')
sns.barplot(x='values', y='class_names', data=pred_df, orient='h')
plt.ylabel('sentiment')
plt.xlabel('probability')
plt.xlim([0, 1]);

# Classify a review typed in by the user.
review_text = input("Enter a comment for sentiment analysis: ")
# `pad_to_max_length` is deprecated; `padding='max_length'` replaces it and
# truncation is requested explicitly for inputs longer than MAX_LEN.
encoded_review = tokenizer.encode_plus(
    review_text,
    max_length=MAX_LEN,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)
input_ids = encoded_review['input_ids'].to(device)
attention_mask = encoded_review['attention_mask'].to(device)
# Inference only: disable dropout and skip building the autograd graph.
model.eval()
with torch.no_grad():
    output = model(input_ids, attention_mask)
_, prediction = torch.max(output, dim=1)
print(f'Review text: {review_text}')
print(f'Sentiment : {class_names[prediction]}')
def suggest_improved_text(review_text, model, tokenizer):
    """Return an "improved" version of ``review_text`` when it is not positive.

    The review is classified with ``analyze_sentiment``. A review labelled
    ``'negative'`` or ``'neutral'`` is passed to ``generate_improved_text``
    together with its class index; any other review is returned unchanged.

    Args:
        review_text: Review text to analyse.
        model: Sentiment classifier forwarded to ``analyze_sentiment``.
        tokenizer: Tokenizer forwarded to ``analyze_sentiment``.

    Returns:
        The rewritten text, or ``review_text`` itself when no rewrite applies.
    """
    sentiment = analyze_sentiment(review_text, model, tokenizer)
    if sentiment not in ('negative', 'neutral'):
        return review_text
    # The previous version passed an undefined name (`text`) here, raising
    # NameError, and ran the model a second time just to recover the class
    # index. The index is derived from the label already computed instead.
    predicted_sentiment = class_names.index(sentiment)
    return generate_improved_text(review_text, predicted_sentiment)
def analyze_sentiment(review_text, model, tokenizer):
    """Classify ``review_text`` and return the matching entry of ``class_names``.

    Args:
        review_text: Text to classify.
        model: Module called as ``model(input_ids, attention_mask)``.
        tokenizer: Object exposing ``encode_plus`` (e.g. a Hugging Face
            tokenizer).

    Returns:
        The class name of the highest-scoring model output.
    """
    # `pad_to_max_length` is deprecated; `padding='max_length'` replaces it and
    # truncation is requested explicitly for inputs longer than MAX_LEN.
    encoded_input = tokenizer.encode_plus(
        review_text,
        max_length=MAX_LEN,
        add_special_tokens=True,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )
    input_ids = encoded_input['input_ids'].to(device)
    attention_mask = encoded_input['attention_mask'].to(device)
    # Predict in eval mode without autograd so dropout cannot randomise the
    # result, then restore whatever mode the caller had the model in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model(input_ids, attention_mask)
    finally:
        model.train(was_training)
    _, predicted_sentiment = torch.max(outputs, dim=1)
    return class_names[predicted_sentiment.item()]
def generate_improved_text(review_text, predicted_sentiment):
    """Append a fixed list of positive words to reviews of class 0.

    Args:
        review_text: Original review text.
        predicted_sentiment: Predicted class index; only a value equal to 0
            triggers the rewrite.

    Returns:
        ``review_text`` followed by the space-separated positive words when
        ``predicted_sentiment == 0``, otherwise ``review_text`` unchanged.
    """
    uplifting = ("marvellous", "fantastic", "excellent", "admirable", "formidable")
    if predicted_sentiment == 0:
        return " ".join([review_text, *uplifting])
    return review_text