"""Tigramite causal discovery for time series."""
# Author: Jakob Runge <jakob@jakob-runge.com>
#
# License: GNU General Public License v3.0
from __future__ import print_function
from scipy import stats
import numpy as np
import sys
import warnings
from .independence_tests_base import CondIndTest
class ParCorrMult(CondIndTest):
r"""Partial correlation test for multivariate X and Y.
Multivariate partial correlation is estimated through ordinary least squares (OLS)
regression and some test for multivariate dependency among the residuals.
Notes
-----
To test :math:`X \perp Y | Z`, first :math:`Z` is regressed out from
:math:`X` and :math:`Y` assuming the model
.. math:: X & = Z \beta_X + \epsilon_{X} \\
Y & = Z \beta_Y + \epsilon_{Y}
using OLS regression. Then different measures for the dependency among the residuals
can be used. Currently only a test for zero correlation on the maximum of the residuals'
correlation is performed.
Parameters
----------
correlation_type : {'max_corr'}
Which dependency measure to use on residuals.
**kwargs :
Arguments passed on to Parent class CondIndTest.
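
    Examples
    --------
    A minimal usage sketch, assuming tigramite's ``DataFrame`` with its
    ``vector_vars`` argument to group scalar columns into multivariate
    variables (values depend on the random data, hence skipped):

    >>> import numpy as np
    >>> from tigramite.data_processing import DataFrame
    >>> rng = np.random.default_rng(42)
    >>> data = rng.standard_normal((500, 4))
    >>> # Group columns 0-1 into variable 0 and columns 2-3 into variable 1
    >>> dataframe = DataFrame(data,
    ...     vector_vars={0: [(0, 0), (1, 0)], 1: [(2, 0), (3, 0)]})
    >>> parcorr_mult = ParCorrMult()
    >>> parcorr_mult.set_dataframe(dataframe)
    >>> val, pval = parcorr_mult.run_test(X=[(0, 0)], Y=[(1, 0)])  # doctest: +SKIP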
"""
    @property
    def measure(self):
        """Concrete property to return the measure of the independence test."""
        return self._measure
def __init__(self, correlation_type='max_corr', **kwargs):
self._measure = 'par_corr_mult'
self.two_sided = True
self.residual_based = True
self.correlation_type = correlation_type
if self.correlation_type not in ['max_corr']:
raise ValueError("correlation_type must be in ['max_corr'].")
CondIndTest.__init__(self, **kwargs)
def _get_single_residuals(self, array, xyz, target_var,
standardize=True,
return_means=False):
"""Returns residuals of linear multiple regression.
Performs a OLS regression of the variable indexed by target_var on the
conditions Z. Here array is assumed to contain X and Y as the first two
rows with the remaining rows (if present) containing the conditions Z.
Optionally returns the estimated regression line.
Parameters
----------
array : array-like
data array with X, Y, Z in rows and observations in columns
xyz : array of ints
XYZ identifier array of shape (dim,).
target_var : {0, 1}
Variable to regress out conditions from.
standardize : bool, optional (default: True)
Whether to standardize the array beforehand. Must be used for
partial correlation.
return_means : bool, optional (default: False)
Whether to return the estimated regression line.
Returns
-------
resid [, mean] : array-like
The residual of the regression and optionally the estimated line.
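
        Examples
        --------
        A toy sketch of the residualization step performed below (the method
        itself additionally standardizes and uses the xyz bookkeeping):

        >>> import numpy as np
        >>> rng = np.random.default_rng(0)
        >>> z = rng.standard_normal((100, 1))   # conditions, shape (T, dim_z)
        >>> x = 0.7 * z + 0.1 * rng.standard_normal((100, 1))
        >>> beta_hat = np.linalg.lstsq(z, x, rcond=None)[0]
        >>> resid = x - z @ beta_hat            # residuals of x given z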
"""
dim, T = array.shape
dim_z = (xyz == 2).sum()
# Standardize
if standardize:
array -= array.mean(axis=1).reshape(dim, 1)
std = array.std(axis=1)
for i in range(dim):
if std[i] != 0.:
array[i] /= std[i]
if np.any(std == 0.) and self.verbosity > 0:
warnings.warn("Possibly constant array!")
        # Note: np.fastCopyAndTranspose was deprecated and removed in
        # NumPy 2.0; a contiguous transposed copy is equivalent.
        y = array[np.where(xyz == target_var)[0], :].T.copy()

        if dim_z > 0:
            z = array[np.where(xyz == 2)[0], :].T.copy()
            # OLS estimate: regress the target variable on the conditions Z
            beta_hat = np.linalg.lstsq(z, y, rcond=None)[0]
            mean = np.dot(z, beta_hat)
            resid = y - mean
        else:
            resid = y
            mean = None

        if return_means:
            # Guard against mean being None when there are no conditions
            return (resid.T.copy(),
                    mean.T.copy() if mean is not None else None)
        return resid.T.copy()
    def get_dependence_measure(self, array, xyz):
"""Return multivariate kernel correlation coefficient.
Estimated as some dependency measure on the
residuals of a linear OLS regression.
Parameters
----------
array : array-like
data array with X, Y, Z in rows and observations in columns
xyz : array of ints
XYZ identifier array of shape (dim,).
Returns
-------
val : float
Partial correlation coefficient.
"""
dim, T = array.shape
dim_x = (xyz==0).sum()
dim_y = (xyz==1).sum()
x_vals = self._get_single_residuals(array, xyz, target_var=0)
y_vals = self._get_single_residuals(array, xyz, target_var=1)
array_resid = np.vstack((x_vals.reshape(dim_x, T), y_vals.reshape(dim_y, T)))
xyz_resid = np.array([index_code for index_code in xyz if index_code != 2])
val = self.mult_corr(array_resid, xyz_resid)
return val
    def mult_corr(self, array, xyz, standardize=True):
"""Return multivariate dependency measure.
Parameters
----------
array : array-like
data array with X, Y in rows and observations in columns
xyz : array of ints
XYZ identifier array of shape (dim,).
standardize : bool, optional (default: True)
Whether to standardize the array beforehand. Must be used for
partial correlation.
Returns
-------
val : float
Multivariate dependency measure.
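
        Examples
        --------
        A toy sketch: with two-dimensional X and Y whose first components are
        strongly correlated, 'max_corr' picks out roughly that correlation
        (the value is approximate, hence the doctest is skipped):

        >>> import numpy as np
        >>> parcorr_mult = ParCorrMult()
        >>> rng = np.random.default_rng(0)
        >>> x = rng.standard_normal((2, 1000))
        >>> y = np.vstack([0.9 * x[0] + 0.44 * rng.standard_normal(1000),
        ...                rng.standard_normal(1000)])
        >>> array = np.vstack([x, y])
        >>> xyz = np.array([0, 0, 1, 1])
        >>> parcorr_mult.mult_corr(array, xyz)  # doctest: +SKIP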
"""
dim, n = array.shape
dim_x = (xyz==0).sum()
dim_y = (xyz==1).sum()
# Standardize
if standardize:
array -= array.mean(axis=1).reshape(dim, 1)
std = array.std(axis=1)
for i in range(dim):
if std[i] != 0.:
array[i] /= std[i]
if np.any(std == 0.) and self.verbosity > 0:
warnings.warn("Possibly constant array!")
x = array[np.where(xyz==0)[0]]
y = array[np.where(xyz==1)[0]]
if self.correlation_type == 'max_corr':
# Get (positive or negative) absolute maximum correlation value
corr = np.corrcoef(x, y)[:len(x), len(x):].flatten()
val = corr[np.argmax(np.abs(corr))]
# elif self.correlation_type == 'linear_hsci':
# # For linear kernel and standardized data (centered and divided by std)
# # biased V -statistic of HSIC reduces to sum of squared inner products
# # over all dimensions
# val = ((x.dot(y.T)/float(n))**2).sum()
        else:
            raise NotImplementedError("Currently only "
                                      "correlation_type == 'max_corr' implemented.")
return val
    def get_shuffle_significance(self, array, xyz, value,
return_null_dist=False):
"""Returns p-value for shuffle significance test.
For residual-based test statistics only the residuals are shuffled.
Parameters
----------
array : array-like
data array with X, Y, Z in rows and observations in columns
xyz : array of ints
XYZ identifier array of shape (dim,).
        value : number
            Value of test statistic for unshuffled estimate.

        return_null_dist : bool, optional (default: False)
            Whether to also return the estimated null distribution.
Returns
-------
pval : float
p-value
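
        Examples
        --------
        A sketch of how this method is typically reached, assuming the
        shuffle test is selected at construction (run_test then calls it
        internally instead of the analytic significance):

        >>> parcorr_mult = ParCorrMult(significance='shuffle_test',
        ...                            sig_samples=1000)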
"""
dim, T = array.shape
dim_x = (xyz==0).sum()
dim_y = (xyz==1).sum()
x_vals = self._get_single_residuals(array, xyz, target_var=0)
y_vals = self._get_single_residuals(array, xyz, target_var=1)
array_resid = np.vstack((x_vals.reshape(dim_x, T), y_vals.reshape(dim_y, T)))
xyz_resid = np.array([index_code for index_code in xyz if index_code != 2])
null_dist = self._get_shuffle_dist(array_resid, xyz_resid,
self.get_dependence_measure,
sig_samples=self.sig_samples,
sig_blocklength=self.sig_blocklength,
verbosity=self.verbosity)
        pval = (null_dist >= np.abs(value)).mean()

        # Adjust p-value for two-sided measures
        if pval < 1.:
            pval *= 2.
        # The doubling can push the value above 1; clip to the valid range
        pval = min(pval, 1.)
# Adjust p-value for dimensions of x and y (conservative Bonferroni-correction)
# pval *= dim_x*dim_y
if return_null_dist:
return pval, null_dist
return pval
    def get_analytic_significance(self, value, T, dim, xyz):
"""Returns analytic p-value depending on correlation_type.
Assumes two-sided correlation. If the degrees of freedom are less than
1, numpy.nan is returned.
Parameters
----------
value : float
Test statistic value.
T : int
Sample length
        dim : int
            Dimensionality, i.e., total number of features.
xyz : array of ints
XYZ identifier array of shape (dim,).
Returns
-------
pval : float or numpy.nan
P-value.
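
        Notes
        -----
        For correlation_type 'max_corr' the statistic :math:`r` (the maximum
        absolute residual correlation) is transformed to a Student's t value

        .. math:: t = r \sqrt{\frac{T - d}{1 - r^2}}

        where :math:`d` is the total dimensionality of X, Y, and Z. The
        two-sided p-value from the t-distribution with :math:`T - d` degrees
        of freedom is then Bonferroni-corrected by the factor dim_x * dim_y
        to account for taking the maximum over all component pairs.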
"""
# Get the number of degrees of freedom
deg_f = T - dim
dim_x = (xyz==0).sum()
dim_y = (xyz==1).sum()
if self.correlation_type == 'max_corr':
if deg_f < 1:
pval = np.nan
elif abs(abs(value) - 1.0) <= sys.float_info.min:
pval = 0.0
else:
trafo_val = value * np.sqrt(deg_f/(1. - value*value))
# Two sided significance level
pval = stats.t.sf(np.abs(trafo_val), deg_f) * 2
        else:
            raise NotImplementedError("Currently only "
                                      "correlation_type == 'max_corr' implemented.")

        # Adjust p-value for dimensions of x and y (conservative Bonferroni-correction)
        pval *= dim_x * dim_y
        # The correction can push the value above 1; clip to the valid range
        pval = min(pval, 1.)

        return pval
    def get_model_selection_criterion(self, j, parents, tau_max=0, corrected_aic=False):
"""Returns Akaike's Information criterion modulo constants.
Fits a linear model of the parents to each variable in j and returns
the average score. Leave-one-out cross-validation is asymptotically
equivalent to AIC for ordinary linear regression models. Here used to
determine optimal hyperparameters in PCMCI, in particular the
pc_alpha value.
Parameters
----------
j : int
Index of target variable in data array.
parents : list
List of form [(0, -1), (3, -2), ...] containing parents.
        tau_max : int, optional (default: 0)
            Maximum time lag. This may be used to make sure that estimates for
            different lags in X, Z, all have the same sample size.

        corrected_aic : bool, optional (default: False)
            Whether to use the small-sample corrected AIC (AICc).

        Returns
        -------
        score : float
            Model score.
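
        Examples
        --------
        A sketch, assuming a dataframe has already been attached via
        set_dataframe (see the demo at the bottom of this module):

        >>> parcorr_mult = ParCorrMult()
        >>> parcorr_mult.set_dataframe(dataframe)  # doctest: +SKIP
        >>> score = parcorr_mult.get_model_selection_criterion(
        ...     j=0, parents=[(1, 0), (2, 0)], tau_max=0)  # doctest: +SKIP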
"""
Y = [(j, 0)]
X = [(j, 0)] # dummy variable here
Z = parents
array, xyz, _ = self.dataframe.construct_array(X=X, Y=Y, Z=Z,
tau_max=tau_max,
mask_type=self.mask_type,
return_cleaned_xyz=False,
do_checks=True,
verbosity=self.verbosity)
dim, T = array.shape
y = self._get_single_residuals(array, xyz, target_var=0)
n_comps = y.shape[0]
score = 0.
for y_component in y:
# Get RSS
rss = (y_component**2).sum()
# Number of parameters
p = dim - 1
# Get AIC
if corrected_aic:
comp_score = T * np.log(rss) + 2. * p + (2.*p**2 + 2.*p)/(T - p - 1)
else:
comp_score = T * np.log(rss) + 2. * p
score += comp_score
score /= float(n_comps)
return score
if __name__ == '__main__':

    from tigramite.data_processing import DataFrame

    random_state = np.random.default_rng(seed=3)

    cmi = ParCorrMult()

    # Estimate the rejection rate at alpha = 0.1 on data where X _|_ Y | Z
    # holds by construction; it should stay close to the nominal level.
    samples = 1
    rate = np.zeros(samples)
    for i in range(samples):
        data = random_state.standard_normal((100, 6))
        data[:, 2] += -0.5 * data[:, 0]

        dataframe = DataFrame(data)
        cmi.set_dataframe(dataframe)

        pval = cmi.run_test(X=[(0, 0)], Y=[(1, 0)], Z=[(2, 0)])[1]
        rate[i] = pval <= 0.1

    cmi.get_model_selection_criterion(j=0, parents=[(1, 0), (2, 0)],
                                      tau_max=0, corrected_aic=False)
    print(rate.mean())
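
    # A multivariate sketch, assuming DataFrame's vector_vars grouping: the
    # six columns are grouped into three two-dimensional variables, and
    # variable 0 is tested against variable 1 given variable 2.
    dataframe_mult = DataFrame(
        data,
        vector_vars={0: [(0, 0), (1, 0)], 1: [(2, 0), (3, 0)], 2: [(4, 0), (5, 0)]})
    cmi.set_dataframe(dataframe_mult)
    print(cmi.run_test(X=[(0, 0)], Y=[(1, 0)], Z=[(2, 0)]))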