!pip install transformers |
import pandas as pd |
from wordcloud import WordCloud |
import seaborn as sns |
import re |
import string |
from collections import Counter, defaultdict |
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer |
import plotly.express as px |
from plotly.subplots import make_subplots |
import plotly.graph_objects as go |
from plotly.offline import plot |
import matplotlib.gridspec as gridspec |
from matplotlib.ticker import MaxNLocator |
import matplotlib.patches as mpatches |
import matplotlib.pyplot as plt |
import warnings |
warnings.filterwarnings('ignore') |
import nltk |
nltk.download('stopwords') |
from nltk.corpus import stopwords |
stopWords_nltk = set(stopwords.words('english')) |
import re |
from typing import Union, List |
class CleanText():
    """Replace every character outside a whitelist with a single space.

    The default pattern keeps ASCII and Turkish letters, digits and the
    punctuation ``. , " ' ( )``. Any other character, whitespace included,
    is substituted by ``" "``.
    """

    def __init__(self, clean_pattern = r"[^A-ZĞÜŞİÖÇIa-zğüı'şöç0-9.\"',()]"):
        self.clean_pattern = clean_pattern

    def __call__(self, text: Union[str, list]) -> str:
        """Clean ``text`` and return it as one space-joined string.

        Args:
            text: a single string, a list of strings, or a list of
                documents where each document is a list of sentences.

        Returns:
            All cleaned sentences joined with single spaces.

        Raises:
            TypeError: if ``text`` is neither a ``str`` nor a ``list``.
        """
        if isinstance(text, str):
            docs = [[text]]
        elif isinstance(text, list):
            # A bare string inside the list is treated as a one-sentence
            # document; iterating it directly would clean it char by char.
            docs = [[doc] if isinstance(doc, str) else doc for doc in text]
        else:
            raise TypeError(
                f"CleanText expects a str or a list, got {type(text).__name__}"
            )
        cleaned = [[re.sub(self.clean_pattern, " ", sent) for sent in sents] for sents in docs]
        return ' '.join(' '.join(sents) for sents in cleaned)
# Compiled once at import time: remove_emoji is applied to every review, so
# the character class should not be rebuilt on each call.
# NOTE(review): the ranges U+24C2..U+1F251 and U+10000..U+10FFFF are very
# broad and also cover non-emoji code points that lie between those bounds
# (e.g. CJK ideographs around U+4E00). Kept as-is to preserve behaviour.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"
    "\U0001F300-\U0001F5FF"
    "\U0001F680-\U0001F6FF"
    "\U0001F1E0-\U0001F1FF"
    "\U00002500-\U00002BEF"
    "\U00002702-\U000027B0"
    "\U000024C2-\U0001F251"
    "\U0001f926-\U0001f937"
    "\U00010000-\U0010ffff"
    "\u2640-\u2642"
    "\u2600-\u2B55"
    "\u200d"
    "\u23cf"
    "\u23e9"
    "\u231a"
    "\ufe0f"
    "\u3030"
    "]+",
    re.UNICODE,
)


def remove_emoji(data):
    """Return ``data`` with every run of characters in the emoji ranges removed.

    Args:
        data: the input string.

    Returns:
        The string with matching characters deleted (nothing is inserted in
        their place).
    """
    return _EMOJI_PATTERN.sub('', data)
def tokenize(text):
    """Split text into digit runs, letter runs and single non-word characters.

    The input is converted with ``str()``, runs of spaces are collapsed, and
    the resulting pieces are returned joined by single spaces. Empty pieces
    and lone spaces are dropped.
    """
    collapsed = re.sub(r" +", " ", str(text))
    pieces = re.split(r"(\d+|[a-zA-ZğüşıöçĞÜŞİÖÇ]+|\W)", collapsed)
    kept = [piece for piece in pieces if piece not in ('', ' ')]
    return ' '.join(kept)
# Character class matching any single character of string.punctuation.
regex = re.compile(f"[{re.escape(string.punctuation)}]")


