import pickle
import time

import numpy as np
import pandas as pd
import torch
from pyspark.sql import SparkSession
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer
# Paths to the JSON data files
TRAIN_DATA = "data/train_data_162k.json"
TEST_DATA = "data/test_data_162k.json"
VAL_DATA = "data/val_data_162k.json"
# Load the PhoBERT model and its tokenizer (the slow tokenizer is
# required for PhoBERT's word-segmented Vietnamese vocabulary)
def load_bert():
    v_phobert = AutoModel.from_pretrained("vinai/phobert-base-v2")
    v_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2", use_fast=False)
    v_phobert.eval()  # inference only: disable dropout
    return v_phobert, v_tokenizer

phobert, tokenizer = load_bert()
print("Load model done!")
# Initialize the SparkSession. Note that in local[*] mode everything
# runs in a single JVM, so the spark.executor.* settings are largely
# no-ops; the driver memory settings are the ones that matter here.
spark = SparkSession.builder \
    .appName("Feature Extraction") \
    .master("local[*]") \
    .config("spark.executor.memory", "50g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "12") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "16g") \
    .config("spark.driver.memory", "50g") \
    .config("spark.driver.maxResultSize", "16g") \
    .config("spark.ui.showConsoleProgress", "false") \
    .config("spark.log.level", "ERROR") \
    .getOrCreate()
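
# `spark.log.level` is only honored by recent Spark releases; setting
# the level on the context as well keeps older versions quiet too.
spark.sparkContext.setLogLevel("ERROR")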
# Load the JSON data into Spark DataFrames
train_data = spark.read.json(TRAIN_DATA)
test_data = spark.read.json(TEST_DATA)
val_data = spark.read.json(VAL_DATA)
print("Load data done!")
# Extract PhoBERT features (the embedding of the first <s> token) from
# a list of texts
def make_bert_features(v_text):
    max_len = 256  # maximum sequence length supported by PhoBERT

    # Tokenize with the PhoBERT tokenizer, truncating long texts;
    # tqdm displays progress over the batch
    v_tokenized = []
    for i_text in tqdm(v_text):
        line = tokenizer.encode(i_text, truncation=True, max_length=max_len)
        v_tokenized.append(line)

    # Pad sequences to a consistent length; 1 is PhoBERT's <pad> token id
    padded = []
    for i in v_tokenized:
        if len(i) < max_len:
            padded.append(i + [1] * (max_len - len(i)))
        else:
            padded.append(i[:max_len])
    padded = np.array(padded)

    # Attention mask: 0 on padding positions, 1 everywhere else
    attention_mask = np.where(padded == 1, 0, 1)

    # Convert to PyTorch tensors
    padded = torch.tensor(padded, dtype=torch.long)
    attention_mask = torch.tensor(attention_mask)

    # Run PhoBERT and keep the first token of the last hidden state
    with torch.no_grad():
        last_hidden_states = phobert(input_ids=padded, attention_mask=attention_mask)
    v_features = last_hidden_states[0][:, 0, :].numpy()
    print(v_features.shape)
    return v_features
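
# Quick sanity check (a hypothetical example sentence, not from the
# dataset): PhoBERT expects word-segmented input, and each text should
# come back as one 768-dimensional vector.
# sample = make_bert_features(["Hà_Nội là thủ_đô của Việt_Nam ."])
# assert sample.shape == (1, 768)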
# Extract BERT features for the train, test, and validation splits.
# mapPartitions hands the model a whole batch of texts per partition
# instead of one Row at a time; the model and tokenizer are captured
# in the closure and shipped to each worker. The RDDs are lazy and
# only execute when collected below.
def extract_features(df):
    texts = df.select("processed_content").rdd.map(lambda row: row.processed_content)
    return texts.mapPartitions(lambda batch: make_bert_features(list(batch)))

train_features = extract_features(train_data)
test_features = extract_features(test_data)
val_features = extract_features(val_data)
# Collect the category labels as plain Python lists
category_list_train = train_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_test = test_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_val = val_data.select("category").rdd.flatMap(lambda x: x).collect()

# One-hot encode the labels with pd.get_dummies()
y_train = pd.get_dummies(category_list_train)
y_test = pd.get_dummies(category_list_test)
y_val = pd.get_dummies(category_list_val)
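
# pd.get_dummies orders columns by the labels present in each split, so
# align test/val to the training columns in case a split is missing a
# category (a defensive sketch; harmless when all splits share labels).
y_test = y_test.reindex(columns=y_train.columns, fill_value=0)
y_val = y_val.reindex(columns=y_train.columns, fill_value=0)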
# Collect the features and save everything with pickle. The collect()
# calls trigger the actual (lazy) feature extraction, so the timing
# below covers extraction plus serialization.
start_time = time.time()
print("Saving to file")
data_dict = {
    'X_train': np.array(train_features.collect()),
    'X_test': np.array(test_features.collect()),
    'X_val': np.array(val_features.collect()),
    'y_train': y_train,
    'y_test': y_test,
    'y_val': y_val,
}

# Save the dictionary to a pickle file
with open('data/features_162k_phobertbase_v2.pkl', 'wb') as f:
    pickle.dump(data_dict, f)

end_time = time.time()
duration = end_time - start_time
print(f'Total feature extraction time: {duration:.2f} seconds')
print("Done!")