# Captured page header (not part of the program): Spaces status "Runtime error"; file size: 4,449 bytes.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from sklearn.preprocessing import MinMaxScaler
class VAE(nn.Module):
    """Fully connected variational autoencoder applied to every time step independently.

    The encoder is a three-layer ReLU MLP (input_size -> 256 -> 128 -> 64)
    followed by two linear heads producing the latent mean and log-variance.
    The decoder mirrors it (latent_dim -> 64 -> 128 -> 256 -> input_size) and
    ends in a plain linear layer, so reconstructions are unbounded.

    Args:
        input_size: Number of features per time step.
        latent_dim: Size of the latent vector (default 32).
    """

    def __init__(self, input_size, latent_dim=32):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_size, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
        )
        self.fc_mu = nn.Linear(64, latent_dim)
        self.fc_logvar = nn.Linear(64, latent_dim)
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, input_size),
        )

    def encode(self, x):
        """Return ``(mu, logvar)`` of the approximate posterior for rows of ``x``."""
        hidden = self.encoder(x)
        return self.fc_mu(hidden), self.fc_logvar(hidden)

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + eps * std`` with ``eps ~ N(0, I)``.

        Sampling happens on every call, regardless of train/eval mode.
        """
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Map latent vectors ``z`` back to feature space."""
        return self.decoder(z)

    def forward(self, x):
        """Encode, sample and reconstruct a 3-D input.

        Args:
            x: Tensor with exactly three dimensions, unpacked as
                ``(batch_size, seq_len, features)``.

        Returns:
            A tuple ``(reconstruction, mu, logvar)`` where ``reconstruction``
            has shape ``(batch_size, seq_len, -1)`` and ``mu`` / ``logvar``
            have one row per flattened time step
            (``batch_size * seq_len`` rows).
        """
        batch_size, seq_len, feature_dim = x.size()
        # reshape (not view): view raises on non-contiguous inputs such as the
        # result of transpose/permute; reshape copies only when it has to.
        # The explicit feature_dim also keeps zero-row inputs unambiguous.
        flat = x.reshape(batch_size * seq_len, feature_dim)
        mu, logvar = self.encode(flat)
        z = self.reparameterize(mu, logvar)
        decoded = self.decode(z)
        return decoded.reshape(batch_size, seq_len, decoded.shape[-1]), mu, logvar
def vae_loss(recon_x, x, mu, logvar):
    """Return the summed VAE objective: reconstruction error plus KL divergence.

    Args:
        recon_x: Reconstructed tensor, same shape as ``x``.
        x: Target tensor.
        mu: Latent means.
        logvar: Latent log-variances, same shape as ``mu``.

    Returns:
        A scalar tensor: the sum-reduced squared error between ``recon_x`` and
        ``x`` plus ``-0.5 * sum(1 + logvar - mu^2 - exp(logvar))``.
    """
    # The reconstruction term is a squared error (F.mse_loss), not a
    # cross-entropy, so it is named accordingly.
    reconstruction = F.mse_loss(recon_x, x, reduction='sum')
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return reconstruction + kl_divergence
def _to_sequence_batch(data, device):
    """Convert array-like ``data`` to a float32 tensor on ``device``; 2-D input gains a leading batch axis."""
    tensor = torch.as_tensor(data, dtype=torch.float32, device=device)
    if tensor.dim() == 2:
        tensor = tensor.unsqueeze(0)
    return tensor


def _per_step_error(model, batch):
    """Return the reconstruction MSE of ``batch`` averaged over the last axis, as a NumPy array with at least one dimension."""
    reconstruction, _, _ = model(batch)
    error = F.mse_loss(reconstruction, batch, reduction='none').mean(dim=2)
    # squeeze() drops the singleton batch axis; atleast_1d keeps a
    # single-time-step result indexable instead of collapsing to 0-d.
    return np.atleast_1d(error.cpu().numpy().squeeze())


def anomaly_detection(X_embeddings, X_posture, X_voice, epochs=200, patience=5):
    """Train one VAE per modality and return each modality's reconstruction error.

    Each input is fitted with its own ``VAE`` for ``epochs`` full-batch Adam
    steps using ``vae_loss``; the trained models then reconstruct their own
    training data and the per-time-step mean squared error is returned.

    Args:
        X_embeddings: Array-like facial embeddings, 2-D or 3-D. 2-D input is
            treated as a single sequence (a batch axis is prepended).
        X_posture: Array-like posture values. They are flattened to a single
            column and min-max scaled before training.
        X_voice: Array-like voice embeddings, 2-D or 3-D (handled like
            ``X_embeddings``).
        epochs: Number of training iterations; coerced with ``int()``.
        patience: Accepted for interface compatibility. It is currently not
            used: no early stopping is performed.

    Returns:
        A tuple ``(mse_embeddings, mse_posture, mse_voice)`` of NumPy arrays
        holding the reconstruction error per time step.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Normalize posture. np.asarray lets lists/tuples through as well as
    # ndarrays, matching what the other two inputs already accept.
    posture_column = np.asarray(X_posture).reshape(-1, 1)
    X_posture_scaled = MinMaxScaler().fit_transform(posture_column)

    # Order (embeddings, posture, voice) is kept throughout so that random
    # initialisation and sampling happen in the same sequence as before.
    batches = [
        _to_sequence_batch(data, device)
        for data in (X_embeddings, X_posture_scaled, X_voice)
    ]
    models = [VAE(input_size=batch.shape[2]).to(device) for batch in batches]
    optimizers = [optim.Adam(model.parameters()) for model in models]

    # Train models. Nothing switches them to eval mode during training, so
    # train() is set once up front rather than on every epoch.
    for model in models:
        model.train()
    for _ in range(int(epochs)):  # Ensure epochs is an integer
        for model, optimizer, batch in zip(models, optimizers, batches):
            optimizer.zero_grad()
            recon_batch, mu, logvar = model(batch)
            loss = vae_loss(recon_batch, batch, mu, logvar)
            loss.backward()
            optimizer.step()

    # Compute reconstruction error for embeddings, posture, and voice.
    # NOTE: VAE.forward samples the latent on every call, so these scores are
    # not deterministic across runs even with fixed weights.
    for model in models:
        model.eval()
    with torch.no_grad():
        mse_embeddings, mse_posture, mse_voice = (
            _per_step_error(model, batch) for model, batch in zip(models, batches)
        )
    return mse_embeddings, mse_posture, mse_voice
def determine_anomalies(mse_values, threshold):
    """Flag values lying more than ``threshold`` standard deviations above the mean.

    Args:
        mse_values: Array-like of reconstruction errors.
        threshold: Multiplier applied to the (population) standard deviation
            of ``mse_values``.

    Returns:
        A boolean NumPy array, True where
        ``mse_values > mean(mse_values) + threshold * std(mse_values)``.
    """
    values = np.asarray(mse_values)
    cutoff = np.mean(values) + threshold * np.std(values)
    return values > cutoff