import math
import os
import pickle
import torch
import torch.nn as nn
from torch.distributions import Normal
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
class CycleDataset(Dataset):
    def __init__(self, data, attrib_x, attrib_y, max_len, C_rated, min_val=None, max_val=None, mode='train'):
        self.data = data
        self.cycle_indices = data['Cycle_Index'].unique()
        self.attrib_x = attrib_x
        self.attrib_y = attrib_y
        self.C_rated = C_rated
        self.mode = mode
        self.max_len = max_len
        # Normalize the current by the rated capacity (C-rate)
        self.data['Current'] /= self.C_rated
        if mode == 'train':
            # Fit the min-max scaling statistics on the training split and persist them for inference
            self.min_val = data[attrib_x].values.min(axis=0)
            self.max_val = data[attrib_x].values.max(axis=0)
            os.makedirs('./para_BNN', exist_ok=True)
            with open('./para_BNN/min_max_values.pkl', 'wb') as f:
                pickle.dump((self.min_val, self.max_val), f)
        else:
            # Reuse the scaling statistics computed on the training split
            self.min_val = min_val
            self.max_val = max_val

    def get_min_max_values(self):
        if self.mode != 'train':
            return None
        return self.min_val, self.max_val
    def __len__(self):
        return len(self.cycle_indices)

    def __getitem__(self, index):
        cycle_index = self.cycle_indices[index]
        cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
        # cycle_data['Current'] /= self.C_rated
        # Extract the per-cycle features and the scalar label
        features = cycle_data[self.attrib_x].values
        # C_ini = cycle_data[self.attrib_y].values[0]
        label = cycle_data[self.attrib_y].values[0]
        # Min-max normalize the features with the training-set statistics
        features = (features - self.min_val) / (self.max_val - self.min_val)
        # label = (label - self.y_mean) / self.y_std
        # features = (features - self.min_val) / self.max_val
        pad_len = self.max_len - len(features)
        features = torch.tensor(features, dtype=torch.float32)
        # Zero-pad the end of the sequence up to max_len (keep the same dtype as the features)
        features = torch.cat([features, torch.zeros(pad_len, features.shape[1], dtype=features.dtype)])
        # Convert the label to a tensor as well
        # features = torch.tensor(padded_features, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.float32)
        # label = label.view(1,1)
        return features, label
    #
    # def pad_collate(self, batch):
    #     # Pad the batch so all sequences share the same length
    #     features_batch, labels_batch = zip(*batch)
    #     features_batch = pad_sequence(features_batch, batch_first=True)
    #     labels_batch = torch.stack(labels_batch)
    #
    #     return features_batch, labels_batch
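
# Note (added for clarity): each CycleDataset item is one cycle returned as a
# (max_len, num_features) float32 tensor, zero-padded at the end, paired with the
# scalar target taken from the first row of that cycle.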
class Transformer_FeatureExtractor(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(Transformer_FeatureExtractor, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_dim
        self.batch_size = batch_size
        self.max_seq_len = max_seq_len
        # self.cls_token = nn.Parameter(torch.randn(self.batch_size, 1, self.hidden_size))
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.position_encoding = self.create_position_encoding()
        self.transformer_encoder = nn.TransformerEncoder(
            # batch_first=True because the inputs are shaped (batch, seq_len, hidden_dim)
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=0, batch_first=True),
            num_layers=num_layers
        )
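
    # Note (added for clarity): create_position_encoding builds the standard sinusoidal
    # encoding of Vaswani et al. (2017),
    #   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
    #   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model)),
    # stored with shape (1, max_seq_len, hidden_dim) as a frozen (non-trainable) parameter.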
    def create_position_encoding(self):
        position_encoding = torch.zeros(self.max_seq_len, self.hidden_size)
        position = torch.arange(0, self.max_seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.hidden_size, 2) * (-math.log(10000.0) / self.hidden_size))
        position_encoding[:, 0::2] = torch.sin(position * div_term)
        position_encoding[:, 1::2] = torch.cos(position * div_term)
        position_encoding = position_encoding.unsqueeze(0)
        return nn.Parameter(position_encoding, requires_grad=False)
    def forward(self, x):
        seq_len = x.shape[1]
        positions = self.position_encoding[:, :seq_len, :]
        x = self.embedding(x)
        x = x + positions
        # x = torch.cat((x, self.cls_token), dim=1)
        # x = torch.cat((self.cls_token, x), dim=1)
        x_layer = self.transformer_encoder(x)
        # Mean-pool over the sequence dimension to obtain one feature vector per sample
        feature = torch.mean(x_layer, dim=1)
        return feature
# class BaseVaraitionLayer_(nn.Module):
#     def __init__(self):
#         super().__init__()
#     def kl_div(self, mu_q, sigma_q, mu_p, sigma_p):
#         '''
#         Calculates the KL divergence between two Gaussians (Q || P)
#         :param mu_q: torch.Tensor -> mu parameter of distribution Q
#         :param sigma_q: torch.Tensor -> sigma parameter of distribution Q
#         :param mu_p: float -> mu parameter of distribution P
#         :param sigma_p: float -> sigma parameter of distribution P
#         :return: torch.Tensor of shape 0
#         '''
#         kl = torch.log(sigma_p) - torch.log(sigma_q) \
#             + (sigma_q**2 + (mu_q - mu_p)**2) / (2 * (sigma_p**2)) - 0.5
#         return kl.sum()
class BayesLinear(nn.Module):
    def __init__(self, input_dim, output_dim, prior_mu, prior_sigma):
        super(BayesLinear, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.prior_mu = prior_mu
        self.prior_sigma = prior_sigma
        # Variational parameters: mean and rho (sigma = softplus(rho)) for weights and biases
        self.weight_mu = nn.Parameter(torch.empty(output_dim, input_dim))
        self.weight_rho = nn.Parameter(torch.empty(output_dim, input_dim))
        self.bias_mu = nn.Parameter(torch.empty(output_dim))
        self.bias_rho = nn.Parameter(torch.empty(output_dim))
        self.weight = None
        self.bias = None
        self.prior = Normal(prior_mu, prior_sigma)
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight_mu, a=math.sqrt(self.input_dim))
        nn.init.constant_(self.weight_rho, -3.0)
        nn.init.zeros_(self.bias_mu)
        nn.init.constant_(self.bias_rho, -3.0)
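
    # Note (added for clarity): the forward pass below uses the Bayes-by-Backprop
    # reparameterisation: sigma = softplus(rho) = log(1 + exp(rho)) and
    # w = mu + sigma * eps with eps ~ N(0, 1), so gradients flow through mu and rho
    # while the weight sample stays stochastic.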
    def forward(self, input):
        # Sample weights and biases with the reparameterisation trick
        weight_epsilon = torch.randn_like(self.weight_mu)
        bias_epsilon = torch.randn_like(self.bias_mu)
        weight_sigma = torch.log1p(torch.exp(self.weight_rho))
        bias_sigma = torch.log1p(torch.exp(self.bias_rho))
        self.weight = self.weight_mu + weight_sigma * weight_epsilon
        self.bias = self.bias_mu + bias_sigma * bias_epsilon
        # Log prior of the sampled weights under N(prior_mu, prior_sigma)
        weight_log_prior = self.prior.log_prob(self.weight)
        bias_log_prior = self.prior.log_prob(self.bias)
        self.log_prior = torch.sum(weight_log_prior) + torch.sum(bias_log_prior)
        # Log probability of the same samples under the variational posterior
        self.weight_post = Normal(self.weight_mu.data, weight_sigma)
        self.bias_post = Normal(self.bias_mu.data, bias_sigma)
        self.log_post = self.weight_post.log_prob(self.weight).sum() + self.bias_post.log_prob(self.bias).sum()
        # output_mean = torch.matmul(input, weight.t()) + bias
        # output_var = torch.matmul(input, weight_sigma.t())**2 + bias_sigma**2
        # output_mean = nn.functional.linear(input, self.weight_mu, self.bias_mu)
        # output_variance = nn.functional.linear(input ** 2, weight_sigma ** 2, bias_sigma ** 2) + 1e-8
        # return output_mean, output_var
        return F.linear(input, self.weight, self.bias)
class BNN_Regression(nn.Module):
    def __init__(self, input_dim, output_dim, noise_tol):
        super(BNN_Regression, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        # self.batch_size = batch_size
        self.noise_tol = noise_tol
        self.relu = nn.ReLU()
        self.tanh = nn.Tanh()
        # self.bnn1 = BayesLinear(input_dim=input_dim, output_dim=64, prior_mu=0, prior_sigma=1.)
        # self.bnn2 = BayesLinear(input_dim=64, output_dim=32, prior_mu=0, prior_sigma=1.)
        # self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)
        self.bnn = BayesLinear(input_dim=input_dim, output_dim=16, prior_mu=0, prior_sigma=1.)
        self.fc = BayesLinear(input_dim=16, output_dim=output_dim, prior_mu=0, prior_sigma=1.)

    def forward(self, x):
        x = self.bnn(x)
        x = self.relu(x)
        predictions = self.fc(x)
        # x = self.bnn1(x)
        # x = self.relu(x)
        # x = self.bnn2(x)
        # x = self.tanh(x)
        # x = self.bnn3(x)
        # x = self.relu(x)
        # predictions = self.fc(x)
        return predictions
    def log_prior(self):
        # calculate the log prior over all the layers
        # return self.bnn1.log_prior + self.bnn2.log_prior + self.bnn3.log_prior + self.fc.log_prior
        return self.bnn.log_prior + self.fc.log_prior

    def log_post(self):
        # calculate the log posterior over all the layers
        # return self.bnn1.log_post + self.bnn2.log_post + self.bnn3.log_post + self.fc.log_post
        return self.bnn.log_post + self.fc.log_post
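
    # Note (added for clarity): sample_elbo below is a Monte Carlo estimate of the
    # negative evidence lower bound,
    #   loss ≈ (1/S) * sum_s [ log q(w_s) - log p(w_s) - log p(D | w_s) ],
    # where the likelihood p(D | w) is Gaussian with fixed standard deviation noise_tol.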
    def sample_elbo(self, input, target, samples, device):
        # we calculate the negative elbo, which will be our loss function
        # initialize tensors on the same device as the model outputs
        outputs = torch.zeros(samples, target.shape[0]).to(device)
        log_priors = torch.zeros(samples).to(device)
        log_posts = torch.zeros(samples).to(device)
        log_likes = torch.zeros(samples).to(device)
        # make predictions and calculate prior, posterior, and likelihood for a given number of samples
        # (Monte Carlo approximation)
        for i in range(samples):
            outputs[i] = self(input).reshape(-1)  # make predictions
            log_priors[i] = self.log_prior()  # get log prior
            log_posts[i] = self.log_post()  # get log variational posterior
            log_likes[i] = Normal(outputs[i], self.noise_tol).log_prob(target.reshape(-1)).sum()  # calculate the log likelihood
        # calculate Monte Carlo estimates of the prior, posterior, and likelihood
        log_prior = log_priors.mean()
        log_post = log_posts.mean()
        log_like = log_likes.mean()
        # calculate the negative elbo (which is our loss function)
        loss = log_post - log_prior - log_like
        return loss
class ATBNN_Model(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len):
        super(ATBNN_Model, self).__init__()
        self.feature_extractor = Transformer_FeatureExtractor(input_dim=input_dim,
                                                              output_dim=hidden_dim,
                                                              hidden_dim=hidden_dim,
                                                              num_layers=num_layers,
                                                              num_heads=num_heads,
                                                              batch_size=batch_size,
                                                              max_seq_len=max_seq_len)
        self.bnn_regression = BNN_Regression(input_dim=hidden_dim,
                                             output_dim=output_dim,
                                             noise_tol=0.01)

    def forward(self, x):
        self.features = self.feature_extractor(x)
        predictions = self.bnn_regression(self.features)
        return predictions
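

# ---------------------------------------------------------------------------
# Minimal usage sketch (added for illustration). The hyperparameters and tensor
# shapes below are assumptions, not values from the original training script;
# it only checks that a forward pass and an ELBO evaluation run end to end.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    batch_size, max_seq_len, input_dim = 8, 128, 4  # assumed shapes
    model = ATBNN_Model(input_dim=input_dim,
                        output_dim=1,
                        hidden_dim=32,
                        num_layers=2,
                        num_heads=4,
                        batch_size=batch_size,
                        max_seq_len=max_seq_len).to(device)

    dummy_x = torch.randn(batch_size, max_seq_len, input_dim, device=device)
    dummy_y = torch.rand(batch_size, 1, device=device)

    preds = model(dummy_x)  # (batch_size, 1)
    features = model.feature_extractor(dummy_x)
    loss = model.bnn_regression.sample_elbo(features, dummy_y, samples=5, device=device)
    print(preds.shape, loss.item())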