import os | |
import numpy as np | |
import pandas as pd | |
import subprocess | |
import os | |
import random | |
import re | |
import pandas as pd | |
import numpy as np | |
import seaborn as sb | |
import matplotlib.pyplot as plt | |
import matplotlib.colors as mplc | |
import subprocess | |
from scipy import signal | |
import plotly.figure_factory as ff | |
import plotly | |
import plotly.graph_objs as go | |
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot | |
def apply_header_changes(df):
    """Normalise the column names of ``df`` in place and return ``df``.

    The following rewrites are applied to every column name, in order:

    1. drop a lowercase ``x`` at the very start of the name;
    2. drop a single space at the very start of the name;
    3. replace every remaining space with an underscore;
    4. collapse the typo ``AF_AF`` to ``AF``;
    5. rename ``Cell Id`` (or ``Cell_Id``) to ``ID``.

    The index name is then set to ``"ID"``.

    Args:
        df: DataFrame whose column labels are strings.

    Returns:
        The same DataFrame object, with renamed columns.
    """
    columns = df.columns
    # Steps 1-2 are anchored patterns, so they must be treated as regular
    # expressions explicitly: pandas >= 2.0 defaults to literal matching.
    columns = columns.str.replace(r"^x", "", regex=True)
    columns = columns.str.replace(r"^ ", "", regex=True)
    columns = columns.str.replace(" ", "_", regex=False)
    columns = columns.str.replace("AF_AF", "AF", regex=False)
    # Spaces have already become underscores at this point, so a literal
    # "Cell Id" could never match; accept both spellings.
    columns = columns.str.replace(r"Cell[ _]Id", "ID", regex=True)
    df.columns = columns
    # NOTE(review): this statement was garbled in the source (only `= "ID"`
    # survived after the "if the ID is the index" comment); reconstructed as
    # an index rename - confirm against the original file.
    df.index.name = "ID"
    return df
def apply_df_changes(df):
    """Strip a trailing ``@1`` from every index label of ``df``.

    Args:
        df: DataFrame with a string index.

    Returns:
        The same DataFrame object, with the cleaned index assigned in place.
    """
    # `$` is a regex anchor; pandas >= 2.0 treats patterns as literals unless
    # regex=True is passed, which would leave the "@1" suffix in place.
    df.index = df.index.str.replace(r"@1$", "", regex=True)
    return df
def compare_headers(expected, actual, name):
    """Print a warning for items missing from, or unexpected in, ``actual``.

    Args:
        expected: collection of items that should be present.
        actual: collection of items that are present.
        name: label (e.g. a file name) inserted into the warning text.

    Returns:
        None. Differences are reported on stdout only.
    """
    # np.setdiff1d returns the sorted unique values of its first argument
    # that do not occur in the second one.
    absent = np.setdiff1d(expected, actual)
    surplus = np.setdiff1d(actual, expected)

    if absent.size > 0:
        print("WARNING: File '" + name
              + "' lacks the following expected item(s): \n" + str(absent))

    if surplus.size > 0:
        print("WARNING: '" + name
              + "' has the following unexpected item(s): \n" + str(surplus))

    return None
def add_metadata_location(row):
    """Classify a metadata row by the location keyword in its column name.

    The lower-cased ``row['full_column']`` is searched for the substrings
    ``'cytoplasm'``, ``'cell'`` and ``'nucleus'``.

    Returns:
        The keyword that was found when exactly one of the three occurs,
        otherwise ``'unknown'`` (none found, or more than one found).
    """
    column_name = row['full_column'].lower()
    hits = [place for place in ('cytoplasm', 'cell', 'nucleus')
            if place in column_name]
    if len(hits) == 1:
        return hits[0]
    return 'unknown'
def get_perc(row, cell_type):
    """Return ``row[cell_type]`` as a percentage of the four cell-type counts.

    The denominator is the sum of the ``'stroma'``, ``'immune'``,
    ``'cancer'`` and ``'endothelial'`` entries of ``row``; the result is
    rounded to one decimal place.
    """
    stroma, immune, cancer, endothelial = (
        row[key] for key in ('stroma', 'immune', 'cancer', 'endothelial')
    )
    total = stroma + immune + cancer + endothelial
    share = row[cell_type] / total
    return round(share * 100, 1)
