|
|
|
"""Sentiment_analysis.ipynb |
|
|
|
Automatically generated by Colaboratory. |
|
|
|
Original file is located at |
|
https://colab.research.google.com/drive/1EHgMQQJzwbNja0JVMM2DVvrVTMHIS3Vg |
|
""" |
|
|
|
!pip install transformers |
|
|
|
import pandas as pd |
|
from wordcloud import WordCloud |
|
import seaborn as sns |
|
import re |
|
import string |
|
from collections import Counter, defaultdict |
|
|
|
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer |
|
|
|
import plotly.express as px |
|
from plotly.subplots import make_subplots |
|
import plotly.graph_objects as go |
|
from plotly.offline import plot |
|
|
|
import matplotlib.gridspec as gridspec |
|
from matplotlib.ticker import MaxNLocator |
|
import matplotlib.patches as mpatches |
|
import matplotlib.pyplot as plt |
|
import warnings |
|
warnings.filterwarnings('ignore') |
|
import nltk |
|
nltk.download('stopwords') |
|
from nltk.corpus import stopwords |
|
stopWords_nltk = set(stopwords.words('english')) |
|
|
|
|
|
import re |
|
from typing import Union, List |
|
|
|
class CleanText():

    """Replace every character that is not a letter (incl. Turkish letters), digit, or one of . " ' , ( ) with a space."""

    def __init__(self, clean_pattern=r"[^A-ZĞÜŞİÖÇIa-zğüı'şöç0-9.\"',()]"):

        self.clean_pattern = clean_pattern
|
|
|
def __call__(self, text: Union[str, list]) -> str: |
|
|
|
if isinstance(text, str): |
|
docs = [[text]] |
|
|
|
if isinstance(text, list): |
|
docs = text |
|
|
|
text = [[re.sub(self.clean_pattern, " ", sent) for sent in sents] for sents in docs] |
|
|
|
|
|
text = ' '.join([' '.join(sents) for sents in text]) |
|
|
|
return text |
|
|
|
def remove_emoji(data): |
|
emoj = re.compile("[" |
|
u"\U0001F600-\U0001F64F" |
|
u"\U0001F300-\U0001F5FF" |
|
u"\U0001F680-\U0001F6FF" |
|
u"\U0001F1E0-\U0001F1FF" |
|
u"\U00002500-\U00002BEF" |
|
u"\U00002702-\U000027B0" |
|
u"\U00002702-\U000027B0" |
|
u"\U000024C2-\U0001F251" |
|
u"\U0001f926-\U0001f937" |
|
u"\U00010000-\U0010ffff" |
|
u"\u2640-\u2642" |
|
u"\u2600-\u2B55" |
|
u"\u200d" |
|
u"\u23cf" |
|
u"\u23e9" |
|
u"\u231a" |
|
u"\ufe0f" |
|
u"\u3030" |
|
"]+", re.UNICODE) |
|
return re.sub(emoj, '', data) |
|
|
|
def tokenize(text): |
|
""" basic tokenize method with word character, non word character and digits """ |
|
text = re.sub(r" +", " ", str(text)) |
|
text = re.split(r"(\d+|[a-zA-ZğüşıöçĞÜŞİÖÇ]+|\W)", text) |
|
text = list(filter(lambda x: x != '' and x != ' ', text)) |
|
sent_tokenized = ' '.join(text) |
|
return sent_tokenized |
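

# Illustrative check of tokenize() (the example sentence is made up, not from the dataset):
# it separates runs of letters, runs of digits, and single punctuation marks with spaces.
print(tokenize("Room 101 was great!!!"))  # -> "Room 101 was great ! ! !"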
|
|
|
regex = re.compile('[%s]' % re.escape(string.punctuation)) |
|
|
|
def remove_punct(text): |
|
text = regex.sub(" ", text) |
|
return text |
|
|
|
clean = CleanText() |
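

# Illustrative run of the full cleaning chain applied to the reviews below
# (the sample sentence is made up): strip emojis, lowercase, drop characters
# outside the allowed set, then remove the remaining punctuation.
sample_review = "Great stay 😀 -- would book again!!! (room 12)"
print(remove_punct(clean(remove_emoji(sample_review).lower())))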
|
|
|
def label_encode(x): |
|
if x == 1 or x == 2: |
|
return 0 |
|
if x == 3: |
|
return 1 |
|
if x == 5 or x == 4: |
|
return 2 |
|
|
|
def label2name(x): |
|
if x == 0: |
|
return "Negative" |
|
if x == 1: |
|
return "Neutral" |
|
if x == 2: |
|
return "Positive" |
|
|
|
from google.colab import files |
|
uploaded = files.upload() |
|
df = pd.read_csv('tripadvisor_hotel_reviews.csv') |
|
|
|
print("df.columns: ", df.columns) |
|
|
|
fig = px.histogram(df, |
|
x = 'Rating', |
|
title = 'Histogram of Review Rating', |
|
template = 'ggplot2', |
|
color = 'Rating', |
|
color_discrete_sequence= px.colors.sequential.Blues_r, |
|
opacity = 0.8, |
|
height = 525, |
|
width = 835, |
|
) |
|
|
|
fig.update_yaxes(title='Count') |
|
fig.show() |
|
|
|
df.info() |
|
|
|
df["label"] = df["Rating"].apply(lambda x: label_encode(x)) |
|
df["label_name"] = df["label"].apply(lambda x: label2name(x)) |
|
|
|
df["Review"] = df["Review"].apply(lambda x: remove_punct(clean(remove_emoji(x).lower())[0][0])) |
|
|
|
df.head() |
|
|
|
fig = make_subplots(rows=1, cols=2, specs=[[{"type": "pie"}, {"type": "bar"}]]) |
|
colors = ['gold', 'mediumturquoise', 'lightgreen'] |
|
fig.add_trace(go.Pie(labels=df.label_name.value_counts().index,

                     values=df.label_name.value_counts().values), 1, 1)
|
|
|
fig.update_traces(hoverinfo='label+percent', textfont_size=20, |
|
marker=dict(colors=colors, line=dict(color='#000000', width=2))) |
|
|
|
fig.add_trace(go.Bar(x=df.label_name.value_counts().index, y=df.label_name.value_counts().values, marker_color=colors), 1, 2)
|
|
|
fig.show() |
|
|
|
import pandas as pd |
|
import numpy as np |
|
import os |
|
import random |
|
from pathlib import Path |
|
import json |
|
|
|
import torch |
|
from tqdm.notebook import tqdm |
|
|
|
from transformers import BertTokenizer |
|
from torch.utils.data import TensorDataset |
|
|
|
from transformers import BertForSequenceClassification |
|
|
|
class Config(): |
|
seed_val = 17 |
|
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") |
|
epochs = 5 |
|
batch_size = 6 |
|
seq_length = 512 |
|
lr = 2e-5 |
|
eps = 1e-8 |
|
pretrained_model = 'bert-base-uncased' |
|
test_size=0.15 |
|
random_state=42 |
|
add_special_tokens=True |
|
return_attention_mask=True |
|
pad_to_max_length=True |
|
do_lower_case=False |
|
return_tensors='pt' |
|
config = Config() |
|
|
|
|
|
params = {"seed_val": config.seed_val, |
|
"device":str(config.device), |
|
"epochs":config.epochs, |
|
"batch_size":config.batch_size, |
|
"seq_length":config.seq_length, |
|
"lr":config.lr, |
|
"eps":config.eps, |
|
"pretrained_model": config.pretrained_model, |
|
"test_size":config.test_size, |
|
"random_state":config.random_state, |
|
"add_special_tokens":config.add_special_tokens, |
|
"return_attention_mask":config.return_attention_mask, |
|
"pad_to_max_length":config.pad_to_max_length, |
|
"do_lower_case":config.do_lower_case, |
|
"return_tensors":config.return_tensors, |
|
} |
|
|
|
import random |
|
|
|
device = config.device |
|
|
|
random.seed(config.seed_val) |
|
np.random.seed(config.seed_val) |
|
torch.manual_seed(config.seed_val) |
|
torch.cuda.manual_seed_all(config.seed_val) |
|
|
|
df.head() |
|
|
|
from sklearn.model_selection import train_test_split |
|
|
|
train_df_, val_df = train_test_split(df, |
|
test_size=0.10, |
|
random_state=config.random_state, |
|
stratify=df.label.values) |
|
|
|
train_df_.head() |
|
|
|
train_df, test_df = train_test_split(train_df_, |
|
test_size=0.10, |
|
random_state=42, |
|
stratify=train_df_.label.values) |
|
|
|
print(len(train_df['label'].unique())) |
|
print(train_df.shape) |
|
|
|
print(len(val_df['label'].unique())) |
|
print(val_df.shape) |
|
|
|
print(len(test_df['label'].unique())) |
|
print(test_df.shape) |
|
|
|
tokenizer = BertTokenizer.from_pretrained(config.pretrained_model, |
|
do_lower_case=config.do_lower_case) |
|
|
|
encoded_data_train = tokenizer.batch_encode_plus( |
|
train_df.Review.values, |
|
add_special_tokens=config.add_special_tokens, |
|
return_attention_mask=config.return_attention_mask, |
|
pad_to_max_length=config.pad_to_max_length, |
|
max_length=config.seq_length, |
|
return_tensors=config.return_tensors |
|
) |
|
encoded_data_val = tokenizer.batch_encode_plus( |
|
val_df.Review.values, |
|
add_special_tokens=config.add_special_tokens, |
|
return_attention_mask=config.return_attention_mask, |
|
pad_to_max_length=config.pad_to_max_length, |
|
max_length=config.seq_length, |
|
return_tensors=config.return_tensors |
|
) |
|
|
|
input_ids_train = encoded_data_train['input_ids'] |
|
attention_masks_train = encoded_data_train['attention_mask'] |
|
labels_train = torch.tensor(train_df.label.values) |
|
|
|
input_ids_val = encoded_data_val['input_ids'] |
|
attention_masks_val = encoded_data_val['attention_mask'] |
|
labels_val = torch.tensor(val_df.label.values) |
|
|
|
dataset_train = TensorDataset(input_ids_train, attention_masks_train, labels_train) |
|
dataset_val = TensorDataset(input_ids_val, attention_masks_val, labels_val) |
|
|
|
model = BertForSequenceClassification.from_pretrained(config.pretrained_model, |
|
num_labels=3, |
|
output_attentions=False, |
|
output_hidden_states=False) |
|
|
|
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler |
|
|
|
dataloader_train = DataLoader(dataset_train, |
|
sampler=RandomSampler(dataset_train), |
|
batch_size=config.batch_size) |
|
|
|
dataloader_validation = DataLoader(dataset_val, |
|
sampler=SequentialSampler(dataset_val), |
|
batch_size=config.batch_size) |
|
|
|
from transformers import AdamW, get_linear_schedule_with_warmup |
|
|
|
optimizer = AdamW(model.parameters(), |
|
lr=config.lr, |
|
eps=config.eps) |
|
|
|
|
|
scheduler = get_linear_schedule_with_warmup(optimizer, |
|
num_warmup_steps=0, |
|
num_training_steps=len(dataloader_train)*config.epochs) |
|
|
|
from sklearn.metrics import f1_score |
|
|
|
def f1_score_func(preds, labels): |
|
preds_flat = np.argmax(preds, axis=1).flatten() |
|
labels_flat = labels.flatten() |
|
return f1_score(labels_flat, preds_flat, average='weighted') |
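

# Illustrative sanity check of f1_score_func with dummy values (not real model output):
# `preds` are raw logits of shape (n_samples, n_classes), `labels` are integer class ids.
_dummy_preds = np.array([[2.0, 0.1, 0.3], [0.2, 0.1, 1.5]])
_dummy_labels = np.array([0, 2])
print(f1_score_func(_dummy_preds, _dummy_labels))  # expected: 1.0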
|
|
|
def accuracy_per_class(preds, labels, label_dict): |
|
label_dict_inverse = {v: k for k, v in label_dict.items()} |
|
|
|
preds_flat = np.argmax(preds, axis=1).flatten() |
|
labels_flat = labels.flatten() |
|
|
|
for label in np.unique(labels_flat): |
|
y_preds = preds_flat[labels_flat==label] |
|
y_true = labels_flat[labels_flat==label] |
|
print(f'Class: {label_dict_inverse[label]}') |
|
print(f'Accuracy: {len(y_preds[y_preds==label])}/{len(y_true)}\n') |
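

# Usage note (illustrative): accuracy_per_class expects a name -> id mapping such as
# {"Negative": 0, "Neutral": 1, "Positive": 2}; after evaluate() it could be called as
# accuracy_per_class(predictions, true_vals, {"Negative": 0, "Neutral": 1, "Positive": 2})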
|
|
|
def evaluate(dataloader_val): |
|
|
|
model.eval() |
|
|
|
loss_val_total = 0 |
|
predictions, true_vals = [], [] |
|
|
|
for batch in dataloader_val: |
|
|
|
batch = tuple(b.to(config.device) for b in batch) |
|
|
|
inputs = {'input_ids': batch[0], |
|
'attention_mask': batch[1], |
|
'labels': batch[2], |
|
} |
|
|
|
with torch.no_grad(): |
|
outputs = model(**inputs) |
|
|
|
loss = outputs[0] |
|
logits = outputs[1] |
|
loss_val_total += loss.item() |
|
|
|
logits = logits.detach().cpu().numpy() |
|
label_ids = inputs['labels'].cpu().numpy() |
|
predictions.append(logits) |
|
true_vals.append(label_ids) |
|
|
|
|
|
loss_val_avg = loss_val_total/len(dataloader_val) |
|
|
|
predictions = np.concatenate(predictions, axis=0) |
|
true_vals = np.concatenate(true_vals, axis=0) |
|
|
|
return loss_val_avg, predictions, true_vals |
|
|
|
config.device |
|
|
|
model.to(config.device) |
|
|
|
for epoch in tqdm(range(1, config.epochs+1)): |
|
|
|
model.train() |
|
|
|
loss_train_total = 0 |
|
|
|
progress_bar = tqdm(dataloader_train, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False) |
|
|
|
for batch in progress_bar: |
|
|
|
model.zero_grad() |
|
|
|
batch = tuple(b.to(config.device) for b in batch) |
|
|
|
inputs = {'input_ids': batch[0], |
|
'attention_mask': batch[1], |
|
'labels': batch[2], |
|
} |
|
|
|
outputs = model(**inputs) |
|
|
|
loss = outputs[0] |
|
loss_train_total += loss.item() |
|
loss.backward() |
|
|
|
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) |
|
|
|
optimizer.step() |
|
scheduler.step() |
|
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})  # loss is already averaged over the batch
|
|
|
|
|
torch.save(model.state_dict(), f'_BERT_epoch_{epoch}.model') |
|
|
|
tqdm.write(f'\nEpoch {epoch}') |
|
|
|
loss_train_avg = loss_train_total/len(dataloader_train) |
|
tqdm.write(f'Training loss: {loss_train_avg}') |
|
|
|
val_loss, predictions, true_vals = evaluate(dataloader_validation) |
|
val_f1 = f1_score_func(predictions, true_vals) |
|
tqdm.write(f'Validation loss: {val_loss}') |
|
|
|
tqdm.write(f'F1 Score (Weighted): {val_f1}'); |
|
|
|
with Path('params.json').open("w") as f: |
|
json.dump(params, f, ensure_ascii=False, indent=4) |
|
|
|
model.load_state_dict(torch.load('./_BERT_epoch_3.model', map_location=torch.device('cpu')))
|
|
|
from sklearn.metrics import classification_report |
|
|
|
preds_flat = np.argmax(predictions, axis=1).flatten()

print(classification_report(true_vals, preds_flat))  # y_true first, then y_pred
|
|
|
pred_final = [] |
|
|
|
for i, row in tqdm(val_df.iterrows(), total=val_df.shape[0]): |
|
predictions = [] |
|
|
|
review = row["Review"] |
|
encoded_data_test_single = tokenizer.batch_encode_plus( |
|
[review], |
|
add_special_tokens=config.add_special_tokens, |
|
return_attention_mask=config.return_attention_mask, |
|
pad_to_max_length=config.pad_to_max_length, |
|
max_length=config.seq_length, |
|
return_tensors=config.return_tensors |
|
) |
|
input_ids_test = encoded_data_test_single['input_ids'] |
|
attention_masks_test = encoded_data_test_single['attention_mask'] |
|
|
|
|
|
inputs = {'input_ids': input_ids_test.to(device), |
|
'attention_mask':attention_masks_test.to(device), |
|
} |
|
|
|
with torch.no_grad(): |
|
outputs = model(**inputs) |
|
|
|
logits = outputs[0] |
|
logits = logits.detach().cpu().numpy() |
|
predictions.append(logits) |
|
predictions = np.concatenate(predictions, axis=0) |
|
pred_final.append(np.argmax(predictions, axis=1).flatten()[0]) |
|
|
|
val_df["pred"] = pred_final |
|
|
|
control = val_df.pred.values == val_df.label.values |
|
val_df["control"] = control |
|
|
|
# Keep only the misclassified validation rows for error analysis
val_df = val_df[val_df.control == False]
|
|
|
|
|
|
|
name2label = {"Negative":0, |
|
"Neutral":1, |
|
"Positive":2 |
|
} |
|
label2name = {v: k for k, v in name2label.items()}  # note: this rebinds label2name (defined above as a function) to a dict
|
|
|
val_df["pred_name"] = val_df.pred.apply(lambda x: label2name.get(x)) |
|
from sklearn.metrics import confusion_matrix |
|
|
|
|
|
pred_name_values = val_df.pred_name.values |
|
label_values = val_df.label_name.values |
|
confmat = confusion_matrix(label_values, pred_name_values, labels=list(name2label.keys())) |
|
|
|
confmat |
|
|
|
df_confusion_val = pd.crosstab(label_values, pred_name_values) |
|
df_confusion_val |
|
|
|
df_confusion_val.to_csv("val_df_confusion.csv") |
|
|
|
test_df.head() |
|
|
|
encoded_data_test = tokenizer.batch_encode_plus( |
|
test_df.Review.values, |
|
add_special_tokens=config.add_special_tokens, |
|
return_attention_mask=config.return_attention_mask, |
|
pad_to_max_length=config.pad_to_max_length, |
|
max_length=config.seq_length, |
|
return_tensors=config.return_tensors |
|
) |
|
input_ids_test = encoded_data_test['input_ids']

attention_masks_test = encoded_data_test['attention_mask']

labels_test = torch.tensor(test_df.label.values)

# Wrap the encoded test set in a DataLoader so evaluate() runs on the test data
dataset_test = TensorDataset(input_ids_test, attention_masks_test, labels_test)

dataloader_test = DataLoader(dataset_test,
                             sampler=SequentialSampler(dataset_test),
                             batch_size=config.batch_size)



model = BertForSequenceClassification.from_pretrained(config.pretrained_model,

                                                      num_labels=3,

                                                      output_attentions=False,

                                                      output_hidden_states=False)



model.to(config.device)



model.load_state_dict(torch.load('./_BERT_epoch_3.model', map_location=torch.device('cpu')))



_, predictions_test, true_vals_test = evaluate(dataloader_test)
|
|
|
|
|
def predict_sentiment(text): |
|
|
|
encoded_text = tokenizer.encode_plus( |
|
text, |
|
add_special_tokens=config.add_special_tokens, |
|
return_attention_mask=config.return_attention_mask, |
|
pad_to_max_length=config.pad_to_max_length, |
|
max_length=config.seq_length, |
|
return_tensors=config.return_tensors |
|
) |
|
|
|
|
|
input_ids = encoded_text['input_ids'].to(config.device) |
|
attention_mask = encoded_text['attention_mask'].to(config.device) |
|
|
|
|
|
model.eval() |
|
with torch.no_grad(): |
|
outputs = model(input_ids, attention_mask) |
|
|
|
|
|
logits = outputs[0] |
|
logits = logits.detach().cpu().numpy() |
|
|
|
|
|
pred = np.argmax(logits, axis=1).flatten()[0] |
|
|
|
|
|
pred_name = label2name.get(pred) |
|
|
|
return pred_name |
|
|
|
text = "Your text here" |
|
prediction = predict_sentiment(text) |
|
print(f"The sentiment of the text is: {prediction}") |
|
|
|
from sklearn.metrics import classification_report |
|
|
|
preds_flat_test = np.argmax(predictions_test, axis=1).flatten()

print(classification_report(true_vals_test, preds_flat_test))  # y_true first, then y_pred
|
|
|
pred_final = [] |
|
|
|
for i, row in tqdm(test_df.iterrows(), total=test_df.shape[0]): |
|
predictions = [] |
|
|
|
review = row["Review"] |
|
encoded_data_test_single = tokenizer.batch_encode_plus( |
|
[review], |
|
add_special_tokens=config.add_special_tokens, |
|
return_attention_mask=config.return_attention_mask, |
|
pad_to_max_length=config.pad_to_max_length, |
|
max_length=config.seq_length, |
|
return_tensors=config.return_tensors |
|
) |
|
input_ids_test = encoded_data_test_single['input_ids'] |
|
attention_masks_test = encoded_data_test_single['attention_mask'] |
|
|
|
inputs = {'input_ids': input_ids_test.to(device), |
|
'attention_mask':attention_masks_test.to(device), |
|
} |
|
|
|
with torch.no_grad(): |
|
outputs = model(**inputs) |
|
|
|
logits = outputs[0] |
|
logits = logits.detach().cpu().numpy() |
|
predictions.append(logits) |
|
predictions = np.concatenate(predictions, axis=0) |
|
pred_final.append(np.argmax(predictions, axis=1).flatten()[0]) |
|
|
|
|
|
test_df["pred"] = pred_final |
|
|
|
control = test_df.pred.values == test_df.label.values |
|
test_df["control"] = control |
|
|
|
# Keep only the misclassified test rows for error analysis
test_df = test_df[test_df.control == False]

test_df["pred_name"] = test_df.pred.apply(lambda x: label2name.get(x))
|
|
|
from sklearn.metrics import confusion_matrix |
|
|
|
|
|
pred_name_values = test_df.pred_name.values |
|
label_values = test_df.label_name.values |
|
confmat = confusion_matrix(label_values, pred_name_values, labels=list(name2label.keys())) |
|
confmat |
|
|
|
df_confusion_test = pd.crosstab(label_values, pred_name_values) |
|
df_confusion_test |
|
|
|
import matplotlib.pyplot as plt |
|
import seaborn as sns |
|
|
|
|
|
|
|
fig, ax = plt.subplots(figsize=(10,10)) |
|
sns.heatmap(confmat, annot=True, fmt='d', |
|
xticklabels=name2label.keys(), yticklabels=name2label.keys()) |
|
plt.ylabel('True labels')

plt.xlabel('Predictions')
|
plt.show() |