import pickle
from transformers import AutoModel, AutoTokenizer
from tqdm import tqdm
import pandas as pd
import torch
import numpy as np
from pyspark.sql import SparkSession
import time
# Paths to JSON data files
TRAIN_DATA = "data/train_data_162k.json"
TEST_DATA = "data/test_data_162k.json"
VAL_DATA = "data/val_data_162k.json"
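# Each JSON record is assumed (based on the columns used below) to contain at least a
# "processed_content" field with pre-processed Vietnamese text and a "category" label field.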
# Function to load BERT model and tokenizer
def load_bert():
    v_phobert = AutoModel.from_pretrained("vinai/phobert-base-v2")
    v_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2", use_fast=False)
    return v_phobert, v_tokenizer
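# Note: per the PhoBERT model card, inputs are expected to be word-segmented Vietnamese text;
# use_fast=False selects the slow (Python) tokenizer implementation.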
# Load BERT model and tokenizer
phobert, tokenizer = load_bert()
print("Load model done!")
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Feature Extraction") \
    .master("local[*]") \
    .config("spark.executor.memory", "50g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "12") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.driver.memory", "50g") \
    .config("spark.memory.offHeap.size", "16g") \
    .config("spark.ui.showConsoleProgress", False) \
    .config("spark.driver.maxResultSize", "16g") \
    .config("spark.log.level", "ERROR") \
    .getOrCreate()
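# These Spark settings assume a large local machine (50g of driver/executor memory, 12 cores
# per executor); tune them to the hardware actually available.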
# Load JSON data into Spark DataFrames
train_data = spark.read.json(TRAIN_DATA)
test_data = spark.read.json(TEST_DATA)
val_data = spark.read.json(VAL_DATA)
print("Load data done!")
# Function to extract BERT features from text
def make_bert_features(v_text):
    v_tokenized = []
    max_len = 256  # Maximum sequence length
    for i_text in v_text:
        # Tokenize using the PhoBERT tokenizer (adds special tokens, truncates long inputs)
        line = tokenizer.encode(i_text, truncation=True)
        v_tokenized.append(line)
    # Pad sequences to ensure a consistent length
    padded = []
    for i in v_tokenized:
        if len(i) < max_len:
            padded.append(i + [1] * (max_len - len(i)))  # Pad with token id 1 (<pad> for PhoBERT)
        else:
            padded.append(i[:max_len])  # Truncate if the sequence is too long
    padded = np.array(padded)
    # Create attention mask (0 for padding positions, 1 for real tokens)
    attention_mask = np.where(padded == 1, 0, 1)
    # Convert to PyTorch tensors
    padded = torch.tensor(padded).to(torch.long)
    attention_mask = torch.tensor(attention_mask)
    # Obtain features from BERT: the first-token ([CLS]) embedding of the last hidden state
    with torch.no_grad():
        last_hidden_states = phobert(input_ids=padded, attention_mask=attention_mask)
    v_features = last_hidden_states[0][:, 0, :].numpy()
    print(v_features.shape)
    return v_features
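# Example usage (sketch): calling the function directly on a small list of texts returns a
# (num_texts, 768) array of [CLS] embeddings, since phobert-base has a 768-dim hidden state.
# feats = make_bert_features(["đây là một câu ví dụ"])  # hypothetical sample text
# print(feats.shape)  # -> (1, 768)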
# Extract BERT features for train, test, and validation datasets
train_features = train_data.select("processed_content").rdd.map(make_bert_features)
test_features = test_data.select("processed_content").rdd.map(make_bert_features)
val_features = val_data.select("processed_content").rdd.map(make_bert_features)
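# Note: rdd.map() is lazy, so no model inference happens here; the tokenizer and model are
# shipped to the Spark workers and features are only computed when .collect() is called below.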
# Convert category column to lists
category_list_train = train_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_test = test_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_val = val_data.select("category").rdd.flatMap(lambda x: x).collect()
# Convert to one-hot encoding using pd.get_dummies()
y_train = pd.get_dummies(category_list_train)
y_test = pd.get_dummies(category_list_test)
y_val = pd.get_dummies(category_list_val)
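# pd.get_dummies() returns a DataFrame with one indicator column per category, sorted by
# category name; this assumes every category appears in all three splits so that the column
# order matches across train/test/val.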
# Save data to file using pickle
start_time = time.time()
print("Saving to file")
data_dict = {
    'X_train': train_features.collect(),
    'X_test': test_features.collect(),
    'X_val': val_features.collect(),
    'y_train': y_train,
    'y_test': y_test,
    'y_val': y_val
}
# Save dictionary to pickle file
with open('data/features_162k_phobertbase_v2.pkl', 'wb') as f:
    pickle.dump(data_dict, f)
end_time = time.time()
duration = end_time - start_time
print(f'Total feature extraction time: {duration:.2f} seconds')
print("Done!")