File size: 3,854 Bytes
28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 c2daf8a 28891a7 5286b18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from sklearn.preprocessing import MinMaxScaler
class VAE(nn.Module):
    """Variational autoencoder over flattened per-timestep feature vectors.

    The encoder compresses an ``input_size``-dim vector through 256 -> 128 -> 64
    units; two linear heads produce the Gaussian posterior parameters (mu,
    log-variance) of dimension ``latent_dim``. The decoder mirrors the encoder
    back up to ``input_size``.
    """

    def __init__(self, input_size, latent_dim=32):
        super().__init__()
        widths = (256, 128, 64)
        # Encoder: input_size -> 256 -> 128 -> 64, ReLU after every layer.
        enc_layers = []
        fan_in = input_size
        for fan_out in widths:
            enc_layers += [nn.Linear(fan_in, fan_out), nn.ReLU()]
            fan_in = fan_out
        self.encoder = nn.Sequential(*enc_layers)
        # Posterior heads share the 64-dim encoder output.
        self.fc_mu = nn.Linear(widths[-1], latent_dim)
        self.fc_logvar = nn.Linear(widths[-1], latent_dim)
        # Decoder mirrors the encoder: latent -> 64 -> 128 -> 256 -> input_size
        # (no activation on the final reconstruction layer).
        dec_layers = []
        fan_in = latent_dim
        for fan_out in reversed(widths):
            dec_layers += [nn.Linear(fan_in, fan_out), nn.ReLU()]
            fan_in = fan_out
        dec_layers.append(nn.Linear(fan_in, input_size))
        self.decoder = nn.Sequential(*dec_layers)

    def encode(self, x):
        """Return (mu, logvar) of the approximate posterior for flat input x."""
        hidden = self.encoder(x)
        return self.fc_mu(hidden), self.fc_logvar(hidden)

    def reparameterize(self, mu, logvar):
        """Draw z = mu + sigma * eps with eps ~ N(0, I) (reparameterization trick)."""
        sigma = torch.exp(0.5 * logvar)
        noise = torch.randn_like(sigma)
        return noise * sigma + mu

    def decode(self, z):
        """Map a latent sample back to input space."""
        return self.decoder(z)

    def forward(self, x):
        """Reconstruct a (batch, seq, feat) tensor; returns (recon, mu, logvar)."""
        batch, steps, _ = x.shape
        flat = x.reshape(batch * steps, -1)
        mu, logvar = self.encode(flat)
        recon = self.decode(self.reparameterize(mu, logvar))
        return recon.reshape(batch, steps, -1), mu, logvar
def vae_loss(recon_x, x, mu, logvar):
    """Summed VAE objective: squared-error reconstruction + analytic KL.

    The reconstruction term is a summed MSE; the KL term is the closed-form
    divergence between N(mu, exp(logvar)) and the standard normal prior.
    """
    recon_term = F.mse_loss(recon_x, x, reduction='sum')
    # Equivalent to -0.5 * sum(1 + logvar - mu^2 - exp(logvar)).
    kl_term = 0.5 * torch.sum(mu.pow(2) + logvar.exp() - logvar - 1)
    return recon_term + kl_term
def _train_vae(model, optimizer, X, epochs, patience):
    """Train one VAE on the full tensor X, stopping early when the training
    loss has not improved for `patience` consecutive epochs."""
    best_loss = float('inf')
    stale_epochs = 0
    for _ in range(epochs):
        model.train()
        optimizer.zero_grad()
        recon, mu, logvar = model(X)
        loss = vae_loss(recon, X, mu, logvar)
        loss.backward()
        optimizer.step()
        current = loss.item()
        if current < best_loss:
            best_loss = current
            stale_epochs = 0
        else:
            stale_epochs += 1
            if stale_epochs >= patience:
                break


def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
    """Train one VAE per modality and return per-timestep reconstruction errors.

    Args:
        X_embeddings: array-like of facial embeddings, shape (seq, feat) or
            (batch, seq, feat). 2-D input is treated as a single batch.
        X_posture: array-like of posture values; flattened to one scalar
            feature per timestep and min-max scaled before training.
        epochs: maximum number of training epochs per model.
        patience: epochs without training-loss improvement before that
            model's training stops early.

    Returns:
        (mse_embeddings, mse_posture): per-timestep mean reconstruction
        errors as numpy arrays (singleton batch dims squeezed away).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Posture is scaled to [0, 1] as a single column; embeddings are used raw.
    scaler_posture = MinMaxScaler()
    X_posture_scaled = scaler_posture.fit_transform(
        np.asarray(X_posture).reshape(-1, 1))
    # np.asarray + as_tensor accepts lists, numpy arrays, or tensors alike.
    emb = torch.as_tensor(np.asarray(X_embeddings), dtype=torch.float32).to(device)
    if emb.dim() == 2:
        emb = emb.unsqueeze(0)  # add batch dim: (seq, feat) -> (1, seq, feat)
    pos = torch.as_tensor(X_posture_scaled, dtype=torch.float32).to(device)
    if pos.dim() == 2:
        pos = pos.unsqueeze(0)
    model_embeddings = VAE(input_size=emb.shape[2]).to(device)
    model_posture = VAE(input_size=pos.shape[2]).to(device)
    # The two models are independent, so they can be trained sequentially;
    # each honors its own early-stopping budget (patience was previously
    # accepted but never used).
    _train_vae(model_embeddings, optim.Adam(model_embeddings.parameters()),
               emb, epochs, patience)
    _train_vae(model_posture, optim.Adam(model_posture.parameters()),
               pos, epochs, patience)
    # Score: per-timestep mean squared reconstruction error.
    model_embeddings.eval()
    model_posture.eval()
    with torch.no_grad():
        recon_emb, _, _ = model_embeddings(emb)
        recon_pos, _, _ = model_posture(pos)
        mse_embeddings = F.mse_loss(recon_emb, emb, reduction='none') \
            .mean(dim=2).cpu().numpy().squeeze()
        mse_posture = F.mse_loss(recon_pos, pos, reduction='none') \
            .mean(dim=2).cpu().numpy().squeeze()
    return mse_embeddings, mse_posture
def determine_anomalies(mse_values, threshold):
    """Flag values lying more than `threshold` standard deviations above the mean.

    Args:
        mse_values: array of reconstruction errors.
        threshold: multiplier on the standard deviation.

    Returns:
        Boolean array, True where the value exceeds mean + threshold * std.
    """
    cutoff = np.mean(mse_values) + threshold * np.std(mse_values)
    return mse_values > cutoff