from typing import Optional, Sequence, Dict, Any
import scipy.cluster.hierarchy as sch
import scanpy as sc
import numpy as np
from pandas.api.types import CategoricalDtype
import pandas as pd
from scanpy.tools._utils import _choose_representation
from anndata import AnnData
from ..dynamic_graphs.distDistance import fitClusterGaussianRepresentation,calculateKL
def getclusterreps(data):
'''
Get the sampled Norm distribution of Z space for each cluster
Parameters
----------
data: pandas dataframe
The hidden representation of Z space. The index of the dataframe is the cluster id. The columns of the dataframe are the hidden representation of Z space.
Returns
-------
outs: dictionary
The sampled Norm distribution of Z space. The keys of the dictionary are the cluster id. The values of the dictionary are the sampled Norm distribution of Z space.
'''
clusterid = data.index.unique().tolist()
out = {}
for each in clusterid:
out[each] = fitClusterGaussianRepresentation(data.loc[each].T.values)
return out
def calculateDistance(reps):
'''
Calculate the distance between the sampled Norm distribution of Z space
Parameters
----------
reps: dictionary
The sampled Norm distribution of Z space. The keys of the dictionary are the cluster id. The values of the dictionary are the sampled Norm distribution of Z space.
Returns
-------
df: pandas dataframe
The distance between the sampled Norm distribution of Z space. The index of the dataframe is the cluster id. The columns of the dataframe are the cluster id.
'''
outs = {}
for i, each1 in enumerate(list(reps.keys())):
outs[each1] = []
for j, each2 in enumerate(list(reps.keys())):
outs[each1].append(((calculateKL(reps[each1], reps[each2])+calculateKL(reps[each2], reps[each1]))/2))
df = pd.DataFrame(outs,index=list(reps.keys()))
df.columns = list(reps.keys())
df.index = df.index.astype(int)
df = df.sort_index()
df.columns = df.columns.astype(int)
df = df.sort_index(axis=1)
return df
def calculateDistanceUMAP(umaps):
'''
Calculate the distance between umap coordinates of Z space
Parameters
----------
umaps: pandas dataframe
The umap coordinates of Z space. The index of the dataframe is the cluster id. The columns of the dataframe are the umap coordinates of Z space.
Returns
-------
df: pandas dataframe
The distance between umap coordinates of Z space. The index of the dataframe is the cluster id. The columns of the dataframe are the cluster id.
'''
outs = {}
for i, each1 in enumerate(list(umaps.index)):
outs[each1] = []
for j, each2 in enumerate(list(umaps.index)):
outs[each1].append(np.linalg.norm(umaps.loc[each1]- umaps.loc[each2]))
df = pd.DataFrame(outs,index=list(umaps.index))
df.columns = list(umaps.index)
df.index = df.index.astype(int)
df = df.sort_index()
df.columns = df.columns.astype(int)
df = df.sort_index(axis=1)
return df
def mydendrogram(
adata: AnnData,
groupby: str,
n_pcs: Optional[int] = None,
use_rep: Optional[str] = None,
var_names: Optional[Sequence[str]] = None,
use_raw: Optional[bool] = None,
cor_method: str = "pearson",
linkage_method: str = "complete",
optimal_ordering: bool = False,
key_added: Optional[str] = None,
inplace: bool = True,
) -> Optional[Dict[str, Any]]:
"""\
Modified from scanpy.tl.dendrogram.
Computes a hierarchical clustering for the given `groupby` categories.
By default, the latent representation 'Z' is used.
Alternatively, a list of `var_names` (e.g. genes) can be given.
Average values of either `var_names` or components are used
to compute a correlation matrix.
The hierarchical clustering can be visualized using
:func:`scanpy.pl.dendrogram` or multiple other visualizations that can
include a dendrogram: :func:`~scanpy.pl.matrixplot`,
:func:`~scanpy.pl.heatmap`, :func:`~scanpy.pl.dotplot`,
and :func:`~scanpy.pl.stacked_violin`.
.. note::
The computation of the hierarchical clustering is based on predefined
groups and not per cell. The correlation matrix is computed using by
default pearson but other methods are available.
Parameters
----------
adata
Annotated data matrix
{n_pcs}
{use_rep}
var_names
List of var_names to use for computing the hierarchical clustering.
If `var_names` is given, then `use_rep` and `n_pcs` is ignored.
use_raw
Only when `var_names` is not None.
Use `raw` attribute of `adata` if present.
cor_method
correlation method to use.
Options are 'pearson', 'kendall', and 'spearman'
linkage_method
linkage method to use. See :func:`scipy.cluster.hierarchy.linkage`
for more information.
optimal_ordering
Same as the optimal_ordering argument of :func:`scipy.cluster.hierarchy.linkage`
which reorders the linkage matrix so that the distance between successive
leaves is minimal.
key_added
By default, the dendrogram information is added to
`.uns[f'dendrogram_{{groupby}}']`.
Notice that the `groupby` information is added to the dendrogram.
inplace
If `True`, adds dendrogram information to `adata.uns[key_added]`,
else this function returns the information.
Returns
-------
If `inplace=False`, returns dendrogram information,
else `adata.uns[key_added]` is updated with it.
"""
if isinstance(groupby, str):
# if not a list, turn into a list
groupby = [groupby]
for group in groupby:
if group not in adata.obs_keys():
raise ValueError(
"groupby has to be a valid observation. "
f"Given value: {group}, valid observations: {adata.obs_keys()}"
)
if not isinstance(adata.obs[group].dtype, CategoricalDtype):
raise ValueError(
"groupby has to be a categorical observation. "
f"Given value: {group}, Column type: {adata.obs[group].dtype}"
)
if var_names is None:
rep_df = pd.DataFrame(
_choose_representation(adata, use_rep=use_rep, n_pcs=n_pcs)
)
categorical = adata.obs[groupby[0]]
if len(groupby) > 1:
for group in groupby[1:]:
# create new category by merging the given groupby categories
categorical = (
categorical.astype(str) + "_" + adata.obs[group].astype(str)
).astype("category")
categorical.name = "_".join(groupby)
rep_df.set_index(categorical, inplace=True)
categories = rep_df.index.categories
else:
gene_names = adata.raw.var_names if use_raw else adata.var_names
# aggregate values within categories using 'mean'
mean_df = rep_df.groupby(level=0).mean()
import scipy.cluster.hierarchy as sch
from scipy.spatial import distance
test = getclusterreps(rep_df)
test2 = calculateDistance(test)
corr_matrix = mean_df.T.corr(method=cor_method)
corr_condensed = distance.squareform(test2)
z_var = sch.linkage(
corr_condensed, method=linkage_method, optimal_ordering=optimal_ordering
)
dendro_info = sch.dendrogram(z_var, labels=list(categories), no_plot=True)
dat = dict(
linkage=z_var,
groupby=groupby,
use_rep=use_rep,
cor_method=cor_method,
linkage_method=linkage_method,
categories_ordered=dendro_info["ivl"],
categories_idx_ordered=dendro_info["leaves"],
dendrogram_info=dendro_info,
correlation_matrix=corr_matrix.values,
)
if inplace:
if key_added is None:
key_added = f'dendrogram_{"_".join(groupby)}'
# logg.info(f"Storing dendrogram info using `.uns[{key_added!r}]`")
adata.uns[key_added] = dat
else:
return dat
def mydendrogramUMAP(
adata: AnnData,
groupby: str,
n_pcs: Optional[int] = None,
use_rep: Optional[str] = None,
var_names: Optional[Sequence[str]] = None,
use_raw: Optional[bool] = None,
cor_method: str = "pearson",
linkage_method: str = "complete",
optimal_ordering: bool = False,
key_added: Optional[str] = None,
inplace: bool = True,
) -> Optional[Dict[str, Any]]:
"""\
Modify from scanpy.tl.dendrogram but the distance is between umap coordinates of latent representation 'Z' space
Computes a hierarchical clustering for the given `groupby` categories.
Alternatively, a list of `var_names` (e.g. genes) can be given.
Average values of either `var_names` or components are used
to compute a correlation matrix.
The hierarchical clustering can be visualized using
:func:`scanpy.pl.dendrogram` or multiple other visualizations that can
include a dendrogram: :func:`~scanpy.pl.matrixplot`,
:func:`~scanpy.pl.heatmap`, :func:`~scanpy.pl.dotplot`,
and :func:`~scanpy.pl.stacked_violin`.
.. note::
The computation of the hierarchical clustering is based on predefined
groups and not per cell. The correlation matrix is computed using by
default pearson but other methods are available.
Parameters
----------
adata
Annotated data matrix
{n_pcs}
{use_rep}
var_names
List of var_names to use for computing the hierarchical clustering.
If `var_names` is given, then `use_rep` and `n_pcs` is ignored.
use_raw
Only when `var_names` is not None.
Use `raw` attribute of `adata` if present.
cor_method
correlation method to use.
Options are 'pearson', 'kendall', and 'spearman'
linkage_method
linkage method to use. See :func:`scipy.cluster.hierarchy.linkage`
for more information.
optimal_ordering
Same as the optimal_ordering argument of :func:`scipy.cluster.hierarchy.linkage`
which reorders the linkage matrix so that the distance between successive
leaves is minimal.
key_added
By default, the dendrogram information is added to
`.uns[f'dendrogram_{{groupby}}']`.
Notice that the `groupby` information is added to the dendrogram.
inplace
If `True`, adds dendrogram information to `adata.uns[key_added]`,
else this function returns the information.
Returns
-------
If `inplace=False`, returns dendrogram information,
else `adata.uns[key_added]` is updated with it.
"""
if isinstance(groupby, str):
# if not a list, turn into a list
groupby = [groupby]
for group in groupby:
if group not in adata.obs_keys():
raise ValueError(
"groupby has to be a valid observation. "
f"Given value: {group}, valid observations: {adata.obs_keys()}"
)
if not isinstance(adata.obs[group].dtype, CategoricalDtype):
raise ValueError(
"groupby has to be a categorical observation. "
f"Given value: {group}, Column type: {adata.obs[group].dtype}"
)
if var_names is None:
rep_df = pd.DataFrame(
_choose_representation(adata, use_rep=use_rep, n_pcs=n_pcs)
)
categorical = adata.obs[groupby[0]]
if len(groupby) > 1:
for group in groupby[1:]:
# create new category by merging the given groupby categories
categorical = (
categorical.astype(str) + "_" + adata.obs[group].astype(str)
).astype("category")
categorical.name = "_".join(groupby)
rep_df.set_index(categorical, inplace=True)
categories = rep_df.index.categories
else:
gene_names = adata.raw.var_names if use_raw else adata.var_names
# aggregate values within categories using 'mean'
mean_df = rep_df.groupby(level=0).mean()
import scipy.cluster.hierarchy as sch
from scipy.spatial import distance
# test = getclusterrepsUMAP(rep_df)
test2 = calculateDistanceUMAP(mean_df)
# corr_matrix = mean_df.T.corr(method=cor_method)
corr_condensed = distance.squareform(test2)
z_var = sch.linkage(
corr_condensed, method=linkage_method, optimal_ordering=optimal_ordering
)
dendro_info = sch.dendrogram(z_var, labels=list(categories), no_plot=True)
dat = dict(
linkage=z_var,
groupby=groupby,
use_rep=use_rep,
cor_method=cor_method,
linkage_method=linkage_method,
categories_ordered=dendro_info["ivl"],
categories_idx_ordered=dendro_info["leaves"],
dendrogram_info=dendro_info,
# correlation_matrix=corr_matrix.values,
)
if inplace:
if key_added is None:
key_added = f'dendrogram_{"_".join(groupby)}'
# logg.info(f"Storing dendrogram info using `.uns[{key_added!r}]`")
adata.uns[key_added] = dat
else:
return dat
def get_siblings_at_each_level(leaf_id, Z):
"""
Get the sibling clusters for the given leaf at each level of the hierarchical clustering.
Parameters
----------
leaf_id: int
The leaf id of the given leaf.
Z: numpy.ndarray
The linkage matrix of the hierarchical clustering.
Returns
-------
siblings: list
The sibling clusters for the given leaf at each level of the hierarchical clustering.
"""
n = Z.shape[0] + 1 # The total number of initial leaves
clusters = {i: [i] for i in range(n)} # Initialize clusters with single leaves
siblings = []
# Helper function to find the parent node for the given leaf or cluster
def find_parent(node):
for i, (left, right, _, _) in enumerate(Z):
if node in [left, right]:
return i + n, left if node != left else right
return None, None
# Build clusters and collect siblings at different levels
for k in range(n - 1):
left, right = int(Z[k, 0]), int(Z[k, 1])
clusters[n + k] = clusters[left] + clusters[right]
# Trace back from the leaf to the root and collect siblings
current_node = leaf_id
while True:
parent, sibling_node = find_parent(current_node)
if parent is None: # Reached the root
break
# Check if sibling_node is a leaf or a cluster and collect all leaves
sibling_leaves = clusters[sibling_node]
siblings.append(sibling_leaves)
current_node = parent
return siblings[::-1] # Reverse to have siblings from bottom to top
def map_leaves_to_label(categories_idx_ordered, categories_ordered):
table = {}
for i, each in enumerate(categories_idx_ordered):
table[each] = int(categories_ordered[i])
return table
def my_logfold_change(adata1, adata2,log_fold_change_cutoff=None,abs = False):
'''
Define the log fold change between two datasets. (data is already log transformed) LogFoldChange = Log(mean2) - Log(mean1)
Parameters
----------
adata1: AnnData
The annotated data matrix.
adata2: AnnData
The annotated data matrix.
log_fold_change_cutoff: float
The cutoff of the log fold change to select the genes. Default is None.
abs: bool
If True, the absolute value of the log fold change will be used. Default is False.
Returns
-------
df: pandas dataframe
The log fold change between two datasets. The columns of the dataframe are the log fold change, the names of the genes, the mean of the genes in the first dataset and the mean of the genes in the second dataset.
'''
genenames = adata1.var_names.tolist()
mean1 = np.mean(adata1.X.toarray(),axis=0)
mean2 = np.mean(adata2.X.toarray(),axis=0)
logfold_change = mean2 - mean1
logfold_change_dict = {}
order = np.argsort(logfold_change)[::-1]
mean2 = mean2[order]
mean1 = mean1[order]
for i, each in enumerate(genenames):
logfold_change_dict[each] = logfold_change[i]
if abs ==True:
temp = {k: v for k, v in sorted(logfold_change_dict.items(), key=lambda item: np.abs(item[1]),reverse=True)}
else:
temp = {k: v for k, v in sorted(logfold_change_dict.items(), key=lambda item: item[1],reverse=True)}
df = pd.DataFrame()
df['log_fold_changes'] = temp.values()
df['names'] = temp.keys()
if log_fold_change_cutoff is not None:
df = df[np.abs(df['log_fold_changes'])>log_fold_change_cutoff]
df['Mean_compared_clusters'] = mean1
df['Mean_selected_clusters'] = mean2
return df
def build_reference_siblings(dendrogram):
'''
Find the sibling clusters for each cluster at each level of the hierarchical clustering.
Parameters
----------
dendrogram: dict
The data structure to store the hierarchical clustering information. The data structure is the output of the function mydendrogram.
Returns
-------
out: dict
The data structure to store the sibling clusters for each cluster at each level of the hierarchical clustering. The keys of the dictionary are the cluster ids. The values of the dictionary are the sibling clusters at each level of the hierarchical clustering.
'''
total_leaves = len(dendrogram['dendrogram_info']['ivl'])
table = map_leaves_to_label(dendrogram['categories_idx_ordered'], dendrogram['categories_ordered'])
out = {}
for leave_id in range(total_leaves):
siblings = get_siblings_at_each_level(leave_id, dendrogram['linkage'])
leaf_node_id = table[leave_id]
out[str(leaf_node_id)] = {}
level_table = {}
memory = []
for i, sibs in enumerate(siblings[::-1]):
translated_sibs = [table[each] for each in sibs]
memory+=translated_sibs
level_table[len(siblings)-1-i] = memory.copy()
for level, sibs in enumerate(siblings):
out[str(leaf_node_id)][str(level)] = level_table[level]
return out
def gethcmarkers(adata, selected,groups,cutoff=0.05):
'''
Get the hierarchical clustering markers for the selected cluster and the background clusters.
Parameters
----------
adata: AnnData
The annotated data matrix.
selected: int
The cluster id of the selected cluster.
groups: list
The cluster ids of the background clusters.
cutoff: float
The cutoff of the adjusted p-value to select the hierarchical clustering markers.
Returns
-------
positive_marker: pandas dataframe
The positive hierarchical clustering markers for the selected cluster.
negative_marker: pandas dataframe
The negative hierarchical clustering markers for the selected cluster.
'''
ref = [[selected],[selected]+groups]
ids = []
ids = adata[adata.obs['leiden'] == str(selected)].obs.index.tolist()
selectedids = ids.copy()
groupids = []
for each in groups:
groupids +=adata[adata.obs['leiden'] == str(each)].obs.index.tolist()
ids += groupids.copy()
new = adata[ids]
new.obs['compare'] = None
new.obs.loc[new.obs.index.isin(selectedids), "compare"] = 'selected'
new.obs.loc[new.obs.index.isin(groupids), "compare"] = 'background'
sc.tl.rank_genes_groups(new,groupby='compare',method='wilcoxon',rankby_abs=False)
selected = new[selectedids]
background = new[groupids]
df1 = my_logfold_change(background,selected)
df1 = df1.set_index('names')
df = pd.DataFrame.from_dict({key: new.uns['rank_genes_groups'][key]['selected'] for key in['names','pvals_adj','scores']})
df = df.set_index('names')
df = df.join(df1)
positive_marker = df[df['log_fold_changes']>0]
negative_marker = df[df['log_fold_changes']>0]
positive_marker= positive_marker[positive_marker['pvals_adj'] <cutoff]
negative_marker= negative_marker[negative_marker['pvals_adj'] <cutoff]
return positive_marker, negative_marker,ref
def get_stage_hcmarkers(adata,key,use_rep,cutoff=0.05):
'''
Perform hierarchical clustering on the stage and get the hierarchical clustering markers for each cluster.
Parameters
----------
adata: AnnData
The annotated data matrix.
key: str
The key of the cluster information in adata.obs.
use_rep: str
The key of the representation in adata.obsm.
cutoff: float
The cutoff of the adjusted p-value to select the hierarchical clustering markers.
Returns
-------
Z: numpy.ndarray
The linkage matrix of the hierarchical clustering.
order: list
The order of the clusters in the hierarchical clustering.
out_table: dict
The data structure to store the hierarchical clustering markers for each cluster.
'''
if use_rep == 'z':
mydendrogram(adata,use_rep='z',groupby=key)
elif use_rep == 'umaps':
mydendrogramUMAP(adata,use_rep='X_umap',groupby=key)
test = build_reference_siblings(adata.uns['dendrogram_'+key])
Z = adata.uns['dendrogram_'+key]['linkage']
order = adata.uns['dendrogram_'+key]['categories_ordered']
out_table = {}
for each in list(test.keys()):
out_table[each] = {}
for level in test[each]:
out_table[each][level] = {}
out_table[each][level]['chosen'] = {}
pos_markers, neg_markers, ref = gethcmarkers(adata,int(each),test[each][level],cutoff=cutoff)
out_table[each][level]['chosen']['pos'] = pos_markers
out_table[each][level]['chosen']['neg'] = neg_markers
out_table[each][level]['ref'] = ref
return Z,order,out_table
[docs]
def get_dataset_hcmarkers(adata,stage_key,cluster_key,use_rep,cutoff=0.05):
'''
Perform hierarchical clustering on the dataset and get the hierarchical clustering markers for each stage.
Parameters
----------
adata: AnnData
The annotated data matrix.
stage_key: str
The key of the stage information in adata.obs.
cluster_key: str
The key of the cluster information in adata.obs.
use_rep: str
The key of the representation in adata.obsm.
cutoff: float
The cutoff of the adjusted p-value to select the hierarchical clustering markers. Default is 0.05.
Returns
-------
hcmarkers: dict
The hierarchical clustering markers for each stage. The keys of the dictionary are the cluster information in adata.obs. The values of the dictionary are the positive and negative hierarchical clustering markers for each cluster. The positive and negative hierarchical clustering markers are pandas dataframe with the columns of the names of the genes, the log fold change and the adjusted p-value.
'''
hcmarkers = {}
for each in list(adata.obs[stage_key].unique()):
hcmarkers[str(each)] = {}
stage_adata = adata[adata.obs[stage_key] == each]
Z,order,markers = get_stage_hcmarkers(stage_adata,cluster_key,use_rep=use_rep,cutoff=cutoff)
hcmarkers[str(each)]['Z'] = Z
hcmarkers[str(each)]['order'] = order
hcmarkers[str(each)]['markers'] = markers
return hcmarkers
# get_dataset_hcmarkers(adata,'stage','leiden')
#unit test
if __name__ == '__main__':
import pickle
# adata = sc.read_h5ad('/mnt/md0/yumin/UNAGI_old/data/mes/4/org_test/dataset.h5ad')
adata = sc.read('/mnt/md0/yumin/UNAGI_pyro/src/small_0/dataset.h5ad')
# adata.uns = pickle.load(open('/mnt/md0/yumin/UNAGI_old/data/mes/4/org_test/attribute.pkl', 'rb'))
adata.uns = pickle.load(open('/mnt/md0/yumin/UNAGI_pyro/src/small_0/attribute.pkl', 'rb'))
adata.uns['hcmarkers'] = get_dataset_hcmarkers(adata,stage_key='stage',cluster_key='leiden',use_rep='umaps')
# with open('/mnt/md0/yumin/UNAGI_old/data/mes/4/org_test/attribute_1109_newhc.pkl','wb') as f:
# pickle.dump(adata.uns,f)