import random import torch import torch.nn as nn import numpy as np import pandas as pd from matplotlib import pyplot as plt from torch.utils.data import Dataset, DataLoader import net_BNN def train(n_epochs,dataloader,val_dataloader,model,criterion,optimizer,device): num_batches = len(dataloader) num_sample = 10 model.train() #重要!设置模式 for epoch in range(n_epochs): total_loss = 0 total_MSEloss = 0 for inputs,labels in dataloader: inputs,labels = inputs.to(device),labels.to(device) #GPU optimizer.zero_grad() # MC sample # sample_output = torch.zeros(num_sample, labels.shape[0]).to(device) # for i in range(num_sample): # sample_output[i] = model(inputs).reshape(-1) # # outputs = torch.Tensor(sample_output.mean(dim=0).unsqueeze(1)) outputs = model(inputs) features = model.features loss = model.bnn_regression.sample_elbo(features, labels, 10, device) MSEloss = criterion(outputs,labels) loss.backward() optimizer.step() total_loss += loss.item() total_MSEloss += MSEloss.item() train_loss = total_loss / num_batches train_MSEloss = total_MSEloss / num_batches val_loss, val_MSEloss = val(val_dataloader,model,criterion,device) # torch.save(model.state_dict(),".\model.pth") torch.save(model.state_dict(), f"./model_B/model_epoch{epoch + 1 }_Trainloss{train_MSEloss:.8f}_ValLoss{val_MSEloss:.8f}.pt") print('Epoch [{}/{}], Train_Loss: {:.8f}, Val_Loss: {:.8f}'.format(epoch + 1, num_epochs , train_loss,val_loss), end=' ') print('Train_MSE_Loss: {:.8f}, Val_MSE_Loss: {:.8f}'.format(train_MSEloss, val_MSEloss)) def val(dataloader,model,criterion,device): val_loss = 0 num_batches = len(dataloader) val_MSEloss = 0 num_sample = 10 model.eval() #重要!设置模式 with torch.no_grad(): for inputs,labels in dataloader: inputs = inputs.to(device) labels = labels.to(device) #GPU # # MC sample # sample_output = torch.zeros(num_sample, labels.shape[0]).to(device) # for i in range(num_sample): # sample_output[i] = model(inputs).reshape(-1) # # outputs = torch.Tensor(sample_output.mean(dim=0).unsqueeze(1)) outputs = model(inputs) features = model.features loss = model.bnn_regression.sample_elbo(features, labels, 1, device) # outputs = model(inputs) # features = model.features.to(device) # loss = model.bnn_regression.sample_elbo(features, labels, 1, device) MSEloss = criterion(outputs,labels) val_loss += loss.item() val_MSEloss += MSEloss.item() val_loss = val_loss / num_batches val_MSEloss = val_MSEloss / num_batches return val_loss, val_MSEloss def test_plot(dataloader,model,device,criterion): total_loss = 0.0 num_samples = 0 model.eval() with torch.no_grad(): predictions = [] true_labels = [] var = [] for test_features, test_labels in dataloader: outputs, vars = model(test_features.to(device)) loss = criterion(outputs, test_labels.to(device)) total_loss += loss.item() * test_features.size(0) num_samples += test_features.size(0) predictions.append(outputs.tolist()) var.append(vars.tolist()) true_labels.append(test_labels.tolist()) average_loss = total_loss / num_samples print('Validation Loss: {:.8f}'.format(average_loss)) # # predictions = [(np.array(x) * y_std + y_mean).tolist() for x in predictions] # true_labels = [(np.array(x) * y_std + y_mean).tolist() for x in true_labels] x = range(len(sum(predictions, []))) pred_array = np.array(sum(predictions, [])).flatten() var_array = np.array(sum(var, [])).flatten() plt.plot(sum(predictions,[]), label='Predictions') plt.plot(sum(true_labels,[]), label='True Labels') plt.fill_between(x, pred_array + var_array, pred_array - var_array, alpha=0.5) plt.legend() plt.xlabel('Sample Index') plt.ylabel('Cycle Capacity') plt.show() def setup_seed(seed): torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) np.random.seed(seed) random.seed(seed) torch.backends.cudnn.deterministic = True # 设置随机数种子 setup_seed(20) base_path = r'E:\member\ShiJH\Battery Datasets\SNL_18650_LFP Datasets\modified_dataset' # csv_files_list = [base_path + str('\modified_SNL_18650_LFP_25C_0-100_0.5-1C_a_timeseries.csv'), # # base_path + str('\modified_SNL_18650_LFP_25C_0-100_0.5-1C_b_timeseries.csv'), # base_path + str('\modified_SNL_18650_LFP_25C_0-100_0.5-3C_a_timeseries.csv')] # train_data = pd.DataFrame() # cycle_index = 0 # index_max = 0 # for csv_file in csv_files_list: # df = pd.read_csv(csv_file) # C_ini = df['cycle capacity'].values[0] # df['SOH'] = df['cycle capacity'] / C_ini # index_max = df['Cycle_Index'].max() # df['Cycle_Index'] = df['Cycle_Index'] + cycle_index # cycle_index += index_max # train_data = pd.concat([train_data, df], ignore_index=True) traindata_path = base_path + str('\modified_SNL_18650_LFP_25C_0-100_0.5-3C_a_timeseries.csv') train_data = pd.read_csv(traindata_path) train_data['SOH'] = train_data['cycle capacity'] / train_data['cycle capacity'].values[0] # attrib_feature = ['Test_Time','Charge_Capacity','Discharge_Capacity','Voltage','Environment_Temperature','Cell_Temperature'] attrib_feature = ['Current','Voltage','Environment_Temperature','Cell_Temperature','SOC'] attrib_label = ['SOH'] max_len = 200 # 初值100 C_rated = 1.1 C_train = train_data[attrib_label].values[0] train_dataset = net_BNN.CycleDataset( data= train_data, attrib_x=attrib_feature, attrib_y=attrib_label, max_len=max_len, C_rated=C_rated, mode='train') min_val, max_val = train_dataset.get_min_max_values() batch_size = 30 # train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, # collate_fn=train_dataset.pad_collate, drop_last=True) train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True) valdata_path = base_path + str('\modified_SNL_18650_LFP_25C_0-100_0.5-3C_b_timeseries.csv') val_data = pd.read_csv(valdata_path) val_data['SOH'] = val_data['cycle capacity'] / val_data['cycle capacity'].values[0] # C_val = val_data[attrib_label].values[0] # scaler_val = train_dataset.scaler val_dataset = net_BNN.CycleDataset( data=val_data, attrib_x=attrib_feature, attrib_y=attrib_label, max_len=max_len, C_rated=C_rated, min_val=min_val, max_val=max_val, mode='val') # val_dataloader = DataLoader(tar_dataset, batch_size=batch_size,shuffle=False, collate_fn=tar_dataset.pad_collate,drop_last=True) val_dataloader = DataLoader(val_dataset, batch_size=batch_size,shuffle=False,drop_last=True) # testdata_path = base_path + str('\modified_SNL_18650_LFP_25C_0-100_0.5-1C_c_timeseries.csv') # test_data = pd.read_csv(testdata_path) # C_test = test_data[attrib_label].values[0] # test_dataset = net.CycleDataset(test_data, attrib_feature, attrib_label, C_test, max_len, C_rated, # min_val=min_val, max_val=max_val, mode='test') # # test_dataloader = DataLoader(test_dataset, batch_size=batch_size,shuffle=False, collate_fn=test_dataset.pad_collate,drop_last=True) # test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True) # 初始化Transformer模型 input_dim = len(attrib_feature) output_dim = 1 hidden_dim = 64 num_layers = 2 num_heads = 4 lr = 1e-4 max_seq_len = 200 # ATBNN_model = net.ATBNN_Model(input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len) device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') # 导入网络结构 ATBNN_model = net_BNN.ATBNN_Model( input_dim = input_dim, output_dim = output_dim, hidden_dim = hidden_dim, num_layers = num_layers, num_heads = num_heads, batch_size = batch_size, max_seq_len= max_seq_len) # ATBNN_model.load_state_dict(torch.load("./model_BNN_new/model_epoch185_Trainloss0.01930173_ValLoss0.01682474.pt")) ATBNN_model.to(device) optimizer = torch.optim.Adam(ATBNN_model.parameters(), lr=lr) # optimizer = torch.optim.Adadelta(ATBNN_model.parameters(), lr=1.0, rho=0.9, eps=1e-6, weight_decay=0) # optimizer = torch.optim.SGD(ATBNN_model.parameters(),lr=lr) # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer,gamma=0.9) criterion = nn.MSELoss(reduction='mean') num_epochs = 10000 ATBNN_model.to(device) train(num_epochs, train_dataloader, val_dataloader, ATBNN_model, criterion, optimizer, device) # test_plot(test_dataloader, ATBNN_model, device, criterion)