12_ATBNN_Demo / app.py
JHao2830's picture
Update app.py
b4ef9f3
raw
history blame
6.21 kB
import pickle
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from torch.utils.data import Dataset, DataLoader
import net_BNN
import gradio as gr
from gradio.components import *
# 使用已训练的模型进行推理
class SingleDataset(Dataset):
def __init__(self, data, attrib_x, attrib_y, max_len, C_rated, min_val=None, max_val=None, mode='train'):
self.data = data
self.cycle_indices = data['Cycle_Index'].unique()
self.attrib_x = attrib_x
self.attrib_y = attrib_y
self.C_rated = C_rated
self.mode = mode
self.max_len = max_len
self.data['Current'] /= self.C_rated
if mode == 'train':
self.min_val = data[attrib_x].values.min(axis=0)
self.max_val = data[attrib_x].values.max(axis=0)
with open('./para_BNN/min_max_values.pkl', 'wb') as f:
pickle.dump((self.min_val, self.max_val), f)
else:
self.min_val = min_val
self.max_val = max_val
def get_min_max_values(self):
if self.mode != 'train':
return None
return self.min_val, self.max_val
def __len__(self):
return len(self.cycle_indices)
def get_data_by_cycle_index(self, cycle_index):
# 获取指定 cycle_index 的数据
cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
# 提取特征和标签
features = cycle_data[self.attrib_x].values
label = cycle_data[self.attrib_y].values[0]
# 标准化特征
features = (features - self.min_val) / (self.max_val - self.min_val)
pad_len = self.max_len - len(features)
features = torch.tensor(features, dtype=torch.float32).clone().detach()
features = torch.cat([features, torch.full((pad_len, features.shape[1]), 0)])
label = torch.tensor(label, dtype=torch.float32)
return features, label
def __getitem__(self, index):
cycle_index = self.cycle_indices[index]
cycle_data = self.data[self.data['Cycle_Index'] == cycle_index].copy()
# cycle_data['Current'] /= self.C_rated
# 提取特征和标签
features = cycle_data[self.attrib_x].values
# C_ini = cycle_data[self.attrib_y].values[0]
label = cycle_data[self.attrib_y].values[0]
# # 标准化特征
features = (features - self.min_val) / (self.max_val - self.min_val)
# label = (label - self.y_mean) / self.y_std
# features = (features - self.min_val) / self.max_val
pad_len = self.max_len - len(features)
features = torch.tensor(features, dtype=torch.float32).clone().detach()
# 在 features 后面填充固定值
features = torch.cat([features, torch.full((pad_len, features.shape[1]), 0)])
# 转换为张量
# features = torch.tensor(padded_features, dtype=torch.float32)
label = torch.tensor(label, dtype=torch.float32)
# label = label.view(1,1)
return features, label
def test(model_path, var_path, csv_path, pos):
attrib_feature = ['Current', 'Voltage', 'Environment_Temperature', 'Cell_Temperature', 'SOC']
attrib_label = ['SOH']
max_len = 200
input_dim = len(attrib_feature)
output_dim = 1
hidden_dim = 64
num_layers = 2
num_heads = 4
lr = 1e-3
max_seq_len = 200
batch_size = 1
C_rated = 1.1
model_path = model_path.name
var_path = var_path.name
csv_path = csv_path.name
with open(var_path, 'rb') as f:
min_val, max_val = pickle.load(f)
test_data = pd.read_csv(csv_path)
test_data['SOH'] = test_data['cycle capacity'] / test_data['cycle capacity'].values[0]
# C_val = val_data[attrib_label].values[0]
# scaler_val = train_dataset.scaler
dataset = SingleDataset(
data=test_data,
attrib_x=attrib_feature,
attrib_y=attrib_label,
max_len=max_len,
C_rated=C_rated,
min_val=min_val,
max_val=max_val,
mode='test')
features, label = dataset.get_data_by_cycle_index(pos)
features = torch.unsqueeze(features, dim=0)
# 导入网络结构
model = net_BNN.ATBNN_Model(
input_dim=input_dim,
output_dim=output_dim,
hidden_dim=hidden_dim,
num_layers=num_layers,
num_heads=num_heads,
batch_size=batch_size,
max_seq_len=max_seq_len)
# model.load_state_dict(torch.load("./model_B/model_epoch634_Trainloss0.00013560_ValLoss0.00107357.pt")) #LOSS=0
# model.load_state_dict(torch.load("./model_B/model_epoch2168_Trainloss0.00001729_ValLoss0.00107112.pt"))
model.load_state_dict(torch.load(model_path,map_location=torch.device("cpu")))
# model.load_state_dict(torch.load("./model/model_epoch1.pt"))
device = torch.device('cpu')
# device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
num_sample = 100
model.eval()
with torch.no_grad():
predictions = []
# true_labels = []
cov = []
# upp = []
# low = []
# outputs = model(test_features.to(device))
# MC sample
sample_output = torch.zeros(num_sample, label.shape[0]).to(device)
for i in range(num_sample):
sample_output[i] = model(features.to(device)).reshape(-1)
outputs = C_rated * torch.Tensor(sample_output.mean(dim=0).unsqueeze(1)).to(device)
covs = C_rated * torch.Tensor(sample_output.std(dim=0).unsqueeze(1)).to(device)
results = "{:.4f}±{:.4f}".format(outputs.item(), covs.item())
# true_labels.append((test_labels).tolist())
return results
#
inputs = [
File(label="上传预训练模型"),
File(label="上传参数文件"),
File(label="上传CSV测试数据"),
Number(label="选择循环次数(1~4004)")
]
outputs = [
Textbox(label="最大可用容量估计结果"),
]
gr.Interface(fn=test, inputs=inputs, outputs=outputs, title="ATBNN Model",
description="加载预训练模型,加载测试数据并进行预测,得到当前循环的最大可用容量。").launch()