reab5555 commited on
Commit
c2daf8a
·
verified ·
1 Parent(s): 10f371d

Update anomaly_detection.py

Browse files
Files changed (1) hide show
  1. anomaly_detection.py +52 -29
anomaly_detection.py CHANGED
@@ -1,23 +1,30 @@
1
  import torch
2
  import torch.nn as nn
3
  import torch.optim as optim
 
4
  import numpy as np
5
  from sklearn.preprocessing import MinMaxScaler
6
 
7
- class Autoencoder(nn.Module):
8
- def __init__(self, input_size):
9
- super(Autoencoder, self).__init__()
 
 
10
  self.encoder = nn.Sequential(
11
  nn.Linear(input_size, 256),
12
  nn.ReLU(),
13
  nn.Linear(256, 128),
14
  nn.ReLU(),
15
  nn.Linear(128, 64),
16
- nn.ReLU(),
17
- nn.Linear(64, 32)
18
  )
 
 
 
 
 
19
  self.decoder = nn.Sequential(
20
- nn.Linear(32, 64),
21
  nn.ReLU(),
22
  nn.Linear(64, 128),
23
  nn.ReLU(),
@@ -25,59 +32,75 @@ class Autoencoder(nn.Module):
25
  nn.ReLU(),
26
  nn.Linear(256, input_size)
27
  )
28
-
 
 
 
 
 
 
 
 
 
 
 
 
29
  def forward(self, x):
30
  batch_size, seq_len, _ = x.size()
31
  x = x.view(batch_size * seq_len, -1)
32
- encoded = self.encoder(x)
33
- decoded = self.decoder(encoded)
34
- return decoded.view(batch_size, seq_len, -1)
 
 
 
 
 
 
35
 
36
  def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
37
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
38
-
39
  # Normalize posture
40
  scaler_posture = MinMaxScaler()
41
  X_posture_scaled = scaler_posture.fit_transform(X_posture.reshape(-1, 1))
42
-
43
  # Process facial embeddings
44
  X_embeddings = torch.FloatTensor(X_embeddings).to(device)
45
  if X_embeddings.dim() == 2:
46
  X_embeddings = X_embeddings.unsqueeze(0)
47
-
48
  # Process posture
49
  X_posture_scaled = torch.FloatTensor(X_posture_scaled).to(device)
50
  if X_posture_scaled.dim() == 2:
51
  X_posture_scaled = X_posture_scaled.unsqueeze(0)
52
-
53
- model_embeddings = Autoencoder(input_size=X_embeddings.shape[2]).to(device)
54
- model_posture = Autoencoder(input_size=X_posture_scaled.shape[2]).to(device)
55
-
56
- criterion = nn.MSELoss()
57
  optimizer_embeddings = optim.Adam(model_embeddings.parameters())
58
  optimizer_posture = optim.Adam(model_posture.parameters())
59
-
60
  # Train models
61
  for epoch in range(epochs):
62
  for model, optimizer, X in [(model_embeddings, optimizer_embeddings, X_embeddings),
63
  (model_posture, optimizer_posture, X_posture_scaled)]:
64
  model.train()
65
  optimizer.zero_grad()
66
- output = model(X)
67
- loss = criterion(output, X)
68
  loss.backward()
69
  optimizer.step()
70
-
71
- # Compute MSE for embeddings and posture
72
  model_embeddings.eval()
73
  model_posture.eval()
74
  with torch.no_grad():
75
- reconstructed_embeddings = model_embeddings(X_embeddings).cpu().numpy()
76
- reconstructed_posture = model_posture(X_posture_scaled).cpu().numpy()
77
-
78
- mse_embeddings = np.mean(np.power(X_embeddings.cpu().numpy() - reconstructed_embeddings, 2), axis=2).squeeze()
79
- mse_posture = np.mean(np.power(X_posture_scaled.cpu().numpy() - reconstructed_posture, 2), axis=2).squeeze()
80
-
81
  return mse_embeddings, mse_posture
82
 
83
  def determine_anomalies(mse_values, threshold):
 
1
  import torch
2
  import torch.nn as nn
3
  import torch.optim as optim
4
+ import torch.nn.functional as F
5
  import numpy as np
6
  from sklearn.preprocessing import MinMaxScaler
7
 
