import pickle
import time

import numpy as np
import pandas as pd
import torch
from pyspark.sql import SparkSession
from transformers import AutoModel, AutoTokenizer

# Paths to the pre-split dataset (162k samples).
TRAIN_DATA = "data/train_data_162k.json"
TEST_DATA = "data/test_data_162k.json"
VAL_DATA = "data/val_data_162k.json"

def load_bert():
    """Load PhoBERT-base-v2 and its tokenizer (slow tokenizer, use_fast=False)."""
    v_phobert = AutoModel.from_pretrained("vinai/phobert-base-v2")
    v_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2", use_fast=False)
    return v_phobert, v_tokenizer

phobert, tokenizer = load_bert()
print("Load model done!")

spark = SparkSession.builder \
    .appName("Feature Extraction") \
    .master("local[*]") \
    .config("spark.executor.memory", "50g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "12") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.driver.memory", "50g") \
    .config("spark.memory.offHeap.size", "16g") \
    .config("spark.ui.showConsoleProgress", "false") \
    .config("spark.driver.maxResultSize", "16g") \
    .config("spark.log.level", "ERROR") \
    .getOrCreate()

train_data = spark.read.json(TRAIN_DATA)
test_data = spark.read.json(TEST_DATA)
val_data = spark.read.json(VAL_DATA)
print("Load data done!")

def make_bert_features(v_text):
    """Tokenize a batch of texts, pad to max_len, and return their [CLS] embeddings."""
    v_tokenized = []
    max_len = 256

    for i_text in v_text:
        # encode() adds <s>/</s> and truncates to at most max_len tokens.
        line = tokenizer.encode(i_text, truncation=True, max_length=max_len)
        v_tokenized.append(line)

    # Pad every sequence to max_len with PhoBERT's pad token id (1).
    padded = []
    for i in v_tokenized:
        if len(i) < max_len:
            padded.append(i + [tokenizer.pad_token_id] * (max_len - len(i)))
        else:
            padded.append(i[:max_len])

    padded = np.array(padded)

    # Attention mask: 0 on padding positions, 1 on real tokens.
    attention_mask = np.where(padded == tokenizer.pad_token_id, 0, 1)

    padded = torch.tensor(padded).to(torch.long)
    attention_mask = torch.tensor(attention_mask)

    with torch.no_grad():
        last_hidden_states = phobert(input_ids=padded, attention_mask=attention_mask)

    # First-token (<s>, i.e. [CLS]-style) embedding of the last layer: shape (batch, 768).
    v_features = last_hidden_states[0][:, 0, :].numpy()
    print(v_features.shape)
    return v_features

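# Quick sanity check (hypothetical example; PhoBERT expects word-segmented
# Vietnamese, hence the underscores): should print and return a (1, 768) array.
# make_bert_features(["Hà_Nội là thủ_đô của Việt_Nam"])
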
# Each RDD element is a Row; iterating a Row yields its column values, so
# make_bert_features receives a one-text batch per document. These maps are
# lazy and only execute at collect() time below.
train_features = train_data.select("processed_content").rdd.map(make_bert_features)
test_features = test_data.select("processed_content").rdd.map(make_bert_features)
val_features = val_data.select("processed_content").rdd.map(make_bert_features)

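# Possible batching variant (a sketch, not what runs above): mapPartitions
# would let make_bert_features embed many texts per model call. Note each
# collected element then becomes one (n_rows_in_partition, 768) array.
#
#   def extract_partition(rows):
#       texts = [row["processed_content"] for row in rows]
#       if texts:
#           yield make_bert_features(texts)
#   train_features = train_data.select("processed_content").rdd.mapPartitions(extract_partition)
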
category_list_train = train_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_test = test_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_val = val_data.select("category").rdd.flatMap(lambda x: x).collect()

# One-hot encode the labels. Note that pd.get_dummies derives its (sorted)
# columns from the values it sees, so the three splits share a column layout
# only if every category occurs in each split.
y_train = pd.get_dummies(category_list_train)
y_test = pd.get_dummies(category_list_test)
y_val = pd.get_dummies(category_list_val)

start_time = time.time()
print("Saving to file")
data_dict = {
    'X_train': train_features.collect(),  # collect() runs the actual extraction
    'X_test': test_features.collect(),
    'X_val': val_features.collect(),
    'y_train': y_train,
    'y_test': y_test,
    'y_val': y_val
}

with open('data/features_162k_phobertbase_v2.pkl', 'wb') as f:
    pickle.dump(data_dict, f)

end_time = time.time()
duration = end_time - start_time
print(f'Total feature extraction time: {duration:.2f} seconds')
print("Done!")
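# Downstream usage sketch (assumes the pickle written above): each X_* entry
# is a (1, 768) array, one per document, so stack them before training:
#
#   with open('data/features_162k_phobertbase_v2.pkl', 'rb') as f:
#       data = pickle.load(f)
#   X_train = np.vstack(data['X_train'])  # -> (n_train, 768)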
|