# regular imports
import os
import sys
import collections
import pandas as pd
import streamlit as st
import json
import gc
import requests
from PIL import Image
from io import BytesIO
from io import StringIO
from datasets import load_dataset
st.set_page_config(
page_title="Ligand Discovery 3: Protein-set Enrichment Analysis",
page_icon=":home:",
layout="wide", # "centered",
initial_sidebar_state="expanded"
)
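# Note: the CSS selector below targets one of Streamlit's auto-generated
# internal class names, which can change between Streamlit versions.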
st.markdown("""
<style>
.css-13sdm1b.e16nr0p33 {
margin-top: -75px;
}
</style>
""", unsafe_allow_html=True)
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
#header {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)
proteins_set = None
ROOT = os.path.abspath(os.path.dirname(__file__))
# TMP = os.path.join(ROOT, "tmp")
# if not os.path.exists(TMP):
# os.mkdir(TMP)
MIN_SET_SIZE = 1
PROFILE_TYPE = "Fragment"
OVERVIEW_PVALUE_CUTOFF = 0.05
# relative imports
# sys.path.append(os.path.join(ROOT, "../src/"))
# from util import listdir_util
def listdir_util(path):
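    """Yield the entries of path, skipping any whose name starts with an underscore."""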
for d in os.listdir(path):
if d.startswith("_"):
continue
else:
yield d
# import metadata
from proteome_meta import task_suf
from proteome_meta import annotation_type_dict
from proteome_meta import annotation_dict
from proteome_meta import universe_dict
# path to results and original data
PATH = os.path.abspath(os.path.join(ROOT, "../results/proteins/"))
DATA = os.path.abspath(os.path.join(ROOT, "../data"))
DATA2 = 'ligdis/data'
CACHE = os.path.abspath(os.path.join(ROOT, "../cache"))
# generic inputs
# protein id to gene name
dataset = load_dataset(DATA2, data_files={"general/pid2name_primary.tsv"}, delimiter='\t')
df = dataset['train'].to_pandas()
pid2name = dict(zip(df.iloc[:, 0], df.iloc[:, 1]))
name2pid = dict(zip(df.iloc[:, 1], df.iloc[:, 0]))
del dataset, df  # free the raw dataset and dataframe once the lookup dicts are built
gc.collect()
def pid2gene(x):
if x in pid2name:
return pid2name[x]
else:
return x
def gene2pid(x):
if x in name2pid:
return name2pid[x]
else:
return x
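# Both helpers fall back to returning the input unchanged when no mapping
# exists, so unknown identifiers pass through rather than raising a KeyError.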
def pretty_term(x):
x = x.title()
if x.endswith("]"):
x = x.split(" [")[0]
return x
def hf_tsv_2_pandas_df(hf_repo, data_file, myHeader):
    url = '/'.join(("https://huggingface.co/datasets", hf_repo, "resolve/main", data_file))
    response = requests.get(url)
    if response.status_code == 200:
        tsv_data = StringIO(response.text)  # treat the response body as a file-like object
        df = pd.read_csv(tsv_data, sep='\t', header=myHeader)  # load the TSV into a DataFrame
    else:
        df = pd.DataFrame()
        st.write("Error loading dataset from hf_repo: ", hf_repo, " and data_file: ", data_file)
    return df
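# Example usage (same repo/file pair as the pid2name table loaded above;
# myHeader=None assumes the file has no header row):
#   df = hf_tsv_2_pandas_df(hf_repo="ligdis/data",
#                           data_file="general/pid2name_primary.tsv",
#                           myHeader=None)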
def load_hf_json(json_url):
    out = None  # returned unchanged if the request fails
    response = requests.get(json_url)
    if response.status_code == 200:
        out = response.json()
    else:
        print("Failed to retrieve ", json_url, " file. HTTP Status Code: ", response.status_code)
    return out
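# Example usage (the metrics URL assembled in the Detailed view below follows
# this shape; <repo> is a placeholder):
#   metrics = load_hf_json("https://huggingface.co/datasets/<repo>/resolve/main/metrics.json")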
def load_hf_image(image_url):
    img = None  # returned unchanged if the request fails
    response = requests.get(image_url)
    if response.status_code == 200:
        img = Image.open(BytesIO(response.content))
    else:
        print("Failed to retrieve image. HTTP Status Code:", response.status_code)
    return img
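# Example usage (<repo> and <plot> are placeholders):
#   img = load_hf_image("https://huggingface.co/datasets/<repo>/resolve/main/<plot>.png")
#   st.image(img)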
# side bar
st.sidebar.title("Ligand Discovery 3: Protein-set Enrichment Analysis")
# signatures (aka profiles)
st.sidebar.header("Select a fragment")
profile_type = PROFILE_TYPE
profile_type_subfolder = profile_type.lower()
# @st.cache_data
# def get_sorted_fids():
# fids = []
# for fid in listdir_util(os.path.join(DATA, "signatures", "proteins", "fragment")):
# fids += [fid]
# fids = sorted(fids)
# return fids
with open("fid.txt", "r") as file:
lines = file.readlines()
# Remove the newline characters (\n) from each line
fids = [line.strip() for line in lines]
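# fid.txt is expected to hold one fragment identifier per line (replacing the
# commented get_sorted_fids() scan above; assumed to be pre-sorted).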
# fids = get_sorted_fids()
profile = st.sidebar.selectbox("Fragment identifier", options=fids)
profile_subfolder = profile
all_cases = fids
draw_fragment = True
st.sidebar.header("Choose a type of analysis")
type_of_analysis = st.sidebar.radio(
"Type of analysis", options=["Overview", "Detailed"]
)
# OVERVIEW TYPE OF ANALYSIS
if type_of_analysis == "Overview":
st.header("Enrichment overview for {0} {1}".format(profile_type.lower(), profile))
view = st.sidebar.radio("Select View", options=["Table", "Plot"])
df = hf_tsv_2_pandas_df(hf_repo="ligdis/cache_overview", data_file="{0}.tsv".format(profile), myHeader=0)
# df = pd.read_csv(os.path.join(CACHE, "overview", "{0}.tsv".format(profile)), sep="\t")
if view == "Table":
columns = st.columns(4)
prot2idx = collections.defaultdict(list)
for i,r in enumerate(list(df["edge"])):
for x in r.split(","):
gn = pid2gene(x)
prot2idx[gn] += [i]
all_proteins_ = sorted(prot2idx.keys())
ann2idx = collections.defaultdict(list)
for i,r in enumerate(df["term"]):
ann2idx[r] += [i]
all_annotations_ = sorted(ann2idx.keys())
type2idx = collections.defaultdict(list)
for i,r in enumerate(list(df["type"])):
type2idx[r] += [i]
all_types_ = sorted(type2idx.keys())
subtype2idx = collections.defaultdict(list)
for i,r in enumerate(list(df["subtype"])):
subtype2idx[r] += [i]
all_subtypes_ = sorted(subtype2idx.keys())
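        # Each *2idx dict maps a display value to the overview-table row indices
        # that contain it, so the filters below can be combined by index.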
selected_proteins = columns[0].multiselect("Filter by proteins in leading edge ({0} unique proteins)".format(len(all_proteins_)), options=all_proteins_)
selected_annotations = columns[1].multiselect("Select annotations", options=all_annotations_)
selected_subtypes = columns[2].multiselect("Filter by annotation subtype", options=all_subtypes_)
selected_types = columns[3].multiselect("Filter by annotation type", options=all_types_)
keep_idxs = []
        if selected_proteins:
            for x in selected_proteins:
                for idx in prot2idx[x]:
                    keep_idxs += [idx]
        if selected_annotations:
            for x in selected_annotations:
                for idx in ann2idx[x]:
                    keep_idxs += [idx]
        if selected_subtypes:
            for x in selected_subtypes:
                for idx in subtype2idx[x]:
                    keep_idxs += [idx]
        if selected_types:
            for x in selected_types:
                for idx in type2idx[x]:
                    keep_idxs += [idx]
if keep_idxs:
keep_idxs = sorted(set(keep_idxs))
df = df.iloc[keep_idxs]
df["edge_genes"] = [" ".join([pid2gene(x) for x in r.split(",")]) for r in list(df["edge"])]
df_view = df[["term", "overlap", "setsize", "score", "pval", "edge_genes", "subtype", "type"]]
df_view = df_view.rename(columns = {
"term": "Term",
"overlap": "Edge size",
"setsize": "Set size",
"score": "Score",
"pval": "P-value",
"edge_genes": "Leading edge",
"subtype": "Category subtype",
"type": "Category type"
})
df_view["rank"] = [i+1 for i in range(df_view.shape[0])]
df_view = df_view.set_index("rank")
st.dataframe(df_view.reset_index(drop=True), height=2000)
else:
# st.image(os.path.join(CACHE, "overview", "{0}.png".format(profile)))
        image_url = ''.join(("https://huggingface.co/datasets/ligdis/cache_overview/resolve/main/", "{0}.png".format(profile), "?download=true"))
st.image(image_url)
## DETAILED TYPE OF ANALYSIS
else:
def annotations_selector():
st.sidebar.header("Select protein annotation category")
annotation_types = [
"Sequence",
"Functions",
"Processes and pathways",
"Localization",
"Drugs and Diseases",
]
annotation_type = st.sidebar.radio("Type of annotation", annotation_types)
annotations = annotation_type_dict[annotation_type]
annotation = st.sidebar.selectbox("Annotation source", options=annotations)
annotation_subfolder = annotation_dict[annotation]
return annotation, annotation_subfolder, annotation_type, annotations
def universe_selector():
preselected="HEK293T Core"
universe = preselected
universe_subfolder = universe_dict[universe]
return universe, universe_subfolder
annotation, annotation_subfolder, annotation_type, annotations = (
annotations_selector()
)
universe, universe_subfolder = universe_selector()
st.header("Fragment: {0} & Category: {2} ({1})".format(profile_subfolder, annotation_type, annotation))
# cache_folder = os.path.join(CACHE, "detailed", profile_subfolder, annotation_subfolder)
    cache_folder = '/'.join(("https://huggingface.co/datasets/ligdis", '_'.join(("cache_detailed", profile_subfolder)), "resolve/main", annotation_subfolder))
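    # e.g. https://huggingface.co/datasets/ligdis/cache_detailed_<fragment>/resolve/main/<annotation_subfolder>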
# read metrics
metrics_json_url = '/'.join((cache_folder, "metrics.json"))
metrics = load_hf_json(metrics_json_url)
# with open(os.path.join(cache_folder, "metrics.json"), "r") as f:
# metrics = json.load(f)
metric_cols = st.columns(3)
metric_cols[0].metric(
"{0} profile: {1}".format(profile_type, profile),
value="{0} proteins".format(metrics["signature_size"]),
)
metric_cols[1].metric(
"{0}: {1}".format(annotation_type, annotation),
value="{0} categories".format(metrics["annotations_size"]),
)
metric_cols[2].metric(metrics["title"], value=round(metrics["value"], 2))
columns = st.columns(6)
view = columns[0].radio("View", options=["Tables", "Basic plots", "Advanced plots"])
if view == "Tables":
p_value_cutoff = columns[2].number_input("P-value cutoff", value=0.05, min_value=0., max_value=1., format="%.3f")
min_edge_size = columns[3].number_input("Minimum leading edge size", value=5, min_value=0, max_value=10000)
max_edge_size = columns[4].number_input("Maximum leading edge size", value=5000, min_value=1, max_value=10000)
        protein_label = "Gene Name"
        convert_to_gene = protein_label == "Gene Name"  # show gene names instead of UniProt ACs
# available_selections = json.load(open(os.path.join(cache_folder, "selections.json"), "r"))
selections_json_url = '/'.join((cache_folder, "selections.json"))
available_selections = load_hf_json(selections_json_url)
all_annotations = available_selections["all_annotations"]
available_proteins = available_selections["available_proteins"]
select_columns = st.columns(3)
        selected_annotations = select_columns[2].multiselect(
            "Select annotation categories", options=all_annotations
        )
selected_proteins = select_columns[0].multiselect(
"Filter by proteins found in at least one annotation term ({0})".format(
len(available_proteins)
),
options=available_proteins,
)
task_filename = ''.join((profile, "_val_log2fc.tsv"))
ligdis_annotations_repo = '/'.join(('ligdis', annotation_subfolder))
annotations_json = '/'.join((profile_type_subfolder, profile_subfolder, task_filename.split(".tsv")[0], 'annotations.json'))
annotations_json_url = ''.join(("https://huggingface.co/datasets/", ligdis_annotations_repo, "/resolve/main/", annotations_json))
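        # e.g. https://huggingface.co/datasets/ligdis/<annotation_subfolder>/resolve/main/fragment/<fragment>/<fragment>_val_log2fc/annotations.json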
annotations_ = load_hf_json(annotations_json_url)
if selected_proteins:
if convert_to_gene:
selected_proteins = [gene2pid(x) for x in selected_proteins]
selected_proteins = set(selected_proteins)
if not selected_annotations:
for k, v in annotations_.items():
if len(selected_proteins.intersection(v)) > 0:
selected_annotations += [k]
if not selected_annotations:
st.warning(
"No available annotations for any of your proteins of interest..."
)
# result = pd.read_csv(os.path.join(cache_folder, "result.tsv"), sep="\t")
ligdis_cache_detailed_fragment_repo = '_'.join(("ligdis/cache_detailed", profile_subfolder))
result_file = '/'.join((annotation_subfolder, "result.tsv"))
result = hf_tsv_2_pandas_df(hf_repo = ligdis_cache_detailed_fragment_repo, data_file = result_file, myHeader=0)
        result = result[result["pval"] <= p_value_cutoff]  # apply the user-selected p-value cutoff
        result = result[result["leading_edge_size"] >= min_edge_size]
        result = result[result["leading_edge_size"] <= max_edge_size]
        result = result.reset_index(drop=True)
leading_proteins = available_selections["leading_proteins"]
selected_leading_proteins = select_columns[1].multiselect(
"Filter by proteins found in at least one leading edge",
options = leading_proteins)
if selected_leading_proteins:
prot2idx = collections.defaultdict(list)
for i, r in enumerate(list(result["leading_edge"])):
if str(r) == "nan":
continue
for x in r.split(","):
prot2idx[pid2gene(x)] += [i]
idxs = []
for v in selected_leading_proteins:
for x in prot2idx[v]:
idxs += [x]
idxs = sorted(set(idxs))
result = result.iloc[idxs]
# df_merge = pd.read_csv(os.path.join(cache_folder, "df_merge.tsv"), sep="\t")
df_merge_file = '/'.join((annotation_subfolder, "df_merge.tsv"))
df_merge = hf_tsv_2_pandas_df(hf_repo=ligdis_cache_detailed_fragment_repo, data_file=df_merge_file, myHeader=0)
        type_of_task = metrics["type_of_task"]
        if type_of_task == "ranksum":
            sort_by = "NES"
        else:
            sort_by = "P-value"  # fall back to p-value sorting for other task types
        sort_by_nes = sort_by == "NES"
        direction = "Up"
        is_up = direction == "Up"
df = result.copy()
df = df.rename(columns = {"Term": "term"})
df_merge = df_merge[["term", "score_mean"]]
df = df.merge(df_merge, how="left", on="term")
df = df[df["leading_edge"].notnull()]
df["edge_genes"] = [" ".join([pid2gene(x) for x in r.split(",")]) for r in list(df["leading_edge"])]
df = df[["term","leading_edge_size", "geneset_size", "nes", "pval", "fdr", "score_mean", "edge_genes", "leading_edge"]]
if selected_annotations:
df = df[df["term"].isin(selected_annotations)]
if is_up:
df = df[df["nes"] >= 0]
else:
df = df[df["nes"] < 0]
if sort_by_nes:
if is_up:
df = df.sort_values(by="nes", ascending=False)
else:
df = df.sort_values(by="nes", ascending=True)
else:
df = df.sort_values(by="pval")
df = df.reset_index(drop=True)
df = df.rename(columns = {
"term": "Term",
"leading_edge_size": "Edge size",
"geneset_size": "Set size",
"nes": "Score",
"pval": "P-value",
"fdr": "FDR",
"score_mean": "Mean score",
"edge_genes": "Leading edge",
})
st.dataframe(df[[c for c in list(df.columns)[:-1] if c != "Mean score"]].reset_index(drop=True))
term = st.selectbox("Explore term...", df["Term"])
if term is not None:
# signature_ori = pd.read_csv(os.path.join(results_path, "signature.tsv"), delimiter="\t", header=None)
ligdis_ontology_repo = '/'.join(("ligdis", annotation_subfolder))
ontology_signature_file = '/'.join((profile_type_subfolder, profile_subfolder, task_filename.split(".tsv")[0], "signature.tsv"))
signature_ = hf_tsv_2_pandas_df(hf_repo=ligdis_ontology_repo, data_file=ontology_signature_file, myHeader=None )
# signature_file = os.path.abspath(os.path.join(DATA,"signatures","proteins",profile_type_subfolder,profile_subfolder,task_filename))
ligdis_data_repo = '/'.join(("ligdis", "data"))
fragment_signature_file = '/'.join(("signatures/proteins/fragment", profile_subfolder, task_filename))
# Explore term
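            # t_values holds the ranking scores (Z-scores) from the cached signature;
            # o_values holds the original log2 fold-changes loaded just below.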
t_values = {}
for r in signature_.values:
t_values[r[0]] = r[1]
o_values = {}
# signature_original = pd.read_csv(signature_file, delimiter="\t", header=None)
signature_original = hf_tsv_2_pandas_df(hf_repo=ligdis_data_repo, data_file=fragment_signature_file, myHeader=None)
for r in signature_original.values:
o_values[r[0]] = r[1]
cols = st.columns([0.15, 1])
col = cols[0]
annotations_size = len(annotations_[term])
signature_size = len(signature_)
df_filt = df[df["Term"] == term]
leading_edge = list(df_filt["leading_edge"])[0]
if str(leading_edge) == "nan":
leading_edge = []
else:
leading_edge = leading_edge.split(",")
display_proteins = col.radio(
"Display proteins",
[
"Leading edge ({0})".format(len(leading_edge)),
"In category ({0})".format(annotations_size),
"Full profile ({0})".format(signature_size),
],
)
if "Leading" in display_proteins:
proteins = leading_edge
elif "category" in display_proteins:
proteins = annotations_[term]
else:
                proteins = list(signature_[0])  # full profile: first column of the cached signature
o_values = [o_values[pid] for pid in proteins]
t_values = [t_values[pid] for pid in proteins]
proteins_set = set(proteins)
            genes = [pid2gene(x) for x in proteins]
            if convert_to_gene:
                label = "Gene Name"
            else:
                label = "UniProt AC"
            dl = pd.DataFrame(
                {"Gene Name": genes, "UniProt AC": proteins, "Log2FC": o_values, "Z-score": t_values}
            )
sort_by = col.radio(
"Sort proteins", ["By Z-score", "Alphabetically"]
)
if sort_by != "Alphabetically":
if is_up:
dl = dl.sort_values("Z-score", ascending=False)
else:
dl = dl.sort_values("Z-score", ascending=True)
else:
dl = dl.sort_values(label)
dl = dl.reset_index(drop=True)
col = cols[1]
col.dataframe(dl.reset_index(drop=True))
if view == "Basic plots":
top_plots_number = columns[1].number_input("Maximum number of plots", value=12, min_value=1, max_value=50)
plot_columns = st.columns(4)
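        # Lay out the cached per-term plots in a four-column grid, capped at
        # top_plots_number images.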
# with open(os.path.join(cache_folder, "basic", "idx2term.json"), "r") as f:
# idx2term = json.load(f)
idx2term_json_url = '/'.join((cache_folder, "basic", "idx2term.json"))
idx2term = load_hf_json(idx2term_json_url)
        idxs = list(range(len(idx2term)))
i = 0
j = 0
for idx in idxs:
if i == len(plot_columns):
i = 0
col = plot_columns[i]
if j == top_plots_number:
break
# col.image(os.path.join(cache_folder, "basic", "plot_{0}.png".format(idx)))
image_url = '/'.join((cache_folder, "basic", "plot_{0}.png".format(idx)))
col.image(image_url) # Show the image
i += 1
j += 1
if view == "Advanced plots":
top_plots_number = columns[1].number_input("Maximum number of plots", value=5, min_value=1, max_value=10)
# with open(os.path.join(cache_folder, "advanced", "idx2term.json"), "r") as f:
# idx2term = json.load(f)
idx2term_json_url = '/'.join((cache_folder, "advanced", "idx2term.json"))
idx2term = load_hf_json(idx2term_json_url)
        idxs = list(range(len(idx2term)))
j = 0
for idx in idxs:
if j == top_plots_number:
break
# st.image(os.path.join(cache_folder, "advanced", "plot_{0}.png".format(idx)))
image_url = '/'.join((cache_folder, "advanced", "plot_{0}.png".format(idx)))
st.image(image_url) # Show the image
j += 1