# 12_ATBNN_Demo / net_BNN.py
import math
import pickle
import torch
import torch.nn as nn
from torch.distributions import Normal
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
class CycleDataset(Dataset):
def __init__(self, data, attrib_x, attrib_y, max_len, C_rated, min_val=None, max_val=None, mode='train'):
self.data = data
self.cycle_indices = data['Cycle_Index'].unique()
self.attrib_x = attrib_x
self.attrib_y = attrib_y
self.C_rated = C_rated
self.mode = mode
self.max_len = max_len
        self.data['Current'] /= self.C_rated  # express current as a C-rate (normalize by rated capacity)
if mode == 'train':
self.min_val = data[attrib_x].values.min(axis=0)
self.max_val = data[attrib_x].values.max(axis=0)
with open('./para_BNN/min_max_values.pkl', 'wb') as f:
pickle.dump((self.min_val, self.max_val), f)
else:
self.min_val = min_val
self.max_val = max_val
def get_min_max_values(self):
if self.mode != 'train':
return None
return self.min_val, self.max_val
def __len__(self):
return len(self.cycle_indices)
def __getitem__(self, index):
cycle_index = self.cycle_indices[index]
cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
# cycle_data['Current'] /= self.C_rated
        # extract features and label
features = cycle_data[self.attrib_x].values
# C_ini = cycle_data[self.attrib_y].values[0]
label = cycle_data[self.attrib_y].values[0]
        # normalize features with the stored min/max values
features = (features - self.min_val) / (self.max_val - self.min_val)
# label = (label - self.y_mean) / self.y_std
# features = (features - self.min_val) / self.max_val
pad_len = self.max_len - len(features)
        features = torch.tensor(features, dtype=torch.float32)
        # pad the end of features with zeros so every cycle has length max_len
        features = torch.cat([features, torch.zeros(pad_len, features.shape[1], dtype=torch.float32)])
        # convert the label to a tensor
# features = torch.tensor(padded_features, dtype=torch.float32)
label = torch.tensor(label, dtype=torch.float32)
# label = label.view(1,1)
return features, label
#
# def pad_collate(self, batch):
# # pad the batch so all sequences share the same length
# features_batch, labels_batch = zip(*batch)
# features_batch = pad_sequence(features_batch, batch_first=True)
# labels_batch = torch.stack(labels_batch)
#
# return features_batch, labels_batch
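# Minimal usage sketch for CycleDataset (the column names, rated capacity and max_len
# below are assumptions for illustration, not values taken from this repo):
#
#   df = pd.read_csv('cycles.csv')    # hypothetical table with one row per measurement
#   train_set = CycleDataset(df, attrib_x=['Voltage', 'Current', 'Temperature'],
#                            attrib_y=['Capacity'], max_len=500, C_rated=2.0, mode='train')
#   min_val, max_val = train_set.get_min_max_values()   # reuse these for the test split
#   train_loader = DataLoader(train_set, batch_size=32, shuffle=True)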
class Transformer_FeatureExtractor(nn.Module):
def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
super(Transformer_FeatureExtractor, self).__init__()
self.num_layers = num_layers
self.hidden_size = hidden_dim
self.batch_size = batch_size
self.max_seq_len = max_seq_len
# self.cls_token = nn.Parameter(torch.randn(self.batch_size, 1, self.hidden_size))
self.embedding = nn.Linear(input_dim, hidden_dim)
self.position_encoding = self.create_position_encoding()
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=0,
                                       batch_first=True),  # inputs are (batch, seq_len, hidden_dim)
            num_layers=num_layers
        )
def create_position_encoding(self):
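        # Standard sinusoidal encoding (as in "Attention Is All You Need"):
        #   PE(pos, 2i)   = sin(pos / 10000^(2i / hidden_size))
        #   PE(pos, 2i+1) = cos(pos / 10000^(2i / hidden_size))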
position_encoding = torch.zeros(self.max_seq_len, self.hidden_size)
position = torch.arange(0, self.max_seq_len).unsqueeze(1)
div_term = torch.exp(torch.arange(0, self.hidden_size, 2) * (-math.log(10000.0) / self.hidden_size))
position_encoding[:, 0::2] = torch.sin(position * div_term)
position_encoding[:, 1::2] = torch.cos(position * div_term)
position_encoding = position_encoding.unsqueeze(0)
return nn.Parameter(position_encoding, requires_grad=False)
def forward(self, x):
seq_len = x.shape[1]
positions = self.position_encoding[:, :seq_len, :]
x = self.embedding(x)
x = x + positions
# x = torch.cat((x, self.cls_token),dim=1)
# x = torch.cat((self.cls_token, x),dim=1)
x_layer = self.transformer_encoder(x)
        feature = torch.mean(x_layer, dim=1)  # mean-pool over the sequence dimension
return feature
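# Shape sketch for the feature extractor (the dimensions below are illustrative
# assumptions, not values taken from this repo):
#
#   extractor = Transformer_FeatureExtractor(input_dim=3, output_dim=32, hidden_dim=32,
#                                            num_layers=2, num_heads=4, batch_size=8, max_seq_len=500)
#   x = torch.zeros(8, 500, 3)       # (batch, seq_len, input_dim), zero-padded cycles
#   feature = extractor(x)           # (batch, hidden_dim) after mean pooling over the sequence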
# class BaseVaraitionLayer_(nn.Module):
# def __init__(self):
# super().__init__()
# def kl_div(self, mu_q, sigma_q, mu_p, sigma_p):
# '''
# Calculates KL divergence between two Gaussians (Q || P)
# :param mu_q: torch.Tensor -> mu parameter of distribution Q
# :param sigma_q: torch.Tensor -> sigma parameter of distribution Q
# :param mu_p: float -> mu parameter of distribution P
# :param sigma_p: float -> sigma parameter of distribution P
# :return: torch.Tensor of shape 0
# '''
# kl = torch.log(sigma_p) - torch.log(sigma_q)
# + (sigma_q**2 + (mu_q - mu_p)**2) / (2 * (sigma_p**2)) - 0.5
# return kl.sum()
class BayesLinear(nn.Module):
def __init__(self, input_dim, output_dim, prior_mu, prior_sigma):
super(BayesLinear, self).__init__()
self.input_dim = input_dim
self.output_dim = output_dim
self.prior_mu = prior_mu
self.prior_sigma = prior_sigma
self.weight_mu = nn.Parameter(torch.Tensor(output_dim, input_dim))
self.weight_rho = nn.Parameter(torch.Tensor(output_dim, input_dim))
self.bias_mu = nn.Parameter(torch.Tensor(output_dim))
self.bias_rho = nn.Parameter(torch.Tensor(output_dim))
self.weight = None
self.bias = None
self.prior = Normal(prior_mu, prior_sigma)
self.reset_parameters()
def reset_parameters(self):
nn.init.kaiming_uniform_(self.weight_mu, a=math.sqrt(self.input_dim))
nn.init.constant_(self.weight_rho, -3.0)
nn.init.zeros_(self.bias_mu)
nn.init.constant_(self.bias_rho, -3.0)
def forward(self, input):
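        # Reparameterization trick: draw eps ~ N(0, 1) and set w = mu + softplus(rho) * eps,
        # so the sampled weights remain differentiable w.r.t. mu and rho.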
weight_epsilon = torch.randn_like(self.weight_mu)
bias_epsilon = torch.randn_like(self.bias_mu)
weight_sigma = torch.log1p(torch.exp(self.weight_rho))
bias_sigma = torch.log1p(torch.exp(self.bias_rho))
self.weight = self.weight_mu + weight_sigma * weight_epsilon
self.bias = self.bias_mu + bias_sigma * bias_epsilon
weight_log_prior = self.prior.log_prob(self.weight)
bias_log_prior = self.prior.log_prob(self.bias)
self.log_prior = torch.sum(weight_log_prior) + torch.sum(bias_log_prior)
        self.weight_post = Normal(self.weight_mu.data, weight_sigma)
        self.bias_post = Normal(self.bias_mu.data, bias_sigma)
self.log_post = self.weight_post.log_prob(self.weight).sum() + self.bias_post.log_prob(self.bias).sum()
# output_mean = torch.matmul(input, weight.t()) + bias
# output_var = torch.matmul(input, weight_sigma.t())**2 + bias_sigma**2
# output_mean = nn.functional.linear(input, self.weight_mu, self.bias_mu)
# output_variance = nn.functional.linear(input ** 2, weight_sigma ** 2, bias_sigma ** 2) + 1e-8
# return output_mean, output_var
return F.linear(input, self.weight, self.bias)
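# Standalone sketch of how BayesLinear can be exercised (shapes are illustrative
# assumptions): every forward pass re-samples the weights, and the layer caches the
# log prior / log variational posterior of that sample for the ELBO.
#
#   layer = BayesLinear(input_dim=32, output_dim=16, prior_mu=0, prior_sigma=1.)
#   out = layer(torch.randn(8, 32))               # (8, 16), different weights on every call
#   kl_sample = layer.log_post - layer.log_prior  # single-sample KL contribution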
class BNN_Regression(nn.Module):
def __init__(self, input_dim, output_dim, noise_tol):
super(BNN_Regression, self).__init__()
self.input_dim = input_dim
self.output_dim = output_dim
# self.batch_size = batch_size
self.noise_tol = noise_tol
self.relu = nn.ReLU()
self.tanh = nn.Tanh()
# self.bnn1 = BayesLinear(input_dim=input_dim, output_dim=64, prior_mu=0, prior_sigma=1.)
# self.bnn2 = BayesLinear(input_dim=64, output_dim=32, prior_mu=0, prior_sigma=1.)
# self.fc = BayesLinear(input_dim=16, output_dim=output_dim,prior_mu=0, prior_sigma=1.)
self.bnn = BayesLinear(input_dim=input_dim, output_dim=16, prior_mu=0, prior_sigma=1.)
self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)
def forward(self, x):
x = self.bnn(x)
x = self.relu(x)
predictions = self.fc(x)
# x = self.bnn1(x)
# x = self.relu(x)
# x = self.bnn2(x)
# x = self.tanh(x)
# x = self.bnn3(x)
# x = self.relu(x)
# predictions = self.fc(x)
return predictions
def log_prior(self):
# calculate the log prior over all the layers
# return self.bnn1.log_prior + self.bnn2.log_prior + self.bnn3.log_prior + self.fc.log_prior
return self.bnn.log_prior + self.fc.log_prior
def log_post(self):
# calculate the log posterior over all the layers
# return self.bnn1.log_post + self.bnn2.log_post + self.bnn3.log_post + self.fc.log_post
return self.bnn.log_post + self.fc.log_post
def sample_elbo(self, input, target, samples, device):
# we calculate the negative elbo, which will be our loss function
# initialize tensors
outputs = torch.zeros(samples, target.shape[0]).to(device)
        log_priors = torch.zeros(samples).to(device)
        log_posts = torch.zeros(samples).to(device)
        log_likes = torch.zeros(samples).to(device)
# make predictions and calculate prior, posterior, and likelihood for a given number of samples
        # Monte Carlo approximation
for i in range(samples):
outputs[i] = self(input).reshape(-1) # make predictions
log_priors[i] = self.log_prior() # get log prior
log_posts[i] = self.log_post() # get log variational posterior
log_likes[i] = Normal(outputs[i], self.noise_tol).log_prob(target.reshape(-1)).sum() # calculate the log likelihood
# calculate monte carlo estimate of prior posterior and likelihood
log_prior = log_priors.mean()
log_post = log_posts.mean()
log_like = log_likes.mean()
# calculate the negative elbo (which is our loss function)
loss = log_post - log_prior - log_like
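        # i.e. loss ≈ KL(q(w) || p(w)) - E_q[log p(D | w)], estimated from `samples` Monte Carlo draws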
return loss
class ATBNN_Model(nn.Module):
def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
super(ATBNN_Model, self).__init__()
self.feature_extractor = Transformer_FeatureExtractor(input_dim=input_dim,
output_dim=hidden_dim,
hidden_dim=hidden_dim,
num_layers=num_layers,
num_heads=num_heads,
batch_size=batch_size,
max_seq_len=max_seq_len)
self.bnn_regression = BNN_Regression(input_dim=hidden_dim,
output_dim=output_dim,
noise_tol=0.01)
def forward(self, x):
self.features = self.feature_extractor(x)
predictions = self.bnn_regression(self.features)
return predictions
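

if __name__ == '__main__':
    # Minimal smoke-test sketch. All hyperparameters and the training-step wiring below
    # are illustrative assumptions; the accompanying demo scripts define the real setup.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    batch_size, max_seq_len, n_features = 8, 100, 3
    model = ATBNN_Model(input_dim=n_features, output_dim=1, hidden_dim=32,
                        num_layers=2, num_heads=4, batch_size=batch_size,
                        max_seq_len=max_seq_len).to(device)
    x = torch.randn(batch_size, max_seq_len, n_features, device=device)  # padded cycle features
    y = torch.rand(batch_size, 1, device=device)                         # normalized capacity targets
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    optimizer.zero_grad()
    features = model.feature_extractor(x)                                # (batch, hidden_dim)
    loss = model.bnn_regression.sample_elbo(features, y, samples=3, device=device)  # negative ELBO
    loss.backward()
    optimizer.step()
    print('negative ELBO (single batch):', loss.item())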