def remove_punct(text):
    """Return ``text`` with each punctuation character replaced by a space."""
    return regex.sub(" ", text)
# Shared cleaner instance using CleanText's default character whitelist.
clean = CleanText()
def label_encode(x):
    """Map a 1-5 rating to a class id: 1-2 -> 0, 3 -> 1, 4-5 -> 2.

    Any other value yields ``None``.
    """
    if x in (1, 2):
        return 0
    if x == 3:
        return 1
    if x in (4, 5):
        return 2
    return None
def label2name(x):
    """Return the sentiment name for class id 0, 1 or 2; ``None`` otherwise."""
    for class_id, class_name in ((0, "Negative"), (1, "Neutral"), (2, "Positive")):
        if x == class_id:
            return class_name
    return None
from google.colab import files

# Upload the CSV through the Colab file picker, then load it.
uploaded = files.upload()
df = pd.read_csv('tripadvisor_hotel_reviews.csv')
print("df.columns: ", df.columns)

# Distribution of the raw ratings, one colour per rating value.
histogram_options = dict(
    x='Rating',
    title='Histogram of Review Rating',
    template='ggplot2',
    color='Rating',
    color_discrete_sequence=px.colors.sequential.Blues_r,
    opacity=0.8,
    height=525,
    width=835,
)
fig = px.histogram(df, **histogram_options)
fig.update_yaxes(title='Count')
fig.show()

df.info()
# Derive the 3-class label and its readable name from the 1-5 rating.
df["label"] = df["Rating"].apply(label_encode)
df["label_name"] = df["label"].apply(label2name)

# Normalise the review text: strip emoji, lowercase, whitelist-clean, then
# blank out punctuation. clean() returns a plain string, so it must not be
# indexed: the former `[0][0]` kept only the first character of each review.
df["Review"] = df["Review"].apply(
    lambda review: remove_punct(clean(remove_emoji(review).lower()))
)
df.head()
# Class balance shown twice: pie on the left, bar chart on the right.
colors = ['gold', 'mediumturquoise', 'lightgreen']
class_names = df.label_name.value_counts().index
class_counts = df.label.value_counts().values

fig = make_subplots(rows=1, cols=2, specs=[[{"type": "pie"}, {"type": "bar"}]])
fig.add_trace(go.Pie(labels=class_names, values=class_counts), 1, 1)
# Styling is applied while only the pie trace exists.
pie_marker = dict(colors=colors, line=dict(color='#000000', width=2))
fig.update_traces(hoverinfo='label+percent', textfont_size=20, marker=pie_marker)
fig.add_trace(go.Bar(x=class_names, y=class_counts, marker_color=colors), 1, 2)
fig.show()
import pandas as pd |
import numpy as np |
import os |
import random |
from pathlib import Path |
import json |
import torch |
from tqdm.notebook import tqdm |
from transformers import BertTokenizer |
from torch.utils.data import TensorDataset |
from transformers import BertForSequenceClassification |
class Config:
    """Hyper-parameters and tokenizer options for the fine-tuning run."""

    # Reproducibility
    seed_val = 17
    random_state = 42

    # Hardware: first CUDA device when available, otherwise CPU.
    device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

    # Optimisation
    epochs = 5
    batch_size = 6
    lr = 2e-5
    eps = 1e-8

    # Data
    test_size = 0.15
    seq_length = 512

    # Model / tokenizer
    pretrained_model = 'bert-base-uncased'
    # NOTE(review): lower-casing is disabled although the checkpoint name says
    # "uncased" - confirm this is intended.
    do_lower_case = False
    add_special_tokens = True
    return_attention_mask = True
    pad_to_max_length = True
    return_tensors = 'pt'
import random

config = Config()

