Spaces:
Configuration error
Configuration error
import streamlit as st | |
from rdkit.Chem import MACCSkeys | |
from rdkit import Chem | |
import numpy as np | |
import pandas as pd | |
import xgboost as xgb | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from torch.autograd import Variable | |
from torch.utils.data import Dataset | |
import torch.utils.data | |
from torch_geometric.data import DataLoader | |
from torch_geometric.data import Data | |
import os | |
from tqdm import tqdm | |
import pandas as pd | |
import numpy as np | |
import torch.optim as optim | |
import torch.nn as nn | |
from torch.utils.data import Dataset, DataLoader | |
from sklearn.metrics import classification_report, confusion_matrix, average_precision_score, roc_auc_score | |
model_path = 'model/' | |
import torch | |
import matplotlib.pyplot as plt | |
import torch.nn.functional as F | |
from torch.autograd import Variable | |
from torch.utils.data import Dataset | |
import torch.utils.data | |
from torch_geometric.data import DataLoader | |
from torch_geometric.data import Data | |
from torch_geometric.nn import GATConv, RGCNConv, GCNConv, global_add_pool, global_mean_pool, global_max_pool, GlobalAttention, Set2Set | |
from sklearn.metrics import f1_score, accuracy_score, average_precision_score, roc_auc_score | |
import rdkit | |
from rdkit.Chem.Scaffolds import MurckoScaffold | |
from itertools import compress | |
import random | |
from collections import defaultdict | |
# Where checkpoint storages are mapped on load: a callable that moves each
# storage onto the GPU when CUDA is available, otherwise the string 'cpu'.
map_location = (
    (lambda storage, loc: storage.cuda())
    if torch.cuda.is_available()
    else 'cpu'
)
import torch | |
from torch_geometric.nn import GATConv, RGCNConv, GCNConv, global_add_pool, global_mean_pool, global_max_pool, GlobalAttention, Set2Set | |
from sklearn.metrics import f1_score, accuracy_score, average_precision_score, roc_auc_score, classification_report, confusion_matrix | |
from sklearn.model_selection import KFold, train_test_split | |
import rdkit | |
from rdkit.Chem.Scaffolds import MurckoScaffold | |
from transformers import AutoModelWithLMHead, AutoTokenizer | |
import math | |
# from itertools import compress | |
# import random | |
# from collections import defaultdict | |
import pickle | |
# Directory prefix under which serialized models are looked up.
model_path = 'model/'

# Not referenced anywhere in the visible part of this file.
adj_max = 80

# Default output width of `encoder` and input width of the CNN classifier head;
# equals the number of MACCS bit columns the app builds (167).
fps_len = 167

# Sequence length passed to `CNNforclassification` / `encoder`.
max_len = 120

# Character -> integer token used by `jak_dataset`. Index 0 is left free: it is
# the padding index of the embedding layer in `encoder`.
vocabulary = {
    'C': 1, 'c': 2, '1': 3, '(': 4, '-': 5, '2': 6, 's': 7, 'N': 8, '=': 9,
    ')': 10, 'n': 11, '[': 12, '@': 13, 'H': 14, ']': 15, 'O': 16, 'S': 17,
    '3': 18, 'l': 19, 'B': 20, 'r': 21, '/': 22, '\\': 23, 'o': 24, '4': 25,
    '5': 26, '6': 27, '7': 28, '+': 29, '.': 30, 'I': 31, 'F': 32, '8': 33,
    '#': 34, 'P': 35, '9': 36, 'a': 37, '%': 38, '0': 39, 'i': 40, 'e': 41,
    'L': 42, 'K': 43, 't': 44, 'T': 45, 'A': 46, 'g': 47, 'Z': 48, 'M': 49,
    'R': 50, 'p': 51, 'b': 52, 'X': 53,
}

# Reference SMILES strings (presumably known JAK-related compounds -- confirm
# with the data owner). The twelfth entry is the default SMILES shown in the
# app's input box.
known_drugs = [
    'O=C(NCCC(O)=O)C(C=C1)=CC=C1/N=N/C(C=C2C(O)=O)=CC=C2OCCOC3=CC=C(NC4=NC=C(C)C(NC5=CC=CC(S(NC(C)(C)C)(=O)=O)=C5)=N4)C=C3',
    'OCCOC1=CC=C(NC2=NC=C(C)C(NC3=CC=CC(S(NC(C)(C)C)(=O)=O)=C3)=N2)C=C1',
    'C1CCC(C1)C(CC#N)N2C=C(C=N2)C3=C4C=CNC4=NC=N3',
    'CC1CCN(CC1N(C)C2=NC=NC3=C2C=CN3)C(=O)CC#N',
    'CCS(=O)(=O)N1CC(C1)(CC#N)N2C=C(C=N2)C3=C4C=CNC4=NC=N3',
    'C1CC1C(=O)NC2=NN3C(=N2)C=CC=C3C4=CC=C(C=C4)CN5CCS(=O)(=O)CC5',
    'CCC1CN(CC1C2=CN=C3N2C4=C(NC=C4)N=C3)C(=O)NCC(F)(F)F',
    'OC(COC1=CC=C(NC2=NC=C(C)C(NC3=CC=CC(S(NC(C)(C)C)(=O)=O)=C3)=N2)C=C1)=O',
    'O=C(NCCC(O)=O)C(C=C1)=CC=C1/N=N/C(C=C2C(O)=O)=CC=C2OCCOC3=CC=C(NC4=NC=C(C)C(NC5=CC=CC(S(N)(=O)=O)=C5)=N4)C=C3',
    'OC1=CC=C(NC2=NC=C(C)C(NC3=CC=CC(S(NC(C)(C)C)(=O)=O)=C3)=N2)C=C1',
    'OCCOC1=CC=C(NC2=NC=C(C)C(NC3=CC=CC(S(N)(=O)=O)=C3)=N2)C=C1',
    'CC1=CN=C(N=C1NC2=CC(=CC=C2)S(=O)(=O)NC(C)(C)C)NC3=CC=C(C=C3)OCCN4CCCC4',
    'C1CCN(C1)CCOC2=C3COCC=CCOCC4=CC(=CC=C4)C5=NC(=NC=C5)NC(=C3)C=C2',
]

