|
import os |
|
import sys |
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) |
|
import protac_degradation_predictor as pdp |
|
|
|
from collections import defaultdict |
|
import warnings |
|
import logging |
|
from typing import Literal |
|
|
|
from sklearn.preprocessing import OrdinalEncoder |
|
from tqdm import tqdm |
|
import pandas as pd |
|
import numpy as np |
|
import pytorch_lightning as pl |
|
from rdkit import DataStructs |
|
|
|
|
|
root = logging.getLogger() |
|
root.setLevel(logging.DEBUG) |
|
|
|
handler = logging.StreamHandler(sys.stdout) |
|
handler.setLevel(logging.DEBUG) |
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
handler.setFormatter(formatter) |
|
root.addHandler(handler) |
|
|
|
|
|
def get_random_split_indices(active_df: pd.DataFrame, test_split: float) -> pd.Index: |
|
""" Get the indices of the test set using a random split. |
|
|
|
Args: |
|
active_df (pd.DataFrame): The DataFrame containing the active PROTACs. |
|
test_split (float): The percentage of the active PROTACs to use as the test set. |
|
|
|
Returns: |
|
pd.Index: The indices of the test set. |
|
""" |
|
test_df = active_df.sample(frac=test_split, random_state=42) |
|
return test_df.index |
|
|
|
|
|
def get_e3_ligase_split_indices(active_df: pd.DataFrame) -> pd.Index: |
|
""" Get the indices of the test set using the E3 ligase split. |
|
|
|
Args: |
|
active_df (pd.DataFrame): The DataFrame containing the active PROTACs. |
|
|
|
Returns: |
|
pd.Index: The indices of the test set. |
|
""" |
|
encoder = OrdinalEncoder() |
|
active_df['E3 Group'] = encoder.fit_transform(active_df[['E3 Ligase']]).astype(int) |
|
test_df = active_df[(active_df['E3 Ligase'] != 'VHL') & (active_df['E3 Ligase'] != 'CRBN')] |
|
return test_df.index |
|
|
|
|
|
def get_smiles2fp_and_avg_tanimoto(protac_df: pd.DataFrame) -> tuple: |
|
""" Get the SMILES to fingerprint dictionary and the average Tanimoto similarity. |
|
|
|
Args: |
|
protac_df (pd.DataFrame): The DataFrame containing the PROTACs. |
|
|
|
Returns: |
|
tuple: The SMILES to fingerprint dictionary and the average Tanimoto similarity. |
|
""" |
|
unique_smiles = protac_df['Smiles'].unique().tolist() |
|
|
|
smiles2fp = {} |
|
for smiles in tqdm(unique_smiles, desc='Precomputing fingerprints'): |
|
smiles2fp[smiles] = pdp.get_fingerprint(smiles) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tanimoto_matrix = defaultdict(list) |
|
fps = list(smiles2fp.values()) |
|
|
|
|
|
for i, (smiles1, fp1) in enumerate(tqdm(zip(unique_smiles, fps), desc='Computing Tanimoto similarity', total=len(fps))): |
|
similarities = DataStructs.BulkTanimotoSimilarity(fp1, fps[i:]) |
|
for j, similarity in enumerate(similarities): |
|
distance = 1 - similarity |
|
tanimoto_matrix[smiles1].append(distance) |
|
if i != i + j: |
|
tanimoto_matrix[unique_smiles[i + j]].append(distance) |
|
|
|
|
|
avg_tanimoto = {k: np.mean(v) for k, v in tanimoto_matrix.items()} |
|
protac_df['Avg Tanimoto'] = protac_df['Smiles'].map(avg_tanimoto) |
|
|
|
smiles2fp = {s: np.array(fp) for s, fp in smiles2fp.items()} |
|
|
|
return smiles2fp, protac_df |
|
|
|
|
|
def get_tanimoto_split_indices( |
|
active_df: pd.DataFrame, |
|
active_col: str, |
|
test_split: float, |
|
n_bins_tanimoto: int = 200, |
|
) -> pd.Index: |
|
""" Get the indices of the test set using the Tanimoto-based split. |
|
|
|
Args: |
|
active_df (pd.DataFrame): The DataFrame containing the active PROTACs. |
|
n_bins_tanimoto (int): The number of bins to use for the Tanimoto similarity. |
|
|
|
Returns: |
|
pd.Index: The indices of the test set. |
|
""" |
|
tanimoto_groups = pd.cut(active_df['Avg Tanimoto'], bins=n_bins_tanimoto).copy() |
|
encoder = OrdinalEncoder() |
|
active_df['Tanimoto Group'] = encoder.fit_transform(tanimoto_groups.values.reshape(-1, 1)).astype(int) |
|
|
|
|
|
tanimoto_groups = active_df.groupby('Tanimoto Group')['Avg Tanimoto'].mean().sort_values(ascending=False).index |
|
|
|
test_df = [] |
|
|
|
|
|
|
|
|
|
for group in tanimoto_groups: |
|
group_df = active_df[active_df['Tanimoto Group'] == group] |
|
if test_df == []: |
|
test_df.append(group_df) |
|
continue |
|
|
|
num_entries = len(group_df) |
|
num_active_group = group_df[active_col].sum() |
|
num_inactive_group = num_entries - num_active_group |
|
|
|
tmp_test_df = pd.concat(test_df) |
|
num_entries_test = len(tmp_test_df) |
|
num_active_test = tmp_test_df[active_col].sum() |
|
num_inactive_test = num_entries_test - num_active_test |
|
|
|
|
|
if num_entries_test + num_entries < test_split * len(active_df): |
|
|
|
if num_entries_test + num_entries < test_split / 2 * len(active_df): |
|
test_df.append(group_df) |
|
continue |
|
|
|
|
|
if (num_active_group + num_active_test) / (num_entries_test + num_entries) < 0.6: |
|
if (num_inactive_group + num_inactive_test) / (num_entries_test + num_entries) < 0.6: |
|
test_df.append(group_df) |
|
test_df = pd.concat(test_df) |
|
return test_df.index |
|
|
|
|
|
def get_target_split_indices(active_df: pd.DataFrame, active_col: str, test_split: float) -> pd.Index: |
|
""" Get the indices of the test set using the target-based split. |
|
|
|
Args: |
|
active_df (pd.DataFrame): The DataFrame containing the active PROTACs. |
|
active_col (str): The column containing the active/inactive information. |
|
test_split (float): The percentage of the active PROTACs to use as the test set. |
|
|
|
Returns: |
|
pd.Index: The indices of the test set. |
|
""" |
|
encoder = OrdinalEncoder() |
|
active_df['Uniprot Group'] = encoder.fit_transform(active_df[['Uniprot']]).astype(int) |
|
|
|
test_df = [] |
|
|
|
|
|
|
|
|
|
|
|
for group in reversed(active_df['Uniprot'].value_counts().index): |
|
group_df = active_df[active_df['Uniprot'] == group] |
|
if test_df == []: |
|
test_df.append(group_df) |
|
continue |
|
|
|
num_entries = len(group_df) |
|
num_active_group = group_df[active_col].sum() |
|
num_inactive_group = num_entries - num_active_group |
|
|
|
tmp_test_df = pd.concat(test_df) |
|
num_entries_test = len(tmp_test_df) |
|
num_active_test = tmp_test_df[active_col].sum() |
|
num_inactive_test = num_entries_test - num_active_test |
|
|
|
|
|
if num_entries_test + num_entries < test_split * len(active_df): |
|
|
|
if num_entries_test + num_entries < test_split / 2 * len(active_df): |
|
test_df.append(group_df) |
|
continue |
|
|
|
|
|
if (num_active_group + num_active_test) / (num_entries_test + num_entries) < 0.6: |
|
if (num_inactive_group + num_inactive_test) / (num_entries_test + num_entries) < 0.6: |
|
test_df.append(group_df) |
|
test_df = pd.concat(test_df) |
|
return test_df.index |
|
|
|
|
|
def main( |
|
active_col: str = 'Active (Dmax 0.6, pDC50 6.0)', |
|
test_split: float = 0.1, |
|
studies: str | Literal['all', 'standard', 'e3_ligase', 'similarity', 'target'] = 'all', |
|
): |
|
""" Get and save the datasets for the different studies. |
|
|
|
Args: |
|
active_col (str): The column containing the active/inactive information. It should be in the format 'Active (Dmax N, pDC50 M)', where N and M are the thresholds float values for Dmax and pDC50, respectively. |
|
test_split (float): The percentage of the active PROTACs to use as the test set. |
|
studies (str): The type of studies to save dataset for. Options: 'all', 'standard', 'e3_ligase', 'similarity', 'target'. |
|
""" |
|
pl.seed_everything(42) |
|
|
|
|
|
active_name = active_col.replace(' ', '_').replace('(', '').replace(')', '').replace(',', '') |
|
|
|
|
|
Dmax_threshold = float(active_col.split('Dmax')[1].split(',')[0].strip('(').strip(')').strip()) |
|
pDC50_threshold = float(active_col.split('pDC50')[1].strip('(').strip(')').strip()) |
|
|
|
|
|
protac_df = pd.read_csv('../data/PROTAC-Degradation-DB.csv') |
|
|
|
protac_df['E3 Ligase'] = protac_df['E3 Ligase'].str.replace('Iap', 'IAP') |
|
protac_df[active_col] = protac_df.apply( |
|
lambda x: pdp.is_active(x['DC50 (nM)'], x['Dmax (%)'], pDC50_threshold=pDC50_threshold, Dmax_threshold=Dmax_threshold), axis=1 |
|
) |
|
_, protac_df = get_smiles2fp_and_avg_tanimoto(protac_df) |
|
|
|
|
|
test_indeces = {} |
|
active_df = protac_df[protac_df[active_col].notna()].copy() |
|
|
|
|
|
if 'Active - OR' in active_df.columns: |
|
active_df.drop(columns='Active - OR', inplace=True) |
|
|
|
if studies == 'standard' or studies == 'all': |
|
test_indeces['standard'] = get_random_split_indices(active_df, test_split) |
|
if studies == 'target' or studies == 'all': |
|
test_indeces['target'] = get_target_split_indices(active_df, active_col, test_split) |
|
if studies == 'e3_ligase' or studies == 'all': |
|
test_indeces['e3_ligase'] = get_e3_ligase_split_indices(active_df) |
|
if studies == 'similarity' or studies == 'all': |
|
test_indeces['similarity'] = get_tanimoto_split_indices(active_df, active_col, test_split) |
|
|
|
|
|
data_dir = '../data/studies' |
|
if not os.path.exists(data_dir): |
|
os.makedirs(data_dir) |
|
|
|
|
|
for split_type, indeces in test_indeces.items(): |
|
test_df = active_df.loc[indeces].copy() |
|
train_val_df = active_df[~active_df.index.isin(test_df.index)].copy() |
|
|
|
|
|
|
|
train_val_perc = f'{int((1 - test_split) * 100)}' |
|
test_perc = f'{int(test_split * 100)}' |
|
|
|
train_val_filename = f'{data_dir}/{split_type}_train_val_{train_val_perc}split_{active_name}.csv' |
|
test_filename = f'{data_dir}/{split_type}_test_{test_perc}split_{active_name}.csv' |
|
|
|
print('') |
|
print(f'Saving train_val datasets as: {train_val_filename}') |
|
print(f'Saving test datasets as: {test_filename}') |
|
|
|
train_val_df.to_csv(train_val_filename, index=False) |
|
test_df.to_csv(test_filename, index=False) |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |