import torch import torch.nn as nn import torch.optim as optim import numpy as np from sklearn.preprocessing import MinMaxScaler @spaces.GPU(duration=300) class Autoencoder(nn.Module): def __init__(self, input_size): super(Autoencoder, self).__init__() self.encoder = nn.Sequential( nn.Linear(input_size, 256), nn.ReLU(), nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 32) ) self.decoder = nn.Sequential( nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 128), nn.ReLU(), nn.Linear(128, 256), nn.ReLU(), nn.Linear(256, input_size) ) def forward(self, x): batch_size, seq_len, _ = x.size() x = x.view(batch_size * seq_len, -1) encoded = self.encoder(x) decoded = self.decoder(encoded) return decoded.view(batch_size, seq_len, -1) def anomaly_detection(X_embeddings, X_posture, epochs=200, patience=5): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Normalize posture scaler_posture = MinMaxScaler() X_posture_scaled = scaler_posture.fit_transform(X_posture.reshape(-1, 1)) # Process facial embeddings X_embeddings = torch.FloatTensor(X_embeddings).to(device) if X_embeddings.dim() == 2: X_embeddings = X_embeddings.unsqueeze(0) # Process posture X_posture_scaled = torch.FloatTensor(X_posture_scaled).to(device) if X_posture_scaled.dim() == 2: X_posture_scaled = X_posture_scaled.unsqueeze(0) model_embeddings = Autoencoder(input_size=X_embeddings.shape[2]).to(device) model_posture = Autoencoder(input_size=X_posture_scaled.shape[2]).to(device) criterion = nn.MSELoss() optimizer_embeddings = optim.Adam(model_embeddings.parameters()) optimizer_posture = optim.Adam(model_posture.parameters()) # Train models for epoch in range(epochs): for model, optimizer, X in [(model_embeddings, optimizer_embeddings, X_embeddings), (model_posture, optimizer_posture, X_posture_scaled)]: model.train() optimizer.zero_grad() output = model(X) loss = criterion(output, X) loss.backward() optimizer.step() # Compute MSE for embeddings and posture model_embeddings.eval() model_posture.eval() with torch.no_grad(): reconstructed_embeddings = model_embeddings(X_embeddings).cpu().numpy() reconstructed_posture = model_posture(X_posture_scaled).cpu().numpy() mse_embeddings = np.mean(np.power(X_embeddings.cpu().numpy() - reconstructed_embeddings, 2), axis=2).squeeze() mse_posture = np.mean(np.power(X_posture_scaled.cpu().numpy() - reconstructed_posture, 2), axis=2).squeeze() return mse_embeddings, mse_posture def determine_anomalies(mse_values, threshold): mean = np.mean(mse_values) std = np.std(mse_values) anomalies = mse_values > (mean + threshold * std) return anomalies