import os
import random

import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from torch.utils.data import Dataset, DataLoader

import net_BNN
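# net_BNN is the companion module in this repo; based on its usage below it is
# assumed to provide CycleDataset (a per-cycle time-series dataset) and
# ATBNN_Model (an attention/Transformer feature extractor with a Bayesian
# regression head exposing `features` and `bnn_regression.sample_elbo`).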
def train(n_epochs, dataloader, val_dataloader, model, criterion, optimizer, device):
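    """Train the model by minimizing the sampled ELBO; MSE is logged alongside for reference."""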
    num_batches = len(dataloader)
    num_sample = 10
    for epoch in range(n_epochs):
        model.train()  # Important: set training mode each epoch (val() switches the model to eval)
        total_loss = 0
        total_MSEloss = 0
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)  # move to GPU
            optimizer.zero_grad()
            # MC sample
            # sample_output = torch.zeros(num_sample, labels.shape[0]).to(device)
            # for i in range(num_sample):
            #     sample_output[i] = model(inputs).reshape(-1)
            #
            # outputs = torch.Tensor(sample_output.mean(dim=0).unsqueeze(1))
            outputs = model(inputs)
            features = model.features
            loss = model.bnn_regression.sample_elbo(features, labels, 10, device)
            MSEloss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            total_MSEloss += MSEloss.item()
        train_loss = total_loss / num_batches
        train_MSEloss = total_MSEloss / num_batches
        val_loss, val_MSEloss = val(val_dataloader, model, criterion, device)
        # torch.save(model.state_dict(), "./model.pth")
        torch.save(model.state_dict(), f"./model_B/model_epoch{epoch + 1}_Trainloss{train_MSEloss:.8f}_ValLoss{val_MSEloss:.8f}.pt")
        print('Epoch [{}/{}], Train_Loss: {:.8f}, Val_Loss: {:.8f}'.format(epoch + 1, n_epochs, train_loss, val_loss), end=' ')
        print('Train_MSE_Loss: {:.8f}, Val_MSE_Loss: {:.8f}'.format(train_MSEloss, val_MSEloss))

def val(dataloader, model, criterion, device):
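    """Evaluate the sampled ELBO and MSE on a held-out dataloader."""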
    val_loss = 0
    num_batches = len(dataloader)
    val_MSEloss = 0
    num_sample = 10
    model.eval()  # Important: set evaluation mode
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)  # move to GPU
            # MC sample
            # sample_output = torch.zeros(num_sample, labels.shape[0]).to(device)
            # for i in range(num_sample):
            #     sample_output[i] = model(inputs).reshape(-1)
            #
            # outputs = torch.Tensor(sample_output.mean(dim=0).unsqueeze(1))
            outputs = model(inputs)
            features = model.features
            loss = model.bnn_regression.sample_elbo(features, labels, 1, device)
            MSEloss = criterion(outputs, labels)
            val_loss += loss.item()
            val_MSEloss += MSEloss.item()
    val_loss = val_loss / num_batches
    val_MSEloss = val_MSEloss / num_batches
    return val_loss, val_MSEloss

def test_plot(dataloader, model, device, criterion):
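    """Plot predictions with a +/- variance band against true labels and report the mean test loss."""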
    total_loss = 0.0
    num_samples = 0
    model.eval()
    with torch.no_grad():
        predictions = []
        true_labels = []
        var = []
        for test_features, test_labels in dataloader:
            # In test mode the model is expected to return (mean, variance)
            outputs, variances = model(test_features.to(device))
            loss = criterion(outputs, test_labels.to(device))
            total_loss += loss.item() * test_features.size(0)
            num_samples += test_features.size(0)
            predictions.append(outputs.tolist())
            var.append(variances.tolist())
            true_labels.append(test_labels.tolist())
    average_loss = total_loss / num_samples
    print('Test Loss: {:.8f}'.format(average_loss))
    # predictions = [(np.array(x) * y_std + y_mean).tolist() for x in predictions]
    # true_labels = [(np.array(x) * y_std + y_mean).tolist() for x in true_labels]
    x = range(len(sum(predictions, [])))
    pred_array = np.array(sum(predictions, [])).flatten()
    var_array = np.array(sum(var, [])).flatten()
    plt.plot(sum(predictions, []), label='Predictions')
    plt.plot(sum(true_labels, []), label='True Labels')
    plt.fill_between(x, pred_array + var_array, pred_array - var_array, alpha=0.5)
    plt.legend()
    plt.xlabel('Sample Index')
    plt.ylabel('Cycle Capacity')
    plt.show()

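# A minimal sketch of the Monte-Carlo prediction that the commented-out sampling
# blocks in train()/val() hint at: each forward pass through the Bayesian head
# draws fresh weights, so the mean/std over num_sample passes give a predictive
# mean and an uncertainty estimate. Assumes model(inputs) returns a (batch, 1)
# point prediction, as in train()/val() above; this helper is illustrative, not
# part of the original pipeline.
def mc_predict(model, inputs, num_sample, device):
    model.eval()
    samples = torch.zeros(num_sample, inputs.shape[0], device=device)
    with torch.no_grad():
        for i in range(num_sample):
            samples[i] = model(inputs.to(device)).reshape(-1)
    return samples.mean(dim=0), samples.std(dim=0)
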
def setup_seed(seed):
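    """Seed all RNGs (torch, CUDA, numpy, random) for reproducibility."""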
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True

# Set the random seed
setup_seed(20)
base_path = r'E:\member\ShiJH\Battery Datasets\SNL_18650_LFP Datasets\modified_dataset'
# csv_files_list = [base_path + r'\modified_SNL_18650_LFP_25C_0-100_0.5-1C_a_timeseries.csv',
#                   # base_path + r'\modified_SNL_18650_LFP_25C_0-100_0.5-1C_b_timeseries.csv',
#                   base_path + r'\modified_SNL_18650_LFP_25C_0-100_0.5-3C_a_timeseries.csv']
# train_data = pd.DataFrame()
# cycle_index = 0
# index_max = 0
# for csv_file in csv_files_list:
#     df = pd.read_csv(csv_file)
#     C_ini = df['cycle capacity'].values[0]
#     df['SOH'] = df['cycle capacity'] / C_ini
#     index_max = df['Cycle_Index'].max()
#     df['Cycle_Index'] = df['Cycle_Index'] + cycle_index
#     cycle_index += index_max
#     train_data = pd.concat([train_data, df], ignore_index=True)
traindata_path = base_path + r'\modified_SNL_18650_LFP_25C_0-100_0.5-3C_a_timeseries.csv'
train_data = pd.read_csv(traindata_path)
train_data['SOH'] = train_data['cycle capacity'] / train_data['cycle capacity'].values[0]
# attrib_feature = ['Test_Time', 'Charge_Capacity', 'Discharge_Capacity', 'Voltage', 'Environment_Temperature', 'Cell_Temperature']
attrib_feature = ['Current', 'Voltage', 'Environment_Temperature', 'Cell_Temperature', 'SOC']
attrib_label = ['SOH']
max_len = 200  # initially 100
C_rated = 1.1
C_train = train_data[attrib_label].values[0]
train_dataset = net_BNN.CycleDataset(
    data=train_data,
    attrib_x=attrib_feature,
    attrib_y=attrib_label,
    max_len=max_len,
    C_rated=C_rated,
    mode='train')
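# Min/max normalization statistics are computed on the training set and reused
# for the validation (and any test) dataset below, so all splits share one scale.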
min_val, max_val = train_dataset.get_min_max_values()
batch_size = 30
# train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
#                               collate_fn=train_dataset.pad_collate, drop_last=True)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
valdata_path = base_path + r'\modified_SNL_18650_LFP_25C_0-100_0.5-3C_b_timeseries.csv'
val_data = pd.read_csv(valdata_path)
val_data['SOH'] = val_data['cycle capacity'] / val_data['cycle capacity'].values[0]
# C_val = val_data[attrib_label].values[0]
# scaler_val = train_dataset.scaler
val_dataset = net_BNN.CycleDataset(
    data=val_data,
    attrib_x=attrib_feature,
    attrib_y=attrib_label,
    max_len=max_len,
    C_rated=C_rated,
    min_val=min_val,
    max_val=max_val,
    mode='val')
# val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=val_dataset.pad_collate, drop_last=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
# testdata_path = base_path + r'\modified_SNL_18650_LFP_25C_0-100_0.5-1C_c_timeseries.csv'
# test_data = pd.read_csv(testdata_path)
# C_test = test_data[attrib_label].values[0]
# test_dataset = net_BNN.CycleDataset(test_data, attrib_feature, attrib_label, C_test, max_len, C_rated,
#                                     min_val=min_val, max_val=max_val, mode='test')
# # test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=test_dataset.pad_collate, drop_last=True)
# test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
# Initialize the Transformer model
input_dim = len(attrib_feature)
output_dim = 1
hidden_dim = 64
num_layers = 2
num_heads = 4
lr = 1e-4
max_seq_len = 200
# ATBNN_model = net_BNN.ATBNN_Model(input_dim, output_dim, hidden_dim, num_layers, num_heads, batch_size, max_seq_len)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Build the network
ATBNN_model = net_BNN.ATBNN_Model(
    input_dim=input_dim,
    output_dim=output_dim,
    hidden_dim=hidden_dim,
    num_layers=num_layers,
    num_heads=num_heads,
    batch_size=batch_size,
    max_seq_len=max_seq_len)
# ATBNN_model.load_state_dict(torch.load("./model_BNN_new/model_epoch185_Trainloss0.01930173_ValLoss0.01682474.pt"))
ATBNN_model.to(device)
optimizer = torch.optim.Adam(ATBNN_model.parameters(), lr=lr)
# optimizer = torch.optim.Adadelta(ATBNN_model.parameters(), lr=1.0, rho=0.9, eps=1e-6, weight_decay=0)
# optimizer = torch.optim.SGD(ATBNN_model.parameters(), lr=lr)
# scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
criterion = nn.MSELoss(reduction='mean')
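# Note: the sampled ELBO from sample_elbo is what is backpropagated in train();
# this MSE criterion is only logged alongside it as a human-readable fit metric.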
num_epochs = 10000
os.makedirs('./model_B', exist_ok=True)  # ensure the checkpoint directory exists before saving
train(num_epochs, train_dataloader, val_dataloader, ATBNN_model, criterion, optimizer, device)
# test_plot(test_dataloader, ATBNN_model, device, criterion)