import torch | |
import pandas as pd | |
from sklearn.preprocessing import LabelEncoder | |
from sklearn.model_selection import train_test_split | |
df = pd.read_csv('Emotion_classify_Data.csv') | |
""" | | | |
""" | |
def preprocess_data(df): | |
""" | |
Preprocess the data by renaming columns, removing rows with missing values, and removing extra spaces. | |
""" | |
df = df.rename(columns={'Comment': 'text', 'Emotion': 'label'}) | |
df = df.dropna() | |
df['text'] = df['text'].str.replace('\t', ' ').str.replace(' +', ' ', regex=True).str.strip() | |
df['label'] = df['label'].str.replace('\t', ' ').str.replace(' +', ' ', regex=True).str.strip() | |
return df | |
df = preprocess_data(df) | |
indep = df['text'] | |
dep = df['label'] | |
labelEncoder = LabelEncoder() | |
dep = labelEncoder.fit_transform(dep) | |
# First split: Separate out a training set and a temporary set | |
X_train, X_temp, y_train, y_temp = train_test_split(indep, dep, test_size=0.4, random_state=42) | |
# Second split: Divide the temporary set into validation and test sets | |
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42) | |
import torch | |
import torch.nn as nn | |
class LSTMModel(nn.Module): | |
def __init__(self, max_words, max_len): | |
super(LSTMModel, self).__init__() | |
self.embedding = nn.Embedding(num_embeddings=max_words, embedding_dim=16, max_norm=max_len) | |
self.lstm = nn.LSTM(input_size=16, hidden_size=64, num_layers=1, batch_first=True, dropout=0.1) | |
self.fc = nn.Linear(in_features=64, out_features=3) | |
self.softmax = nn.Softmax(dim=1) | |
def forward(self, x): | |
x = self.embedding(x) | |
x, (hidden, cell) = self.lstm(x) | |
x = x[:, -1, :] # Get the last output of the sequence | |
x = self.fc(x) | |
x = self.softmax(x) | |
return x | |
# Usage | |
max_words = 10000 # Adjust as per your vocabulary size | |
max_len = 100 # Adjust as per your sequence length | |
model = LSTMModel(max_words, max_len) | |
tokenizer = Tokenizer(num_words=max_words, oov_token='<OOV>') | |
tokenizer.fit_on_texts(X_train) | |
X_train_seq = pad_sequences(tokenizer.texts_to_sequences(X_train), maxlen=max_len) | |
X_text_seq = pad_sequences(tokenizer.texts_to_sequences(X_test), maxlen=max_len) | |
import torch | |
from collections import Counter | |
from itertools import chain | |
# Create a vocabulary from the training set | |
def create_vocab(texts, max_words, oov_token='<OOV>'): | |
# Count the words | |
word_counts = Counter(chain.from_iterable([text.split() for text in texts])) | |
# Most common words | |
most_common = word_counts.most_common(max_words - 1) # Reserve one for OOV token | |
# Create the vocabulary | |
vocab = {word: idx + 1 for idx, (word, count) in enumerate(most_common)} | |
vocab[oov_token] = 0 # OOV token | |
return vocab | |
# Convert texts to sequences of indices | |
def texts_to_sequences(texts, vocab): | |
sequences = [] | |
for text in texts: | |
sequence = [vocab.get(word, vocab['<OOV>']) for word in text.split()] | |
sequences.append(sequence) | |
return sequences | |
# Pad sequences to a fixed length | |
def pad_sequences(sequences, maxlen): | |
padded_sequences = torch.zeros((len(sequences), maxlen), dtype=torch.long) | |
for idx, sequence in enumerate(sequences): | |
if len(sequence) > maxlen: | |
sequence = sequence[:maxlen] | |
padded_sequences[idx, :len(sequence)] = torch.tensor(sequence) | |
return padded_sequences | |
# Create the vocabulary | |
vocab = create_vocab(X_train, max_words) | |
# Convert texts to sequences | |
X_train_seq = pad_sequences(texts_to_sequences(X_train, vocab), maxlen=max_len) | |
X_test_seq = pad_sequences(texts_to_sequences(X_test, vocab), maxlen=max_len) | |
import torch | |
import torch.nn as nn | |
from import Dataset, DataLoader | |
# Convert labels to tensors | |
y_train_tensor = torch.tensor(y_train) | |
y_test_tensor = torch.tensor(y_test) | |
num_epochs = 10 | |
# Create a custom dataset | |
class TextDataset(Dataset): | |
def __init__(self, sequences, labels): | |
self.sequences = sequences | |
self.labels = labels | |
def __len__(self): | |
return len(self.sequences) | |
def __getitem__(self, idx): | |
return self.sequences[idx], self.labels[idx] | |
# Create datasets | |
train_dataset = TextDataset(X_train_seq, y_train_tensor) | |
test_dataset = TextDataset(X_test_seq, y_test_tensor) | |
# Create dataloaders | |
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) | |
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False) | |
# Define the model | |
class LSTMModel(nn.Module): | |
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim): | |
super(LSTMModel, self).__init__() | |
self.embedding = nn.Embedding(vocab_size, embedding_dim) | |
self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, dropout=0.1) | |
self.fc = nn.Linear(hidden_dim, output_dim) | |
def forward(self, x): | |
x = self.embedding(x) | |
x, (hidden, cell) = self.lstm(x) | |
x = self.fc(x[:, -1, :]) # Use the last hidden state | |
return x | |
# Instantiate the model | |
model = LSTMModel(max_words, 16, 64, 3) | |
# Loss and optimizer | |
criterion = nn.CrossEntropyLoss() | |
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) | |
# Training loop | |
for epoch in range(num_epochs): | |
for inputs, labels in train_loader: | |
# Forward pass | |
outputs = model(inputs) | |
loss = criterion(outputs, labels) | |
# Backward and optimize | |
optimizer.zero_grad() | |
loss.backward() | |
optimizer.step() | |
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}') |