# Single definition of the compute device. The earlier string assignment
# (`device = 'cpu'`) was immediately superseded by this one and has been
# dropped so the name has one consistent type.
device = torch.device('cpu')
class jak_dataset(Dataset):
    """Dataset that encodes SMILES strings as fixed-length token tensors.

    The wrapped dataframe must expose a ``Smiles`` and an ``Activity`` column.
    Each item is ``(tokens, label)``: ``tokens`` is a long tensor of length
    ``max_len`` holding the ``vocabulary`` index of every character (longer
    strings are truncated, shorter ones stay zero-padded) and ``label`` is 1
    when ``Activity`` equals 1, otherwise 0.
    """

    def __init__(self, dataframe, max_len=80):
        super().__init__()
        self.dataframe = dataframe
        self.len = len(dataframe)
        self.max_len = max_len

    def __getitem__(self, idx):
        label = 1 if self.dataframe.Activity[idx] == 1 else 0
        tokens = torch.zeros(self.max_len)
        characters = list(self.dataframe.Smiles[idx])[:self.max_len]
        for position, character in enumerate(characters):
            # Raises KeyError for a character missing from `vocabulary`.
            tokens[position] = vocabulary[character]
        return tokens.long(), label

    def __len__(self):
        return self.len
class encoder(nn.Module):
    """Character-level CNN that maps a token sequence to a fixed-size vector.

    Tokens are embedded, passed through three parallel 1-D convolutions with
    kernel sizes 1, 2 and 3, max-pooled over the whole sequence, concatenated
    and projected to ``output_size`` features.
    """

    def __init__(self, input_length, num_words, embedding_size=32, inner_size=32, output_size=fps_len, stride=1):
        super().__init__()
        self.input_length = input_length
        self.num_words = num_words
        self.embedding_size = embedding_size
        self.inner_size = inner_size
        self.output_size = output_size
        self.stride = stride

        # One extra row so that index 0 can serve as the padding token.
        self.embedding = nn.Embedding(num_words + 1, embedding_size, padding_idx=0)
        self.conv_1 = nn.Conv1d(embedding_size, inner_size, 1, stride)
        self.conv_2 = nn.Conv1d(embedding_size, inner_size, 2, stride)
        self.conv_3 = nn.Conv1d(embedding_size, inner_size, 3, stride)
        self.w = nn.Linear(inner_size * 3, output_size)
        self.activation = nn.LeakyReLU()
        # Created for completeness of the module; forward() does not apply it.
        self.dropout = nn.Dropout(0.25)
        self.init_weights()

    def init_weights(self):
        """Re-initialise every weight matrix with Xavier-uniform values."""
        # Note: this also overwrites the embedding's padding row.
        for layer in (self.conv_1, self.conv_2, self.conv_3, self.w, self.embedding):
            torch.nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        """Encode a batch of token indices of shape (batch, length)."""
        # Conv1d expects (batch, channels, length).
        embedded = self.embedding(x).permute(0, 2, 1)
        pooled = []
        for conv in (self.conv_3, self.conv_2, self.conv_1):
            feature_map = conv(embedded)
            # Kernel equal to the sequence length -> global max pooling.
            pooled.append(F.max_pool1d(feature_map, feature_map.shape[2]).squeeze(2))
        integrate_feat = torch.cat(pooled, dim=1)
        return self.w(self.activation(integrate_feat))
def generate_scaffold(smiles, include_chirality=False):
    """Return the Bemis-Murcko scaffold of a molecule as a SMILES string.

    :param smiles: SMILES string of the molecule
    :param include_chirality: forwarded to RDKit as ``includeChirality``
    :return: SMILES of the scaffold
    """
    return MurckoScaffold.MurckoScaffoldSmiles(
        smiles=smiles,
        includeChirality=include_chirality,
    )