def divide_exp_time(col, exp_col, metadata):
    """Divide a marker column by the exposure value recorded in ``metadata``.

    Args:
        col: Series of marker values; its ``name`` is looked up in
            ``metadata['full_column']``.
        exp_col: name of the ``metadata`` column holding the exposure value
            to divide by.
        metadata: DataFrame with a ``'full_column'`` column and ``exp_col``.

    Returns:
        ``col`` divided element-wise by the first matching exposure value.

    Raises:
        IndexError: if no ``metadata`` row matches ``col.name``.
    """
    # NOTE(review): the right-hand operand of `==` was stripped from the
    # source; `col.name` is the reconstruction (this function is written to
    # be applied column by column) - confirm against the original file.
    matches = metadata.loc[metadata['full_column'] == col.name, exp_col]
    exp_time = matches.values[0]
    return col / exp_time
def do_background_sub(col, df, metadata):
    """Subtract the matching ``af###`` reference column from ``col``.

    The metadata row whose ``'full_column'`` equals ``col.name`` supplies the
    column's ``'localisation'`` and ``'Channel'``. The reference column is
    the first metadata row with the same channel and localisation whose
    ``'target_lower'`` matches ``^af\\d{3}$`` (presumably the
    autofluorescence measurement - confirm with the data owner).

    Args:
        col: Series of marker values, named after its ``df`` column.
        df: DataFrame containing the reference column.
        metadata: DataFrame with ``'full_column'``, ``'localisation'``,
            ``'Channel'`` and ``'target_lower'`` columns.

    Returns:
        ``col`` minus the reference column of ``df``.

    Raises:
        IndexError: if ``col.name`` is not in the metadata or no reference
            column exists for its channel and localisation.
    """
    # NOTE(review): the right-hand operand of both `==` comparisons was
    # stripped from the source; `col.name` is the reconstruction - confirm.
    is_this_column = metadata['full_column'] == col.name
    location = metadata.loc[is_this_column, 'localisation'].values[0]
    channel = metadata.loc[is_this_column, 'Channel'].values[0]

    # na=False keeps the mask strictly boolean when 'target_lower' has
    # missing values, so it can be combined with `&` safely.
    is_reference = metadata['target_lower'].str.contains(r'^af\d{3}$', na=False)
    same_acquisition = (
        (metadata['Channel'] == channel)
        & (metadata['localisation'] == location)
    )
    af_target = metadata.loc[same_acquisition & is_reference, 'full_column'].values[0]
    return col - df.loc[:, af_target]
