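# NOTE(review): the text "Spaces: Sleeping Sleeping" appeared here; it looks like page-status residue from the listing this file was copied from, not part of the script.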
#!/usr/bin/env python | |
# coding: utf-8 | |
# # IV. MARKERS THRESHOLDS NOTEBOOK
# ## IV.1. PACKAGES IMPORT | |
import os | |
import random | |
import re | |
import pandas as pd | |
import numpy as np | |
import seaborn as sb | |
import matplotlib.pyplot as plt | |
import matplotlib.colors as mplc | |
import subprocess | |
import warnings | |
import panel as pn | |
import json | |
from scipy import signal | |
from scipy.stats import pearsonr | |
import plotly.figure_factory as ff | |
import plotly | |
import plotly.graph_objs as go | |
from plotly.subplots import make_subplots | |
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot | |
import plotly.express as px | |
import sys | |
sys.setrecursionlimit(5000) | |
from my_modules import * | |
#Silence FutureWarnings & UserWarnings | |
warnings.filterwarnings('ignore', category= FutureWarning) | |
warnings.filterwarnings('ignore', category= UserWarning) | |
# ## IV.2. *DIRECTORIES | |
# Set base directory | |
#input_path = '/Users/harshithakolipaka/Downloads/wetransfer_data-zip_2024-05-17_1431' | |
#set_path = 'test' | |
# Locate this script and the JSON file that holds the stored settings
present_dir = os.path.dirname(os.path.realpath(__file__))
stored_variables_path = os.path.join(present_dir, 'stored_variables.json')
with open(stored_variables_path, 'r') as file:
    stored_vars = json.load(file)

# Settings read from stored_variables.json
directory = stored_vars['base_dir']
# The base directory is resolved relative to this script's folder
input_path = os.path.join(present_dir, directory)
set_path = stored_vars['set_path']
selected_metadata_files = stored_vars['selected_metadata_files']
ls_samples = stored_vars['ls_samples']

base_dir = input_path
set_name = set_path
project_name = set_name  # Project name
step_suffix = 'mt'  # Current part (here part IV)
previous_step_suffix_long = "_zscore"  # Previous part (here ZSCORE NOTEBOOK)

# Initial input data directory (named after the previous step)
input_data_dir = os.path.join(base_dir, project_name + previous_step_suffix_long)
# Output directory of this step and its images subdirectory
output_data_dir = os.path.join(base_dir, project_name + "_" + step_suffix)
output_images_dir = os.path.join(output_data_dir, "images")
# Metadata directory and its images subdirectory
metadata_dir = os.path.join(base_dir, project_name + "_metadata")
metadata_images_dir = os.path.join(metadata_dir, "images")
# Create directories if they don't already exist | |
#for d in [base_dir, input_data_dir, output_data_dir, output_images_dir, metadata_dir, metadata_images_dir]: | |
# if not os.path.exists(d): | |
#print("Creation of the" , d, "directory...") | |
# os.makedirs(d) | |
#else : | |
# print("The", d, "directory already exists !") | |
#os.chdir(input_data_dir) | |
# Verify paths | |
#print('base_dir :', base_dir) | |
#print('input_data_dir :', input_data_dir) | |
#print('output_data_dir :', output_data_dir) | |
#print('output_images_dir :', output_images_dir) | |
#print('metadata_dir :', metadata_dir) | |
#print('metadata_images_dir :', metadata_images_dir) | |
# ## IV.3. FILES | |
# ### IV.3.1. METADATA | |
# Read the marker metadata table from the metadata directory
filename = os.path.join(metadata_dir, "marker_intensity_metadata.csv")
metadata = pd.read_csv(filename)
# Verify headers with compare_headers() (star-imported from my_modules)
exp_cols = [
    'Round', 'Target', 'Channel', 'target_lower',
    'full_column', 'marker', 'localisation',
]
compare_headers(exp_cols, metadata.columns.values, "Marker metadata file")
# Discard rows that contain any missing value
metadata = metadata.dropna()
metadata.head() | |
# ### IV.3.2. NOT_INTENSITIES | |
filename = os.path.join(metadata_dir, "not_intensities.csv")
# Read the whole file, strip surrounding whitespace, split on the newline character
with open(filename, 'r') as fh:
    raw_text = fh.read()
not_intensities = raw_text.strip().split("\n")
# take str, strip whitespace, split on new line character | |
# Verify size | |
#print("\nVerifying data read from file is the correct length...\n") | |
#verify_line_no(filename, len(not_intensities)) | |
# Print to console | |
#print("not_intensities =\n", not_intensities) | |
# ### IV.3.3. FULL_TO_SHORT_COLUMN_NAMES | |
filename = os.path.join(metadata_dir, "full_to_short_column_names.csv")
# Open, read in information (the first line of the file is the header)
df = pd.read_csv(filename, header=0)
print("Verifying data read from file is the correct length...\n")
# Index by 'full_name', transpose, and keep the first record as a dictionary
full_name_table = df.set_index('full_name').T
full_to_short_names = full_name_table.to_dict('records')[0]
#print('full_to_short_names =\n',full_to_short_names) | |
# ### IV.3.4. SHORT_TO_FULL_COLUMN_NAMES | |
filename = os.path.join(metadata_dir, "short_to_full_column_names.csv")
# Open, read in information (the first line of the file is the header)
df = pd.read_csv(filename, header=0)
# Index by 'short_name', transpose, and keep the first record as a dictionary
short_name_table = df.set_index('short_name').T
short_to_full_names = short_name_table.to_dict('records')[0]
# Print information | |
#print('short_to_full_names =\n',short_to_full_names) | |
# ### IV.3.10. DATA | |
# List files in the directory | |
# Check if the directory exists | |
# When the input directory exists, replace the stored sample list with the
# "*_zscore.csv" files actually found in it
if os.path.exists(input_data_dir):
    ls_samples = list(
        filter(lambda name: name.endswith("_zscore.csv"), os.listdir(input_data_dir))
    )
# Will hold one DataFrame per sample file
dfs = {}
# Read a single data row of the first sample (index 0 in ls_samples) to learn
# the expected header order
first_sample_path = os.path.join(input_data_dir, ls_samples[0])
df = pd.read_csv(first_sample_path, index_col=0, nrows=1)
expected_headers = df.columns.values
#print('Header order should be :\n', expected_headers, '\n') | |
############################### | |
# !! This may take a while !! # | |
############################### | |
# Read every sample file into dfs, keyed by file name.
# Iterate over a copy: ls_samples is modified inside the loop, and removing an
# item from the list being iterated would make the loop skip the next sample.
for sample in list(ls_samples):
    file_path = os.path.join(input_data_dir, sample)
    try:
        # Read the CSV file
        sample_df = pd.read_csv(file_path, index_col=0)
    except pd.errors.EmptyDataError:
        # Nothing to parse: remove the sample from the analysis and move on,
        # so that no other sample's data is stored under this sample's name.
        ls_samples.remove(sample)
        continue
    if not sample_df.empty:
        # Reorder the columns to match the expected headers list
        sample_df = sample_df.reindex(columns=expected_headers)
    # Add the sample's DataFrame to dfs
    dfs[sample] = sample_df

# Merge dfs into one df
df = pd.concat(dfs.values(), ignore_index=False, sort=False)
del dfs
print(df.head())
intial_df = pn.pane.DataFrame(df.head(40), width=2500)
# ### Marker Classification | |
# ## IV.5. *DOTPLOTS | |
df | |
# Load existing data from stored_variables.json with error handling | |
# Read stored_variables.json; fall back to an empty dict when it is not valid JSON
with open(stored_variables_path, 'r') as file:
    try:
        data = json.load(file)
    except json.JSONDecodeError:
        data = {}
# Debug: Print loaded data to verify keys | |
#print(data) | |
df | |
df.head() | |
# ### IV.7.2. DOTPLOTS-DETERMINED THRESHOLD
#Empty dict in stored_variables to store the cell type classification for each marker | |
#stored_variables_path = '/Users/harshithakolipaka/Downloads/stored_variables.json' | |
# Make sure stored_variables.json contains the (initially empty) dictionaries
# that hold the cell type and the cell subtype classification for each marker.
try:
    with open(stored_variables_path, 'r') as f:
        stored_variables = json.load(f)
except FileNotFoundError:
    stored_variables = {}

# Fields that are not present yet
missing_fields = [
    field
    for field in ('cell_type_classification', 'cell_subtype_classification')
    if field not in stored_variables
]
for field in missing_fields:
    stored_variables[field] = {}

# Rewrite the file only when a field was actually added
if missing_fields:
    with open(stored_variables_path, 'w') as f:
        json.dump(stored_variables, f, indent=4)
df | |
data = df | |
import json | |
import panel as pn | |
# Load existing stored variables | |
# Load existing stored variables
with open(stored_variables_path, 'r') as f:
    stored_variables = json.load(f)

# Thresholds saved earlier, if any. They seed the widgets so that pressing
# "Save" without editing does not overwrite stored thresholds with zeros.
saved_thresholds = stored_variables.get('thresholds')
if not isinstance(saved_thresholds, dict):
    saved_thresholds = {}

# Create one FloatInput per marker to get threshold inputs from the user
threshold_inputs = {}
for marker in stored_variables['markers']:
    saved_value = saved_thresholds.get(marker)
    is_number = isinstance(saved_value, (int, float)) and not isinstance(saved_value, bool)
    threshold_inputs[marker] = pn.widgets.FloatInput(
        name=f'{marker} Threshold',
        value=float(saved_value) if is_number else 0.0,
        step=0.1,
    )

# Check if the 'thresholds' field is present; if not, add it (from the widgets'
# initial values) and write the file back
if 'thresholds' not in stored_variables:
    thresholds = {marker: input_widget.value for marker, input_widget in threshold_inputs.items()}
    stored_variables['thresholds'] = thresholds
    with open(stored_variables_path, 'w') as f:
        json.dump(stored_variables, f, indent=4)
# Save button to save thresholds to stored_variables.json | |
def save_thresholds(event):
    """Store the current widget values as thresholds and write the JSON file.

    Reads every widget in threshold_inputs, places the values under the
    'thresholds' key of the module-level stored_variables dict, dumps that
    dict to stored_variables_path, and shows a success notification.
    The event argument (supplied by the button click) is not used.
    """
    stored_variables['thresholds'] = {
        name: widget.value for name, widget in threshold_inputs.items()
    }
    with open(stored_variables_path, 'w') as out_file:
        json.dump(stored_variables, out_file, indent=4)
    pn.state.notifications.success('Thresholds saved successfully!')
# Button that triggers save_thresholds
save_button2 = pn.widgets.Button(name='Save Thresholds', button_type='primary')
save_button2.on_click(save_thresholds)

# Create a GridSpec layout and fill it left to right, five widgets per row
grid = pn.GridSpec()
for position, marker in enumerate(stored_variables['markers']):
    grid[position // 5, position % 5] = threshold_inputs[marker]

# Row/column where the next widget would have been placed
row, col = divmod(len(stored_variables['markers']), 5)
# Add the save button one row further down, spanning the five columns
grid[row + 1, :5] = save_button2

# Panel layout
threshold_panel = pn.Column(
    pn.pane.Markdown("## Define Thresholds for Markers"),
    grid,
)
import pandas as pd | |
import json | |
# Load stored variables from the JSON file | |
# Load stored variables from the JSON file
with open(stored_variables_path, 'r') as file:
    stored_variables = json.load(file)

# Step 1: every column of df is scanned as a candidate intensity column
intensities = list(df.columns)


def assign_cell_type(row):
    """Return the cell type for one row of df.

    Columns are scanned in the order of `intensities`. The marker name is the
    part of the column name before the first underscore. For the first column
    whose marker has a stored threshold, whose value in `row` exceeds that
    threshold, and whose marker is listed under a cell type in
    stored_variables['cell_type_classification'], that cell type is returned.
    'STROMA' is returned when no column matches.
    """
    for column in intensities:
        marker = column.split('_')[0]  # Extract marker from intensity name
        if marker not in stored_variables['thresholds']:
            continue
        if row[column] > stored_variables['thresholds'][marker]:
            for cell_type, markers in stored_variables['cell_type_classification'].items():
                if marker in markers:
                    return cell_type
    return 'STROMA'  # Default if no condition matches


# Step 5: Apply the classification function to every row of the DataFrame
df['cell_type'] = df.apply(assign_cell_type, axis=1)
df.head() | |
# Check if 'IMMUNE' is present in any row of the cell_type column | |
# Number of rows whose cell_type contains each of the four labels
present_stroma, present_cancer, present_immune, present_endothelial = (
    df['cell_type'].str.contains(label).sum()
    for label in ('STROMA', 'CANCER', 'IMMUNE', 'ENDOTHELIAL')
)
# Print the result | |
#print(present_stroma) | |
#print(present_cancer) | |
#print(present_immune) | |
#print(present_endothelial) | |
#print(len(df)) | |
df.head(30) | |
df | |
# ## IV.8. *HEATMAPS | |
#print(df.columns) | |
# Assuming df_merged is your DataFrame | |
# Rename the 'Sample_ID.1' column to 'Sample_ID' when it is present
if 'Sample_ID.1' in df.columns:
    df = df.rename(columns={'Sample_ID.1': 'Sample_ID'})

# Select the rows of df that belong to the stored samples and to one of the
# listed cell types. The sample list comes from the settings already loaded
# into stored_vars; the file does not need to be opened again for that.
ls_samples = stored_vars['ls_samples']
keep = ls_samples
keep_cell_type = ['ENDOTHELIAL', 'CANCER', 'STROMA', 'IMMUNE']
test2_df = df.loc[(df['cell_type'].isin(keep_cell_type))
                  & (df['Sample_ID'].isin(keep)), :].copy()

# Randomly choose up to 20,000 distinct rows from that subset. A shuffled
# index never repeats a row and also works when the subset is empty or holds
# fewer than 20,000 rows.
random_rows = np.random.permutation(len(test2_df))[:20000]
df2 = test2_df.iloc[random_rows, :].copy()
df2 | |
#print(df2) | |
# ### COLORS | |
# #### SAMPLES COLORS | |
# Sample IDs present in df, in order of first appearance
sample_ids = df.Sample_ID.unique()

# One colour per sample. The palette is at least as long as the list of
# sample IDs, so zip() below cannot silently leave a sample without a colour
# when df holds more samples than ls_samples.
color_values = sb.color_palette("husl", n_colors=max(len(ls_samples), len(sample_ids)))
sb.palplot(sb.color_palette(color_values))

# Gray shades for the samples whose ID contains 'TMA'
TMA_samples = [s for s in sample_ids if 'TMA' in s]
TMA_color_values = sb.color_palette(n_colors=len(TMA_samples), palette="gray")
sb.palplot(sb.color_palette(TMA_color_values))

# Store in a dictionary
color_dict = dict(zip(sample_ids, color_values))
# Replace all TMA samples' colors with gray
color_dict.update(zip(TMA_samples, TMA_color_values))

# Convert with color_dict_to_df() (star-imported from my_modules) and save
# to file in the metadata directory
color_df_sample = color_dict_to_df(color_dict, "Sample_ID")
filename = os.path.join(metadata_dir, "sample_color_data.csv")
color_df_sample.to_csv(filename, index=False)
color_df_sample | |
# Legend of sample info only | |
# Legend of sample info only: zero-height bars are drawn on a hidden axis just
# to obtain one legend handle per sample
g = plt.figure(figsize=(1, 1)).add_subplot(111)
g.axis('off')
handles = [
    g.bar(0, 0, color=sample_color, label=sample_id, linewidth=0)
    for sample_id, sample_color in color_dict.items()
]
first_legend = plt.legend(handles=handles, loc='upper right', title='Sample')
# Save the legend image in the metadata images directory
filename = os.path.join(metadata_images_dir, "Sample_legend.png")
plt.savefig(filename, bbox_inches='tight')
filename = os.path.join(metadata_dir, "sample_color_data.csv")
# Open, read in information and drop the 'hex' column.
# Note that this rebinds df to the colour table.
df = pd.read_csv(filename, header=0).drop(columns=['hex'])
# The (r, g, b) tuples were read in as strings '(r, g, b)'; convert each one
# back with rgb_tuple_from_str()
df['rgb'] = df.apply(lambda record: rgb_tuple_from_str(record['rgb']), axis=1)
# Turn into dictionary: Sample_ID -> rgb
sample_color_dict = df.set_index('Sample_ID')['rgb'].to_dict()
# Print information | |
#print('sample_color_dict =\n',sample_color_dict) | |
# #### CELL TYPES COLORS | |
# Define your custom colors for each cell type | |
# Custom colour (r, g, b) for each cell type
custom_colors = {
    'CANCER': (0.1333, 0.5451, 0.1333),
    'STROMA': (0.4, 0.4, 0.4),
    'IMMUNE': (1, 1, 0),
    'ENDOTHELIAL': (0.502, 0, 0.502)
}
# Cell types and their colours, in the order defined above
cell_types = [*custom_colors]
color_values = [*custom_colors.values()]
# Display the colors
sb.palplot(sb.color_palette(color_values))
# Store in a dictionary
celltype_color_dict = dict(zip(cell_types, color_values))
# Create the colour dataframe with color_dict_to_df() (star-imported from
# my_modules) and save it to file in the metadata directory
celltype_color_df = color_dict_to_df(celltype_color_dict, "cell_type")
filename = os.path.join(metadata_dir, "celltype_color_data.csv")
celltype_color_df.to_csv(filename, index=False)
#print("File" + filename + " was created!") | |
# Legend of cell type info only | |
# Legend of cell type info only: zero-height bars are drawn on a hidden axis
# just to obtain one legend handle per cell type
g = plt.figure(figsize=(1, 1)).add_subplot(111)
g.axis('off')
handles = []
for item in celltype_color_dict.keys():
    h = g.bar(0, 0, color=celltype_color_dict[item],
              label=item, linewidth=0)
    handles.append(h)
# No trailing comma here: first_legend is the Legend object itself, not a 1-tuple
first_legend = plt.legend(handles=handles, loc='upper right', title='Cell type')
# Save the legend image in the metadata images directory
filename = "Celltype_legend.png"
filename = os.path.join(metadata_images_dir, filename)
plt.savefig(filename, bbox_inches='tight')
filename = os.path.join(metadata_dir, "celltype_color_data.csv")
# Open, read in information and drop the 'hex' column.
# Note that this rebinds df to the colour table.
df = pd.read_csv(filename, header=0).drop(columns=['hex'])
# The (r, g, b) tuples were read in as strings '(r, g, b)'; convert each one
# back with rgb_tuple_from_str()
df['rgb'] = df.apply(lambda record: rgb_tuple_from_str(record['rgb']), axis=1)
# Turn into dictionary: cell_type -> rgb
cell_type_color_dict = df.set_index('cell_type')['rgb'].to_dict()
# Per-row colours for the sampled rows in df2
sample_row_colors = df2['Sample_ID'].map(sample_color_dict)
cell_type_row_colors = df2['cell_type'].map(cell_type_color_dict)
#print(cell_type_row_colors[1:5]) | |
# ## Cell Subtype Colours | |
import pandas as pd | |
import os | |
def rgb_tuple_from_str(rgb_str):
    """Convert a string such as '(r, g, b)' into a tuple of floats.

    Parentheses, spaces and any 'np.float64' text are removed before the
    comma-separated components are parsed.

    Returns:
        A tuple of floats, or None when the input is not a string (for
        example a missing CSV cell read as NaN) or when a component cannot
        be parsed as a float.
    """
    if not isinstance(rgb_str, str):
        # Same "unparsable" result as for malformed text, instead of an
        # AttributeError on .replace()
        return None
    cleaned = rgb_str
    for token in ("(", ")", " ", "np.float64"):
        cleaned = cleaned.replace(token, "")
    try:
        return tuple(float(part) for part in cleaned.split(","))
    except ValueError:
        return None
filename = os.path.join(metadata_dir, "cellsubtype_color_data.csv")
# Open, read in information and drop the 'hex' column.
# Note that this rebinds df to the colour table.
df = pd.read_csv(filename, header=0).drop(columns=['hex'])
# Clean the 'rgb' column to remove unexpected 'np.float64' text
df['rgb'] = df['rgb'].str.replace("np.float64", "", regex=False)
# Convert each '(r, g, b)' string to a tuple of floats
df['rgb'] = df.apply(lambda record: rgb_tuple_from_str(record['rgb']), axis=1)
# Turn into dictionary: cell_subtype -> rgb
cell_subtype_color_dict = df.set_index('cell_subtype')['rgb'].to_dict()
# Per-row colours for the sampled rows in df2
sample_row_colors = df2['Sample_ID'].map(sample_color_dict)
cell_subtype_row_colors = df2['cell_subtype'].map(cell_subtype_color_dict)
#print(cell_subtype_row_colors[1:5]) | |
# #### Cell Type | |
df | |
#print(f"Loaded sample files: {ls_samples}") | |
# NOTE(review): at this point df is whatever table was last read into it, so
# these are that table's columns - confirm this is the intended source.
selected_intensities = [*df.columns]
# From here on df refers to the sampled rows held in df2
df = df2
df | |
import json | |
import pandas as pd | |
import numpy as np | |
import panel as pn | |
import plotly.graph_objects as go | |
pn.extension('plotly') | |
# Load the selected intensities from the JSON file | |
# Load the list of sample files from the JSON file
with open(stored_variables_path, 'r') as f:
    json_data = json.load(f)
ls_samples = json_data["ls_samples"]

# Checkbox group to select files
checkbox_group = pn.widgets.CheckBoxGroup(name='Select Files', options=ls_samples)

# Dropdowns for X and Y axis selection; their options are filled in later
x_axis_dropdown = pn.widgets.Select(name='Select X-Axis', options=[])
y_axis_dropdown = pn.widgets.Select(name='Select Y-Axis', options=[])

# Input field for the number of random samples
random_sample_input = pn.widgets.IntInput(
    name='Number of Random Samples', value=20000, step=100
)

# Sliders for the interactive vertical (X) and horizontal (Y) lines
x_line_slider = pn.widgets.FloatSlider(
    name='X Axis Line Position', start=0, end=1, step=0.01
)
y_line_slider = pn.widgets.FloatSlider(
    name='Y Axis Line Position', start=0, end=1, step=0.01
)

# Placeholders for the dot plot and the digital reconstruction plot
plot_placeholder = pn.pane.Plotly()
reconstruction_placeholder = pn.pane.Plotly()
# Function to create the dot plot | |
def create_dot_plot(selected_files, x_axis, y_axis, n_samples, x_line_pos, y_line_pos):
    """Build the scatter ("dot") plot of two columns for the selected samples.

    Args:
        selected_files: Sample_ID values whose rows of the module-level df are plotted.
        x_axis: Name of the df column plotted on the X axis.
        y_axis: Name of the df column plotted on the Y axis.
        n_samples: Maximum number of rows to plot; larger selections are
            randomly subsampled. None disables subsampling and negative
            values are treated as 0.
        x_line_pos: X position of the vertical dashed red line.
        y_line_pos: Y position of the horizontal dashed red line.

    Returns:
        A plotly Figure. It is empty when no file is selected or when either
        axis name is not a column of df.
    """
    if not selected_files:
        return go.Figure()

    test_df = df.loc[df['Sample_ID'].isin(selected_files), :]
    if n_samples is not None and len(test_df) > n_samples:
        # replace=False draws distinct rows, so the subsample never repeats a cell
        random_rows = np.random.choice(len(test_df), max(n_samples, 0), replace=False)
        test_df = test_df.iloc[random_rows, :]

    if x_axis not in test_df.columns or y_axis not in test_df.columns:
        return go.Figure()

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=test_df[x_axis],
        y=test_df[y_axis],
        mode='markers',
        marker=dict(color='LightSkyBlue', size=2)
    ))
    # Add vertical and horizontal lines
    fig.add_vline(x=x_line_pos, line_width=2, line_dash="dash", line_color="red")
    fig.add_hline(y=y_line_pos, line_width=2, line_dash="dash", line_color="red")
    # Axis ranges span exactly the plotted data
    fig.update_layout(
        title='Threshold',
        plot_bgcolor='white',
        autosize=True,
        margin=dict(l=20, r=20, t=40, b=20),
        xaxis=dict(title=x_axis, linecolor='black',
                   range=[test_df[x_axis].min(), test_df[x_axis].max()]),
        yaxis=dict(title=y_axis, linecolor='black',
                   range=[test_df[y_axis].min(), test_df[y_axis].max()])
    )
    return fig
def assign_cell_types_again():
    """Re-read the stored thresholds and recompute df['cell_type'].

    stored_variables.json is read again on every call. For each row of the
    module-level df, columns are scanned in order; the marker name is the
    part of the column name before the first underscore. The row gets the
    cell type of the first column whose marker has a threshold, is listed
    under a cell type in 'cell_type_classification', and whose value exceeds
    the threshold. Rows with no such column get 'STROMA'.

    Returns:
        The module-level DataFrame df, with its 'cell_type' column (re)assigned.
    """
    with open(stored_variables_path, 'r') as file:
        stored_variables = json.load(file)
    thresholds = stored_variables['thresholds']
    classification = stored_variables.get('cell_type_classification', {})

    # Resolve (column, threshold, cell type) once per call rather than for
    # every row: none of this depends on the row being classified.
    rules = []
    for column in df.columns:
        marker = column.split('_')[0]  # Extract marker from intensity name
        if marker not in thresholds:
            continue
        # First cell type (in stored order) that lists this marker, if any
        cell_type = next(
            (name for name, markers in classification.items() if marker in markers),
            None,
        )
        if cell_type is not None:
            rules.append((column, thresholds[marker], cell_type))

    def _classify(row):
        for column, threshold, cell_type in rules:
            if row[column] > threshold:
                return cell_type
        return 'STROMA'  # Default if no condition matches

    df['cell_type'] = df.apply(_classify, axis=1)
    return df
# Function to create the digital reconstruction plot | |
def create_reconstruction_plot(selected_files):
    """Build the XY scatter of nuclei coloured by cell type for the selected samples.

    Cell types are recomputed with assign_cell_types_again() first. One
    scatter trace is added per (sample, cell type) pair, using the 'Nuc_X'
    and 'Nuc_Y_Inv' columns. The axis ranges cover every selected sample, so
    no sample is clipped when several are shown together.

    Args:
        selected_files: Sample_ID values to draw.

    Returns:
        A plotly Figure; empty when no file is selected.
    """
    if not selected_files:
        return go.Figure()

    df = assign_cell_types_again()
    plot_columns = ['Nuc_X', 'Nuc_Y_Inv', 'cell_type']
    fig = go.Figure()

    for sample_id in selected_files:
        location_colors = df.loc[df['Sample_ID'] == sample_id, plot_columns]
        for celltype in location_colors['cell_type'].unique():
            of_type = location_colors.loc[location_colors['cell_type'] == celltype]
            # NOTE(review): the colour tuple is inserted verbatim into an
            # 'rgb(...)' string - confirm cell_type_color_dict values are on
            # the scale Plotly expects.
            fig.add_scatter(
                mode='markers',
                marker=dict(size=3, opacity=0.5,
                            color='rgb' + str(cell_type_color_dict[celltype])),
                x=of_type['Nuc_X'],
                y=of_type['Nuc_Y_Inv'],
                name=celltype
            )

    # Title names every selected sample (the part of each name before the first underscore)
    sample_labels = ", ".join(sample.split('_')[0] for sample in selected_files)
    title = sample_labels + " Background Subtracted XY Map cell types"
    # Ranges are taken over all selected samples, not only the last one drawn
    all_locations = df.loc[df['Sample_ID'].isin(selected_files), plot_columns]
    fig.update_layout(
        title=title,
        plot_bgcolor='white',
        autosize=True,
        margin=dict(l=20, r=20, t=40, b=20),
        legend=dict(
            title='Cell Types',
            font=dict(
                family='Arial',
                size=12,
                color='black'
            ),
            bgcolor='white',
            bordercolor='black',
            borderwidth=0.4,
            itemsizing='constant'
        ),
        xaxis=dict(title='Nuc_X', linecolor='black',
                   range=[all_locations['Nuc_X'].min(), all_locations['Nuc_X'].max()]),
        yaxis=dict(title='Nuc_Y_Inv', linecolor='black',
                   range=[all_locations['Nuc_Y_Inv'].min(), all_locations['Nuc_Y_Inv'].max()])
    )
    return fig
def update_dropdown_options(event):
    """Refresh the X/Y axis dropdown options after the file selection changes.

    With at least one file selected, both dropdowns list the df columns whose
    name contains '_Intensity_Average'; with none selected, both are emptied.
    The event argument (supplied by the watcher) is not used.
    """
    chosen_files = checkbox_group.value
    if not chosen_files:
        x_axis_dropdown.options = []
        y_axis_dropdown.options = []
        return

    subset = df.loc[df['Sample_ID'].isin(chosen_files), :].copy()
    intensity_columns = [name for name in list(subset.columns) if '_Intensity_Average' in name]
    x_axis_dropdown.options = intensity_columns
    y_axis_dropdown.options = intensity_columns
def update_slider_ranges(event):
    """Resize the X/Y line sliders to the data of the current selection.

    Each slider gets a range symmetric around 0 that is wide enough for both
    the minimum and the maximum of its column among the selected samples, and
    is then reset to 0. Nothing happens unless files and both axes are
    selected. The event argument (supplied by the watcher) is not used.
    """
    selected_files = checkbox_group.value
    x_axis = x_axis_dropdown.value
    y_axis = y_axis_dropdown.value
    if not (selected_files and x_axis and y_axis):
        return

    subset = df.loc[df['Sample_ID'].isin(selected_files), :]
    for slider, column in ((x_line_slider, x_axis), (y_line_slider, y_axis)):
        # Use the larger magnitude of min and max so the line can reach every point
        bound = max(abs(subset[column].min()), abs(subset[column].max()))
        # Skip a zero or NaN bound (the comparison is False for NaN): it would
        # give the slider identical start and end values
        if bound > 0:
            slider.start = -bound
            slider.end = bound
    # Reset the positions only after both ranges are in place
    x_line_slider.value = 0
    y_line_slider.value = 0
def on_value_change(event):
    """Rebuild both plots from the current widget values.

    Reads the selected files, axes, sample count and line positions from the
    widgets, then replaces the objects shown in plot_placeholder and
    reconstruction_placeholder. The event argument (supplied by the watcher)
    is not used.
    """
    chosen_files = checkbox_group.value
    dot_figure = create_dot_plot(
        chosen_files,
        x_axis_dropdown.value,
        y_axis_dropdown.value,
        random_sample_input.value,
        x_line_slider.value,
        y_line_slider.value,
    )
    reconstruction_figure = create_reconstruction_plot(chosen_files)
    plot_placeholder.object = dot_figure
    reconstruction_placeholder.object = reconstruction_figure
# Link value changes to function | |
# Link value changes to their callbacks. For each widget the callbacks are
# registered in the order listed.
for widget, callbacks in (
    (checkbox_group, (update_dropdown_options, update_slider_ranges)),
    (x_axis_dropdown, (update_slider_ranges, on_value_change)),
    (y_axis_dropdown, (update_slider_ranges, on_value_change)),
    (random_sample_input, (on_value_change,)),
    (x_line_slider, (on_value_change,)),
    (y_line_slider, (on_value_change,)),
):
    for callback in callbacks:
        widget.param.watch(callback, 'value')

# Layout: controls on top, the two plots side by side below
dot_plot_column = pn.Column(
    "## Dot Plot",
    pn.Column(plot_placeholder),
)
reconstruction_column = pn.Column(
    "## Digital Reconstruction Plot",
    reconstruction_placeholder,
)
plot_with_reconstruction = pn.Column(
    "## Select Files to Construct Dot Plot",
    checkbox_group,
    x_axis_dropdown,
    y_axis_dropdown,
    random_sample_input,
    pn.Row(x_line_slider, y_line_slider),
    pn.Row(dot_plot_column, reconstruction_column),
)
# Serve the app | |
#plot_with_reconstruction.show() | |
# ## MAKE HEATMAPS | |
# ### Cell Subtype | |
# Create data structure to hold everything we need for row/column annotations | |
# annotations is a dictionary | |
## IMPORTANT - if you use 'annotations', it MUST have both 'rows' and 'cols' | |
## objects inside. These can be empty lists, but they must be there! | |
# Row annotations: a list with one dictionary per annotation. Items are listed
# in the order they should appear on the figure (Sample, then Cell type).
row_annotations = [
    {'label': 'Sample',
     'type': 'row',
     'mapping': sample_row_colors,
     'dict': sample_color_dict,
     'location': 'center left',
     'bbox_to_anchor': (0.1, 0.9)},
    {'label': 'Cell type',
     'type': 'row',
     'mapping': cell_type_row_colors,
     'dict': cell_type_color_dict,
     'location': 'center left',
     'bbox_to_anchor': (0.17, 0.9)},
]
# No column annotations are defined
col_annotations = []
## IMPORTANT - if you use 'annotations', it MUST have both 'rows' and 'cols'
## objects inside. These can be empty lists, but they must be there!
anns = {'rows': row_annotations, 'cols': col_annotations}
# To simplify marker display in the following figures (heatmap, etc): keep the
# part of each short name before the first underscore
figure_marker_names = {
    full_name: short_name.split('_')[0]
    for full_name, short_name in full_to_short_names.items()
}
not_intensities | |
df2 | |
# Columns that are not marker intensities
not_intensities = ['Nuc_X', 'Nuc_X_Inv', 'Nuc_Y', 'Nuc_Y_Inv', 'Nucleus_Roundness', 'Nucleus_Size', 'Cell_Size',
                   'ROI_index', 'Sample_ID', 'replicate_ID', 'Cell_ID', 'cell_type', 'cell_subtype', 'cluster', 'ID',
                   'Cytoplasm_Size', 'immune_checkpoint', 'Unique_ROI_index', 'Patient', 'Primary_chem(1)_vs_surg(0)']
# Recompute the cell types; df2 now refers to the DataFrame returned by
# assign_cell_types_again()
df2 = assign_cell_types_again()
# NOTE(review): the original called df2.drop('cell_subtype', axis='columns')
# before and after this point without keeping the result, so the column was
# never removed. Those no-op calls are gone; assign the result back to df2 if
# removing the column is actually wanted.
df2.head() | |
# Save one heatmap | |
# Save one heatmap
data = df

# Select the rows of data that belong to the stored samples and to one of the
# listed cell types. The sample list comes from the settings already loaded
# into stored_vars; the file does not need to be opened again for that.
ls_samples = stored_vars['ls_samples']
keep = list(ls_samples)
keep_cell_type = ['STROMA', 'CANCER', 'IMMUNE', 'ENDOTHELIAL']

# Check the individual conditions, then combine them
cell_type_condition = data['cell_type'].isin(keep_cell_type)
sample_id_condition = data['Sample_ID'].isin(keep)
combined_condition = cell_type_condition & sample_id_condition

# Apply the combined condition to filter the DataFrame
test2_df = data.loc[combined_condition].copy()

# Randomly choose up to 1,000 distinct rows from that subset. A shuffled index
# never repeats a row and also works when the subset is empty or holds fewer
# than 1,000 rows.
random_rows = np.random.permutation(len(test2_df))[:1000]
test_df = test2_df.iloc[random_rows, :].copy()
#print(len(test_df)) | |
test_df | |
import json | |
import panel as pn | |
import param | |
import pandas as pd | |
# Enable the Tabulator widget in Panel
pn.extension('tabulator')

# Path to the stored variables file
file_path = stored_variables_path


def load_data():
    """Return the decoded JSON content of ``file_path``.

    If the file cannot be decoded as JSON, the error is printed and an empty
    dict is returned instead.
    """
    try:
        with open(file_path, 'r') as handle:
            content = json.load(handle)
    except json.JSONDecodeError as err:
        print(f"Error reading JSON file: {err}")
        content = {}
    return content


data = load_data()

# Markers, cell types and cell subtypes from the loaded data; each one falls
# back to an empty list when its key is absent
markers, cell_types, cell_subtypes = (
    data.get(key, []) for key in ('markers', 'cell_type', 'cell_subtype')
)
# Sanitize option names
def sanitize_options(options):
    """Return a copy of ``options`` with awkward characters rewritten.

    In every name a space becomes ``_``, ``+`` becomes ``plus``, ``α`` becomes
    ``a`` and apostrophes are removed.
    """
    # A single translation pass is equivalent to the chained replacements
    # because no replacement text contains a character that is itself replaced.
    table = str.maketrans({' ': '_', '+': 'plus', 'α': 'a', "'": None})
    return [name.translate(table) for name in options]
# Sanitized spellings of the cell type and cell subtype names
sanitized_cell_types, sanitized_cell_subtypes = map(sanitize_options, (cell_types, cell_subtypes))
# Helper function to create the item-by-marker selection table
def create_classification_df(items, item_label):
    """Build the initial marker-selection table for a list of items.

    Parameters
    ----------
    items : iterable of str
        Names (cell types or cell subtypes); each one becomes a row.
    item_label : str
        Name of the first column, which holds the item names.

    Returns
    -------
    pandas.DataFrame
        Column ``item_label`` followed by one column per entry of the
        module-level ``markers`` list, every marker cell set to ``False``.
        With no items, an empty frame that still has all the columns is
        returned (the previous implementation raised ``KeyError`` there).
    """
    columns = [item_label] + list(markers)
    rows = [{item_label: item, **dict.fromkeys(markers, False)} for item in items]
    # Passing `columns` fixes the column order and keeps the columns present
    # even when `rows` is empty.
    return pd.DataFrame(rows, columns=columns)
# Selection tables for cell types and cell subtypes
cell_type_df = create_classification_df(sanitized_cell_types, 'CELL_TYPE')
cell_subtype_df = create_classification_df(sanitized_cell_subtypes, 'CELL_SUBTYPE')

# Every marker column uses the 'tickCross' Tabulator formatter
tabulator_formatters = {name: dict(type='tickCross') for name in markers}

# One Tabulator widget per table, sharing the same formatters
cell_type_table, cell_subtype_table = (
    pn.widgets.Tabulator(frame, formatters=tabulator_formatters)
    for frame in (cell_type_df, cell_subtype_df)
)
# Save function shared by the cell type and cell subtype tables
def save_data(table, classification_key, item_label):
    """Persist the markers ticked in ``table`` to the stored variables file.

    Parameters
    ----------
    table : object with a ``value`` DataFrame (the Tabulator widgets here)
        Current content of the selection table.
    classification_key : str
        Key under which the selection is stored in the JSON file.
    item_label : str
        Name of the column holding the item (cell type / subtype) names.

    The stored value maps each item name to the list of markers whose cell is
    truthy in that item's row.
    """
    df_bool = table.value.replace({'✔': True, '✘': False})
    classification = {
        row[item_label]: [marker for marker in markers if row[marker]]
        for _, row in df_bool.iterrows()
    }
    # Keep the in-memory snapshot in sync with what is written below
    data[classification_key] = classification

    # Re-read the file before writing: other callbacks store their own keys in
    # the same file, and dumping the snapshot loaded at start-up would erase
    # them. Fall back to the snapshot only if the file cannot be read.
    try:
        with open(file_path, 'r') as file:
            current = json.load(file)
    except (FileNotFoundError, json.JSONDecodeError):
        current = dict(data)
    current[classification_key] = classification

    with open(file_path, 'w') as file:
        json.dump(current, file, indent=4)
# Button callbacks: each one stores the selection of its own table
def save_cell_type_selections(event):
    """Save the cell type table under 'cell_type_classification'."""
    save_data(cell_type_table, 'cell_type_classification', 'CELL_TYPE')


def save_cell_subtype_selections(event):
    """Save the cell subtype table under 'cell_subtype_classification'."""
    save_data(cell_subtype_table, 'cell_subtype_classification', 'CELL_SUBTYPE')


# Save buttons, then their click handlers
save_cell_type_button = pn.widgets.Button(name='Save Cell Type Selections', button_type='primary')
save_cell_subtype_button = pn.widgets.Button(name='Save Cell Subtype Selections', button_type='primary')
save_cell_type_button.on_click(save_cell_type_selections)
save_cell_subtype_button.on_click(save_cell_subtype_selections)

# Each app column stacks a heading, the table and its save button
cell_type_classification_app_main = pn.Column(
    pn.pane.Markdown("# Cell Type Classification"), cell_type_table, save_cell_type_button)
cell_subtype_classification_app_main = pn.Column(
    pn.pane.Markdown("# Cell Subtype Classification"), cell_subtype_table, save_cell_subtype_button)
import json | |
import panel as pn | |
# Load existing stored variables (once; the second, guarded load that used to
# follow could never reach its FileNotFoundError branch because this open
# would already have failed)
with open(stored_variables_path, 'r') as f:
    stored_variables = json.load(f)

# Thresholds saved by an earlier run, if any
saved_subtype_thresholds = stored_variables.get('subtype_thresholds', {})

# One FloatInput per marker. Each widget starts from the saved threshold
# (0.0 when none is stored) so that pressing "Save" without editing does not
# reset previously saved values to zero.
subtype_threshold_inputs = {}
for marker in stored_variables['markers']:
    subtype_threshold_inputs[marker] = pn.widgets.FloatInput(
        name=f'{marker} Threshold',
        value=saved_subtype_thresholds.get(marker, 0.0),
        step=0.1)

# Make sure the 'subtype_thresholds' field exists in the file
if 'subtype_thresholds' not in stored_variables:
    subtype_thresholds = {marker: input_widget.value for marker, input_widget in subtype_threshold_inputs.items()}
    stored_variables['subtype_thresholds'] = subtype_thresholds
    with open(stored_variables_path, 'w') as f:
        json.dump(stored_variables, f, indent=4)
# Callback of the save button: store the thresholds in stored_variables.json
def save_thresholds(event):
    """Write the current widget values under 'subtype_thresholds'.

    The module-level ``stored_variables`` dict is updated as well, so code
    reading it sees the new thresholds.
    """
    subtype_thresholds = {marker: input_widget.value for marker, input_widget in subtype_threshold_inputs.items()}
    stored_variables['subtype_thresholds'] = subtype_thresholds

    # Re-read the file before writing: other callbacks store their own keys in
    # the same file, and dumping the in-memory snapshot would erase them.
    # Fall back to the snapshot only if the file cannot be read.
    try:
        with open(stored_variables_path, 'r') as f:
            current = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        current = dict(stored_variables)
    current['subtype_thresholds'] = subtype_thresholds

    with open(stored_variables_path, 'w') as f:
        json.dump(current, f, indent=4)
save_button = pn.widgets.Button(name='Save Thresholds', button_type='primary')
save_button.on_click(save_thresholds)

# Grid holding the threshold inputs
subtype_grid = pn.GridSpec()

# Five inputs per row, filled left to right
for position, marker in enumerate(stored_variables['markers']):
    row, col = divmod(position, 5)
    subtype_grid[row, col] = subtype_threshold_inputs[marker]
# (row, col) now designates the cell that follows the last input
row, col = divmod(len(stored_variables['markers']), 5)

# The save button goes one row further down and spans the five columns
subtype_grid[row + 1, :5] = save_button

# Panel layout (call subtype_threshold_panel.show() to display it on its own)
subtype_threshold_panel = pn.Column(
    pn.pane.Markdown("## Define Thresholds for Markers"),
    subtype_grid,
)
# Refresh the in-memory copy of the stored variables
with open(stored_variables_path, 'r') as file:
    stored_variables = json.load(file)

# Every column name of df as it is at this point
intensities = df.columns.tolist()
def assign_cell_subtypes(row):
    """Return the cell subtype for one row, or 'DC' when nothing matches.

    The columns listed in the module-level ``intensities`` are scanned in
    order. The marker of a column is the text before its first underscore.
    For the first column whose marker has a threshold, whose value exceeds it
    and whose marker is listed for some subtype, that subtype (first match in
    the classification's order) is returned.

    Thresholds and classification come from the module-level
    ``stored_variables``. Either key may be absent (for example before
    anything was saved from the UI); the row then gets the default instead of
    raising ``KeyError``.
    """
    thresholds = stored_variables.get('subtype_thresholds', {})
    classification = stored_variables.get('cell_subtype_classification', {})
    for intensity in intensities:
        marker = intensity.split('_')[0]  # Extract marker from intensity name
        if marker not in thresholds:
            continue
        if not row[intensity] > thresholds[marker]:
            continue
        for cell_subtype, subtype_markers in classification.items():
            if marker in subtype_markers:
                return cell_subtype
    return 'DC'
# Recompute df with assign_cell_types_again(), then derive each row's
# 'cell_subtype'. The function is passed directly (no wrapping lambda) and the
# bare `df` / `data` display expressions, which do nothing in a script, are gone.
df = assign_cell_types_again()
df['cell_subtype'] = df.apply(assign_cell_subtypes, axis=1)
# Colour of every cell subtype: an 'rgb' prefix followed by the textual form
# of the tuple of three fractional channel values
cell_subtype_color_dict = {
    subtype: 'rgb' + str(fractions)
    for subtype, fractions in (
        ('DC', (0.6509803921568628, 0.807843137254902, 0.8901960784313725)),
        ('B', (0.12156862745098039, 0.47058823529411764, 0.7058823529411765)),
        ('TCD4', (0.6980392156862745, 0.8745098039215686, 0.5411764705882353)),
        ('Exhausted TCD4', (0.2, 0.6274509803921569, 0.17254901960784313)),
        ('Exhausted TCD8', (0.984313725490196, 0.6039215686274509, 0.6)),
        ('TCD8', (0.8901960784313725, 0.10196078431372549, 0.10980392156862745)),
        ('M1', (0.9921568627450981, 0.7490196078431373, 0.43529411764705883)),
        ('M2', (1.0, 0.4980392156862745, 0.0)),
        ('Treg', (0.792156862745098, 0.6980392156862745, 0.8392156862745098)),
        ('Other CD45+', (0.41568627450980394, 0.23921568627450981, 0.6039215686274509)),
        ('Cancer', (1.0, 1.0, 0.6)),
        ('myCAF αSMA+', (0.6941176470588235, 0.34901960784313724, 0.1568627450980392)),
        ('Stroma', (0.6509803921568628, 0.807843137254902, 0.8901960784313725)),
        ('Endothelial', (0.12156862745098039, 0.47058823529411764, 0.7058823529411765)),
    )
}
# Load stored variables from JSON file
def load_stored_variables(path):
    """Open ``path`` for reading and return its decoded JSON content."""
    with open(path, 'r') as handle:
        content = json.load(handle)
    return content
# Names of the df columns that contain '_Intensity_Average'
subtype_intensities = list(filter(lambda name: '_Intensity_Average' in name, df.columns))
# Assign cell subtype based on thresholds and classifications
def assign_cell_subtype(row, stored_variables=None):
    """Return the cell subtype for one row, or 'DC' when nothing matches.

    Parameters
    ----------
    row : mapping from column name to value (a DataFrame row here)
    stored_variables : dict, optional
        Already-loaded content of the stored variables file. When omitted the
        file at ``stored_variables_path`` is read, as before; callers that
        process many rows should load it once and pass it in.

    The columns in the module-level ``subtype_intensities`` are scanned in
    order; the marker of a column is the text before its first underscore.
    The first marker that has a threshold, exceeds it and is listed for some
    subtype decides the result. Missing 'subtype_thresholds' or
    'cell_subtype_classification' keys are treated as empty instead of
    raising ``KeyError``.
    """
    if stored_variables is None:
        stored_variables = load_stored_variables(stored_variables_path)
    thresholds = stored_variables.get('subtype_thresholds', {})
    classification = stored_variables.get('cell_subtype_classification', {})
    for subtype_intensity in subtype_intensities:
        marker = subtype_intensity.split('_')[0]
        if marker not in thresholds:
            continue
        if not row[subtype_intensity] > thresholds[marker]:
            continue
        for cell_subtype, subtype_markers in classification.items():
            if marker in subtype_markers:
                return cell_subtype  # Return the assigned subtype
    return 'DC'  # Default value if no conditions match


# Main function to assign cell subtypes to DataFrame
def assign_cell_subtypes_again():
    """Recompute ``df['cell_subtype']`` for every row and return ``df``."""
    # Read the file once for the whole frame rather than once per row
    current_variables = load_stored_variables(stored_variables_path)
    df['cell_subtype'] = df.apply(lambda row: assign_cell_subtype(row, current_variables), axis=1)
    return df
import json | |
import pandas as pd | |
import numpy as np | |
import panel as pn | |
import plotly.graph_objects as go | |
pn.extension('plotly')

# The sample files offered for selection come from the JSON file
with open(stored_variables_path, 'r') as f:
    json_data = json.load(f)
subtype_ls_samples = json_data["ls_samples"]

# Checkbox group to select files
subtype_checkbox_group = pn.widgets.CheckBoxGroup(name='Select Files', options=subtype_ls_samples)

# X and Y axis dropdowns start without options
subtype_x_axis_dropdown, subtype_y_axis_dropdown = (
    pn.widgets.Select(name=f'Select {axis}-Axis', options=[]) for axis in ('X', 'Y')
)

# Input field for the number of random samples
subtype_random_sample_input = pn.widgets.IntInput(name='Number of Random Samples', value=20000, step=100)

# Sliders positioning the interactive X and Y lines
subtype_x_line_slider, subtype_y_line_slider = (
    pn.widgets.FloatSlider(name=f'{axis} Axis Line Position', start=0, end=1, step=0.01) for axis in ('X', 'Y')
)

# Empty panes for the dot plot and for the digital reconstruction plot
subtype_plot_placeholder = pn.pane.Plotly()
subtype_reconstruction_placeholder = pn.pane.Plotly()
def update_color_dict():
    """Return the colour of every cell subtype as a Plotly 'rgb(r, g, b)' string.

    The palette is kept as fractional (0-1) channel values and converted to
    integer 0-255 channels. Plotly reads rgb() channels on the 0-255 scale, so
    the previous 'rgb(0.65, 0.81, 0.89)' strings were rendered almost black.
    """
    palette = {
        'DC': (0.6509803921568628, 0.807843137254902, 0.8901960784313725),
        'B': (0.12156862745098039, 0.47058823529411764, 0.7058823529411765),
        'TCD4': (0.6980392156862745, 0.8745098039215686, 0.5411764705882353),
        'Exhausted TCD4': (0.2, 0.6274509803921569, 0.17254901960784313),
        'Exhausted TCD8': (0.984313725490196, 0.6039215686274509, 0.6),
        'TCD8': (0.8901960784313725, 0.10196078431372549, 0.10980392156862745),
        'M1': (0.9921568627450981, 0.7490196078431373, 0.43529411764705883),
        'M2': (1.0, 0.4980392156862745, 0.0),
        'Treg': (0.792156862745098, 0.6980392156862745, 0.8392156862745098),
        'Other CD45+': (0.41568627450980394, 0.23921568627450981, 0.6039215686274509),
        'Cancer': (1.0, 1.0, 0.6),
        'myCAF αSMA+': (0.6941176470588235, 0.34901960784313724, 0.1568627450980392),
        'Stroma': (0.6509803921568628, 0.807843137254902, 0.8901960784313725),
        'Endothelial': (0.12156862745098039, 0.47058823529411764, 0.7058823529411765),
    }
    return {
        subtype: "rgb({}, {}, {})".format(*(round(255 * channel) for channel in fractions))
        for subtype, fractions in palette.items()
    }
# Function to create the dot plot
def create_subtype_dot_plot(subtype_selected_files, subtype_x_axis, subtype_y_axis, subtype_n_samples, subtype_x_line_pos, subtype_y_line_pos):
    """Scatter two columns of the selected samples with two threshold lines.

    Parameters
    ----------
    subtype_selected_files : list
        ``Sample_ID`` values to keep; an empty selection gives an empty figure.
    subtype_x_axis, subtype_y_axis : str
        Columns of the module-level ``df`` to plot; an empty figure is
        returned when either is not a column.
    subtype_n_samples : int
        Maximum number of rows plotted; larger selections are subsampled.
    subtype_x_line_pos, subtype_y_line_pos : float
        Positions of the vertical and horizontal dashed red lines.

    Returns
    -------
    plotly.graph_objects.Figure
    """
    if not subtype_selected_files:
        return go.Figure()

    # Column check first, so nothing is filtered or sampled for an invalid axis
    if subtype_x_axis not in df.columns or subtype_y_axis not in df.columns:
        return go.Figure()

    subtype_test2_df = df.loc[df['Sample_ID'].isin(subtype_selected_files), :].copy()

    if len(subtype_test2_df) > subtype_n_samples:
        # replace=False: draw distinct rows, so the plot really shows
        # subtype_n_samples different cells instead of repeating some
        subtype_random_rows = np.random.choice(len(subtype_test2_df), subtype_n_samples, replace=False)
        subtype_test_df = subtype_test2_df.iloc[subtype_random_rows, :].copy()
    else:
        subtype_test_df = subtype_test2_df

    x_values = subtype_test_df[subtype_x_axis]
    y_values = subtype_test_df[subtype_y_axis]

    fig = go.Figure()
    title = 'Threshold'
    fig.add_trace(go.Scatter(
        x=x_values,
        y=y_values,
        mode='markers',
        marker=dict(color='LightSkyBlue', size=2)
    ))

    # Add vertical and horizontal lines
    fig.add_vline(x=subtype_x_line_pos, line_width=2, line_dash="dash", line_color="red")
    fig.add_hline(y=subtype_y_line_pos, line_width=2, line_dash="dash", line_color="red")

    # Axis ranges are pinned to the extent of the plotted values
    fig.update_layout(
        title=title,
        plot_bgcolor='white',
        autosize=True,
        margin=dict(l=20, r=20, t=40, b=20),
        xaxis=dict(title=subtype_x_axis, linecolor='black', range=[x_values.min(), x_values.max()]),
        yaxis=dict(title=subtype_y_axis, linecolor='black', range=[y_values.min(), y_values.max()])
    )
    return fig
def create_subtype_reconstruction_plot(subtype_selected_files):
    """Plot the cells of the selected samples at their XY position, coloured by subtype.

    Parameters
    ----------
    subtype_selected_files : list
        ``Sample_ID`` values to draw; an empty selection gives an empty figure.

    Returns
    -------
    plotly.graph_objects.Figure
        One scatter trace per (sample, cell subtype). The title is built from
        the last selected sample.
    """
    if not subtype_selected_files:
        return go.Figure()

    cell_subtype_color_dict = update_color_dict()
    # The subtypes assigned to cells are the keys of the saved classification,
    # i.e. names that went through sanitize_options ('Exhausted_TCD4', ...).
    # Register every colour under that spelling too, so these names resolve.
    display_names = list(cell_subtype_color_dict)
    for display_name, sanitized_name in zip(display_names, sanitize_options(display_names)):
        cell_subtype_color_dict.setdefault(sanitized_name, cell_subtype_color_dict[display_name])

    df = assign_cell_subtypes_again()
    subtype_fig = go.Figure()

    for sample_id in subtype_selected_files:
        title = sample_id.split('_')[0] + " Background Subtracted XY Map cell subtypes"
        location_colors = df.loc[df['Sample_ID'] == sample_id, ['Nuc_X', 'Nuc_Y_Inv', 'cell_subtype']]

        for cellsubtype in location_colors['cell_subtype'].unique():
            in_subtype = location_colors['cell_subtype'] == cellsubtype
            # Subtypes without a colour are drawn in grey instead of raising
            # KeyError and leaving the pane empty
            color = str(cell_subtype_color_dict.get(cellsubtype, 'grey'))
            subtype_fig.add_scatter(
                mode='markers',
                marker=dict(size=2, opacity=0.5, color=color),
                x=location_colors.loc[in_subtype, 'Nuc_X'],
                y=location_colors.loc[in_subtype, 'Nuc_Y_Inv'],
                name=cellsubtype
            )

    subtype_fig.update_xaxes(title_text='Nuc_X', linecolor='black')
    subtype_fig.update_yaxes(title_text='Nuc_Y_Inv', linecolor='black')
    subtype_fig.update_layout(
        title=title,
        plot_bgcolor='white',
        legend=dict(
            title='Cell Subtypes',  # Legend title
            font=dict(
                family='Arial',
                size=12,
                color='black'
            ),
            bgcolor='white',
            bordercolor='black',
            borderwidth=0.4,
            itemsizing='constant'
        )
    )
    return subtype_fig
def update_subtype_dropdown_options(event):
    """Refresh the options of both axis dropdowns after a file selection change.

    With at least one file selected, the options are the ``df`` columns whose
    name contains '_Intensity_Average'; otherwise both dropdowns are emptied.
    """
    if subtype_checkbox_group.value:
        # Column names do not depend on which rows are selected, so they are
        # read from df directly instead of from a filtered copy of the frame
        subtype_selected_intensities = [col for col in df.columns if '_Intensity_Average' in col]
    else:
        subtype_selected_intensities = []
    subtype_x_axis_dropdown.options = subtype_selected_intensities
    subtype_y_axis_dropdown.options = subtype_selected_intensities
def update_subtype_slider_ranges(event):
    """Rescale both line sliders to the selected data and reset them to 0.

    Nothing happens unless files and both axes are selected. Each slider then
    spans from minus to plus the absolute value of its column's maximum among
    the selected samples.
    """
    chosen_files = subtype_checkbox_group.value
    x_column = subtype_x_axis_dropdown.value
    y_column = subtype_y_axis_dropdown.value
    if not (chosen_files and x_column and y_column):
        return

    selected_rows = df.loc[df['Sample_ID'].isin(chosen_files), :].copy()
    slider_limits = (
        (subtype_x_line_slider, abs(selected_rows[x_column].max())),
        (subtype_y_line_slider, abs(selected_rows[y_column].max())),
    )
    # Bounds first (X then Y), values afterwards, in the same order
    for slider, limit in slider_limits:
        slider.start = -limit
        slider.end = limit
    for slider, _ in slider_limits:
        slider.value = 0
def on_subtype_value_change(event):
    """Rebuild both plots from the current widget values and display them."""
    chosen_files = subtype_checkbox_group.value
    dot_plot_args = (
        chosen_files,
        subtype_x_axis_dropdown.value,
        subtype_y_axis_dropdown.value,
        subtype_random_sample_input.value,
        subtype_x_line_slider.value,
        subtype_y_line_slider.value,
    )
    dot_plot = create_subtype_dot_plot(*dot_plot_args)
    reconstruction = create_subtype_reconstruction_plot(chosen_files)
    subtype_plot_placeholder.object = dot_plot
    subtype_reconstruction_placeholder.object = reconstruction
# Link value changes to their callbacks, registered in this order
for _widget, _callback in (
    (subtype_checkbox_group, update_subtype_dropdown_options),
    (subtype_checkbox_group, update_subtype_slider_ranges),
    (subtype_x_axis_dropdown, update_subtype_slider_ranges),
    (subtype_y_axis_dropdown, update_subtype_slider_ranges),
    (subtype_x_axis_dropdown, on_subtype_value_change),
    (subtype_y_axis_dropdown, on_subtype_value_change),
    (subtype_random_sample_input, on_subtype_value_change),
    (subtype_x_line_slider, on_subtype_value_change),
    (subtype_y_line_slider, on_subtype_value_change),
):
    _widget.param.watch(_callback, 'value')

# Layout: controls on top, then the two plots side by side
_subtype_slider_row = pn.Row(subtype_x_line_slider, subtype_y_line_slider)
_subtype_plots_row = pn.Row(
    pn.Column("## Dot Plot", pn.Column(subtype_plot_placeholder)),
    pn.Column("## Cell Subtype Digital Reconstruction Plot", subtype_reconstruction_placeholder),
)
plot_with_subtype_reconstruction = pn.Column(
    "## Select Files to Construct Dot Plot",
    subtype_checkbox_group,
    subtype_x_axis_dropdown,
    subtype_y_axis_dropdown,
    subtype_random_sample_input,
    _subtype_slider_row,
    _subtype_plots_row,
)
# Current axis selections of the subtype dot plot
subtype_x_axis = subtype_x_axis_dropdown.value
subtype_y_axis = subtype_y_axis_dropdown.value

# Normalize the values in df2.cell_subtype and the keys of
# cell_subtype_color_dict the same way (trimmed, lower case)
df2['cell_subtype'] = df2['cell_subtype'].str.strip().str.lower()
cell_subtype_color_dict = {k.strip().lower(): v for k, v in cell_subtype_color_dict.items()}

# Map the cell_subtype values to colors
cell_subtype_row_colors = df2.cell_subtype.map(cell_subtype_color_dict)

# Remove the 'rgb' prefix (the first three characters of every value)
cell_subtype_color_dict = {k: v[3:] for k, v in cell_subtype_color_dict.items()}

# Colors dictionaries
sample_row_colors = df.Sample_ID.map(sample_color_dict)
# The dictionary keys were lower-cased above, so the labels of df are
# normalized the same way for the lookup (df itself is left untouched);
# looking the raw labels up could never match a key containing capitals.
# NOTE(review): labels that differ from the keys by more than case/whitespace
# (e.g. sanitized names with underscores) still map to NaN - confirm intent.
cell_subtype_row_colors = df.cell_subtype.str.strip().str.lower().map(cell_subtype_color_dict)

# Number of cells for each (cell_type, cell_subtype) pair
counts = df.groupby(['cell_type', 'cell_subtype']).size().reset_index(name='count')

# Share of each pair in the total number of cells, in percent. The former
# groupby/transform divided every count by the same grand total, so a plain
# element-wise division gives the same values.
total = counts['count'].sum()
counts['percentage'] = counts['count'] / total * 100
# ## IV.10. SAVE
# Make sure the output directory exists before writing into it
os.makedirs(output_data_dir, exist_ok=True)

# Save the data by Sample_ID. to_csv opens its target in write mode by
# default, so an existing file is overwritten and no existence check is needed.
for sample in ls_samples:
    sample_id = sample
    filename = os.path.join(output_data_dir, sample_id + "_" + step_suffix + ".csv")
    df_save = df.loc[df['Sample_ID'] == sample_id, :]
    df_save.to_csv(filename, index=True, index_label='ID')

# All samples in a single file
filename = os.path.join(output_data_dir, "all_Samples_" + project_name + ".csv")
df.to_csv(filename, index=True, index_label='ID')
# ## Panel App
# Widget showing the metadata table
df_widget = pn.widgets.DataFrame(metadata, name="MetaData")

# Content of the five tabs
metadata_tab = pn.Column(pn.pane.Markdown("## Initial DataFrame"), intial_df)
celltype_classification_tab = pn.Column(cell_type_classification_app_main, threshold_panel)
dotplot_tab = pn.Column(plot_with_reconstruction)
cellsubtype_classification_tab = pn.Column(cell_subtype_classification_app_main, subtype_threshold_panel)
subtype_dotplot_tab = pn.Column(plot_with_subtype_reconstruction)

# Assemble the tabs, in display order, inside the template and serve the app
app4_5 = pn.template.GoldenTemplate(
    site="Cyc-IF",
    title="Marker Threshold & Classification",
    main=[pn.Tabs(*(
        ("Metadata", metadata_tab),
        ("Classify-Celltype-Marker", celltype_classification_tab),
        ("Cell_Types", dotplot_tab),
        ("Classify-Cell Subtype-Marker", cellsubtype_classification_tab),
        ("Cell-Subtypes", subtype_dotplot_tab),
    ))],
)
app4_5.show()