Source code for tigramite.models
"""Tigramite causal inference for time series."""
# Author: Jakob Runge <jakob@jakob-runge.com>
#
# License: GNU General Public License v3.0
from __future__ import print_function
from copy import deepcopy
import json, warnings, os, pathlib
import numpy as np
import sklearn
import sklearn.linear_model
import networkx
from tigramite.data_processing import DataFrame
from tigramite.pcmci import PCMCI
[docs]class Models():
"""Base class for time series models.
Allows to fit any model from sklearn to the parents of a target variable.
Also takes care of missing values, masking and preprocessing. If the
target variable is multivariate, a model that supports multi-output
regression must be used. Note that
sklearn.multioutput.MultiOutputRegressor allows to extend single-output
models.
Parameters
----------
dataframe : data object
Tigramite dataframe object. It must have the attributes dataframe.values
yielding a numpy array of shape (observations T, variables N) and
optionally a mask of the same shape and a missing values flag.
model : sklearn model object
For example, sklearn.linear_model.LinearRegression() for a linear
regression model.
conditional_model : sklearn model object, optional (default: None)
Used to fit conditional causal effects in nested regression.
If None, model is used.
data_transform : sklearn preprocessing object, optional (default: None)
Used to transform data prior to fitting. For example,
sklearn.preprocessing.StandardScaler for simple standardization. The
fitted parameters are stored. Note that the inverse_transform is then
applied to the predicted data.
mask_type : {None, 'y','x','z','xy','xz','yz','xyz'}
Masking mode: Indicators for which variables in the dependence
measure I(X; Y | Z) the samples should be masked. If None, the mask
is not used. Explained in tutorial on masking and missing values.
verbosity : int, optional (default: 0)
Level of verbosity.
"""
def __init__(self,
dataframe,
model,
conditional_model=None,
data_transform=None,
mask_type=None,
verbosity=0):
# Set the mask type and dataframe object
self.mask_type = mask_type
self.dataframe = dataframe
# Get the number of nodes and length for this dataset
self.N = self.dataframe.N
self.T = self.dataframe.T
# Set the model to be used
self.model = model
if conditional_model is None:
self.conditional_model = model
else:
self.conditional_model = conditional_model
# Set the data_transform object and verbosity
self.data_transform = data_transform
self.verbosity = verbosity
# Initialize the object that will be set later
self.all_parents = None
self.selected_variables = None
self.tau_max = None
self.fit_results = None
# @profile
[docs] def get_general_fitted_model(self,
Y, X, Z=None,
conditions=None,
tau_max=None,
cut_off='max_lag_or_tau_max',
empty_predictors_function=np.mean,
return_data=False):
"""Fit time series model.
For each variable in selected_variables, the sklearn model is fitted
with :math:`y` given by the target variable(s), and :math:`X` given by its
parents. The fitted model class is returned for later use.
Parameters
----------
X, Y, Z : lists of tuples
List of variables for estimating model Y = f(X,Z)
conditions : list of tuples.
Conditions for estimating conditional causal effects.
tau_max : int, optional (default: None)
Maximum time lag. If None, the maximum lag in all_parents is used.
cut_off : {'max_lag_or_tau_max', '2xtau_max', 'max_lag'}
How many samples to cutoff at the beginning. The default is
'max_lag_or_tau_max', which uses the maximum of tau_max and the
conditions. This is useful to compare multiple models on the same
sample. Other options are '2xtau_max', which guarantees that MCI
tests are all conducted on the same samples. Last, 'max_lag' uses
as much samples as possible.
empty_predictors_function : function
Function to apply to y if no predictors are given.
return_data : bool, optional (default: False)
Whether to save the data array.
Returns
-------
fit_results : dictionary of sklearn model objects
Returns the sklearn model after fitting. Also returns the data
transformation parameters.
"""
def get_vectorized_length(W):
return sum([len(self.dataframe.vector_vars[w[0]]) for w in W])
self.X = X
self.Y = Y
if conditions is None:
conditions = []
self.conditions = conditions
if Z is not None:
Z = [z for z in Z if z not in conditions]
self.Z = Z
# lenX = len(self.X)
# lenS = len(self.conditions)
self.lenX = get_vectorized_length(self.X)
self.lenS = get_vectorized_length(self.conditions)
self.cut_off = cut_off
# Find the maximal conditions lag
max_lag = 0
for y in self.Y:
this_lag = np.abs(np.array(self.X + self.Z + self.conditions)[:, 1]).max()
max_lag = max(max_lag, this_lag)
# Set the default tau max and check if it should be overwritten
if tau_max is None:
self.tau_max = max_lag
else:
self.tau_max = tau_max
if self.tau_max < max_lag:
raise ValueError("tau_max = %d, but must be at least "
" max_lag = %d"
"" % (self.tau_max, max_lag))
# Construct array of shape (var, time)
array, xyz, _ = \
self.dataframe.construct_array(X=self.X, Y=self.Y,
Z=self.conditions,
extraZ=self.Z,
tau_max=self.tau_max,
mask_type=self.mask_type,
cut_off=self.cut_off,
remove_overlaps=True,
verbosity=self.verbosity)
# Transform the data if needed
self.fitted_data_transform = None
if self.data_transform is not None:
# Fit only X, Y, and S for later use in transforming input
X_transform = deepcopy(self.data_transform)
x_indices = list(np.where(xyz==0)[0])
X_transform.fit(array[x_indices, :].T)
self.fitted_data_transform = {'X': X_transform}
Y_transform = deepcopy(self.data_transform)
y_indices = list(np.where(xyz==1)[0])
Y_transform.fit(array[y_indices, :].T)
self.fitted_data_transform['Y'] = Y_transform
if len(self.conditions) > 0:
S_transform = deepcopy(self.data_transform)
s_indices = list(np.where(xyz==2)[0])
S_transform.fit(array[s_indices, :].T)
self.fitted_data_transform['S'] = S_transform
# Now transform whole array
all_transform = deepcopy(self.data_transform)
array = all_transform.fit_transform(X=array.T).T
# Fit the model
# Copy and fit the model
a_model = deepcopy(self.model)
predictor_indices = list(np.where(xyz==0)[0]) \
+ list(np.where(xyz==3)[0]) \
+ list(np.where(xyz==2)[0])
predictor_array = array[predictor_indices, :].T
target_array = array[np.where(xyz==1)[0], :].T
if predictor_array.size == 0:
# Just fit default (eg, mean)
class EmptyPredictorModel:
def fit(self, X, y):
if y.ndim == 1:
self.result = empty_predictors_function(y)
else:
self.result = empty_predictors_function(y, axis=0)
def predict(self, X):
return self.result
a_model = EmptyPredictorModel()
a_model.fit(X=predictor_array, y=target_array)
# Cache the results
fit_results = {}
fit_results['observation_array'] = array
fit_results['xyz'] = xyz
fit_results['model'] = a_model
# Cache the data transform
fit_results['fitted_data_transform'] = self.fitted_data_transform
# Cache and return the fit results
self.fit_results = fit_results
return fit_results
# @profile
[docs] def get_general_prediction(self,
intervention_data,
conditions_data=None,
pred_params=None,
transform_interventions_and_prediction=False,
return_further_pred_results=False,
aggregation_func=np.mean,
):
r"""Predict effect of intervention with fitted model.
Uses the model.predict() function of the sklearn model.
Parameters
----------
intervention_data : numpy array
Numpy array of shape (time, len(X)) that contains the do(X) values.
conditions_data : data object, optional
Numpy array of shape (time, len(S)) that contains the S=s values.
pred_params : dict, optional
Optional parameters passed on to sklearn prediction function (model and
conditional_model).
transform_interventions_and_prediction : bool (default: False)
Whether to perform the inverse data_transform on prediction results.
return_further_pred_results : bool, optional (default: False)
In case the predictor class returns more than just the expected value,
the entire results can be returned.
aggregation_func : callable
Callable applied to output of 'predict'. Default is 'np.mean'.
Returns
-------
Results from prediction.
"""
intervention_T, _ = intervention_data.shape
if intervention_data.shape[1] != self.lenX:
raise ValueError("intervention_data.shape[1] must be len(X).")
if conditions_data is not None:
if conditions_data.shape[1] != len(self.conditions):
raise ValueError("conditions_data.shape[1] must be len(S).")
if conditions_data.shape[0] != intervention_data.shape[0]:
raise ValueError("conditions_data.shape[0] must match intervention_data.shape[0].")
# Print message
if self.verbosity > 1:
print("\n## Predicting target %s" % str(self.Y))
if pred_params is not None:
for key in list(pred_params):
print("%s = %s" % (key, pred_params[key]))
# Default value for pred_params
if pred_params is None:
pred_params = {}
# Check the model is fitted.
if self.fit_results is None:
raise ValueError("Model not yet fitted.")
# Transform the data if needed
fitted_data_transform = self.fit_results['fitted_data_transform']
if transform_interventions_and_prediction and fitted_data_transform is not None:
intervention_data = fitted_data_transform['X'].transform(X=intervention_data)
if self.conditions is not None and conditions_data is not None:
conditions_data = fitted_data_transform['S'].transform(X=conditions_data)
# Extract observational Z from stored array
z_indices = list(np.where(self.fit_results['xyz']==3)[0])
z_array = self.fit_results['observation_array'][z_indices, :].T
Tobs = len(self.fit_results['observation_array'].T)
if self.conditions is not None and conditions_data is not None:
s_indices = list(np.where(self.fit_results['xyz']==2)[0])
s_array = self.fit_results['observation_array'][s_indices, :].T
pred_dict = {}
# Now iterate through interventions (and potentially S)
for index, dox_vals in enumerate(intervention_data):
# Construct XZS-array
intervention_array = dox_vals.reshape(1, self.lenX) * np.ones((Tobs, self.lenX))
if self.conditions is not None and conditions_data is not None:
conditions_array = conditions_data[index].reshape(1, self.lenS) * np.ones((Tobs, self.lenS))
predictor_array = np.hstack((intervention_array, z_array, conditions_array))
else:
predictor_array = np.hstack((intervention_array, z_array))
predicted_vals = self.fit_results['model'].predict(
X=predictor_array, **pred_params)
if self.conditions is not None and conditions_data is not None:
a_conditional_model = deepcopy(self.conditional_model)
if type(predicted_vals) is tuple:
predicted_vals_here = predicted_vals[0]
else:
predicted_vals_here = predicted_vals
a_conditional_model.fit(X=s_array, y=predicted_vals_here)
self.fit_results['conditional_model'] = a_conditional_model
predicted_vals = a_conditional_model.predict(
X=conditions_array, **pred_params)
if transform_interventions_and_prediction and fitted_data_transform is not None:
predicted_vals = fitted_data_transform['Y'].inverse_transform(X=predicted_vals.reshape(-1, 1)).squeeze()
pred_dict[index] = predicted_vals
# Apply aggregation function
if type(predicted_vals) is tuple:
aggregated_pred = aggregation_func(predicted_vals[0], axis=0)
else:
aggregated_pred = aggregation_func(predicted_vals, axis=0)
aggregated_pred = aggregated_pred.squeeze()
if index == 0:
predicted_array = np.zeros((intervention_T, ) + aggregated_pred.shape,
dtype=aggregated_pred.dtype)
predicted_array[index] = aggregated_pred
# if fitted_data_transform is not None:
# rescaled = fitted_data_transform['Y'].inverse_transform(X=predicted_array[index, iy].reshape(-1, 1))
# predicted_array[index, iy] = rescaled.squeeze()
if return_further_pred_results:
return predicted_array, pred_dict
else:
return predicted_array
[docs] def fit_full_model(self, all_parents,
selected_variables=None,
tau_max=None,
cut_off='max_lag_or_tau_max',
empty_predictors_function=np.mean,
return_data=False):
"""Fit time series model.
For each variable in selected_variables, the sklearn model is fitted
with :math:`y` given by the target variable, and :math:`X` given by its
parents. The fitted model class is returned for later use.
Parameters
----------
all_parents : dictionary
Dictionary of form {0:[(0, -1), (3, 0), ...], 1:[], ...} containing
the parents estimated with PCMCI.
selected_variables : list of integers, optional (default: range(N))
Specify to estimate parents only for selected variables. If None is
passed, parents are estimated for all variables.
tau_max : int, optional (default: None)
Maximum time lag. If None, the maximum lag in all_parents is used.
cut_off : {'max_lag_or_tau_max', '2xtau_max', 'max_lag'}
How many samples to cutoff at the beginning. The default is
'max_lag_or_tau_max', which uses the maximum of tau_max and the
conditions. This is useful to compare multiple models on the same
sample. Other options are '2xtau_max', which guarantees that MCI
tests are all conducted on the same samples. Last, 'max_lag' uses
as much samples as possible.
empty_predictors_function : function
Function to apply to y if no predictors are given.
return_data : bool, optional (default: False)
Whether to save the data array.
Returns
-------
fit_results : dictionary of sklearn model objects for each variable
Returns the sklearn model after fitting. Also returns the data
transformation parameters.
"""
# Initialize the fit by setting the instance's all_parents attribute
self.all_parents = all_parents
# Set the default selected variables to all variables and check if this
# should be overwritten
self.selected_variables = range(self.N)
if selected_variables is not None:
self.selected_variables = selected_variables
# Find the maximal parents lag
max_parents_lag = 0
for j in self.selected_variables:
if all_parents[j]:
this_parent_lag = np.abs(np.array(all_parents[j])[:, 1]).max()
max_parents_lag = max(max_parents_lag, this_parent_lag)
# Set the default tau_max and check if it should be overwritten
self.tau_max = max_parents_lag
if tau_max is not None:
self.tau_max = tau_max
if self.tau_max < max_parents_lag:
raise ValueError("tau_max = %d, but must be at least "
" max_parents_lag = %d"
"" % (self.tau_max, max_parents_lag))
# Initialize the fit results
fit_results = {}
for j in self.selected_variables:
Y = [(j, 0)]
X = [(j, 0)] # dummy
Z = self.all_parents[j]
array, xyz, _ = \
self.dataframe.construct_array(X, Y, Z,
tau_max=self.tau_max,
mask_type=self.mask_type,
cut_off=cut_off,
remove_overlaps=True,
verbosity=self.verbosity)
# Get the dimensions out of the constructed array
dim, T = array.shape
dim_z = dim - 2
# Transform the data if needed
if self.data_transform is not None:
array = self.data_transform.fit_transform(X=array.T).T
# Cache the results
fit_results[j] = {}
# Cache the data transform
fit_results[j]['data_transform'] = deepcopy(self.data_transform)
if return_data:
# Cache the data if needed
fit_results[j]['data'] = array
fit_results[j]['used_indices'] = self.dataframe.use_indices_dataset_dict
# Copy and fit the model if there are any parents for this variable to fit
a_model = deepcopy(self.model)
if dim_z > 0:
a_model.fit(X=array[2:].T, y=array[1])
else:
# Just fit default (eg, mean)
class EmptyPredictorModel:
def fit(self, X, y):
self.result = empty_predictors_function(y)
def predict(self, X):
return self.result
a_model = EmptyPredictorModel()
# a_model = empty_predictors_model(array[1])
a_model.fit(X=array[2:].T, y=array[1])
fit_results[j]['model'] = a_model
# Cache and return the fit results
self.fit_results = fit_results
return fit_results
[docs] def get_coefs(self):
"""Returns dictionary of coefficients for linear models.
Only for models from sklearn.linear_model
Returns
-------
coeffs : dictionary
Dictionary of dictionaries for each variable with keys given by the
parents and the regression coefficients as values.
"""
coeffs = {}
for j in self.selected_variables:
coeffs[j] = {}
for ipar, par in enumerate(self.all_parents[j]):
coeffs[j][par] = self.fit_results[j]['model'].coef_[ipar]
return coeffs
[docs] def get_val_matrix(self):
"""Returns the coefficient array for different lags for linear model.
Requires fit_model() before. An entry val_matrix[i,j,tau] gives the
coefficient of the link from i to j at lag tau, including tau=0.
Returns
-------
val_matrix : array-like, shape (N, N, tau_max + 1)
Array of coefficients for each time lag, including lag-zero.
"""
coeffs = self.get_coefs()
val_matrix = np.zeros((self.N, self.N, self.tau_max + 1, ))
for j in list(coeffs):
for par in list(coeffs[j]):
i, tau = par
val_matrix[i,j,abs(tau)] = coeffs[j][par]
return val_matrix
[docs] def predict_full_model(self,
new_data=None,
pred_params=None,
cut_off='max_lag_or_tau_max'):
r"""Predict target variable with fitted model.
Uses the model.predict() function of the sklearn model.
A list of predicted time series for self.selected_variables is returned.
Parameters
----------
new_data : data object, optional
New Tigramite dataframe object with optional new mask. Note that
the data will be cut off according to cut_off, see parameter
`cut_off` below.
pred_params : dict, optional
Optional parameters passed on to sklearn prediction function.
cut_off : {'2xtau_max', 'max_lag', 'max_lag_or_tau_max'}
How many samples to cutoff at the beginning. The default is
'2xtau_max', which guarantees that MCI tests are all conducted on
the same samples. For modeling, 'max_lag_or_tau_max' can be used,
which uses the maximum of tau_max and the conditions, which is
useful to compare multiple models on the same sample. Last,
'max_lag' uses as much samples as possible.
Returns
-------
Results from prediction.
"""
if hasattr(self, 'selected_variables'):
target_list = self.selected_variables
else:
raise ValueError("Model not yet fitted.")
pred_list = []
self.stored_test_array = {}
for target in target_list:
# Default value for pred_params
if pred_params is None:
pred_params = {}
# Construct the array form of the data
Y = [(target, 0)] # dummy
X = [(target, 0)] # dummy
Z = self.all_parents[target]
# Check if we've passed a new dataframe object
if new_data is not None:
# if new_data.mask is None:
# # if no mask is supplied, use the same mask as for the fitted array
# new_data_mask = self.test_mask
# else:
new_data_mask = new_data.mask
test_array, _, _ = new_data.construct_array(X, Y, Z,
tau_max=self.tau_max,
mask=new_data_mask,
mask_type=self.mask_type,
cut_off=cut_off,
remove_overlaps=True,
verbosity=self.verbosity)
# Otherwise use the default values
else:
test_array, _, _ = \
self.dataframe.construct_array(X, Y, Z,
tau_max=self.tau_max,
mask_type=self.mask_type,
cut_off=cut_off,
remove_overlaps=True,
verbosity=self.verbosity)
# Transform the data if needed
a_transform = self.fit_results[target]['data_transform']
if a_transform is not None:
test_array = a_transform.transform(X=test_array.T).T
# Cache the test array
self.stored_test_array[target] = test_array
# Run the predictor
predicted = self.fit_results[target]['model'].predict(
X=test_array[2:].T, **pred_params)
if test_array[2:].size == 0:
# If there are no predictors, return the value of
# empty_predictors_function, which is np.mean
# and expand to the test array length
predicted = predicted * np.ones(test_array.shape[1])
pred_list.append(predicted)
return pred_list
[docs] def get_residuals_cov_mean(self, new_data=None, pred_params=None):
r"""Returns covariance and means of residuals from fitted model.
Residuals are available as self.residuals.
Parameters
----------
new_data : data object, optional
New Tigramite dataframe object with optional new mask. Note that
the data will be cut off according to cut_off, see parameter
`cut_off` below.
pred_params : dict, optional
Optional parameters passed on to sklearn prediction function.
Returns
-------
Results from prediction.
"""
assert self.dataframe.analysis_mode == 'single'
N = self.dataframe.N
T = self.dataframe.T[0]
# Get overlapping samples
used_indices = {}
overlapping = set(list(range(0, T)))
for j in self.all_parents:
if self.fit_results[j] is not None:
if 'used_indices' not in self.fit_results[j]:
raise ValueError("Run ")
used_indices[j] = set(self.fit_results[j]['used_indices'][0])
overlapping = overlapping.intersection(used_indices[j])
overlapping = sorted(list(overlapping))
if len(overlapping) <= 10:
raise ValueError("Less than 10 overlapping samples due to masking and/or missing values,"
" cannot compute residual covariance!")
predicted = self.predict_full_model(new_data=new_data,
pred_params=pred_params,
cut_off='max_lag_or_tau_max')
# Residuals only exist after tau_max
residuals = self.dataframe.values[0].copy()
for index, j in enumerate([j for j in self.all_parents]): # if len(parents[j]) > 0]):
residuals[list(used_indices[j]), j] -= predicted[index]
overlapping_residuals = residuals[overlapping]
len_residuals = len(overlapping_residuals)
cov = np.cov(overlapping_residuals, rowvar=0)
mean = np.mean(overlapping_residuals, axis=0) # residuals should have zero mean due to prediction including constant
self.residuals = overlapping_residuals
return cov, mean
[docs]class LinearMediation(Models):
r"""Linear mediation analysis for time series models.
Fits linear model to parents and provides functions to return measures such
as causal effect, mediated causal effect, average causal effect, etc. as
described in [4]_. Also allows for contemporaneous links.
For general linear and nonlinear causal effect analysis including latent
variables and further functionality use the CausalEffects class.
Notes
-----
This class implements the following causal mediation measures introduced in
[4]_:
* causal effect (CE)
* mediated causal effect (MCE)
* average causal effect (ACE)
* average causal susceptibility (ACS)
* average mediated causal effect (AMCE)
Consider a simple model of a causal chain as given in the Example with
.. math:: X_t &= \eta^X_t \\
Y_t &= 0.5 X_{t-1} + \eta^Y_t \\
Z_t &= 0.5 Y_{t-1} + \eta^Z_t
Here the link coefficient of :math:`X_{t-2} \to Z_t` is zero while the
causal effect is 0.25. MCE through :math:`Y` is 0.25 implying that *all*
of the the CE is explained by :math:`Y`. ACE from :math:`X` is 0.37 since it
has CE 0.5 on :math:`Y` and 0.25 on :math:`Z`.
Examples
--------
>>> links_coeffs = {0: [], 1: [((0, -1), 0.5)], 2: [((1, -1), 0.5)]}
>>> data, true_parents = toys.var_process(links_coeffs, T=1000, seed=42)
>>> dataframe = pp.DataFrame(data)
>>> med = LinearMediation(dataframe=dataframe)
>>> med.fit_model(all_parents=true_parents, tau_max=3)
>>> print "Link coefficient (0, -2) --> 2: ", med.get_coeff(
i=0, tau=-2, j=2)
>>> print "Causal effect (0, -2) --> 2: ", med.get_ce(i=0, tau=-2, j=2)
>>> print "Mediated Causal effect (0, -2) --> 2 through 1: ", med.get_mce(
i=0, tau=-2, j=2, k=1)
>>> print "Average Causal Effect: ", med.get_all_ace()
>>> print "Average Causal Susceptibility: ", med.get_all_acs()
>>> print "Average Mediated Causal Effect: ", med.get_all_amce()
Link coefficient (0, -2) --> 2: 0.0
Causal effect (0, -2) --> 2: 0.250648072987
Mediated Causal effect (0, -2) --> 2 through 1: 0.250648072987
Average Causal Effect: [ 0.36897445 0.25718002 0. ]
Average Causal Susceptibility: [ 0. 0.24365041 0.38250406]
Average Mediated Causal Effect: [ 0. 0.12532404 0. ]
References
----------
.. [4] J. Runge et al. (2015): Identifying causal gateways and mediators in
complex spatio-temporal systems.
Nature Communications, 6, 8502. http://doi.org/10.1038/ncomms9502
Parameters
----------
dataframe : data object
Tigramite dataframe object. It must have the attributes dataframe.values
yielding a numpy array of shape (observations T, variables N) and
optionally a mask of the same shape and a missing values flag.
model_params : dictionary, optional (default: None)
Optional parameters passed on to sklearn model
data_transform : sklearn preprocessing object, optional (default: StandardScaler)
Used to transform data prior to fitting. For example,
sklearn.preprocessing.StandardScaler for simple standardization. The
fitted parameters are stored.
mask_type : {None, 'y','x','z','xy','xz','yz','xyz'}
Masking mode: Indicators for which variables in the dependence
measure I(X; Y | Z) the samples should be masked. If None, the mask
is not used. Explained in tutorial on masking and missing values.
verbosity : int, optional (default: 0)
Level of verbosity.
"""
def __init__(self,
dataframe,
model_params=None,
data_transform=sklearn.preprocessing.StandardScaler(),
mask_type=None,
verbosity=0):
# Initialize the member variables to None
self.phi = None
self.psi = None
self.all_psi_k = None
self.dataframe = dataframe
self.mask_type = mask_type
self.data_transform = data_transform
if model_params is None:
self.model_params = {}
else:
self.model_params = model_params
self.bootstrap_available = False
# Build the model using the parameters
if model_params is None:
model_params = {}
this_model = sklearn.linear_model.LinearRegression(**model_params)
Models.__init__(self,
dataframe=dataframe,
model=this_model,
data_transform=data_transform,
mask_type=mask_type,
verbosity=verbosity)
[docs] def fit_model(self, all_parents, tau_max=None, return_data=False):
r"""Fit linear time series model.
Fits a sklearn.linear_model.LinearRegression model to the parents of
each variable and computes the coefficient matrices :math:`\Phi` and
:math:`\Psi` as described in [4]_. Does accept contemporaneous links.
Parameters
----------
all_parents : dictionary
Dictionary of form {0:[(0, -1), (3, 0), ...], 1:[], ...} containing
the parents estimated with PCMCI.
tau_max : int, optional (default: None)
Maximum time lag. If None, the maximum lag in all_parents is used.
return_data : bool, optional (default: False)
Whether to save the data array. Needed to get residuals.
"""
# Fit the model using the base class
self.fit_results = self.fit_full_model(all_parents=all_parents,
selected_variables=None,
return_data=return_data,
tau_max=tau_max)
# Cache the results in the member variables
coeffs = self.get_coefs()
self.phi = self._get_phi(coeffs)
self.psi = self._get_psi(self.phi)
self.all_psi_k = self._get_all_psi_k(self.phi)
self.all_parents = all_parents
# self.tau_max = tau_max
[docs] def fit_model_bootstrap(self,
boot_blocklength=1,
seed=None,
boot_samples=100):
"""Fits boostrap-versions of Phi, Psi, etc.
Random draws are generated
Parameters
----------
boot_blocklength : int, or in {'cube_root', 'from_autocorrelation'}
Block length for block-bootstrap. If 'cube_root' it is the cube
root of the time series length.
seed : int, optional(default = None)
Seed for RandomState (default_rng)
boot_samples : int
Number of bootstrap samples.
"""
self.phi_boots = np.empty((boot_samples,) + self.phi.shape)
self.psi_boots = np.empty((boot_samples,) + self.psi.shape)
self.all_psi_k_boots = np.empty((boot_samples,) + self.all_psi_k.shape)
if self.verbosity > 0:
print("\n##\n## Generating bootstrap samples of Phi, Psi, etc " +
"\n##\n" +
"\nboot_samples = %s \n" % boot_samples +
"\nboot_blocklength = %s \n" % boot_blocklength
)
for b in range(boot_samples):
# # Replace dataframe in method args by bootstrapped dataframe
# method_args_bootstrap['dataframe'].bootstrap = boot_draw
if seed is None:
random_state = np.random.default_rng(None)
else:
random_state = np.random.default_rng(seed+b)
dataframe_here = deepcopy(self.dataframe)
dataframe_here.bootstrap = {'boot_blocklength':boot_blocklength,
'random_state':random_state}
model = Models(dataframe=dataframe_here,
model=sklearn.linear_model.LinearRegression(**self.model_params),
data_transform=self.data_transform,
mask_type=self.mask_type,
verbosity=0)
model.fit_full_model(all_parents=self.all_parents,
tau_max=self.tau_max)
# Cache the results in the member variables
coeffs = model.get_coefs()
phi = self._get_phi(coeffs)
self.phi_boots[b] = phi
self.psi_boots[b] = self._get_psi(phi)
self.all_psi_k_boots[b] = self._get_all_psi_k(phi)
self.bootstrap_available = True
return self
[docs] def get_bootstrap_of(self, function, function_args, conf_lev=0.9):
"""Applies bootstrap-versions of Phi, Psi, etc. to any function in
this class.
Parameters
----------
function : string
Valid function from LinearMediation class
function_args : dict
Optional function arguments.
conf_lev : float
Confidence interval.
Returns
-------
Upper/Lower confidence interval of function.
"""
valid_functions = [
'get_coeff',
'get_ce',
'get_ce_max',
'get_joint_ce',
'get_joint_ce_matrix',
'get_mce',
'get_conditional_mce',
'get_joint_mce',
'get_ace',
'get_all_ace',
'get_acs',
'get_all_acs',
'get_amce',
'get_all_amce',
'get_val_matrix',
]
if function not in valid_functions:
raise ValueError("function must be in %s" %valid_functions)
realizations = self.phi_boots.shape[0]
original_phi = deepcopy(self.phi)
original_psi = deepcopy(self.psi)
original_all_psi_k = deepcopy(self.all_psi_k)
for r in range(realizations):
self.phi = self.phi_boots[r]
self.psi = self.psi_boots[r]
self.all_psi_k = self.all_psi_k_boots[r]
boot_effect = getattr(self, function)(**function_args)
if r == 0:
bootstrap_result = np.empty((realizations,) + boot_effect.shape)
bootstrap_result[r] = boot_effect
# Confidence intervals for val_matrix; interval is two-sided
c_int = (1. - (1. - conf_lev)/2.)
confidence_interval = np.percentile(
bootstrap_result, axis=0,
q = [100*(1. - c_int), 100*c_int])
self.phi = original_phi
self.psi = original_psi
self.all_psi_k = original_all_psi_k
self.bootstrap_result = bootstrap_result
return confidence_interval
def _check_sanity(self, X, Y, k=None):
"""Checks validity of some parameters."""
if len(X) != 1 or len(Y) != 1:
raise ValueError("X must be of form [(i, -tau)] and Y = [(j, 0)], "
"but are X = %s, Y=%s" % (X, Y))
i, tau = X[0]
if abs(tau) > self.tau_max:
raise ValueError("X must be of form [(i, -tau)] with"
" tau <= tau_max")
if k is not None and (k < 0 or k >= self.N):
raise ValueError("k must be in [0, N)")
def _get_phi(self, coeffs):
"""Returns the linear coefficient matrices for different lags.
Parameters
----------
coeffs : dictionary
Dictionary of coefficients for each parent.
Returns
-------
phi : array-like, shape (tau_max + 1, N, N)
Matrices of coefficients for each time lag.
"""
phi = np.zeros((self.tau_max + 1, self.N, self.N))
# phi[0] = np.identity(self.N)
# Also includes contemporaneous lags
for j in list(coeffs):
for par in list(coeffs[j]):
i, tau = par
phi[abs(tau), j, i] = coeffs[j][par]
return phi
def _get_psi(self, phi):
"""Returns the linear causal effect matrices for different lags incl
lag zero.
Parameters
----------
phi : array-like
Coefficient matrices at different lags.
Returns
-------
psi : array-like, shape (tau_max + 1, N, N)
Matrices of causal effects for each time lag incl contemporaneous links.
"""
psi = np.zeros((self.tau_max + 1, self.N, self.N))
psi[0] = np.linalg.pinv(np.identity(self.N) - phi[0])
for tau in range(1, self.tau_max + 1):
for s in range(1, tau + 1):
psi[tau] += np.matmul(psi[0], np.matmul(phi[s], psi[tau - s]) )
# Lagged-only effects:
# psi = np.zeros((self.tau_max + 1, self.N, self.N))
# psi[0] = np.identity(self.N)
# for n in range(1, self.tau_max + 1):
# psi[n] = np.zeros((self.N, self.N))
# for s in range(1, n + 1):
# psi[n] += np.dot(phi[s], psi[n - s])
return psi
def _get_psi_k(self, phi, k):
"""Returns the linear causal effect matrices excluding variable k.
Essentially, this blocks all path through parents of variable k
at any lag.
Parameters
----------
phi : array-like
Coefficient matrices at different lags.
k : int or list of ints
Variable indices to exclude causal effects through.
Returns
-------
psi_k : array-like, shape (tau_max + 1, N, N)
Matrices of causal effects excluding k.
"""
psi_k = np.zeros((self.tau_max + 1, self.N, self.N))
phi_k = np.copy(phi)
if isinstance(k, int):
phi_k[:, k, :] = 0.
else:
for k_here in k:
phi_k[:, k_here, :] = 0.
psi_k[0] = np.linalg.pinv(np.identity(self.N) - phi_k[0])
for tau in range(1, self.tau_max + 1):
# psi_k[tau] = np.matmul(psi_k[0], np.matmul(phi_k[tau], psi_k[0]))
for s in range(1, tau + 1):
psi_k[tau] += np.matmul(psi_k[0], np.matmul(phi_k[s], psi_k[tau - s]))
# psi_k[0] = np.identity(self.N)
# phi_k = np.copy(phi)
# phi_k[:, k, :] = 0.
# for n in range(1, self.tau_max + 1):
# psi_k[n] = np.zeros((self.N, self.N))
# for s in range(1, n + 1):
# psi_k[n] += np.dot(phi_k[s], psi_k[n - s])
return psi_k
def _get_all_psi_k(self, phi):
"""Returns the linear causal effect matrices excluding variables.
Parameters
----------
phi : array-like
Coefficient matrices at different lags.
Returns
-------
all_psi_k : array-like, shape (N, tau_max + 1, N, N)
Matrices of causal effects where for each row another variable is
excluded.
"""
all_psi_k = np.zeros((self.N, self.tau_max + 1, self.N, self.N))
for k in range(self.N):
all_psi_k[k] = self._get_psi_k(phi, k)
return all_psi_k
[docs] def get_coeff(self, i, tau, j):
"""Returns link coefficient.
This is the direct causal effect for a particular link (i, -tau) --> j.
Parameters
----------
i : int
Index of cause variable.
tau : int
Lag of cause variable (incl lag zero).
j : int
Index of effect variable.
Returns
-------
coeff : float
"""
return self.phi[abs(tau), j, i]
[docs] def get_ce(self, i, tau, j):
"""Returns the causal effect.
This is the causal effect for (i, -tau) -- --> j.
Parameters
----------
i : int
Index of cause variable.
tau : int
Lag of cause variable (incl lag zero).
j : int
Index of effect variable.
Returns
-------
ce : float
"""
return self.psi[abs(tau), j, i]
[docs] def get_ce_max(self, i, j):
"""Returns the causal effect.
This is the maximum absolute causal effect for i --> j across all
lags (incl lag zero).
Parameters
----------
i : int
Index of cause variable.
j : int
Index of effect variable.
Returns
-------
ce : float
"""
argmax = np.abs(self.psi[:, j, i]).argmax()
return self.psi[:, j, i][argmax]
[docs] def get_joint_ce(self, i, j):
"""Returns the joint causal effect.
This is the causal effect from all lags [t, ..., t-tau_max]
of i on j at time t. Note that the joint effect does not
count links passing through parents of i itself.
Parameters
----------
i : int
Index of cause variable.
j : int
Index of effect variable.
Returns
-------
joint_ce : array of shape (tau_max + 1)
Causal effect from each lag [t, ..., t-tau_max] of i on j.
"""
joint_ce = self.all_psi_k[i, :, j, i]
return joint_ce
[docs] def get_joint_ce_matrix(self, i, j):
"""Returns the joint causal effect matrix of i on j.
This is the causal effect from all lags [t, ..., t-tau_max]
of i on j at times [t, ..., t-tau_max]. Note that the joint effect does not
count links passing through parents of i itself.
An entry (taui, tauj) stands for the effect of i at t-taui on j at t-tauj.
Parameters
----------
i : int
Index of cause variable.
j : int
Index of effect variable.
Returns
-------
joint_ce_matrix : 2d array of shape (tau_max + 1, tau_max + 1)
Causal effect matrix from each lag of i on each lag of j.
"""
joint_ce_matrix = np.zeros((self.tau_max + 1, self.tau_max + 1))
for tauj in range(self.tau_max + 1):
joint_ce_matrix[tauj:, tauj] = self.all_psi_k[i, tauj:, j, i][::-1]
return joint_ce_matrix
[docs] def get_mce(self, i, tau, j, k):
"""Returns the mediated causal effect.
This is the causal effect for i --> j minus the causal effect not going
through k.
Parameters
----------
i : int
Index of cause variable.
tau : int
Lag of cause variable.
j : int
Index of effect variable.
k : int or list of ints
Indices of mediator variables.
Returns
-------
mce : float
"""
if isinstance(k, int):
effect_without_k = self.all_psi_k[k, abs(tau), j, i]
else:
effect_without_k = self._get_psi_k(self.phi, k=k)[abs(tau), j, i]
mce = self.psi[abs(tau), j, i] - effect_without_k
return mce
[docs] def get_conditional_mce(self, i, tau, j, k, notk):
"""Returns the conditional mediated causal effect.
This is the causal effect for i --> j for all paths going through k, but not through notk.
Parameters
----------
i : int
Index of cause variable.
tau : int
Lag of cause variable.
j : int
Index of effect variable.
k : int or list of ints
Indices of mediator variables.
notk : int or list of ints
Indices of mediator variables to exclude.
Returns
-------
mce : float
"""
if isinstance(k, int):
k = set([k])
else:
k = set(k)
if isinstance(notk, int):
notk = set([notk])
else:
notk = set(notk)
bothk = list(k.union(notk))
notk = list(notk)
effect_without_bothk = self._get_psi_k(self.phi, k=bothk)[abs(tau), j, i]
effect_without_notk = self._get_psi_k(self.phi, k=notk)[abs(tau), j, i]
# mce = self.psi[abs(tau), j, i] - effect_without_k
mce = effect_without_notk - effect_without_bothk
return mce
[docs] def get_joint_mce(self, i, j, k):
"""Returns the joint causal effect mediated through k.
This is the mediated causal effect from all lags [t, ..., t-tau_max]
of i on j at time t for paths through k. Note that the joint effect
does not count links passing through parents of i itself.
Parameters
----------
i : int
Index of cause variable.
j : int
Index of effect variable.
k : int or list of ints
Indices of mediator variables.
Returns
-------
joint_mce : array of shape (tau_max + 1)
Mediated causal effect from each lag [t, ..., t-tau_max] of i on j through k.
"""
if isinstance(k, int):
k_here = [k]
effect_without_k = self._get_psi_k(self.phi, k=[i] + k_here)
joint_mce = self.all_psi_k[i, :, j, i] - effect_without_k[:, j, i]
return joint_mce
[docs] def get_ace(self, i, lag_mode='absmax', exclude_i=True):
"""Returns the average causal effect.
This is the average causal effect (ACE) emanating from variable i to any
other variable. With lag_mode='absmax' this is based on the lag of
maximum CE for each pair.
Parameters
----------
i : int
Index of cause variable.
lag_mode : {'absmax', 'all_lags'}
Lag mode. Either average across all lags between each pair or only
at the lag of maximum absolute causal effect.
exclude_i : bool, optional (default: True)
Whether to exclude causal effects on the variable itself at later
lags.
Returns
-------
ace :float
Average Causal Effect.
"""
all_but_i = np.ones(self.N, dtype='bool')
if exclude_i:
all_but_i[i] = False
if lag_mode == 'absmax':
return np.abs(self.psi[:, all_but_i, i]).max(axis=0).mean()
elif lag_mode == 'all_lags':
return np.abs(self.psi[:, all_but_i, i]).mean()
else:
raise ValueError("lag_mode = %s not implemented" % lag_mode)
[docs] def get_all_ace(self, lag_mode='absmax', exclude_i=True):
"""Returns the average causal effect for all variables.
This is the average causal effect (ACE) emanating from variable i to any
other variable. With lag_mode='absmax' this is based on the lag of
maximum CE for each pair.
Parameters
----------
lag_mode : {'absmax', 'all_lags'}
Lag mode. Either average across all lags between each pair or only
at the lag of maximum absolute causal effect.
exclude_i : bool, optional (default: True)
Whether to exclude causal effects on the variable itself at later
lags.
Returns
-------
ace : array of shape (N,)
Average Causal Effect for each variable.
"""
ace = np.zeros(self.N)
for i in range(self.N):
ace[i] = self.get_ace(i, lag_mode=lag_mode, exclude_i=exclude_i)
return ace
[docs] def get_acs(self, j, lag_mode='absmax', exclude_j=True):
"""Returns the average causal susceptibility.
This is the Average Causal Susceptibility (ACS) affecting a variable j
from any other variable. With lag_mode='absmax' this is based on the lag
of maximum CE for each pair.
Parameters
----------
j : int
Index of variable.
lag_mode : {'absmax', 'all_lags'}
Lag mode. Either average across all lags between each pair or only
at the lag of maximum absolute causal effect.
exclude_j : bool, optional (default: True)
Whether to exclude causal effects on the variable itself at previous
lags.
Returns
-------
acs : float
Average Causal Susceptibility.
"""
all_but_j = np.ones(self.N, dtype='bool')
if exclude_j:
all_but_j[j] = False
if lag_mode == 'absmax':
return np.abs(self.psi[:, j, all_but_j]).max(axis=0).mean()
elif lag_mode == 'all_lags':
return np.abs(self.psi[:, j, all_but_j]).mean()
else:
raise ValueError("lag_mode = %s not implemented" % lag_mode)
[docs] def get_all_acs(self, lag_mode='absmax', exclude_j=True):
"""Returns the average causal susceptibility.
This is the Average Causal Susceptibility (ACS) for each variable from
any other variable. With lag_mode='absmax' this is based on the lag of
maximum CE for each pair.
Parameters
----------
lag_mode : {'absmax', 'all_lags'}
Lag mode. Either average across all lags between each pair or only
at the lag of maximum absolute causal effect.
exclude_j : bool, optional (default: True)
Whether to exclude causal effects on the variable itself at previous
lags.
Returns
-------
acs : array of shape (N,)
Average Causal Susceptibility.
"""
acs = np.zeros(self.N)
for j in range(self.N):
acs[j] = self.get_acs(j, lag_mode=lag_mode, exclude_j=exclude_j)
return acs
[docs] def get_amce(self, k, lag_mode='absmax',
exclude_k=True, exclude_self_effects=True):
"""Returns the average mediated causal effect.
This is the Average Mediated Causal Effect (AMCE) through a variable k
With lag_mode='absmax' this is based on the lag of maximum CE for each
pair.
Parameters
----------
k : int
Index of variable.
lag_mode : {'absmax', 'all_lags'}
Lag mode. Either average across all lags between each pair or only
at the lag of maximum absolute causal effect.
exclude_k : bool, optional (default: True)
Whether to exclude causal effects through the variable itself at
previous lags.
exclude_self_effects : bool, optional (default: True)
Whether to exclude causal self effects of variables on themselves.
Returns
-------
amce : float
Average Mediated Causal Effect.
"""
all_but_k = np.ones(self.N, dtype='bool')
if exclude_k:
all_but_k[k] = False
N_new = self.N - 1
else:
N_new = self.N
if exclude_self_effects:
weights = np.identity(N_new) == False
else:
weights = np.ones((N_new, N_new), dtype='bool')
# if self.tau_max < 2:
# raise ValueError("Mediation only nonzero for tau_max >= 2")
all_mce = self.psi[:, :, :] - self.all_psi_k[k, :, :, :]
# all_mce[:, range(self.N), range(self.N)] = 0.
if lag_mode == 'absmax':
return np.average(np.abs(all_mce[:, all_but_k, :]
[:, :, all_but_k]
).max(axis=0), weights=weights)
elif lag_mode == 'all_lags':
return np.abs(all_mce[:, all_but_k, :][:, :, all_but_k]).mean()
else:
raise ValueError("lag_mode = %s not implemented" % lag_mode)
[docs] def get_all_amce(self, lag_mode='absmax',
exclude_k=True, exclude_self_effects=True):
"""Returns the average mediated causal effect.
This is the Average Mediated Causal Effect (AMCE) through all variables
With lag_mode='absmax' this is based on the lag of maximum CE for each
pair.
Parameters
----------
lag_mode : {'absmax', 'all_lags'}
Lag mode. Either average across all lags between each pair or only
at the lag of maximum absolute causal effect.
exclude_k : bool, optional (default: True)
Whether to exclude causal effects through the variable itself at
previous lags.
exclude_self_effects : bool, optional (default: True)
Whether to exclude causal self effects of variables on themselves.
Returns
-------
amce : array of shape (N,)
Average Mediated Causal Effect.
"""
amce = np.zeros(self.N)
for k in range(self.N):
amce[k] = self.get_amce(k,
lag_mode=lag_mode,
exclude_k=exclude_k,
exclude_self_effects=exclude_self_effects)
return amce
[docs] def get_val_matrix(self, symmetrize=False):
"""Returns the matrix of linear coefficients.
Requires fit_model() before. An entry val_matrix[i,j,tau] gives the
coefficient of the link from i to j at lag tau. Lag=0 is always set
to zero for LinearMediation, use Models class for contemporaneous
models.
Parameters
----------
symmetrize : bool
If True, the lag-zero entries will be symmetrized such that
no zeros appear. Useful since other parts of tigramite
through an error for non-symmetric val_matrix, eg plotting.
Returns
-------
val_matrix : array
Matrix of linear coefficients, shape (N, N, tau_max + 1).
"""
val_matrix = np.copy(self.phi.transpose())
N = val_matrix.shape[0]
if symmetrize:
# Symmetrize since otherwise other parts of tigramite through an error
for i in range(N):
for j in range(N):
if val_matrix[i,j, 0] == 0.:
val_matrix[i,j, 0] = val_matrix[j,i, 0]
return val_matrix
[docs] def net_to_tsg(self, row, lag, max_lag):
"""Helper function to translate from network to time series graph."""
return row * max_lag + lag
[docs] def tsg_to_net(self, node, max_lag):
"""Helper function to translate from time series graph to network."""
row = node // max_lag
lag = node % max_lag
return (row, -lag)
[docs] def get_tsg(self, link_matrix, val_matrix=None, include_neighbors=False):
"""Returns time series graph matrix.
Constructs a matrix of shape (N*tau_max, N*tau_max) from link_matrix.
This matrix can be used for plotting the time series graph and analyzing
causal pathways.
Parameters
----------
link_matrix : bool array-like, optional (default: None)
Matrix of significant links. Must be of same shape as val_matrix.
Either sig_thres or link_matrix has to be provided.
val_matrix : array_like
Matrix of shape (N, N, tau_max+1) containing test statistic values.
include_neighbors : bool, optional (default: False)
Whether to include causal paths emanating from neighbors of i
Returns
-------
tsg : array of shape (N*tau_max, N*tau_max)
Time series graph matrix.
"""
N = len(link_matrix)
max_lag = link_matrix.shape[2] + 1
# Create TSG
tsg = np.zeros((N * max_lag, N * max_lag))
for i, j, tau in np.column_stack(np.where(link_matrix)):
# if tau > 0 or include_neighbors:
for t in range(max_lag):
link_start = self.net_to_tsg(i, t - tau, max_lag)
link_end = self.net_to_tsg(j, t, max_lag)
if (0 <= link_start and
(link_start % max_lag) <= (link_end % max_lag)):
if val_matrix is not None:
tsg[link_start, link_end] = val_matrix[i, j, tau]
else:
tsg[link_start, link_end] = 1
return tsg
[docs] def get_mediation_graph_data(self, i, tau, j, include_neighbors=False):
r"""Returns link and node weights for mediation analysis.
Returns array with non-zero entries for links that are on causal
paths between :math:`i` and :math:`j` at lag :math:`\tau`.
``path_val_matrix`` contains the corresponding path coefficients and
``path_node_array`` the MCE values. ``tsg_path_val_matrix`` contains the
corresponding values in the time series graph format.
Parameters
----------
i : int
Index of cause variable.
tau : int
Lag of cause variable.
j : int
Index of effect variable.
include_neighbors : bool, optional (default: False)
Whether to include causal paths emanating from neighbors of i
Returns
-------
graph_data : dictionary
Dictionary of matrices for coloring mediation graph plots.
"""
path_link_matrix = np.zeros((self.N, self.N, self.tau_max + 1))
path_val_matrix = np.zeros((self.N, self.N, self.tau_max + 1))
# Get mediation of path variables
path_node_array = (self.psi.reshape(1, self.tau_max + 1, self.N, self.N)
- self.all_psi_k)[:, abs(tau), j, i]
# Get involved links
val_matrix = self.phi.transpose()
link_matrix = val_matrix != 0.
max_lag = link_matrix.shape[2] + 1
# include_neighbors = False because True would allow
# --> o -- motifs in networkx.all_simple_paths as paths, but
# these are blocked...
tsg = self.get_tsg(link_matrix, val_matrix=val_matrix,
include_neighbors=False)
if include_neighbors:
# Add contemporaneous links only at source node
for m, n in zip(*np.where(link_matrix[:, :, 0])):
# print m,n
if m != n:
tsg[self.net_to_tsg(m, max_lag - tau - 1, max_lag),
self.net_to_tsg(n, max_lag - tau - 1, max_lag)
] = val_matrix[m, n, 0]
tsg_path_val_matrix = np.zeros(tsg.shape)
graph = networkx.DiGraph(tsg)
pathways = []
for path in networkx.all_simple_paths(graph,
source=self.net_to_tsg(i,
max_lag - tau - 1,
max_lag),
target=self.net_to_tsg(j,
max_lag - 0 - 1,
max_lag)):
pathways.append([self.tsg_to_net(p, max_lag) for p in path])
for ip, p in enumerate(path[1:]):
tsg_path_val_matrix[path[ip], p] = tsg[path[ip], p]
k, tau_k = self.tsg_to_net(p, max_lag)
link_start = self.tsg_to_net(path[ip], max_lag)
link_end = self.tsg_to_net(p, max_lag)
delta_tau = abs(link_end[1] - link_start[1])
path_val_matrix[link_start[0],
link_end[0],
delta_tau] = val_matrix[link_start[0],
link_end[0],
delta_tau]
graph_data = {'path_node_array': path_node_array,
'path_val_matrix': path_val_matrix,
'tsg_path_val_matrix': tsg_path_val_matrix}
return graph_data
[docs]class Prediction(Models, PCMCI):
r"""Prediction class for time series models.
Allows to fit and predict from any sklearn model. The optimal predictors can
be estimated using PCMCI. Also takes care of missing values, masking and
preprocessing.
Parameters
----------
dataframe : data object
Tigramite dataframe object. It must have the attributes dataframe.values
yielding a numpy array of shape (observations T, variables N) and
optionally a mask of the same shape and a missing values flag.
train_indices : array-like
Either boolean array or time indices marking the training data.
test_indices : array-like
Either boolean array or time indices marking the test data.
prediction_model : sklearn model object
For example, sklearn.linear_model.LinearRegression() for a linear
regression model.
cond_ind_test : Conditional independence test object, optional
Only needed if predictors are estimated with causal algorithm.
The class will be initialized with masking set to the training data.
data_transform : sklearn preprocessing object, optional (default: None)
Used to transform data prior to fitting. For example,
sklearn.preprocessing.StandardScaler for simple standardization. The
fitted parameters are stored.
verbosity : int, optional (default: 0)
Level of verbosity.
"""
def __init__(self,
dataframe,
train_indices,
test_indices,
prediction_model,
cond_ind_test=None,
data_transform=None,
verbosity=0):
if dataframe.analysis_mode != 'single':
raise ValueError("Prediction class currently only supports single "
"datasets.")
# dataframe.values = {0: dataframe.values[0]}
# Default value for the mask
if dataframe.mask is not None:
mask = {0: dataframe.mask[0]}
else:
mask = {0: np.zeros(dataframe.values[0].shape, dtype='bool')}
# Get the dataframe shape
T = dataframe.T[0]
# Have the default dataframe be the training data frame
train_mask = deepcopy(mask)
train_mask[0][[t for t in range(T) if t not in train_indices]] = True
self.dataframe = deepcopy(dataframe)
self.dataframe.mask = train_mask
self.dataframe._initialized_from = 'dict'
# = DataFrame(dataframe.values[0],
# mask=train_mask,
# missing_flag=dataframe.missing_flag)
# Initialize the models baseclass with the training dataframe
Models.__init__(self,
dataframe=self.dataframe,
model=prediction_model,
data_transform=data_transform,
mask_type='y',
verbosity=verbosity)
# Build the testing dataframe as well
self.test_mask = deepcopy(mask)
self.test_mask[0][[t for t in range(T) if t not in test_indices]] = True
self.train_indices = train_indices
self.test_indices = test_indices
# Setup the PCMCI instance
if cond_ind_test is not None:
# Force the masking
cond_ind_test.set_mask_type('y')
cond_ind_test.verbosity = verbosity
PCMCI.__init__(self,
dataframe=self.dataframe,
cond_ind_test=cond_ind_test,
verbosity=verbosity)
# Set the member variables
self.cond_ind_test = cond_ind_test
# Initialize member varialbes that are set outside
self.target_predictors = None
self.selected_targets = None
self.fitted_model = None
self.test_array = None
[docs] def get_predictors(self,
selected_targets=None,
selected_links=None,
steps_ahead=1,
tau_max=1,
pc_alpha=0.2,
max_conds_dim=None,
max_combinations=1):
"""Estimate predictors using PC1 algorithm.
Wrapper around PCMCI.run_pc_stable that estimates causal predictors.
The lead time can be specified by ``steps_ahead``.
Parameters
----------
selected_targets : list of ints, optional (default: None)
List of variables to estimate predictors of. If None, predictors of
all variables are estimated.
selected_links : dict or None
Dictionary of form {0:[(0, -1), (3, -2), ...], 1:[], ...}
specifying whether only selected links should be tested. If None is
passed, all links are tested
steps_ahead : int, default: 1
Minimum time lag to test. Useful for multi-step ahead predictions.
tau_max : int, default: 1
Maximum time lag. Must be larger or equal to tau_min.
pc_alpha : float or list of floats, default: 0.2
Significance level in algorithm. If a list or None is passed, the
pc_alpha level is optimized for every variable across the given
pc_alpha values using the score computed in
cond_ind_test.get_model_selection_criterion()
max_conds_dim : int or None
Maximum number of conditions to test. If None is passed, this number
is unrestricted.
max_combinations : int, default: 1
Maximum number of combinations of conditions of current cardinality
to test. Defaults to 1 for PC_1 algorithm. For original PC algorithm
a larger number, such as 10, can be used.
Returns
-------
predictors : dict
Dictionary of form {0:[(0, -1), (3, -2), ...], 1:[], ...}
containing estimated predictors.
"""
if selected_links is not None:
link_assumptions = {}
for j in selected_links.keys():
link_assumptions[j] = {(i, -tau):"-?>" for i in range(self.N) for tau in range(1, tau_max+1)}
else:
link_assumptions = None
# Ensure an independence model is given
if self.cond_ind_test is None:
raise ValueError("No cond_ind_test given!")
# Set the selected variables
self.selected_variables = range(self.N)
if selected_targets is not None:
self.selected_variables = selected_targets
predictors = self.run_pc_stable(link_assumptions=link_assumptions,
tau_min=steps_ahead,
tau_max=tau_max,
save_iterations=False,
pc_alpha=pc_alpha,
max_conds_dim=max_conds_dim,
max_combinations=max_combinations)
return predictors
[docs] def fit(self, target_predictors,
selected_targets=None, tau_max=None, return_data=False):
r"""Fit time series model.
Wrapper around ``Models.fit_full_model()``. To each variable in
``selected_targets``, the sklearn model is fitted with :math:`y` given
by the target variable, and :math:`X` given by its predictors. The
fitted model class is returned for later use.
Parameters
----------
target_predictors : dictionary
Dictionary of form {0:[(0, -1), (3, -2), ...], 1:[], ...} containing
the predictors estimated with PCMCI.
selected_targets : list of integers, optional (default: range(N))
Specify to fit model only for selected targets. If None is
passed, models are estimated for all variables.
tau_max : int, optional (default: None)
Maximum time lag. If None, the maximum lag in target_predictors is
used.
return_data : bool, optional (default: False)
Whether to save the data array.
Returns
-------
self : instance of self
"""
if selected_targets is None:
self.selected_targets = range(self.N)
else:
self.selected_targets = selected_targets
if tau_max is None:
# Find the maximal parents lag
max_parents_lag = 0
for j in self.selected_targets:
if target_predictors[j]:
this_parent_lag = np.abs(np.array(target_predictors[j])[:, 1]).max()
max_parents_lag = max(max_parents_lag, this_parent_lag)
else:
max_parents_lag = tau_max
if len(set(np.array(self.test_indices) - max_parents_lag)
.intersection(self.train_indices)) > 0:
if self.verbosity > 0:
warnings.warn("test_indices - maxlag(predictors) [or tau_max] "
"overlaps with train_indices: Choose test_indices "
"such that there is a gap of max_lag to train_indices!")
self.target_predictors = target_predictors
for target in self.selected_targets:
if target not in list(self.target_predictors):
raise ValueError("No predictors given for target %s" % target)
self.fitted_model = \
self.fit_full_model(all_parents=self.target_predictors,
selected_variables=self.selected_targets,
tau_max=tau_max,
return_data=return_data)
return self
[docs] def predict(self, target,
new_data=None,
pred_params=None,
cut_off='max_lag_or_tau_max'):
r"""Predict target variable with fitted model.
Uses the model.predict() function of the sklearn model.
If target is an int, the predicted time series is returned. If target
is a list of integers, then a list of predicted time series is returned.
If the list of integers equals range(N), then an array of shape (T, N)
of the predicted series is returned.
Parameters
----------
target : int or list of integers
Index or indices of target variable(s).
new_data : data object, optional
New Tigramite dataframe object with optional new mask. Note that
the data will be cut off according to cut_off, see parameter
`cut_off` below.
pred_params : dict, optional
Optional parameters passed on to sklearn prediction function.
cut_off : {'2xtau_max', 'max_lag', 'max_lag_or_tau_max'}
How many samples to cutoff at the beginning. The default is
'2xtau_max', which guarantees that MCI tests are all conducted on
the same samples. For modeling, 'max_lag_or_tau_max' can be used,
which uses the maximum of tau_max and the conditions, which is
useful to compare multiple models on the same sample. Last,
'max_lag' uses as much samples as possible.
Returns
-------
Results from prediction.
"""
if isinstance(target, int):
target_list = [target]
elif isinstance(target, list):
target_list = target
else:
raise ValueError("target must be either int or list of integers "
"indicating the index of the variables to "
"predict.")
if target_list == list(range(self.N)):
return_type = 'array'
elif len(target_list) == 1:
return_type = 'series'
else:
return_type = 'list'
pred_list = []
self.stored_test_array = {}
for target in target_list:
# Print message
if self.verbosity > 0:
print("\n##\n## Predicting target %s\n##" % target)
if pred_params is not None:
for key in list(pred_params):
print("%s = %s" % (key, pred_params[key]))
# Default value for pred_params
if pred_params is None:
pred_params = {}
# Check this is a valid target
if target not in self.selected_targets:
raise ValueError("Target %s not yet fitted" % target)
# Construct the array form of the data
Y = [(target, 0)] # dummy
X = [(target, 0)] # dummy
Z = self.target_predictors[target]
# Check if we've passed a new dataframe object
if new_data is not None:
# if new_data.mask is None:
# # if no mask is supplied, use the same mask as for the fitted array
# new_data_mask = self.test_mask
# else:
new_data_mask = new_data.mask
test_array, _, _ = new_data.construct_array(X, Y, Z,
tau_max=self.tau_max,
mask=new_data_mask,
mask_type=self.mask_type,
cut_off=cut_off,
remove_overlaps=True,
verbosity=self.verbosity)
# Otherwise use the default values
else:
test_array, _, _ = \
self.dataframe.construct_array(X, Y, Z,
tau_max=self.tau_max,
mask=self.test_mask,
mask_type=self.mask_type,
cut_off=cut_off,
remove_overlaps=True,
verbosity=self.verbosity)
# Transform the data if needed
a_transform = self.fitted_model[target]['data_transform']
if a_transform is not None:
test_array = a_transform.transform(X=test_array.T).T
# Cache the test array
self.stored_test_array[target] = test_array
# Run the predictor
predicted = self.fitted_model[target]['model'].predict(
X=test_array[2:].T, **pred_params)
if test_array[2:].size == 0:
# If there are no predictors, return the value of
# empty_predictors_function, which is np.mean
# and expand to the test array length
predicted = predicted * np.ones(test_array.shape[1])
pred_list.append(predicted)
if return_type == 'series':
return pred_list[0]
elif return_type == 'list':
return pred_list
elif return_type == 'array':
return np.array(pred_list).transpose()
[docs] def get_train_array(self, j):
"""Returns training array for variable j."""
return self.fitted_model[j]['data']
[docs] def get_test_array(self, j):
"""Returns test array for variable j."""
return self.stored_test_array[j]
if __name__ == '__main__':
import tigramite
import tigramite.data_processing as pp
from tigramite.toymodels import structural_causal_processes as toys
from tigramite.independence_tests.parcorr import ParCorr
import tigramite.plotting as tp
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.multioutput import MultiOutputRegressor
def lin_f(x): return x
T = 1000
def lin_f(x): return x
auto_coeff = 0.
coeff = 2.
links = {
0: [((0, -1), auto_coeff, lin_f)],
1: [((1, -1), auto_coeff, lin_f), ((0, 0), coeff, lin_f)],
}
data, nonstat = toys.structural_causal_process(links, T=T,
noises=None, seed=7)
# data[:,1] = data[:,1] > 0.
# # Create some missing values
# data[-10:,:] = 999.
# var_names = range(2)
# graph = np.array([['', '-->'],
# ['<--', '']],
# dtype='<U3')
print(data, data.mean(axis=0))
dataframe = pp.DataFrame(data,
# vector_vars={0:[(0,0), (1,0)], 1:[(2,0), (3,0)]}
)
graph = toys.links_to_graph(links, tau_max=4)
# # We are interested in lagged total effect of X on Y
X = [(0, 0), (0, -1)]
Y = [(1, 0), (1, -1)]
model = Models(dataframe=dataframe,
model = LinearRegression(),
# model = LogisticRegression(),
# model = MultiOutputRegressor(LogisticRegression()),
)
model.get_general_fitted_model(
Y=Y, X=X, Z=[(0, -2)],
conditions=[(0, -3)],
tau_max=7,
cut_off='tau_max',
empty_predictors_function=np.mean,
return_data=False)
# print(model.fit_results[(1, 0)]['model'].coef_)
dox_vals = np.array([0.]) #np.linspace(-1., 1., 1)
intervention_data = np.tile(dox_vals.reshape(len(dox_vals), 1), len(X))
conditions_data = np.tile(1. + dox_vals.reshape(len(dox_vals), 1), 1)
def aggregation_func(x, axis=0, bins=2):
x = x.astype('int64')
return np.apply_along_axis(np.bincount, axis=axis, arr=x, minlength=bins).T
aggregation_func = np.mean
pred = model.get_general_prediction(
intervention_data=intervention_data,
conditions_data=conditions_data,
pred_params=None,
transform_interventions_and_prediction=False,
return_further_pred_results=False,
aggregation_func=aggregation_func,
)
print("\n", pred)
# T = 1000
# links = {0: [((0, -1), 0.9, lin_f)],
# 1: [((1, -1), 0.9, lin_f), ((0, 0), -0.8, lin_f)],
# 2: [((2, -1), 0.9, lin_f), ((0, 0), 0.9, lin_f), ((1, 0), 0.8, lin_f)],
# # 3: [((3, -1), 0.9, lin_f), ((1, 0), 0.8, lin_f), ((2, 0), -0.9, lin_f)]
# }
# # noises = [np.random.randn for j in links.keys()]
# data, nonstat = toys.structural_causal_process(links, T=T, noises=None, seed=7)
# missing_flag = 999
# for i in range(0, 20):
# data[i::100] = missing_flag
# # mask = data>0
# parents = toys._get_true_parent_neighbor_dict(links)
# dataframe = pp.DataFrame(data, missing_flag = missing_flag)
# model = LinearRegression()
# model.fit(X=np.random.randn(10,2), y=np.random.randn(10))
# model.predict(X=np.random.randn(10,2)[:,2:])
# sys.exit(0)
# med = LinearMediation(dataframe=dataframe, #mask_type='y',
# data_transform=None)
# med.fit_model(all_parents=parents, tau_max=None, return_data=True)
# print(med.get_residuals_cov_mean())
# med.fit_model_bootstrap(
# boot_blocklength='cube_root',
# seed = 42,
# )
# # print(med.get_val_matrix())
# print (med.get_ce(i=0, tau=0, j=3))
# print(med.get_bootstrap_of(function='get_ce',
# function_args={'i':0, 'tau':0, 'j':3}, conf_lev=0.9))
# print (med.get_coeff(i=0, tau=-2, j=1))
# print (med.get_ce_max(i=0, j=2))
# print (med.get_ce(i=0, tau=0, j=3))
# print (med.get_mce(i=0, tau=0, k=[2], j=3))
# print (med.get_mce(i=0, tau=0, k=[1,2], j=3) - med.get_mce(i=0, tau=0, k=[1], j=3))
# print (med.get_conditional_mce(i=0, tau=0, k=[2], notk=[1], j=3))
# print (med.get_bootstrap_of('get_conditional_mce', {'i':0, 'tau':0, 'k':[2], 'notk':[1], 'j':3}))
# print(med.get_joint_ce(i=0, j=2))
# print(med.get_joint_mce(i=0, j=2, k=1))
# print(med.get_joint_ce_matrix(i=0, j=2))
# i=0; tau=4; j=2
# graph_data = med.get_mediation_graph_data(i=i, tau=tau, j=j)
# tp.plot_mediation_time_series_graph(
# # var_names=var_names,
# path_node_array=graph_data['path_node_array'],
# tsg_path_val_matrix=graph_data['tsg_path_val_matrix']
# )
# tp.plot_mediation_graph(
# # var_names=var_names,
# path_val_matrix=graph_data['path_val_matrix'],
# path_node_array=graph_data['path_node_array'],
# );
# plt.show()
# print ("Average Causal Effect X=%.2f, Y=%.2f, Z=%.2f " % tuple(med.get_all_ace()))
# print ("Average Causal Susceptibility X=%.2f, Y=%.2f, Z=%.2f " % tuple(med.get_all_acs()))
# print ("Average Mediated Causal Effect X=%.2f, Y=%.2f, Z=%.2f " % tuple(med.get_all_amce()))
# med = Models(dataframe=dataframe, model=sklearn.linear_model.LinearRegression(), data_transform=None)
# # Fit the model
# med.get_fit(all_parents=true_parents, tau_max=3)
# print(med.get_val_matrix())
# for j, i, tau, coeff in toys._iter_coeffs(links):
# print(i, j, tau, coeff, med.get_coeff(i=i, tau=tau, j=j))
# for causal_coeff in [med.get_ce(i=0, tau=-2, j=2),
# med.get_mce(i=0, tau=-2, j=2, k=1)]:
# print(causal_coeff)
# pred = Prediction(dataframe=dataframe,
# cond_ind_test=ParCorr(), #CMIknn ParCorr
# prediction_model = sklearn.linear_model.LinearRegression(),
# # prediction_model = sklearn.gaussian_process.GaussianProcessRegressor(),
# # prediction_model = sklearn.neighbors.KNeighborsRegressor(),
# data_transform=sklearn.preprocessing.StandardScaler(),
# train_indices= list(range(int(0.8*T))),
# test_indices= list(range(int(0.8*T), T)),
# verbosity=0
# )
# # predictors = pred.get_predictors(
# # selected_targets=[2],
# # selected_links=None,
# # steps_ahead=1,
# # tau_max=1,
# # pc_alpha=0.2,
# # max_conds_dim=None,
# # max_combinations=1)
# predictors = {0: [], # [(0, -1)],
# 1: [(1, -1), (0, -1)],
# 2: [(2, -1), (1, 0)]}
# pred.fit(target_predictors=predictors,
# selected_targets=None, tau_max=None, return_data=False)
# res = pred.predict(target=0,
# new_data=None,
# pred_params=None,
# cut_off='max_lag_or_tau_max')
# print(data[:,2])
# print(res)