|
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
import numpy as np
|
|
from sklearn.preprocessing import MinMaxScaler
|
|
|
|
|
|
class Autoencoder(nn.Module):
|
|
def __init__(self, input_size):
|
|
super(Autoencoder, self).__init__()
|
|
self.encoder = nn.Sequential(
|
|
nn.Linear(input_size, 256),
|
|
nn.ReLU(),
|
|
nn.Linear(256, 128),
|
|
nn.ReLU(),
|
|
nn.Linear(128, 64),
|
|
nn.ReLU(),
|
|
nn.Linear(64, 32)
|
|
)
|
|
self.decoder = nn.Sequential(
|
|
nn.Linear(32, 64),
|
|
nn.ReLU(),
|
|
nn.Linear(64, 128),
|
|
nn.ReLU(),
|
|
nn.Linear(128, 256),
|
|
nn.ReLU(),
|
|
nn.Linear(256, input_size)
|
|
)
|
|
|
|
def forward(self, x):
|
|
batch_size, seq_len, _ = x.size()
|
|
x = x.view(batch_size * seq_len, -1)
|
|
encoded = self.encoder(x)
|
|
decoded = self.decoder(encoded)
|
|
return decoded.view(batch_size, seq_len, -1)
|
|
|
|
def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
|
scaler_posture = MinMaxScaler()
|
|
X_posture_scaled = scaler_posture.fit_transform(X_posture.reshape(-1, 1))
|
|
|
|
|
|
X_embeddings = torch.FloatTensor(X_embeddings).to(device)
|
|
if X_embeddings.dim() == 2:
|
|
X_embeddings = X_embeddings.unsqueeze(0)
|
|
|
|
|
|
X_posture_scaled = torch.FloatTensor(X_posture_scaled).to(device)
|
|
if X_posture_scaled.dim() == 2:
|
|
X_posture_scaled = X_posture_scaled.unsqueeze(0)
|
|
|
|
model_embeddings = Autoencoder(input_size=X_embeddings.shape[2]).to(device)
|
|
model_posture = Autoencoder(input_size=X_posture_scaled.shape[2]).to(device)
|
|
|
|
criterion = nn.MSELoss()
|
|
optimizer_embeddings = optim.Adam(model_embeddings.parameters())
|
|
optimizer_posture = optim.Adam(model_posture.parameters())
|
|
|
|
|
|
for epoch in range(epochs):
|
|
for model, optimizer, X in [(model_embeddings, optimizer_embeddings, X_embeddings),
|
|
(model_posture, optimizer_posture, X_posture_scaled)]:
|
|
model.train()
|
|
optimizer.zero_grad()
|
|
output = model(X)
|
|
loss = criterion(output, X)
|
|
loss.backward()
|
|
optimizer.step()
|
|
|
|
|
|
model_embeddings.eval()
|
|
model_posture.eval()
|
|
with torch.no_grad():
|
|
reconstructed_embeddings = model_embeddings(X_embeddings).cpu().numpy()
|
|
reconstructed_posture = model_posture(X_posture_scaled).cpu().numpy()
|
|
|
|
mse_embeddings = np.mean(np.power(X_embeddings.cpu().numpy() - reconstructed_embeddings, 2), axis=2).squeeze()
|
|
mse_posture = np.mean(np.power(X_posture_scaled.cpu().numpy() - reconstructed_posture, 2), axis=2).squeeze()
|
|
|
|
return mse_embeddings, mse_posture
|
|
|
|
def determine_anomalies(mse_values, threshold):
|
|
mean = np.mean(mse_values)
|
|
std = np.std(mse_values)
|
|
anomalies = mse_values > (mean + threshold * std)
|
|
return anomalies |