import pickle as pkl
import numpy as np
import os
import torch as th

from torch.nn.utils.rnn import pad_sequence

from .dataset import DataModule

try:
    from tick.hawkes import SimuHawkes, HawkesKernelExp
except ImportError:
    SimuHawkes = None
    HawkesKernelExp = None

file_dir = os.path.dirname(__file__)
class Hawkes(DataModule):
    """
    Hawkes dataset.

    Args:
        mu (list): Intensity baselines. If ``None``, use default values.
            Default to ``None``
        alpha (list): Events parameters. If ``None``, use default values.
            Default to ``None``
        decay (list): Intensity decays. If ``None``, use default values.
            Default to ``None``
        window (int): The window of the simulated process. If ``None``, use
            the default value. Default to ``None``
        data_dir (str): Where to download files.
        batch_size (int): Batch size. Default to 32
        prop_val (float): Proportion of validation. Default to .2
        n_folds (int): Number of folds for cross validation. If ``None``,
            the dataset is only split once between train and val using
            ``prop_val``. Default to ``None``
        fold (int): Index of the fold to use with cross-validation.
            Ignored if ``n_folds`` is ``None``. Default to ``None``
        num_workers (int): Number of workers for the loaders. Default to 0
        seed (int): For the random split. Default to 42

    References:
        https://x-datainitiative.github.io/tick/modules/hawkes.html

    Examples:
        >>> from tint.datasets import Hawkes
        >>> hawkes = Hawkes()
        >>> hawkes.download(split="train")
        >>> x_train = hawkes.preprocess(split="train")["x"]
        >>> y_train = hawkes.preprocess(split="train")["y"]
    """
    def __init__(
        self,
        mu: list = None,
        alpha: list = None,
        decay: list = None,
        window: int = None,
        data_dir: str = os.path.join(
            os.path.split(file_dir)[0],
            "data",
            "hawkes",
        ),
        batch_size: int = 32,
        prop_val: float = 0.2,
        n_folds: int = None,
        fold: int = None,
        num_workers: int = 0,
        seed: int = 42,
    ):
        super().__init__(
            data_dir=data_dir,
            batch_size=batch_size,
            prop_val=prop_val,
            n_folds=n_folds,
            fold=fold,
            num_workers=num_workers,
            seed=seed,
        )

        self.mu = mu or [0.05, 0.05]
        self.alpha = alpha or [[0.5, 0.05], [0.02, 0.3]]
        self.decay = decay or [[1.0, 0.0], [0.0, 1.0]]
        self.window = window or 100
    def download(
        self,
        train_size: int = 1000,
        test_size: int = 100,
        split: str = "train",
    ):
        """
        Simulate Hawkes processes and save them for a given split.

        Args:
            train_size (int): Number of train processes. Default to 1000
            test_size (int): Number of test processes. Default to 100
            split (str): Data split. Default to ``'train'``
        """
        assert (
            SimuHawkes is not None
        ), "You must install tick to generate hawkes data."

        file = os.path.join(self.data_dir, f"{split}.npz")

        if split == "train":
            idx = range(train_size)
        elif split == "test":
            idx = range(train_size, train_size + test_size)
        else:
            raise NotImplementedError

        # Simulate one process per index, seeding each with its own index
        points = [
            self.generate_points(
                mu=self.mu,
                alpha=self.alpha,
                decay=self.decay,
                window=self.window,
                seed=i,
            )
            for i in idx
        ]

        with open(file, "wb") as fp:
            pkl.dump(obj=points, file=fp)
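
    # Usage sketch (illustrative, not from the original source): simulate and
    # save a small train split. Each process is seeded with its own index, so
    # the generated data is reproducible.
    #
    #     hawkes = Hawkes()
    #     hawkes.download(train_size=10, split="train")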
    def preprocess(self, split: str = "train") -> dict:
        """
        Load a saved split and pad it into feature and label tensors.

        Args:
            split (str): Data split. Default to ``'train'``

        Returns:
            dict: ``'x'`` of shape (B, T, 1) and ``'y'`` of shape (B, T).
        """
        file = os.path.join(self.data_dir, f"{split}.npz")

        # Load data
        with open(file, "rb") as fp:
            data = pkl.load(file=fp)

        # Create features
        features = pad_sequence(
            [self.get_features(x) for x in data],
            batch_first=True,
        ).unsqueeze(-1)

        # Create labels
        labels = pad_sequence(
            [self.get_labels(x) for x in data],
            batch_first=True,
        )

        return {"x": features.float(), "y": labels.long()}
    @staticmethod
    def generate_points(
        mu: list,
        alpha: list,
        decay: list,
        window: int,
        seed: int,
        dt: float = 0.01,
    ):
        """
        Generates points of a marked Hawkes process using the tick library.

        Args:
            mu (list): Hawkes baseline.
            alpha (list): Event parameters.
            decay (list): Decay parameters.
            window (int): The window of the simulated process.
            seed (int): The random seed.
            dt (float): Granularity. Default to 0.01
        """
        hawkes = SimuHawkes(
            n_nodes=len(mu), end_time=window, verbose=False, seed=seed
        )

        # Set an exponential kernel for each pair of nodes, and the baseline
        # for each node
        for i in range(len(mu)):
            for j in range(len(mu)):
                hawkes.set_kernel(
                    i=i,
                    j=j,
                    kernel=HawkesKernelExp(
                        intensity=alpha[i][j], decay=decay[i][j]
                    ),
                )
            hawkes.set_baseline(i, mu[i])

        hawkes.track_intensity(dt)
        hawkes.simulate()
        return hawkes.timestamps
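
    # Usage sketch (illustrative, not from the original source): simulate a
    # two-node process with this class's default parameters. The result is a
    # list with one array of event timestamps per node.
    #
    #     points = Hawkes.generate_points(
    #         mu=[0.05, 0.05],
    #         alpha=[[0.5, 0.05], [0.02, 0.3]],
    #         decay=[[1.0, 0.0], [0.0, 1.0]],
    #         window=100,
    #         seed=0,
    #     )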
    def true_saliency(self, split: str = "train"):
        """
        Get process true saliency.

        Args:
            split (str): Data split. Default to ``'train'``

        Returns:
            th.Tensor: The true saliency.
        """
        # Load data
        data = self.preprocess(split=split)
        times, labels = data["x"], data["y"]

        # Compute true saliency for each process
        # The queries are the event times (after the first) and the window end
        true_saliency = list()
        for i in range(len(times)):
            true_saliency_i = self.true_saliency_t(
                t=th.cat(
                    [
                        times[i][times[i].nonzero(as_tuple=True)].squeeze(-1)[1:],
                        th.Tensor([self.window]),
                    ]
                ),
                times=times[i].unsqueeze(0),
                labels=labels[i].unsqueeze(0),
            )[0]
            true_saliency.append(true_saliency_i)

        return pad_sequence(true_saliency).transpose(0, 1)
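
    # Shape note (illustrative, not from the original source): each process
    # contributes a (Q, T, 1) saliency map, where Q is its number of queries
    # (every event after the first, plus the window end). Padding over Q and
    # transposing yields a (B, Q_max, T, 1) tensor.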
    def true_saliency_t(
        self,
        t: th.Tensor,
        mu: th.Tensor = None,
        alpha: th.Tensor = None,
        decay: th.Tensor = None,
        times: th.Tensor = None,
        labels: th.Tensor = None,
        split: str = "train",
    ):
        """
        Compute the true saliency given some time queries.

        B: Batch size.
        T: Temporal dim.
        N: Number of processes.
        Q: Number of time queries.

        Args:
            t (th.Tensor): Time queries. Shape Q
            mu (th.Tensor): Intensity baselines. Shape N, Values 0..1
            alpha (th.Tensor): Events parameters. Shape N x N, Values 0..1
            decay (th.Tensor): Intensity decays. Shape N x N, Values 0..1
            times (th.Tensor): Times of the process. Shape B x T x 1
            labels (th.Tensor): Labels of the process. Shape B x T
            split (str): Data split. Default to ``'train'``

        Returns:
            th.Tensor: true_saliency
        """
        # Get params
        mu = th.Tensor(self.mu) if mu is None else mu
        alpha = th.Tensor(self.alpha) if alpha is None else alpha
        decay = th.Tensor(self.decay) if decay is None else decay

        # Get data
        if times is None or labels is None:
            data = self.preprocess(split=split)
            times, labels = data["x"], data["y"]

        # Compute the influence of each element
        # Reshape some data
        t = t.unsqueeze(0).unsqueeze(0)
        labels = (
            labels.unsqueeze(-1).unsqueeze(-1).repeat(1, 1, t.shape[-1], 1)
        )

        # Get exp(-(t - ti)), masked to past events only
        diff = (times - t) * (times > 0) * (times < t)
        exp = (th.exp(diff) * (times > 0) * (times < t)).float()

        # Scatter to get exp(-(t - ti))_m for m marks
        labelled_exp = th.zeros(exp.shape + (len(mu),)).scatter(
            -1,
            labels,
            exp.unsqueeze(-1),
        )

        # Multiply by process params
        true_saliency = th.matmul(labelled_exp, decay)
        true_saliency = th.matmul(true_saliency, alpha)

        # Gather to get the marked points
        true_saliency = true_saliency.gather(-1, labels)

        # Normalise saliency over the temporal dim
        true_saliency = true_saliency / true_saliency.sum(1, keepdim=True)

        # Set any NaNs from the division to zero, and transpose to get the
        # temporal dim first after batch
        return true_saliency.nan_to_num_().transpose(1, 2)
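
    # Usage sketch (illustrative, not from the original source): saliency of
    # every recorded event with respect to a single query at the window end.
    #
    #     batch = hawkes.preprocess(split="train")
    #     sal = hawkes.true_saliency_t(
    #         t=th.Tensor([hawkes.window]),
    #         times=batch["x"],
    #         labels=batch["y"],
    #     )  # shape (B, 1, T, 1), normalised over the temporal dim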
    @staticmethod
    def intensity(
        mu: th.Tensor,
        alpha: th.Tensor,
        decay: th.Tensor,
        times: th.Tensor,
        labels: th.Tensor,
        t: th.Tensor,
    ) -> th.Tensor:
        """
        Given parameters ``mu``, ``alpha`` and ``decay``, event times and
        labels, and a vector of query times ``t``, compute the intensities
        at these time points.

        B: Batch size.
        T: Temporal dim.
        N: Number of processes.
        Q: Number of time queries.

        Args:
            mu (th.Tensor): Intensity baselines. Shape N, Values 0..1
            alpha (th.Tensor): Events parameters. Shape N x N, Values 0..1
            decay (th.Tensor): Intensity decays. Shape N x N, Values 0..1
            times (th.Tensor): Times of the process. Shape B x T x 1
            labels (th.Tensor): Labels of the process. Shape B x T
            t (th.Tensor): Query times. Shape Q

        Returns:
            th.Tensor: Intensities. Shape B x Q x N
        """
        # Reshape some data
        t = t.unsqueeze(0).unsqueeze(0)
        labels = (
            labels.unsqueeze(-1).unsqueeze(-1).repeat(1, 1, t.shape[-1], 1)
        )

        # Get exp(-(t - ti)), masked to past events only
        diff = (times - t) * (times > 0) * (times < t)
        exp = (th.exp(diff) * (times > 0) * (times < t)).float()

        # Scatter to get exp(-(t - ti))_m for m marks
        labelled_exp = th.zeros(exp.shape + (len(mu),)).scatter(
            -1,
            labels,
            exp.unsqueeze(-1),
        )

        # Get sum(decay * exp(-(t - ti)))
        sum_ = th.matmul(labelled_exp, decay).sum(1)

        # Scale by the process parameters
        return th.matmul(sum_, alpha) + mu
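
    # Usage sketch (illustrative, not from the original source): evaluate the
    # conditional intensity of each node at a few query times, reusing the
    # dataset's own parameters.
    #
    #     batch = hawkes.preprocess(split="train")
    #     lambdas = Hawkes.intensity(
    #         mu=th.Tensor(hawkes.mu),
    #         alpha=th.Tensor(hawkes.alpha),
    #         decay=th.Tensor(hawkes.decay),
    #         times=batch["x"],
    #         labels=batch["y"],
    #         t=th.Tensor([10.0, 50.0, 90.0]),
    #     )  # shape (B, Q, N) = (B, 3, 2)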
    @staticmethod
    def get_features(point: list) -> th.Tensor:
        """
        Create features from a Hawkes process.

        Args:
            point (list): A Hawkes process, as a list of per-node timestamps.

        Returns:
            th.Tensor: The sorted event times.
        """
        times = np.concatenate(point)
        sort_idx = np.argsort(times)
        times = times[sort_idx]
        return th.from_numpy(times)
    @staticmethod
    def get_labels(point: list) -> th.Tensor:
        """
        Create labels from a Hawkes process.

        Args:
            point (list): A Hawkes process, as a list of per-node timestamps.

        Returns:
            th.Tensor: The labels (node indices) sorted by event time.
        """
        times = np.concatenate(point)
        labels = np.concatenate([[i] * len(x) for i, x in enumerate(point)])
        sort_idx = np.argsort(times)
        labels = labels[sort_idx]
        return th.from_numpy(labels)
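

# A minimal end-to-end sketch (not part of the original module). It assumes
# tick is installed and that ``DataModule`` creates ``data_dir`` if needed.
if __name__ == "__main__":
    hawkes = Hawkes()
    hawkes.download(train_size=5, test_size=2, split="train")

    batch = hawkes.preprocess(split="train")
    print(batch["x"].shape)  # (5, T, 1) padded event times
    print(batch["y"].shape)  # (5, T) padded marks

    saliency = hawkes.true_saliency(split="train")
    print(saliency.shape)  # (5, Q_max, T, 1)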