# Import libraries
import logging
import re
import pandas as pd
import numpy as np
import tensorflow as tf
import nltk
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
from transformers import AutoTokenizer, TFBertModel
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Layer
from tensorflow_addons.optimizers import AdamW
import streamlit as st
from nltk.corpus import stopwords
from concurrent.futures import ThreadPoolExecutor
import kagglehub
import os
# Text Processing
nltk.download('punkt')
nltk.download('stopwords')
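# Note: only the 'stopwords' corpus is actually used below (tokenization relies on str.split,
# not nltk.word_tokenize), so the 'punkt' download is optional and kept only as a convenience.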
# Logging configuration
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Load dataset
def load_dataset():
    try:
        path = kagglehub.dataset_download("dannytheodore/brimo-app-review")
        dataset_path = f"{path}/brimo_googleplaystore_review.csv"
        return pd.read_csv(dataset_path, index_col=0)
    except Exception as e:
        logging.error(f"Error loading dataset: {e}")
        st.error("Failed to load the dataset.")
        return None
# Map the labels to positive, neutral, negative
def map_labels(score):
    if score >= 4:
        return 2  # Positive
    elif score == 3:
        return 1  # Neutral
    else:
        return 0  # Negative
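# Example of the mapping above: a 5-star review -> 2 (Positive), a 3-star review -> 1 (Neutral),
# and a 1- or 2-star review -> 0 (Negative).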
# Preprocess text
def preprocess_text(text, stop_words, stemmer):
    try:
        text = text.lower()
        text = re.sub(r"@[A-Za-z0-9_]+", " ", text)  # Remove mentions
        text = re.sub(r"#[A-Za-z0-9_]+", " ", text)  # Remove hashtags
        text = re.sub(r"http\S+", " ", text)         # Remove URLs
        text = re.sub(r"www\.\S+", " ", text)        # Remove www URLs (dot escaped so it matches literally)
        text = re.sub(r"[^A-Za-z\s']", " ", text)    # Remove non-letter characters
        tokens = text.split()
        tokens = [word for word in tokens if word not in stop_words]  # Remove stopwords
        tokens = [stemmer.stem(word) for word in tokens]              # Apply stemming
        return ' '.join(tokens)
    except Exception as e:
        logging.error(f"Error processing text: {text}\n{e}")
        return text
# Preprocess and tokenize reviews asynchronously
def preprocess_and_tokenize_reviews(reviews, tokenizer, stop_words, stemmer, max_length=128):
    with ThreadPoolExecutor() as executor:
        cleaned_reviews = list(executor.map(lambda x: preprocess_text(x, stop_words, stemmer), reviews))
    return tokenizer(cleaned_reviews, padding='max_length', truncation=True, max_length=max_length, return_tensors='tf')
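# The call above returns a dict of tf.Tensors, each of shape (num_reviews, 128): input_ids,
# attention_mask and, for BERT-style tokenizers, token_type_ids. Illustrative usage with a
# hypothetical review string:
#   encoded = preprocess_and_tokenize_reviews(["aplikasinya bagus"], tokenizer, stop_words, stemmer)
#   encoded['input_ids'].shape  # TensorShape([1, 128])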
# Custom Keras Layer
class BertLayer(Layer):
    def __init__(self, base_model, **kwargs):
        super(BertLayer, self).__init__(**kwargs)
        self.base_model = base_model

    def call(self, inputs):
        input_ids, attention_mask = inputs
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.last_hidden_state

    def get_config(self):
        config = super(BertLayer, self).get_config()
        config.update({"base_model": self.base_model})
        return config
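# Note: placing the full TFBertModel object in the layer config may not round-trip cleanly through
# Keras serialization; loading best_model.h5 therefore also passes BertLayer, PoolerLayer, F1Score
# and TFBertModel via custom_objects (see load_model_and_tokenizer below).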
# Add Pooler Layer (from the first [CLS] token)
class PoolerLayer(Layer):
    def __init__(self, **kwargs):
        super(PoolerLayer, self).__init__(**kwargs)

    def call(self, inputs):
        cls_token = inputs[:, 0, :]  # First token's output (the [CLS] token)
        pooled_output = tf.keras.activations.tanh(cls_token)  # Apply tanh activation
        return pooled_output
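# Unlike BERT's original pooler (a dense projection followed by tanh), this layer applies only the
# tanh activation to the [CLS] hidden state; the classification head itself is presumably part of
# the saved model in best_model.h5.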
# Custom F1 Score Metric
class F1Score(tf.keras.metrics.Metric):
    def __init__(self, name="f1_score", **kwargs):
        super(F1Score, self).__init__(name=name, **kwargs)
        self.true_positives = self.add_weight(name="tp", initializer="zeros")
        self.false_positives = self.add_weight(name="fp", initializer="zeros")
        self.false_negatives = self.add_weight(name="fn", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred = tf.argmax(y_pred, axis=-1)
        y_true = tf.argmax(y_true, axis=-1)
        tp = tf.reduce_sum(tf.cast((y_true == y_pred) & (y_true != 0), tf.float32))
        fp = tf.reduce_sum(tf.cast((y_true != y_pred) & (y_pred != 0), tf.float32))
        fn = tf.reduce_sum(tf.cast((y_true != y_pred) & (y_true != 0), tf.float32))
        self.true_positives.assign_add(tp)
        self.false_positives.assign_add(fp)
        self.false_negatives.assign_add(fn)

    def result(self):
        precision = self.true_positives / (self.true_positives + self.false_positives + K.epsilon())
        recall = self.true_positives / (self.true_positives + self.false_negatives + K.epsilon())
        f1 = 2 * (precision * recall) / (precision + recall + K.epsilon())
        return f1

    def reset_state(self):
        self.true_positives.assign(0)
        self.false_positives.assign(0)
        self.false_negatives.assign(0)
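# The metric above is effectively a micro-averaged F1 that treats class 0 (Negative) as the
# "background" class: correct predictions of class 0 never count as true positives, and only
# non-zero predictions/labels contribute to the false positive/negative counts.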
# Load model and tokenizer
def load_model_and_tokenizer():
    try:
        model_path = 'best_model.h5'
        if os.path.exists(model_path):
            model = load_model(model_path, custom_objects={'TFBertModel': TFBertModel, 'BertLayer': BertLayer, 'PoolerLayer': PoolerLayer, 'F1Score': F1Score})
        else:
            st.error("Model file not found. Please check the file path.")
            return None, None
    except Exception as e:
        logging.error(f"Error loading model: {e}")
        st.error("Failed to load the model. Please check the model file and try again.")
        return None, None

    # Recreate the AdamW optimizer
    optimizer = AdamW(learning_rate=2e-5, weight_decay=1e-5)

    # Recompile the model with the AdamW optimizer
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=[F1Score()])

    # Load the tokenizer from the Hugging Face hub
    try:
        tokenizer = AutoTokenizer.from_pretrained('indobenchmark/indobert-base-p1')
    except Exception as e:
        logging.error(f"Error loading tokenizer: {e}")
        st.error("Failed to load the tokenizer. Please check the tokenizer files.")
        return None, None

    return model, tokenizer
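# Note: AdamW is imported from tensorflow_addons, which is in maintenance mode; on TensorFlow >= 2.11
# the built-in optimizer should be a drop-in replacement for the one recreated above, e.g.:
#   optimizer = tf.keras.optimizers.AdamW(learning_rate=2e-5, weight_decay=1e-5)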
# Sentiment mapping
sentiment_map = {0: 'Negative', 1: 'Neutral', 2: 'Positive'}
# Run Streamlit app
def run(model, tokenizer, stop_words, stemmer):
    # Add a banner image
    banner_image = "https://businessnews.co.id/wp-content/uploads/2021/04/Screenshot_112.jpg"
    st.image(banner_image, use_container_width=True)

    # Set title and description
    st.title('BRImo Sentiment Analysis using IndoBERT')
    st.subheader('This application analyzes the sentiment of user-provided reviews.')

    # Input form
    with st.form(key='review_form'):
        review_input = st.text_area("Enter Review: (in Bahasa Indonesia)", height=150)
        submit_button = st.form_submit_button("Analyze Sentiment")

    if submit_button:
        if review_input:
            # Preprocess and tokenize the review
            tokenized_review = preprocess_and_tokenize_reviews([review_input], tokenizer, stop_words, stemmer)

            # Make prediction
            if model:
                predictions = model.predict({'input_ids': tokenized_review['input_ids'], 'attention_mask': tokenized_review['attention_mask']})
                predicted_label = np.argmax(predictions, axis=-1)
                sentiment = sentiment_map[predicted_label[0]]
                st.write(f"### Predicted Sentiment: {sentiment}")
            else:
                st.error("Model is not loaded. Please check the model file and try again.")
        else:
            st.error("Please enter a review to analyze.")
if __name__ == "__main__":
    # Load necessary components
    df = load_dataset()
    model, tokenizer = load_model_and_tokenizer()

    if df is not None and model is not None and tokenizer is not None:
        # Preprocess dataset and prepare stopwords and stemmer
        manual_stopwords = ["di", "ke", "dari", "yang", "dan", "atau", "dengan", "untuk", "ini", "itu", "aja", "saja", "lah", "bri", "brimo", "aplikasi", "rekening", "coba", "yg", "ke", "untuk", "nya", "saya", "dia", "dan", "sangat", "video", "login", "apk", "jadi", "akun", "malah", "uang", "banget", "dalam", "atm", "padahal"]
        stop_words = set(stopwords.words('indonesian'))
        stop_words.update(manual_stopwords)
        factory = StemmerFactory()
        stemmer = factory.create_stemmer()
        df['label'] = df['score'].apply(map_labels)
        run(model, tokenizer, stop_words, stemmer)
    else:
        if df is None:
            logging.error("Failed to load dataset.")
            st.error("Failed to load the dataset. Please check the dataset file.")
        if model is None or tokenizer is None:
            logging.error("Failed to load model or tokenizer.")
            st.error("Failed to load the model or tokenizer. Please check the model file.")