# 12_ATBNN_Demo / net_BNN.py
import math
import pickle
import torch
import torch.nn as nn
from torch.distributions import Normal
# import numpy as np
# import pandas as pd
# from matplotlib import pyplot as plt
# from sklearn.preprocessing import StandardScaler,MinMaxScaler
# from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
class CycleDataset(Dataset):
    def __init__(self, data, attrib_x, attrib_y, max_len, C_rated, min_val=None, max_val=None, mode='train'):
        self.data = data
        self.cycle_indices = data['Cycle_Index'].unique()
        self.attrib_x = attrib_x
        self.attrib_y = attrib_y
        self.C_rated = C_rated
        self.mode = mode
        self.max_len = max_len
        # normalise the current column by the rated capacity (C-rate)
        self.data['Current'] /= self.C_rated
        if mode == 'train':
            # fit min-max scaling statistics on the training split and persist them
            self.min_val = data[attrib_x].values.min(axis=0)
            self.max_val = data[attrib_x].values.max(axis=0)
            with open('./para_BNN/min_max_values.pkl', 'wb') as f:
                pickle.dump((self.min_val, self.max_val), f)
        else:
            # validation/test splits reuse the statistics fitted on the training split
            self.min_val = min_val
            self.max_val = max_val

    def get_min_max_values(self):
        if self.mode != 'train':
            return None
        return self.min_val, self.max_val

    def __len__(self):
        return len(self.cycle_indices)

    def __getitem__(self, index):
        cycle_index = self.cycle_indices[index]
        cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
        # cycle_data['Current'] /= self.C_rated
        # extract features and label
        features = cycle_data[self.attrib_x].values
        # C_ini = cycle_data[self.attrib_y].values[0]
        label = cycle_data[self.attrib_y].values[0]
        # min-max normalise the features
        features = (features - self.min_val) / (self.max_val - self.min_val)
        # label = (label - self.y_mean) / self.y_std
        # features = (features - self.min_val) / self.max_val
        # pad the cycle to max_len with zeros so every sample has the same length
        pad_len = self.max_len - len(features)
        features = torch.tensor(features, dtype=torch.float32)
        features = torch.cat([features, torch.zeros(pad_len, features.shape[1])])
        # convert the label to a tensor
        label = torch.tensor(label, dtype=torch.float32)
        # label = label.view(1, 1)
        return features, label

    # def pad_collate(self, batch):
    #     # pad batch entries so that they share a common length
    #     features_batch, labels_batch = zip(*batch)
    #     features_batch = pad_sequence(features_batch, batch_first=True)
    #     labels_batch = torch.stack(labels_batch)
    #     return features_batch, labels_batch
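
# Illustrative usage sketch (not from the original repo): pairing CycleDataset with a
# DataLoader. The column names, rated capacity, max_len, and batch size below are
# assumptions for demonstration only; the dataframes must contain the 'Cycle_Index'
# and 'Current' columns that CycleDataset itself relies on, and './para_BNN/' must
# exist so the training split can persist its min/max statistics.
def _demo_cycle_dataloaders(train_df, test_df):
    attrib_x = ['Voltage', 'Current']   # hypothetical feature columns
    attrib_y = 'Capacity'               # hypothetical label column
    max_len, C_rated = 500, 2.0         # hypothetical padding length and rated capacity

    train_set = CycleDataset(train_df, attrib_x, attrib_y, max_len, C_rated, mode='train')
    min_val, max_val = train_set.get_min_max_values()
    # the test split reuses the scaling statistics fitted on the training split
    test_set = CycleDataset(test_df, attrib_x, attrib_y, max_len, C_rated,
                            min_val=min_val, max_val=max_val, mode='test')

    train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=32, shuffle=False)
    return train_loader, test_loader
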
class Transformer_FeatureExtractor(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(Transformer_FeatureExtractor, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_dim
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len
        # self.cls_token = nn.Parameter(torch.randn(self.batch_size, 1, self.hidden_size))
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.position_encoding = self.create_position_encoding()
        # inputs arrive as (batch, seq_len, feature), so the encoder runs batch_first
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=0, batch_first=True),
            num_layers=num_layers
        )
    def create_position_encoding(self):
        # standard sinusoidal positional encoding, fixed (not learned)
        position_encoding = torch.zeros(self.max_seq_len, self.hidden_size)
        position = torch.arange(0, self.max_seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.hidden_size, 2) * (-math.log(10000.0) / self.hidden_size))
        position_encoding[:, 0::2] = torch.sin(position * div_term)
        position_encoding[:, 1::2] = torch.cos(position * div_term)
        position_encoding = position_encoding.unsqueeze(0)
        return nn.Parameter(position_encoding, requires_grad=False)

    def forward(self, x):
        seq_len = x.shape[1]
        positions = self.position_encoding[:, :seq_len, :]
        x = self.embedding(x)
        x = x + positions
        # x = torch.cat((x, self.cls_token), dim=1)
        # x = torch.cat((self.cls_token, x), dim=1)
        x_layer = self.transformer_encoder(x)
        # mean-pool over the sequence dimension to obtain one feature vector per cycle
        feature = torch.mean(x_layer, dim=1)
        return feature
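
# Illustrative shape check (not from the original repo): the extractor maps a padded
# batch of cycles (batch, seq_len, input_dim) to one pooled feature vector per cycle
# (batch, hidden_dim). All sizes below are assumptions; num_heads must divide hidden_dim.
def _demo_feature_extractor_shapes():
    extractor = Transformer_FeatureExtractor(input_dim=2, output_dim=16, hidden_dim=16,
                                             num_layers=2, num_heads=4,
                                             batch_size=8, max_seq_len=500)
    x = torch.randn(8, 500, 2)          # (batch, padded seq_len, input_dim)
    feature = extractor(x)              # mean-pooled over the sequence dimension
    assert feature.shape == (8, 16)
    return feature
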
# class BaseVaraitionLayer_(nn.Module):
#     def __init__(self):
#         super().__init__()
#
#     def kl_div(self, mu_q, sigma_q, mu_p, sigma_p):
#         '''
#         Calculates the KL divergence between two Gaussians, KL(Q || P).
#         :param mu_q: torch.Tensor -> mu parameter of distribution Q
#         :param sigma_q: torch.Tensor -> sigma parameter of distribution Q
#         :param mu_p: float -> mu parameter of distribution P
#         :param sigma_p: float -> sigma parameter of distribution P
#         :return: torch.Tensor of shape 0
#         '''
#         kl = (torch.log(sigma_p) - torch.log(sigma_q)
#               + (sigma_q**2 + (mu_q - mu_p)**2) / (2 * (sigma_p**2)) - 0.5)
#         return kl.sum()
class BayesLinear(nn.Module):
    def __init__(self, input_dim, output_dim, prior_mu, prior_sigma):
        super(BayesLinear, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma
        # variational parameters: mean and (pre-softplus) scale for weights and biases
        self.weight_mu = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.weight_rho = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias_mu = nn.Parameter(torch.Tensor(output_dim))
        self.bias_rho = nn.Parameter(torch.Tensor(output_dim))
        self.weight = None
        self.bias = None
        self.prior = Normal(prior_mu, prior_sigma)
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight_mu, a=math.sqrt(self.input_dim))
        nn.init.constant_(self.weight_rho, -3.0)
        nn.init.zeros_(self.bias_mu)
        nn.init.constant_(self.bias_rho, -3.0)

    def forward(self, input):
        # reparameterisation trick: sample weights as mu + softplus(rho) * eps
        weight_epsilon = torch.randn_like(self.weight_mu)
        bias_epsilon = torch.randn_like(self.bias_mu)
        weight_sigma = torch.log1p(torch.exp(self.weight_rho))
        bias_sigma = torch.log1p(torch.exp(self.bias_rho))
        self.weight = self.weight_mu + weight_sigma * weight_epsilon
        self.bias = self.bias_mu + bias_sigma * bias_epsilon
        # log probability of the sampled weights under the fixed Gaussian prior
        weight_log_prior = self.prior.log_prob(self.weight)
        bias_log_prior = self.prior.log_prob(self.bias)
        self.log_prior = torch.sum(weight_log_prior) + torch.sum(bias_log_prior)
        # log probability of the sampled weights under the variational posterior
        self.weight_post = Normal(self.weight_mu.data, torch.log(1 + torch.exp(self.weight_rho)))
        self.bias_post = Normal(self.bias_mu.data, torch.log(1 + torch.exp(self.bias_rho)))
        self.log_post = self.weight_post.log_prob(self.weight).sum() + self.bias_post.log_prob(self.bias).sum()
        # output_mean = torch.matmul(input, weight.t()) + bias
        # output_var = torch.matmul(input, weight_sigma.t())**2 + bias_sigma**2
        # output_mean = nn.functional.linear(input, self.weight_mu, self.bias_mu)
        # output_variance = nn.functional.linear(input ** 2, weight_sigma ** 2, bias_sigma ** 2) + 1e-8
        # return output_mean, output_var
        return F.linear(input, self.weight, self.bias)
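
# Illustrative check (not from the original repo): every forward pass through BayesLinear
# draws a fresh weight sample w = mu + softplus(rho) * eps, so two calls on the same
# input generally differ, and log_prior / log_post are refreshed as side effects.
def _demo_bayes_linear_sampling():
    layer = BayesLinear(input_dim=4, output_dim=3, prior_mu=0., prior_sigma=1.)
    x = torch.randn(5, 4)
    y1 = layer(x)                       # first weight sample
    y2 = layer(x)                       # second, independent weight sample
    return y1, y2, layer.log_prior, layer.log_post
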
class BNN_Regression(nn.Module):
    def __init__(self, input_dim, output_dim, noise_tol):
        super(BNN_Regression, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # self.batch_size = batch_size
        self.noise_tol = noise_tol
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        # self.bnn1 = BayesLinear(input_dim=input_dim, output_dim=64, prior_mu=0, prior_sigma=1.)
        # self.bnn2 = BayesLinear(input_dim=64, output_dim=32, prior_mu=0, prior_sigma=1.)
        # self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)
        self.bnn = BayesLinear(input_dim=input_dim, output_dim=16, prior_mu=0, prior_sigma=1.)
        self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)

    def forward(self, x):
        x = self.bnn(x)
        x = self.relu(x)
        predictions = self.fc(x)
        # x = self.bnn1(x)
        # x = self.relu(x)
        # x = self.bnn2(x)
        # x = self.tanh(x)
        # x = self.bnn3(x)
        # x = self.relu(x)
        # predictions = self.fc(x)
        return predictions

    def log_prior(self):
        # calculate the log prior over all the layers
        # return self.bnn1.log_prior + self.bnn2.log_prior + self.bnn3.log_prior + self.fc.log_prior
        return self.bnn.log_prior + self.fc.log_prior

    def log_post(self):
        # calculate the log variational posterior over all the layers
        # return self.bnn1.log_post + self.bnn2.log_post + self.bnn3.log_post + self.fc.log_post
        return self.bnn.log_post + self.fc.log_post

    def sample_elbo(self, input, target, samples, device):
        # we calculate the negative ELBO, which will be our loss function
        # initialize tensors
        outputs = torch.zeros(samples, target.shape[0]).to(device)
        log_priors = torch.zeros(samples)
        log_posts = torch.zeros(samples)
        log_likes = torch.zeros(samples)
        # make predictions and calculate prior, posterior, and likelihood for a given
        # number of samples (Monte Carlo approximation)
        for i in range(samples):
            outputs[i] = self(input).reshape(-1)  # make predictions
            log_priors[i] = self.log_prior()  # get log prior
            log_posts[i] = self.log_post()  # get log variational posterior
            log_likes[i] = Normal(outputs[i], self.noise_tol).log_prob(target.reshape(-1)).sum()  # log likelihood
        # Monte Carlo estimates of the prior, posterior, and likelihood terms
        log_prior = log_priors.mean()
        log_post = log_posts.mean()
        log_like = log_likes.mean()
        # the negative ELBO (which is our loss function)
        loss = log_post - log_prior - log_like
        return loss
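
# Illustrative training step (not from the original repo): sample_elbo returns a Monte
# Carlo estimate of the negative ELBO, E_q[log q(w)] - E_q[log p(w)] - E_q[log p(y|x,w)],
# which is minimised directly. The feature/target shapes, sample count, and optimiser
# settings below are assumptions for demonstration only.
def _demo_bnn_regression_step():
    device = torch.device('cpu')
    model = BNN_Regression(input_dim=16, output_dim=1, noise_tol=0.01).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    features = torch.randn(8, 16)       # e.g. pooled transformer features
    target = torch.randn(8, 1)
    optimizer.zero_grad()
    loss = model.sample_elbo(features, target, samples=3, device=device)
    loss.backward()
    optimizer.step()
    return loss.item()
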
class ATBNN_Model(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(ATBNN_Model, self).__init__()
        self.feature_extractor = Transformer_FeatureExtractor(input_dim=input_dim,
                                                              output_dim=hidden_dim,
                                                              hidden_dim=hidden_dim,
                                                              num_layers=num_layers,
                                                              num_heads=num_heads,
                                                              batch_size=batch_size,
                                                              max_seq_len=max_seq_len)
        self.bnn_regression = BNN_Regression(input_dim=hidden_dim,
                                             output_dim=output_dim,
                                             noise_tol=0.01)

    def forward(self, x):
        # the pooled features are stored on the module (self.features) as a side effect
        self.features = self.feature_extractor(x)
        predictions = self.bnn_regression(self.features)
        return predictions
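
# Illustrative end-to-end sketch (not from the original repo): because the regression head
# is Bayesian, sampling the model several times per input gives a predictive mean and a
# spread that can be read as uncertainty. Shapes and the sample count are assumptions.
def _demo_atbnn_predict_with_uncertainty(n_samples=20):
    model = ATBNN_Model(input_dim=2, output_dim=1, hidden_dim=16, num_layers=2,
                        num_heads=4, batch_size=8, max_seq_len=500)
    model.eval()
    x = torch.randn(8, 500, 2)          # one batch of padded cycles
    with torch.no_grad():
        preds = torch.stack([model(x) for _ in range(n_samples)])  # (n_samples, batch, 1)
    return preds.mean(dim=0), preds.std(dim=0)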