8
+ class VAE(nn.Module):
9
+ def __init__(self, input_size, latent_dim=32):
10
+ super(VAE, self).__init__()
11
+
12
+ # Encoder
13
  self.encoder = nn.Sequential(
14
  nn.Linear(input_size, 256),
15
  nn.ReLU(),
16
  nn.Linear(256, 128),
17
  nn.ReLU(),
18
  nn.Linear(128, 64),
19
+ nn.ReLU()
 
20
  )
21
+
22
+ self.fc_mu = nn.Linear(64, latent_dim)
23
+ self.fc_logvar = nn.Linear(64, latent_dim)
24
+
25
+ # Decoder
26
  self.decoder = nn.Sequential(
27
+ nn.Linear(latent_dim, 64),
28
  nn.ReLU(),
29
  nn.Linear(64, 128),
30
  nn.ReLU(),
 
32
  nn.ReLU(),
33
  nn.Linear(256, input_size)
34
  )
35
+
36
+ def encode(self, x):
37
+ h = self.encoder(x)
38
+ return self.fc_mu(h), self.fc_logvar(h)
39
+
40
+ def reparameterize(self, mu, logvar):
41
+ std = torch.exp(0.5 * logvar)
42
+ eps = torch.randn_like(std)
43
+ return mu + eps * std
44
+
45
+ def decode(self, z):
46
+ return self.decoder(z)
47
+
48
  def forward(self, x):
49
  batch_size, seq_len, _ = x.size()
50
  x = x.view(batch_size * seq_len, -1)
51
+ mu, logvar = self.encode(x)
52
+ z = self.reparameterize(mu, logvar)
53
+ decoded = self.decode(z)
54
+ return decoded.view(batch_size, seq_len, -1), mu, logvar
55
+
56
+ def vae_loss(recon_x, x, mu, logvar):
57
+ BCE = F.mse_loss(recon_x, x, reduction='sum')
58
+ KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
59
+ return BCE + KLD
60
 
61
  def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
62
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
63
+
64
  # Normalize posture
65
  scaler_posture = MinMaxScaler()
66
  X_posture_scaled = scaler_posture.fit_transform(X_posture.reshape(-1, 1))
67
+
68
  # Process facial embeddings
69
  X_embeddings = torch.FloatTensor(X_embeddings).to(device)
70
  if X_embeddings.dim() == 2:
71
  X_embeddings = X_embeddings.unsqueeze(0)
72
+
73
  # Process posture
74
  X_posture_scaled = torch.FloatTensor(X_posture_scaled).to(device)
75
  if X_posture_scaled.dim() == 2:
76
  X_posture_scaled = X_posture_scaled.unsqueeze(0)
77
+
78
+ model_embeddings = VAE(input_size=X_embeddings.shape[2]).to(device)
79
+ model_posture = VAE(input_size=X_posture_scaled.shape[2]).to(device)
80
+
 
81
  optimizer_embeddings = optim.Adam(model_embeddings.parameters())
82
  optimizer_posture = optim.Adam(model_posture.parameters())
83
+
84
  # Train models
85
  for epoch in range(epochs):
86
  for model, optimizer, X in [(model_embeddings, optimizer_embeddings, X_embeddings),
87
  (model_posture, optimizer_posture, X_posture_scaled)]:
88
  model.train()
89
  optimizer.zero_grad()
90
+ recon_batch, mu, logvar = model(X)
91
+ loss = vae_loss(recon_batch, X, mu, logvar)
92
  loss.backward()
93
  optimizer.step()
94
+
95
+ # Compute reconstruction error for embeddings and posture
96
  model_embeddings.eval()
97
  model_posture.eval()
98
  with torch.no_grad():
99
+ recon_embeddings, _, _ = model_embeddings(X_embeddings)
100
+ recon_posture, _, _ = model_posture(X_posture_scaled)
101
+ mse_embeddings = F.mse_loss(recon_embeddings, X_embeddings, reduction='none').mean(dim=2).cpu().numpy().squeeze()
102
+ mse_posture = F.mse_loss(recon_posture, X_posture_scaled, reduction='none').mean(dim=2).cpu().numpy().squeeze()
103
+
 
104
  return mse_embeddings, mse_posture
105
 
106
  def determine_anomalies(mse_values, threshold):