# Snapshot of the configuration in a JSON-serialisable form.
_PARAM_NAMES = (
    "seed_val",
    "device",
    "epochs",
    "batch_size",
    "seq_length",
    "lr",
    "eps",
    "pretrained_model",
    "test_size",
    "random_state",
    "add_special_tokens",
    "return_attention_mask",
    "pad_to_max_length",
    "do_lower_case",
    "return_tensors",
)
params = {name: getattr(config, name) for name in _PARAM_NAMES}
# torch.device is not JSON-serialisable; overwriting keeps the key position.
params["device"] = str(config.device)

device = config.device

# Seed every random number generator used by the run with the same value.
for seed_everything in (random.seed,
                        np.random.seed,
                        torch.manual_seed,
                        torch.cuda.manual_seed_all):
    seed_everything(config.seed_val)
from sklearn.model_selection import train_test_split

df.head()

# First split: hold out 10% of all rows for validation, stratified by label.
# NOTE(review): config.test_size (0.15) is written to params.json but the
# splits below use 0.10 - confirm which value is intended.
train_df_, val_df = train_test_split(df,
                                     test_size=0.10,
                                     random_state=config.random_state,
                                     stratify=df.label.values)
train_df_.head()

# Second split: hold out 10% of the remainder as the test set. The seed now
# comes from config (same value, 42) instead of being hard-coded.
train_df, test_df = train_test_split(train_df_,
                                     test_size=0.10,
                                     random_state=config.random_state,
                                     stratify=train_df_.label.values)

# Sanity check: number of distinct labels and shape of each split.
for split in (train_df, val_df, test_df):
    print(len(split['label'].unique()))
    print(split.shape)
tokenizer = BertTokenizer.from_pretrained(config.pretrained_model,
                                          do_lower_case=config.do_lower_case)


def _encode_reviews(reviews):
    """Tokenize ``reviews`` into tensors of length ``config.seq_length``.

    Uses the ``padding`` / ``truncation`` arguments in place of the deprecated
    ``pad_to_max_length`` flag, and requests truncation explicitly instead of
    relying on the tokenizer's fallback when only ``max_length`` is given.
    """
    return tokenizer.batch_encode_plus(
        reviews,
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        padding="max_length" if config.pad_to_max_length else False,
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors,
    )


encoded_data_train = _encode_reviews(train_df.Review.values)
encoded_data_val = _encode_reviews(val_df.Review.values)

input_ids_train = encoded_data_train['input_ids']
attention_masks_train = encoded_data_train['attention_mask']
labels_train = torch.tensor(train_df.label.values)

input_ids_val = encoded_data_val['input_ids']
attention_masks_val = encoded_data_val['attention_mask']
labels_val = torch.tensor(val_df.label.values)

# Each dataset item is the triple (input_ids, attention_mask, label).
dataset_train = TensorDataset(input_ids_train, attention_masks_train, labels_train)
dataset_val = TensorDataset(input_ids_val, attention_masks_val, labels_val)
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
# AdamW is taken from PyTorch: the copy in transformers is deprecated and is
# no longer exported by recent releases.
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

# Three output classes: Negative / Neutral / Positive.
model = BertForSequenceClassification.from_pretrained(config.pretrained_model,
                                                      num_labels=3,
                                                      output_attentions=False,
                                                      output_hidden_states=False)

# Training batches are shuffled; validation batches keep the dataset order.
dataloader_train = DataLoader(dataset_train,
                              sampler=RandomSampler(dataset_train),
                              batch_size=config.batch_size)
dataloader_validation = DataLoader(dataset_val,
                                   sampler=SequentialSampler(dataset_val),
                                   batch_size=config.batch_size)

# weight_decay=0.0 reproduces the default of the former transformers.AdamW
# (torch.optim.AdamW would otherwise default to 0.01).
optimizer = AdamW(model.parameters(),
                  lr=config.lr,
                  eps=config.eps,
                  weight_decay=0.0)

# Linear decay of the learning rate over all optimisation steps, no warm-up.
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=0,
                                            num_training_steps=len(dataloader_train)*config.epochs)
from sklearn.metrics import f1_score


