Source code for tigramite.independence_tests.gpdc_torch

"""Tigramite causal discovery for time series."""

# Author: Jakob Runge <jakob@jakob-runge.com>
#
# License: GNU General Public License v3.0

from __future__ import print_function
import json, warnings, os, pathlib
import numpy as np
import gc
import dcor
import torch
import gpytorch
from .LBFGS import FullBatchLBFGS
from .independence_tests_base import CondIndTest

class GaussProcRegTorch():
    r"""Gaussian processes abstract base class.

    GP is estimated with gpytorch. Note that the kernel's hyperparameters are
    optimized during fitting.

    When the null distribution is not analytically available, but can be
    precomputed with the function generate_and_save_nulldists(...) which saves
    a \*.npz file containing the null distribution for different sample sizes.
    This file can then be supplied as null_dist_filename.

    Parameters
    ----------
    null_samples : int
        Number of null samples to use

    cond_ind_test : CondIndTest
        Conditional independence test that this Gaussian Proccess Regressor will
        calculate the null distribution for.  This is used to grab the
        get_dependence_measure function.

    null_dist_filename : str, otional (default: None)
        Path to file containing null distribution.

    verbosity : int, optional (default: 0)
        Level of verbosity.
    """

    def __init__(self,
                 null_samples,
                 cond_ind_test,
                 null_dist_filename=None,
                 checkpoint_size=None,
                 verbosity=0):
        # Set the dependence measure function
        self.cond_ind_test = cond_ind_test
        # Set member variables
        self.verbosity = verbosity
        # Set the null distribution defaults
        self.null_samples = null_samples
        self.null_dists = {}
        self.null_dist_filename = null_dist_filename
        # Check if we are loading a null distrubtion from a cached file
        if self.null_dist_filename is not None:
            self.null_dists, self.null_samples = \
                self._load_nulldist(self.null_dist_filename)
        # Size for batching
        self.checkpoint_size = checkpoint_size

    def _load_nulldist(self, filename):
        r"""
        Load a precomputed null distribution from a \*.npz file.  This
        distribution can be calculated using generate_and_save_nulldists(...).

        Parameters
        ----------
        filename : strng
            Path to the \*.npz file

        Returns
        -------
        null_dists, null_samples : dict, int
            The null distirbution as a dictionary of distributions keyed by
            sample size, the number of null samples in total.
        """
        null_dist_file = np.load(filename)
        null_dists = dict(zip(null_dist_file['T'],
                              null_dist_file['exact_dist']))
        null_samples = len(null_dist_file['exact_dist'][0])
        return null_dists, null_samples

    def _generate_nulldist(self, df,
                           add_to_null_dists=True):
        """Generates null distribution for pairwise independence tests.

        Generates the null distribution for sample size df. Assumes pairwise
        samples transformed to uniform marginals. Uses get_dependence_measure
        available in class and generates self.sig_samples random samples. Adds
        the null distributions to self.null_dists.

        Parameters
        ----------
        df : int
            Degrees of freedom / sample size to generate null distribution for.
        add_to_null_dists : bool, optional (default: True)
            Whether to add the null dist to the dictionary of null dists or
            just return it.

        Returns
        -------
        null_dist : array of shape [df,]
            Only returned,if add_to_null_dists is False.
        """

        if self.verbosity > 0:
            print("Generating null distribution for df = %d. " % df)
            if add_to_null_dists:
                print("For faster computations, run function "
                      "generate_and_save_nulldists(...) to "
                      "precompute null distribution and load *.npz file with "
                      "argument null_dist_filename")

        xyz = np.array([0, 1])

        null_dist = np.zeros(self.null_samples)
        for i in range(self.null_samples):
            array = self.cond_ind_test.random_state.random((2, df))
            null_dist[i] = self.cond_ind_test.get_dependence_measure(
                array, xyz)

        null_dist.sort()
        if add_to_null_dists:
            self.null_dists[df] = null_dist
        return null_dist

    def _generate_and_save_nulldists(self, sample_sizes, null_dist_filename):
        """Generates and saves null distribution for pairwise independence
        tests.

        Generates the null distribution for different sample sizes. Calls
        generate_nulldist. Null dists are saved to disk as
        self.null_dist_filename.npz. Also adds the null distributions to
        self.null_dists.

        Parameters
        ----------
        sample_sizes : list
            List of sample sizes.

        null_dist_filename : str
            Name to save file containing null distributions.
        """

        self.null_dist_filename = null_dist_filename

        null_dists = np.zeros((len(sample_sizes), self.null_samples))

        for iT, T in enumerate(sample_sizes):
            null_dists[iT] = self._generate_nulldist(
                T, add_to_null_dists=False)
            self.null_dists[T] = null_dists[iT]

        np.savez("%s" % null_dist_filename,
                 exact_dist=null_dists,
                 T=np.array(sample_sizes))


    def _get_single_residuals(self, array, target_var,
                                    return_means=False,
                                    standardize=True,
                                    return_likelihood=False,
                                    training_iter=50,
                                    lr=0.1):
        """Returns residuals of Gaussian process regression.

        Performs a GP regression of the variable indexed by target_var on the
        conditions Z. Here array is assumed to contain X and Y as the first two
        rows with the remaining rows (if present) containing the conditions Z.
        Optionally returns the estimated mean and the likelihood.

        Parameters
        ----------
        array : array-like
            data array with X, Y, Z in rows and observations in columns

        target_var : {0, 1}
            Variable to regress out conditions from.

        standardize : bool, optional (default: True)
            Whether to standardize the array beforehand.

        return_means : bool, optional (default: False)
            Whether to return the estimated regression line.

        return_likelihood : bool, optional (default: False)
            Whether to return the log_marginal_likelihood of the fitted GP.

        training_iter : int, optional (default: 50)
            Number of training iterations.

        lr : float, optional (default: 0.1)
            Learning rate (default: 0.1).

        Returns
        -------
        resid [, mean, likelihood] : array-like
            The residual of the regression and optionally the estimated mean
            and/or the likelihood.
        """

        dim, T = array.shape

        if dim <= 2:
            if return_likelihood:
                return array[target_var, :], -np.inf
            return array[target_var, :]

        # Implement using PyTorch
        # Standardize
        if standardize:
            array -= array.mean(axis=1).reshape(dim, 1)
            std = array.std(axis=1)
            for i in range(dim):
                if std[i] != 0.:
                    array[i] /= std[i]
            if np.any(std == 0.) and self.verbosity > 0:
                warnings.warn("Possibly constant array!")
            # array /= array.std(axis=1).reshape(dim, 1)
            # if np.isnan(array).any():
            #     raise ValueError("Nans after standardizing, "
            #                      "possibly constant array!")

        target_series = array[target_var, :]
        z = np.fastCopyAndTranspose(array[2:])
        if np.ndim(z) == 1:
            z = z.reshape(-1, 1)

        train_x = torch.tensor(z).float()
        train_y = torch.tensor(target_series).float()

        device_type = 'cuda' if torch.cuda.is_available() else 'cpu'
        output_device = torch.device(device_type)
        train_x, train_y = train_x.to(output_device), train_y.to(output_device)

        if device_type == 'cuda':
            # If GPU is available, use MultiGPU with Kernel Partitioning
            n_devices = torch.cuda.device_count()
            class mExactGPModel(gpytorch.models.ExactGP):
                def __init__(self, train_x, train_y, likelihood, n_devices):
                    super(mExactGPModel, self).__init__(train_x, train_y, likelihood)
                    self.mean_module = gpytorch.means.ConstantMean()
                    base_covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

                    self.covar_module = gpytorch.kernels.MultiDeviceKernel(
                        base_covar_module, device_ids=range(n_devices),
                        output_device=output_device
                    )

                def forward(self, x):
                    mean_x = self.mean_module(x)
                    covar_x = self.covar_module(x)
                    return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)

            def mtrain(train_x,
                      train_y,
                      n_devices,
                      output_device,
                      checkpoint_size,
                      preconditioner_size,
                      n_training_iter,
                      ):
                likelihood = gpytorch.likelihoods.GaussianLikelihood().to(output_device)
                model = mExactGPModel(train_x, train_y, likelihood, n_devices).to(output_device)
                model.train()
                likelihood.train()

                optimizer = FullBatchLBFGS(model.parameters(), lr=lr)
                # "Loss" for GPs - the marginal log likelihood
                mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)

                with gpytorch.beta_features.checkpoint_kernel(checkpoint_size), \
                        gpytorch.settings.max_preconditioner_size(preconditioner_size):

                    def closure():
                        optimizer.zero_grad()
                        output = model(train_x)
                        loss = -mll(output, train_y)
                        return loss

                    loss = closure()
                    loss.backward()

                    for i in range(n_training_iter):
                        options = {'closure': closure, 'current_loss': loss, 'max_ls': 10}
                        loss, _, _, _, _, _, _, fail = optimizer.step(options)

                        '''print('Iter %d/%d - Loss: %.3f   lengthscale: %.3f   noise: %.3f' % (
                            i + 1, n_training_iter, loss.item(),
                            model.covar_module.module.base_kernel.lengthscale.item(),
                            model.likelihood.noise.item()
                        ))'''

                        if fail:
                            # print('Convergence reached!')
                            break

                return model, likelihood, mll

            def find_best_gpu_setting(train_x,
                                      train_y,
                                      n_devices,
                                      output_device,
                                      preconditioner_size
                                      ):
                N = train_x.size(0)

                # Find the optimum partition/checkpoint size by decreasing in powers of 2
                # Start with no partitioning (size = 0)
                settings = [0] + [int(n) for n in np.ceil(N / 2 ** np.arange(1, np.floor(np.log2(N))))]

                for checkpoint_size in settings:
                    print('Number of devices: {} -- Kernel partition size: {}'.format(n_devices, checkpoint_size))
                    try:
                        # Try a full forward and backward pass with this setting to check memory usage
                        _, _, _ = mtrain(train_x, train_y,
                                     n_devices=n_devices, output_device=output_device,
                                     checkpoint_size=checkpoint_size,
                                     preconditioner_size=preconditioner_size, n_training_iter=1)

                        # when successful, break out of for-loop and jump to finally block
                        break
                    except RuntimeError as e:
                        pass
                    except AttributeError as e:
                        pass
                    finally:
                        # handle CUDA OOM error
                        gc.collect()
                        torch.cuda.empty_cache()
                return checkpoint_size

            # Set a large enough preconditioner size to reduce the number of CG iterations run
            preconditioner_size = 100
            if self.checkpoint_size is None:
                self.checkpoint_size = find_best_gpu_setting(train_x, train_y,
                                                        n_devices=n_devices,
                                                        output_device=output_device,
                                                        preconditioner_size=preconditioner_size)

            model, likelihood, mll = mtrain(train_x, train_y,
                                      n_devices=n_devices, output_device=output_device,
                                      checkpoint_size=self.checkpoint_size,
                                      preconditioner_size=100,
                                      n_training_iter=training_iter)

            # Get into evaluation (predictive posterior) mode
            model.eval()
            likelihood.eval()

            # Make predictions by feeding model through likelihood
            with torch.no_grad(), gpytorch.settings.fast_pred_var(), gpytorch.beta_features.checkpoint_kernel(1000):
                mean = model(train_x).loc.detach()
                loglik = mll(model(train_x), train_y)*T

            resid = (train_y - mean).detach().cpu().numpy()
            mean = mean.detach().cpu().numpy()

        else:
            # If only CPU is available, we will use the simplest form of GP model, exact inference
            class ExactGPModel(gpytorch.models.ExactGP):
                def __init__(self, train_x, train_y, likelihood):
                    super(ExactGPModel, self).__init__(
                        train_x, train_y, likelihood)
                    self.mean_module = gpytorch.means.ConstantMean()

                    # We only use the RBF kernel here, the WhiteNoiseKernel is deprecated
                    # and its featured integrated into the Likelihood-Module.
                    self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

                def forward(self, x):
                    mean_x = self.mean_module(x)
                    covar_x = self.covar_module(x)
                    return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)

            # initialize likelihood and model
            likelihood = gpytorch.likelihoods.GaussianLikelihood()
            model = ExactGPModel(train_x, train_y, likelihood)

            # Find optimal model hyperparameters
            model.train()
            likelihood.train()

            # Use the adam optimizer
            # Includes GaussianLikelihood parameters
            optimizer = torch.optim.Adam(model.parameters(), lr=lr)

            # "Loss" for GPs - the marginal log likelihood
            mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)

            for i in range(training_iter):
                # Zero gradients from previous iteration
                optimizer.zero_grad()
                # Output from model
                output = model(train_x)

                # Calc loss and backprop gradients
                loss = -mll(output, train_y)
                loss.backward()
                optimizer.step()

            # Get into evaluation (predictive posterior) mode
            model.eval()
            likelihood.eval()

            # Make predictions by feeding model through likelihood
            with torch.no_grad(), gpytorch.settings.fast_pred_var():
                mean = model(train_x).loc.detach()
                loglik = mll(model(train_x), train_y) * T

            resid = (train_y - mean).detach().numpy()
            mean = mean.detach().numpy()

        if return_means and not return_likelihood:
            return resid, mean
        elif return_likelihood and not return_means:
            return resid, loglik
        elif return_means and return_likelihood:
            return resid, mean, loglik
        return resid

    def _get_model_selection_criterion(self, j, parents, tau_max=0):
        """Returns log marginal likelihood for GP regression.

        Fits a GP model of the parents to variable j and returns the negative
        log marginal likelihood as a model selection score. Is used to determine
        optimal hyperparameters in PCMCI, in particular the pc_alpha value.

        Parameters
        ----------
        j : int
            Index of target variable in data array.

        parents : list
            List of form [(0, -1), (3, -2), ...] containing parents.

        tau_max : int, optional (default: 0)
            Maximum time lag. This may be used to make sure that estimates for
            different lags in X, Z, all have the same sample size.

        Returns:
        score : float
            Model score.
        """

        Y = [(j, 0)]
        X = [(j, 0)]   # dummy variable here
        Z = parents
        array, xyz, _ = \
            self.cond_ind_test.dataframe.construct_array(
                X=X, Y=Y, Z=Z,
                tau_max=tau_max,
                mask_type=self.cond_ind_test.mask_type,
                return_cleaned_xyz=False,
                do_checks=True,
                verbosity=self.verbosity)

        dim, T = array.shape

        _, logli = self._get_single_residuals(array,
                                              target_var=1,
                                              return_likelihood=True)

        score = -logli
        return score