def make_distr_plot_per_sample(title, location, dfs, df_names, colors, x_label, legend,
                               xlims = None, markers = None, not_intensities = None,
                               bin_size = 0.1):
    """Plot marker distributions for one or more dataframes and save a PNG.

    One distribution trace is drawn per (dataframe, marker) pair. Pairs whose
    data hold a single unique value are skipped, because a density estimate
    cannot be computed for them.

    Args:
        title: plot title; also used (spaces replaced by underscores) as the
            PNG file name.
        location: directory the PNG is written to.
        dfs: list of dataframes to plot from.
        df_names: names of the dataframes; prefixed to the legend labels
            when more than one name is given.
        colors: trace colours, in trace order. Tuples are converted to
            ``'rgb(...)'`` strings.
        x_label: x-axis label.
        legend: whether to show the legend.
        xlims: ``[min, max]`` x-axis range (list or tuple). Computed from the
            data when omitted.
        markers: a marker name or a sequence of marker names. ``None`` or
            ``['all']`` plots every column of ``dfs[0]`` that is not listed
            in ``not_intensities``.
        not_intensities: column names that are not markers; only consulted
            when all markers are requested.
        bin_size: histogram bin width passed to plotly.

    Returns:
        None. The figure is written to ``<location>/<title>.png``; nothing is
        written when ``xlims`` is invalid or no marker can be plotted.
    """
    ### RESOLVE MARKERS ###
    if markers is None:
        markers = ['all']
    elif isinstance(markers, str) or not hasattr(markers, '__iter__'):
        markers = [markers]
    else:
        markers = list(markers)

    if markers == ['all']:
        excluded = [] if not_intensities is None else list(not_intensities)
        markers = [c for c in dfs[0].columns.values if c not in excluded]

    # De-duplicate while keeping the caller's order: colours are assigned to
    # traces by position, so the order must be stable from run to run.
    markers = list(dict.fromkeys(markers))

    ### GET XLIMS ###
    if xlims is None:
        mins = [df.loc[:, markers].min().min() for df in dfs]
        maxes = [df.loc[:, markers].max().max() for df in dfs]
        xlims = [min(mins), max(maxes)]
    if not isinstance(xlims, (list, tuple)):
        print("Problem - xlmis not list. Exiting method...")
        return None
    xlims = list(xlims)

    ### COLLECT PLOTTABLE DATA ###
    group_labels = []
    hist_data = []
    multiple_names = len(df_names) > 1
    for i, df in enumerate(dfs):
        for marker in markers:
            # A single unique value makes the density estimate fail with a
            # singular-matrix error, so such series are left out.
            if df[marker].nunique() == 1:
                continue
            # With several dataframes the legend must say which one a trace
            # belongs to.
            label = df_names[i] + "_" + marker if multiple_names else marker
            group_labels.append(label)
            hist_data.append(df[marker])

    if len(group_labels) < 1:
        print("No markers plotted - all were singular value. Names and markers were "
              + str(df_names) + ", " + str(markers))
        return None

    # plotly expects colour strings; accept (r, g, b) tuples as well.
    if colors is not None and len(colors) > 0 and isinstance(colors[0], tuple):
        colors = ['rgb' + str(color) for color in colors]

    ### PLOT DATA ###
    fig = ff.create_distplot(hist_data, group_labels, bin_size=bin_size,
                             colors=colors, show_rug=False)
    # Title, font, background colour, legend
    fig.update_layout(title_text=title, font=dict(size=18),
                      plot_bgcolor='white', showlegend=legend)
    fig.update_traces(opacity=0.6)
    fig.update_xaxes(title_text=x_label, showline=True, linewidth=2,
                     linecolor='black', tickfont=dict(size=18), range=xlims)
    fig.update_yaxes(title_text="Kernel density estimate", showline=True,
                     linewidth=1, linecolor='black', tickfont=dict(size=18))

    ### SAVE TO PNG ###
    filename = os.path.join(location, title.replace(" ", "_") + ".png")
    fig.write_image(filename)
    return None
def shorten_feature_names(long_names):
    """Build two-way lookups between long feature names and short nicknames.

    Every name starts with its first ``_``-separated token as nickname;
    ``shorten_feature_names_helper`` then lengthens the nicknames that clash.

    Returns:
        ``(names_lts, names_stl)``: the long-to-short dictionary and its
        inverse, short-to-long.
    """
    first_tokens = {name: name.split('_')[0] for name in long_names}
    names_lts = shorten_feature_names_helper(first_tokens, long_names, 1)[0]
    # Invert the mapping: nickname -> long name.
    names_stl = {short: long for long, short in names_lts.items()}
    return names_lts, names_stl