def f1_score_func(preds, labels):
    """Return the weighted F1 score of the arg-max predictions.

    ``preds`` is reduced with ``argmax`` along axis 1 and ``labels`` is
    flattened before both are passed to ``sklearn.metrics.f1_score``.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    true_classes = labels.flatten()
    return f1_score(true_classes, predicted_classes, average='weighted')
def accuracy_per_class(preds, labels, label_dict):
    """Print, for each class present in ``labels``, hits over class size.

    Args:
        preds: 2-D score array; the predicted class is the arg-max on axis 1.
        labels: array of true class ids.
        label_dict: mapping from class name to class id.
    """
    id_to_name = {class_id: name for name, class_id in label_dict.items()}
    predicted = np.argmax(preds, axis=1).flatten()
    actual = labels.flatten()

    for label in np.unique(actual):
        in_class = actual == label
        class_preds = predicted[in_class]
        hits = int(np.sum(class_preds == label))
        total = len(actual[in_class])
        print(f'Class: {id_to_name[label]}')
        print(f'Accuracy: {hits}/{total}\n')
def evaluate(dataloader_val):
    """Run the module-level ``model`` over ``dataloader_val`` without gradients.

    Each batch is expected to be the triple (input_ids, attention_mask,
    labels) and is moved to ``config.device`` before the forward pass.

    Returns:
        A tuple ``(mean loss per batch, logits array, label array)`` where the
        two arrays are the per-batch results concatenated along axis 0.
    """
    model.eval()

    running_loss = 0
    logit_chunks, label_chunks = [], []

    for batch in dataloader_val:
        on_device = tuple(tensor.to(config.device) for tensor in batch)
        labels = on_device[2]

        with torch.no_grad():
            outputs = model(input_ids=on_device[0],
                            attention_mask=on_device[1],
                            labels=labels)

        # With labels supplied, the model output starts with (loss, logits).
        running_loss += outputs[0].item()
        logit_chunks.append(outputs[1].detach().cpu().numpy())
        label_chunks.append(labels.cpu().numpy())

    loss_val_avg = running_loss/len(dataloader_val)
    predictions = np.concatenate(logit_chunks, axis=0)
    true_vals = np.concatenate(label_chunks, axis=0)

    return loss_val_avg, predictions, true_vals
config.device
model.to(config.device)

# Fine-tuning loop: one pass over the training set per epoch, a checkpoint
# after every epoch, then validation loss and weighted F1.
for epoch in tqdm(range(1, config.epochs+1)):

    model.train()
    loss_train_total = 0

    progress_bar = tqdm(dataloader_train, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False)
    for batch in progress_bar:
        model.zero_grad()

        batch = tuple(b.to(config.device) for b in batch)
        inputs = {'input_ids': batch[0],
                  'attention_mask': batch[1],
                  'labels': batch[2],
                  }

        outputs = model(**inputs)
        loss = outputs[0]
        batch_loss = loss.item()
        loss_train_total += batch_loss

        loss.backward()
        # Clip the global gradient norm to 1.0 before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

        # Show the batch loss as returned by the model. It used to be divided
        # by len(batch), which is the number of tensors in the batch tuple
        # (ids, mask, labels), not the number of samples.
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(batch_loss)})

    torch.save(model.state_dict(), f'_BERT_epoch_{epoch}.model')

    tqdm.write(f'\nEpoch {epoch}')
    loss_train_avg = loss_train_total/len(dataloader_train)
    tqdm.write(f'Training loss: {loss_train_avg}')

    val_loss, predictions, true_vals = evaluate(dataloader_validation)
    val_f1 = f1_score_func(predictions, true_vals)
    tqdm.write(f'Validation loss: {val_loss}')
    tqdm.write(f'F1 Score (Weighted): {val_f1}')

# Persist the run configuration. An explicit encoding is required because
# ensure_ascii=False may emit non-ASCII characters.
with Path('params.json').open("w", encoding="utf-8") as f:
    json.dump(params, f, ensure_ascii=False, indent=4)
from sklearn.metrics import classification_report

# Restore the weights saved after epoch 3.
model.load_state_dict(torch.load('./_BERT_epoch_3.model', map_location=torch.device('cpu')))

# Re-score the validation set with the restored weights; otherwise the report
# would describe the predictions left over from the last training epoch.
_, predictions, true_vals = evaluate(dataloader_validation)
preds_flat = np.argmax(predictions, axis=1).flatten()

# classification_report takes (y_true, y_pred); the swapped order exchanged
# precision and recall in the printed table.
print(classification_report(true_vals, preds_flat))
from sklearn.metrics import confusion_matrix

# Predict every validation review one at a time with the current model.
model.eval()
pred_final = []
for i, row in tqdm(val_df.iterrows(), total=val_df.shape[0]):
    review = row["Review"]
    encoded_data_test_single = tokenizer.batch_encode_plus(
        [review],
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        # `padding` / `truncation` replace the deprecated pad_to_max_length.
        padding="max_length" if config.pad_to_max_length else False,
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors
    )
    input_ids_test = encoded_data_test_single['input_ids']
    attention_masks_test = encoded_data_test_single['attention_mask']

    inputs = {'input_ids': input_ids_test.to(device),
              'attention_mask': attention_masks_test.to(device),
              }

    with torch.no_grad():
        outputs = model(**inputs)

    # Without labels in the inputs, the first output holds the logits.
    logits = outputs[0].detach().cpu().numpy()
    pred_final.append(np.argmax(logits, axis=1).flatten()[0])

name2label = {"Negative": 0,
              "Neutral": 1,
              "Positive": 2
              }
# NOTE: from here on `label2name` is this dict, no longer the function
# defined earlier in the file.
label2name = {v: k for k, v in name2label.items()}

val_df["pred"] = pred_final
control = val_df.pred.values == val_df.label.values
val_df["control"] = control
val_df["pred_name"] = val_df.pred.apply(lambda x: label2name.get(x))

# Build the confusion matrix from ALL validation rows. It used to be computed
# after the correct predictions had been dropped, so its diagonal was always
# zero.
pred_name_values = val_df.pred_name.values
label_values = val_df.label_name.values
confmat = confusion_matrix(label_values, pred_name_values, labels=list(name2label.keys()))
confmat

df_confusion_val = pd.crosstab(label_values, pred_name_values)
df_confusion_val
df_confusion_val.to_csv("val_df_confusion.csv")

# Keep only the misclassified reviews for manual inspection.
val_df = val_df[~val_df.control].copy()
from torch.utils.data import DataLoader, SequentialSampler

test_df.head()

encoded_data_test = tokenizer.batch_encode_plus(
    test_df.Review.values,
    add_special_tokens=config.add_special_tokens,
    return_attention_mask=config.return_attention_mask,
    # `padding` / `truncation` replace the deprecated pad_to_max_length.
    padding="max_length" if config.pad_to_max_length else False,
    truncation=True,
    max_length=config.seq_length,
    return_tensors=config.return_tensors
)
input_ids_test = encoded_data_test['input_ids']
attention_masks_test = encoded_data_test['attention_mask']
labels_test = torch.tensor(test_df.label.values)

# Wrap the encoded test set so evaluate() can iterate it in order.
dataset_test = TensorDataset(input_ids_test, attention_masks_test, labels_test)
dataloader_test = DataLoader(dataset_test,
                             sampler=SequentialSampler(dataset_test),
                             batch_size=config.batch_size)

# Fresh model with the epoch-3 checkpoint loaded into it.
model = BertForSequenceClassification.from_pretrained(config.pretrained_model,
                                                      num_labels=3,
                                                      output_attentions=False,
                                                      output_hidden_states=False)
model.to(config.device)
model.load_state_dict(torch.load('./_BERT_epoch_3.model', map_location=torch.device('cpu')))

# Score the TEST set. The validation dataloader was passed here before, so
# the "test" metrics were in fact validation metrics.
_, predictions_test, true_vals_test = evaluate(dataloader_test)
def predict_sentiment(text):
    """Return the predicted sentiment name for a single piece of text.

    The text is tokenized to ``config.seq_length`` tokens, run through the
    module-level ``model`` in eval mode, and the arg-max class id is looked up
    in the module-level ``label2name`` dict. ``label2name`` must be the
    id -> name dict (it is rebound from a function to a dict earlier in the
    script).

    Args:
        text: the raw text to classify.

    Returns:
        The name mapped to the predicted class id, or ``None`` when the id is
        not present in ``label2name``.
    """
    encoded_text = tokenizer.encode_plus(
        text,
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        # `padding` / `truncation` replace the deprecated pad_to_max_length
        # flag and make truncation to max_length explicit.
        padding="max_length" if config.pad_to_max_length else False,
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors
    )
    input_ids = encoded_text['input_ids'].to(config.device)
    attention_mask = encoded_text['attention_mask'].to(config.device)

    model.eval()
    with torch.no_grad():
        outputs = model(input_ids, attention_mask)

    # Without labels, the first output holds the logits.
    logits = outputs[0].detach().cpu().numpy()
    pred = np.argmax(logits, axis=1).flatten()[0]
    return label2name.get(pred)


# Example usage.
text = "Your text here"
prediction = predict_sentiment(text)
print(f"The sentiment of the text is: {prediction}")
from sklearn.metrics import classification_report

preds_flat_test = np.argmax(predictions_test, axis=1).flatten()
# classification_report takes (y_true, y_pred); the swapped order exchanged
# precision and recall in the printed table.
print(classification_report(true_vals_test, preds_flat_test))
from sklearn.metrics import confusion_matrix

# Predict every test review one at a time with the current model.
model.eval()
pred_final = []
for i, row in tqdm(test_df.iterrows(), total=test_df.shape[0]):
    review = row["Review"]
    encoded_data_test_single = tokenizer.batch_encode_plus(
        [review],
        add_special_tokens=config.add_special_tokens,
        return_attention_mask=config.return_attention_mask,
        # `padding` / `truncation` replace the deprecated pad_to_max_length.
        padding="max_length" if config.pad_to_max_length else False,
        truncation=True,
        max_length=config.seq_length,
        return_tensors=config.return_tensors
    )
    input_ids_test = encoded_data_test_single['input_ids']
    attention_masks_test = encoded_data_test_single['attention_mask']

    inputs = {'input_ids': input_ids_test.to(device),
              'attention_mask': attention_masks_test.to(device),
              }

    with torch.no_grad():
        outputs = model(**inputs)

    # Without labels in the inputs, the first output holds the logits.
    logits = outputs[0].detach().cpu().numpy()
    pred_final.append(np.argmax(logits, axis=1).flatten()[0])

test_df["pred"] = pred_final
control = test_df.pred.values == test_df.label.values
test_df["control"] = control
# `label2name` and `name2label` are the dicts built in the validation
# error-analysis step earlier in the script.
test_df["pred_name"] = test_df.pred.apply(lambda x: label2name.get(x))

# Build the confusion matrix from ALL test rows. It used to be computed after
# the correct predictions had been dropped, so its diagonal was always zero.
pred_name_values = test_df.pred_name.values
label_values = test_df.label_name.values
confmat = confusion_matrix(label_values, pred_name_values, labels=list(name2label.keys()))
confmat

df_confusion_test = pd.crosstab(label_values, pred_name_values)
df_confusion_test

# Keep only the misclassified reviews for manual inspection.
test_df = test_df[~test_df.control].copy()
import matplotlib.pyplot as plt
import seaborn as sns

# Heat map of the most recent confusion matrix; both axes carry class names.
class_tick_labels = name2label.keys()
fig, ax = plt.subplots(figsize=(10,10))
sns.heatmap(
    confmat,
    annot=True,
    fmt='d',
    xticklabels=class_tick_labels,
    yticklabels=class_tick_labels,
)
plt.xlabel('Prédictions')
plt.ylabel('Vraies valeurs')
plt.show()