import numpy as np
import torch
from captum.log import log_usage
from captum._utils.common import _format_inputs
from captum._utils.typing import (
TargetType,
TensorOrTupleOfTensorsGeneric,
)
from torch import Tensor
from typing import Any, Callable, Tuple, Union
from tint.utils import _validate_input
from .occlusion import Occlusion
class AugmentedOcclusion(Occlusion):
    """
    Augmented Occlusion by sampling the baseline from a bootstrapped
    distribution.

    Instead of replacing occluded data by zero, this method samples data from
    a distribution, which replaces the occluded data. The resulting occluded
    data should be closer to the actual data as a result, limiting the amount
    of out of distribution samples.

    Args:
        forward_func (callable): The forward function of the model or
            any modification of it.
        data (tuple, Tensor): The data from which the baselines are sampled.
            The shape of the data must be the same as the inputs, except
            on the first dimension.
        n_sampling (int): Number of sampling to run for each occlusion.
            Default: 1
        is_temporal (bool): Whether the data is temporal or not.
            If ``True``, the data will be ablated to the inputs
            on the temporal dimension (dimension 1).
            Default: False

    References:
        `What went wrong and when? Instance-wise Feature Importance for Time-series Models <https://arxiv.org/abs/2003.02821>`_

    Examples:
        >>> import torch as th
        >>> from tint.attr import AugmentedOcclusion
        >>> from tint.models import MLP
        <BLANKLINE>
        >>> inputs = th.rand(8, 7, 5)
        >>> data = th.rand(32, 7, 5)
        >>> mlp = MLP([5, 3, 1])
        <BLANKLINE>
        >>> explainer = AugmentedOcclusion(mlp, data)
        >>> attr = explainer.attribute(inputs, (1, 1))
    """

    def __init__(
        self,
        forward_func: Callable,
        data: TensorOrTupleOfTensorsGeneric,
        n_sampling: int = 1,
        is_temporal: bool = False,
    ):
        super().__init__(forward_func=forward_func)

        # Normalise the data the same way inputs are normalised in
        # ``attribute`` so that both can be indexed per input tensor.
        self.data = _format_inputs(data)
        self.n_sampling = n_sampling
        self.is_temporal = is_temporal

        assert (
            isinstance(n_sampling, int) and n_sampling >= 1
        ), "N sampling must be an integer and at least 1."

    @log_usage()
    def attribute(  # type: ignore
        self,
        inputs: TensorOrTupleOfTensorsGeneric,
        sliding_window_shapes: Union[
            Tuple[int, ...], Tuple[Tuple[int, ...], ...]
        ],
        strides: Union[
            None, int, Tuple[int, ...], Tuple[Union[int, Tuple[int, ...]], ...]
        ] = None,
        target: TargetType = None,
        additional_forward_args: Any = None,
        perturbations_per_eval: int = 1,
        attributions_fn: Union[None, Callable] = None,
        show_progress: bool = False,
    ) -> TensorOrTupleOfTensorsGeneric:
        """
        Compute occlusion attributions, replacing each occluded patch with
        samples drawn from ``self.data``.

        Args:
            inputs (tensor or tuple of tensors):  Input for which occlusion
                attributions are computed. If forward_func takes a single
                tensor as input, a single input tensor should be provided.
                If forward_func takes multiple tensors as input, a tuple
                of the input tensors should be provided. It is assumed
                that for all given input tensors, dimension 0 corresponds
                to the number of examples (aka batch size), and if
                multiple input tensors are provided, the examples must
                be aligned appropriately.
            sliding_window_shapes (tuple or tuple of tuples): Shape of patch
                (hyperrectangle) to occlude each input. For a single
                input tensor, this must be a tuple of length equal to the
                number of dimensions of the input tensor - 1, defining
                the dimensions of the patch. If the input tensor is 1-d,
                this should be an empty tuple. For multiple input tensors,
                this must be a tuple containing one tuple for each input
                tensor defining the dimensions of the patch for that
                input tensor, as described for the single tensor case.
            strides (int or tuple or tuple of ints or tuple of tuples, optional):
                This defines the step by which the occlusion hyperrectangle
                should be shifted by in each direction for each iteration.
                For a single tensor input, this can be either a single
                integer, which is used as the step size in each direction,
                or a tuple of integers matching the number of dimensions
                in the occlusion shape, defining the step size in the
                corresponding dimension. For multiple tensor inputs, this
                can be either a tuple of integers, one for each input
                tensor (used for all dimensions of the corresponding
                tensor), or a tuple of tuples, providing the stride per
                dimension for each tensor.
                To ensure that all inputs are covered by at least one
                sliding window, the stride for any dimension must be
                <= the corresponding sliding window dimension if the
                sliding window dimension is less than the input
                dimension.
                If None is provided, a stride of 1 is used for each
                dimension of each input tensor.
                Default: None
            target (int, tuple, tensor or list, optional):  Output indices for
                which difference is computed (for classification cases,
                this is usually the target class).
                If the network returns a scalar value per example,
                no target index is necessary.
                For general 2D outputs, targets can be either:

                - a single integer or a tensor containing a single
                  integer, which is applied to all input examples

                - a list of integers or a 1D tensor, with length matching
                  the number of examples in inputs (dim 0). Each integer
                  is applied as the target for the corresponding example.

                For outputs with > 2 dimensions, targets can be either:

                - A single tuple, which contains #output_dims - 1
                  elements. This target index is applied to all examples.

                - A list of tuples with length equal to the number of
                  examples in inputs (dim 0), and each tuple containing
                  #output_dims - 1 elements. Each tuple is applied as the
                  target for the corresponding example.

                Default: None
            additional_forward_args (any, optional): If the forward function
                requires additional arguments other than the inputs for
                which attributions should not be computed, this argument
                can be provided. It must be either a single additional
                argument of a Tensor or arbitrary (non-tuple) type or a
                tuple containing multiple additional arguments including
                tensors or any arbitrary python types. These arguments
                are provided to forward_func in order following the
                arguments in inputs.
                For a tensor, the first dimension of the tensor must
                correspond to the number of examples. For all other types,
                the given argument is used for all forward evaluations.
                Note that attributions are not computed with respect
                to these arguments.
                Default: None
            perturbations_per_eval (int, optional): Allows multiple occlusions
                to be included in one batch (one call to forward_fn).
                By default, perturbations_per_eval is 1, so each occlusion
                is processed individually.
                Each forward pass will contain a maximum of
                perturbations_per_eval * #examples samples.
                For DataParallel models, each batch is split among the
                available devices, so evaluations on each available
                device contain at most
                (perturbations_per_eval * #examples) / num_devices
                samples.
                Default: 1
            attributions_fn (Callable, optional): Applies a function to the
                attributions before performing the weighted sum.
                Default: None
            show_progress (bool, optional): Displays the progress of
                computation. It will try to use tqdm if available for
                advanced features (e.g. time estimation). Otherwise, it
                will fallback to a simple output of progress.
                Default: False

        Returns:
            *tensor* or tuple of *tensors* of **attributions**:
            - **attributions** (*tensor* or tuple of *tensors*):
                The attributions with respect to each input feature.
                Attributions will always be
                the same size as the provided inputs, with each value
                providing the attribution of the corresponding input index.
                If a single tensor is provided as inputs, a single tensor
                is returned. If a tuple is provided for inputs, a tuple of
                corresponding sized tensors is returned.
        """
        # Change input to tuple and check that its length is the same as data.
        # Also check that each dimension between inputs and self.data matches
        # except on the first one.
        formatted_inputs = _format_inputs(inputs)
        _validate_input(
            inputs=formatted_inputs,
            data=self.data,
            is_temporal=self.is_temporal,
        )

        # Baselines are used here to keep track of the input index: one
        # integer per input tensor, later used to pick the matching entry of
        # self.data. The count must come from the formatted (tuple) inputs;
        # len() of a raw single tensor would be its batch size instead.
        input_indexes = tuple(range(len(formatted_inputs)))

        return super().attribute.__wrapped__(
            self,
            inputs=inputs,
            sliding_window_shapes=sliding_window_shapes,
            strides=strides,
            # The true baselines will be sampled from self.data
            baselines=input_indexes,
            target=target,
            additional_forward_args=additional_forward_args,
            # We multiply perturbations_per_eval by the number of
            # sampling to expand tensors along the first dim
            perturbations_per_eval=perturbations_per_eval * self.n_sampling,
            attributions_fn=attributions_fn,
            show_progress=show_progress,
        )

    def _construct_ablated_input(
        self,
        expanded_input: Tensor,
        input_mask: Union[None, Tensor],
        baseline: Union[Tensor, int, float],
        start_feature: int,
        end_feature: int,
        **kwargs: Any,
    ) -> Tuple[Tensor, Tensor]:
        r"""
        Ablates given expanded_input tensor with given feature mask, feature
        range, and baselines, and any additional arguments.
        expanded_input shape is (num_features, num_examples, ...)
        with remaining dimensions corresponding to remaining original tensor
        dimensions and num_features = end_feature - start_feature.

        input_mask is None for occlusion, and the mask is constructed
        using sliding_window_tensors, strides, and shift counts, which are
        provided in kwargs. baseline is expected to
        be broadcastable to match expanded_input.

        This method returns the ablated input tensor, which has the same
        dimensionality as expanded_input as well as the corresponding mask with
        either the same dimensionality as expanded_input or second dimension
        being 1. This mask contains 1s in locations which have been ablated
        (and thus counted towards ablations for that feature) and 0s otherwise.

        Args:
            expanded_input (Tensor): Input repeated once per ablated feature.
            input_mask (None, Tensor): Ignored; the mask is rebuilt here from
                the occlusion windows.
            baseline (int): Index into ``self.data`` of the input being
                ablated, as set up by ``attribute``. It is not an actual
                baseline value.
            start_feature (int): First occlusion index (inclusive).
            end_feature (int): Last occlusion index (exclusive).
            **kwargs: Must contain ``sliding_window_tensors``, ``strides``
                and ``shift_counts``.

        Returns:
            tuple: The ablated tensor and the integer (long) occlusion mask.
        """
        # One occlusion mask per feature index in [start_feature, end_feature)
        input_mask = torch.stack(
            [
                self._occlusion_mask(
                    expanded_input,
                    j,
                    kwargs["sliding_window_tensors"],
                    kwargs["strides"],
                    kwargs["shift_counts"],
                )
                for j in range(start_feature, end_feature)
            ],
            dim=0,
        ).long()

        # The input baseline is used to get the index of the input.
        data = self.data[baseline]

        # We ablate data if temporal on the time dimension (dimension 1).
        # expanded_input carries an extra leading dimension, so its time
        # dimension is at index 2.
        if self.is_temporal:
            time_shape = expanded_input.shape[2]
            data = data[:, :time_shape, ...]

        # We replace the original baseline with samples from a bootstrapped
        # distribution over self.data.
        # We query expanded_input.shape[0] x expanded_input.shape[1] samples
        # (with replacement) and reshape the baseline afterwards so that it
        # lines up with expanded_input.
        size = expanded_input.shape[0] * expanded_input.shape[1]
        baseline = torch.index_select(
            data,
            0,
            torch.randint(high=len(data), size=(size,)).to(data.device),
        )
        baseline = baseline.reshape((-1,) + expanded_input.shape[1:])

        # Keep the original values where the mask is 0 and use the sampled
        # baseline where the mask is 1.
        ablated_tensor = (
            expanded_input
            * (
                torch.ones(1, dtype=torch.long, device=expanded_input.device)
                - input_mask
            ).to(expanded_input.dtype)
        ) + (baseline * input_mask.to(expanded_input.dtype))

        return ablated_tensor, input_mask

    def _get_feature_range_and_mask(
        self, input: Tensor, input_mask: Tensor, **kwargs: Any
    ) -> Tuple[int, int, None]:
        """
        Return the range of feature indexes to ablate and the feature mask.

        The number of features is the product of ``kwargs["shift_counts"]``
        multiplied by ``self.n_sampling``. The returned mask is always
        ``None``; the occlusion mask is built in
        ``_construct_ablated_input``.

        Args:
            input (Tensor): Unused here.
            input_mask (Tensor): Unused here.
            **kwargs: Must contain ``shift_counts``.

        Returns:
            tuple: ``(0, prod(shift_counts) * n_sampling, None)``.
        """
        feature_max = int(np.prod(kwargs["shift_counts"]))
        return 0, feature_max * self.n_sampling, None