def shorten_feature_names_helper(name_dict, long_names, iteration):
    """Lengthen clashing nicknames until every long name has a unique one.

    A nickname is the first ``iteration`` ``_``-separated tokens of its long
    name. On each pass, only the long names whose nickname is shared with
    another long name receive one more token.

    The loop is bounded: it stops as soon as the nicknames are unique, or
    when nicknames cannot grow any further. Repeated entries in
    ``long_names`` are counted once, so they can no longer cause endless
    recursion.

    Args:
        name_dict: long name -> current nickname. Updated in place.
        long_names: the long names to disambiguate; each must be a key of
            ``name_dict``.
        iteration: number of tokens the current nicknames are built from.

    Returns:
        ``(name_dict, long_names, iteration)`` where ``iteration`` is the
        token count reached when the loop stopped.
    """
    distinct_names = set(long_names)
    # A nickname can never have more tokens than its long name, so going
    # past the longest token count cannot resolve anything.
    max_tokens = max((len(name.split('_')) for name in distinct_names), default=0)

    while (len(set(name_dict.values())) != len(distinct_names)
           and iteration < max_tokens):
        iteration += 1

        # Find the nicknames that are used by more than one long name.
        seen, clashing = set(), set()
        for long_name in distinct_names:
            nickname = name_dict[long_name]
            if nickname in seen:
                clashing.add(nickname)
            else:
                seen.add(nickname)

        # Give one more token to every long name behind a clashing nickname.
        for long_name in distinct_names:
            if name_dict[long_name] in clashing:
                name_dict[long_name] = '_'.join(long_name.split('_')[:iteration])

    return name_dict, long_names, iteration
def heatmap_function2(title,
                      data,
                      method, metric, cmap,
                      cbar_kws, xticklabels, save_loc,
                      row_cluster, col_cluster,
                      annotations = {'rows':[],'cols':[]}):
    """Set the seaborn font scale to 6.0 and collect annotation mappings.

    NOTE(review): this looks like an unfinished copy of ``heatmap_function``:
    the mappings are gathered but nothing is drawn, saved or returned -
    confirm whether the rest of the body is missing.
    """
    sb.set(font_scale= 6.0)

    # One colour mapping per annotation; an empty list becomes None, the
    # value seaborn accepts for "no row/column colours".
    row_mappings = [ann['mapping'] for ann in annotations['rows']] or None
    col_mappings = [ann['mapping'] for ann in annotations['cols']] or None
