import numpy as np
import torch
import typing
import warnings
from captum.attr._utils.approximation_methods import approximation_parameters
from captum.attr._utils.attribution import GradientAttribution
from captum.attr._utils.common import (
_format_input_baseline,
_reshape_and_sum,
_validate_input,
)
from captum.log import log_usage
from captum._utils.common import (
_expand_additional_forward_args,
_expand_target,
_format_additional_forward_args,
_format_inputs,
_format_output,
_is_tuple,
)
from captum._utils.typing import (
BaselineType,
TargetType,
TensorOrTupleOfTensorsGeneric,
)
from scipy import sparse
from sklearn.neighbors import NearestNeighbors
from torch import Tensor
from typing import Any, Callable, List, Tuple, Union
from tint.utils import astar_path, unsqueeze_like, _geodesic_batch_attribution
[docs]class GeodesicIntegratedGradients(GradientAttribution):
r"""
Geodesic Integrated Gradients.
This method uses K Nearest Neighbors on the input data to approximate the
geodesic path between inputs and reference baselines. The input space is
seen here as a Riemannian manifold, whose metric is the inner product of
the gradient of the model:
.. math::
<\nabla F(x)^T, \nabla F(x)>
Using this path reduces the risk of creating artifacts compared with
the original Integrated Gradients (IG). It also supports
``internal_batch_size`` for faster compute. The number of steps is also
set by default to 5, as less steps are required between two neighbors.
With this setting, and the number of neighbors set to 10, the number of
gradients to compute is 10 * 5 = 50, the same number as the original IG.
The shortest path is computed using Dijkstra or A* algorithms. This can
be computationally expensive for a number of inputs greater than a few
thousands.
Args:
forward_func (callable): The forward function of the model or any
modification of it.
nn (NearestNeighbors, tuple): Nearest neighbors method.
If not provided, will be created when calling __init__ or
attribute.
Default: None
data (Tensor, tuple): Data to fit the knn algorithm. If not provided,
the knn will be fitted when calling attribute using the provided
inputs data.
Default: None
n_neighbors (int, tuple): Number of neighbors to use by default.
Can be an integer (same for every inputs) or a tuple.
Default: None
multiply_by_inputs (bool, optional): Indicates whether to factor
model inputs' multiplier in the final attribution scores.
In the literature this is also known as local vs global
attribution. If inputs' multiplier isn't factored in,
then that type of attribution method is also called local
attribution. If it is, then that type of attribution
method is called global.
More detailed can be found here:
https://arxiv.org/abs/1711.06104
In case of integrated gradients, if `multiply_by_inputs`
is set to True, final sensitivity scores are being multiplied by
(inputs - baselines).
Examples:
>>> import torch as th
>>> from tint.attr import GeodesicIntegratedGradients
>>> from tint.models import MLP
<BLANKLINE>
>>> inputs = th.rand(50, 5)
>>> data = th.rand(100, 5)
>>> mlp = MLP([5, 3, 1])
<BLANKLINE>
>>> explainer = GeodesicIntegratedGradients(
... mlp, data=data, n_neighbors=10,
... )
>>> attr = explainer.attribute(inputs)
"""
def __init__(
self,
forward_func: Callable,
nn: Union[NearestNeighbors, Tuple[NearestNeighbors, ...]] = None,
data: TensorOrTupleOfTensorsGeneric = None,
n_neighbors: Union[int, Tuple[int]] = None,
multiply_by_inputs: bool = True,
**kwargs,
):
super().__init__(forward_func=forward_func)
self._multiply_by_inputs = multiply_by_inputs
self.n_neighbors = None
self.nn = None
self.data = None
# Register nn if provided
if nn is not None:
if not isinstance(nn, tuple):
nn = (nn,)
self.nn = nn
self.n_neighbors = tuple(nn_.n_neighbors for nn_ in self.nn)
# Fit NearestNeighbors if data is provided
if data is not None:
data = _format_inputs(data)
assert n_neighbors is not None, "You must provide n_neighbors"
if isinstance(n_neighbors, int):
n_neighbors = tuple(n_neighbors for _ in data)
self.n_neighbors = n_neighbors
self.nn = tuple(
NearestNeighbors(n_neighbors=n, **kwargs).fit(
X=x.reshape(len(x), -1).cpu()
)
for x, n in zip(data, n_neighbors)
)
self.data = data
n_components = tuple(
sparse.csgraph.connected_components(nn.kneighbors_graph())[0]
for nn in self.nn
)
if any(n > 1 for n in n_components):
warnings.warn(
"The knn graph is disconnected. You should increase n_neighbors."
)
# The following overloaded method signatures correspond to the case where
# return_convergence_delta is False, then only attributions are returned,
# and when return_convergence_delta is True, the return type is
# a tuple with both attributions and deltas.
@typing.overload
def attribute(
self,
inputs: TensorOrTupleOfTensorsGeneric,
baselines: BaselineType = None,
target: TargetType = None,
additional_forward_args: Any = None,
n_neighbors: Union[int, Tuple[int]] = None,
n_steps: int = 5,
n_steiner: int = None,
method: str = "gausslegendre",
internal_batch_size: Union[None, int] = None,
return_curvature: bool = False,
return_convergence_delta: bool = False,
distance: str = "geodesic",
show_progress: bool = False,
**kwargs,
) -> TensorOrTupleOfTensorsGeneric:
...
@typing.overload
def attribute(
self,
inputs: TensorOrTupleOfTensorsGeneric,
baselines: BaselineType = None,
target: TargetType = None,
additional_forward_args: Any = None,
n_neighbors: Union[int, Tuple[int]] = None,
n_steps: int = 5,
n_steiner: int = None,
method: str = "gausslegendre",
internal_batch_size: Union[None, int] = None,
*,
return_curvature: bool = False,
return_convergence_delta: bool,
distance: str = "geodesic",
show_progress: bool = False,
**kwargs,
) -> Tuple[TensorOrTupleOfTensorsGeneric, Tensor]:
...
[docs] @log_usage()
def attribute( # type: ignore
self,
inputs: TensorOrTupleOfTensorsGeneric,
baselines: BaselineType = None,
target: TargetType = None,
additional_forward_args: Any = None,
n_neighbors: Union[int, Tuple[int]] = None,
n_steps: int = 5,
n_steiner: int = None,
method: str = "gausslegendre",
internal_batch_size: Union[None, int] = None,
return_curvature: bool = False,
return_convergence_delta: bool = False,
distance: str = "geodesic",
show_progress: bool = False,
**kwargs,
) -> Union[
TensorOrTupleOfTensorsGeneric,
Tuple[TensorOrTupleOfTensorsGeneric, Tensor],
Tuple[TensorOrTupleOfTensorsGeneric, TensorOrTupleOfTensorsGeneric],
Tuple[
TensorOrTupleOfTensorsGeneric,
Tensor,
TensorOrTupleOfTensorsGeneric,
],
]:
r"""
This method attributes the output of the model with given target index
(in case it is provided, otherwise it assumes that output is a
scalar) to the inputs of the model using the approach described above.
In addition to that it also returns, if `return_convergence_delta` is
set to True, integral approximation delta based on the completeness
property of integrated gradients.
It also returns the curvature if `return_curvature` is set to True.
Args:
inputs (tensor or tuple of tensors): Input for which integrated
gradients are computed. If forward_func takes a single
tensor as input, a single input tensor should be provided.
If forward_func takes multiple tensors as input, a tuple
of the input tensors should be provided. It is assumed
that for all given input tensors, dimension 0 corresponds
to the number of examples, and if multiple input tensors
are provided, the examples must be aligned appropriately.
baselines (scalar, tensor, tuple of scalars or tensors, optional):
Baselines define the starting point from which integral
is computed and can be provided as:
- a single tensor, if inputs is a single tensor, with
exactly the same dimensions as inputs or the first
dimension is one and the remaining dimensions match
with inputs.
- a single scalar, if inputs is a single tensor, which will
be broadcasted for each input value in input tensor.
- a tuple of tensors or scalars, the baseline corresponding
to each tensor in the inputs' tuple can be:
- either a tensor with matching dimensions to
corresponding tensor in the inputs' tuple
or the first dimension is one and the remaining
dimensions match with the corresponding
input tensor.
- or a scalar, corresponding to a tensor in the
inputs' tuple. This scalar value is broadcasted
for corresponding input tensor.
In the cases when `baselines` is not provided, we internally
use zero scalar corresponding to each input tensor.
Default: None
target (int, tuple, tensor or list, optional): Output indices for
which gradients are computed (for classification cases,
this is usually the target class).
If the network returns a scalar value per example,
no target index is necessary.
For general 2D outputs, targets can be either:
- a single integer or a tensor containing a single
integer, which is applied to all input examples
- a list of integers or a 1D tensor, with length matching
the number of examples in inputs (dim 0). Each integer
is applied as the target for the corresponding example.
For outputs with > 2 dimensions, targets can be either:
- A single tuple, which contains #output_dims - 1
elements. This target index is applied to all examples.
- A list of tuples with length equal to the number of
examples in inputs (dim 0), and each tuple containing
#output_dims - 1 elements. Each tuple is applied as the
target for the corresponding example.
Default: None
additional_forward_args (any, optional): If the forward function
requires additional arguments other than the inputs for
which attributions should not be computed, this argument
can be provided. It must be either a single additional
argument of a Tensor or arbitrary (non-tuple) type or a
tuple containing multiple additional arguments including
tensors or any arbitrary python types. These arguments
are provided to forward_func in order following the
arguments in inputs.
For a tensor, the first dimension of the tensor must
correspond to the number of examples. It will be
repeated for each of `n_steps` along the integrated
path. For all other types, the given argument is used
for all forward evaluations.
Note that attributions are not computed with respect
to these arguments.
Default: None
n_neighbors (int, optional): Number of neighbors to use by default.
Must be provided if it has not been set in the init.
Can be an integer (same for every inputs) or a tuple.
Default: None
n_steps (int, optional): The number of steps used by the approximation
method. Default: 5.
n_steiner (int, optional): Add a certain number of steiner points
into the graph. These points are added following the fixed
scheme. For more information, please refer to the section 4.1.3
of https://arxiv.org/pdf/2007.10430.
Default: None
method (string, optional): Method for approximating the integral,
one of `riemann_right`, `riemann_left`, `riemann_middle`,
`riemann_trapezoid` or `gausslegendre`.
Default: `gausslegendre` if no method is provided.
internal_batch_size (int, optional): Divides total #steps * #examples
data points into chunks of size at most internal_batch_size,
which are computed (forward / backward passes)
sequentially. internal_batch_size must be at least equal to
#examples.
For DataParallel models, each batch is split among the
available devices, so evaluations on each available
device contain internal_batch_size / num_devices examples.
If internal_batch_size is None, then all evaluations are
processed in one batch.
Default: None
return_curvature (bool, optional): Indicates whether to return
the curvature or not. If `return_curvature`
is set to True curvature will be returned in a tuple following
attributions and optionally convergence delta.
Default: False
return_convergence_delta (bool, optional): Indicates whether to return
convergence delta or not. If `return_convergence_delta`
is set to True convergence delta will be returned in
a tuple following attributions.
Default: False
distance (str, optional): Which distance to use with the A*
algorithm:
- 'geodesic': the geodesic distance using the gradients norms.
- 'euclidean': using the plain euclidean distance between
points. This method amounts to the one described here:
https://genomebiology.biomedcentral.com/articles/10.1186/s13059-020-02055-7
Default: 'geodesic'
show_progress (bool, optional): Displays the progress of computation.
It will try to use tqdm if available for advanced features
(e.g. time estimation). Otherwise, it will fallback to
a simple output of progress.
Default: False
Returns:
**attributions** or tuple with **attributions**, **delta** (optional) or /and curvature (optional):
- **attributions** (*tensor* or tuple of *tensors*):
Integrated gradients with respect to each input feature.
attributions will always be the same size as the provided
inputs, with each value providing the attribution of the
corresponding input index.
If a single tensor is provided as inputs, a single tensor is
returned. If a tuple is provided for inputs, a tuple of
corresponding sized tensors is returned.
- **delta** (*tensor*, returned if return_convergence_delta=True):
The difference between the total approximated and true
integrated gradients. This is computed using the property
that the total sum of forward_func(inputs) -
forward_func(baselines) must equal the total sum of the
integrated gradient.
Delta is calculated per example, meaning that the number of
elements in returned delta tensor is equal to the number of
examples in inputs.
- **curvature** (*tensor*, returned if return_curvature=True):
The difference between the distance along the path computed
by the A* algorithm and the euclidean distance between
inputs and baselines. This value, always positive,
returns a measure of the curvature of the input space, with
the inner product of the gradient of the model:
.. math::
<\nabla F(x)^T, \nabla F(x)>
as a metric. A higher value indicates a higher curvature.
This value however depends on the path and is as such only
an indication of the true curvature of the input space.
"""
# Keeps track whether original input is a tuple or not before
# converting it into a tuple.
is_inputs_tuple = _is_tuple(inputs)
inputs, baselines = _format_input_baseline(inputs, baselines)
# If baseline is float or int, create a tensor
baselines = tuple(
torch.ones_like(input) * baseline
if isinstance(baseline, (int, float))
else baseline
for input, baseline in zip(inputs, baselines)
)
_validate_input(inputs, baselines, n_steps, method)
# If additional_forward_args has a tensor, assert inputs
# consists of one sample
if additional_forward_args is not None:
if any(isinstance(x, Tensor) for x in additional_forward_args):
assert (
len(inputs[0]) == 1
), "Only one sample must be passed when additional_forward_args has a tensor."
# Check distance
assert distance in [
"geodesic",
"euclidean",
], f"distance must be either 'geodesic' or 'euclidean', got {distance}"
# Fit NearestNeighbors if not provided
n_neighbors = n_neighbors or self.n_neighbors
assert n_neighbors is not None, "You must provide n_neighbors"
nn = self.nn
if nn is None:
if isinstance(n_neighbors, int):
n_neighbors = tuple(n_neighbors for _ in inputs)
nn = tuple(
NearestNeighbors(n_neighbors=n, **kwargs).fit(
X=x.reshape(len(x), -1).cpu()
)
for x, n in zip(inputs, n_neighbors)
)
n_components = tuple(
sparse.csgraph.connected_components(nn_.kneighbors_graph())[0]
for nn_ in nn
)
if any(n > 1 for n in n_components):
warnings.warn(
"The knn graph is disconnected. You should increase n_neighbors."
)
# Concat data, inputs and baselines
if self.data is None:
data = tuple(torch.cat([x, y]) for x, y in zip(inputs, baselines))
else:
data = tuple(
torch.cat([x, y, z])
for x, y, z in zip(self.data, inputs, baselines)
)
# Get knns
idx, knns, dists = self._get_knns(
nn=nn,
inputs=data,
n_neighbors=n_neighbors,
)
# If steiner is provided, augment inputs
if n_steiner is not None:
# Get number of points to add
max_dists = tuple(d.max() for d in dists)
n_points = tuple(
torch.div(d, m / n_steiner, rounding_mode="floor").long() + 1
for d, m in zip(dists, max_dists)
)
# Augment inputs
data = tuple(
torch.cat(
[x]
+ [
torch.stack(
[
x[id][i] + (k / n) * (x[knn][i] - x[id][i])
for k in range(1, n)
]
)
for i, n in enumerate(n_point)
if n > 1
],
dim=0,
)
for x, knn, id, n_point in zip(data, knns, idx, n_points)
)
# Get knns
knns, idx, _ = self._get_knns(
nn=nn,
inputs=data,
n_neighbors=n_neighbors,
)
# Compute grads for inputs and baselines
if internal_batch_size is not None:
grads_norm, total_grads = _geodesic_batch_attribution(
attr_method=self,
inputs=data,
idx=idx,
knns=knns,
internal_batch_size=internal_batch_size,
show_progress=show_progress,
target=target,
additional_forward_args=additional_forward_args,
n_steps=n_steps,
method=method,
)
else:
grads_norm, total_grads = self._attribute(
inputs=tuple(x[knn] for x, knn in zip(data, knns)),
baselines=tuple(x[id] for x, id in zip(data, idx)),
target=target,
additional_forward_args=additional_forward_args,
n_steps=n_steps,
method=method,
)
# Get ||xi - xj|| for all data if euclidean
if distance == "euclidean":
grads_norm = tuple(
torch.linalg.norm(
(x[knn] - x[id]).reshape(len(x[knn]), -1),
dim=1,
)
for x, knn, id in zip(data, knns, idx)
)
# Create undirected graph for the A* algorithm
graphs = tuple(dict() for _ in data)
for graph, id, knn, attr in zip(graphs, idx, knns, grads_norm):
for i, k, a in zip(id.tolist(), knn.tolist(), attr.tolist()):
graph[k] = list(set(graph.get(k, list()) + [(i, a)]))
graph[i] = list(set(graph.get(i, list()) + [(k, a)]))
# Def heuristic for A* algorithm: euclidean distance to target
def heuristic(u, v, d):
return torch.linalg.norm(d[u] - d[v]).item()
# Compute A* paths
if self.data is not None:
inputs_idx = tuple(
range(len(x), len(x) + len(y))
for x, y in zip(self.data, inputs)
)
baselines_idx = tuple(
range(len(x) + len(y), len(x) + 2 * len(y))
for x, y in zip(self.data, inputs)
)
else:
inputs_idx = tuple(range(len(x)) for x in inputs)
baselines_idx = tuple(range(len(x), 2 * len(x)) for x in inputs)
paths = tuple(
[
astar_path(graph, i, j, heuristic=None, d=d)
for i, j in zip(input_idx, baseline_idx)
]
for graph, input_idx, baseline_idx, d in zip(
graphs, inputs_idx, baselines_idx, data
)
)
# Get paths lengths
paths_len = tuple([len(x) - 1 for x in path] for path in paths)
# Make them pairwise
paths = tuple(
torch.cat([torch.Tensor(list(zip(x, x[1:]))).long() for x in path])
for path in paths
)
# Get grad indexes
grads_idx = tuple(
[
torch.where((id == i) * (knn == j) + (id == j) * (knn == i))[
0
][0].item()
for i, j in zip(path[:, 0], path[:, 1])
]
for id, knn, path in zip(idx, knns, paths)
)
# Get grads of each path
total_grads = tuple(
grad[grad_idx] for grad, grad_idx in zip(total_grads, grads_idx)
)
# Get and apply sign to make sure we use the correct input - baseline.
# This is ignored if the gradients are not multiplied by inputs.
if self.multiplies_by_inputs:
sign = tuple(
2 * (knn[grad_idx] == path[:, 0]) - 1
for knn, grad_idx, path in zip(knns, grads_idx, paths)
)
total_grads = tuple(
grad * unsqueeze_like(s.to(grad.device), grad)
for grad, s in zip(total_grads, sign)
)
# Split for each path
total_grads = tuple(
torch.split(grad, split_size_or_sections=path_len, dim=0)
for grad, path_len in zip(total_grads, paths_len)
)
# Sum over points and paths
# and stack result
total_grads = tuple(
tuple(x.sum(0) for x in grad) for grad in total_grads
)
total_grads = tuple(torch.stack(grad) for grad in total_grads)
# Optionally compute curvature
curvature = None
if return_curvature:
curvature = self.compute_curvature(
inputs=inputs,
baselines=baselines,
data=data,
knns=knns,
idx=idx,
grads_idx=grads_idx,
paths_len=paths_len,
)
if return_convergence_delta:
start_point, end_point = baselines, inputs
# computes approximation error based on the completeness axiom
delta = self.compute_convergence_delta(
total_grads,
start_point,
end_point,
additional_forward_args=additional_forward_args,
target=target,
)
if curvature is not None:
return (
_format_output(is_inputs_tuple, total_grads),
delta,
_format_output(is_inputs_tuple, curvature),
)
return _format_output(is_inputs_tuple, total_grads), delta
if curvature is not None:
return _format_output(
is_inputs_tuple, total_grads
), _format_output(is_inputs_tuple, curvature)
return _format_output(is_inputs_tuple, total_grads)
def _attribute(
self,
inputs: Tuple[Tensor, ...],
baselines: Tuple[Union[Tensor, int, float], ...],
target: TargetType = None,
additional_forward_args: Any = None,
n_steps: int = 50,
method: str = "gausslegendre",
step_sizes_and_alphas: Union[
None, Tuple[List[float], List[float]]
] = None,
) -> (Tuple[Tensor, ...], Tuple[Tensor, ...]):
if step_sizes_and_alphas is None:
# retrieve step size and scaling factor for specified
# approximation method
step_sizes_func, alphas_func = approximation_parameters(method)
step_sizes, alphas = step_sizes_func(n_steps), alphas_func(n_steps)
else:
step_sizes, alphas = step_sizes_and_alphas
# scale features and compute gradients. (batch size is abbreviated as bsz)
# scaled_features' dim -> (bsz * #steps x inputs[0].shape[1:], ...)
scaled_features_tpl = tuple(
torch.cat(
[baseline + alpha * (input - baseline) for alpha in alphas],
dim=0,
).requires_grad_()
for input, baseline in zip(inputs, baselines)
)
additional_forward_args = _format_additional_forward_args(
additional_forward_args
)
# apply number of steps to additional forward args
# currently, number of steps is applied only to additional forward arguments
# that are nd-tensors. It is assumed that the first dimension is
# the number of batches.
# dim -> (bsz * #steps x additional_forward_args[0].shape[1:], ...)
input_additional_args = (
_expand_additional_forward_args(additional_forward_args, n_steps)
if additional_forward_args is not None
else None
)
expanded_target = _expand_target(target, n_steps)
# grads: dim -> (bsz * #steps x inputs[0].shape[1:], ...)
grads = self.gradient_func(
forward_fn=self.forward_func,
inputs=scaled_features_tpl,
target_ind=expanded_target,
additional_forward_args=input_additional_args,
)
# flattening grads so that we can multiply it with step-size
# calling contiguous to avoid `memory whole` problems
scaled_grads = [
grad.contiguous().view(n_steps, -1)
* torch.tensor(step_sizes).view(n_steps, 1).to(grad.device)
for grad in grads
]
# Reshape scaled_grads
scaled_grads = tuple(
scaled_grad.reshape(
(n_steps, grad.shape[0] // n_steps) + grad.shape[1:]
)
for scaled_grad, grad in zip(scaled_grads, grads)
)
# Compute norm of grads
grads_norm = tuple(
torch.linalg.norm(
grad.reshape(grad.shape[:2] + (-1,)),
dim=2,
).sum(0)
for grad in scaled_grads
)
# Multiply by inputs - baselines
grads_norm = tuple(
grad_norm
* torch.linalg.norm(
(input - baseline).reshape(len(input), -1), dim=1
)
for grad_norm, input, baseline in zip(
grads_norm, inputs, baselines
)
)
# aggregates across all steps for each tensor in the input tuple
# total_grads has the same dimensionality as inputs
total_grads = tuple(
_reshape_and_sum(
scaled_grad, n_steps, grad.shape[0] // n_steps, grad.shape[1:]
)
for (scaled_grad, grad) in zip(scaled_grads, grads)
)
# Multiply by inputs - baselines if necessary
if self.multiplies_by_inputs:
total_grads = tuple(
total_grad * (input - baseline)
for total_grad, input, baseline in zip(
total_grads, inputs, baselines
)
)
return grads_norm, total_grads
@staticmethod
def _get_knns(
nn: Tuple[NearestNeighbors, ...],
inputs: Tuple[Tensor, ...],
n_neighbors: Tuple[int, ...],
) -> Tuple[Tuple[Tensor, ...], Tuple[Tensor, ...], Tuple[Tensor, ...]]:
"""
Get k nearest neighbors.
Args:
nn (tuple): A tuple of NN methods.
inputs (tuple): Input data.
n_neighbors (tuple): Number of neighbors for each method.
"""
# Get kneighbors_graph
graphs = tuple(
nn_.kneighbors_graph(
x.reshape(len(x), -1).detach().cpu(),
n_neighbors=n,
mode="distance",
)
for nn_, x, n in zip(nn, inputs, n_neighbors)
)
# Get graphs components
components = tuple(
sparse.csgraph.connected_components(graph[: graph.shape[1]])
for graph in graphs
)
# Get smallest norms and indexes if multiple components
add_dists = tuple()
add_idx = tuple()
add_knns = tuple()
for component, input in zip(components, inputs):
add_dist_list = list()
add_id_list = list()
add_knn_list = list()
for i in range(component[0]):
for j in range(component[0]):
if i < j:
input_i = input[: len(component[1])][component[1] == i]
input_j = input[: len(component[1])][component[1] == j]
norm = torch.linalg.norm(
(
torch.cat([input_i] * len(input_j), dim=0)
- input_j.repeat_interleave(
len(input_i), dim=0
)
).reshape(len(input_i) * len(input_j), -1),
dim=1,
)
dist = norm.min().item()
norm_idx = norm.argmin().item()
id = norm_idx % len(input_i)
id = np.where(component[1] == i)[0][id]
knn = norm_idx // len(input_i)
knn = np.where(component[1] == j)[0][knn]
add_dist_list.append(dist)
add_id_list.append(id)
add_knn_list.append(knn)
add_dists += (torch.Tensor(add_dist_list),)
add_idx += (torch.Tensor(add_id_list).long(),)
add_knns += (torch.Tensor(add_knn_list).long(),)
# Get dists
dists = tuple(torch.from_numpy(graph.data) for graph in graphs)
dists = tuple(
torch.cat([d, a], dim=0) if len(a) > 0 else d
for d, a in zip(dists, add_dists)
)
# Get idx and knns
idx = tuple(
torch.from_numpy(graph.tocoo().row).long() for graph in graphs
)
idx = tuple(
torch.cat([i, a], dim=0) if len(a) > 0 else i
for i, a in zip(idx, add_idx)
)
knns = tuple(
torch.from_numpy(graph.tocoo().col).long() for graph in graphs
)
knns = tuple(
torch.cat([k, a], dim=0) if len(a) > 0 else k
for k, a in zip(knns, add_knns)
)
return idx, knns, dists
[docs] @staticmethod
def compute_curvature(
inputs: TensorOrTupleOfTensorsGeneric,
baselines: Tuple[Union[Tensor, int, float], ...],
data: Tuple[Tensor, ...],
knns: Tuple[Tensor, ...],
idx: Tuple[Tensor, ...],
grads_idx: Tuple[List, ...],
paths_len: Tuple[List[int]],
) -> Tuple[Tensor, ...]:
"""
Compute the curvature of the input space, as the difference between
the euclidean distance along the path computed by the A* algorithm
and the euclidean distance between the inputs and baseline.
The curvature is always positive.
"""
# Compute euclidean distances for each neighbors
distances_tpl = tuple(
torch.linalg.norm(
(x[knn] - x[id]).reshape(len(x[knn]), -1),
dim=1,
)
for x, knn, id in zip(data, knns, idx)
)
# Get distance of each path
distances_tpl = tuple(
dist[grad_idx] for dist, grad_idx in zip(distances_tpl, grads_idx)
)
# Split for each path
distances_tpl = tuple(
torch.split(grad, split_size_or_sections=path_len, dim=0)
for grad, path_len in zip(distances_tpl, paths_len)
)
# Sum over points and paths
# and stack result
distances_tpl = tuple(
tuple(x.sum(0) for x in dist) for dist in distances_tpl
)
distances_tpl = tuple(torch.stack(dist) for dist in distances_tpl)
# Compute euclidean distance between inputs and baselines
euclidean_tpl = tuple(
torch.linalg.norm(
(input - baseline).reshape(len(input), -1),
dim=1,
)
for input, baseline in zip(inputs, baselines)
)
# The curvature is the diff between path-based distance and euclidean
# distance.
curvature = tuple(
dist - euclidean
for dist, euclidean in zip(distances_tpl, euclidean_tpl)
)
return curvature
[docs] def has_convergence_delta(self) -> bool:
return True
@property
def multiplies_by_inputs(self):
return self._multiply_by_inputs