def random_scaffold_split(
    dataset,
    smiles_list,
    task_idx=None,
    null_value=0,
    frac_train=0.8,
    frac_valid=0.1,
    frac_test=0.1,
    seed=42,
):
    """Split example indices by Bemis-Murcko scaffold in a seeded random order.

    Adapted from https://github.com/pfnet-research/chainer-chemistry/blob/master/
    chainer_chemistry/dataset/splitters/scaffold_splitter.py

    Molecules sharing a scaffold always land in the same partition. Scaffold
    groups are visited in a random order determined by ``seed``; each group is
    placed in the validation set if it still fits, else in the test set if it
    still fits, else in the training set.

    :param dataset: sequence of examples; only ``len(dataset)`` is used unless
        ``task_idx`` is given, in which case each element must expose ``.y``
    :param smiles_list: SMILES strings aligned with ``dataset``
    :param task_idx: index into ``data.y``; examples whose value there equals
        ``null_value`` are dropped before splitting. ``None`` disables filtering
    :param null_value: value in ``data.y`` that marks a missing label
    :param frac_train: training fraction (only used in the sum-to-one check)
    :param frac_valid: validation fraction of ``len(dataset)``
    :param frac_test: test fraction of ``len(dataset)``
    :param seed: seed of the ``numpy.random.RandomState`` used for shuffling
    :return: ``(train_idx, valid_idx, test_idx)`` lists of integer positions
        into ``smiles_list`` / ``dataset``
    """
    np.testing.assert_almost_equal(frac_train + frac_valid + frac_test, 1.0)

    if task_idx is not None:
        # Keep only the examples that carry a label for the selected task.
        y_task = np.array([data.y[task_idx].item() for data in dataset])
        non_null = y_task != null_value
    else:
        non_null = np.ones(len(dataset), dtype=bool)
    indexed_smiles = list(compress(enumerate(smiles_list), non_null))

    rng = np.random.RandomState(seed)

    # scaffold SMILES -> positions of the molecules that share it
    scaffolds = defaultdict(list)
    for ind, smiles in indexed_smiles:
        scaffold = generate_scaffold(smiles, include_chirality=True)
        scaffolds[scaffold].append(ind)

    # Permute group *indices* rather than the groups themselves: the groups
    # have different lengths, and turning a ragged list of lists into an array
    # (which rng.permutation does) fails on recent NumPy versions.
    groups = list(scaffolds.values())
    scaffold_sets = [groups[i] for i in rng.permutation(len(groups))]

    n_total_valid = int(np.floor(frac_valid * len(dataset)))
    n_total_test = int(np.floor(frac_test * len(dataset)))

    train_idx = []
    valid_idx = []
    test_idx = []
    for scaffold_set in scaffold_sets:
        if len(valid_idx) + len(scaffold_set) <= n_total_valid:
            valid_idx.extend(scaffold_set)
        elif len(test_idx) + len(scaffold_set) <= n_total_test:
            test_idx.extend(scaffold_set)
        else:
            train_idx.extend(scaffold_set)
    return train_idx, valid_idx, test_idx
def load_smi_y(enzyme):
    """Load the SMILES and activity columns of an enzyme's MACCS table.

    The file ``data/<enzyme>_MACCS.csv`` is tried first; if it does not exist,
    ``<enzyme>_MACCS.csv`` in the current working directory is read instead.

    :param enzyme: enzyme name used as the file-name prefix (e.g. ``'JAK1'``)
    :return: ``(X, y)`` -- the ``Smiles`` and ``Activity`` columns
    :raises FileNotFoundError: when neither file exists
    """
    file_name = enzyme + '_' + 'MACCS.csv'
    try:
        data = pd.read_csv('data/' + file_name)
    except FileNotFoundError:
        # Only a missing file triggers the fallback; parse errors and other
        # problems with an existing file are no longer masked.
        data = pd.read_csv(file_name)
    X = data['Smiles']
    y = data['Activity']
    return X, y
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
class CNNforclassification(nn.Module):
    """Classifier head stacked on top of a pretrained ``encoder``.

    The encoder weights are read from ``load_path`` at construction time, so
    that file must exist. The encoder output is passed through a LeakyReLU and
    a linear layer that yields ``output_size`` logits.
    """

    def __init__(self, max_len, voc_len, load_path='model/CNN_encoder_pretrain2.pt',
                 last_layer_size=fps_len, output_size=2):
        super().__init__()
        self.last_layer_size = last_layer_size
        self.output_size = output_size

        self.pretrained = encoder(max_len, voc_len)
        pretrained_state = torch.load(load_path, map_location=device)
        self.pretrained.load_state_dict(pretrained_state)

        self.w = nn.Linear(last_layer_size, output_size)
        self.activation = nn.LeakyReLU()

    def forward(self, x):
        """Return the class logits for a batch of token-index tensors."""
        features = self.pretrained(x)
        return self.w(self.activation(features))
def CNN_predict(enzyme, smi):
    """Classify one SMILES string with the CNN trained for ``enzyme``.

    Loads ``model/CNN_<enzyme>.pt`` as a state dict into a freshly built
    ``CNNforclassification`` and runs it on the single input molecule.

    :param enzyme: enzyme name used in the checkpoint file name
    :param smi: SMILES string of the compound
    :return: ``(y_prob, y_pred)`` where ``y_prob`` is a list holding the
        softmax probability of class 1 and ``y_pred`` is a tensor holding the
        arg-max class index
    """
    file_path = 'model/' + 'CNN' + '_' + enzyme + '.pt'

    model = CNNforclassification(max_len, len(vocabulary))
    model.load_state_dict(torch.load(file_path, map_location=torch.device('cpu')))
    model.eval()

    # jak_dataset needs an Activity column; the value is a placeholder.
    query_df = pd.DataFrame({'Smiles': [smi]})
    query_df['Activity'] = 0
    loader = DataLoader(
        jak_dataset(query_df),
        batch_size=16,
        shuffle=False,
        drop_last=False,
        num_workers=0,
    )

    y_prob, y_pred = [], None
    # Inference only: no autograd bookkeeping is needed.
    with torch.no_grad():
        for X, _ in loader:
            output = model(X)
            _, y_pred = torch.max(output, 1)
            y_prob = torch.softmax(output, 1)[:, 1].tolist()
    return y_prob, y_pred
