Source code for tigramite.independence_tests.cmiknn

"""Tigramite causal discovery for time series."""

# Author: Jakob Runge <jakob@jakob-runge.com>
#
# License: GNU General Public License v3.0

from __future__ import print_function
from scipy import special, spatial
import numpy as np
from .independence_tests_base import CondIndTest
from numba import jit
import warnings


[docs]class CMIknn(CondIndTest): r"""Conditional mutual information test based on nearest-neighbor estimator. Conditional mutual information is the most general dependency measure coming from an information-theoretic framework. It makes no assumptions about the parametric form of the dependencies by directly estimating the underlying joint density. The test here is based on the estimator in S. Frenzel and B. Pompe, Phys. Rev. Lett. 99, 204101 (2007), combined with a shuffle test to generate the distribution under the null hypothesis of independence first used in [3]_. The knn-estimator is suitable only for variables taking a continuous range of values. For discrete variables use the CMIsymb class. Notes ----- CMI is given by .. math:: I(X;Y|Z) &= \int p(z) \iint p(x,y|z) \log \frac{ p(x,y |z)}{p(x|z)\cdot p(y |z)} \,dx dy dz Its knn-estimator is given by .. math:: \widehat{I}(X;Y|Z) &= \psi (k) + \frac{1}{T} \sum_{t=1}^T \left[ \psi(k_{Z,t}) - \psi(k_{XZ,t}) - \psi(k_{YZ,t}) \right] where :math:`\psi` is the Digamma function. This estimator has as a parameter the number of nearest-neighbors :math:`k` which determines the size of hyper-cubes around each (high-dimensional) sample point. Then :math:`k_{Z,},k_{XZ},k_{YZ}` are the numbers of neighbors in the respective subspaces. :math:`k` can be viewed as a density smoothing parameter (although it is data-adaptive unlike fixed-bandwidth estimators). For large :math:`k`, the underlying dependencies are more smoothed and CMI has a larger bias, but lower variance, which is more important for significance testing. Note that the estimated CMI values can be slightly negative while CMI is a non- negative quantity. This method requires the scipy.spatial.cKDTree package. References ---------- .. [3] J. Runge (2018): Conditional Independence Testing Based on a Nearest-Neighbor Estimator of Conditional Mutual Information. In Proceedings of the 21st International Conference on Artificial Intelligence and Statistics. http://proceedings.mlr.press/v84/runge18a.html Parameters ---------- knn : int or float, optional (default: 0.2) Number of nearest-neighbors which determines the size of hyper-cubes around each (high-dimensional) sample point. If smaller than 1, this is computed as a fraction of T, hence knn=knn*T. For knn larger or equal to 1, this is the absolute number. shuffle_neighbors : int, optional (default: 5) Number of nearest-neighbors within Z for the shuffle surrogates which determines the size of hyper-cubes around each (high-dimensional) sample point. transform : {'ranks', 'standardize', 'uniform', False}, optional (default: 'ranks') Whether to transform the array beforehand by standardizing or transforming to uniform marginals. workers : int (optional, default = -1) Number of workers to use for parallel processing. If -1 is given all processors are used. Default: -1. model_selection_folds : int (optional, default = 3) Number of folds in cross-validation used in model selection. significance : str, optional (default: 'shuffle_test') Type of significance test to use. For CMIknn only 'fixed_thres' and 'shuffle_test' are available. **kwargs : Arguments passed on to parent class CondIndTest. """ @property def measure(self): """ Concrete property to return the measure of the independence test """ return self._measure def __init__(self, knn=0.2, shuffle_neighbors=5, significance='shuffle_test', transform='ranks', workers=-1, model_selection_folds=3, **kwargs): # Set the member variables self.knn = knn self.shuffle_neighbors = shuffle_neighbors self.transform = transform self._measure = 'cmi_knn' self.two_sided = False self.residual_based = False self.recycle_residuals = False self.workers = workers self.model_selection_folds = model_selection_folds # Call the parent constructor CondIndTest.__init__(self, significance=significance, **kwargs) # Print some information about construction if self.verbosity > 0: if self.knn < 1: print("knn/T = %s" % self.knn) else: print("knn = %s" % self.knn) print("shuffle_neighbors = %d\n" % self.shuffle_neighbors) @jit(forceobj=True) def _get_nearest_neighbors(self, array, xyz, knn): """Returns nearest neighbors according to Frenzel and Pompe (2007). Retrieves the distances eps to the k-th nearest neighbors for every sample in joint space XYZ and returns the numbers of nearest neighbors within eps in subspaces Z, XZ, YZ. Parameters ---------- array : array-like data array with X, Y, Z in rows and observations in columns xyz : array of ints XYZ identifier array of shape (dim,). knn : int or float Number of nearest-neighbors which determines the size of hyper-cubes around each (high-dimensional) sample point. If smaller than 1, this is computed as a fraction of T, hence knn=knn*T. For knn larger or equal to 1, this is the absolute number. Returns ------- k_xz, k_yz, k_z : tuple of arrays of shape (T,) Nearest neighbors in subspaces. """ array = array.astype(np.float64) xyz = xyz.astype(np.int32) dim, T = array.shape # Add noise to destroy ties... array += (1E-6 * array.std(axis=1).reshape(dim, 1) * self.random_state.random((array.shape[0], array.shape[1]))) if self.transform == 'standardize': # Standardize array = array.astype(np.float64) array -= array.mean(axis=1).reshape(dim, 1) std = array.std(axis=1) for i in range(dim): if std[i] != 0.: array[i] /= std[i] # array /= array.std(axis=1).reshape(dim, 1) # FIXME: If the time series is constant, return nan rather than # raising Exception if np.any(std == 0.) and self.verbosity > 0: warnings.warn("Possibly constant array!") # raise ValueError("nans after standardizing, " # "possibly constant array!") elif self.transform == 'uniform': array = self._trafo2uniform(array) elif self.transform == 'ranks': array = array.argsort(axis=1).argsort(axis=1).astype(np.float64) array = array.T tree_xyz = spatial.cKDTree(array) epsarray = tree_xyz.query(array, k=[knn+1], p=np.inf, eps=0., workers=self.workers)[0][:, 0].astype(np.float64) # To search neighbors < eps epsarray = np.multiply(epsarray, 0.99999) # Subsample indices x_indices = np.where(xyz == 0)[0] y_indices = np.where(xyz == 1)[0] z_indices = np.where(xyz == 2)[0] # Find nearest neighbors in subspaces xz = array[:, np.concatenate((x_indices, z_indices))] tree_xz = spatial.cKDTree(xz) k_xz = tree_xz.query_ball_point(xz, r=epsarray, eps=0., p=np.inf, workers=self.workers, return_length=True) yz = array[:, np.concatenate((y_indices, z_indices))] tree_yz = spatial.cKDTree(yz) k_yz = tree_yz.query_ball_point(yz, r=epsarray, eps=0., p=np.inf, workers=self.workers, return_length=True) if len(z_indices) > 0: z = array[:, z_indices] tree_z = spatial.cKDTree(z) k_z = tree_z.query_ball_point(z, r=epsarray, eps=0., p=np.inf, workers=self.workers, return_length=True) else: # Number of neighbors is T when z is empty. k_z = np.full(T, T, dtype=np.float64) return k_xz, k_yz, k_z
[docs] def get_dependence_measure(self, array, xyz): """Returns CMI estimate as described in Frenzel and Pompe PRL (2007). Parameters ---------- array : array-like data array with X, Y, Z in rows and observations in columns xyz : array of ints XYZ identifier array of shape (dim,). Returns ------- val : float Conditional mutual information estimate. """ dim, T = array.shape if self.knn < 1: knn_here = max(1, int(self.knn*T)) else: knn_here = max(1, int(self.knn)) k_xz, k_yz, k_z = self._get_nearest_neighbors(array=array, xyz=xyz, knn=knn_here) val = special.digamma(knn_here) - (special.digamma(k_xz) + special.digamma(k_yz) - special.digamma(k_z)).mean() return val
[docs] def get_shuffle_significance(self, array, xyz, value, return_null_dist=False): """Returns p-value for nearest-neighbor shuffle significance test. For non-empty Z, overwrites get_shuffle_significance from the parent class which is a block shuffle test, which does not preserve dependencies of X and Y with Z. Here the parameter shuffle_neighbors is used to permute only those values :math:`x_i` and :math:`x_j` for which :math:`z_j` is among the nearest niehgbors of :math:`z_i`. If Z is empty, the block-shuffle test is used. Parameters ---------- array : array-like data array with X, Y, Z in rows and observations in columns xyz : array of ints XYZ identifier array of shape (dim,). value : number Value of test statistic for unshuffled estimate. Returns ------- pval : float p-value """ dim, T = array.shape # Skip shuffle test if value is above threshold # if value > self.minimum threshold: # if return_null_dist: # return 0., None # else: # return 0. # max_neighbors = max(1, int(max_neighbor_ratio*T)) x_indices = np.where(xyz == 0)[0] z_indices = np.where(xyz == 2)[0] if len(z_indices) > 0 and self.shuffle_neighbors < T: if self.verbosity > 2: print(" nearest-neighbor shuffle significance " "test with n = %d and %d surrogates" % ( self.shuffle_neighbors, self.sig_samples)) # Get nearest neighbors around each sample point in Z z_array = np.fastCopyAndTranspose(array[z_indices, :]) tree_xyz = spatial.cKDTree(z_array) neighbors = tree_xyz.query(z_array, k=self.shuffle_neighbors, p=np.inf, eps=0.)[1].astype(np.int32) null_dist = np.zeros(self.sig_samples) for sam in range(self.sig_samples): # Generate random order in which to go through indices loop in # next step order = self.random_state.permutation(T).astype(np.int32) # Shuffle neighbor indices for each sample index for i in range(len(neighbors)): self.random_state.shuffle(neighbors[i]) # neighbors = self.random_state.permuted(neighbors, axis=1) # Select a series of neighbor indices that contains as few as # possible duplicates restricted_permutation = self.get_restricted_permutation( T=T, shuffle_neighbors=self.shuffle_neighbors, neighbors=neighbors, order=order) array_shuffled = np.copy(array) for i in x_indices: array_shuffled[i] = array[i, restricted_permutation] null_dist[sam] = self.get_dependence_measure(array_shuffled, xyz) else: null_dist = \ self._get_shuffle_dist(array, xyz, self.get_dependence_measure, sig_samples=self.sig_samples, sig_blocklength=self.sig_blocklength, verbosity=self.verbosity) pval = (null_dist >= value).mean() if return_null_dist: # Sort null_dist.sort() return pval, null_dist return pval
[docs] def get_conditional_entropy(self, array, xyz): """Returns the nearest-neighbor conditional entropy estimate of H(X|Y). Parameters ---------- array : array-like data array with X, Y in rows and observations in columns xyz : array of ints XYZ identifier array of shape (dim,). Here only uses 0 for X and 1 for Y. Returns ------- val : float Entropy estimate. """ dim, T = array.shape if self.knn < 1: knn_here = max(1, int(self.knn*T)) else: knn_here = max(1, int(self.knn)) array = array.astype(np.float64) # Add noise to destroy ties... array += (1E-6 * array.std(axis=1).reshape(dim, 1) * self.random_state.random((array.shape[0], array.shape[1]))) if self.transform == 'standardize': # Standardize array = array.astype(np.float64) array -= array.mean(axis=1).reshape(dim, 1) std = array.std(axis=1) for i in range(dim): if std[i] != 0.: array[i] /= std[i] # array /= array.std(axis=1).reshape(dim, 1) # FIXME: If the time series is constant, return nan rather than # raising Exception if np.any(std == 0.) and self.verbosity > 0: warnings.warn("Possibly constant array!") # if np.isnan(array).sum() != 0: # raise ValueError("nans after standardizing, " # "possibly constant array!") elif self.transform == 'uniform': array = self._trafo2uniform(array) elif self.transform == 'ranks': array = array.argsort(axis=1).argsort(axis=1).astype(np.float64) # Compute conditional entropy as H(X|Y) = H(X) - I(X;Y) # First compute H(X) # Use cKDTree to get distances eps to the k-th nearest neighbors for # every sample in joint space X with maximum norm x_indices = np.where(xyz == 0)[0] y_indices = np.where(xyz == 1)[0] dim_x = int(np.where(xyz == 0)[0][-1] + 1) if 1 in xyz: dim_y = int(np.where(xyz == 1)[0][-1] + 1 - dim_x) else: dim_y = 0 x_array = np.fastCopyAndTranspose(array[x_indices, :]) tree_xyz = spatial.cKDTree(x_array) epsarray = tree_xyz.query(x_array, k=[knn_here+1], p=np.inf, eps=0., workers=self.workers)[0][:, 0].astype(np.float64) h_x = - special.digamma(knn_here) + special.digamma(T) + dim_x * np.log(2.*epsarray).mean() # Then compute MI(X;Y) if dim_y > 0: xyz_here = np.array([index for index in xyz if index == 0 or index == 1]) array_xy = array[list(x_indices) + list(y_indices), :] i_xy = self.get_dependence_measure(array_xy, xyz_here) else: i_xy = 0. h_x_y = h_x - i_xy return h_x_y
@jit(forceobj=True) def get_restricted_permutation(self, T, shuffle_neighbors, neighbors, order): restricted_permutation = np.zeros(T, dtype=np.int32) used = np.array([], dtype=np.int32) for sample_index in order: m = 0 use = neighbors[sample_index, m] while ((use in used) and (m < shuffle_neighbors - 1)): m += 1 use = neighbors[sample_index, m] restricted_permutation[sample_index] = use used = np.append(used, use) return restricted_permutation
[docs] def get_model_selection_criterion(self, j, parents, tau_max=0): """Returns a cross-validation-based score for nearest-neighbor estimates. Fits a nearest-neighbor model of the parents to variable j and returns the score. The lower, the better the fit. Here used to determine optimal hyperparameters in PCMCI(pc_alpha or fixed thres). Parameters ---------- j : int Index of target variable in data array. parents : list List of form [(0, -1), (3, -2), ...] containing parents. tau_max : int, optional (default: 0) Maximum time lag. This may be used to make sure that estimates for different lags in X, Z, all have the same sample size. Returns: score : float Model score. """ import sklearn from sklearn.neighbors import KNeighborsRegressor from sklearn.model_selection import cross_val_score Y = [(j, 0)] X = [(j, 0)] # dummy variable here Z = parents array, xyz, _ = self.dataframe.construct_array(X=X, Y=Y, Z=Z, tau_max=tau_max, mask_type=self.mask_type, return_cleaned_xyz=False, do_checks=True, verbosity=self.verbosity) dim, T = array.shape # Standardize array = array.astype(np.float64) array -= array.mean(axis=1).reshape(dim, 1) std = array.std(axis=1) for i in range(dim): if std[i] != 0.: array[i] /= std[i] if np.any(std == 0.) and self.verbosity > 0: warnings.warn("Possibly constant array!") # raise ValueError("nans after standardizing, " # "possibly constant array!") predictor_indices = list(np.where(xyz==2)[0]) predictor_array = array[predictor_indices, :].T # Target is only first entry of Y, ie [y] target_array = array[np.where(xyz==1)[0][0], :] if predictor_array.size == 0: # Regressing on ones if empty parents predictor_array = np.ones(T).reshape(T, 1) if self.knn < 1: knn_here = max(1, int(self.knn*T)) else: knn_here = max(1, int(self.knn)) knn_model = KNeighborsRegressor(n_neighbors=knn_here) scores = cross_val_score(estimator=knn_model, X=predictor_array, y=target_array, cv=self.model_selection_folds, n_jobs=self.workers) # print(scores) return -scores.mean()
if __name__ == '__main__': import tigramite from tigramite.data_processing import DataFrame import tigramite.data_processing as pp import numpy as np random_state = np.random.default_rng(seed=42) cmi = CMIknn(mask_type=None, significance='fixed_thres', sig_samples=100, sig_blocklength=1, transform='none', knn=0.1, verbosity=0) T = 1000 dimz = 1 # # Continuous data # z = random_state.standard_normal((T, dimz)) # x = (1.*z[:,0] + random_state.standard_normal(T)).reshape(T, 1) # y = (1.*z[:,0] + random_state.standard_normal(T)).reshape(T, 1) # print('X _|_ Y') # print(cmi.run_test_raw(x, y, z=None)) # print('X _|_ Y | Z') # print(cmi.run_test_raw(x, y, z=z)) # Continuous data z = random_state.standard_normal((T, dimz)) x = random_state.standard_normal(T).reshape(T, 1) y = (0.*z[:,0] + 1.*x[:,0] + random_state.standard_normal(T)).reshape(T, 1) data = np.hstack((x, y, z)) data[:,0] = 0.5 print (data.shape) dataframe = DataFrame(data=data) cmi.set_dataframe(dataframe) print(cmi.run_test(X=[(0, 0)], Y=[(1, 0)], alpha_or_thres=0.5 )) # print(cmi.get_model_selection_criterion(j=1, parents=[], tau_max=0)) # print(cmi.get_model_selection_criterion(j=1, parents=[(0, 0)], tau_max=0)) # print(cmi.get_model_selection_criterion(j=1, parents=[(0, 0), (2, 0)], tau_max=0))