def heatmap_function(title,
                     data,
                     method, metric, cmap,
                     cbar_kws, xticklabels, save_loc,
                     row_cluster, col_cluster,
                     annotations = None):
    """Draw a seaborn clustermap with annotation legends and save it as PNG.

    Args:
        title: figure title; lower-cased with spaces replaced by underscores
            it also becomes the PNG file name.
        data: data passed to ``sb.clustermap``.
        method, metric: clustering linkage method and distance metric.
        cmap: colour map of the heatmap.
        cbar_kws: keyword arguments for the colour bar.
        xticklabels: x tick labels setting passed to seaborn.
        save_loc: directory the PNG is written to.
        row_cluster, col_cluster: whether to cluster rows / columns.
        annotations: dict with ``'rows'`` and ``'cols'`` lists. Each entry
            is a dict with the keys ``'mapping'`` (colours for the side
            bar), ``'dict'`` (label -> colour, for the legend),
            ``'location'``, ``'label'`` and ``'bbox_to_anchor'`` (legend
            placement and title). Defaults to no annotations.

    Returns:
        None. The figure is saved to ``save_loc``.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    if annotations is None:
        annotations = {'rows': [], 'cols': []}

    sb.set(font_scale=2.0)

    # seaborn wants None, not an empty list, when there are no side colours.
    row_mappings = [ann['mapping'] for ann in annotations.get('rows', [])] or None
    col_mappings = [ann['mapping'] for ann in annotations.get('cols', [])] or None

    # Create clustermap
    g = sb.clustermap(data=data,
                      robust=True,
                      method=method, metric=metric,
                      cmap=cmap,
                      row_cluster=row_cluster, col_cluster=col_cluster,
                      figsize=(40, 30),
                      row_colors=row_mappings, col_colors=col_mappings,
                      yticklabels=False,
                      cbar_kws=cbar_kws,
                      xticklabels=xticklabels)

    # Rotate the x labels slightly
    plt.setp(g.ax_heatmap.xaxis.get_majorticklabels(), rotation=45)
    g.fig.suptitle(title, fontsize=60.0)

    # One legend per annotation feature, for rows and columns alike.
    for ann_group in annotations.values():
        for ann in ann_group:
            # Zero-sized bars serve only as coloured legend handles.
            # NOTE(review): the callee was stripped from the source (only
            # `,0, color = ..., linewidth = 0)` survived); reconstructed as
            # `g.ax_col_dendrogram.bar(0, 0, ...)` - confirm.
            handles = [
                g.ax_col_dendrogram.bar(0, 0, color=color, label=item,
                                        linewidth=0)
                for item, color in ann['dict'].items()
            ]
            legend = plt.legend(handles=handles, loc=ann['location'],
                                title=ann['label'],
                                bbox_to_anchor=ann['bbox_to_anchor'],
                                bbox_transform=plt.gcf().transFigure)
            # plt.legend replaces the previous legend of the axes; adding
            # the legend as an artist keeps the earlier ones visible.
            plt.gca().add_artist(legend)

    # Save image
    filename = os.path.join(save_loc, title.lower().replace(" ", "_") + ".png")
    g.savefig(filename)
    return None
# sources - | |
# | |
# | |
def verify_line_no(filename, lines_read):
    """Warn when a file's line count differs from the number of lines read.

    The file is counted in Python rather than with the external ``wc -l``
    command: that works on every platform, and a last line without a
    trailing newline is still counted.

    Args:
        filename: path of the file to count.
        lines_read: number of lines the caller imported, header included.

    Returns:
        None. A mismatch is reported on stdout.

    Raises:
        OSError: if ``filename`` cannot be opened.
    """
    # Binary mode: lines are split on b"\n" with no decoding, so the count
    # does not depend on the file's text encoding.
    with open(filename, 'rb') as handle:
        line_count = sum(1 for _ in handle)

    if lines_read != line_count:
        print("WARNING: '" + filename + "' has " + str(line_count) +
              " lines, but imported dataframe has "
              + str(lines_read) + " (including header).")
    return None
''' def rgb_tuple_from_str(rgb_str): | |
rgb_str = rgb_str.replace("(","").replace(")","").replace(" ","") | |
rgb = list(map(float,rgb_str.split(","))) | |
return tuple(rgb) | |
''' | |
def rgb_tuple_from_str(rgb_str):
    """Parse a string such as ``"(0.1, 0.2, 0.3)"`` into a tuple of floats.

    Parentheses, spaces and the text ``np.float64`` are removed before the
    remainder is split on commas, so ``"(np.float64(0.1), ...)"`` is
    accepted as well.

    Raises:
        ValueError: if a component is not a valid float; the cleaned string
            is printed before the error is re-raised.
    """
    cleaned = rgb_str
    for unwanted in ("(", ")", " ", "np.float64"):
        cleaned = cleaned.replace(unwanted, "")

    try:
        return tuple(float(component) for component in cleaned.split(","))
    except ValueError:
        print(f"Error converting string to float: {cleaned}")
        raise
def color_dict_to_df(cd, column_name):
    """Turn a ``{label: (r, g, b)}`` dictionary into a colour lookup table.

    Returns:
        DataFrame indexed by the dictionary keys with the columns ``'rgb'``
        (tuple of three ``np.float64``), ``'hex'`` (colour string from
        ``mplc.to_hex``) and ``column_name`` (a copy of the index).
    """
    table = pd.DataFrame.from_dict(cd, orient = 'index')

    def _as_rgb(record):
        # Columns 0, 1 and 2 hold the three colour components.
        return tuple(np.float64(record[channel]) for channel in (0, 1, 2))

    table['rgb'] = table.apply(_as_rgb, axis = 1)
    table = table.drop(columns = [0, 1, 2])
    table['hex'] = table.apply(lambda record: mplc.to_hex(record['rgb']), axis = 1)
    table[column_name] = table.index
    return table
def p_add_star(row):
    """Format p-values in scientific notation, starring the significant ones.

    Every value is rendered with three decimals (``'{:0.3e}'``); values less
    than or equal to 0.05 get a trailing ``*``.

    Returns:
        pd.Series of the formatted strings, in input order.
    """
    formatted = []
    for p_value in row:
        significant = p_value <= 0.05
        text = '{:0.3e}'.format(p_value)
        if significant:
            text += "*"
        formatted.append(text)
    return pd.Series(formatted)
def p_to_star(row):
    """Convert p-values into significance asterisks.

    ``p <= 0.001`` gives ``'***'``, ``p <= 0.01`` gives ``'**'``,
    ``p <= 0.05`` gives ``'*'`` and anything else gives ``''``.

    Returns:
        pd.Series of the asterisk strings, in input order.
    """
    cutoffs = (0.001, 0.01, 0.05)
    marks = []
    for p_value in row:
        # The cutoffs are nested, so the number of cutoffs a value passes
        # equals the number of stars it earns.
        passed = sum(1 for cutoff in cutoffs if p_value <= cutoff)
        marks.append('*' * passed)
    return pd.Series(marks)
def _gaussian_threshold_figure(column, marker_data, m_mean, m_std, thresholds):
    """Build the plotly figure of one marker's Gaussian curve and thresholds."""
    x_vals = np.linspace(marker_data.min(), marker_data.max(), 100)
    gaussian_curve = (1 / (m_std * np.sqrt(2 * np.pi))) * np.exp(
        -(x_vals - m_mean) ** 2 / (2 * m_std ** 2))
    curve_top = np.max(gaussian_curve)
    line_colors = ['red', 'green', 'blue']

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=x_vals, y=gaussian_curve, mode='lines',
                             name=f'{column} Gaussian Distribution'))
    fig.update_layout(title=f'Gaussian Distribution for {column} Marker')

    # One dashed vertical line per threshold, from the axis to the curve top.
    for n_sigma, (threshold, line_color) in enumerate(zip(thresholds, line_colors), start=1):
        fig.add_shape(type='line', x0=threshold, y0=0, x1=threshold, y1=curve_top,
                      line=dict(color=line_color, dash='dash'),
                      name=f'Seuil {n_sigma}σ: {threshold:.2f}')

    # Mark the thresholds on the x-axis and print their values above them.
    fig.add_trace(go.Scatter(x=list(thresholds),
                             y=[0, 0, 0],
                             mode='markers+text',
                             text=[f'{threshold:.2f}' for threshold in thresholds],
                             textposition="top center",
                             marker=dict(size=10, color=line_colors),
                             name='Threshold Values'))
    return fig


