File size: 3,210 Bytes
5286b18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.preprocessing import MinMaxScaler


class Autoencoder(nn.Module):
    """Fully connected autoencoder with a 32-dimensional bottleneck.

    The encoder maps ``input_size -> 256 -> 128 -> 64 -> 32`` and the decoder
    mirrors it back to ``input_size``. ReLU follows every layer except the
    last layer of the encoder and the last layer of the decoder.

    Args:
        input_size: Number of features in the last dimension of the input.
    """

    def __init__(self, input_size):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        self.decoder = nn.Sequential(
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, input_size)
        )

    def forward(self, x):
        """Reconstruct ``x`` feature-vector by feature-vector.

        Args:
            x: Tensor whose last dimension has ``input_size`` features,
                typically shaped ``(batch, seq_len, input_size)``. Any number
                of leading dimensions is accepted.

        Returns:
            Tensor with the same leading dimensions as ``x`` and the
            decoder's output features in the last dimension.
        """
        leading_shape = x.shape[:-1]
        # reshape (not view) so non-contiguous inputs, e.g. transposed or
        # sliced tensors, are flattened instead of raising a RuntimeError.
        flat = x.reshape(-1, x.shape[-1])
        decoded = self.decoder(self.encoder(flat))
        return decoded.reshape(*leading_shape, decoded.shape[-1])

def _fit_autoencoder(model, optimizer, criterion, data, epochs, patience):
    """Train ``model`` to reconstruct ``data``, stopping early on a loss plateau."""
    early_stopping = patience is not None and patience > 0
    best_loss = float("inf")
    stale_epochs = 0

    model.train()
    for _ in range(epochs):
        optimizer.zero_grad()
        loss = criterion(model(data), data)
        loss.backward()
        optimizer.step()

        if not early_stopping:
            continue

        current_loss = loss.item()
        if current_loss < best_loss:
            best_loss = current_loss
            stale_epochs = 0
        else:
            # A NaN loss also lands here, so a diverged run stops after
            # ``patience`` epochs instead of burning the full budget.
            stale_epochs += 1
            if stale_epochs >= patience:
                break


def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5):
    """Score inputs by autoencoder reconstruction error.

    Two independent autoencoders are trained, one on the embeddings and one
    on the min-max-scaled posture values. Each model is trained and then
    evaluated on the same data, and the per-position mean squared
    reconstruction error is returned for each.

    Args:
        X_embeddings: Array-like of embeddings, either 2-D
            ``(seq_len, features)`` or 3-D ``(batch, seq_len, features)``.
            A 2-D input is given a leading batch dimension of 1.
        X_posture: Array-like of posture values. It is flattened to a single
            column before scaling, so every value is treated as one
            one-feature sample.
        epochs: Maximum number of full-batch training steps per model.
        patience: Stop training a model once its training loss has failed to
            improve on its best value for this many consecutive epochs.
            ``None`` or a value below 1 disables early stopping.

    Returns:
        Tuple ``(mse_embeddings, mse_posture)`` of NumPy arrays holding the
        squared reconstruction error averaged over the feature axis, with
        size-1 dimensions squeezed out.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Normalize posture; np.asarray lets plain Python sequences through too.
    scaler_posture = MinMaxScaler()
    posture_column = np.asarray(X_posture).reshape(-1, 1)
    X_posture_scaled = scaler_posture.fit_transform(posture_column)

    # Process facial embeddings
    X_embeddings = torch.as_tensor(X_embeddings, dtype=torch.float32, device=device)
    if X_embeddings.dim() == 2:
        X_embeddings = X_embeddings.unsqueeze(0)

    # Process posture
    X_posture_scaled = torch.as_tensor(
        X_posture_scaled, dtype=torch.float32, device=device
    )
    if X_posture_scaled.dim() == 2:
        X_posture_scaled = X_posture_scaled.unsqueeze(0)

    model_embeddings = Autoencoder(input_size=X_embeddings.shape[2]).to(device)
    model_posture = Autoencoder(input_size=X_posture_scaled.shape[2]).to(device)

    criterion = nn.MSELoss()
    optimizer_embeddings = optim.Adam(model_embeddings.parameters())
    optimizer_posture = optim.Adam(model_posture.parameters())

    # Train models. They share no parameters, so training them one after the
    # other is equivalent to interleaving their steps epoch by epoch.
    _fit_autoencoder(model_embeddings, optimizer_embeddings, criterion,
                     X_embeddings, epochs, patience)
    _fit_autoencoder(model_posture, optimizer_posture, criterion,
                     X_posture_scaled, epochs, patience)

    # Compute MSE for embeddings and posture
    model_embeddings.eval()
    model_posture.eval()
    with torch.no_grad():
        reconstructed_embeddings = model_embeddings(X_embeddings).cpu().numpy()
        reconstructed_posture = model_posture(X_posture_scaled).cpu().numpy()

    embeddings_np = X_embeddings.cpu().numpy()
    posture_np = X_posture_scaled.cpu().numpy()

    # Average the squared error over the feature axis (axis 2).
    mse_embeddings = np.mean(
        np.power(embeddings_np - reconstructed_embeddings, 2), axis=2
    ).squeeze()
    mse_posture = np.mean(
        np.power(posture_np - reconstructed_posture, 2), axis=2
    ).squeeze()

    return mse_embeddings, mse_posture

def determine_anomalies(mse_values, threshold):
    """Flag values lying more than ``threshold`` standard deviations above the mean.

    Args:
        mse_values: Array of reconstruction errors.
        threshold: Multiplier applied to the standard deviation of
            ``mse_values`` when building the cutoff.

    Returns:
        Boolean array, True where a value is strictly greater than
        ``mean + threshold * std``.
    """
    cutoff = np.mean(mse_values) + threshold * np.std(mse_values)
    return mse_values > cutoff