Commit c5b6e05

Merge pull request #55 from NorskRegnesentral/penalties

[ENH] Penalty classes

Tveten authored Jan 4, 2025
2 parents 498dac2 + 2c546b6
Showing 37 changed files with 1,861 additions and 825 deletions.
139 changes: 99 additions & 40 deletions skchange/anomaly_detectors/capa.py
@@ -9,36 +9,92 @@
import pandas as pd

from skchange.anomaly_detectors.base import BaseSegmentAnomalyDetector
-from skchange.anomaly_detectors.mvcapa import capa_penalty, run_base_capa
from skchange.anomaly_scores import BaseSaving, L2Saving, to_saving
+from skchange.compose import PenalisedScore
from skchange.costs import BaseCost
+from skchange.penalties import BasePenalty, ChiSquarePenalty, as_penalty
+from skchange.utils.numba import njit
from skchange.utils.validation.data import check_data
from skchange.utils.validation.parameters import check_larger_than


+@njit
+def get_anomalies(
+    anomaly_starts: np.ndarray,
+) -> tuple[list[tuple[int, int]], list[tuple[int, int]]]:
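+    # Backtrack from the end of `anomaly_starts`, where entry t holds the optimal
+    # start of an anomaly ending at t (NaN if there is none). Anomalies are
+    # collected as half-open (start, end) index intervals.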
+    segment_anomalies = []
+    point_anomalies = []
+    i = anomaly_starts.size - 1
+    while i >= 0:
+        start_i = anomaly_starts[i]
+        size = i - start_i + 1
+        if size > 1:
+            segment_anomalies.append((int(start_i), i + 1))
+            i = int(start_i)
+        elif size == 1:
+            point_anomalies.append((i, i + 1))
+        i -= 1
+    return segment_anomalies, point_anomalies


def run_capa(
    X: np.ndarray,
    segment_saving: BaseSaving,
    point_saving: BaseSaving,
-    segment_alpha: float,
-    point_alpha: float,
+    segment_penalty: BasePenalty,
+    point_penalty: BasePenalty,
    min_segment_length: int,
    max_segment_length: int,
) -> tuple[np.ndarray, list[tuple[int, int]], list[tuple[int, int]]]:
-    segment_betas = np.zeros(1)
-    point_betas = np.zeros(1)
-    segment_saving.fit(X)
-    point_saving.fit(X)
-    return run_base_capa(
-        segment_saving,
-        point_saving,
-        segment_alpha,
-        segment_betas,
-        point_alpha,
-        point_betas,
-        min_segment_length,
-        max_segment_length,
-    )
+    segment_penalised_saving = PenalisedScore(segment_saving, segment_penalty)
+    point_penalised_saving = PenalisedScore(point_saving, point_penalty)
+    segment_penalised_saving.fit(X)
+    point_penalised_saving.fit(X)
+
+    n = segment_penalised_saving._X.shape[0]
+    opt_savings = np.zeros(n + 1)
+    # Store the optimal start of an anomaly ending at each t (NaN if there is
+    # none). Used to get the final set of anomalies after the loop.
+    opt_anomaly_starts = np.repeat(np.nan, n)
+    starts = np.array([], dtype=int)

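+    # Dynamic programme over ends t: opt_savings[t + 1] is the maximal penalised
+    # saving on X[:t + 1], taking the best of (i) no anomaly ending at t, (ii) a
+    # segment anomaly ending at t, and (iii) a point anomaly at t.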
+    ts = np.arange(min_segment_length - 1, n)
+    for t in ts:
+        # Segment anomalies
+        t_array = np.array([t])
+
+        starts = np.concatenate((starts, t_array - min_segment_length + 1))
+        ends = np.repeat(t + 1, len(starts))
+        intervals = np.column_stack((starts, ends))
+        segment_savings = segment_penalised_saving.evaluate(intervals).flatten()
+        candidate_savings = opt_savings[starts] + segment_savings
+        candidate_argmax = np.argmax(candidate_savings)
+        opt_segment_saving = candidate_savings[candidate_argmax]
+        opt_start = starts[candidate_argmax]

+        # Point anomalies
+        point_interval = np.column_stack((t_array, t_array + 1))
+        point_savings = point_penalised_saving.evaluate(point_interval).flatten()
+        opt_point_saving = opt_savings[t] + point_savings[0]

+        # Combine and store results
+        savings = np.array([opt_savings[t], opt_segment_saving, opt_point_saving])
+        argmax = np.argmax(savings)
+        opt_savings[t + 1] = savings[argmax]
+        if argmax == 1:
+            opt_anomaly_starts[t] = opt_start
+        elif argmax == 2:
+            opt_anomaly_starts[t] = t

+        # Pruning the admissible starts
+        penalty_sum = segment_penalised_saving.penalty.values[-1]
+        saving_too_low = candidate_savings + penalty_sum <= opt_savings[t + 1]
+        too_long_segment = starts < t - max_segment_length + 2
+        prune = saving_too_low | too_long_segment
+        starts = starts[~prune]

+    segment_anomalies, point_anomalies = get_anomalies(opt_anomaly_starts)
+    return opt_savings[1:], segment_anomalies, point_anomalies


class CAPA(BaseSegmentAnomalyDetector):
@@ -63,10 +119,14 @@ class CAPA(BaseSegmentAnomalyDetector):
        minimum size of 1 are permitted.
        If a `BaseCost` is given, the saving function is constructed from the cost. The
        cost must have a fixed parameter that represents the baseline cost.
-    segment_penalty_scale : float, optional, default=2.0
-        Scaling factor for the segment penalty.
-    point_penalty_scale : float, optional, default=2.0
-        Scaling factor for the point penalty.
+    segment_penalty : Union[BasePenalty, float], optional, default=`ChiSquarePenalty`
+        The penalty to use for segment anomaly detection. If a float is given, it is
+        interpreted as a constant penalty. If `None`, the default penalty is fit to the
+        input data to `fit`.
+    point_penalty : Union[BasePenalty, float], optional, default=`ChiSquarePenalty`
+        The penalty to use for point anomaly detection. If a float is given, it is
+        interpreted as a constant penalty. If `None`, the default penalty is fit to the
+        input data to `fit`.
    min_segment_length : int, optional, default=2
        Minimum length of a segment.
    max_segment_length : int, optional, default=1000
@@ -116,16 +176,16 @@ def __init__(
        self,
        segment_saving: Optional[Union[BaseSaving, BaseCost]] = None,
        point_saving: Optional[Union[BaseSaving, BaseCost]] = None,
-        segment_penalty_scale: float = 2.0,
-        point_penalty_scale: float = 2.0,
+        segment_penalty: Union[BasePenalty, float, None] = None,
+        point_penalty: Union[BasePenalty, float, None] = None,
        min_segment_length: int = 2,
        max_segment_length: int = 1000,
        ignore_point_anomalies: bool = False,
    ):
        self.segment_saving = segment_saving
        self.point_saving = point_saving
-        self.segment_penalty_scale = segment_penalty_scale
-        self.point_penalty_scale = point_penalty_scale
+        self.segment_penalty = segment_penalty
+        self.point_penalty = point_penalty
        self.min_segment_length = min_segment_length
        self.max_segment_length = max_segment_length
        self.ignore_point_anomalies = ignore_point_anomalies
@@ -135,26 +195,24 @@ def __init__(
        self._segment_saving = to_saving(_segment_saving)

        _point_saving = L2Saving() if point_saving is None else point_saving
-        if _point_saving.min_size is not None and _point_saving.min_size > 1:
+        if _point_saving.min_size != 1:
            raise ValueError("Point saving must have a minimum size of 1.")
        self._point_saving = to_saving(_point_saving)

-        check_larger_than(0, segment_penalty_scale, "segment_penalty_scale")
-        check_larger_than(0, point_penalty_scale, "point_penalty_scale")
+        self._segment_penalty = as_penalty(
+            self.segment_penalty,
+            default=ChiSquarePenalty(),
+            require_penalty_type="constant",
+        )
+        self._point_penalty = as_penalty(
+            self.point_penalty,
+            default=ChiSquarePenalty(),
+            require_penalty_type="constant",
+        )
+
        check_larger_than(2, min_segment_length, "min_segment_length")
        check_larger_than(min_segment_length, max_segment_length, "max_segment_length")

-    def _get_penalty_components(self, X: pd.DataFrame) -> tuple[np.ndarray, float]:
-        # TODO: Add penalty tuning.
-        # if self.tune:
-        #     return self._tune_threshold(X)
-        n = X.shape[0]
-        p = X.shape[1]
-        n_params = self._segment_saving.get_param_size(p)
-        segment_penalty = capa_penalty(n, n_params, self.segment_penalty_scale)
-        point_penalty = self.point_penalty_scale * n_params * p * np.log(n)
-        return segment_penalty, point_penalty
-
    def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
        """Fit to training data.
@@ -188,7 +246,8 @@ def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
            min_length=self.min_segment_length,
            min_length_name="min_segment_length",
        )
-        self.segment_penalty_, self.point_penalty_ = self._get_penalty_components(X)
+        self.segment_penalty_ = self._segment_penalty.fit(X, self._segment_saving)
+        self.point_penalty_ = self._point_penalty.fit(X, self._point_saving)
        return self

    def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
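For orientation, a minimal usage sketch of the penalty interface this diff introduces on `CAPA`. The import path `skchange.anomaly_detectors`, the toy data, and the float value 25.0 are illustrative assumptions; the semantics (a float is a constant penalty, `None` falls back to a `ChiSquarePenalty` fit in `fit`) follow the docstring above.

import numpy as np
import pandas as pd

from skchange.anomaly_detectors import CAPA  # assumed public import path
from skchange.penalties import ChiSquarePenalty

# Toy data: two variables with an elevated segment (assumed for illustration).
rng = np.random.default_rng(0)
data = rng.normal(size=(300, 2))
data[100:150] += 5.0
X = pd.DataFrame(data)

# Defaults: both penalties resolve to a ChiSquarePenalty, fit to the data in `fit`.
detector = CAPA()

# Explicit penalties: a BasePenalty instance or a float (constant penalty).
detector = CAPA(segment_penalty=ChiSquarePenalty(), point_penalty=25.0)

detector.fit(X)  # sets the fitted `segment_penalty_` and `point_penalty_`
anomalies = detector.predict(X)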
87 changes: 16 additions & 71 deletions skchange/anomaly_detectors/circular_binseg.py
@@ -12,6 +12,7 @@
from skchange.anomaly_scores import BaseLocalAnomalyScore, to_local_anomaly_score
from skchange.change_detectors.seeded_binseg import make_seeded_intervals
from skchange.costs import BaseCost, L2Cost
+from skchange.penalties import BasePenalty, BICPenalty, as_penalty
from skchange.utils.numba import njit
from skchange.utils.validation.data import check_data
from skchange.utils.validation.parameters import check_in_interval, check_larger_than
@@ -119,15 +120,12 @@ class CircularBinarySegmentation(BaseSegmentAnomalyDetector):
    anomaly_score : BaseLocalAnomalyScore or BaseCost, optional, default=L2Cost()
        The local anomaly score to use for anomaly detection. If a cost is given, it is
        converted to a local anomaly score using the `LocalAnomalyScore` class.
-    threshold_scale : float, default=2.0
-        Scaling factor for the threshold. The threshold is set to
-        ``threshold_scale * 2 * p * np.sqrt(np.log(n))``, where ``n`` is the sample size
-        and ``p`` is the number of variables. If ``None``, the threshold is tuned on the
-        data input to `fit`.
-    level : float, default=0.01
-        If `threshold_scale` is ``None``, the threshold is set to the ``1-level``
-        quantile of the changepoint scores of all the seeded intervals on the training
-        data. For this to be correct, the training data must contain no changepoints.
+    penalty : Union[BasePenalty, float], optional, default=`BICPenalty`
+        The penalty to use for the changepoint detection. If a float is given, it is
+        interpreted as a constant penalty. If `None`, the penalty is set to a BIC
+        penalty with ``n=X.shape[0]`` and
+        ``n_params=anomaly_score.get_param_size(X.shape[1])``, where ``X`` is the input
+        data to `fit`.
    min_segment_length : int, default=5
        Minimum length between two changepoints. Must be greater than or equal to 1.
    max_interval_length : int, default=100
@@ -176,15 +174,13 @@ class CircularBinarySegmentation(BaseSegmentAnomalyDetector):
    def __init__(
        self,
        anomaly_score: Optional[Union[BaseCost, BaseLocalAnomalyScore]] = None,
-        threshold_scale: Optional[float] = 2.0,
-        level: float = 1e-8,
+        penalty: Union[BasePenalty, float, None] = None,
        min_segment_length: int = 5,
        max_interval_length: int = 1000,
        growth_factor: float = 1.5,
    ):
        self.anomaly_score = anomaly_score
-        self.threshold_scale = threshold_scale  # Just holds the input value.
-        self.level = level
+        self.penalty = penalty
        self.min_segment_length = min_segment_length
        self.max_interval_length = max_interval_length
        self.growth_factor = growth_factor
@@ -193,8 +189,10 @@ def __init__(
        _anomaly_score = L2Cost() if anomaly_score is None else anomaly_score
        self._anomaly_score = to_local_anomaly_score(_anomaly_score)

-        check_larger_than(0.0, self.threshold_scale, "threshold_scale", allow_none=True)
-        check_in_interval(pd.Interval(0.0, 1.0, closed="neither"), self.level, "level")
+        self._penalty = as_penalty(
+            self.penalty, default=BICPenalty(), require_penalty_type="constant"
+        )
+
        check_larger_than(1.0, self.min_segment_length, "min_segment_length")
        check_larger_than(
            2 * self.min_segment_length, self.max_interval_length, "max_interval_length"
@@ -205,59 +203,6 @@ def __init__(
"growth_factor",
)

-    def _tune_threshold(self, X: pd.DataFrame) -> float:
-        """Tune the threshold.
-
-        The threshold is set to the ``1-level`` quantile of the changepoint scores
-        from all the seeded intervals on the training data `X`. For this to be
-        correct, the training data must contain no changepoints.
-
-        Parameters
-        ----------
-        X : pd.DataFrame
-            Training data to tune the threshold on.
-
-        Returns
-        -------
-        threshold : float
-            The tuned threshold.
-        """
-        _, scores, _, _, _ = run_circular_binseg(
-            X.values,
-            self._anomaly_score,
-            np.inf,
-            self.min_segment_length,
-            self.max_interval_length,
-            self.growth_factor,
-        )
-        return np.quantile(scores, 1 - self.level)
-
-    @staticmethod
-    def get_default_threshold(n: int, p: int, max_interval_length) -> float:
-        """Get the default threshold for Circular Binary Segmentation.
-
-        Parameters
-        ----------
-        n : int
-            Sample size.
-        p : int
-            Number of variables.
-
-        Returns
-        -------
-        threshold : float
-            The default threshold.
-        """
-        return 2 * p * np.log(n * max_interval_length)
-
-    def _get_threshold(self, X: pd.DataFrame) -> float:
-        if self.threshold_scale is None:
-            return self._tune_threshold(X)
-        else:
-            return self.threshold_scale * self.get_default_threshold(
-                X.shape[0], X.shape[1], self.max_interval_length
-            )
-
    def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
        """Fit to training data.
@@ -291,7 +236,7 @@ def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
            min_length=2 * self.min_segment_length,
            min_length_name="min_interval_length",
        )
-        self.threshold_ = self._get_threshold(X)
+        self.penalty_ = self._penalty.fit(X, self._anomaly_score)
        return self

    def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
@@ -317,7 +262,7 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
        anomalies, scores, maximizers, starts, ends = run_circular_binseg(
            X.values,
            self._anomaly_score,
-            self.threshold_,
+            self.penalty_.values[0],
            self.min_segment_length,
            self.max_interval_length,
            self.growth_factor,
@@ -355,7 +300,7 @@ def get_test_params(cls, parameter_set="default"):
        from skchange.costs import L2Cost, MultivariateGaussianCost

        params = [
-            {"anomaly_score": L2Cost(), "threshold_scale": 5},
+            {"anomaly_score": L2Cost(), "penalty": 20},
            {
                "anomaly_score": L2Cost(),
                "min_segment_length": 3,
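Likewise, a minimal sketch of the reworked `CircularBinarySegmentation` constructor, where a single `penalty` argument replaces `threshold_scale` and `level`. The import path and toy data are illustrative assumptions; the float-or-`BICPenalty` semantics and the use of the fitted `penalty_` as the detection threshold follow the diff above.

import numpy as np
import pandas as pd

from skchange.anomaly_detectors import CircularBinarySegmentation  # assumed path
from skchange.penalties import BICPenalty

# Toy data with one anomalous segment (assumed for illustration).
rng = np.random.default_rng(1)
data = rng.normal(size=(500, 1))
data[200:260] += 3.0
X = pd.DataFrame(data)

# `penalty=None` resolves to a BICPenalty fit to the data in `fit`; a float
# would be used as a constant penalty, as in `get_test_params` above.
detector = CircularBinarySegmentation(penalty=BICPenalty())
detector.fit(X)  # sets `penalty_`, whose first value thresholds the scores
anomalies = detector.predict(X)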