from __future__ import print_function
import numpy as np
import warnings
from .parcorr import ParCorr
from .robust_parcorr import RobustParCorr
# from tigramite.independence_tests.parcorr import ParCorr
# from tigramite.independence_tests.robust_parcorr import RobustParCorr
from tigramite import data_processing as pp
class ParCorrWLS(ParCorr):
r"""Weighted partial correlation test.
Partial correlation is estimated through linear weighted least squares (WLS)
regression and a test for non-zero linear Pearson correlation on the
residuals.
Either the variances, i.e. weights, are known, or they can be estimated using non-parametric regression
(using k nearest neighbour).
Notes
-----
To test :math:`X \perp Y | Z`, first :math:`Z` is regressed out from
:math:`X` and :math:`Y` assuming the model
.. math:: X & = Z \beta_X + \epsilon_{X} \\
Y & = Z \beta_Y + \epsilon_{Y}
using WLS regression. Here, we do not assume homoskedasticity of the error terms.
Then the dependency of the residuals is tested with
the Pearson correlation test.
.. math:: \rho\left(r_X, r_Y\right)
For the ``significance='analytic'`` Student's-*t* distribution with
:math:`T-D_Z-2` degrees of freedom is implemented.
Parameters
----------
gt_std_matrix: array-like, optional (default: None)
Standard deviations of the noise of shape (T, nb_nodes)
expert_knowledge: string or dict (default: time-dependent heteroskedasticity)
Either string "time-dependent heteroskedasticity" meaning that every variable only has time-dependent
heteroskedasticity, or string "homoskedasticity" where we assume homoskedasticity for all variables, or
dictionary containing expert knowledge about heteroskedastic relationships as list of tuples or strings.
window_size: int (default: 10)
Number of nearest neighbours that we are using for estimating the variance function.
robustify: bool (default: False)
Indicates whether the robust partial correlation test should be used, i.e. whether the data should be
transformed to normal marginals before testing
**kwargs :
Arguments passed on to Parent class ParCorr.
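
    Examples
    --------
    A minimal usage sketch, assuming the standard tigramite workflow; the
    data, the module path, and the heteroskedasticity-inducing parent
    ``(0, -1)`` are hypothetical illustrations:

    >>> import numpy as np
    >>> from tigramite import data_processing as pp
    >>> from tigramite.independence_tests.parcorr_wls import ParCorrWLS
    >>> data = np.random.default_rng(42).standard_normal((500, 3))
    >>> dataframe = pp.DataFrame(data)
    >>> # variable 1 is assumed heteroskedastic w.r.t. its parent (0, -1)
    >>> cond_ind_test = ParCorrWLS(expert_knowledge={1: [(0, -1)]})
    >>> cond_ind_test.set_dataframe(dataframe)
    >>> result = cond_ind_test.run_test(X=[(0, -1)], Y=[(1, 0)],
    ...                                 Z=[(2, -1)], tau_max=1)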
"""
def __init__(self, gt_std_matrix=None,
expert_knowledge="time-dependent heteroskedasticity",
window_size=10, robustify=False, **kwargs):
self.gt_std_matrix = gt_std_matrix
self.expert_knowledge = expert_knowledge
self.window_size = window_size
self.robustify = robustify
self.stds = None
ParCorr.__init__(self,
recycle_residuals=False, # Doesn't work with ParCorrWLS
**kwargs)
self._measure = 'par_corr_wls'
def _stds_preparation(self, X, Y, Z, tau_max=0, cut_off='2xtau_max', verbosity=0):
"""Helper function to bring expert_knowledge into standard form."""
if self.expert_knowledge == "time-dependent heteroskedasticity":
self.expert_knowledge = {variable: ["time-dependent heteroskedasticity"]
for variable in range(self.dataframe.N)}
elif self.expert_knowledge == "homoskedasticity":
self.expert_knowledge = {}
def _get_array(self, X, Y, Z, tau_max=0, cut_off='2xtau_max', verbosity=0, return_cleaned_xyz=True,
remove_constant_data=False):
"""Convenience wrapper around construct_array. Simultaneously, construct self.stds which needs to correspond
to the variables in the array."""
if self.measure in ['par_corr_wls']:
if len(X) > 1 or len(Y) > 1:
raise ValueError("X and Y for %s must be univariate." % self.measure)
Z_orig = Z.copy()
            expert_knowledge_XY = []
            for var in [X[0][0], Y[0][0]]:
                if not isinstance(self.expert_knowledge, str) and var in self.expert_knowledge:
                    expert_knowledge_XY += self.expert_knowledge[var]
            # add heteroskedasticity-inducing parents to Z (they are removed again later)
            # to obtain data cleaned the same way as X and Y for weight estimation
            for item in expert_knowledge_XY:
                if isinstance(item, tuple):
                    Z.append(item)
# Call the _get_array function of the parent class
if remove_constant_data:
array, xyz, XYZ, data_type, nonzero_array, nonzero_xyz, nonzero_XYZ, nonzero_data_type = super()._get_array(
X=X, Y=Y, Z=Z,
tau_max=tau_max,
cut_off=cut_off,
verbosity=verbosity,
remove_constant_data=remove_constant_data)
X, Y, Z = XYZ
flat_XYZ = X + Y + Z
counter = None if (len(Z) - len(Z_orig)) <= 0 else -1 * (len(Z) - len(Z_orig))
data_hs_parent = {}
            for item in expert_knowledge_XY:
                if isinstance(item, tuple):
                    data_hs_parent[item] = array[flat_XYZ.index(item), :]
# stds have to correspond to array without the zero-rows
nonzero_array_copy = nonzero_array.copy()
nonzero_X, nonzero_Y, nonzero_Z = nonzero_XYZ
self._get_std_estimation(nonzero_array_copy, nonzero_X, nonzero_Y, nonzero_Z, tau_max,
cut_off, verbosity, data_hs_parent)
            if data_type is not None:
                data_type = data_type[:counter]
                nonzero_data_type = nonzero_data_type[:counter]
return array[:counter], xyz[:counter], (X, Y, Z[:counter]), data_type, \
nonzero_array[:counter], nonzero_xyz[:counter], (nonzero_X, nonzero_Y, nonzero_Z[:counter]), \
nonzero_data_type
else:
array, xyz, XYZ, data_type = super()._get_array(
X=X, Y=Y, Z=Z,
tau_max=tau_max,
cut_off=cut_off,
verbosity=verbosity,
remove_constant_data=remove_constant_data)
X, Y, Z = XYZ
flat_XYZ = X + Y + Z
counter = None if (len(Z) - len(Z_orig)) <= 0 else -1 * (len(Z) - len(Z_orig))
dim, T = array.shape
            # save the data of the heteroskedasticity-inducing parents to use for weight
            # estimation (keyed by the parent tuple, matching the lookup in
            # _get_std_estimation and the remove_constant_data branch above)
            data_hs_parent = {}
            for item in expert_knowledge_XY:
                if isinstance(item, tuple):
                    data_hs_parent[item] = array[flat_XYZ.index(item), :]
array_copy = array.copy()
self._get_std_estimation(array_copy, X, Y, Z, tau_max, cut_off, verbosity, data_hs_parent)
            if data_type is not None:
                data_type = data_type[:counter]
return array[:counter], xyz[:counter], (X, Y, Z[:counter]), data_type
def _estimate_std_time(self, arr, target_var):
"""
        Estimate the standard deviations of the error terms using a
        residual-based approach: first compute the absolute values of the OLS
        residuals, then smooth them using a sliding window while keeping the
        time order of the residuals. In this way we can approximate variances
        that are time-dependent.

        Parameters
        ----------
        arr : array
            Data array of shape (dim, T).
        target_var : {0, 1}
            Variable to regress out conditions from.

        Returns
        -------
        std_est : array
            Standard deviation array of shape (T,).
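
        Examples
        --------
        The smoothing step is a plain moving average of the absolute
        residuals, padded with ones for the first ``window_size - 1`` entries
        (a small numeric illustration, not part of the test):

        >>> import numpy as np
        >>> resid = np.array([1., 1., 1., 3., 3., 3.])
        >>> w = 3
        >>> (np.convolve(resid, np.ones(w), 'valid') / w).round(2)
        array([1.  , 1.67, 2.33, 3.  ])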
"""
        dim, T = arr.shape
        dim_z = dim - 2
        # Standardization not necessary for variance estimation
        y = np.copy(arr[target_var, :])
        if dim_z > 0:
            z = arr[2:, :].T.copy()  # np.fastCopyAndTranspose was removed in NumPy 2.0
            beta_hat = np.linalg.lstsq(z, y, rcond=None)[0]
            mean = np.dot(z, beta_hat)
            resid = np.abs(y - mean)
        else:
            resid = np.abs(y)
        # smooth the absolute residuals with a moving average; pad the first
        # window_size - 1 entries with ones
        std_est = np.concatenate(
            (np.ones(self.window_size - 1), np.convolve(resid, np.ones(self.window_size), 'valid') / self.window_size))
return std_est
def _estimate_std_parent(self, arr, target_var, target_lag, H, data_hs_parent):
"""
        Estimate the standard deviations of the error terms using a
        residual-based approach. First compute the absolute values of the OLS
        residuals, then smooth them by averaging over the k residuals that are
        closest in H-value. In this way we are able to deal with
        parent-dependent heteroskedasticity.

        Parameters
        ----------
        arr : array
            Data array of shape (dim, T).
        target_var : {0, 1}
            Variable to obtain the noise variance approximation for.
        target_lag : -int
            Lag of the variable to obtain the noise variance approximation for.
        H : tuple
            Of the form (var, -tau), where var specifies the variable index
            and tau the time lag. Variable to use for sorting the residuals,
            i.e. the variable that the heteroskedasticity depends on.
        data_hs_parent : array
            Data of the heteroskedasticity-inducing parent H, of shape (T,).

        Returns
        -------
        std_est : array
            Standard deviation array of shape (T,).
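
        Examples
        --------
        Sorting the residuals by the driver and later reverting the sort uses
        the argsort-of-argsort trick (a small illustration, not part of the
        test):

        >>> import numpy as np
        >>> h = np.array([0.3, 0.1, 0.2])
        >>> order = np.argsort(h)
        >>> revert = np.argsort(order)
        >>> np.array_equal(h[order][revert], h)
        True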
"""
dim, T = arr.shape
dim_z = dim - 2
y = np.copy(arr[target_var, :])
if dim_z > 0:
            z = arr[2:, :].T.copy()  # np.fastCopyAndTranspose was removed in NumPy 2.0
beta_hat = np.linalg.lstsq(z, y, rcond=None)[0]
mean = np.dot(z, beta_hat)
resid = abs(y - mean)
lag = H[1] + target_lag
# order the residuals w.r.t. the heteroskedasticity-inducing parent corresponding to sample h
h = data_hs_parent[-1 * lag:]
ordered_z_ind = np.argsort(h)
ordered_z_ind = ordered_z_ind * (ordered_z_ind > 0)
revert_argsort = np.argsort(ordered_z_ind)
truncate_resid = resid[np.abs(lag):]
sorted_resid = truncate_resid[ordered_z_ind]
# smooth the nearest neighbour residuals
variance_est_sorted = np.concatenate(
(np.ones(self.window_size - 1),
np.convolve(sorted_resid, np.ones(self.window_size), 'valid') / self.window_size,))
std_est = variance_est_sorted[revert_argsort]
std_est = np.concatenate((std_est, np.ones(np.abs(lag))))
std_est = np.roll(std_est, np.abs(lag))
else:
resid = abs(y)
std_est = np.concatenate(
(np.ones(self.window_size - 1),
np.convolve(resid, np.ones(self.window_size), 'valid') / self.window_size))
return std_est
def _get_std_estimation(self, array, X, Y, Z=[], tau_max=0, cut_off='2xtau_max', verbosity=0, data_hs_parent=None):
"""Use expert knowledge on the heteroskedastic relationships contained in self.expert_knowledge to estimate the
standard deviations of the error terms.
The expert knowledge can specify whether there is sampling index / time dependent heteroskedasticity,
heteroskedasticity with respect to a specified parent, or homoskedasticity.
Parameters
----------
array : array
Data array of shape (dim, T)
X, Y : list of tuples
X,Y are of the form [(var, -tau)], where var specifies the
variable index and tau the time lag.
Return
------
stds: array-like
Array of standard deviations of error terms for X and Y of shape (2, T).
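
        Examples
        --------
        The expert knowledge maps variable indices either to the string
        "time-dependent heteroskedasticity" or to heteroskedasticity-inducing
        parent tuples (a hypothetical illustration):

        >>> expert_knowledge = {0: ["time-dependent heteroskedasticity"],
        ...                     1: [(0, -1)]}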
"""
self._stds_preparation(X, Y, Z, tau_max, cut_off, verbosity)
dim, T = array.shape
if self.gt_std_matrix is not None:
stds_dataframe = pp.DataFrame(self.gt_std_matrix,
mask=self.dataframe.mask,
missing_flag=self.dataframe.missing_flag,
datatime={0: np.arange(len(self.gt_std_matrix[:, 0]))})
stds, _, _, _ = stds_dataframe.construct_array(X=X, Y=Y, Z=Z,
tau_max=tau_max,
mask_type=self.mask_type,
return_cleaned_xyz=True,
do_checks=True,
remove_overlaps=True,
cut_off=cut_off,
verbosity=verbosity)
else:
stds = np.ones((2, T))
for count, variable in enumerate([X[0], Y[0]]):
# Here we assume that it is known what the heteroskedasticity function depends on for every variable
if variable[0] in self.expert_knowledge:
hs_source = self.expert_knowledge[variable[0]][0]
if hs_source == "time-dependent heteroskedasticity":
stds[count] = self._estimate_std_time(array, count)
                    elif isinstance(hs_source, tuple):
stds[count] = self._estimate_std_parent(array, count, variable[1],
hs_source, data_hs_parent[hs_source])
self.stds = stds
return stds
def _get_single_residuals(self, array, target_var,
standardize=False,
return_means=False):
"""Returns residuals of weighted linear multiple regression.
Performs a WLS regression of the variable indexed by target_var on the
conditions Z. Here array is assumed to contain X and Y as the first two
rows with the remaining rows (if present) containing the conditions Z.
Optionally returns the estimated regression line.
Parameters
----------
array : array-like
data array with X, Y, Z in rows and observations in columns.
target_var : {0, 1}
Variable to regress out conditions from.
standardize : bool, optional (default: True)
Whether to standardize the array beforehand. Must be used for
partial correlation.
return_means : bool, optional (default: False)
Whether to return the estimated regression line.
Returns
-------
resid [, mean] : array-like
The residual of the regression and optionally the estimated line.
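
        Examples
        --------
        WLS here amounts to ordinary least squares on the rescaled system
        with weight matrix W = diag(1/sigma_t) (a small self-contained
        illustration, not part of the test):

        >>> import numpy as np
        >>> rng = np.random.default_rng(0)
        >>> z = rng.standard_normal((100, 1))
        >>> stds = np.full(100, 2.0)
        >>> y = 3.0 * z[:, 0] + stds * rng.standard_normal(100)
        >>> w = np.diag(1.0 / stds)
        >>> beta_hat = np.linalg.lstsq(w @ z, w @ y, rcond=None)[0]
        >>> beta_hat.shape
        (1,)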
"""
dim, T = array.shape
dim_z = dim - 2
x_vals_sum = np.sum(array)
x_vals_has_nan = np.isnan(x_vals_sum)
if x_vals_has_nan:
raise ValueError("array has nans")
try:
stds = self.stds[target_var]
except TypeError:
warnings.warn("No estimated or ground truth standard deviations supplied for weights. "
"Assume homoskedasticity, i.e. all weights are 1.")
stds = np.ones(T)
# Standardize
if standardize:
array -= array.mean(axis=1).reshape(dim, 1)
std = array.std(axis=1)
for i in range(dim):
if std[i] != 0.:
array[i] /= std[i]
if np.any(std == 0.) and self.verbosity > 0:
warnings.warn("Possibly constant array!")
x_vals_sum = np.sum(array)
x_vals_has_nan = np.isnan(x_vals_sum)
if x_vals_has_nan:
raise ValueError("array has nans")
y = np.copy(array[target_var, :])
weights = np.diag(np.reciprocal(stds))
if dim_z > 0:
z = np.fastCopyAndTranspose(array[2:, :])
# include weights in z and y
zw = np.dot(weights, z)
yw = np.dot(y, weights)
beta_hat = np.linalg.lstsq(zw, yw, rcond=None)[0]
mean = np.dot(z, beta_hat)
resid = np.dot(y - mean, weights)
resid_vals_sum = np.sum(resid)
resid_vals_has_nan = np.isnan(resid_vals_sum)
if resid_vals_has_nan:
raise ValueError("resid has nans")
        else:
            resid = np.dot(y, weights)
mean = None
if return_means:
return resid, mean
return resid
    def get_dependence_measure(self, array, xyz):
        """Return the partial correlation of the residuals, optionally after
        transforming the data to normal marginals (if robustify is True)."""
if self.robustify:
array = RobustParCorr.trafo2normal(self, array)
return ParCorr.get_dependence_measure(self, array, xyz)
    def get_shuffle_significance(self, array, xyz, value,
                                 return_null_dist=False):
        """Return the p-value from a shuffle test, optionally after
        transforming the data to normal marginals (if robustify is True)."""
        if self.robustify:
            array = RobustParCorr.trafo2normal(self, array)
        return ParCorr.get_shuffle_significance(self, array, xyz, value,
                                                return_null_dist=return_null_dist)
    def get_model_selection_criterion(self, j, parents, tau_max=0, corrected_aic=False):
        """Returns Akaike's Information Criterion modulo constants.

        Fits a linear model of the parents to variable j and returns the
        score. Leave-one-out cross-validation is asymptotically equivalent to
        AIC for ordinary linear regression models. Here used to determine
        optimal hyperparameters in PCMCI, in particular the pc_alpha value.

        Parameters
        ----------
        j : int
            Index of target variable in data array.
        parents : list
            List of form [(0, -1), (3, -2), ...] containing parents.
        tau_max : int, optional (default: 0)
            Maximum time lag. This may be used to make sure that estimates for
            different lags in X, Z all have the same sample size.
        corrected_aic : bool, optional (default: False)
            Whether to add the small-sample correction term of the AICc.

        Returns
        -------
        score : float
            Model score.
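
        Examples
        --------
        The uncorrected score follows T * log(RSS) + 2 * p (a small numeric
        illustration, not part of the test):

        >>> import numpy as np
        >>> T, p, rss = 100, 3, 50.0
        >>> round(float(T * np.log(rss) + 2.0 * p), 2)
        397.2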
"""
Y = [(j, 0)]
X = [(j, 0)] # dummy variable here
Z = parents
array, xyz, _, _ = self._get_array(X, Y, Z, tau_max=tau_max, verbosity=self.verbosity,
return_cleaned_xyz=False)
dim, T = array.shape
# Transform to normal marginals
if self.robustify:
array = RobustParCorr.trafo2normal(self, array)
y = self._get_single_residuals(array, target_var=1, return_means=False)
# Get RSS
rss = (y ** 2).sum()
# Number of parameters
p = dim - 1
# Get AIC
if corrected_aic:
score = T * np.log(rss) + 2. * p + (2. * p ** 2 + 2. * p) / (T - p - 1)
else:
score = T * np.log(rss) + 2. * p
return score