#!/usr/bin/env python
# coding: utf-8

# In[1]:

import os
import random
import re
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
import matplotlib.colors as mplc
import subprocess
import warnings

from scipy import signal

import plotly.figure_factory as ff
import plotly
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, plot
import plotly.express as px
import panel as pn  # needed for pn.extension() below

from my_modules import *

os.getcwd()

# In[2]:

pn.extension()

# Silence FutureWarnings & UserWarnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)
# ## II.2. *DIRECTORIES

# In[5]:

# Set base directory

##### MAC WORKSTATION #####
#base_dir = r'/Volumes/LaboLabrie/Projets/OC_TMA_Pejovic/Temp/Zoe/CyCIF_pipeline/'
###########################

##### WINDOWS WORKSTATION #####
#base_dir = r'C:\Users\LaboLabrie\gerz2701\cyCIF-pipeline\Set_B'
###############################

##### LOCAL WORKSTATION #####
#input_path = '/Users/harshithakolipaka/Downloads/wetransfer_data-zip_2024-05-17_1431/'
#############################

#set_name = 'Set_A'
#set_name = 'test'
#present_dir = os.path.dirname(os.path.realpath(__file__))
#input_path = os.path.join(present_dir, 'wetransfer_data-zip_2024-05-17_1431')
#base_dir = input_path

'''
# Function to change permissions recursively with error handling
def change_permissions_recursive(path, mode):
    for root, dirs, files in os.walk(path):
        for dir in dirs:
            try:
                os.chmod(os.path.join(root, dir), mode)
            except Exception as e:
                print(f"An error occurred while changing permissions for directory {os.path.join(root, dir)}: {e}")
        for file in files:
            try:
                os.chmod(os.path.join(root, file), mode)
            except Exception as e:
                print(f"An error occurred while changing permissions for file {os.path.join(root, file)}: {e}")

change_permissions_recursive(base_dir, 0o777)
change_permissions_recursive('/code', 0o777)
'''

base_dir = '/code/wetransfer_data-zip_2024-05-17_1431'
set_path = 'test'
selected_metadata_files = ['Slide_B_DD1s1.one_1.tif.csv', 'Slide_B_DD1s1.one_2.tif.csv']
ls_samples = ['Ashlar_Exposure_Time.csv', 'new_data.csv', 'DD3S1.csv', 'DD3S2.csv', 'DD3S3.csv', 'TMA.csv']
set_name = set_path
# In[7]:

project_name = set_name                  # Project name
step_suffix = 'bs'                       # Current step (here part II: background subtraction)
previous_step_suffix_long = "_qc_eda"    # Previous step (here the QC/EDA notebook)

# Initial input data directory
input_data_dir = os.path.join(base_dir, project_name + previous_step_suffix_long)

# BS output directory
output_data_dir = os.path.join(base_dir, project_name + "_" + step_suffix)
# BS images subdirectory
output_images_dir = os.path.join(output_data_dir, "images")

# Metadata directory
metadata_dir = os.path.join(base_dir, project_name + "_metadata")
# Metadata images subdirectory
metadata_images_dir = os.path.join(metadata_dir, "images")

# Create directories if they don't already exist
for d in [base_dir, input_data_dir, output_data_dir, output_images_dir, metadata_dir, metadata_images_dir]:
    if not os.path.exists(d):
        print("Creating the", d, "directory...")
        os.makedirs(d)
    else:
        print("The", d, "directory already exists!")

os.chdir(input_data_dir)
# In[8]:

# Verify paths
print('base_dir:', base_dir)
print('input_data_dir:', input_data_dir)
print('output_data_dir:', output_data_dir)
print('output_images_dir:', output_images_dir)
print('metadata_dir:', metadata_dir)
print('metadata_images_dir:', metadata_images_dir)
# ## II.3. FILES
# Don't forget to put your data in the projname_data directory!

# ### II.3.1. METADATA

# In[9]:

# Import all metadata we need from the QC/EDA chapter

# METADATA
filename = "marker_intensity_metadata.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
metadata = pd.read_csv(filename)

# Verify size with the verify_line_no() function from my_modules.py
#verify_line_no(filename, metadata.shape[0] + 1)

# Verify headers
exp_cols = ['Round', 'Target', 'Channel', 'target_lower', 'full_column', 'marker', 'localisation']
compare_headers(exp_cols, metadata.columns.values, "Marker metadata file")

metadata = metadata.dropna()
metadata.head()
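# verify_line_no() is imported from my_modules and not shown in this script.
# A minimal sketch of what it presumably does, assuming it simply compares a
# file's raw line count against an expected value (the real helper in
# my_modules may behave differently):
def verify_line_no_sketch(filename, expected_no_lines):
    with open(filename, 'r') as fh:
        no_lines = sum(1 for _ in fh)  # count every line, header included
    if no_lines != expected_no_lines:
        print(f"WARNING: {filename} has {no_lines} lines, expected {expected_no_lines}.")
    else:
        print(f"{filename} has the expected number of lines ({expected_no_lines}).")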
# ### II.3.2. NOT_INTENSITIES

# In[10]:

# NOT_INTENSITIES
filename = "not_intensities.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information: take str, strip whitespace, split on newline character
with open(filename, 'r') as fh:
    not_intensities = fh.read().strip().split("\n")

# NOTE: the list read from file is immediately overridden by this hardcoded list
not_intensities = ['Nuc_X', 'Nuc_X_Inv', 'Nuc_Y', 'Nuc_Y_Inv', 'Nucleus_Roundness', 'Nucleus_Size', 'Cell_Size',
                   'ROI_index', 'Sample_ID', 'replicate_ID', 'Cell_ID', 'cell_type', 'cell_subtype', 'cluster', 'ID',
                   'Cytoplasm_Size', 'immune_checkpoint', 'Unique_ROI_index', 'Patient', 'Primary_chem(1)_vs_surg(0)']

# Verify size
print("Verifying data read from file is the correct length...\n")
verify_line_no(filename, len(not_intensities))

# Print to console
print("not_intensities =\n", not_intensities)
import os
import pandas as pd

# Function to compare headers (also defined in my_modules.py)
def compare_headers(expected, actual, description):
    missing = [col for col in expected if col not in actual]
    if missing:
        print(f"WARNING: Missing expected columns in {description}: {missing}")
    else:
        print(f"All expected columns are present in {description}.")

# NOTE: this block redefines the directories relative to the script location,
# overriding the /code base directory set above.

# Get the current script directory
present_dir = os.path.dirname(os.path.realpath(__file__))

# Define the input path
input_path = os.path.join(present_dir, 'wetransfer_data-zip_2024-05-17_1431')
base_dir = input_path
set_path = 'test'

# Project and step names
project_name = set_path                  # Project name
previous_step_suffix_long = "_qc_eda"    # Previous step (here the QC/EDA notebook)

# Initial input data directory
input_data_dir = os.path.join(base_dir, project_name + previous_step_suffix_long)

# Metadata directories
metadata_dir = os.path.join(base_dir, project_name + "_metadata")
metadata_images_dir = os.path.join(metadata_dir, "images")

# Define writable directory
writable_directory = '/tmp'

# Check and read metadata file
filename = "marker_intensity_metadata.csv"
filename = os.path.join(metadata_dir, filename)

# Check if the file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
metadata = pd.read_csv(filename)

# Verify headers
exp_cols = ['Round', 'Target', 'Channel', 'target_lower', 'full_column', 'marker', 'localisation']
compare_headers(exp_cols, metadata.columns.values, "Marker metadata file")

metadata = metadata.dropna()
print(metadata.head())

# Example of writing to the writable directory
output_file_path = os.path.join(writable_directory, 'processed_metadata.csv')
try:
    metadata.to_csv(output_file_path, index=False)
    print(f"Processed metadata written successfully to {output_file_path}")
except PermissionError as e:
    print(f"Permission denied: Unable to write the file at {output_file_path}. Error: {e}")
except Exception as e:
    print(f"An error occurred: {e}")
# ### II.3.3. FULL_TO_SHORT_COLUMN_NAMES

# In[11]:

# FULL_TO_SHORT_COLUMN_NAMES
filename = "full_to_short_column_names.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
df = pd.read_csv(filename, header=0)

# Verify size
print("Verifying data read from file is the correct length...\n")
#verify_line_no(filename, df.shape[0] + 1)

# Turn the two-column dataframe into a {full_name: short_name} dictionary
full_to_short_names = df.set_index('full_name').T.to_dict('records')[0]

# Print information
print('full_to_short_names =\n', full_to_short_names)
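# A quick illustration of the set_index().T.to_dict('records')[0] idiom on a
# toy frame (the real CSV is assumed to have the columns 'full_name' and
# 'short_name', as used above and in section II.3.4; the values here are made up):
toy = pd.DataFrame({'full_name': ['AF555_Cell_Intensity_Average'], 'short_name': ['AF555_Cell']})
# set_index + transpose leaves a single row whose record is the desired mapping
print(toy.set_index('full_name').T.to_dict('records')[0])
# -> {'AF555_Cell_Intensity_Average': 'AF555_Cell'}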
# ### II.3.4. SHORT_TO_FULL_COLUMN_NAMES

# In[12]:

# SHORT_TO_FULL_COLUMN_NAMES
filename = "short_to_full_column_names.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
df = pd.read_csv(filename, header=0)

# Verify size
print("Verifying data read from file is the correct length...\n")
#verify_line_no(filename, df.shape[0] + 1)

# Turn into dictionary
short_to_full_names = df.set_index('short_name').T.to_dict('records')[0]

# Print information
print('short_to_full_names =\n', short_to_full_names)
# ### II.3.5. SAMPLES COLORS

# In[13]:

# COLORS INFORMATION
filename = "sample_color_data.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
df = pd.read_csv(filename, header=0)
df = df.drop(columns=['hex'])

# Our tuple of float values for rgb, (r, g, b), was read in
# as a string '(r, g, b)'. We need to extract the r-, g-, and b-
# substrings and convert them back into floats.
df['rgb'] = df.apply(lambda row: rgb_tuple_from_str(row['rgb']), axis=1)

# Verify size
print("Verifying data read from file is the correct length...\n")
#verify_line_no(filename, df.shape[0] + 1)

# Turn into dictionary
sample_color_dict = df.set_index('Sample_ID')['rgb'].to_dict()

# Print information
print('sample_color_dict =\n', sample_color_dict)
# Keep the dictionary and the dataframe view under separate names
sample_color_df = pd.DataFrame.from_dict(sample_color_dict, orient='index', columns=['R', 'G', 'B'])

# In[14]:

sample_color_df
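# rgb_tuple_from_str() comes from my_modules and is not shown here. A minimal
# sketch of the parsing it presumably performs, assuming the stored strings
# look like '(0.1, 0.2, 0.3)' (the real helper may handle more formats):
def rgb_tuple_from_str_sketch(rgb_str):
    # strip the parentheses, split on commas, convert each part to float
    return tuple(float(part) for part in rgb_str.strip('()').split(','))

print(rgb_tuple_from_str_sketch('(0.1333, 0.5451, 0.1333)'))  # -> (0.1333, 0.5451, 0.1333)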
# ### II.3.6. CHANNELS COLORS

# In[15]:

# CHANNELS
filename = "channel_color_data.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
df = pd.read_csv(filename, header=0)
df = df.drop(columns=['hex'])

# Our tuple of float values for rgb, (r, g, b), was read in
# as a string '(r, g, b)'. We need to extract the r-, g-, and b-
# substrings and convert them back into floats.
df['rgb'] = df.apply(lambda row: rgb_tuple_from_str(row['rgb']), axis=1)

# Verify size
print("Verifying data read from file is the correct length...\n")
#verify_line_no(filename, df.shape[0] + 1)

# Turn into dictionary
channel_color_dict = df.set_index('Channel')['rgb'].to_dict()

# Print information
print('channel_color_dict =\n', channel_color_dict)
channel_color_df = pd.DataFrame.from_dict(channel_color_dict, orient='index', columns=['R', 'G', 'B'])

# In[16]:

channel_color_df
# ### II.3.7. ROUNDS COLORS

# In[17]:

# ROUND
filename = "round_color_data.csv"
filename = os.path.join(metadata_dir, filename)

# Check file exists
if not os.path.exists(filename):
    print("WARNING: Could not find desired file: " + filename)
else:
    print("The", filename, "file was imported for further analysis!")

# Open, read in information
df = pd.read_csv(filename, header=0)
df = df.drop(columns=['hex'])

# Our tuple of float values for rgb, (r, g, b), was read in
# as a string '(r, g, b)'. We need to extract the r-, g-, and b-
# substrings and convert them back into floats.
df['rgb'] = df.apply(lambda row: rgb_tuple_from_str(row['rgb']), axis=1)

# Verify size
print("Verifying data read from file is the correct length...\n")
#verify_line_no(filename, df.shape[0] + 1)

# Turn into dictionary
round_color_dict = df.set_index('Round')['rgb'].to_dict()

# Print information
print('round_color_dict =\n', round_color_dict)
round_color_df = pd.DataFrame.from_dict(round_color_dict, orient='index', columns=['R', 'G', 'B'])

# In[18]:

round_color_df
# ### II.3.8. DATA

# In[19]:

# DATA
# List the QC/EDA output files in the input directory, if it exists
if os.path.exists(input_data_dir):
    ls_samples = [sample for sample in os.listdir(input_data_dir) if sample.endswith("_qc_eda.csv")]
    print("The following CSV files were detected:")
    print(ls_samples)
else:
    print(f"The directory {input_data_dir} does not exist.")
# In[20]:

# Import all the other files
dfs = {}

# First gather information on expected headers using the first file in ls_samples:
# read in the first row of the file corresponding to the first sample (index = 0)
df = pd.read_csv(os.path.join(input_data_dir, ls_samples[0]), index_col=0, nrows=1)
expected_headers = df.columns.values
print(expected_headers)

###############################
# !! This may take a while !! #
###############################
# Iterate over a copy of the list so that removing a sample mid-loop is safe
for sample in list(ls_samples):
    file_path = os.path.join(input_data_dir, sample)

    try:
        # Read the CSV file
        df = pd.read_csv(file_path, index_col=0)
        # Check if the DataFrame is empty; if so, don't continue processing it
        if not df.empty:
            # Reorder the columns to match the expected headers list
            df = df.reindex(columns=expected_headers)
            print(sample, "file is processed!\n")
            # Add df to dfs only on success
            dfs[sample] = df
    except pd.errors.EmptyDataError:
        print(f'\nEmpty data error in {sample} file. Removing from analysis...')
        ls_samples.remove(sample)

#print(dfs)
# In[21]:

# Merge dfs into one df
df = pd.concat(dfs.values(), ignore_index=False, sort=False)
#del dfs

df.head()

# In[22]:

df.shape

# In[23]:

# Check for NaN entries (there should not be any unless columns do not align)
# False means no NaN entries, True means NaN entries
df.isnull().any().any()
# ## II.4. *FILTERING

# In[24]:

print("Number of cells before filtering:", df.shape[0])
cells_before_filter = f"Number of cells before filtering: {df.shape[0]}"

# In[25]:

#print(df)

# In[26]:

# Delete small cells and objects with high AF555 signal (RBCs)
# We usually use the 95th percentile calculated during QC/EDA
df = df.loc[(df['Nucleus_Size'] > 42)]
df = df.loc[(df['Nucleus_Size'] < 216)]
print("Number of cells after filtering on nucleus size:", df.shape[0])
cells_after_filter_nucleus = f"Number of cells after filtering on nucleus size: {df.shape[0]}"

df = df.loc[(df['AF555_Cell_Intensity_Average'] < 2000)]
print("Number of cells after filtering on AF555 cell intensity:", df.shape[0])
cells_after_filter_intensity = f"Number of cells after filtering on AF555 cell intensity: {df.shape[0]}"
# In[27]:

# Assign cell type
# Randomly assign a cell type to each row (random assignment here is just for development purposes)
random_values = np.random.randint(0, 10, size=len(df))

# Assign cell type; note the function ignores its argument, so this amounts to
# one independent random draw per cell
def assign_cell_type(n):
    return np.random.choice(['STROMA', 'CANCER', 'IMMUNE', 'ENDOTHELIAL'])

df['cell_type'] = np.vectorize(assign_cell_type)(random_values)
df['cell_subtype'] = df['cell_type'].copy()

# In[28]:

filtered_dataframe = df  # a reference to df, not a copy
df.head()

# In[29]:

quality_control_df = filtered_dataframe
def check_index_format(index_str, ls_samples): | |
""" | |
Checks if the given index string follows the specified format. | |
Args: | |
index_str (str): The index string to be checked. | |
ls_samples (list): A list of valid sample names. | |
Returns: | |
bool: True if the index string follows the format, False otherwise. | |
""" | |
# Split the index string into parts | |
parts = index_str.split('_') | |
# Check if there are exactly 3 parts | |
if len(parts) != 3: | |
print(len(parts)) | |
return False | |
# Check if the first part is in ls_samples | |
sample_name = parts[0] | |
if f'{sample_name}_qc_eda.csv' not in ls_samples: | |
print(sample_name) | |
return False | |
# Check if the second part is in ['cell', 'cytoplasm', 'nucleus'] | |
location = parts[1] | |
valid_locations = ['Cell', 'Cytoplasm', 'Nucleus'] | |
if location not in valid_locations: | |
print(location) | |
return False | |
# Check if the third part is a number | |
try: | |
index = int(parts[2]) | |
except ValueError: | |
print(index) | |
return False | |
# If all checks pass, return True | |
return True | |
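# A quick check of the expected "<sample>_<Location>_<number>" index format,
# using a hypothetical sample name for illustration:
print(check_index_format('DD3S1_Cell_42', ['DD3S1_qc_eda.csv']))      # True
print(check_index_format('DD3S1_membrane_42', ['DD3S1_qc_eda.csv']))  # False: invalid location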
# In[31]:

# Let's take a look at a few features to make sure our dataframe is as expected
df.index

def check_format_ofindex(index):
    # Iterate over the index that was passed in (not the global df.index)
    for idx in index:
        if check_index_format(idx, ls_samples) is False:
            return "Bad"
    return "Good"

print(check_format_ofindex(df.index))
# In[32]:

def quality_check(file, not_intensities):
    # Load the output file
    df = file

    # Check index
    check_index = check_format_ofindex(df.index)

    # Check shape
    check_shape = df.shape

    # Check for NaN entries
    check_no_null = df.isnull().any().any()

    # Remove rows whose mean intensity across the intensity columns is zero
    mean_intensity = df.loc[:, ~df.columns.isin(not_intensities)].mean(axis=1)
    if (mean_intensity == 0).any():
        df = df.loc[mean_intensity > 0, :]
        print("df.shape after removing 0 mean values: ", df.shape)
        check_zero_intensities = f'Shape after removing 0 mean values: {df.shape}'
    else:
        print("No zero intensity values.")
        check_zero_intensities = "No zero intensity values."

    # Create a quality check results table
    quality_check_results_table = pd.DataFrame({
        'Check': ['Index', 'Shape', 'Check for NaN Entries', 'Check for Zero Intensities'],
        'Result': [str(check_index), str(check_shape), str(check_no_null), check_zero_intensities]
    })

    # Create a quality check results component
    quality_check_results_component = pn.Card(
        pn.pane.DataFrame(quality_check_results_table),
        title="Quality Control Results",
        header_background="#2196f3",
        header_color="white",
    )

    return quality_check_results_component
# ## II.5. CELL TYPES COLORS

# Establish colors to use throughout the workflow.
# We want categorical colors, since cell type is a non-ordered category;
# a categorical color palette will have dissimilar colors.

# Get those unique colors
cell_types = ['STROMA', 'CANCER', 'IMMUNE', 'ENDOTHELIAL']
color_values = sb.color_palette("hls", n_colors=len(cell_types))
# each color value is a tuple of three values: (R, G, B)

print("Unique cell types are:", df.cell_type.unique())

# Display those unique colors
sb.palplot(sb.color_palette(color_values))

# In[33]:

# Define your custom colors for each cell type
custom_colors = {
    'CANCER': (0.1333, 0.5451, 0.1333),
    'STROMA': (0.4, 0.4, 0.4),
    'IMMUNE': (1, 1, 0),
    'ENDOTHELIAL': (0.502, 0, 0.502)
}

# Retrieve the list of cell types
cell_types = list(custom_colors.keys())

# Extract the corresponding colors from the dictionary
color_values = [custom_colors[cell] for cell in cell_types]

# Display the colors
sb.palplot(sb.color_palette(color_values))
# In[34]:

# Store in a dictionary
celltype_color_dict = dict(zip(cell_types, color_values))
celltype_color_dict

# In[35]:

celltype_color_df = pd.DataFrame.from_dict(celltype_color_dict, orient='index', columns=['R', 'G', 'B'])

# In[36]:

# Save color information (mapping and legend) to metadata directory
# Create dataframe
celltype_color_df = color_dict_to_df(celltype_color_dict, "cell_type")
celltype_color_df.head()

# Save to file in metadata directory
filename = "celltype_color_data.csv"
filename = os.path.join(metadata_dir, filename)
celltype_color_df.to_csv(filename, index=False)
print("File " + filename + " was created!")

# In[37]:

celltype_color_df.head()
# In[38]: | |
# Legend of cell type info only | |
g = plt.figure(figsize = (1,1)).add_subplot(111) | |
g.axis('off') | |
handles = [] | |
for item in celltype_color_dict.keys(): | |
h = g.bar(0,0, color = celltype_color_dict[item], | |
label = item, linewidth =0) | |
handles.append(h) | |
first_legend = plt.legend(handles=handles, loc='upper right', title = 'Cell type'), | |
filename = "Celltype_legend.png" | |
filename = os.path.join(metadata_images_dir, filename) | |
plt.savefig(filename, bbox_inches = 'tight') | |
# In[39]:

metadata

# In[40]:

df.columns.values

# In[41]:

df.shape

# In[42]:

metadata.shape
# ## II.6. *CELL SUBTYPES COLORS

# In[43]:

# Establish colors to use throughout the workflow.
# We want categorical colors, since cell subtype is a non-ordered category;
# a categorical color palette will have dissimilar colors.

# Get those unique colors
cell_subtypes = ['DC', 'B', 'TCD4', 'TCD8', 'M1', 'M2', 'Treg',
                 'IMMUNE_OTHER', 'CANCER', 'αSMA_myCAF',
                 'STROMA_OTHER', 'ENDOTHELIAL']
color_values = sb.color_palette("Paired", n_colors=len(cell_subtypes))
# each color value is a tuple of three values: (R, G, B)

print("Unique cell subtypes are:", df.cell_subtype.unique())

# Display those unique colors
sb.palplot(sb.color_palette(color_values))
# In[44]:

# Store in a dictionary
cellsubtype_color_dict = dict(zip(cell_subtypes, color_values))
cellsubtype_color_dict

# In[45]:

cellsubtype_color_df = pd.DataFrame.from_dict(cellsubtype_color_dict, orient='index', columns=['R', 'G', 'B'])

# In[46]:

# Save color information (mapping and legend) to metadata directory
# Create dataframe
cellsubtype_color_df = color_dict_to_df(cellsubtype_color_dict, "cell_subtype")

# Save to file in metadata directory
filename = "cellsubtype_color_data.csv"
filename = os.path.join(metadata_dir, filename)
cellsubtype_color_df.to_csv(filename, index=False)
print("File " + filename + " was created!")

# In[47]:

cellsubtype_color_df.head()
# In[48]: | |
# Legend of cell type info only | |
g = plt.figure(figsize = (1,1)).add_subplot(111) | |
g.axis('off') | |
handles = [] | |
for item in cellsubtype_color_dict.keys(): | |
h = g.bar(0,0, color = cellsubtype_color_dict[item], | |
label = item, linewidth =0) | |
handles.append(h) | |
first_legend = plt.legend(handles=handles, loc='upper right', title = 'Cell subtype'), | |
filename = "Cellsubtype_legend.png" | |
filename = os.path.join(metadata_images_dir, filename) | |
plt.savefig(filename, bbox_inches = 'tight') | |
# ## II.7. IMMUNE CHECKPOINT COLORS

# In[49]:

# Assign immune checkpoints ('none' for every cell at this stage)
df['cell_subtype'] = df['cell_type'].copy()
df['immune_checkpoint'] = 'none'
df

immune_checkpoint = ['B7H4', 'PDL1', 'PD1', 'None']
color_values = sb.color_palette("husl", n_colors=len(immune_checkpoint))
# each color value is a tuple of three values: (R, G, B)

print("Unique immune checkpoints are:", df.immune_checkpoint.unique())

# Display those unique colors
sb.palplot(sb.color_palette(color_values))
# In[50]:

immune_checkpoint = ['B7H4', 'PDL1', 'PD1', 'B7H4_PDL1', 'None']

# Base colors for the primary checkpoints
base_colors = sb.color_palette("husl", n_colors=3)  # Three distinct colors

# Function to mix two RGB colors
def mix_colors(color1, color2):
    return tuple((c1 + c2) / 2 for c1, c2 in zip(color1, color2))

# Generate mixed colors for the combinations of checkpoints
mixed_colors = [
    mix_colors(base_colors[0], base_colors[1]),      # Mix B7H4 and PDL1
    # mix_colors(base_colors[0], base_colors[2]),    # Mix B7H4 and PD1
    # mix_colors(base_colors[1], base_colors[2]),    # Mix PDL1 and PD1
    tuple(np.mean(base_colors, axis=0))              # Mix B7H4, PDL1, and PD1
]

# Adding the color for 'None'
#none_color = [(0.8, 0.8, 0.8)]  # A shade of gray

# Combine all colors into one list
color_values = base_colors + mixed_colors  #+ none_color

# Display unique immune checkpoint combinations
print("Unique immune checkpoint combinations are:", immune_checkpoint)

# Display the unique colors
sb.palplot(color_values)
# In[51]:

# Store in a dictionary
immunecheckpoint_color_dict = dict(zip(immune_checkpoint, color_values))
immunecheckpoint_color_dict

# In[52]:

# Save color information (mapping and legend) to metadata directory
# Create dataframe
immunecheckpoint_color_df = color_dict_to_df(immunecheckpoint_color_dict, "immune_checkpoint")
immunecheckpoint_color_df.head()

# Save to file in metadata directory
filename = "immunecheckpoint_color_data.csv"
filename = os.path.join(metadata_dir, filename)
immunecheckpoint_color_df.to_csv(filename, index=False)
print("File " + filename + " was created!")
# In[53]:

# Legend of immune checkpoint info only
g = plt.figure(figsize=(1, 1)).add_subplot(111)
g.axis('off')

handles = []
for item in immunecheckpoint_color_dict.keys():
    h = g.bar(0, 0, color=immunecheckpoint_color_dict[item],
              label=item, linewidth=0)
    handles.append(h)

first_legend = plt.legend(handles=handles, loc='upper right', title='Immune checkpoint')

filename = "Immunecheckpoint_legend.png"  # own file, so the cell subtype legend is not overwritten
filename = os.path.join(metadata_images_dir, filename)
plt.savefig(filename, bbox_inches='tight')
# ## II.8. BACKGROUND SUBTRACTION

# In[54]:

def do_background_sub(col, df, metadata):
    # Look up the localisation and channel of this intensity column...
    location = metadata.loc[metadata['full_column'] == col.name, 'localisation'].values[0]
    channel = metadata.loc[metadata['full_column'] == col.name, 'Channel'].values[0]
    # ...then find the autofluorescence (AFxxx) column for the same channel
    # and localisation, and subtract it
    af_target = metadata.loc[
        (metadata['Channel'] == channel)
        & (metadata['localisation'] == location)
        & (metadata['target_lower'].str.contains(r'^af\d{3}$')),
        'full_column'].values[0]
    return col - df.loc[:, af_target]
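# A toy illustration of the subtraction on hypothetical columns (the real
# column names come from the marker metadata; these ones are made up):
toy_meta = pd.DataFrame({
    'full_column': ['CK7_Cell_Intensity_Average', 'AF555_Cell_Intensity_Average'],
    'localisation': ['Cell', 'Cell'],
    'Channel': ['c3', 'c3'],
    'target_lower': ['ck7', 'af555'],
})
toy_df = pd.DataFrame({
    'CK7_Cell_Intensity_Average': [100.0, 80.0],
    'AF555_Cell_Intensity_Average': [10.0, 20.0],
})
# Each marker value has its channel's autofluorescence subtracted: 90.0, 60.0
print(do_background_sub(toy_df['CK7_Cell_Intensity_Average'], toy_df, toy_meta))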
# In[55]:

metadata_with_localisation = metadata
metadata_with_localisation

# In[56]:

# Normalization: divide each intensity column by its exposure time
df.loc[:, ~df.columns.isin(not_intensities)] = \
    df.loc[:, ~df.columns.isin(not_intensities)].apply(lambda column: divide_exp_time(column, 'Exp', metadata), axis=0)
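# divide_exp_time() is imported from my_modules and not shown here. A minimal
# sketch, assuming it divides a column by the exposure time recorded for that
# column in the metadata ('Exp' is the exposure column name passed in above;
# the exact lookup in my_modules may differ):
def divide_exp_time_sketch(col, exp_col, metadata):
    exp_time = metadata.loc[metadata['full_column'] == col.name, exp_col].values[0]
    return col / exp_time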
# In[57]:

normalization_df = df
normalization_df.head()

# In[58]:

# Do background subtraction.
# Note: this uses a dataframe (metadata) outside of the scope of the lambda;
# be careful, as this might break inside of a script.
df.loc[:, ~df.columns.isin(not_intensities)] = \
    df.loc[:, ~df.columns.isin(not_intensities)].apply(lambda column: do_background_sub(column, df, metadata), axis=0)

# In[59]:

df
background_subtraction_df = df
background_subtraction_df.head()
# In[60]:

# Drop AF columns (raw string avoids an invalid-escape warning in the regex)
df = df.filter(regex=r'^(?!AF\d{3}).*')
print(df.columns.values)

# In[61]:

intensities_df = df.loc[:, ~df.columns.isin(not_intensities)]
intensities_df

# In[62]:

normalization_df.head()
# In[63]:

metadata_df = metadata_with_localisation

# Create a list of column names from the intensities DataFrame
column_names = intensities_df.columns.tolist()

# Create a Select widget for choosing a column
column_selector = pn.widgets.Select(name='Select Column', options=column_names)

# Create a Markdown widget to display the selected column's information
column_info_md = pn.pane.Markdown(name='Column Information', width=400, object='Select a column to view its information.')

# Define a function to update the column information
def update_column_info(event):
    selected_column = event.new
    if selected_column:
        # Get the selected column's intensity values
        intensity = intensities_df[selected_column].values

        # Get the corresponding channel, localisation, and exposure from the metadata
        channel = metadata_df.loc[metadata_df['full_column'] == selected_column, 'Channel'].values[0]
        localization = metadata_df.loc[metadata_df['full_column'] == selected_column, 'localisation'].values[0]
        exposure = metadata_df.loc[metadata_df['full_column'] == selected_column, 'Exp'].values[0]

        # Create a Markdown string with the column information
        column_info_text = f"**Intensity:** {intensity}\n\n**Channel:** {channel}\n\n**Localization:** {localization}\n\n**Exposure:** {exposure}"

        # Update the Markdown widget with the column information
        column_info_md.object = column_info_text
    else:
        column_info_md.object = 'Select a column to view its information.'

# Watch for changes in the column selector and update the column information
column_selector.param.watch(update_column_info, 'value')

# Create a Panel app and display the widgets
bs_info = pn.Column(column_selector, column_info_md)
bs_info

normalization_df.head()
# In[65]:

df_widget = pn.widgets.DataFrame(metadata, name="MetaData")

app2 = pn.template.GoldenTemplate(
    site="Cyc-IF",
    title="Background Subtraction",
    main=[pn.Tabs(
        ("Background Subtraction", pn.Column(
            #pn.Column(pn.pane.Markdown("### Celltype thresholds"), pn.pane.DataFrame(celltype_color_df)),
            #pn.Column(pn.pane.Markdown("### Cell Subtype thresholds"), pn.pane.DataFrame(cellsubtype_color_df)),
            #pn.Column(pn.pane.Markdown("### Cells Before Filtering"), pn.pane.Str(cells_before_filter)),
            #pn.Column(pn.pane.Markdown("### Cells After Filtering Nucleus"), pn.pane.Str(cells_after_filter_nucleus)),
            #pn.Column(pn.pane.Markdown("### Cells After Filtering Intensity"), pn.pane.Str(cells_after_filter_intensity)),
            #pn.Column(pn.pane.Markdown("### Dataframe after filtering"), pn.pane.DataFrame(filtered_dataframe.head())),
            pn.Column(pn.pane.Markdown("### The metadata obtained that specifies the localisation:"), metadata_with_localisation.head(8)),
            pn.Column(pn.pane.Markdown("### The channels and exposure of each intensities column"), bs_info),
            pn.Column(pn.pane.Markdown("### Dataframe after performing normalization"), pn.pane.DataFrame(normalization_df.head(), width=1500)),
            pn.Column(pn.pane.Markdown("### Dataframe after background subtraction"), pn.pane.DataFrame(background_subtraction_df.head())),
        )),
        ("Quality Control", pn.Column(
            quality_check(quality_control_df, not_intensities)
            #pn.pane.Markdown("### The Quality check results are:"), quality_check_results(check_shape, check_no_null, check_all_expected_files_present, check_zero_intensities)
        ))
    )],
)

app2.servable()