File size: 3,854 Bytes
28891a7
 
 
c2daf8a
28891a7
 
 
c2daf8a
 
 
 
 
28891a7
 
 
 
 
 
c2daf8a
28891a7
c2daf8a
 
 
 
 
28891a7
c2daf8a
28891a7
 
 
 
 
 
 
c2daf8a
 
 
 
 
 
 
 
 
 
 
 
 
28891a7
 
 
c2daf8a
 
 
 
 
 
 
 
 
28891a7
 
 
c2daf8a
28891a7
 
 
c2daf8a
28891a7
 
 
 
c2daf8a
28891a7
 
 
 
c2daf8a
 
 
 
28891a7
 
c2daf8a
28891a7
 
 
 
 
 
c2daf8a
 
28891a7
 
c2daf8a
 
28891a7
 
 
c2daf8a
 
 
 
 
28891a7
 
 
 
 
 
5286b18
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from sklearn.preprocessing import MinMaxScaler

class VAE(nn.Module):
    def __init__(self, input_size, latent_dim=32):
        super(VAE, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU()
        )
        
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, input_size)
        )
        
    def encode(self, x):
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_logvar(h)
    
    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def decode(self, z):
        return self.decoder(z)
    
    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        x = x.view(batch_size * seq_len, -1)
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        decoded = self.decode(z)
        return decoded.view(batch_size, seq_len, -1), mu, logvar

def vae_loss(recon_x, x, mu, logvar):
    BCE = F.mse_loss(recon_x, x, reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD

def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Normalize posture
    scaler_posture = MinMaxScaler()
    X_posture_scaled = scaler_posture.fit_transform(X_posture.reshape(-1, 1))
    
    # Process facial embeddings
    X_embeddings = torch.FloatTensor(X_embeddings).to(device)
    if X_embeddings.dim() == 2:
        X_embeddings = X_embeddings.unsqueeze(0)
    
    # Process posture
    X_posture_scaled = torch.FloatTensor(X_posture_scaled).to(device)
    if X_posture_scaled.dim() == 2:
        X_posture_scaled = X_posture_scaled.unsqueeze(0)
    
    model_embeddings = VAE(input_size=X_embeddings.shape[2]).to(device)
    model_posture = VAE(input_size=X_posture_scaled.shape[2]).to(device)
    
    optimizer_embeddings = optim.Adam(model_embeddings.parameters())
    optimizer_posture = optim.Adam(model_posture.parameters())
    
    # Train models
    for epoch in range(epochs):
        for model, optimizer, X in [(model_embeddings, optimizer_embeddings, X_embeddings),
                                    (model_posture, optimizer_posture, X_posture_scaled)]:
            model.train()
            optimizer.zero_grad()
            recon_batch, mu, logvar = model(X)
            loss = vae_loss(recon_batch, X, mu, logvar)
            loss.backward()
            optimizer.step()
    
    # Compute reconstruction error for embeddings and posture
    model_embeddings.eval()
    model_posture.eval()
    with torch.no_grad():
        recon_embeddings, _, _ = model_embeddings(X_embeddings)
        recon_posture, _, _ = model_posture(X_posture_scaled)
        mse_embeddings = F.mse_loss(recon_embeddings, X_embeddings, reduction='none').mean(dim=2).cpu().numpy().squeeze()
        mse_posture = F.mse_loss(recon_posture, X_posture_scaled, reduction='none').mean(dim=2).cpu().numpy().squeeze()
    
    return mse_embeddings, mse_posture

def determine_anomalies(mse_values, threshold):
    mean = np.mean(mse_values)
    std = np.std(mse_values)
    anomalies = mse_values > (mean + threshold * std)
    return anomalies