Source code for UNAGI.dynamic_graphs.distDistance

import multiprocessing
import numpy as np
import gc
import anndata
import time
import pandas as pd
import scanpy as sc
from sklearn.metrics import silhouette_score, davies_bouldin_score
from sklearn.neighbors import KernelDensity
from sklearn import cluster
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture
from scipy.stats import norm, entropy, multivariate_normal, gamma
from scipy import stats 
import torch
from torch.nn import functional as F
from torch import nn, optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.distributions.gamma import Gamma
from torch.distributions.bernoulli import Bernoulli
from torch.distributions.normal import Normal
from torch.distributions.multivariate_normal import MultivariateNormal
from scipy.stats import entropy 
from scipy import stats
from scipy.sparse import csr_matrix
from sklearn.decomposition import TruncatedSVD
import threading
from scipy.stats import multivariate_normal
import gc

[docs] def getSimilarity(adatai,adataj, i, j): ''' calculate the differential gene similarity between two clusters. Differentail gene similarity = (1-Jaccard index) * gene ranking difference factor parameters ------------------- adatai: anndata The data of stage i adataj: anndata The data of stage j i: int the cluster id in stage i j: int the cluster id in stage j return ------------------- distance: np.array The distance of top 100 differential gene between two clusters ''' if isinstance(adatai[i], list): iDE = adatai[i] else: iDE = adatai[i].tolist() if isinstance(adataj[j], list): jDE = adataj[j] else: jDE = adataj[j].tolist() intersection = 0 orderdistance = 0 for i, each in enumerate(iDE): if each in jDE: intersection+=1 jindex = jDE.index(each) orderdistance += abs(jindex-i) else: orderdistance+=100 union = len(iDE)+len(jDE) - intersection JaccardIndex = intersection/union orderdistance = orderdistance/(100*len(jDE)) # print(orderdistance) DEG_distance = (1-JaccardIndex)*orderdistance return DEG_distance
def mcSampling(mus, sigmas): ''' monte carlo sampling strategy to sample data points from a gaussian distribution in each cell, for example the hidden space is 10, then sample 100 data point from input cell. If the number of cell is 200, then the sampled matrix will be [10, 200*100] parameters ------------------- mus: list mu vectors of input cells (shape: [number of cell, number of hidden nodes]) sigmas: list sigma vectors of input cells (shape: [number of cell, number of hidden nodes]) return ------------------- samplegaussian: list A list of sampled data-points from fitted gaussian distributions. ''' samplegaussian= [[] for _ in range(len(mus[1]))] #hyperparameter needed to be changed by users hiddensize=len(mus[1]) mus = np.vstack(mus) sigmas=np.vstack(sigmas) for i in range(hiddensize): mean = mus[:,i] mean = mean.repeat(100) std = sigmas[:,i] std = std.repeat(100) normal = Normal(torch.zeros(size=mean.shape)+torch.tensor(mean),torch.zeros(size=std.shape)+torch.tensor(std)).sample() samplegaussian[i].append(normal.numpy()) return samplegaussian class GaussianRepThread(threading.Thread): ''' The class to fit gaussian distributions for each hidden node. parameters ------------------- output: list A list of threads data: list A list of samples of the gaussian distribution of each hidden node i: int the index of the thread ''' def __init__(self, output, data,i): threading.Thread.__init__(self) self.i = i self.data = data self.output = output def run(self): #time.sleep(10) self.output[self.i] = norm.fit(self.data) def fitClusterGaussianRepresentation(data): ''' Fitting gaussian distributions for each hidden node parameters ------------------- data: list A list of samples of the gaussian distribution of each hidden node return ------------------- out: list A list of mu and sigma of each hidden node ''' threads = [] out = [[] for _ in range(len(data))] for i in range(len(data)): threads.append(GaussianRepThread(out,data[i],i)) for each in threads: each.start() for each in threads: each.join() return out def getClusterRepresentation(mus, sigmas): ''' MC strategy to sample gaussian data points. Use sampled data points to fit gaussian distributions parameters ------------------- mus: np.array mu vectors of input cells (shape: [number of cell, number of hidden nodes]) sigmas: np.array sigma vectors of input cells (shape: [number of cell, number of hidden nodes]) return ------------------- fitted_gaussian_distributions: list A list of fitted gaussian distributions ''' T1 = time.time() sampling = mcSampling(mus,sigmas) T2 = time.time() #print('time is %s ' %((T2-T1))) fitted_gaussian_distributions = fitClusterGaussianRepresentation(sampling) return fitted_gaussian_distributions
[docs] def normalizeDistance(distance): ''' Normalize the kl divergence distance and top differential gene distances. (Use the min-max normalization method.) parameters ------------------- distance: list A list of distance metrics. ([gaussian kl divergence, top differential gene difference]) return ------------------- normalizedDistance: np.array Sum of normalized kl divergence and top differential gene difference distance ''' distance = np.array(distance) gaussiankld = np.array(distance[:,0]) de = np.array(distance[:,1]) if gaussiankld.shape[0] != 1: #min max normalization de_minmax = (de - de.min()) / (de.max() - de.min()) ggkld = gaussiankld #min max normalization ggkld_minmax = (ggkld - ggkld.min()) / (ggkld.max() - ggkld.min()) else: ggkld_minmax= gaussiankld de_minmax = de normalizedDistance = ggkld_minmax+de_minmax return normalizedDistance
[docs] def calculateKL(cluster1gaussian, cluster2gaussian): ''' calculate KL divergence of multivariate gaussian distributions between two clusters. parameters ------------------- cluster1gaussian: list A list of [mean, std] of gaussian distribution of cluster 1 cluster2gaussian: list A list of [mean, std] of gaussian distribution of cluster 2 return ------------------- kl: np.array kl divergence of multivariate gaussian distributions between two clusters. ''' cluster1 = np.array(cluster1gaussian,dtype=np.float64).reshape(-1,2) cluster2 = np.array(cluster2gaussian,dtype=np.float64).reshape(-1,2) std1 = cluster1[:,1] covariance1 = torch.tensor(np.diag(std1**2)) mean1 = cluster1[:,0] p = MultivariateNormal(torch.tensor(mean1), covariance1) mean2 = cluster2[:,0] std2 = cluster2[:,1] covariance2 = torch.tensor(np.diag(std2**2)) q = MultivariateNormal(torch.tensor(mean2), covariance2) kl = torch.distributions.kl.kl_divergence(p, q) kl = kl.detach().numpy() return kl