class RGCN_VAE(torch.nn.Module):
    """Graph variational auto-encoder built from two relational GCN layers.

    Each node carries six integer features; every feature has its own
    embedding table and the six embeddings are concatenated. Two ``RGCNConv``
    layers (each followed by a sigmoid) produce ``2 * out_embd`` values per
    node, split into a mean and a log-variance.

    ``forward`` has two modes, selected by ``type_``:

    * ``'pretrain'`` -- returns ``(A, A_hat, mu, logvar)`` where ``A`` is the
      dense adjacency matrix built from ``edge_index`` and ``A_hat`` its
      reconstruction from a sampled latent ``Z``.
    * anything else -- pools the node means per graph with attention, applies
      a linear layer and a sigmoid, and returns one value per graph.

    The ``dropout`` constructor argument is accepted but not used.
    """

    def __init__(self, in_embd, layer_embd, out_embd, num_relations, dropout):
        super(RGCN_VAE, self).__init__()
        # Table sizes give the number of distinct values allowed per feature.
        self.embedding = nn.ModuleList([
            nn.Embedding(35, in_embd), nn.Embedding(10, in_embd),
            nn.Embedding(5, in_embd), nn.Embedding(7, in_embd),
            nn.Embedding(5, in_embd), nn.Embedding(5, in_embd),
        ])
        # Attribute names are kept as-is (despite holding RGCN layers) so that
        # previously saved models and state dicts still load.
        self.GATConv1 = RGCNConv(6 * in_embd, layer_embd, num_relations)
        self.GATConv2 = RGCNConv(layer_embd, out_embd * 2, num_relations)
        self.GATConv1.reset_parameters()
        self.GATConv2.reset_parameters()
        self.activation = nn.Sigmoid()
        self.d = out_embd
        self.pool = GlobalAttention(gate_nn=nn.Sequential(
            nn.Linear(out_embd, out_embd), nn.BatchNorm1d(out_embd),
            nn.ReLU(), nn.Linear(out_embd, 1)))
        self.graph_linear = nn.Linear(out_embd, 1)

    def recognition_model(self, x, edge_index, edge_type, batch):
        """Encode nodes into ``(mu, logvar)``; ``batch`` is not used here."""
        x_ = torch.cat(
            [embed(x[:, i]) for i, embed in enumerate(self.embedding)], 1)
        out = self.activation(self.GATConv1(x_, edge_index, edge_type))
        out = self.activation(self.GATConv2(out, edge_index, edge_type))
        mu = out[:, 0:self.d]
        logvar = out[:, self.d:2 * self.d]
        return mu, logvar

    def reparametrize(self, mu, logvar):
        """Sample ``mu + eps * exp(logvar / 2)`` with ``eps ~ N(0, 1)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps.mul(std) + mu

    def generation_model(self, Z):
        """Decode latents into edge scores: ``sigmoid(Z @ Z.T)``."""
        return self.activation(Z @ Z.T)

    def forward(self, x, edge_index, edge_type, batch, type_):
        if type_ == 'pretrain':
            mu, logvar = self.recognition_model(x, edge_index, edge_type, batch)
            Z = self.reparametrize(mu, logvar)
            A_hat = self.generation_model(Z)
            N = x.size(0)
            # Dense target adjacency, created next to the inputs and filled
            # in one indexed assignment instead of a per-edge Python loop.
            A = torch.zeros((N, N), device=x.device)
            A[edge_index[0], edge_index[1]] = 1
            return A, A_hat, mu, logvar
        mu = self.cal_mu(x, edge_index, edge_type, batch)
        out = self.pool(mu, batch)
        out = self.graph_linear(out)
        return self.activation(out)

    def cal_mu(self, x, edge_index, edge_type, batch):
        """Return only the mean part of the node encoding."""
        mu, _ = self.recognition_model(x, edge_index, edge_type, batch)
        return mu
class GCN_VAE(torch.nn.Module):
    """Graph variational auto-encoder built from two GCN layers.

    Each node carries six integer features; every feature has its own
    embedding table and the six embeddings are concatenated. Two ``GCNConv``
    layers (each followed by a sigmoid) produce ``2 * out_embd`` values per
    node, split into a mean and a log-variance. Edge types are accepted for
    signature compatibility but are not passed to the convolutions.

    ``forward`` has two modes, selected by ``type_``:

    * ``'pretrain'`` -- returns ``(A, A_hat, mu, logvar)`` where ``A`` is the
      dense adjacency matrix built from ``edge_index`` and ``A_hat`` its
      reconstruction from a sampled latent ``Z``.
    * anything else -- pools the node means per graph with attention, applies
      a linear layer and a sigmoid, and returns one value per graph.

    The ``dropout`` constructor argument is accepted but not used.
    """

    def __init__(self, in_embd, layer_embd, out_embd, num_relations, dropout):
        super(GCN_VAE, self).__init__()
        # Table sizes give the number of distinct values allowed per feature.
        self.embedding = nn.ModuleList([
            nn.Embedding(35, in_embd), nn.Embedding(10, in_embd),
            nn.Embedding(5, in_embd), nn.Embedding(7, in_embd),
            nn.Embedding(5, in_embd), nn.Embedding(5, in_embd),
        ])
        # NOTE(review): num_relations is passed as the third positional
        # argument, which for GCNConv appears to be `improved`, not a relation
        # count -- confirm against the installed torch_geometric. Left as-is
        # so existing checkpoints keep their behaviour.
        # Attribute names are kept so saved models and state dicts still load.
        self.GATConv1 = GCNConv(6 * in_embd, layer_embd, num_relations)
        self.GATConv2 = GCNConv(layer_embd, out_embd * 2, num_relations)
        self.GATConv1.reset_parameters()
        self.GATConv2.reset_parameters()
        self.activation = nn.Sigmoid()
        self.d = out_embd
        self.pool = GlobalAttention(gate_nn=nn.Sequential(
            nn.Linear(out_embd, out_embd), nn.BatchNorm1d(out_embd),
            nn.ReLU(), nn.Linear(out_embd, 1)))
        self.graph_linear = nn.Linear(out_embd, 1)

    def recognition_model(self, x, edge_index, edge_type, batch):
        """Encode nodes into ``(mu, logvar)``; ``edge_type``/``batch`` unused."""
        x_ = torch.cat(
            [embed(x[:, i]) for i, embed in enumerate(self.embedding)], 1)
        out = self.activation(self.GATConv1(x_, edge_index))
        out = self.activation(self.GATConv2(out, edge_index))
        mu = out[:, 0:self.d]
        logvar = out[:, self.d:2 * self.d]
        return mu, logvar

    def reparametrize(self, mu, logvar):
        """Sample ``mu + eps * exp(logvar / 2)`` with ``eps ~ N(0, 1)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps.mul(std) + mu

    def generation_model(self, Z):
        """Decode latents into edge scores: ``sigmoid(Z @ Z.T)``."""
        return self.activation(Z @ Z.T)

    def forward(self, x, edge_index, edge_type, batch, type_):
        if type_ == 'pretrain':
            mu, logvar = self.recognition_model(x, edge_index, edge_type, batch)
            Z = self.reparametrize(mu, logvar)
            A_hat = self.generation_model(Z)
            N = x.size(0)
            # Dense target adjacency, created next to the inputs and filled
            # in one indexed assignment instead of a per-edge Python loop.
            A = torch.zeros((N, N), device=x.device)
            A[edge_index[0], edge_index[1]] = 1
            return A, A_hat, mu, logvar
        mu = self.cal_mu(x, edge_index, edge_type, batch)
        out = self.pool(mu, batch)
        out = self.graph_linear(out)
        return self.activation(out)

    def cal_mu(self, x, edge_index, edge_type, batch):
        """Return only the mean part of the node encoding."""
        mu, _ = self.recognition_model(x, edge_index, edge_type, batch)
        return mu
