Source code for tigramite.independence_tests.robust_parcorr

"""Tigramite causal discovery for time series."""

# Author: Jakob Runge <jakob@jakob-runge.com>
#
# License: GNU General Public License v3.0

from __future__ import print_function
from scipy import stats
import numpy as np
import sys
import warnings

from .independence_tests_base import CondIndTest

class RobustParCorr(CondIndTest):
    r"""Robust partial correlation test based on non-paranormal models.

    Partial correlation is estimated through transformation to standard
    normal marginals, ordinary least squares (OLS) regression, and a test
    for non-zero linear Pearson correlation on the residuals.

    Notes
    -----
    To test :math:`X \perp Y | Z`, firstly, each marginal is transformed to
    be standard normally distributed. For that, the transform
    :math:`\Phi^{-1}\circ\hat{F}` is used. Here, :math:`\Phi^{-1}` is the
    quantile function of a standard normal distribution and :math:`\hat{F}`
    is the empirical distribution function of the respective marginal.

    This idea stems from the literature on nonparanormal models, see:

    - Han Liu, John Lafferty, and Larry Wasserman. The nonparanormal:
      semiparametric estimation of high dimensional undirected graphs.
      J. Mach. Learn. Res., 10:2295–2328, 2009.

    - Han Liu, Fang Han, Ming Yuan, John Lafferty, and Larry Wasserman.
      High-dimensional semiparametric Gaussian copula graphical models.
      Ann. Statist., 40(4):2293–2326, 2012.

    - Naftali Harris and Mathias Drton. PC algorithm for nonparanormal
      graphical models. J. Mach. Learn. Res., 14:3365–3383, 2013.

    Afterwards (where Z, X, and Y are now assumed to be transformed to the
    standard normal scale), :math:`Z` is regressed out from :math:`X` and
    :math:`Y` assuming the model

    .. math::  X & =  Z \beta_X + \epsilon_{X} \\
        Y & = Z \beta_Y + \epsilon_{Y}

    using OLS regression. Then the dependency of the residuals is tested
    with the Pearson correlation test,

    .. math::  \rho\left(r_X, r_Y\right).

    For ``significance='analytic'``, a Student's t-distribution with
    :math:`T-D_Z-2` degrees of freedom is implemented.

    Parameters
    ----------
    **kwargs :
        Arguments passed on to parent class CondIndTest.
    """

    @property
    def measure(self):
        """Concrete property to return the measure of the independence test."""
        return self._measure

    def __init__(self, **kwargs):
        self._measure = 'robust_par_corr'
        self.two_sided = True
        self.residual_based = True

        CondIndTest.__init__(self, **kwargs)
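    # Usage sketch (not part of the original module): RobustParCorr is
    # designed to be passed as the conditional independence test to PCMCI.
    # Import paths may differ slightly across tigramite versions:
    #
    #     import numpy as np
    #     from tigramite.data_processing import DataFrame
    #     from tigramite.pcmci import PCMCI
    #     from tigramite.independence_tests.robust_parcorr import RobustParCorr
    #
    #     data = np.random.randn(500, 3)   # toy data of shape (T, N)
    #     pcmci = PCMCI(dataframe=DataFrame(data),
    #                   cond_ind_test=RobustParCorr())
    #     results = pcmci.run_pcmci(tau_max=2, pc_alpha=0.05)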
    def trafo2normal(self, x, thres=0.00001):
        """Transforms input array to standard normal marginals.

        For that, the code first transforms to uniform :math:`[0,1]`
        marginals using the empirical distribution function, and then
        transforms to normal marginals by applying the quantile function of
        a standard normal. Assumes x.shape = (dim, T).

        Parameters
        ----------
        x : array-like
            Input array.

        thres : float
            Small number between 0 and 1; after transformation to the
            uniform scale, all values that are too close to zero are
            replaced by thres and, similarly, all values that are too close
            to one are replaced by 1-thres. This avoids NaNs.

        Returns
        -------
        normal : array-like
            Array with standard normal marginals.
        """

        def trafo(xi):
            xisorted = np.sort(xi)
            yi = np.linspace(1. / len(xi), 1, len(xi))
            return np.interp(xi, xisorted, yi)

        if np.ndim(x) == 1:
            u = trafo(x)
            u[u == 0.] = thres
            u[u == 1.] = 1. - thres
            normal = stats.norm.ppf(u)
        else:
            normal = np.empty(x.shape)
            for i in range(x.shape[0]):
                uniform = trafo(x[i])
                uniform[uniform == 0.] = thres
                uniform[uniform == 1.] = 1. - thres
                normal[i] = stats.norm.ppf(uniform)

        return normal
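    # Illustration (hypothetical standalone sketch): the same rank-based
    # transform can be reproduced directly, which may help when inspecting
    # marginals:
    #
    #     import numpy as np
    #     from scipy import stats
    #     x = np.random.exponential(size=1000)         # heavily skewed marginal
    #     ranks = np.argsort(np.argsort(x)) + 1.0      # ranks 1..T
    #     u = np.clip(ranks / len(x), 1e-5, 1 - 1e-5)  # empirical CDF values
    #     z = stats.norm.ppf(u)                        # approximately N(0, 1)
    #
    # Up to ties and boundary handling, this matches trafo2normal for 1D
    # input.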
    def _get_single_residuals(self, array, target_var,
                              standardize=True,
                              return_means=False):
        """Returns residuals of linear multiple regression.

        Performs an OLS regression of the variable indexed by target_var on
        the conditions Z. Here array is assumed to contain X and Y as the
        first two rows with the remaining rows (if present) containing the
        conditions Z. Optionally returns the estimated regression line.

        Parameters
        ----------
        array : array-like
            Data array with X, Y, Z in rows and observations in columns.

        target_var : {0, 1}
            Variable to regress out conditions from.

        standardize : bool, optional (default: True)
            Whether to standardize the array beforehand. Must be used for
            partial correlation.

        return_means : bool, optional (default: False)
            Whether to return the estimated regression line.

        Returns
        -------
        resid [, mean] : array-like
            The residual of the regression and, optionally, the estimated
            regression line.
        """
        dim, T = array.shape
        dim_z = dim - 2

        # Standardize
        if standardize:
            array -= array.mean(axis=1).reshape(dim, 1)
            std = array.std(axis=1)
            for i in range(dim):
                if std[i] != 0.:
                    array[i] /= std[i]
            if np.any(std == 0.) and self.verbosity > 0:
                warnings.warn("Possibly constant array!")

        y = array[target_var, :]

        if dim_z > 0:
            # np.fastCopyAndTranspose was removed in NumPy 2.0; use an
            # explicit transposed copy instead.
            z = array[2:, :].T.copy()
            beta_hat = np.linalg.lstsq(z, y, rcond=None)[0]
            mean = np.dot(z, beta_hat)
            resid = y - mean
        else:
            resid = y
            mean = None

        if return_means:
            return (resid, mean)
        return resid
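    # The regression step above is plain OLS via np.linalg.lstsq; as a
    # standalone sketch with hypothetical variable names, where Z has shape
    # (dim_z, T) and y has shape (T,):
    #
    #     z = Z.T                                       # (T, dim_z) regressors
    #     beta_hat = np.linalg.lstsq(z, y, rcond=None)[0]
    #     resid = y - z @ beta_hat                      # residuals to be tested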
    def get_dependence_measure(self, array, xyz, data_type=None):
        """Return partial correlation.

        Marginals are first transformed to the standard normal scale. The
        dependence measure is then estimated as the Pearson correlation of
        the residuals of a linear OLS regression.

        Parameters
        ----------
        array : array-like
            Data array with X, Y, Z in rows and observations in columns.

        xyz : array of ints
            XYZ identifier array of shape (dim,).

        Returns
        -------
        val : float
            Partial correlation coefficient.
        """
        # Transform to normal marginals
        array = self.trafo2normal(array)

        x_vals = self._get_single_residuals(array, target_var=0)
        y_vals = self._get_single_residuals(array, target_var=1)

        val, _ = stats.pearsonr(x_vals, y_vals)
        return val
    def get_shuffle_significance(self, array, xyz, value,
                                 return_null_dist=False):
        """Returns p-value for shuffle significance test.

        Firstly, each marginal is transformed to the standard normal scale.
        For residual-based test statistics, only the residuals are shuffled.

        Parameters
        ----------
        array : array-like
            Data array with X, Y, Z in rows and observations in columns.

        xyz : array of ints
            XYZ identifier array of shape (dim,).

        value : number
            Value of test statistic for unshuffled estimate.

        Returns
        -------
        pval : float
            p-value.
        """
        # Transform to normal marginals
        array = self.trafo2normal(array)

        x_vals = self._get_single_residuals(array, target_var=0)
        y_vals = self._get_single_residuals(array, target_var=1)
        array_resid = np.array([x_vals, y_vals])
        xyz_resid = np.array([0, 1])

        null_dist = self._get_shuffle_dist(array_resid, xyz_resid,
                                           self.get_dependence_measure,
                                           sig_samples=self.sig_samples,
                                           sig_blocklength=self.sig_blocklength,
                                           verbosity=self.verbosity)

        pval = (null_dist >= np.abs(value)).mean()

        # Adjust p-value for two-sided measures
        if pval < 1.:
            pval *= 2.

        if return_null_dist:
            return pval, null_dist
        return pval
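    # The null distribution above comes from tigramite's internal
    # _get_shuffle_dist. Conceptually (a hypothetical sketch that ignores the
    # block shuffling used for serially dependent samples), it amounts to:
    #
    #     rng = np.random.default_rng(0)
    #     null = np.array([stats.pearsonr(x_vals,
    #                                     rng.permutation(y_vals))[0]
    #                      for _ in range(1000)])
    #     pval = min(1., 2. * (null >= abs(value)).mean())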
    def get_analytic_significance(self, value, T, dim, xyz):
        """Returns analytic p-value from Student's t-test for the Pearson
        correlation coefficient.

        Assumes two-sided correlation. If the degrees of freedom are less
        than 1, numpy.nan is returned.

        Parameters
        ----------
        value : float
            Test statistic value.

        T : int
            Sample length.

        dim : int
            Dimensionality, i.e., number of features.

        xyz : array of ints
            XYZ identifier array of shape (dim,).

        Returns
        -------
        pval : float or numpy.nan
            P-value.
        """
        # Get the number of degrees of freedom
        deg_f = T - dim

        if deg_f < 1:
            pval = np.nan
        elif abs(abs(value) - 1.0) <= sys.float_info.min:
            pval = 0.0
        else:
            trafo_val = value * np.sqrt(deg_f / (1. - value * value))
            # Two-sided significance level
            pval = stats.t.sf(np.abs(trafo_val), deg_f) * 2

        return pval
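    # Worked example (values rounded, not from the original source): for a
    # partial correlation of value = 0.2 with T = 100 samples and dim = 3
    # (X, Y, and one condition), deg_f = 97 and
    # trafo_val = 0.2 * sqrt(97 / (1 - 0.04)) ≈ 2.01, giving a two-sided
    # p-value of about 0.047.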
    def get_analytic_confidence(self, value, df, conf_lev):
        """Returns analytic confidence interval for correlation coefficient.

        Based on Student's t-distribution.

        Parameters
        ----------
        value : float
            Test statistic value.

        df : int
            Degrees of freedom of the test.

        conf_lev : float
            Confidence level, e.g., 0.9.

        Returns
        -------
        (conf_lower, conf_upper) : Tuple of floats
            Lower and upper confidence bounds of the confidence interval.
        """
        # Confidence interval is two-sided
        c_int = (1. - (1. - conf_lev) / 2.)

        value_tdist = value * np.sqrt(df) / np.sqrt(1. - value**2)
        conf_lower = (stats.t.ppf(q=1. - c_int, df=df, loc=value_tdist)
                      / np.sqrt(df + stats.t.ppf(q=1. - c_int, df=df,
                                                 loc=value_tdist)**2))
        conf_upper = (stats.t.ppf(q=c_int, df=df, loc=value_tdist)
                      / np.sqrt(df + stats.t.ppf(q=c_int, df=df,
                                                 loc=value_tdist)**2))
        return (conf_lower, conf_upper)
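    # Worked example (values rounded, not from the original source): for
    # value = 0.5, df = 97, and conf_lev = 0.9, one gets
    # value_tdist = 0.5 * sqrt(97) / sqrt(0.75) ≈ 5.69 and an interval of
    # roughly (0.38, 0.60), slightly asymmetric around 0.5 as expected for
    # a correlation coefficient.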
    def get_model_selection_criterion(self, j, parents, tau_max=0,
                                      corrected_aic=False):
        """Returns Akaike's Information Criterion modulo constants.

        First, each marginal is transformed to the standard normal scale:
        each marginal is mapped to the uniform scale using the empirical
        distribution function and then to the standard normal scale by
        applying the quantile function of a standard normal. Afterwards,
        fits a linear model of the parents to variable j and returns the
        score. Leave-one-out cross-validation is asymptotically equivalent
        to AIC for ordinary linear regression models. Here used to determine
        optimal hyperparameters in PCMCI(plus), in particular the pc_alpha
        value.

        Parameters
        ----------
        j : int
            Index of target variable in data array.

        parents : list
            List of form [(0, -1), (3, -2), ...] containing parents.

        tau_max : int, optional (default: 0)
            Maximum time lag. This may be used to make sure that estimates
            for different lags in X and Z all have the same sample size.

        Returns
        -------
        score : float
            Model score.
        """
        Y = [(j, 0)]
        X = [(j, 0)]  # dummy variable here
        Z = parents
        array, xyz, _ = self.dataframe.construct_array(X=X, Y=Y, Z=Z,
                                                       tau_max=tau_max,
                                                       mask_type=self.mask_type,
                                                       return_cleaned_xyz=False,
                                                       do_checks=True,
                                                       verbosity=self.verbosity)

        dim, T = array.shape

        # Transform to normal marginals
        array = self.trafo2normal(array)

        y = self._get_single_residuals(array, target_var=1,
                                       return_means=False)
        # Get residual sum of squares
        rss = (y**2).sum()
        # Number of parameters
        p = dim - 1

        # Get AIC (optionally with small-sample correction)
        if corrected_aic:
            score = T * np.log(rss) + 2. * p + (2. * p**2 + 2. * p) / (T - p - 1)
        else:
            score = T * np.log(rss) + 2. * p
        return score
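
# Minimal self-test sketch (not part of the original module): assumes
# tigramite is installed and that run_test_raw, inherited from CondIndTest,
# accepts raw arrays of shape (T, dims).
if __name__ == '__main__':
    np.random.seed(42)
    T = 500
    z = np.random.randn(T)
    # X and Y depend on Z but not on each other; marginals are skewed.
    x = np.exp(z + 0.5 * np.random.randn(T))
    y = np.exp(z + 0.5 * np.random.randn(T))

    test = RobustParCorr(significance='analytic')
    val, pval = test.run_test_raw(x.reshape(T, 1), y.reshape(T, 1),
                                  z=z.reshape(T, 1))
    # Since X and Y are conditionally independent given Z, a high p-value
    # is expected.
    print("val = %.3f, pval = %.3f" % (val, pval))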