# Source code for tigramite.causal_effects

```"""Tigramite causal inference for time series."""

# Author: Jakob Runge <jakob@jakob-runge.com>
#

import numpy as np
import math
import itertools
from copy import deepcopy
from collections import defaultdict
from tigramite.models import Models
import struct

[docs]class CausalEffects():
r"""Causal effect estimation.

Methods for the estimation of linear or non-parametric causal effects
between (potentially multivariate) X and Y (potentially conditional
on S) by (generalized) backdoor adjustment. Various graph types are
supported, also including hidden variables.

Linear and non-parametric estimators are based on sklearn. For the
linear case without hidden variables also an efficient estimation
based on Wright's path coefficients is available. This estimator
also allows to estimate mediation effects.

See the corresponding paper [6]_ and tigramite tutorial for an
in-depth introduction.

References
----------

.. [6] J. Runge, Necessary and sufficient graphical conditions for
optimal adjustment sets in causal graphical models with
hidden variables, Advances in Neural Information Processing
Systems, 2021, 34
https://proceedings.neurips.cc/paper/2021/hash/8485ae387a981d783f8764e508151cd9-Abstract.html

Parameters
----------
graph : array of either shape [N, N], [N, N, tau_max+1], or [N, N, tau_max+1, tau_max+1]
Different graph types are supported, see tutorial.
X : list of tuples
List of tuples [(i, -tau), ...] containing cause variables.
Y : list of tuples
List of tuples [(j, 0), ...] containing effect variables.
S : list of tuples
List of tuples [(i, -tau), ...] containing conditioned variables.
graph_type : str
Type of graph.
hidden_variables : list of tuples
Hidden variables in format [(i, -tau), ...]. The internal graph is
constructed by a latent projection.
check_SM_overlap : bool
Whether to check whether S overlaps with M.
verbosity : int, optional (default: 0)
Level of verbosity.
"""

def __init__(self,
graph,
graph_type,
X,
Y,
S=None,
hidden_variables=None,
check_SM_overlap=True,
verbosity=0):

self.verbosity = verbosity
self.N = graph.shape[0]

if S is None:
S = []

self.listX = list(X)
self.listY = list(Y)
self.listS = list(S)

self.X = set(X)
self.Y = set(Y)
self.S = set(S)

#
# Checks regarding graph type
#
supported_graphs = ['dag',
'tsg_dag',
'stationary_dag',

# 'mag',
# 'tsg_mag',
# 'stationary_mag',
# 'pag',
# 'tsg_pag',
# 'stationary_pag',
]
if graph_type not in supported_graphs:
raise ValueError("Only graph types %s supported!" %supported_graphs)

# TODO?: check that masking aligns with hidden samples in variables
if hidden_variables is None:
hidden_variables = []

self.hidden_variables = set(hidden_variables)
if len(self.hidden_variables.intersection(self.X.union(self.Y).union(self.S))) > 0:
raise ValueError("XYS overlaps with hidden_variables!")

# Only needed for later extension to MAG/PAGs
if 'pag' in graph_type:
self.possible = True
self.definite_status = True
else:
self.possible = False
self.definite_status = False

# Not needed for now...
# self.ignore_time_bounds = False

# Construct internal graph from input graph depending on graph type
# and hidden variables
self._construct_graph(graph=graph, graph_type=graph_type,
hidden_variables=hidden_variables)

self._check_graph(self.graph)

self._check_XYS()

self.ancX = self._get_ancestors(X)
self.ancY = self._get_ancestors(Y)
self.ancS = self._get_ancestors(S)

# If X is not in anc(Y), then no causal link exists
if self.ancY.intersection(set(X)) == set():
self.no_causal_path = True
if self.verbosity > 0:
print("No causal path from X to Y exists.")
else:
self.no_causal_path = False

# Get mediators
mediators = self.get_mediators(start=self.X, end=self.Y)

M = set(mediators)
self.M = M

self.listM = list(self.M)

for varlag in self.X.union(self.Y).union(self.S):
if abs(varlag[1]) > self.tau_max:
raise ValueError("X, Y, S must have time lags inside graph.")

if len(self.X.intersection(self.Y)) > 0:
raise ValueError("Overlap between X and Y.")

if len(self.S.intersection(self.Y.union(self.X))) > 0:
raise ValueError("Conditions S overlap with X or Y.")

# # TODO: need to prove that this is sufficient for non-identifiability!
# if len(self.X.intersection(self._get_descendants(self.M))) > 0:
#     raise ValueError("Not identifiable: Overlap between X and des(M)")

if check_SM_overlap and len(self.S.intersection(self.M)) > 0:
raise ValueError("Conditions S overlap with mediators M.")

self.desX = self._get_descendants(self.X)
self.desY = self._get_descendants(self.Y)
self.desM = self._get_descendants(self.M)
self.descendants = self.desY.union(self.desM)

# Define forb as X and descendants of YM
self.forbidden_nodes = self.descendants.union(self.X)  #.union(S)

# Define valid ancestors
self.vancs = self.ancX.union(self.ancY).union(self.ancS) - self.forbidden_nodes

if self.verbosity > 0:
if len(self.S.intersection(self.desX)) > 0:
print("Warning: Potentially outside assumptions: Conditions S overlap with des(X)")

# Here only check if S overlaps with des(Y), leave the option that S
# contains variables in des(M) to the user
if len(self.S.intersection(self.desY)) > 0:
raise ValueError("Not identifiable: Conditions S overlap with des(Y).")

if self.verbosity > 0:
print("\n##\n## Initializing CausalEffects class\n##"
"\n\nInput:")
print("\ngraph_type = %s" % graph_type
+ "\nX = %s" % self.listX
+ "\nY = %s" % self.listY
+ "\nS = %s" % self.listS
+ "\nM = %s" % self.listM
)
if len(self.hidden_variables) > 0:
print("\nhidden_variables = %s" % self.hidden_variables
)
print("\n\n")
if self.no_causal_path:
print("No causal path from X to Y exists!")

def _construct_graph(self, graph, graph_type, hidden_variables):
"""Construct internal graph object based on input graph and hidden variables.

Uses the latent projection operation.
"""

if graph.ndim != 2:
raise ValueError("graph_type in ['dag', 'admg'] assumes graph.shape=(N, N).")
# Convert to shape [N, N, 1, 1] with dummy dimension
# to process as tsg_dag or tsg_admg with potential hidden variables
self.graph = np.expand_dims(graph, axis=(2, 3))

# tau_max needed in _get_latent_projection_graph
self.tau_max = 0

if len(hidden_variables) > 0:
self.graph = self._get_latent_projection_graph() # stationary=False)
else:
# graph = self.graph
self.graph_type = 'tsg_' + graph_type

if graph.ndim != 4:
raise ValueError("tsg-graph_type assumes graph.shape=(N, N, tau_max+1, tau_max+1).")

# Then tau_max is implicitely derived from
# the dimensions
self.graph = graph
self.tau_max = graph.shape[2] - 1

if len(hidden_variables) > 0:
self.graph = self._get_latent_projection_graph() #, stationary=False)
else:
self.graph_type = graph_type

# Currently only stationary_dag without hidden variables is supported
if graph.ndim != 3:
raise ValueError("stationary graph_type assumes graph.shape=(N, N, tau_max+1).")

# # TODO: remove if theory for stationary ADMGs is clear
# if graph_type == 'stationary_dag' and len(hidden_variables) > 0:
#     raise ValueError("Hidden variables currently not supported for "
#                      "stationary_dag.")

# For a stationary DAG without hidden variables it's sufficient to consider
# a tau_max that includes the parents of X, Y, M, and S. A conservative
# estimate thereof is simply the lag-dimension of the stationary DAG plus
# the maximum lag of XYS.
statgraph_tau_max = graph.shape[2] - 1
maxlag_XYS = 0
for varlag in self.X.union(self.Y).union(self.S):
maxlag_XYS = max(maxlag_XYS, abs(varlag[1]))

self.tau_max = maxlag_XYS + statgraph_tau_max

stat_graph = deepcopy(graph)

#########################################
# Use this tau_max and construct ADMG by assuming paths of
# maximal lag 10*tau_max... TO BE REVISED!
self.graph = graph
self.graph = self._get_latent_projection_graph(stationary=True)
#########################################

# Also create stationary graph extended to tau_max
self.stationary_graph = np.zeros((self.N, self.N, self.tau_max + 1), dtype='<U3')
self.stationary_graph[:, :, :stat_graph.shape[2]] = stat_graph

# allowed_edges = ["-->", "<--"]

# # Construct tsg_graph
# graph = np.zeros((self.N, self.N, self.tau_max + 1, self.tau_max + 1), dtype='<U3')
# graph[:] = ""
# for (i, j) in itertools.product(range(self.N), range(self.N)):
#     for jt, tauj in enumerate(range(0, self.tau_max + 1)):
#         for it, taui in enumerate(range(tauj, self.tau_max + 1)):
#             tau = abs(taui - tauj)
#             if tau == 0 and j == i:
#                 continue
#             if tau > statgraph_tau_max:
#                 continue

#             # if tau == 0:
#             #     if stat_graph[i, j, tau] == '-->':
#             #         graph[i, j, taui, tauj] = "-->"
#             #         graph[j, i, tauj, taui] = "<--"

#             #     # elif stat_graph[i, j, tau] == '<--':
#             #     #     graph[i, j, taui, tauj] = "<--"
#             #     #     graph[j, i, tauj, taui] = "-->"
#             # else:
#             if stat_graph[i, j, tau] == '-->':
#                 graph[i, j, taui, tauj] = "-->"
#                 graph[j, i, tauj, taui] = "<--"
#             elif stat_graph[i, j, tau] == '<--':
#                 pass
#             elif stat_graph[i, j, tau] == '':
#                 pass
#             else:
#                 edge = stat_graph[i, j, tau]
#                 raise ValueError("Invalid graph edge %s. " %(edge) +
#                      "For graph_type = %s only %s are allowed." %(graph_type, str(allowed_edges)))

#             # elif stat_graph[i, j, tau] == '<--':
#             #     graph[i, j, taui, tauj] = "<--"
#             #     graph[j, i, tauj, taui] = "-->"

# self.graph_type = 'tsg_dag'
# self.graph = graph

# return (graph, graph_type, self.tau_max, hidden_variables)

# max_lag = self._get_maximum_possible_lag(XYZ=list(X.union(Y).union(S)), graph=graph)

# stat_mediators = self._get_mediators_stationary_graph(start=X, end=Y, max_lag=max_lag)
# self.tau_max = self._get_maximum_possible_lag(XYZ=list(X.union(Y).union(S).union(stat_mediators)), graph=graph)
# self.tau_max = graph_taumax
# for varlag in X.union(Y).union(S):
#     self.tau_max = max(self.tau_max, abs(varlag[1]))

# if verbosity > 0:
#     print("Setting tau_max = ", self.tau_max)

# if tau_max is None:
#     self.tau_max = graph_taumax
#     for varlag in X.union(Y).union(S):
#         self.tau_max = max(self.tau_max, abs(varlag[1]))

#     if verbosity > 0:
#         print("Setting tau_max = ", self.tau_max)
# else:
# self.tau_max = graph_taumax
# # Repeat hidden variable pattern
# # if larger tau_max is given
# if self.tau_max > graph_taumax:
#     for lag in range(graph_taumax + 1, self.tau_max + 1):
#         for j in range(self.N):
#             if (j, -(lag % (graph_taumax+1))) in self.hidden_variables:
# print(self.hidden_variables)

#     self.graph = self._get_latent_projection_graph(self.graph, stationary=True)
# else:

def _check_XYS(self):
"""Check whether XYS are sober.
"""

XYS = self.X.union(self.Y).union(self.S)
for xys in XYS:
var, lag = xys
if var < 0 or var >= self.N:
raise ValueError("XYS vars must be in [0...N]")
if lag < -self.tau_max or lag > 0:
raise ValueError("XYS lags must be in [-taumax...0]")

[docs]    def check_XYS_paths(self):
"""Check whether one can remove nodes from X and Y with no proper causal paths.

Returns
-------
X, Y : cleaned lists of X and Y with irrelevant nodes removed.
"""

# TODO: Also check S...
oldX = self.X.copy()
oldY = self.Y.copy()

# anc_Y = self._get_ancestors(self.Y)
# anc_S = self._get_ancestors(self.S)

# Remove first from X those nodes with no causal path to Y or S
X = set([x for x in self.X if x in self.ancY.union(self.ancS)])

# Remove from Y those nodes with no causal path from X
# des_X = self._get_descendants(X)

Y = set([y for y in self.Y if y in self.desX])

# Also require that all x in X have proper path to Y or S,
# that is, the first link goes out of x
# and into path nodes
mediators_S = self.get_mediators(start=self.X, end=self.S)
path_nodes = list(self.M.union(Y).union(mediators_S))
X = X.intersection(self._get_all_parents(path_nodes))

if set(oldX) != set(X) and self.verbosity > 0:
print("Consider pruning X = %s to X = %s " %(oldX, X) +
"since only these have causal path to Y")

if set(oldY) != set(Y) and self.verbosity > 0:
print("Consider pruning Y = %s to Y = %s " %(oldY, Y) +
"since only these have causal path from X")

return (list(X), list(Y))

def _check_graph(self, graph):
"""Checks that graph contains no invalid entries/structure.

Assumes graph.shape = (N, N, tau_max+1, tau_max+1)
"""

allowed_edges = ["-->", "<--"]
allowed_edges += ["<->", "<-+", "+->"]
elif 'mag' in self.graph_type:
allowed_edges += ["<->"]
elif 'pag' in self.graph_type:
allowed_edges += ["<->", "o-o", "o->", "<-o"]                         # "o--",
# "--o",
# "x-o",
# "o-x",
# "x--",
# "--x",
# "x->",
# "<-x",
# "x-x",
# ]

graph_dict = defaultdict(list)
for i, j, taui, tauj in zip(*np.where(graph)):
edge = graph[i, j, taui, tauj]
# print((i, -taui), edge, (j, -tauj), graph[j, i, tauj, taui])
if edge != self._reverse_link(graph[j, i, tauj, taui]):
raise ValueError(
"graph needs to have consistent edges (eg"
" graph[i,j,taui,tauj]='-->' requires graph[j,i,tauj,taui]='<--')"
)

if edge not in allowed_edges:
raise ValueError("Invalid graph edge %s. " %(edge) +
"For graph_type = %s only %s are allowed." %(self.graph_type, str(allowed_edges)))

if edge == "-->" or edge == "+->":
# Map to (i,-taui, j, tauj) graph
indexi = i * (self.tau_max + 1) + taui
indexj = j * (self.tau_max + 1) + tauj

graph_dict[indexj].append(indexi)

# Check for cycles
if self._check_cyclic(graph_dict):
raise ValueError("graph is cyclic.")

# if MAG: check for almost cycles
# if PAG???

def _check_cyclic(self, graph_dict):
"""Return True if the graph_dict has a cycle.

graph_dict must be represented as a dictionary mapping vertices to
iterables of neighbouring vertices. For example:

>>> cyclic({1: (2,), 2: (3,), 3: (1,)})
True
>>> cyclic({1: (2,), 2: (3,), 3: (4,)})
False
"""

path = set()
visited = set()

def visit(vertex):
if vertex in visited:
return False
for neighbour in graph_dict.get(vertex, ()):
if neighbour in path or visit(neighbour):
return True
path.remove(vertex)
return False

return any(visit(v) for v in graph_dict)

[docs]    def get_mediators(self, start, end):
"""Returns mediator variables on proper causal paths.

Parameters
----------
start : set
Set of start nodes.
end : set
Set of end nodes.

Returns
-------
mediators : set
Mediators on causal paths from start to end.
"""

des_X = self._get_descendants(start)

mediators = set()

# Walk along proper causal paths backwards from Y to X
# potential_mediators = set()
for y in end:
j, tau = y
this_level = [y]
while len(this_level) > 0:
next_level = []
for varlag in this_level:
for parent in self._get_parents(varlag):
i, tau = parent
# print(varlag, parent, des_X)
if (parent in des_X
and parent not in mediators
# and parent not in potential_mediators
and parent not in start
and parent not in end
and (-self.tau_max <= tau <= 0)): # or self.ignore_time_bounds)):
mediators = mediators.union(set([parent]))
next_level.append(parent)

this_level = next_level

return mediators

def _get_mediators_stationary_graph(self, start, end, max_lag):
"""Returns mediator variables on proper causal paths
from X to Y in a stationary graph."""

des_X = self._get_descendants_stationary_graph(start, max_lag)

mediators = set()

# Walk along proper causal paths backwards from Y to X
potential_mediators = set()
for y in end:
j, tau = y
this_level = [y]
while len(this_level) > 0:
next_level = []
for varlag in this_level:
node=varlag, patterns=["<*-", "<*+"], max_lag=max_lag, exclude=None):
i, tau = parent
if (parent in des_X
and parent not in mediators
# and parent not in potential_mediators
and parent not in start
and parent not in end
# and (-self.tau_max <= tau <= 0 or self.ignore_time_bounds)
):
mediators = mediators.union(set([parent]))
next_level.append(parent)

this_level = next_level

return mediators

"""Reverse a given link, taking care to replace > with < and vice versa."""

return ""

left_mark = "<"
else:

right_mark = ">"
else:

return left_mark + link[1] + right_mark

"""Matches pattern including wildcards with link.

In an ADMG we have edge types ["-->", "<--", "<->", "+->", "<-+"].
Here +-> corresponds to having both "-->" and "<->".

In a MAG we have edge types   ["-->", "<--", "<->", "---"].
"""

if pattern == '' or link == '':
return True if pattern == link else False
else:
left_mark, middle_mark, right_mark = pattern
if left_mark != '*':
if link[0] != left_mark: return False

if right_mark != '*':
if link[2] != right_mark: return False

if middle_mark != '*' and link[1] != middle_mark: return False

return True

"""Find adjacencies of node that match given patterns."""

graph = self.graph

if exclude is None:
exclude = []
#     exclude = self.hidden_variables
# else:
#     exclude = set(exclude).union(self.hidden_variables)

# Setup
i, lag_i = node
lag_i = abs(lag_i)

if exclude is None: exclude = []
if type(patterns) == str:
patterns = [patterns]

# Init
for k, lag_ik in zip(*np.where(graph[i,:,lag_i,:])):
# print((k, lag_ik), graph[i,k,lag_i,lag_ik])
# matches = [self._match_link(patt, graph[i,k,lag_i,lag_ik]) for patt in patterns]
# if np.any(matches):
for patt in patterns:
match = (k, -lag_ik)
if match not in exclude:
else:
break

for k, lag_ki in zip(*np.where(graph[:,i,:,lag_i])):
# print((k, lag_ki), graph[k,i,lag_ki,lag_i])
# if np.any(matches):
for patt in patterns:
match = (k, -lag_ki)
if match not in exclude:
else:
break

def _is_match(self, nodei, nodej, pattern_ij):
"""Check whether the link between X and Y agrees with pattern."""

graph = self.graph

(i, lag_i) = nodei
(j, lag_j) = nodej
tauij = lag_j - lag_i
if abs(tauij) >= graph.shape[2]:
return False
return ((tauij >= 0 and self._match_link(pattern_ij, graph[i, j, tauij])) or

def _get_children(self, varlag):
"""Returns set of children (varlag --> ...) for (lagged) varlag."""
if self.possible:
patterns=['-*>', 'o*o', 'o*>']
else:
patterns=['-*>', '+*>']

def _get_parents(self, varlag):
"""Returns set of parents (varlag <-- ...) for (lagged) varlag."""
if self.possible:
patterns=['<*-', 'o*o', '<*o']
else:
patterns=['<*-', '<*+']

def _get_spouses(self, varlag):
"""Returns set of spouses (varlag <-> ...)  for (lagged) varlag."""

def _get_neighbors(self, varlag):
"""Returns set of neighbors (varlag --- ...) for (lagged) varlag."""

def _get_ancestors(self, W):
"""Get ancestors of nodes in W up to time tau_max.

Includes the nodes themselves.
"""

ancestors = set(W)

for w in W:
j, tau = w
this_level = [w]
while len(this_level) > 0:
next_level = []
for varlag in this_level:

for par in self._get_parents(varlag):
i, tau = par
if par not in ancestors and -self.tau_max <= tau <= 0:
ancestors = ancestors.union(set([par]))
next_level.append(par)

this_level = next_level

return ancestors

def _get_all_parents(self, W):
"""Get parents of nodes in W up to time tau_max.

Includes the nodes themselves.
"""

parents = set(W)

for w in W:
j, tau = w
for par in self._get_parents(w):
i, tau = par
if par not in parents and -self.tau_max <= tau <= 0:
parents = parents.union(set([par]))

return parents

def _get_all_spouses(self, W):
"""Get spouses of nodes in W up to time tau_max.

Includes the nodes themselves.
"""

spouses = set(W)

for w in W:
j, tau = w
for spouse in self._get_spouses(w):
i, tau = spouse
if spouse not in spouses and -self.tau_max <= tau <= 0:
spouses = spouses.union(set([spouse]))

return spouses

def _get_descendants_stationary_graph(self, W, max_lag):
"""Get descendants of nodes in W up to time t in stationary graph.

Includes the nodes themselves.
"""

descendants = set(W)

for w in W:
j, tau = w
this_level = [w]
while len(this_level) > 0:
next_level = []
for varlag in this_level:
node=varlag, patterns=["-*>", "-*+"], max_lag=max_lag, exclude=None):
i, tau = child
if (child not in descendants
# and (-self.tau_max <= tau <= 0 or self.ignore_time_bounds)
):
descendants = descendants.union(set([child]))
next_level.append(child)

this_level = next_level

return descendants

def _get_descendants(self, W):
"""Get descendants of nodes in W up to time t.

Includes the nodes themselves.
"""

descendants = set(W)

for w in W:
j, tau = w
this_level = [w]
while len(this_level) > 0:
next_level = []
for varlag in this_level:
for child in self._get_children(varlag):
i, tau = child
if (child not in descendants
and (-self.tau_max <= tau <= 0)): # or self.ignore_time_bounds)):
descendants = descendants.union(set([child]))
next_level.append(child)

this_level = next_level

return descendants

def _get_collider_path_nodes(self, W, descendants):
"""Get non-descendant collider path nodes and their parents of nodes in W up to time t.

"""

collider_path_nodes = set([])
# print("descendants ", descendants)
for w in W:
# print(w)
j, tau = w
this_level = [w]
while len(this_level) > 0:
next_level = []
for varlag in this_level:
# print("\t", varlag, self._get_spouses(varlag))
for spouse in self._get_spouses(varlag):
# print("\t\t", spouse)
i, tau = spouse
if (spouse not in collider_path_nodes
and spouse not in descendants
and (-self.tau_max <= tau <= 0)): # or self.ignore_time_bounds)):
collider_path_nodes = collider_path_nodes.union(set([spouse]))
next_level.append(spouse)

this_level = next_level

for w in collider_path_nodes:
for par in self._get_parents(w):
if (par not in collider_path_nodes
and par not in descendants
and (-self.tau_max <= tau <= 0)): # or self.ignore_time_bounds)):
collider_path_nodes = collider_path_nodes.union(set([par]))

return collider_path_nodes

max_lag=0, exclude=None):
"""Find adjacencies of node matching patterns in a stationary graph."""

# graph = self.graph

# Setup
i, lag_i = node
if exclude is None: exclude = []
if type(patterns) == str:
patterns = [patterns]

# Init

for k, lag_ik in zip(*np.where(graph[i,:,:])):
matches = [self._match_link(patt, graph[i, k, lag_ik]) for patt in patterns]
if np.any(matches):
match = (k, lag_i + lag_ik)
if (k, lag_i + lag_ik) not in exclude and (-max_lag <= lag_i + lag_ik <= 0): # or self.ignore_time_bounds):

for k, lag_ki in zip(*np.where(graph[:,i,:])):
if np.any(matches):
match = (k, lag_i - lag_ki)
if (k, lag_i - lag_ki) not in exclude and (-max_lag <= lag_i - lag_ki <= 0): # or self.ignore_time_bounds):

def _get_canonical_dag_from_graph(self, graph):
"""Constructs canonical DAG as links_coeffs dictionary from graph.

This corresponds to a canonical DAG (Richardson Spirtes 2002).

Can be used to evaluate d-separation.
"""

N, N, tau_maxplusone = graph.shape
tau_max = tau_maxplusone - 1

links = {j: [] for j in range(N)}

latent_index = N
for i, j, tau in zip(*np.where(graph)):

edge_type = graph[i, j, tau]

# Consider contemporaneous links only once
if tau == 0 and j > i:
continue

if edge_type == "-->":
elif edge_type == "<--":
elif edge_type == "<->":
latent_index += 1
# elif edge_type == "---":
#     selection_vars.append(latent_index)
#     latent_index += 1
elif edge_type == "+->":
latent_index += 1
elif edge_type == "<-+":
latent_index += 1

def _get_maximum_possible_lag(self, XYZ, graph):
"""Construct maximum relevant time lag for d-separation in stationary graph.

TO BE REVISED!

"""

seen_j, seen_tauj = seen_path[index + 1]

if (i == seen_i and j == seen_j
and abs(tauj-taui) == abs(seen_tauj-seen_taui)):
return True

return False

# TODO: does this work with PAGs?
# if self.possible:
#     patterns=['<*-', '<*o', 'o*o']
# else:
#     patterns=['<*-']

max_lag = 0
for node in XYZ:
j, tau = node   # tau <= 0
max_lag = max(max_lag, abs(tau))

causal_path = []
queue = [(node, causal_path)]

while queue:
varlag, causal_path = queue.pop()
causal_path = [varlag] + causal_path

var, lag = varlag
i, tautmp = partmp
# Get shifted lag since canonical_dag_links is at t=0
tau = tautmp + lag
par = (i, tau)

if (par not in causal_path):

if len(causal_path) == 1:
queue.append((par, causal_path))
continue

if (len(causal_path) > 1) and not _repeating((par, varlag), causal_path):

max_lag = max(max_lag, abs(tau))
queue.append((par, causal_path))

return max_lag

def _get_latent_projection_graph(self, stationary=False):
"""For DAGs/ADMGs uses the Latent projection operation (Pearl 2009).

Assumes a normal or stationary graph with potentially unobserved nodes.
Also allows particular time steps to be unobserved. By stationarity
that pattern of unobserved nodes is repeated into -infinity.

Latent projection operation for latents = nodes before t-tau_max or due to <->:
(i)  auxADMG contains (i, -taui) --> (j, -tauj) iff there is a directed path
(i, -taui) --> ... --> (j, -tauj) on which
every non-endpoint vertex is in hidden variables (= not in observed_vars)
here iff (i, -|taui-tauj|) --> j in graph
(ii) auxADMG contains (i, -taui) <-> (j, -tauj) iff there exists a path of the
form (i, -taui) <-- ... --> (j, -tauj) on
which every non-endpoint vertex is non-collider AND in L (=not in observed_vars)
here iff (i, -|taui-tauj|) <-> j OR there is path
(i, -taui) <-- nodes before t-tau_max --> (j, -tauj)
"""

# graph = self.graph

# if self.hidden_variables is None:
#     hidden_variables_here = []
# else:
hidden_variables_here = self.hidden_variables

aux_graph = np.zeros((self.N, self.N, self.tau_max + 1, self.tau_max + 1), dtype='<U3')
aux_graph[:] = ""
for (i, j) in itertools.product(range(self.N), range(self.N)):
for jt, tauj in enumerate(range(0, self.tau_max + 1)):
for it, taui in enumerate(range(0, self.tau_max + 1)):
tau = abs(taui - tauj)
if tau == 0 and j == i:
continue
if (i, -taui) in hidden_variables_here or (j, -tauj) in hidden_variables_here:
continue
# print("\n")
# print((i, -taui), (j, -tauj))

cond_i_xy = (
# tau <= graph_taumax
# and (graph[i, j, tau] == '-->' or graph[i, j, tau] == '+->')
#     )
# and
self._check_path( #graph=graph,
start=[(i, -taui)],
end=[(j, -tauj)],
conditions=None,
starts_with=['-*>', '+*>'],
ends_with=['-*>', '+*>'],
path_type='causal',
hidden_by_taumax=False,
hidden_variables=hidden_variables_here,
stationary_graph=stationary,
))
cond_i_yx = (
# tau <= graph_taumax
# and (graph[i, j, tau] == '<--' or graph[i, j, tau] == '<-+')
#     )
# and
self._check_path( #graph=graph,
start=[(j, -tauj)],
end=[(i, -taui)],
conditions=None,
starts_with=['-*>', '+*>'],
ends_with=['-*>', '+*>'],
path_type='causal',
hidden_by_taumax=False,
hidden_variables=hidden_variables_here,
stationary_graph=stationary,
))
if stationary:
hidden_by_taumax_here = True
else:
hidden_by_taumax_here = False

cond_ii = (
# tau <= graph_taumax
# and
(
#     graph[i, j, tau] == '<->'
# or graph[i, j, tau] == '+->' or graph[i, j, tau] == '<-+'))
self._check_path( #graph=graph,
start=[(i, -taui)],
end=[(j, -tauj)],
conditions=None,
starts_with=['<**', '+**'],
ends_with=['**>', '**+'],
path_type='any',
hidden_by_taumax=hidden_by_taumax_here,
hidden_variables=hidden_variables_here,
stationary_graph=stationary,
)))

if cond_i_xy and not cond_i_yx and not cond_ii:
aux_graph[i, j, taui, tauj] = "-->"  #graph[i, j, tau]
# if tau == 0:
aux_graph[j, i, tauj, taui] = "<--"  # graph[j, i, tau]
elif not cond_i_xy and cond_i_yx and not cond_ii:
aux_graph[i, j, taui, tauj] = "<--"  #graph[i, j, tau]
# if tau == 0:
aux_graph[j, i, tauj, taui] = "-->"  # graph[j, i, tau]
elif not cond_i_xy and not cond_i_yx and cond_ii:
aux_graph[i, j, taui, tauj] = '<->'
# if tau == 0:
aux_graph[j, i, tauj, taui] = '<->'
elif cond_i_xy and not cond_i_yx and cond_ii:
aux_graph[i, j, taui, tauj] = '+->'
# if tau == 0:
aux_graph[j, i, tauj, taui] = '<-+'
elif not cond_i_xy and cond_i_yx and cond_ii:
aux_graph[i, j, taui, tauj] = '<-+'
# if tau == 0:
aux_graph[j, i, tauj, taui] = '+->'
elif cond_i_xy and cond_i_yx:
raise ValueError("Cycle between %s and %s!" %(str(i, -taui), str(j, -tauj)))
# print(aux_graph[i, j, taui, tauj])

# print((i, -taui), (j, -tauj), cond_i_xy, cond_i_yx, cond_ii, aux_graph[i, j, taui, tauj], aux_graph[j, i, tauj, taui])

return aux_graph

def _check_path(self,
# graph,
start, end,
conditions=None,
starts_with=None,
ends_with=None,
path_type='any',
# causal_children=None,
stationary_graph=False,
hidden_by_taumax=False,
hidden_variables=None,
):
"""Check whether an open/active path between start and end given conditions exists.

Also allows to restrict start and end patterns and to consider causal/non-causal paths

hidden_by_taumax and hidden_variables are relevant for the latent projection operation.
"""

if conditions is None:
conditions = set([])
# if conditioned_variables is None:
#     S = []

start = set(start)
end = set(end)
conditions = set(conditions)

# Get maximal possible time lag of a connecting path
# See Thm. XXXX - TO BE REVISED!
XYZ = start.union(end).union(conditions)
if stationary_graph:
max_lag = 10*self.tau_max  # TO BE REVISED! self._get_maximum_possible_lag(XYZ, self.graph)
causal_children = list(self._get_mediators_stationary_graph(start, end, max_lag).union(end))
else:
max_lag = None
causal_children = list(self.get_mediators(start, end).union(end))

# if hidden_variables is None:
#     hidden_variables = set([])

if hidden_by_taumax:
if hidden_variables is None:
hidden_variables = set([])
hidden_variables = hidden_variables.union([(k, -tauk) for k in range(self.N)
for tauk in range(self.tau_max+1, max_lag + 1)])

# print("causal_children ", causal_children)

if starts_with is None:
starts_with = ['***']
elif type(starts_with) == str:
starts_with = [starts_with]

if ends_with is None:
ends_with = ['***']
elif type(ends_with) == str:
ends_with = [ends_with]
#
# Breadth-first search to find connection
#
# print("\nstart, starts_with, ends_with, end ", start, starts_with, ends_with, end)
# print("hidden_variables ", hidden_variables)
start_from = set()
for x in start:
if stationary_graph:
max_lag=max_lag, exclude=list(start))
else:

# if before_taumax and neighbor[1] >= -self.tau_max:
#     continue

if (hidden_variables is not None and neighbor not in end
and neighbor not in hidden_variables):
continue

if path_type == 'non_causal':
continue
elif path_type == 'causal':
continue

# print("start, end, start_from ", start, end, start_from)

visited = set()
for (varlag_i, link_ik, varlag_k) in start_from:

# Traversing through motifs i *-* k *-* j
while start_from:

# print("Continue ", start_from)
# for (link_ik, varlag_k) in start_from:
removables = []
for (varlag_i, link_ik, varlag_k) in start_from:

# print("varlag_k in end ", varlag_k in end, link_ik)
if varlag_k in end:
# print("Connected ", varlag_i, link_ik, varlag_k)
return True
else:

for removable in removables:
start_from.remove(removable)
if len(start_from)==0:
return False

# Get any neighbor from starting nodes

# print("Get k = ", link_ik, varlag_k)
# print("start_from ", start_from)
# print("visited    ", visited)

if stationary_graph:
max_lag=max_lag, exclude=list(start))
else:

# print ("visited ", (link_kj, varlag_j), visited)
# if (varlag_i, link_kj, varlag_j) in visited:
# print("in visited")
continue
# print("Not in visited")

if path_type == 'causal':
continue

# If motif  i *-* k *-* j is open,
# print(left_mark, right_mark)

if self.definite_status:
# Exclude paths that are not definite_status implying that any of the following
# motifs occurs:
# i *-> k o-* j
if (left_mark == '>' and right_mark == 'o'):
continue
# i *-o k <-* j
if (left_mark == 'o' and right_mark == '<'):
continue
# i *-o k o-* j and i and j are adjacent
if (left_mark == 'o' and right_mark == 'o'
and self._is_match(varlag_i, varlag_j, "***")):
continue

# If k is in conditions and motif is *-o k o-*, then motif is blocked since
# i and j are non-adjacent due to the check above
if varlag_k in conditions and (left_mark == 'o' and right_mark == 'o'):
continue  # [('>', '<'), ('>', '+'), ('+', '<'), ('+', '+')]

# If k is in conditions and left or right mark is tail '-', then motif is blocked
if varlag_k in conditions and (left_mark == '-' or right_mark == '-'):
continue  # [('>', '<'), ('>', '+'), ('+', '<'), ('+', '+')]

# If k is not in conditions and left and right mark are heads '><', then motif is blocked
if varlag_k not in conditions and (left_mark == '>' and right_mark == '<'):
continue  # [('>', '<'), ('>', '+'), ('+', '<'), ('+', '+')]

# if (before_taumax and varlag_j not in end
#     and varlag_j[1] >= -self.tau_max):
#     # print("before_taumax ", varlag_j)
#     continue

if (hidden_variables is not None and varlag_j not in end
and varlag_j not in hidden_variables):
continue

# Motif is open

# print("Separated")
return False

[docs]    def get_optimal_set(self,
alternative_conditions=None,
minimize=False,
return_separate_sets=False,
):

See Runge NeurIPS 2021.

Parameters
----------
alternative_conditions : set of tuples
Used only internally in optimality theorem. If None, self.S is used.
minimize : {False, True, 'colliders_only'}
Minimize optimal set. If True, minimize such that no subset
can be removed without making it invalid. If 'colliders_only',
only colliders are minimized.
return_separate_sets : bool
Whether to return tuple of parents, colliders, collider_parents, and S.

Returns
-------
Oset_S : False or list or tuple of lists
Returns optimal adjustment set if a valid set exists, otherwise False.
"""

# Needed for optimality theorem where Osets for alternative S are tested
if alternative_conditions is None:
S = self.S.copy()
vancs = self.vancs.copy()
else:
S = alternative_conditions
newancS = self._get_ancestors(S)
vancs = self.ancX.union(self.ancY).union(newancS) - self.forbidden_nodes

# vancs = self._get_ancestors(list(self.X.union(self.Y).union(S))) - self.forbidden_nodes

# descendants = self._get_descendants(self.Y.union(self.M))

# Sufficient condition for non-identifiability
if len(self.X.intersection(self.descendants)) > 0:
return False  # raise ValueError("Not identifiable: Overlap between X and des(M)")

##
## Construct O-set
##

parents = self._get_all_parents(self.Y.union(self.M)) # set([])

# Remove forbidden nodes
parents = parents - self.forbidden_nodes

# Construct valid collider path nodes
colliders = set([])
for w in self.Y.union(self.M):
j, tau = w
this_level = [w]
non_suitable_nodes = []
while len(this_level) > 0:
next_level = []
for varlag in this_level:
suitable_spouses = set(self._get_spouses(varlag)) - set(non_suitable_nodes)
for spouse in suitable_spouses:
i, tau = spouse
if spouse in self.X:
return False

if (# Node not already in set
spouse not in colliders  #.union(parents)
# not forbidden
and spouse not in self.forbidden_nodes
# in time bounds
and (-self.tau_max <= tau <= 0) # or self.ignore_time_bounds)
and (spouse in vancs
or not self._check_path(#graph=self.graph,
start=self.X, end=[spouse],
conditions=list(parents.union(vancs)) + list(S),
))
):
colliders = colliders.union(set([spouse]))
next_level.append(spouse)
else:
if spouse not in colliders:
non_suitable_nodes.append(spouse)

this_level = set(next_level) - set(non_suitable_nodes)

# Add parents and raise Error if not identifiable
collider_parents = self._get_all_parents(colliders)
if len(self.X.intersection(collider_parents)) > 0:
return False

colliders_and_their_parents = colliders.union(collider_parents)

# Add valid collider path nodes and their parents
Oset = parents.union(colliders_and_their_parents)

if minimize:
removable = []
# First remove all those that have no path from X
sorted_Oset =  Oset
if minimize == 'colliders_only':
sorted_Oset = [node for node in sorted_Oset if node not in parents]

for node in sorted_Oset:
if (not self._check_path(#graph=self.graph,
start=self.X, end=[node],
conditions=list(Oset - set([node])) + list(S))):
removable.append(node)

Oset = Oset - set(removable)
if minimize == 'colliders_only':
sorted_Oset = [node for node in Oset if node not in parents]

removable = []
# Next remove all those with no direct connection to Y
for node in sorted_Oset:
if (not self._check_path(#graph=self.graph,
start=[node], end=self.Y,
conditions=list(Oset - set([node])) + list(S) + list(self.X),
ends_with=['**>', '**+'])):
removable.append(node)

Oset = Oset - set(removable)

Oset_S = Oset.union(S)

if return_separate_sets:
return parents, colliders, collider_parents, S
else:
return list(Oset_S)

def _get_collider_paths_optimality(self, source_nodes, target_nodes,
condition,
inside_set=None,
):
"""Returns relevant collider paths to check optimality.

Iterates over collider paths within O-set via depth-first search

"""

for w in source_nodes:
# Only used to return *all* collider paths
# (needed in optimality theorem)

coll_path = []

queue = [(w, coll_path)]

non_valid_subsets = []

while queue:

varlag, coll_path = queue.pop()

coll_path = coll_path + [varlag]

suitable_nodes = set(self._get_spouses(varlag))

if start_with_tail_or_head and coll_path == [w]:
children = set(self._get_children(varlag))
suitable_nodes = suitable_nodes.union(children)

for node in suitable_nodes:
i, tau = node
if ((-self.tau_max <= tau <= 0) # or self.ignore_time_bounds)
and node not in coll_path):

if condition == 'II' and node not in target_nodes and node not in self.vancs:
continue

if node in inside_set:
if condition == 'I':
non_valid = False
for pathset in non_valid_subsets[::-1]:
if set(pathset).issubset(set(coll_path + [node])):
non_valid = True
break
if non_valid is False:
queue.append((node, coll_path))
else:
continue
elif condition == 'II':
queue.append((node, coll_path))

if node in target_nodes:
# yield coll_path
# collider_paths[node].append(coll_path)
if condition == 'I':
# Construct OÏ€iN
Sprime = self.S.union(coll_path)
OpiN = self.get_optimal_set(alternative_conditions=Sprime)
if OpiN is False:
queue = [(q_node, q_path) for (q_node, q_path) in queue if set(coll_path).issubset(set(q_path + [q_node])) is False]
non_valid_subsets.append(coll_path)
else:
return False

elif condition == 'II':
return True
# yield coll_path

if condition == 'I':
return True
elif condition == 'II':
return False
# return collider_paths

[docs]    def check_optimality(self):
"""Check whether optimal adjustment set exists according to Thm. 3 in Runge NeurIPS 2021.

Returns
-------
optimality : bool
Returns True if an optimal adjustment set exists, otherwise False.
"""

# Cond. 0: Exactly one valid adjustment set exists

#
# Cond. I
#
parents, colliders, collider_parents, _ = self.get_optimal_set(return_separate_sets=True)
Oset = parents.union(colliders).union(collider_parents)
n_nodes = self._get_all_spouses(self.Y.union(self.M).union(colliders)) - self.forbidden_nodes - Oset - self.S - self.Y - self.M - colliders

if (len(n_nodes) == 0):
# # (1) There are no spouses N âˆˆ sp(YMC) \ (forbOS)
cond_I = True
else:

# (2) For all N âˆˆ N and all its collider paths i it holds that
# OÏ€iN does not block all non-causal paths from X to Y
# cond_I = True
cond_I = self._get_collider_paths_optimality(
source_nodes=list(n_nodes), target_nodes=list(self.Y.union(self.M)),
condition='I',
)

#
# Cond. II
#
e_nodes = Oset.difference(parents)
cond_II = True
for E in e_nodes:
Oset_minusE = Oset.difference(set([E]))
if self._check_path(#graph=self.graph,
start=list(self.X), end=[E],
conditions=list(self.S) + list(Oset_minusE)):

cond_II = self._get_collider_paths_optimality(
target_nodes=self.Y.union(self.M),
source_nodes=list(set([E])),
condition='II',
inside_set=list(Oset.union(self.S)),

if cond_II is False:
if self.verbosity > 1:
print("Non-optimal due to E = ", E)
break

optimality = (cond_0 or (cond_I and cond_II))
if self.verbosity > 0:
print("Optimality = %s with cond_0 = %s, cond_I = %s, cond_II = %s"
%  (optimality, cond_0, cond_I, cond_II))
return optimality

def _check_validity(self, Z):
"""Checks whether Z is a valid adjustment set."""

# causal_children = list(self.M.union(self.Y))
backdoor_path = self._check_path(#graph=self.graph,
start=list(self.X), end=list(self.Y),
conditions=list(Z),
# causal_children=causal_children,
path_type = 'non_causal')

if backdoor_path:
return False
else:
return True

minimize=False,
):

See van der Zander, B.; LiÅ›kiewicz, M. & Textor, J.
Separators and adjustment sets in causal graphs: Complete
criteria and an algorithmic framework
Artificial Intelligence, Elsevier, 2019, 270, 1-40

"""

vancs = self.vancs.copy()

if minimize:
# Get removable nodes by computing minimal valid set from Z
if minimize == 'keep_parentsYM':
minimize_nodes = vancs - self._get_all_parents(list(self.Y.union(self.M)))

else:
minimize_nodes = vancs

# Zprime2 = Zprime
# First remove all nodes that have no unique path to X given Oset
for node in minimize_nodes:
# path = self.oracle.check_shortest_path(X=X, Y=[node],
#     Z=list(vancs - set([node])),
#     max_lag=None,
#     forbidden_nodes=None, #list(Zprime - set([node])),
#     return_path=False)
path = self._check_path(#graph=self.graph,
start=self.X, end=[node],
conditions=list(vancs - set([node])),
)

if path is False:
vancs = vancs - set([node])

if minimize == 'keep_parentsYM':
minimize_nodes = vancs - self._get_all_parents(list(self.Y.union(self.M)))
else:
minimize_nodes = vancs

# print(Zprime2)
# Next remove all nodes that have no unique path to Y given Oset_min
# Z = Zprime2
for node in minimize_nodes:

path = self._check_path(#graph=self.graph,
start=[node], end=self.Y,
conditions=list(vancs - set([node])) + list(self.X),
)

if path is False:
vancs = vancs - set([node])

if self._check_validity(list(vancs)) is False:
return False
else:
return list(vancs)

check_one_set_exists=False, yield_index=None):
"""Constructs all valid adjustment sets or just checks whether one exists.

See van der Zander, B.; LiÅ›kiewicz, M. & Textor, J.
Separators and adjustment sets in causal graphs: Complete
criteria and an algorithmic framework
Artificial Intelligence, Elsevier, 2019, 270, 1-40

"""

cond_set = set(self.S)
all_vars = [(i, -tau) for i in range(self.N)
for tau in range(0, self.tau_max + 1)]

all_vars_set = set(all_vars) - self.forbidden_nodes

def find_sep(I, R):
Rprime = R - self.X - self.Y
# TODO: anteriors and NOT ancestors where
# anteriors include --- links in causal paths
# print(I)
XYI = list(self.X.union(self.Y).union(I))
# print(XYI)
ancs = self._get_ancestors(list(XYI))
Z = ancs.intersection(Rprime)
if self._check_validity(Z) is False:
return False
else:
return Z

def list_sep(I, R):
# print(find_sep(X, Y, I, R))
if find_sep(I, R) is not False:
# print(I,R)
if I == R:
# print('--->', I)
yield I
else:
# Pick arbitrary node from R-I
RminusI = list(R - I)
# print(R, I, RminusI)
v = RminusI[0]
# print("here ", X, Y, I.union(set([v])), R)
yield from list_sep(I.union(set([v])), R)
yield from list_sep(I, R - set([v]))

# print("all ", X, Y, cond_set, all_vars_set)
all_sets = []
I = cond_set
R = all_vars_set
for index, valid_set in enumerate(list_sep(I, R)):
# print(valid_set)
all_sets.append(list(valid_set))
if check_one_set_exists and index > 0:
break

if yield_index is not None and index == yield_index:
return valid_set

if yield_index is not None:
return None

if check_one_set_exists:
if len(all_sets) == 1:
return True
else:
return False

return all_sets

def _get_causal_paths(self, source_nodes, target_nodes,
mediators=None,
mediated_through=None,
proper_paths=True,
):
"""Returns causal paths via depth-first search.

Allows to restrict paths through mediated_through.

"""

source_nodes = set(source_nodes)
target_nodes = set(target_nodes)

if mediators is None:
mediators = set()
else:
mediators = set(mediators)

if mediated_through is None:
mediated_through = []
mediated_through = set(mediated_through)

if proper_paths:
inside_set = mediators.union(target_nodes) - source_nodes
else:
inside_set = mediators.union(target_nodes).union(source_nodes)

all_causal_paths = {}
for w in source_nodes:
all_causal_paths[w] = {}
for z in target_nodes:
all_causal_paths[w][z] = []

for w in source_nodes:

causal_path = []
queue = [(w, causal_path)]

while queue:

varlag, causal_path = queue.pop()
causal_path = causal_path + [varlag]
suitable_nodes = set(self._get_children(varlag)
).intersection(inside_set)
for node in suitable_nodes:
i, tau = node
if ((-self.tau_max <= tau <= 0) # or self.ignore_time_bounds)
and node not in causal_path):

queue.append((node, causal_path))

if node in target_nodes:
if len(mediated_through) > 0 and len(set(causal_path).intersection(mediated_through)) == 0:
continue
else:
all_causal_paths[w][node].append(causal_path + [node])

return all_causal_paths

[docs]    def fit_total_effect(self,
dataframe,
estimator,
conditional_estimator=None,
data_transform=None,
ignore_identifiability=False,
):
"""Returns a fitted model for the total causal effect of X on Y
conditional on S.

Parameters
----------
dataframe : data object
Tigramite dataframe object. It must have the attributes dataframe.values
yielding a numpy array of shape (observations T, variables N) and
optionally a mask of the same shape and a missing values flag.
estimator : sklearn model object
For example, sklearn.linear_model.LinearRegression() for a linear
regression model.
adjustment_set : str or list of tuples
If 'optimal' the Oset is used, if 'minimized_optimal' the minimized Oset,
and if 'colliders_minimized_optimal', the colliders-minimized Oset.
If a list of tuples is passed, this set is used.
conditional_estimator : sklearn model object, optional (default: None)
Used to fit conditional causal effects in nested regression.
If None, the same model as for estimator is used.
data_transform : sklearn preprocessing object, optional (default: None)
Used to transform data prior to fitting. For example,
sklearn.preprocessing.StandardScaler for simple standardization. The
fitted parameters are stored.
Masking mode: Indicators for which variables in the dependence
measure I(X; Y | Z) the samples should be masked. If None, the mask
is not used. Explained in tutorial on masking and missing values.
ignore_identifiability : bool
Only applies to adjustment sets supplied by user. Ignores if that
set leads to a non-identifiable effect.
"""

if self.no_causal_path:
if self.verbosity > 0:
print("No causal path from X to Y exists.")
return self

self.dataframe = dataframe
self.conditional_estimator = conditional_estimator

# if self.dataframe.has_vector_data:
#     raise ValueError("vector_vars in DataFrame cannot be used together with CausalEffects!"
#                      " You can estimate vector-valued effects by using multivariate X, Y, S."
#                      " Note, however, that this requires assuming a graph at the level "
#                      "of the components of X, Y, S, ...")

if self.N != self.dataframe.N:
raise ValueError("Dataset dimensions inconsistent with number of variables in graph.")

# Check optimality and use either optimal or colliders_only set
else:
if ignore_identifiability is False and self._check_validity(adjustment_set) is False:
raise ValueError("Chosen adjustment_set is not valid.")

raise ValueError("Causal effect not identifiable via adjustment.")

# Fit model of Y on X and Z (and conditions)
# Build the model
self.model = Models(
dataframe=dataframe,
model=estimator,
conditional_model=conditional_estimator,
data_transform=data_transform,
verbosity=self.verbosity)

self.model.get_general_fitted_model(
conditions=self.listS,
tau_max=self.tau_max,
cut_off='tau_max',
return_data=False)

return self

[docs]    def predict_total_effect(self,
intervention_data,
conditions_data=None,
pred_params=None,
return_further_pred_results=False,
aggregation_func=np.mean,
transform_interventions_and_prediction=False,
):
"""Predict effect of intervention with fitted model.

Uses the model.predict() function of the sklearn model.

Parameters
----------
intervention_data : numpy array
Numpy array of shape (time, len(X)) that contains the do(X) values.
conditions_data : data object, optional
Numpy array of shape (time, len(S)) that contains the S=s values.
pred_params : dict, optional
Optional parameters passed on to sklearn prediction function.
return_further_pred_results : bool, optional (default: False)
In case the predictor class returns more than just the expected value,
the entire results can be returned.
aggregation_func : callable
Callable applied to output of 'predict'. Default is 'np.mean'.
transform_interventions_and_prediction : bool (default: False)
Whether to perform the inverse data_transform on prediction results.

Returns
-------
Results from prediction: an array of shape  (time, len(Y)).
If estimate_confidence = True, then a tuple is returned.
"""

def get_vectorized_length(W):
return sum([len(self.dataframe.vector_vars[w[0]]) for w in W])

# lenX = len(self.listX)
# lenS = len(self.listS)

lenX = get_vectorized_length(self.listX)
lenS = get_vectorized_length(self.listS)

if intervention_data.shape[1] != lenX:
raise ValueError("intervention_data.shape[1] must be len(X).")

if conditions_data is not None and lenS > 0:
if conditions_data.shape[1] != lenS:
raise ValueError("conditions_data.shape[1] must be len(S).")
if conditions_data.shape[0] != intervention_data.shape[0]:
raise ValueError("conditions_data.shape[0] must match intervention_data.shape[0].")
elif conditions_data is not None and lenS == 0:
raise ValueError("conditions_data specified, but S=None or empty.")
elif conditions_data is None and lenS > 0:
raise ValueError("S specified, but conditions_data is None.")

if self.no_causal_path:
if self.verbosity > 0:
print("No causal path from X to Y exists.")
return np.zeros((len(intervention_data), len(self.listY)))

effect = self.model.get_general_prediction(
intervention_data=intervention_data,
conditions_data=conditions_data,
pred_params=pred_params,
return_further_pred_results=return_further_pred_results,
transform_interventions_and_prediction=transform_interventions_and_prediction,
aggregation_func=aggregation_func,)

return effect

[docs]    def fit_wright_effect(self,
dataframe,
mediation=None,
method='parents',
data_transform=None,
):
"""Returns a fitted model for the total or mediated causal effect of X on Y
potentially through mediator variables.

Parameters
----------
dataframe : data object
Tigramite dataframe object. It must have the attributes dataframe.values
yielding a numpy array of shape (observations T, variables N) and
optionally a mask of the same shape and a missing values flag.
mediation : None, 'direct', or list of tuples
If None, total effect is estimated, if 'direct' then only the direct effect is estimated,
else only those causal paths are considerd that pass at least through one of these mediator nodes.
Method to use for estimating Wright's path coefficients. If 'optimal',
the Oset is used, if 'links_coeffs', the coefficients in links_coeffs are used,
if 'parents', the parents are used (only valid for DAGs).
Only used if method = 'links_coeffs'.
Dictionary of format: {0:[((i, -tau), coeff),...], 1:[...],
...} for all variables where i must be in [0..N-1] and tau >= 0 with
number of variables N. coeff must be a float.
data_transform : None
Not implemented for Wright estimator. Complicated for missing samples.
Masking mode: Indicators for which variables in the dependence
measure I(X; Y | Z) the samples should be masked. If None, the mask
is not used. Explained in tutorial on masking and missing values.
"""

if self.no_causal_path:
if self.verbosity > 0:
print("No causal path from X to Y exists.")
return self

if data_transform is not None:
raise ValueError("data_transform not implemented for Wright estimator."
" You can preprocess data yourself beforehand.")

import sklearn.linear_model

self.dataframe = dataframe
if self.dataframe.has_vector_data:
raise ValueError("vector_vars in DataFrame cannot be used together with Wright method!"
" You can either 1) estimate vector-valued effects by using multivariate (X, Y, S)"
" together with assuming a graph at the level of the components of (X, Y, S), "
" or 2) use vector_vars together with fit_total_effect and an estimator"
" that supports multiple outputs.")

estimator = sklearn.linear_model.LinearRegression()

# Fit model of Y on X and Z (and conditions)
# Build the model
self.model = Models(
dataframe=dataframe,
model=estimator,
data_transform=None, #data_transform,
verbosity=self.verbosity)

mediators = self.M  # self.get_mediators(start=self.X, end=self.Y)

if mediation == 'direct':
causal_paths = {}
for w in self.X:
causal_paths[w] = {}
for z in self.Y:
if w in self._get_parents(z):
causal_paths[w][z] = [[w, z]]
else:
causal_paths[w][z] = []
else:
causal_paths = self._get_causal_paths(source_nodes=self.X,
target_nodes=self.Y, mediators=mediators,
mediated_through=mediation, proper_paths=True)

coeffs = {}
max_lag = 0
for medy in [med for med in mediators] + [y for y in self.listY]:
coeffs[medy] = {}
j, tauj = medy
par, coeff, _ = par_coeff
i, taui = par
taui_shifted = taui + tauj
max_lag = max(abs(par[1]), max_lag)
coeffs[medy][(i, taui_shifted)] = coeff #self.fit_results[j][(j, 0)]['model'].coef_[ipar]

self.model.tau_max = max_lag
# print(coeffs)

elif method == 'optimal':
# all_parents = {}
coeffs = {}
for medy in [med for med in mediators] + [y for y in self.listY]:
coeffs[medy] = {}
mediator_parents = self._get_all_parents([medy]).intersection(mediators.union(self.X).union(self.Y)) - set([medy])
all_parents = self._get_all_parents([medy]) - set([medy])
for par in mediator_parents:
Sprime = set(all_parents) - set([par, medy])
causal_effects = CausalEffects(graph=self.graph,
X=[par], Y=[medy], S=Sprime,
graph_type=self.graph_type,
check_SM_overlap=False,
)
oset = causal_effects.get_optimal_set()
# print(medy, par, list(set(all_parents)), oset)
if oset is False:
raise ValueError("Not identifiable via Wright's method.")
fit_res = self.model.get_general_fitted_model(
Y=[medy], X=[par], Z=oset,
tau_max=self.tau_max,
cut_off='tau_max',
return_data=False)
coeffs[medy][par] = fit_res['model'].coef_[0]

elif method == 'parents':
coeffs = {}
for medy in [med for med in mediators] + [y for y in self.listY]:
coeffs[medy] = {}
# mediator_parents = self._get_all_parents([medy]).intersection(mediators.union(self.X)) - set([medy])
all_parents = self._get_all_parents([medy]) - set([medy])
if 'dag' not in self.graph_type:
spouses = self._get_all_spouses([medy]) - set([medy])
if len(spouses) != 0:
raise ValueError("method == 'parents' only possible for "

# print(j, all_parents[j])
# if len(all_parents[j]) > 0:
# print(medy, list(all_parents))
fit_res = self.model.get_general_fitted_model(
Y=[medy], X=list(all_parents), Z=[],
conditions=None,
tau_max=self.tau_max,
cut_off='tau_max',
return_data=False)

for ipar, par in enumerate(list(all_parents)):
# print(par, fit_res['model'].coef_)
coeffs[medy][par] = fit_res['model'].coef_[0][ipar]

else:
raise ValueError("method must be 'optimal', 'links_coeffs', or 'parents'.")

# Effect is sum over products over all path coefficients
# from x in X to y in Y
effect = {}
for (x, y) in itertools.product(self.listX, self.listY):
effect[(x, y)] = 0.
for causal_path in causal_paths[x][y]:
effect_here = 1.
# print(x, y, causal_path)
for index, node in enumerate(causal_path[:-1]):
i, taui = node
j, tauj = causal_path[index + 1]
# tau_ij = abs(tauj - taui)
# print((j, tauj), (i, taui))
effect_here *= coeffs[(j, tauj)][(i, taui)]

effect[(x, y)] += effect_here

# Make fitted coefficients available as attribute
self.coeffs = coeffs

# Modify and overwrite variables in self.model
self.model.Y = self.listY
self.model.X = self.listX
self.model.Z = []
self.model.conditions = []
self.model.cut_off = 'tau_max' # 'max_lag_or_tau_max'

class dummy_fit_class():
def __init__(self, y_here, listX_here, effect_here):
dim = len(listX_here)
self.coeff_array = np.array([effect_here[(x, y_here)] for x in listX_here]).reshape(dim, 1)
def predict(self, X):
return np.dot(X, self.coeff_array).squeeze()

fit_results = {}
for y in self.listY:
fit_results[y] = {}
fit_results[y]['model'] = dummy_fit_class(y, self.listX, effect)
fit_results[y]['data_transform'] = deepcopy(data_transform)

# self.effect = effect
self.model.fit_results = fit_results
return self

[docs]    def predict_wright_effect(self,
intervention_data,
pred_params=None,
):
"""Predict linear effect of intervention with fitted Wright-model.

Parameters
----------
intervention_data : numpy array
Numpy array of shape (time, len(X)) that contains the do(X) values.
pred_params : dict, optional
Optional parameters passed on to sklearn prediction function.

Returns
-------
Results from prediction: an array of shape  (time, len(Y)).
"""

lenX = len(self.listX)
lenY = len(self.listY)

if intervention_data.shape[1] != lenX:
raise ValueError("intervention_data.shape[1] must be len(X).")

if self.no_causal_path:
if self.verbosity > 0:
print("No causal path from X to Y exists.")
return np.zeros((len(intervention_data), len(self.Y)))

intervention_T, _ = intervention_data.shape

predicted_array = np.zeros((intervention_T, lenY))
pred_dict = {}
for iy, y in enumerate(self.listY):
# Print message
if self.verbosity > 1:
print("\n## Predicting target %s" % str(y))
if pred_params is not None:
for key in list(pred_params):
print("%s = %s" % (key, pred_params[key]))
# Default value for pred_params
if pred_params is None:
pred_params = {}
# Check this is a valid target
if y not in self.model.fit_results:
raise ValueError("y = %s not yet fitted" % str(y))

# data_transform is too complicated for Wright estimator
# Transform the data if needed
# fitted_data_transform = self.model.fit_results[y]['fitted_data_transform']
# if fitted_data_transform is not None:
#     intervention_data = fitted_data_transform['X'].transform(X=intervention_data)

# Now iterate through interventions (and potentially S)
for index, dox_vals in enumerate(intervention_data):
# Construct XZS-array
intervention_array = dox_vals.reshape(1, lenX)
predictor_array = intervention_array

predicted_vals = self.model.fit_results[y]['model'].predict(
X=predictor_array, **pred_params)
predicted_array[index, iy] = predicted_vals.mean()

# data_transform is too complicated for Wright estimator
# if fitted_data_transform is not None:
#     rescaled = fitted_data_transform['Y'].inverse_transform(X=predicted_array[index, iy].reshape(-1, 1))
#     predicted_array[index, iy] = rescaled.squeeze()

return predicted_array

[docs]    def fit_bootstrap_of(self, method, method_args,
boot_samples=100,
boot_blocklength=1,
seed=None):
"""Runs chosen method on bootstrap samples drawn from DataFrame.

Bootstraps for tau=0 are drawn from [max_lag, ..., T] and all lagged
variables constructed in DataFrame.construct_array are consistently
shifted with respect to this bootsrap sample to ensure that lagged
relations in the bootstrap sample are preserved.

This function fits the models, predict_bootstrap_of can then be used
to get confidence intervals for the effect of interventions.

Parameters
----------
method : str
Chosen method among valid functions in this class.
method_args : dict
Arguments passed to method.
boot_samples : int
Number of bootstrap samples to draw.
boot_blocklength : int, optional (default: 1)
Block length for block-bootstrap.
seed : int, optional(default = None)
Seed for RandomState (default_rng)
"""

# if dataframe.analysis_mode != 'single':
#     raise ValueError("CausalEffects class currently only supports single "
#                      "datasets.")

valid_methods = ['fit_total_effect',
'fit_wright_effect',
]

if method not in valid_methods:
raise ValueError("method must be one of %s" % str(valid_methods))

# First call the method on the original dataframe
# to make available adjustment set etc
getattr(self, method)(**method_args)

self.original_model = deepcopy(self.model)

if self.verbosity > 0:
print("\n##\n## Running Bootstrap of %s " % method +
"\n##\n" +
"\nboot_samples = %s \n" % boot_samples +
"\nboot_blocklength = %s \n" % boot_blocklength
)

method_args_bootstrap = deepcopy(method_args)
self.bootstrap_results = {}

for b in range(boot_samples):
# # Replace dataframe in method args by bootstrapped dataframe
# method_args_bootstrap['dataframe'].bootstrap = boot_draw
if seed is None:
random_state = np.random.default_rng(None)
else:
random_state = np.random.default_rng(seed*boot_samples + b)

method_args_bootstrap['dataframe'].bootstrap = {'boot_blocklength':boot_blocklength,
'random_state':random_state}

# Call method and save fitted model
getattr(self, method)(**method_args_bootstrap)
self.bootstrap_results[b] = deepcopy(self.model)

# Reset model
self.model = self.original_model

return self

[docs]    def predict_bootstrap_of(self, method, method_args,
conf_lev=0.9,
return_individual_bootstrap_results=False):
"""Predicts with fitted bootstraps.

To be used after fitting with fit_bootstrap_of. Only uses the
expected values of the predict function, not potential other output.

Parameters
----------
method : str
Chosen method among valid functions in this class.
method_args : dict
Arguments passed to method.
conf_lev : float, optional (default: 0.9)
Two-sided confidence interval.
return_individual_bootstrap_results : bool
Returns the individual bootstrap predictions.

Returns
-------
confidence_intervals : numpy array
"""

valid_methods = ['predict_total_effect',
'predict_wright_effect',
]

if method not in valid_methods:
raise ValueError("method must be one of %s" % str(valid_methods))

# def get_vectorized_length(W):
#     return sum([len(self.dataframe.vector_vars[w[0]]) for w in W])

lenX = len(self.listX)
lenS = len(self.listS)
lenY = len(self.listY)

intervention_T, _ = method_args['intervention_data'].shape

boot_samples = len(self.bootstrap_results)
# bootstrap_predicted_array = np.zeros((boot_samples, intervention_T, lenY))

for b in range(boot_samples): #self.bootstrap_results.keys():
self.model = self.bootstrap_results[b]
boot_effect = getattr(self, method)(**method_args)

if isinstance(boot_effect, tuple):
boot_effect = boot_effect[0]

if b == 0:
bootstrap_predicted_array = np.zeros((boot_samples, ) + boot_effect.shape,
dtype=boot_effect.dtype)
bootstrap_predicted_array[b] = boot_effect

# Reset model
self.model = self.original_model

# Confidence intervals for val_matrix; interval is two-sided
c_int = (1. - (1. - conf_lev)/2.)
confidence_interval = np.percentile(
bootstrap_predicted_array, axis=0,
q = [100*(1. - c_int), 100*c_int])   #[:,:,0]

if return_individual_bootstrap_results:
return bootstrap_predicted_array, confidence_interval

return confidence_interval

[docs]    @staticmethod
def get_dict_from_graph(graph, parents_only=False):
"""Helper function to convert graph to dictionary of links.

Parameters
---------
graph : array of shape (N, N, tau_max+1)
Matrix format of graph in string format.

parents_only : bool
Whether to only return parents ('-->' in graph)

Returns
-------
Dictionary of form {0:{(0, -1): o-o, ...}, 1:{...}, ...}.
"""
N = graph.shape[0]

links = dict([(j, {}) for j in range(N)])

if parents_only:
for (i, j, tau) in zip(*np.where(graph=='-->')):
else:
for (i, j, tau) in zip(*np.where(graph!='')):

[docs]    @staticmethod
"""Helper function to convert dictionary of links to graph array format.

Parameters
---------
Dictionary of form {0:[((0, -1), coeff, func), ...], 1:[...], ...}.
Also format {0:[(0, -1), ...], 1:[...], ...} is allowed.
tau_max : int or None
Maximum lag. If None, the maximum lag in links is used.

Returns
-------
graph : array of shape (N, N, tau_max+1)
Matrix format of graph with 1 for true links and 0 else.
"""

"""Helper function to retrieve tau_min and tau_max from links.
"""

# Get maximum time lag
min_lag = np.inf
max_lag = 0
for j in range(N):
if coeff != 0.:
min_lag = min(min_lag, abs(lag))
max_lag = max(max_lag, abs(lag))
else:
min_lag = min(min_lag, abs(lag))
max_lag = max(max_lag, abs(lag))

return min_lag, max_lag

# Get maximum time lag

# Set maximum lag
if tau_max is None:
tau_max = max_lag
else:
if max_lag > tau_max:
raise ValueError("tau_max is smaller than maximum lag = %d "
"found in links, use tau_max=None or larger "
"value" % max_lag)

graph = np.zeros((N, N, tau_max + 1), dtype='<U3')
if coeff != 0.:
graph[var, j, abs(lag)] = "-->"
if lag == 0:
graph[j, var, 0] = "<--"
else:
graph[var, j, abs(lag)] = "-->"
if lag == 0:
graph[j, var, 0] = "<--"

return graph

if __name__ == '__main__':

# Consider some toy data
import tigramite
import tigramite.toymodels.structural_causal_processes as toys
import tigramite.data_processing as pp
import tigramite.plotting as tp
from matplotlib import pyplot as plt
import sys

import sklearn
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPRegressor

# def lin_f(x): return x
# coeff = .5

# links_coeffs = {0: [((0, -1), 0.5, lin_f)],
#          1: [((1, -1), 0.5, lin_f), ((0, -1), 0.5, lin_f)],
#          2: [((2, -1), 0.5, lin_f), ((1, 0), 0.5, lin_f)]
#          }
# T = 1000
# data, nonstat = toys.structural_causal_process(
# dataframe = pp.DataFrame(data)

# original_graph = np.array([[['', ''],
#     ['-->', ''],
#     ['-->', ''],
#     ['', '']],

#    [['<--', ''],
#     ['', '-->'],
#     ['-->', ''],
#     ['-->', '']],

#    [['<--', ''],
#     ['<--', ''],
#     ['', '-->'],
#     ['-->', '']],

#    [['', ''],
#     ['<--', ''],
#     ['<--', ''],
#     ['', '-->']]], dtype='<U3')
# graph = np.copy(original_graph)

# # Add T <-> Reco and T
# graph[2,3,0] = '+->' ; graph[3,2,0] = '<-+'
# graph[1,3,1] = '<->' #; graph[2,1,0] = '<--'

# added = np.zeros((4, 4, 1), dtype='<U3')
# graph = np.append(graph, added , axis=2)

# X = [(1, 0)]
# Y = [(3, 0)]

# # # Initialize class as `stationary_dag`
#                             X=X, Y=Y, S=None,
#                             hidden_variables=None,
#                             verbosity=0)

# print(causal_effects.get_optimal_set())

# tp.plot_time_series_graph(
#     graph = graph,
#     save_name='Example_graph_in.pdf',
#     # special_nodes=special_nodes,
#     # var_names=var_names,
#     figsize=(6, 4),
#     )

# tp.plot_time_series_graph(
#     graph = causal_effects.graph,
#     save_name='Example_graph_out.pdf',
#     # special_nodes=special_nodes,
#     # var_names=var_names,
#     figsize=(6, 4),
#     )

# causal_effects.fit_wright_effect(dataframe=dataframe,
#                         # mediation = [(1, 0), (1, -1), (1, -2)]
#                         )

# intervention_data = 1.*np.ones((1, 1))
# y1 = causal_effects.predict_wright_effect(
#         intervention_data=intervention_data,
#         )

# intervention_data = 0.*np.ones((1, 1))
# y2 = causal_effects.predict_wright_effect(
#         intervention_data=intervention_data,
#         )

# beta = (y1 - y2)
# print("Causal effect is %.5f" %(beta))

# tp.plot_time_series_graph(
#     graph = causal_effects.graph,
#     save_name='Example_graph.pdf',
#     # special_nodes=special_nodes,
#     var_names=var_names,
#     figsize=(8, 4),
#     )

T = 10000
def lin_f(x): return x

auto_coeff = 0.
coeff = 2.

0: [((0, -1), auto_coeff, lin_f)],
1: [((1, -1), auto_coeff, lin_f)],
2: [((2, -1), auto_coeff, lin_f), ((0, 0), coeff, lin_f)],
3: [((3, -1), auto_coeff, lin_f)],
}
noises=None, seed=7)

# # Create some missing values
# data[-10:,:] = 999.
# var_names = range(2)

dataframe = pp.DataFrame(data,
vector_vars={0:[(0,0), (1,0)],
1:[(2,0), (3,0)]}
)

# # Construct expert knowledge graph from links here
1: [(1, -1), (0, 0)],
}
# # Use staticmethod to get graph
# graph = np.array([['', '-->'],
#                   ['<--', '']], dtype='<U3')

# # We are interested in lagged total effect of X on Y
X = [(0, 0), (0, -1)]
Y = [(1, 0), (1, -1)]

# # Initialize class as `stationary_dag`
causal_effects = CausalEffects(graph, graph_type='stationary_dag',
X=X, Y=Y, S=None,
hidden_variables=None,
verbosity=1)

# print(data)
# # Optimal adjustment set (is used by default)
# # print(causal_effects.get_optimal_set())

# # # Fit causal effect model from observational data
causal_effects.fit_total_effect(
dataframe=dataframe,
estimator=LinearRegression(),
)

# # Fit causal effect model from observational data
# causal_effects.fit_bootstrap_of(
#     method='fit_total_effect',
#     method_args={'dataframe':dataframe,
#     'estimator':LinearRegression()
#     },
#     boot_samples=3,
#     boot_blocklength=1,
#     seed=5
#     )

# Predict effect of interventions do(X=0.), ..., do(X=1.) in one go
lenX = 4 # len(dataframe.vector_vars[X[0][0]])
dox_vals = np.linspace(0., 1., 3)
intervention_data = np.tile(dox_vals.reshape(len(dox_vals), 1), lenX)

intervention_data = np.array([[1., 0., 0., 0.]])

print(intervention_data)

pred_Y = causal_effects.predict_total_effect(
intervention_data=intervention_data)
print(pred_Y, pred_Y.shape)

# # Predict effect of interventions do(X=0.), ..., do(X=1.) in one go
# # dox_vals = np.array([1.]) #np.linspace(0., 1., 1)
# intervention_data = np.tile(dox_vals.reshape(len(dox_vals), 1), len(X))
# conf = causal_effects.predict_bootstrap_of(
#     method='predict_total_effect',
#     method_args={'intervention_data':intervention_data})
# print(conf, conf.shape)

# # # Predict effect of interventions do(X=0.), ..., do(X=1.) in one go
# # dox_vals = np.array([1.]) #np.linspace(0., 1., 1)
# # intervention_data = dox_vals.reshape(len(dox_vals), len(X))
# # pred_Y = causal_effects.predict_total_effect(
# #         intervention_data=intervention_data)
# # print(pred_Y)

# # Fit causal effect model from observational data
# causal_effects.fit_wright_effect(
#     dataframe=dataframe,