import numpy as np
import os
import pickle as pkl
import torch as th

from typing import Optional, Tuple

from .dataset import DataModule

try:
    from statsmodels.tsa.arima_process import ArmaProcess
except ImportError:
    ArmaProcess = None
file_dir = os.path.dirname(__file__)


class Arma(DataModule):
"""
Arma dataset.
Args:
times (int): Length of each time series. Default to 50
features (int): Number of features in each time series. Default to 50
ar (list): Coefficient for autoregressive lag polynomial, including
zero lag. If ``None``, use default values. Default to ``None``
ma (list): Coefficient for moving-average lag polynomial, including
zero lag. If ``None``, use default values. Default to ``None``
data_dir (str): Where to download files.
batch_size (int): Batch size. Default to 32
n_folds (int): Number of folds for cross validation. If ``None``,
the dataset is only split once between train and val using
``prop_val``. Default to ``None``
fold (int): Index of the fold to use with cross-validation.
Ignored if n_folds is None. Default to ``None``
prop_val (float): Proportion of validation. Default to .2
num_workers (int): Number of workers for the loaders. Default to 0
seed (int): For the random split. Default to 42
References:
#. `Explaining Time Series Predictions with Dynamic Masks <https://arxiv.org/abs/2106.05303>`_
#. https://www.statsmodels.org/dev/generated/statsmodels.tsa.arima_process.ArmaProcess.html
Examples:
>>> from tint.datasets import Arma
<BLANKLINE>
>>> arma = Arma()
>>> arma.download(split="train")
>>> x_train = arma.preprocess(split="train")["x"]
>>> y_train = arma.preprocess(split="train")["y"]
"""

    def __init__(
        self,
        times: int = 50,
        features: int = 50,
        subset: int = 5,
        ar: Optional[list] = None,
        ma: Optional[list] = None,
        data_dir: str = os.path.join(
            os.path.split(file_dir)[0],
            "data",
            "arma",
        ),
        batch_size: int = 32,
        prop_val: float = 0.2,
        n_folds: Optional[int] = None,
        fold: Optional[int] = None,
        num_workers: int = 0,
        seed: int = 42,
    ):
        super().__init__(
            data_dir=data_dir,
            batch_size=batch_size,
            prop_val=prop_val,
            n_folds=n_folds,
            fold=fold,
            num_workers=num_workers,
            seed=seed,
        )

        self.times = times
        self.features = features
        self.subset = subset

        # Test against None explicitly: ``ar or default`` would silently
        # replace an empty list; user-provided lists are converted to arrays
        self.ar = np.array(ar) if ar is not None else np.array([2, 0.5, 0.2, 0.1])
        self.ma = np.array(ma) if ma is not None else np.array([2])
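
    # Illustration (statsmodels lag-polynomial convention, zero lag included):
    # the defaults ar=[2, 0.5, 0.2, 0.1] and ma=[2] define, per feature,
    #     2 x_t + 0.5 x_{t-1} + 0.2 x_{t-2} + 0.1 x_{t-3} = 2 e_t,
    # i.e. x_t = -0.25 x_{t-1} - 0.1 x_{t-2} - 0.05 x_{t-3} + e_t.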

    @staticmethod
    def collate_fn(batch: list) -> Tuple[th.Tensor, Optional[th.Tensor]]:
        # Arma has no labels, so the target half of the batch is always None
        return th.stack([b["x"] for b in batch]), None

    def download(
        self,
        train_size: int = 1000,
        test_size: int = 100,
        split: str = "train",
    ):
        assert (
            ArmaProcess is not None
        ), "You must install statsmodels to generate arma data."

        file = os.path.join(self.data_dir, f"{split}.npz")

        if split == "train":
            exp = train_size
        elif split == "test":
            exp = test_size
        else:
            raise NotImplementedError

        # Generate data of shape (n_samples, times, features)
        data_arma = ArmaProcess(ar=self.ar, ma=self.ma).generate_sample(
            nsample=(exp, self.times, self.features),
            axis=1,
        )

        os.makedirs(self.data_dir, exist_ok=True)
        with open(file, "wb") as fp:
            pkl.dump(obj=data_arma, file=fp)
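
    # Note: despite the ``.npz`` suffix, the file written above is a plain
    # pickle; ``preprocess`` and ``true_saliency`` read it back with ``pkl.load``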

    def preprocess(self, split: str = "train") -> dict:
        file = os.path.join(self.data_dir, f"{split}.npz")

        # Load data
        with open(file, "rb") as fp:
            features = pkl.load(file=fp)

        # There are no labels for this dataset, only features
        return {"x": th.Tensor(features)}

    def true_saliency(self, split: str = "train", dim: int = 1) -> th.Tensor:
        file = os.path.join(self.data_dir, f"{split}.npz")

        # Load data
        with open(file, "rb") as fp:
            features = th.from_numpy(pkl.load(file=fp)).float()

        outputs = th.zeros_like(features)

        if dim == 1:
            # The generator is seeded with the same seed on every draw, so the
            # permutation is identical for every experiment: a fixed subset of
            # features is salient over the middle half of the time steps
            perm = th.randperm(
                self.features,
                generator=th.Generator().manual_seed(self.seed),
            )
            outputs[
                :,
                self.times // 4 : 3 * self.times // 4,
                perm[: self.subset],
            ] = 1

        elif dim == 2:
            # Same reasoning: the start time is identical for every experiment,
            # marking a fixed window of time steps over the middle half of the
            # features
            t_rand = th.randint(
                low=0,
                high=self.times - self.subset,
                size=(1,),
                generator=th.Generator().manual_seed(self.seed),
            ).item()
            outputs[
                :,
                t_rand : t_rand + self.subset,
                self.features // 4 : 3 * self.features // 4,
            ] = 1

        else:
            raise NotImplementedError("dim must be 1 or 2")

        return outputs
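
    # With the defaults (times=50, features=50, subset=5, dim=1), the mask above
    # marks 5 fixed features over time steps 12..36 (the middle half of the
    # series), i.e. 25 * 5 of the 50 * 50 entries per sample are salient.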

    @staticmethod
    def get_white_box(
        inputs: th.Tensor,
        true_saliency: th.Tensor,
    ) -> th.Tensor:
        """
        Compute the outputs of a white-box regressor to be interpreted:
        only the truly salient inputs contribute to the result.

        Args:
            inputs (th.Tensor): The input data.
            true_saliency (th.Tensor): The true saliency.

        Returns:
            th.Tensor: Output data.
        """
        outputs = th.zeros_like(inputs)

        # Populate the features: only the truly salient entries are kept
        outputs[true_saliency.bool()] = inputs[true_saliency.bool()]
        outputs = (outputs**2).sum(dim=-1)
        return outputs
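

# A minimal usage sketch (not part of the module): assumes statsmodels is
# installed and this file is importable as part of the ``tint`` package.
if __name__ == "__main__":
    arma = Arma(times=50, features=50)
    arma.download(split="train")

    x_train = arma.preprocess(split="train")["x"]
    saliency = arma.true_saliency(split="train", dim=1)

    # White-box regression targets driven only by the salient entries
    y_train = Arma.get_white_box(x_train, saliency)
    print(x_train.shape, saliency.shape, y_train.shape)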