class GAT_VAE(torch.nn.Module):
    """Graph variational auto-encoder built from two graph-attention layers.

    Each node carries six integer features; every feature has its own
    embedding table and the six embeddings are concatenated. Two ``GATConv``
    layers (each followed by a sigmoid) produce the per-node encoding, whose
    first ``out_embd`` columns are the mean and the next ``out_embd`` columns
    the log-variance. Edge types are accepted for signature compatibility but
    are not passed to the convolutions.

    ``forward`` has two modes, selected by ``type_``:

    * ``'pretrain'`` -- returns ``(A, A_hat, mu, logvar)`` where ``A`` is the
      dense adjacency matrix built from ``edge_index`` and ``A_hat`` its
      reconstruction from a sampled latent ``Z``.
    * anything else -- pools the node means per graph with attention, applies
      a linear layer and a sigmoid, and returns one value per graph.

    The ``dropout`` constructor argument is accepted but not used.
    """

    def __init__(self, in_embd, layer_embd, out_embd, num_relations, dropout):
        super(GAT_VAE, self).__init__()
        # Table sizes give the number of distinct values allowed per feature.
        self.embedding = nn.ModuleList([
            nn.Embedding(35, in_embd), nn.Embedding(10, in_embd),
            nn.Embedding(5, in_embd), nn.Embedding(7, in_embd),
            nn.Embedding(5, in_embd), nn.Embedding(5, in_embd),
        ])
        # NOTE(review): num_relations is passed as the third positional
        # argument, which for GATConv appears to be `heads`; with more than
        # one head the layer widths may no longer line up -- confirm against
        # the installed torch_geometric. Left as-is so existing checkpoints
        # keep their behaviour.
        self.GATConv1 = GATConv(6 * in_embd, layer_embd, num_relations)
        self.GATConv2 = GATConv(layer_embd, out_embd * 2, num_relations)
        self.GATConv1.reset_parameters()
        self.GATConv2.reset_parameters()
        self.activation = nn.Sigmoid()
        self.d = out_embd
        self.pool = GlobalAttention(gate_nn=nn.Sequential(
            nn.Linear(out_embd, out_embd), nn.BatchNorm1d(out_embd),
            nn.ReLU(), nn.Linear(out_embd, 1)))
        self.graph_linear = nn.Linear(out_embd, 1)

    def recognition_model(self, x, edge_index, edge_type, batch):
        """Encode nodes into ``(mu, logvar)``; ``edge_type``/``batch`` unused."""
        x_ = torch.cat(
            [embed(x[:, i]) for i, embed in enumerate(self.embedding)], 1)
        out = self.activation(self.GATConv1(x_, edge_index))
        out = self.activation(self.GATConv2(out, edge_index))
        mu = out[:, 0:self.d]
        logvar = out[:, self.d:2 * self.d]
        return mu, logvar

    def reparametrize(self, mu, logvar):
        """Sample ``mu + eps * exp(logvar / 2)`` with ``eps ~ N(0, 1)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps.mul(std) + mu

    def generation_model(self, Z):
        """Decode latents into edge scores: ``sigmoid(Z @ Z.T)``."""
        return self.activation(Z @ Z.T)

    def forward(self, x, edge_index, edge_type, batch, type_):
        if type_ == 'pretrain':
            mu, logvar = self.recognition_model(x, edge_index, edge_type, batch)
            Z = self.reparametrize(mu, logvar)
            A_hat = self.generation_model(Z)
            N = x.size(0)
            # Dense target adjacency, created next to the inputs and filled
            # in one indexed assignment instead of a per-edge Python loop.
            A = torch.zeros((N, N), device=x.device)
            A[edge_index[0], edge_index[1]] = 1
            return A, A_hat, mu, logvar
        mu = self.cal_mu(x, edge_index, edge_type, batch)
        out = self.pool(mu, batch)
        out = self.graph_linear(out)
        return self.activation(out)

    def cal_mu(self, x, edge_index, edge_type, batch):
        """Return only the mean part of the node encoding."""
        mu, _ = self.recognition_model(x, edge_index, edge_type, batch)
        return mu
class GDataset(Dataset):
    """Dataset that serves molecular graphs as torch_geometric ``Data`` items.

    ``nodes``, ``edges``, ``relations`` and ``y`` are parallel collections;
    ``idx`` lists which positions of them this dataset exposes, in order.
    """

    def __init__(self, nodes, edges, relations, y, idx):
        super().__init__()
        self.nodes, self.edges = nodes, edges
        self.y = y
        self.relations = relations
        self.idx = idx

    def __getitem__(self, idx):
        # Translate the dataset position into a position of the raw lists.
        source = self.idx[idx]
        # Edges are stored one pair per row; transpose to two rows.
        edge_index = torch.tensor(self.edges[source].T, dtype=torch.long)
        x = torch.tensor(self.nodes[source], dtype=torch.long)
        y = torch.tensor(self.y[source], dtype=torch.float)
        edge_type = torch.tensor(self.relations[source], dtype=torch.float)
        return Data(x=x, edge_index=edge_index, edge_type=edge_type, y=y)

    def __len__(self):
        return len(self.idx)

    def collate_fn(self, batch):
        """Placeholder; performs no collation and returns ``None``."""
        return None
def preprocess_test(smiles):
    """Convert SMILES strings into the graph arrays consumed by ``GDataset``.

    Every string is parsed with ``gen_smiles2graph``. Strings that cannot be
    converted are reported on stdout and skipped.

    :param smiles: iterable of SMILES strings
    :return: ``(smiles, nodes, edges, relations)`` -- four parallel lists that
        contain one entry per successfully converted molecule:
        the SMILES string, the ``(N, 6)`` node-feature array, the ``(E, 2)``
        array of ``[row, col]`` positions where the adjacency matrix is 1, and
        the bond order minus one for each of those positions (same order).
        When every input converts, the returned SMILES equal the input.
    """
    valid_smiles = []
    nodes = []
    edges = []
    relations = []
    for i, smi in enumerate(smiles):
        node, adj, order = gen_smiles2graph(smi)
        # gen_smiles2graph signals failure with the string 'error'. Test the
        # type: comparing an ndarray with a string is ambiguous.
        if isinstance(node, str):
            print(i, smi, 'error')
            continue
        # Shift formal charge (column 2) up and hybridization (column 3) down
        # by one -- presumably to fit the embedding index ranges; confirm.
        node[:, 2] += 1
        node[:, 3] -= 1
        valid_smiles.append(smi)
        nodes.append(node)
        # Row-major scan of the matrices, so edges and relations line up.
        edges.append(np.argwhere(adj == 1))
        relations.append(order[order != 0] - 1)
    return valid_smiles, nodes, edges, relations
def gen_smiles2graph(sml):
    """Turn a SMILES string into node features, adjacency and bond orders.

    :param sml: SMILES string
    :return: ``(nodes, adj, orders)`` where ``nodes`` is an ``(N, 6)`` float
        array with, per atom: index of the atomic number in the supported
        element list, degree, formal charge, hybridization, aromatic flag and
        chiral tag; ``adj`` is the symmetric ``(N, N)`` 0/1 adjacency matrix;
        ``orders`` is the symmetric ``(N, N)`` matrix of bond orders
        (1 single, 2 double, 3 triple, 4 aromatic).
        If RDKit cannot parse ``sml`` the triple ``('error', 'error',
        'error')`` is returned instead.
    :raises KeyError: for an atom whose atomic number is not supported
    :raises Warning: for a bond type other than the four listed above
    """
    # Supported atomic numbers; an atom's type is its position in this list.
    ls = [1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 19, 20, 30,
          33, 34, 35, 36, 37, 38, 47, 52, 53, 54, 55, 56, 83, 88]
    dic = {atomic_num: position for position, atomic_num in enumerate(ls)}

    m = rdkit.Chem.MolFromSmiles(sml)
    # MolFromSmiles returns None for unparsable input; bail out before any
    # method is called on it.
    if m is None:
        return 'error', 'error', 'error'

    order_string = {
        rdkit.Chem.rdchem.BondType.SINGLE: 1,
        rdkit.Chem.rdchem.BondType.DOUBLE: 2,
        rdkit.Chem.rdchem.BondType.TRIPLE: 3,
        rdkit.Chem.rdchem.BondType.AROMATIC: 4,
    }

    N = m.GetNumAtoms()
    nodes = np.zeros((N, 6))
    for atom in m.GetAtoms():
        nodes[atom.GetIdx()] = [
            dic[atom.GetAtomicNum()],
            atom.GetDegree(),
            atom.GetFormalCharge(),
            atom.GetHybridization(),
            atom.GetIsAromatic(),
            atom.GetChiralTag(),
        ]

    adj = np.zeros((N, N))
    orders = np.zeros((N, N))
    for bond in m.GetBonds():
        u = min(bond.GetBeginAtomIdx(), bond.GetEndAtomIdx())
        v = max(bond.GetBeginAtomIdx(), bond.GetEndAtomIdx())
        bond_type = bond.GetBondType()
        if bond_type not in order_string:
            # str() is required: the bond type is an enum, not a string.
            raise Warning("Ignoring bond order" + str(bond_type))
        order = order_string[bond_type]
        adj[u, v] = 1
        adj[v, u] = 1
        orders[u, v] = order
        orders[v, u] = order
    return nodes, adj, orders
def get_preds(probabilities, threshold=0.5):
    """Binarise probabilities: 1 when strictly above ``threshold``, else 0."""
    labels = []
    for prob in probabilities:
        labels.append(1 if prob > threshold else 0)
    return labels
def GVAE_pred(smi, enzyme, model_path=model_path, device='cpu'):
    """Predict inhibition of ``enzyme`` by one compound with the graph VAE.

    :param smi: SMILES string of the compound
    :param enzyme: enzyme name used in the model file name
    :param model_path: directory prefix of ``GVAE_<enzyme>.pt``
    :param device: device on which the model and the graph are evaluated
    :return: 1 if the model output is above 0.5, otherwise 0
    :raises ValueError: if no graph could be built from ``smi``
    """
    smiles, nodes, edges, relations = preprocess_test([smi])
    if not nodes:
        raise ValueError('Could not build a molecular graph for SMILES: ' + str(smi))

    y = [0] * len(smiles)  # placeholder labels required by GDataset
    test_set = GDataset(nodes, edges, relations, y, range(len(smiles)))
    test_loader = DataLoader(test_set, batch_size=len(smiles), shuffle=False)

    # SECURITY NOTE: the file holds a fully pickled model object; loading it
    # executes pickle code, so only load model files from a trusted source.
    # NOTE(review): recent PyTorch releases may need `weights_only=False`
    # here to load a pickled model -- confirm for the installed version.
    model = torch.load(model_path + 'GVAE' + '_' + enzyme + '.pt', map_location=device)
    model.to(device)
    model.eval()

    with torch.no_grad():
        for data in test_loader:
            data = data.to(device)
            # Any mode string other than 'pretrain' selects the prediction
            # branch; the original value is kept unchanged.
            preds = model(data.x, data.edge_index, data.edge_type, data.batch, 'fintune')
            # There is a single batch (batch_size == number of molecules).
            return get_preds(preds)[0]
# if __name__ == '__main__': | |
# smiles = ['CC1=CN=C(N=C1NC2=CC(=CC=C2)S(=O)(=O)NC(C)(C)C)NC3=CC=C(C=C3)OCCN4CCCC4'] | |
# smiles, nodes, edges, relations = preprocess_test(smiles) | |
# y = [0]*len(smiles) | |
# test_set = GDataset(nodes, edges, relations, y, range(len(smiles))) | |
# test_loader = DataLoader(test_set, batch_size=len(smiles), shuffle=False) | |
# model = torch.load(model_path+'GVAE_JAK1.pt') | |
# for data in test_loader: | |
# data.to(device) | |
# preds = model(data.x, data.edge_index, data.edge_type, data.batch, 'fintune') | |
# print(preds) | |
def smile_list_to_MACCS(smi_list):
    """Compute the MACCS key fingerprint of every SMILES string.

    :param smi_list: iterable of SMILES strings
    :return: list with one entry per input; each entry is the fingerprint's
        bit string split into single characters (``'0'`` / ``'1'``)
    :raises ValueError: if a SMILES string cannot be parsed by RDKit
    """
    MACCS_list = []
    for smi in smi_list:
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            # Fail with a clear message instead of an opaque error from the
            # fingerprint routine.
            raise ValueError('Invalid SMILES: ' + str(smi))
        MACCS_list.append(list(MACCSkeys.GenMACCSKeys(mol).ToBitString()))
    return MACCS_list
# Directory prefix from which the app loads its serialized models.
model_path = 'model/'

# Page title and short description, rendered by Streamlit as Markdown.
_app_intro = """
# JAK prediction app
This app predicts the compound inhibition to certain JAK(s)
"""
st.write(_app_intro)

# Heading of the sidebar.
st.sidebar.header('User Input Parameters')
def user_input_features():
    """Render the input widgets and collect the user's choices.

    Creates, in order: two text inputs (compound name and SMILES), one
    checkbox per JAK kinase plus a "select all" box, and one checkbox per
    model plus a "select all" box.

    :return: ``(name, smi, enzyme, model)`` -- the compound name, its SMILES
        string, the list of selected kinase names and the list of selected
        model keys (both in display order)
    """
    name = st.text_input('compound name', 'Fedratinib')
    smi = st.text_input(
        'compound SMILES',
        'CC1=CN=C(N=C1NC2=CC(=CC=C2)S(=O)(=O)NC(C)(C)C)NC3=CC=C(C=C3)OCCN4CCCC4')

    st.write('Select JAK kinase: ')
    kinases = ['JAK1', 'JAK2', 'JAK3', 'TYK2']
    kinase_ticked = [st.checkbox(kinase) for kinase in kinases]
    all_enzyme = st.checkbox('Select all enzymes')
    if all_enzyme:
        enzyme = list(kinases)
    else:
        enzyme = [kinase for kinase, ticked in zip(kinases, kinase_ticked) if ticked]

    st.write('Select model: ')
    # (checkbox label, key used in model file names and result columns)
    model_options = [
        ('KNN', 'knn'),
        ('SVM_linear', 'SVM_linear'),
        ('SVM_poly', 'SVM_poly'),
        ('SVM_rbf', 'SVM_rbf'),
        ('SVM_sigmoid', 'SVM_sigmoid'),
        ('RF', 'RF'),
        ('XGBoost', 'XGBoost'),
        ('CNN', 'CNN'),
        ('GraphVAE', 'GVAE'),
        ('chemBERTa', 'chembert'),
    ]
    model_ticked = [st.checkbox(label) for label, _ in model_options]
    all_model = st.checkbox('Select all models')
    if all_model:
        model = [key for _, key in model_options]
    else:
        model = [key for (_, key), ticked in zip(model_options, model_ticked) if ticked]

    return name, smi, enzyme, model
# ---------------------------------------------------------------------------
# Page body: collect the inputs in the sidebar, echo them, and run the
# selected models for every selected kinase when the button is pressed.
# NOTE(review): the nesting below was reconstructed from the statement order;
# in particular the echo block is assumed to sit outside the sidebar.
# ---------------------------------------------------------------------------
with st.sidebar:
    name, smi, enzyme, model_chosen = user_input_features()

st.subheader('User Input parameters:')
st.write('Current compound: ', name)
st.write('Current compound SMILE: ', smi)
st.write('Selected kinase:', enzyme)
st.write('Selected model: ', model_chosen)

if st.button('Start Prediction'):
    if not model_chosen:
        st.write('Did not choose model!')
    if not enzyme:
        st.write('Did not choose JAK kinase!')
    if smi == '':
        st.write('NO SMILES input!')
    elif model_chosen and enzyme:
        # Building the fingerprint doubles as the SMILES validity check.
        try:
            MACCS_list = smile_list_to_MACCS([smi])
            header = ['bit' + str(i) for i in range(167)]
            maccs = pd.DataFrame(MACCS_list, columns=header).values
            valid_smi = True
        except Exception:
            st.write('Invalid compound SMILES! ')
            valid_smi = False

        if valid_smi:
            try:
                col_num = len(model_chosen)
                label = ['noninhibitor', 'inhibitor']
                prediction = []
                for jak in enzyme:
                    for ml in model_chosen:
                        modelname = ml + '_' + jak + '.sav'
                        try:
                            if ml == 'GVAE':
                                pred = GVAE_pred(smi, jak)
                            elif ml == 'CNN':
                                prob, pred = CNN_predict(jak, smi)
                            else:
                                # Every other model is a pickled estimator.
                                # SECURITY NOTE: unpickling executes code;
                                # only ship trusted model files.
                                with open(model_path + modelname, 'rb') as model_file:
                                    model = pickle.load(model_file)
                                pred = model.predict(maccs)
                            prediction.append(label[int(pred)])
                        except Exception:
                            # A failing model must not abort the report: it
                            # is shown as 'NA' in the table.
                            if ml in ('GVAE', 'CNN'):
                                st.write('CANNOT LOAD ', ml, ' for ', jak)
                            else:
                                st.write(modelname, ' cannot be loaded')
                            prediction.append('NA')

                # One row per kinase, one column per model.
                df = pd.DataFrame(np.array(prediction).reshape(-1, col_num))
                df.columns = model_chosen
                df.index = enzyme
                if name == '':
                    name = 'test compound'
                st.subheader('Evaluation report for ' + name)
                st.write(df)
            except Exception:
                st.write('CANNOT FINISH PREDICTION')