Source code for tigramite.independence_tests.regressionCI

"""Tigramite causal discovery for time series."""

# Author: Tom Hochsprung <tom.hochsprung@dlr.de>, Jakob Runge <jakob@jakob-runge.com>
#
# License: GNU General Public License v3.0

import numpy as np

from scipy.stats import chi2, normaltest
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn import metrics

from .independence_tests_base import CondIndTest
# from numba import jit   # could make it even faster, also activate @jit(forceobj=True)


class RegressionCI(CondIndTest):
    r"""Flexible parametric conditional independence tests for continuous,
    categorical, or mixed data.

    Assumes one-dimensional X, Y.

    Notes
    -----
    To test :math:`X \perp Y | Z`, the regressions Y|XZ vs Y|Z, or, depending
    on certain criteria, X|YZ vs X|Z are compared. For that, the notion of
    the deviance is employed. If the fits of the respective regressions do
    not differ significantly (measured using the deviance), the null
    hypothesis of conditional independence is "accepted". This approach
    assumes that X and Y are univariate, and Z can be either empty,
    univariate, or multivariate. Moreover, this approach works for all
    combinations of "discrete" and "continuous" X, Y, and respective columns
    of Z; depending on the case, linear regression or multinomial regression
    is employed.

    Parameters
    ----------
    **kwargs :
        Arguments passed on to parent class CondIndTest.
    """

    @property
    def measure(self):
        """Concrete property to return the measure of the independence test."""
        return self._measure

    def __init__(self, **kwargs):
        # Setup the member variables
        self._measure = 'regression_ci'
        self.two_sided = False
        self.residual_based = False
        self.recycle_residuals = False

        CondIndTest.__init__(self, **kwargs)
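    # A minimal usage sketch (illustrative only: `rng`, `data`, and the link
    # indices below are made-up examples, not part of this module).
    # RegressionCI is used like any other tigramite CondIndTest: attach a
    # DataFrame that carries a data_type array (0 = continuous, 1 = discrete)
    # and call run_test:
    #
    #     import numpy as np
    #     from tigramite.data_processing import DataFrame
    #     rng = np.random.default_rng(0)
    #     data = rng.standard_normal((500, 3))          # T=500 samples, N=3 variables
    #     data_type = np.zeros(data.shape, dtype=int)   # all variables continuous
    #     ci = RegressionCI()
    #     ci.set_dataframe(DataFrame(data=data, data_type=data_type))
    #     val, pval = ci.run_test(X=[(0, 0)], Y=[(1, 0)], Z=[(2, 0)])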
    def set_dataframe(self, dataframe):
        """Initialize and check the dataframe.

        Parameters
        ----------
        dataframe : data object
            Set tigramite dataframe object. It must have the attributes
            dataframe.values yielding a numpy array of shape (observations T,
            variables N) and optionally a mask of the same shape and a
            missing values flag.
        """
        self.dataframe = dataframe

        if self.mask_type is not None:
            if dataframe.mask is None:
                raise ValueError("mask_type is not None, but no mask in dataframe.")
            dataframe._check_mask(dataframe.mask)

        if dataframe.data_type is None:
            raise ValueError("data_type cannot be None for RegressionCI.")
        dataframe._check_mask(dataframe.data_type, check_data_type=True)
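    # Example (hypothetical shapes): for three variables where only the last
    # one is discrete, the required data_type array mirrors the shape of the
    # data values:
    #
    #     data_type = np.zeros((T, 3), dtype=int)
    #     data_type[:, 2] = 1   # 0 = continuous, 1 = discrete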
    # @jit(forceobj=True)
    def get_dependence_measure(self, array, xyz, data_type):
        """Returns test statistic.

        Parameters
        ----------
        array : array-like
            data array with X, Y, Z in rows and observations in columns.

        xyz : array of ints
            XYZ identifier array of shape (dim,).

        data_type : array-like
            array of same shape as array which describes whether samples
            are continuous or discrete: 0s for continuous and 1s for discrete

        Returns
        -------
        val : float
            test estimate.
        """

        def convert_to_one_hot(data, nb_classes):
            """Convert an iterable of indices to one-hot encoded labels."""
            targets = np.array(data).reshape(-1)
            # categories need to be mapped to 0, 1, ... in this function
            targets = targets - np.min(targets)
            return np.eye(nb_classes)[targets]

        def do_componentwise_one_hot_encoding(X, var_type):
            """A function that one-hot encodes all categorical components of X."""
            T, dim = X.shape
            X_new = np.empty([T, 0])
            # componentwise dummy-encoding (if necessary; otherwise, keep component as is):
            for i in range(0, len(var_type)):
                if var_type[i] == 1:
                    nb_classes = len(set(X[:, i]))
                    X_new = np.hstack((X_new, convert_to_one_hot(X[:, i].astype(int), nb_classes=nb_classes)))
                elif var_type[i] == 0:
                    X_new = np.hstack((X_new, X[:, i].reshape((T, 1))))
                else:
                    raise ValueError("data_type only allows entries in {0, 1}")
            return X_new

        def calc_deviance_logistic(X, y, var_type):
            """Calculates the deviance (i.e., -2 * log-likelihood) for a
            multinomial logistic regression (with standard regression
            assumptions).
            """
            # 1-hot-encode all categorical columns
            X = do_componentwise_one_hot_encoding(X, var_type=var_type)
            y = np.ravel(y)
            # do logistic regression
            model = LogisticRegression(multi_class='multinomial', solver='lbfgs')
            model.fit(X, y)
            deviance = 2 * metrics.log_loss(y, model.predict_proba(X), normalize=False)
            # dof: number of features +1 for the intercept
            # (not too important, cancels out later anyway)
            dof = model.n_features_in_ + 1
            return deviance, dof

        def calc_deviance_linear(X, y, var_type):
            """Calculates the deviance (i.e., -2 * log-likelihood) for a
            linear regression (with standard regression assumptions).
            """
            n, p = X.shape  # p is not important for later
            # 1-hot-encode all categorical columns
            X = do_componentwise_one_hot_encoding(X, var_type=var_type)
            y = np.ravel(y)
            # do linear regression
            model = LinearRegression()
            model.fit(X, y)
            # predictions based on fitted model
            preds = model.predict(X)
            # residual sum of squares
            rss = np.sum(np.power((preds - y), 2))
            # deviance, calculated as -2 * log-likelihood (only the rss-term
            # is important, the rest cancels out later anyway)
            deviance = n * np.log(2 * np.pi) + n * np.log(rss / n) + n
            # dof: number of features +1 for the intercept
            # (not too important, cancels out later anyway)
            dof = model.n_features_in_ + 1
            return deviance, dof

        def entropy(series):
            """Empirical Shannon entropy of a discrete series."""
            value, counts = np.unique(series, return_counts=True)
            norm_counts = counts / counts.sum()
            return -(norm_counts * np.log(norm_counts)).sum()

        x_indices = np.where(xyz == 0)[0]
        y_indices = np.where(xyz == 1)[0]
        z_indices = np.where(xyz == 2)[0]

        x = array[x_indices].T
        y = array[y_indices].T
        x_type = data_type[x_indices]
        y_type = data_type[y_indices]

        if len(z_indices) == 0:
            # if z was (originally) None, then just an intercept is fitted ...
            z = np.ones((array.shape[1], 1))
            z_type = [0]
        else:
            z = array[z_indices].T
            z_type = data_type[z_indices]
            z_type = z_type.max(axis=1)

        # check whether all samples within X and, respectively, within Y have the same data type
        if (x_type.max() != x_type.min()) or (y_type.max() != y_type.min()):
            raise ValueError("All samples regarding X or respectively Y must have the same datatype")
        x_type = x_type.max()
        y_type = y_type.max()

        # Now, different cases for X discrete/continuous and Y discrete/continuous

        # Case 1: X continuous, Y continuous
        if (x_type == 0) and (y_type == 0):
            # Use the more normal variable as dependent variable  TODO: makes sense?
            if normaltest(x)[0] >= normaltest(y)[0]:
                dep_var = y
                rest = np.hstack((x, z))
                rest_type = np.hstack((x_type, z_type))
            else:
                dep_var = x
                rest = np.hstack((y, z))
                rest_type = np.hstack((y_type, z_type))
            # Fit Y | Z
            dev1, dof1 = calc_deviance_linear(z, dep_var, var_type=z_type)
            # Fit Y | XZ
            dev2, dof2 = calc_deviance_linear(rest, dep_var, var_type=rest_type)

        # Case 2: X discrete, Y continuous
        elif (x_type == 1) and (y_type == 0):
            xz = np.hstack((x, z))
            # Fit Y | Z
            dev1, dof1 = calc_deviance_linear(z, y, var_type=z_type)
            # Fit Y | XZ
            dev2, dof2 = calc_deviance_linear(xz, y, var_type=np.hstack((x_type, z_type)))

        # Case 3: X continuous, Y discrete
        elif (x_type == 0) and (y_type == 1):
            yz = np.hstack((y, z))
            # Fit X | Z
            dev1, dof1 = calc_deviance_linear(z, x, var_type=z_type)
            # Fit X | YZ
            dev2, dof2 = calc_deviance_linear(yz, x, var_type=np.hstack((y_type, z_type)))

        # Case 4: X discrete, Y discrete
        elif (x_type == 1) and (y_type == 1):
            # Use the variable with smaller entropy as dependent variable  TODO: makes sense?
            if entropy(x) >= entropy(y):
                dep_var = y
                rest = np.hstack((x, z))
                rest_type = np.hstack((x_type, z_type))
            else:
                dep_var = x
                rest = np.hstack((y, z))
                rest_type = np.hstack((y_type, z_type))
            # Fit Y | Z
            dev1, dof1 = calc_deviance_logistic(z, dep_var, var_type=z_type)
            # Fit Y | XZ
            dev2, dof2 = calc_deviance_logistic(rest, dep_var, var_type=rest_type)

        # calculate the difference between the deviance for the smaller and
        # for the larger model (i.e., the actual test statistic)
        stat = dev1 - dev2
        dof = dof2 - dof1

        self._temp_dof = dof
        return stat
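    # The returned statistic is a likelihood-ratio (deviance difference):
    # under the null hypothesis that X is independent of Y given Z,
    # dev1 - dev2 is asymptotically chi-square-distributed with dof2 - dof1
    # degrees of freedom. A sketch of the Gaussian deviance used in
    # calc_deviance_linear (a standard result, stated here for orientation):
    # with the error variance estimated as rss/n, the log-likelihood of the
    # fitted linear model gives
    #
    #     -2 * log L = n*log(2*pi) + n*log(rss/n) + n,
    #
    # so the constant terms cancel in dev1 - dev2 and only the log(rss/n)
    # terms of the two nested fits matter.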
    def get_analytic_significance(self, value, T, dim, xyz):
        """Return the p_value of the test statistic.

        According to a chi-square distribution with 'dof' degrees of freedom.
        """
        # Calculate the p_value
        p_value = chi2.sf(value, self._temp_dof)
        del self._temp_dof

        return p_value
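    # Worked example (values from the standard chi-square table, not produced
    # by this code): for a deviance difference of value = 3.84 with one extra
    # parameter in the larger model (dof = 1), chi2.sf(3.84, 1) is roughly
    # 0.05, i.e., a borderline rejection of conditional independence at the
    # 5% level.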
if __name__ == '__main__':

    import tigramite
    from tigramite.data_processing import DataFrame
    import tigramite.data_processing as pp
    import numpy as np

    seed = 43
    random_state = np.random.default_rng(seed=seed)
    ci = RegressionCI()

    T = 100
    reals = 100

    x_example = "continuous"
    y_example = "continuous"
    dimz = 1
    # z_example = ["discrete", "continuous"]
    z_example = ["continuous"]  # , "discrete"]
    # z_example = None

    rate = np.zeros(reals)
    for i in range(reals):
        if dimz > 0:
            z = np.zeros((T, dimz))
            for k in range(0, len(z_example)):
                if z_example[k] == "discrete":
                    z[:, k] = random_state.binomial(n=1, p=0.5, size=T)
                else:
                    z[:, k] = random_state.uniform(low=0, high=1, size=T)
        else:
            z = None
        x = np.empty(T).reshape(T, 1)
        y = np.empty(T).reshape(T, 1)
        for t in range(T):
            if dimz > 0:
                if z_example[0] == "discrete":
                    val = z[t, 0].squeeze()
                    prob = 0.2 + val * 0.6
                else:
                    prob = z[t, 0].squeeze()
            else:
                prob = 0.2
            if x_example == "discrete":
                x[t] = random_state.choice([0, 1], p=[prob, 1. - prob])
            else:
                x[t] = 0.1 * random_state.random()  # np.random.uniform(prob, 1)  # np.random.normal(prob, 1)
            if y_example == "discrete":
                y[t] = random_state.choice([0, 1], p=[prob, (1. - prob)])  # + x[t]
            else:
                y[t] = random_state.normal(prob, 1) + 0.5 * x[t]

        # # Continuous data
        # z = np.random.randn(T, dimz)
        # x = (0.5*z[:,0] + np.random.randn(T)).reshape(T, 1)
        # y = (0.5*z[:,0] + np.random.randn(T)).reshape(T, 1)  # + 2*x

        if x_example == "discrete":
            x_type = np.ones(T)
        else:
            x_type = np.zeros(T)
        if y_example == "discrete":
            y_type = np.ones(T)
        else:
            y_type = np.zeros(T)
        if dimz > 0:
            z_type = np.zeros((T, dimz))
            for j in range(0, len(z_example)):
                if z_example[j] == "discrete":
                    z_type[:, j] = np.ones(T)
                else:
                    z_type[:, j] = np.zeros(T)
        else:
            z_type = None

        val, pval = ci.run_test_raw(x, y, z=z, x_type=x_type, y_type=y_type, z_type=z_type)
        rate[i] = pval

        # data = np.hstack((x, y, z))
        # data_type = np.zeros(data.shape)
        # data_type[:, 0] = x_example == "discrete"
        # data_type[:, 1] = y_example == "discrete"
        # data_type[:, 2] = z_example == "discrete"
        # data_type = data_type.astype('int')
        # # print(data_type)
        # dataframe = pp.DataFrame(data=data, data_type=data_type)
        # ci.set_dataframe(dataframe)
        # val, pval = ci.run_test(X=[(0, 0)], Y=[(1, 0)], Z=[(2, 0)])
        # rate[i] = pval

    print((rate <= 0.05).mean())