import math
import pickle

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler


class CycleDataset(Dataset):
    """Per-cycle dataset: each item is one charge/discharge cycle, min-max scaled
    and zero-padded to a fixed length, with the cycle's first label value as target."""

    def __init__(self, data, attrib_x, attrib_y, max_len, C_rated, min_val=None, max_val=None, mode='train'):
        self.data = data
        self.cycle_indices = data['Cycle_Index'].unique()
        self.attrib_x = attrib_x
        self.attrib_y = attrib_y
        self.C_rated = C_rated
        self.mode = mode
        self.max_len = max_len
        # Normalise current by the rated capacity; note this mutates the passed DataFrame
        self.data['Current'] /= self.C_rated
        if mode == 'train':
            # Fit min-max statistics on the training split and persist them for later reuse
            self.min_val = data[attrib_x].values.min(axis=0)
            self.max_val = data[attrib_x].values.max(axis=0)
            with open('./para_BNN/min_max_values.pkl', 'wb') as f:
                pickle.dump((self.min_val, self.max_val), f)
        else:
            # Validation/test splits reuse the statistics fitted on the training data
            self.min_val = min_val
            self.max_val = max_val

    def get_min_max_values(self):
        if self.mode != 'train':
            return None
        return self.min_val, self.max_val

    def __len__(self):
        return len(self.cycle_indices)

    def __getitem__(self, index):
        cycle_index = self.cycle_indices[index]
        cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
        # cycle_data['Current'] /= self.C_rated

        # Extract features and label (the label is taken from the first row of the cycle)
        features = cycle_data[self.attrib_x].values
        # C_ini = cycle_data[self.attrib_y].values[0]
        label = cycle_data[self.attrib_y].values[0]

        # Min-max scale the features
        features = (features - self.min_val) / (self.max_val - self.min_val)
        # label = (label - self.y_mean) / self.y_std
        # features = (features - self.min_val) / self.max_val

        # Convert to tensor and zero-pad to max_len so all cycles share one sequence length
        # (fill value 0.0 keeps the padded rows in float32, matching the feature dtype)
        pad_len = self.max_len - len(features)
        features = torch.tensor(features, dtype=torch.float32)
        features = torch.cat([features, torch.full((pad_len, features.shape[1]), 0.0)])

        label = torch.tensor(label, dtype=torch.float32)
        # label = label.view(1, 1)
        return features, label

    # def pad_collate(self, batch):
    #     # Pad the samples in a batch to a uniform length
    #     features_batch, labels_batch = zip(*batch)
    #     features_batch = pad_sequence(features_batch, batch_first=True)
    #     labels_batch = torch.stack(labels_batch)
    #     return features_batch, labels_batch
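# --- Usage sketch (illustrative only, not part of the original pipeline) ---
# A minimal example of how CycleDataset might be wired to a DataLoader. The
# feature/label column names, max_len and C_rated values below are assumptions
# for illustration; substitute the columns of the actual cycling data.
def _example_build_dataloader(df: pd.DataFrame, batch_size: int = 32) -> DataLoader:
    attrib_x = ['Voltage', 'Current', 'Temperature']  # assumed feature columns
    attrib_y = 'Capacity'                             # assumed label column
    train_set = CycleDataset(df, attrib_x, attrib_y, max_len=500, C_rated=1.1, mode='train')
    return DataLoader(train_set, batch_size=batch_size, shuffle=True)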
class Transformer_FeatureExtractor(nn.Module):
    """Transformer encoder that embeds each time step, adds sinusoidal position
    encodings and mean-pools the encoder outputs into a single cycle-level feature."""

    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(Transformer_FeatureExtractor, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_dim
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len
        # self.cls_token = nn.Parameter(torch.randn(self.batch_size, 1, self.hidden_size))
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.position_encoding = self.create_position_encoding()
        self.transformer_encoder = nn.TransformerEncoder(
            # batch_first=True because inputs arrive as (batch, seq_len, feature)
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=0, batch_first=True),
            num_layers=num_layers
        )

    def create_position_encoding(self):
        # Standard sinusoidal position encoding, shape (1, max_seq_len, hidden_size)
        position_encoding = torch.zeros(self.max_seq_len, self.hidden_size)
        position = torch.arange(0, self.max_seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.hidden_size, 2) * (-math.log(10000.0) / self.hidden_size))
        position_encoding[:, 0::2] = torch.sin(position * div_term)
        position_encoding[:, 1::2] = torch.cos(position * div_term)
        position_encoding = position_encoding.unsqueeze(0)
        return nn.Parameter(position_encoding, requires_grad=False)

    def forward(self, x):
        seq_len = x.shape[1]
        positions = self.position_encoding[:, :seq_len, :]
        x = self.embedding(x)
        x = x + positions
        # x = torch.cat((x, self.cls_token), dim=1)
        # x = torch.cat((self.cls_token, x), dim=1)
        x_layer = self.transformer_encoder(x)
        # Mean-pool over the time dimension to obtain one feature vector per cycle
        feature = torch.mean(x_layer, dim=1)
        return feature


# class BaseVaraitionLayer_(nn.Module):
#     def __init__(self):
#         super().__init__()
#
#     def kl_div(self, mu_q, sigma_q, mu_p, sigma_p):
#         '''
#         Calculates the KL divergence between two Gaussians KL(Q || P).
#         :param mu_q: torch.Tensor -> mu parameter of distribution Q
#         :param sigma_q: torch.Tensor -> sigma parameter of distribution Q
#         :param mu_p: float -> mu parameter of distribution P
#         :param sigma_p: float -> sigma parameter of distribution P
#         :return: torch.Tensor of shape 0
#         '''
#         kl = (torch.log(sigma_p) - torch.log(sigma_q)
#               + (sigma_q ** 2 + (mu_q - mu_p) ** 2) / (2 * (sigma_p ** 2)) - 0.5)
#         return kl.sum()


class BayesLinear(nn.Module):
    """Bayesian linear layer (Bayes by backprop): weights and biases are sampled
    from a Gaussian variational posterior on every forward pass, and the layer
    records the log prior and log variational posterior of the drawn sample."""

    def __init__(self, input_dim, output_dim, prior_mu, prior_sigma):
        super(BayesLinear, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma
        self.weight_mu = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.weight_rho = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias_mu = nn.Parameter(torch.Tensor(output_dim))
        self.bias_rho = nn.Parameter(torch.Tensor(output_dim))
        self.weight = None
        self.bias = None
        self.prior = Normal(prior_mu, prior_sigma)
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight_mu, a=math.sqrt(self.input_dim))
        nn.init.constant_(self.weight_rho, -3.0)
        nn.init.zeros_(self.bias_mu)
        nn.init.constant_(self.bias_rho, -3.0)

    def forward(self, input):
        # Reparameterisation trick: w = mu + softplus(rho) * eps, eps ~ N(0, 1)
        weight_epsilon = torch.randn_like(self.weight_mu)
        bias_epsilon = torch.randn_like(self.bias_mu)
        weight_sigma = torch.log1p(torch.exp(self.weight_rho))
        bias_sigma = torch.log1p(torch.exp(self.bias_rho))
        self.weight = self.weight_mu + weight_sigma * weight_epsilon
        self.bias = self.bias_mu + bias_sigma * bias_epsilon

        # Log prior of the sampled weights under N(prior_mu, prior_sigma)
        weight_log_prior = self.prior.log_prob(self.weight)
        bias_log_prior = self.prior.log_prob(self.bias)
        self.log_prior = torch.sum(weight_log_prior) + torch.sum(bias_log_prior)

        # Log variational posterior of the sampled weights under N(mu, softplus(rho))
        self.weight_post = Normal(self.weight_mu.data, weight_sigma)
        self.bias_post = Normal(self.bias_mu.data, bias_sigma)
        self.log_post = self.weight_post.log_prob(self.weight).sum() + self.bias_post.log_prob(self.bias).sum()

        # output_mean = torch.matmul(input, weight.t()) + bias
        # output_var = torch.matmul(input, weight_sigma.t())**2 + bias_sigma**2
        # output_mean = nn.functional.linear(input, self.weight_mu, self.bias_mu)
        # output_variance = nn.functional.linear(input ** 2, weight_sigma ** 2, bias_sigma ** 2) + 1e-8
        # return output_mean, output_var
        return F.linear(input, self.weight, self.bias)
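# --- Sanity-check sketch (illustrative only) ---
# Each call to BayesLinear draws a fresh weight sample w = mu + softplus(rho) * eps,
# so two forward passes on the same input give different outputs. The layer sizes
# here are arbitrary placeholders.
def _example_bayes_linear_sampling():
    layer = BayesLinear(input_dim=8, output_dim=4, prior_mu=0., prior_sigma=1.)
    x = torch.randn(2, 8)
    y1 = layer(x)  # first stochastic forward pass
    y2 = layer(x)  # second pass resamples the weights, so y2 differs from y1 (almost surely)
    return (y1 - y2).abs().max()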
class BNN_Regression(nn.Module):
    """Two-layer Bayesian regression head trained by minimising the negative ELBO,
    estimated with Monte Carlo samples of the weights."""

    def __init__(self, input_dim, output_dim, noise_tol):
        super(BNN_Regression, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # self.batch_size = batch_size
        self.noise_tol = noise_tol
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        # self.bnn1 = BayesLinear(input_dim=input_dim, output_dim=64, prior_mu=0, prior_sigma=1.)
        # self.bnn2 = BayesLinear(input_dim=64, output_dim=32, prior_mu=0, prior_sigma=1.)
        # self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)
        self.bnn = BayesLinear(input_dim=input_dim, output_dim=16, prior_mu=0, prior_sigma=1.)
        self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)

    def forward(self, x):
        x = self.bnn(x)
        x = self.relu(x)
        predictions = self.fc(x)
        # x = self.bnn1(x)
        # x = self.relu(x)
        # x = self.bnn2(x)
        # x = self.tanh(x)
        # x = self.bnn3(x)
        # x = self.relu(x)
        # predictions = self.fc(x)
        return predictions

    def log_prior(self):
        # Log prior accumulated over all Bayesian layers
        # return self.bnn1.log_prior + self.bnn2.log_prior + self.bnn3.log_prior + self.fc.log_prior
        return self.bnn.log_prior + self.fc.log_prior

    def log_post(self):
        # Log variational posterior accumulated over all Bayesian layers
        # return self.bnn1.log_post + self.bnn2.log_post + self.bnn3.log_post + self.fc.log_post
        return self.bnn.log_post + self.fc.log_post

    def sample_elbo(self, input, target, samples, device):
        # Negative ELBO, used as the loss function.
        # Initialise accumulators on `device` so results computed on GPU can be assigned into them.
        outputs = torch.zeros(samples, target.shape[0]).to(device)
        log_priors = torch.zeros(samples).to(device)
        log_posts = torch.zeros(samples).to(device)
        log_likes = torch.zeros(samples).to(device)
        # Monte Carlo approximation: draw `samples` weight samples and average the terms
        for i in range(samples):
            outputs[i] = self(input).reshape(-1)  # make predictions
            log_priors[i] = self.log_prior()  # log prior
            log_posts[i] = self.log_post()  # log variational posterior
            log_likes[i] = Normal(outputs[i], self.noise_tol).log_prob(target.reshape(-1)).sum()  # log likelihood
        # Monte Carlo estimates of the prior, posterior and likelihood terms
        log_prior = log_priors.mean()
        log_post = log_posts.mean()
        log_like = log_likes.mean()
        # Negative ELBO = E[log q(w)] - E[log p(w)] - E[log p(D|w)]
        loss = log_post - log_prior - log_like
        return loss


class ATBNN_Model(nn.Module):
    """End-to-end model: a Transformer feature extractor followed by a Bayesian
    regression head that maps the pooled cycle feature to the target."""

    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(ATBNN_Model, self).__init__()
        self.feature_extractor = Transformer_FeatureExtractor(input_dim=input_dim,
                                                              output_dim=hidden_dim,
                                                              hidden_dim=hidden_dim,
                                                              num_layers=num_layers,
                                                              num_heads=num_heads,
                                                              batch_size=batch_size,
                                                              max_seq_len=max_seq_len)
        self.bnn_regression = BNN_Regression(input_dim=hidden_dim, output_dim=output_dim, noise_tol=0.01)

    def forward(self, x):
        self.features = self.feature_extractor(x)
        predictions = self.bnn_regression(self.features)
        return predictions
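# --- Training / inference sketch (illustrative only; hyperparameters are assumptions) ---
# One negative-ELBO optimisation step and a Monte Carlo uncertainty estimate at
# test time, wired from the classes above. The number of ELBO/prediction samples
# and the optimiser are placeholders, not values from the original experiments.
def _example_train_step(model: ATBNN_Model, optimizer, features, labels, device, n_samples=5):
    model.train()
    optimizer.zero_grad()
    # Extract cycle-level features, then evaluate the sampled ELBO on the Bayesian head
    feats = model.feature_extractor(features.to(device))
    loss = model.bnn_regression.sample_elbo(feats, labels.to(device), samples=n_samples, device=device)
    loss.backward()
    optimizer.step()
    return loss.item()


def _example_predict_with_uncertainty(model: ATBNN_Model, features, device, n_samples=100):
    model.eval()
    with torch.no_grad():
        # Each forward pass resamples the Bayesian weights, giving a predictive distribution
        preds = torch.stack([model(features.to(device)) for _ in range(n_samples)])
    return preds.mean(dim=0), preds.std(dim=0)  # predictive mean and standard deviation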