def plot_gaussian_distributions(df, show=True):
    """Compute mean + 1/2/3 sigma thresholds per column and plot them.

    For every column of ``df`` a Gaussian curve with the column's mean and
    standard deviation (``np.mean`` / ``np.std``) is drawn over the column's
    value range, with the three thresholds marked.

    Args:
        df: DataFrame whose columns are all numeric markers.
        show: display each figure. Pass ``False`` to compute the thresholds
            only; no figure is built in that case.

    Returns:
        List of ``(column, mean + 1 sigma, mean + 2 sigma, mean + 3 sigma)``
        tuples, one per column, in column order.
    """
    all_thresholds = []
    for column in df.columns:
        marker_data = df[column]
        m_mean, m_std = np.mean(marker_data), np.std(marker_data)

        thresholds = (m_mean + m_std, m_mean + 2 * m_std, m_mean + 3 * m_std)
        all_thresholds.append((column,) + thresholds)

        if not show:
            continue
        # A zero or undefined spread would divide by zero in the Gaussian
        # formula, so there is no curve to draw for such a column.
        if not m_std > 0:
            print(f"WARNING: no Gaussian curve drawn for '{column}' "
                  "(standard deviation is zero or undefined).")
            continue
        # NOTE(review): the source built the figure but the statement that
        # displayed it appears to have been stripped; `fig.show()` is the
        # reconstruction - confirm against the original file.
        _gaussian_threshold_figure(column, marker_data, m_mean, m_std, thresholds).show()

    return all_thresholds