[docs]class GPDCtorch(CondIndTest): r"""GPDC conditional independence test based on Gaussian processes and distance correlation. Here with gpytorch implementation. GPDC is based on a Gaussian process (GP) regression and a distance correlation test on the residuals [2]_. GP is estimated with gpytorch. The distance correlation test is implemented with the dcor package available from pip. Here the null distribution is not analytically available, but can be precomputed with the function generate_and_save_nulldists(...) which saves a \*.npz file containing the null distribution for different sample sizes. This file can then be supplied as null_dist_filename. Notes ----- GPDC is based on a Gaussian process (GP) regression and a distance correlation test on the residuals. Distance correlation is described in [2]_. To test :math:`X \perp Y | Z`, first :math:`Z` is regressed out from :math:`X` and :math:`Y` assuming the model .. math:: X & = f_X(Z) + \epsilon_{X} \\ Y & = f_Y(Z) + \epsilon_{Y} \\ \epsilon_{X,Y} &\sim \mathcal{N}(0, \sigma^2) using GP regression. Here :math:`\sigma^2` and the kernel bandwidth are optimzed using ``gpytorch``. Then the residuals are transformed to uniform marginals yielding :math:`r_X,r_Y` and their dependency is tested with .. math:: \mathcal{R}\left(r_X, r_Y\right) The null distribution of the distance correlation should be pre-computed. Otherwise it is computed during runtime. Parameters ---------- null_dist_filename : str, otional (default: None) Path to file containing null distribution. **kwargs : Arguments passed on to parent class GaussProcRegTorch. """ @property def measure(self): """ Concrete property to return the measure of the independence test """ return self._measure def __init__(self, null_dist_filename=None, **kwargs): self._measure = 'gp_dc' self.two_sided = False self.residual_based = True # Call the parent constructor CondIndTest.__init__(self, **kwargs) # Build the regressor self.gauss_pr = GaussProcRegTorch(self.sig_samples, self, null_dist_filename=null_dist_filename, verbosity=self.verbosity) if self.verbosity > 0: print("null_dist_filename = %s" % self.gauss_pr.null_dist_filename) print("") def _load_nulldist(self, filename): r""" Load a precomputed null distribution from a \*.npz file. This distribution can be calculated using generate_and_save_nulldists(...). Parameters ---------- filename : strng Path to the \*.npz file Returns ------- null_dists, null_samples : dict, int The null distirbution as a dictionary of distributions keyed by sample size, the number of null samples in total. """ return self.gauss_pr._load_nulldist(filename)
[docs] def generate_nulldist(self, df, add_to_null_dists=True): """Generates null distribution for pairwise independence tests. Generates the null distribution for sample size df. Assumes pairwise samples transformed to uniform marginals. Uses get_dependence_measure available in class and generates self.sig_samples random samples. Adds the null distributions to self.gauss_pr.null_dists. Parameters ---------- df : int Degrees of freedom / sample size to generate null distribution for. add_to_null_dists : bool, optional (default: True) Whether to add the null dist to the dictionary of null dists or just return it. Returns ------- null_dist : array of shape [df,] Only returned,if add_to_null_dists is False. """ return self.gauss_pr._generate_nulldist(df, add_to_null_dists)
[docs] def generate_and_save_nulldists(self, sample_sizes, null_dist_filename): """Generates and saves null distribution for pairwise independence tests. Generates the null distribution for different sample sizes. Calls generate_nulldist. Null dists are saved to disk as self.null_dist_filename.npz. Also adds the null distributions to self.gauss_pr.null_dists. Parameters ---------- sample_sizes : list List of sample sizes. null_dist_filename : str Name to save file containing null distributions. """ self.gauss_pr._generate_and_save_nulldists(sample_sizes, null_dist_filename)
def _get_single_residuals(self, array, target_var, return_means=False, standardize=True, return_likelihood=False, training_iter=50, lr=0.1): """Returns residuals of Gaussian process regression. Performs a GP regression of the variable indexed by target_var on the conditions Z. Here array is assumed to contain X and Y as the first two rows with the remaining rows (if present) containing the conditions Z. Optionally returns the estimated mean and the likelihood. Parameters ---------- array : array-like data array with X, Y, Z in rows and observations in columns target_var : {0, 1} Variable to regress out conditions from. standardize : bool, optional (default: True) Whether to standardize the array beforehand. return_means : bool, optional (default: False) Whether to return the estimated regression line. return_likelihood : bool, optional (default: False) Whether to return the log_marginal_likelihood of the fitted GP training_iter : int, optional (default: 50) Number of training iterations. lr : float, optional (default: 0.1) Learning rate (default: 0.1). Returns ------- resid [, mean, likelihood] : array-like The residual of the regression and optionally the estimated mean and/or the likelihood. """ return self.gauss_pr._get_single_residuals( array, target_var, return_means, standardize, return_likelihood, training_iter, lr)
[docs] def get_model_selection_criterion(self, j, parents, tau_max=0): """Returns log marginal likelihood for GP regression. Fits a GP model of the parents to variable j and returns the negative log marginal likelihood as a model selection score. Is used to determine optimal hyperparameters in PCMCI, in particular the pc_alpha value. Parameters ---------- j : int Index of target variable in data array. parents : list List of form [(0, -1), (3, -2), ...] containing parents. tau_max : int, optional (default: 0) Maximum time lag. This may be used to make sure that estimates for different lags in X, Z, all have the same sample size. Returns: score : float Model score. """ return self.gauss_pr._get_model_selection_criterion(j, parents, tau_max)
[docs] def get_dependence_measure(self, array, xyz): """Return GPDC measure. Estimated as the distance correlation of the residuals of a GP regression. Parameters ---------- array : array-like data array with X, Y, Z in rows and observations in columns xyz : array of ints XYZ identifier array of shape (dim,). Returns ------- val : float GPDC test statistic. """ x_vals = self._get_single_residuals(array, target_var=0) y_vals = self._get_single_residuals(array, target_var=1) val = self._get_dcorr(np.array([x_vals, y_vals])) return val
def _get_dcorr(self, array_resid): """Return distance correlation coefficient. The variables are transformed to uniform marginals using the empirical cumulative distribution function beforehand. Here the null distribution is not analytically available, but can be precomputed with the function generate_and_save_nulldists(...) which saves a *.npz file containing the null distribution for different sample sizes. This file can then be supplied as null_dist_filename. Parameters ---------- array_resid : array-like data array must be of shape (2, T) Returns ------- val : float Distance correlation coefficient. """ # Remove ties before applying transformation to uniform marginals # array_resid = self._remove_ties(array_resid, verbosity=4) x_vals, y_vals = self._trafo2uniform(array_resid) val = dcor.distance_correlation(x_vals, y_vals, method='AVL') return val
[docs] def get_shuffle_significance(self, array, xyz, value, return_null_dist=False): """Returns p-value for shuffle significance test. For residual-based test statistics only the residuals are shuffled. Parameters ---------- array : array-like data array with X, Y, Z in rows and observations in columns xyz : array of ints XYZ identifier array of shape (dim,). value : number Value of test statistic for unshuffled estimate. Returns ------- pval : float p-value """ x_vals = self._get_single_residuals(array, target_var=0) y_vals = self._get_single_residuals(array, target_var=1) array_resid = np.array([x_vals, y_vals]) xyz_resid = np.array([0, 1]) null_dist = self._get_shuffle_dist(array_resid, xyz_resid, self.get_dependence_measure, sig_samples=self.sig_samples, sig_blocklength=self.sig_blocklength, verbosity=self.verbosity) pval = (null_dist >= value).mean() if return_null_dist: return pval, null_dist return pval
[docs] def get_analytic_significance(self, value, T, dim, xyz): """Returns p-value for the distance correlation coefficient. The null distribution for necessary degrees of freedom (df) is loaded. If not available, the null distribution is generated with the function generate_nulldist(). It is recommended to generate the nulldists for a wide range of sample sizes beforehand with the function generate_and_save_nulldists(...). The distance correlation coefficient is one-sided. If the degrees of freedom are less than 1, numpy.nan is returned. Parameters ---------- value : float Test statistic value. T : int Sample length dim : int Dimensionality, ie, number of features. xyz : array of ints XYZ identifier array of shape (dim,). Returns ------- pval : float or numpy.nan p-value. """ # GP regression approximately doesn't cost degrees of freedom df = T if df < 1: pval = np.nan else: # idx_near = (np.abs(self.sample_sizes - df)).argmin() if int(df) not in list(self.gauss_pr.null_dists): # if np.abs(self.sample_sizes[idx_near] - df) / float(df) > 0.01: if self.verbosity > 0: print("Null distribution for GPDC not available " "for deg. of freed. = %d." % df) self.generate_nulldist(df) null_dist_here = self.gauss_pr.null_dists[int(df)] pval = np.mean(null_dist_here > np.abs(value)) return pval