import math
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
from torch.utils.data import Dataset, DataLoader
# import numpy as np
# import pandas as pd
# from matplotlib import pyplot as plt
# from sklearn.preprocessing import StandardScaler, MinMaxScaler
# from torch.nn.utils.rnn import pad_sequence


class CycleDataset(Dataset):
    # Yields one cycle at a time: a fixed-length (padded) feature sequence and its per-cycle label.
    def __init__(self, data, attrib_x, attrib_y, max_len, C_rated, min_val=None, max_val=None, mode='train'):
        self.data = data
        self.cycle_indices = data['Cycle_Index'].unique()
        self.attrib_x = attrib_x
        self.attrib_y = attrib_y
        self.C_rated = C_rated
        self.mode = mode
        self.max_len = max_len
        self.data['Current'] /= self.C_rated
        if mode == 'train':
            # Fit the min-max scaler on the training split and persist it for later use
            self.min_val = data[attrib_x].values.min(axis=0)
            self.max_val = data[attrib_x].values.max(axis=0)
            with open('./para_BNN/min_max_values.pkl', 'wb') as f:
                pickle.dump((self.min_val, self.max_val), f)
        else:
            self.min_val = min_val
            self.max_val = max_val

    def get_min_max_values(self):
        if self.mode != 'train':
            return None
        return self.min_val, self.max_val

    def __len__(self):
        return len(self.cycle_indices)

    def __getitem__(self, index):
        cycle_index = self.cycle_indices[index]
        cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
        # cycle_data['Current'] /= self.C_rated

        # Extract the features and the label
        features = cycle_data[self.attrib_x].values
        # C_ini = cycle_data[self.attrib_y].values[0]
        label = cycle_data[self.attrib_y].values[0]

        # Min-max normalize the features
        features = (features - self.min_val) / (self.max_val - self.min_val)
        # label = (label - self.y_mean) / self.y_std
        # features = (features - self.min_val) / self.max_val

        # Pad the tail of the sequence with a constant value up to max_len
        pad_len = self.max_len - len(features)
        features = torch.tensor(features, dtype=torch.float32)
        features = torch.cat([features, torch.full((pad_len, features.shape[1]), 0.0)])

        # Convert the label to a tensor
        # features = torch.tensor(padded_features, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.float32)
        # label = label.view(1, 1)
        return features, label

    # def pad_collate(self, batch):
    #     # Pad the batch so that all sequences share the same length
    #     features_batch, labels_batch = zip(*batch)
    #     features_batch = pad_sequence(features_batch, batch_first=True)
    #     labels_batch = torch.stack(labels_batch)
    #
    #     return features_batch, labels_batch
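
# --- Illustrative helper (not part of the original module) -----------------
# A minimal sketch of how the train/test normalization hand-off could be
# wired: the training split fits the per-feature min/max (and pickles it to
# ./para_BNN/, which must already exist), and the evaluation split reuses
# those statistics. The function name and its arguments are assumptions made
# for this example.
def build_train_test_datasets(train_df, test_df, attrib_x, attrib_y, max_len, C_rated):
    train_set = CycleDataset(train_df, attrib_x, attrib_y, max_len, C_rated, mode='train')
    min_val, max_val = train_set.get_min_max_values()
    test_set = CycleDataset(test_df, attrib_x, attrib_y, max_len, C_rated,
                            min_val=min_val, max_val=max_val, mode='test')
    return train_set, test_set
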
class Transformer_FeatureExtractor(nn.Module):
    # Embeds each time step, adds sinusoidal position encodings, runs a Transformer
    # encoder, and mean-pools over time to obtain one feature vector per cycle.
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(Transformer_FeatureExtractor, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_dim
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len
        # self.cls_token = nn.Parameter(torch.randn(self.batch_size, 1, self.hidden_size))

        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.position_encoding = self.create_position_encoding()
        self.transformer_encoder = nn.TransformerEncoder(
            # batch_first=True because the dataset yields (batch, seq_len, feature) tensors
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=0, batch_first=True),
            num_layers=num_layers
        )

    def create_position_encoding(self):
        # Standard sinusoidal position encoding, stored as a frozen parameter
        position_encoding = torch.zeros(self.max_seq_len, self.hidden_size)
        position = torch.arange(0, self.max_seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.hidden_size, 2) * (-math.log(10000.0) / self.hidden_size))
        position_encoding[:, 0::2] = torch.sin(position * div_term)
        position_encoding[:, 1::2] = torch.cos(position * div_term)
        position_encoding = position_encoding.unsqueeze(0)
        return nn.Parameter(position_encoding, requires_grad=False)

    def forward(self, x):
        seq_len = x.shape[1]
        positions = self.position_encoding[:, :seq_len, :]
        x = self.embedding(x)
        x = x + positions
        # x = torch.cat((x, self.cls_token), dim=1)
        # x = torch.cat((self.cls_token, x), dim=1)
        x_layer = self.transformer_encoder(x)
        feature = torch.mean(x_layer, dim=1)
        return feature


# class BaseVaraitionLayer_(nn.Module):
#     def __init__(self):
#         super().__init__()
#
#     def kl_div(self, mu_q, sigma_q, mu_p, sigma_p):
#         '''
#         Calculates the KL divergence between two Gaussians (Q || P)
#         :param mu_q: torch.Tensor -> mu parameter of distribution Q
#         :param sigma_q: torch.Tensor -> sigma parameter of distribution Q
#         :param mu_p: float -> mu parameter of distribution P
#         :param sigma_p: float -> sigma parameter of distribution P
#         :return: torch.Tensor of shape 0
#         '''
#         kl = (torch.log(sigma_p) - torch.log(sigma_q)
#               + (sigma_q**2 + (mu_q - mu_p)**2) / (2 * (sigma_p**2)) - 0.5)
#         return kl.sum()


class BayesLinear(nn.Module):
    # Bayesian linear layer: weights and biases are sampled from a diagonal Gaussian
    # variational posterior on every forward pass (Bayes by Backprop).
    def __init__(self, input_dim, output_dim, prior_mu, prior_sigma):
        super(BayesLinear, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma

        self.weight_mu = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.weight_rho = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias_mu = nn.Parameter(torch.Tensor(output_dim))
        self.bias_rho = nn.Parameter(torch.Tensor(output_dim))
        self.weight = None
        self.bias = None
        self.prior = Normal(prior_mu, prior_sigma)
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight_mu, a=math.sqrt(self.input_dim))
        nn.init.constant_(self.weight_rho, -3.0)
        nn.init.zeros_(self.bias_mu)
        nn.init.constant_(self.bias_rho, -3.0)

    def forward(self, input):
        # Reparameterization trick: sigma = log(1 + exp(rho)) keeps the std positive
        weight_epsilon = torch.randn_like(self.weight_mu)
        bias_epsilon = torch.randn_like(self.bias_mu)
        weight_sigma = torch.log1p(torch.exp(self.weight_rho))
        bias_sigma = torch.log1p(torch.exp(self.bias_rho))
        self.weight = self.weight_mu + weight_sigma * weight_epsilon
        self.bias = self.bias_mu + bias_sigma * bias_epsilon

        # Log probability of the sampled weights under the prior N(prior_mu, prior_sigma)
        weight_log_prior = self.prior.log_prob(self.weight)
        bias_log_prior = self.prior.log_prob(self.bias)
        self.log_prior = torch.sum(weight_log_prior) + torch.sum(bias_log_prior)

        # Log probability of the sampled weights under the variational posterior
        self.weight_post = Normal(self.weight_mu.data, torch.log(1 + torch.exp(self.weight_rho)))
        self.bias_post = Normal(self.bias_mu.data, torch.log(1 + torch.exp(self.bias_rho)))
        self.log_post = self.weight_post.log_prob(self.weight).sum() + self.bias_post.log_prob(self.bias).sum()

        # output_mean = torch.matmul(input, weight.t()) + bias
        # output_var = torch.matmul(input, weight_sigma.t())**2 + bias_sigma**2
        # output_mean = nn.functional.linear(input, self.weight_mu, self.bias_mu)
        # output_variance = nn.functional.linear(input ** 2, weight_sigma ** 2, bias_sigma ** 2) + 1e-8
        # return output_mean, output_var
        return F.linear(input, self.weight, self.bias)
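
# --- Illustrative sanity check (not part of the original module) -----------
# A minimal sketch showing how BayesLinear behaves: every forward pass draws a
# fresh weight sample, so repeated calls on the same input differ, and each
# call refreshes the `log_prior` / `log_post` terms used by the ELBO below.
# The layer sizes and seed here are arbitrary assumptions for the demo.
def _demo_bayes_linear():
    torch.manual_seed(0)
    layer = BayesLinear(input_dim=8, output_dim=4, prior_mu=0., prior_sigma=1.)
    x = torch.randn(2, 8)
    y1 = layer(x)
    y2 = layer(x)
    # Outputs differ because the weights are re-sampled on every call
    print('max |y1 - y2|:', (y1 - y2).abs().max().item())
    # After a forward pass the layer exposes its log prior / log posterior terms
    print('log_prior:', layer.log_prior.item(), 'log_post:', layer.log_post.item())
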
class BNN_Regression(nn.Module):
    # Small Bayesian regression head; sample_elbo returns the negative ELBO loss.
    def __init__(self, input_dim, output_dim, noise_tol):
        super(BNN_Regression, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # self.batch_size = batch_size
        self.noise_tol = noise_tol
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        # self.bnn1 = BayesLinear(input_dim=input_dim, output_dim=64, prior_mu=0, prior_sigma=1.)
        # self.bnn2 = BayesLinear(input_dim=64, output_dim=32, prior_mu=0, prior_sigma=1.)
        # self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)
        self.bnn = BayesLinear(input_dim=input_dim, output_dim=16, prior_mu=0, prior_sigma=1.)
        self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)

    def forward(self, x):
        x = self.bnn(x)
        x = self.relu(x)
        predictions = self.fc(x)
        # x = self.bnn1(x)
        # x = self.relu(x)
        # x = self.bnn2(x)
        # x = self.tanh(x)
        # x = self.bnn3(x)
        # x = self.relu(x)
        # predictions = self.fc(x)
        return predictions

    def log_prior(self):
        # Calculate the log prior over all the layers
        # return self.bnn1.log_prior + self.bnn2.log_prior + self.bnn3.log_prior + self.fc.log_prior
        return self.bnn.log_prior + self.fc.log_prior

    def log_post(self):
        # Calculate the log variational posterior over all the layers
        # return self.bnn1.log_post + self.bnn2.log_post + self.bnn3.log_post + self.fc.log_post
        return self.bnn.log_post + self.fc.log_post

    def sample_elbo(self, input, target, samples, device):
        # Calculate the negative ELBO, which will be our loss function
        # Initialize tensors
        outputs = torch.zeros(samples, target.shape[0]).to(device)
        log_priors = torch.zeros(samples).to(device)
        log_posts = torch.zeros(samples).to(device)
        log_likes = torch.zeros(samples).to(device)
        # Make predictions and calculate the prior, posterior, and likelihood
        # for a given number of samples (Monte Carlo approximation)
        for i in range(samples):
            outputs[i] = self(input).reshape(-1)  # make predictions
            log_priors[i] = self.log_prior()  # get log prior
            log_posts[i] = self.log_post()  # get log variational posterior
            log_likes[i] = Normal(outputs[i], self.noise_tol).log_prob(target.reshape(-1)).sum()  # get log likelihood
        # Calculate the Monte Carlo estimates of the prior, posterior, and likelihood
        log_prior = log_priors.mean()
        log_post = log_posts.mean()
        log_like = log_likes.mean()
        # Calculate the negative ELBO (which is our loss function)
        loss = log_post - log_prior - log_like
        return loss


class ATBNN_Model(nn.Module):
    # Transformer feature extractor followed by a Bayesian regression head.
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(ATBNN_Model, self).__init__()
        self.feature_extractor = Transformer_FeatureExtractor(input_dim=input_dim,
                                                              output_dim=hidden_dim,
                                                              hidden_dim=hidden_dim,
                                                              num_layers=num_layers,
                                                              num_heads=num_heads,
                                                              batch_size=batch_size,
                                                              max_seq_len=max_seq_len)
        self.bnn_regression = BNN_Regression(input_dim=hidden_dim, output_dim=output_dim, noise_tol=0.01)

    def forward(self, x):
        self.features = self.feature_extractor(x)
        predictions = self.bnn_regression(self.features)
        return predictions
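
# --- Minimal end-to-end sketch (illustrative only) --------------------------
# One way the pieces above could be wired together: a synthetic DataFrame
# stands in for the real cycling data, and the column names ('Cycle_Index',
# 'Current', 'Voltage', 'Capacity'), rated capacity, hyper-parameters, and
# optimizer settings are all assumptions, not values from the original project.
if __name__ == '__main__':
    import os
    import pandas as pd  # only needed for this example

    os.makedirs('./para_BNN', exist_ok=True)  # CycleDataset pickles min/max here

    # Synthetic data: 20 cycles, 50 rows per cycle
    n_cycles, rows = 20, 50
    df = pd.DataFrame({
        'Cycle_Index': torch.arange(n_cycles).repeat_interleave(rows).numpy(),
        'Current': torch.rand(n_cycles * rows).numpy(),
        'Voltage': (3.0 + torch.rand(n_cycles * rows)).numpy(),
        'Capacity': torch.rand(n_cycles * rows).numpy(),
    })

    attrib_x, attrib_y = ['Current', 'Voltage'], 'Capacity'
    max_len, C_rated = rows, 2.0
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    dataset = CycleDataset(df, attrib_x, attrib_y, max_len, C_rated, mode='train')
    loader = DataLoader(dataset, batch_size=4, shuffle=True)

    model = ATBNN_Model(input_dim=len(attrib_x), output_dim=1, hidden_dim=32,
                        num_layers=2, num_heads=4, batch_size=4,
                        max_seq_len=max_len).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # Train by minimizing the negative ELBO on the extracted features
    for epoch in range(2):
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            optimizer.zero_grad()
            feats = model.feature_extractor(features)
            loss = model.bnn_regression.sample_elbo(feats, labels, samples=3, device=device)
            loss.backward()
            optimizer.step()
        print(f'epoch {epoch}: negative ELBO = {loss.item():.4f}')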