# Source code for tigramite.independence_tests.gsquared

"""Tigramite causal discovery for time series."""

# Author: Sagar Nagaraj Simha, Jakob Runge <jakob@jakob-runge.com>
#

from __future__ import print_function
from scipy import special, spatial
import numpy as np

from scipy.stats import chi2
from scipy.special import xlogy
from scipy.stats.contingency import crosstab
from scipy.stats.contingency import expected_freq
from scipy.stats.contingency import margins
from .independence_tests_base import CondIndTest

[docs]class Gsquared(CondIndTest):
r"""G-squared conditional independence test for categorical data.

Uses Chi2 as the null distribution and the method from [7]_ to
adjust the degrees of freedom. Valid only asymptotically, recommended are
above 1000-2000 samples (depends on data). For smaller sample sizes use the
CMIsymb class which includes a local permutation test.

Assumes one-dimensional X, Y.

This method requires the scipy.stats package.

Notes
-----
The general formula is

.. math:: G(X;Y|Z) &= 2 n \sum p(z)  \sum \sum  p(x,y|z) \log
\frac{ p(x,y |z)}{p(x|z)\cdot p(y |z)}

where :math:n is the sample size. This is simply :math:2 n CMI(X;Y|Z).

References
----------

.. [7] Bishop, Y.M.M., Fienberg, S.E. and Holland, P.W. (1975) Discrete
Multivariate Analysis: Theory and Practice. MIT Press, Cambridge.

Parameters
----------
n_symbs : int, optional (default: None)
Number of symbols in input data. Should be at least as large as the
maximum array entry + 1. If None, n_symbs is inferred by scipy's crosstab

**kwargs :
Arguments passed on to parent class CondIndTest.
"""
@property
def measure(self):
"""
Concrete property to return the measure of the independence test
"""
return self._measure

def __init__(self,
n_symbs=None,
**kwargs):

# Setup the member variables
self._measure = 'gsquared'
self.n_symbs = n_symbs
self.two_sided = False
self.residual_based = False
self.recycle_residuals = False
CondIndTest.__init__(self, **kwargs)

if self.verbosity > 0:
print("n_symbs = %s" % self.n_symbs)
print("")

[docs]    def get_dependence_measure(self, array, xyz):
"""Returns Gsquared/G-test test statistic.

Parameters
----------
array : array-like
data array with X, Y, Z in rows and observations in columns.

xyz : array of ints
XYZ identifier array of shape (dim,).

Returns
-------
val : float
G-squared estimate.
"""
_, T = array.shape
z_indices = np.where(xyz == 2)[0]

# Flip 2D-array so that order is ([zn...z0, ym...y0, xk...x0], T). The
# contingency table is built in this order to ease creating subspaces
# of Z=z.
array_flip = np.flipud(array)

# When n_symbs is given, levels=range(0, n_symbs). If data does not
# have a symbol in levels, then count=0 in the corresponding N-D
# position of contingency table. If levels does not contain a certain
# symbol that is present in the data, then the symbol from data is
# ignored. If None, then levels are inferred from data (default).

if self.n_symbs is None:
levels = None
else:
levels = np.tile(np.arange(self.n_symbs), (len(xyz), 1))
# Assuming same list of levels for (z, y, x).

_, observed = crosstab(*(np.asarray(np.split(array_flip, len(xyz), axis=0)).reshape((-1, T))), levels=levels,
sparse=False)

observed_shape = observed.shape

gsquare = 0.0
dof = 0

# The following loop is over the z-subspace to sum over the G-squared
# statistic and count empty entries to adjust the degrees of freedom.

# TODO: Can be further optimized to operate entirely on observed array
# without 'for', to operate only within slice of z. sparse=True can
# also optimize further.

# For each permutation of z = (zn ... z1, z0). Example - (0...1,0,1)
for zs in np.ndindex(observed_shape[:len(z_indices)]):
observedYX = observed[zs]
mY, mX = margins(observedYX)

if(np.sum(mY)!=0):
expectedYX = expected_freq(observedYX)
gsquare += 2 * np.sum(xlogy(observedYX, observedYX)
- xlogy(observedYX, expectedYX))

# Check how many rows and columns are all-zeros. i.e. how may
# marginals are zero in expected-frq
nzero_rows = np.sum(~expectedYX.any(axis=1))
nzero_cols = np.sum(~expectedYX.any(axis=0))

# Compute dof. Reduce by 1 dof for every marginal row & column=
# Bishop, 1975].
cardYX = observedYX.shape
dof += ((cardYX[0] - 1 - nzero_rows) * (cardYX[1] - 1 - nzero_cols))

# dof cannot be lesser than 1
dof = max(dof, 1)
self._temp_dof = dof
return gsquare

[docs]    def get_analytic_significance(self, value, T, dim, xyz):
"""Return the p_value of test statistic value 'value', according to a
chi-square distribution with 'dof' degrees of freedom."""

# Calculate the p_value
p_value = chi2.sf(value, self._temp_dof)
del self._temp_dof

return p_value

if __name__ == '__main__':

import tigramite
from tigramite.data_processing import DataFrame
import tigramite.data_processing as pp
import numpy as np

seed=42
random_state = np.random.default_rng(seed=seed)
cmi = Gsquared()

T = 1000
dimz = 3
z = random_state.binomial(n=1, p=0.5, size=(T, dimz)).reshape(T, dimz)
x = np.empty(T).reshape(T, 1)
y = np.empty(T).reshape(T, 1)
for t in range(T):
val = z[t, 0].squeeze()
prob = 0.2+val*0.6
x[t] = random_state.choice([0,1], p=[prob, 1.-prob])
y[t] = random_state.choice([0,1, 2], p=[prob, (1.-prob)/2., (1.-prob)/2.])

print('start')
print(cmi.run_test_raw(x, y, z=None))
print(cmi.run_test_raw(x, y, z=z))