diff --git a/NOTES.md b/NOTES.md index e6dfda2f..aef08432 100644 --- a/NOTES.md +++ b/NOTES.md @@ -141,7 +141,7 @@ using the same example data as for anomaly detection. ### Changepoints in univariate data or multivariate data without subset changes ```python -detector = ChangepointDetector().fit(x_univariate) +detector = ChangeDetector().fit(x_univariate) detector.predict(x_univariate) 0 0 1 1 @@ -152,7 +152,7 @@ dtype: int64 ``` ### Subset changepoints in multivariate data ```python -detector = SubsetChangepointDetector().fit(x_multivariate) +detector = SubsetChangeDetector().fit(x_multivariate) detector.predict(x_multivariate) index columns 0 0 [0] diff --git a/README.md b/README.md index 7ddd9a89..6d920bcd 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ from skchange.change_detectors.moscore import Moscore from skchange.datasets.generate import generate_teeth_data df = generate_teeth_data(n_segments=10, segment_length=50, mean=5, random_state=1) -detector = Moscore(bandwidth=10, fmt="sparse") +detector = Moscore(bandwidth=10) detector.fit_predict(df) >>> 0 49 @@ -51,7 +51,7 @@ df = generate_teeth_data( affected_proportion=0.2, random_state=2, ) -detector = Mvcapa(collective_penalty="sparse", fmt="sparse") +detector = Mvcapa(collective_penalty="sparse") detector.fit_predict(df) >>> start end components diff --git a/interactive/benchmark.py b/interactive/benchmark.py index ef057706..6f9ca4a0 100644 --- a/interactive/benchmark.py +++ b/interactive/benchmark.py @@ -1,20 +1,23 @@ +"""Benchmarking the computational efficiency of the detectors.""" + from timeit import timeit import numpy as np import pandas as pd import plotly.express as px -from skchange.anomaly_detectors.tests.test_anomaly_detectors import anomaly_detectors -from skchange.change_detectors.tests.test_change_detectors import change_detectors +from skchange.anomaly_detectors import ANOMALY_DETECTORS +from skchange.change_detectors import CHANGE_DETECTORS # TODO: Add all the different scores and costs. -detector_classes = anomaly_detectors + change_detectors +# TODO: Make sure hyperparameters are set such that comparisons are fair. 
+detector_classes = ANOMALY_DETECTORS + CHANGE_DETECTORS
 ns = [1000, 10000, 100000, 1000000]
 n_runs = [100, 10, 1, 1]
 timings = {}
 for detector_class in detector_classes:
     detector_name = detector_class.__name__
-    detector = detector_class.create_test_instance().set_params(fmt="sparse")
+    detector = detector_class.create_test_instance()
     setup_data = pd.DataFrame(np.random.normal(0, 1, size=1000))
     detector.fit_predict(setup_data)  # Compile numba
     timings[detector_name] = []
diff --git a/interactive/explore_capa.py b/interactive/explore_capa.py
index 3a7106b3..40fb41d7 100644
--- a/interactive/explore_capa.py
+++ b/interactive/explore_capa.py
@@ -1,3 +1,6 @@
+"""Interactive exploration of the Capa and Mvcapa anomaly detectors."""
+
+import pandas as pd
 import plotly.express as px
 
 from skchange.anomaly_detectors.capa import Capa
@@ -6,30 +9,44 @@
 from skchange.utils.benchmarking.profiler import Profiler
 
 # Univariate
-df = generate_teeth_data(n_segments=5, mean=10, segment_length=10, p=1, random_state=2)
-capa = Capa(fmt="sparse", max_segment_length=20)
-anomalies = capa.fit_predict(df)
+df = generate_teeth_data(n_segments=5, segment_length=10, mean=10, random_state=2)[0]
+detector = Capa(max_segment_length=20)
+
+anomalies = detector.fit_predict(df)
+print(anomalies)
 
-capa = Capa(labels="score", fmt="dense", max_segment_length=20)
-scores = capa.fit_predict(df)
+anomaly_labels = detector.fit_transform(df)
+px.scatter(x=df.index, y=df, color=anomaly_labels.astype(str))
 
-capa = Capa(labels="indicator", fmt="dense", max_segment_length=20)
-anomalies = capa.fit_predict(df)
-px.scatter(x=df.index, y=df.values[:, 0], color=anomalies)
+scores = detector.score_transform(df)
+px.scatter(scores)
 
 # Multivariate
-# TODO: Add plotting functionality to assess the affected subset.
 df = generate_teeth_data(5, 10, p=10, mean=10, affected_proportion=0.2, random_state=2)
-capa = Mvcapa(collective_penalty="sparse", fmt="sparse")
-anomalies = capa.fit_predict(df)
+detector = Mvcapa(collective_penalty="sparse")
+
+anomalies = detector.fit_predict(df)
+print(anomalies)
+
+anomaly_labels = detector.fit_transform(df)
+anomaly_labels = (anomaly_labels > 0).astype(int)
+anomaly_labels[anomaly_labels == 0] = 0.1
+plot_df = pd.concat(
+    [
+        df.melt(ignore_index=False).reset_index(),
+        anomaly_labels.melt(value_name="anomaly_label")["anomaly_label"],
+    ],
+    axis=1,
+)
+plot_df["variable"] = plot_df["variable"].astype(str)
+px.scatter(plot_df, x="index", y="value", color="variable", size="anomaly_label")
 
-capa = Mvcapa(labels="score", fmt="dense", max_segment_length=20)
-scores = capa.fit_predict(df)
+fig = px.line(df)
+fig.add_scatter(anomaly_labels)
+px.line(anomaly_labels)
 
-capa = Mvcapa(collective_penalty_scale=5, labels="indicator", fmt="dense")
-anomalies = capa.fit_predict(df)
-df.plot(kind="line", backend="plotly")
-anomalies.plot(kind="line", backend="plotly")
+scores = detector.score_transform(df)
+px.scatter(scores)
 
 # Profiling
diff --git a/interactive/explore_circular_binseg.py b/interactive/explore_circular_binseg.py
index c0b581c3..d9cbdbfb 100644
--- a/interactive/explore_circular_binseg.py
+++ b/interactive/explore_circular_binseg.py
@@ -1,3 +1,5 @@
+"""Interactive exploration of the Circular Binary Segmentation anomaly detector."""
+
 import plotly.express as px
 
 from skchange.anomaly_detectors.circular_binseg import (
@@ -12,9 +14,7 @@
     score="mean", growth_factor=1.5, min_segment_length=10
 )
 anomalies = detector.fit_predict(df)
-
-df.plot(kind="line", backend="plotly")
-
+px.line(df)
 px.scatter(detector.scores, x="argmax_anomaly_start", y="score")
 
 # Test anomaly intervals
diff --git a/interactive/explore_moscore.py b/interactive/explore_moscore.py
index f9f33200..ec26d553 100644
--- a/interactive/explore_moscore.py
+++ b/interactive/explore_moscore.py
@@ -1,17 +1,20 @@
+"""Interactive exploration of the Moscore change detector."""
+
 import numpy as np
 import plotly.express as px
 from numba import njit
 
-from skchange.change_detectors.moscore import Moscore, where
+from skchange.change_detectors.moscore import Moscore
 from skchange.datasets.generate import add_linspace_outliers, generate_teeth_data
-from skchange.scores.mean_score import init_mean_score, mean_score
 from skchange.utils.benchmarking.profiler import Profiler
 
 # Simple univariate example
 df = generate_teeth_data(n_segments=2, mean=10, segment_length=100, p=1, random_state=2)
 detector = Moscore()
 changepoints = detector.fit_predict(df)
-px.scatter(detector.scores)
+labels = detector.transform(df)
+scores = detector.score_transform(df)
+px.scatter(scores)
 
 
 # Profiling
@@ -24,13 +27,6 @@
 profiler.stop()
 
 
-# Various unit tests
-df = generate_teeth_data(n_segments=1, mean=10, segment_length=10, p=1)
-precomputed_params = init_mean_score(df.values)
-mean_score(precomputed_params, start=0, end=9, split=4)
-where(np.array([True, True, True, False, False]))
-
-
 # Variance score
 df = generate_teeth_data(
     n_segments=2, variance=16, segment_length=100, p=1, random_state=1
@@ -44,6 +40,7 @@
 # Custom score
 @njit
 def col_median(X: np.ndarray) -> np.ndarray:
+    """Compute the median of each column of X."""
     m = X.shape[1]
     medians = np.zeros(m)
     for j in range(m):
@@ -53,27 +50,28 @@ def col_median(X: np.ndarray) -> np.ndarray:
 
 @njit
 def init_spike_score(X: np.ndarray) -> np.ndarray:
+    """Initialize the spike score."""
     return X
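+
+# A custom score is supplied to Moscore as a (score_function, init_function)
+# tuple; see `score = (spike_score, init_spike_score)` further down.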
-def spike_score_factory(margin: int = 0): - @njit - def spike_score( - precomputed_params: np.ndarray, start: int, end: int, split: int - ) -> float: - X = precomputed_params - interval_X = np.concatenate( - (X[start : split - margin], X[split + margin + 1 : end + 1]) - ) - baseline_median = col_median(interval_X) - return np.sum(np.abs(X[split] - baseline_median)) - - return spike_score +@njit +def spike_score( + precomputed_params: np.ndarray, + start: np.ndarray, + end: np.ndarray, + split: np.ndarray, +) -> float: + """Calculate the score for a spike at the split point.""" + X = precomputed_params + baseline_median = np.zeros((len(start), X.shape[1])) + for i, (s, e) in enumerate(zip(start, end)): + baseline_median[i] = col_median(X[s : e + 1]) + return np.sum(np.abs(X[split] - baseline_median), axis=1) df = generate_teeth_data(n_segments=1, mean=0, segment_length=100, p=1) df = add_linspace_outliers(df, n_outliers=4, outlier_size=10) -score = (spike_score_factory(margin=0), init_spike_score) +score = (spike_score, init_spike_score) detector = Moscore(score, bandwidth=5) anomalies = detector.fit_predict(df) px.scatter(detector.scores) diff --git a/interactive/explore_moscore_anomaly.py b/interactive/explore_moscore_anomaly.py index d874bb00..6ca957cf 100644 --- a/interactive/explore_moscore_anomaly.py +++ b/interactive/explore_moscore_anomaly.py @@ -1,3 +1,5 @@ +"""Interactive exploration of the MoscoreAnomaly detector.""" + import numpy as np import plotly.express as px @@ -20,17 +22,7 @@ left_bandwidth=50, ) anomalies = detector.fit_predict(df) - -detector = MoscoreAnomaly( - score="mean", - min_anomaly_length=10, - max_anomaly_length=1000, - left_bandwidth=20, - labels="score", -) -scores = detector.fit_predict(df) -scores["length"] = scores["anomaly_end"] - scores["anomaly_start"] + 1 -px.scatter(scores, x="anomaly_start", y="score", color="length") +print(anomalies) # Profiling diff --git a/interactive/explore_pelt.py b/interactive/explore_pelt.py index e2d03f29..505bdcc9 100644 --- a/interactive/explore_pelt.py +++ b/interactive/explore_pelt.py @@ -1,3 +1,5 @@ +"""Interactive exploration of the Pelt change detector.""" + import numpy as np from skchange.change_detectors.pelt import Pelt diff --git a/interactive/explore_seeded_binseg.py b/interactive/explore_seeded_binseg.py index 19d35858..29cb418e 100644 --- a/interactive/explore_seeded_binseg.py +++ b/interactive/explore_seeded_binseg.py @@ -1,3 +1,5 @@ +"""Interactive exploration of Seeded Binary Segmentation.""" + import plotly.express as px from skchange.change_detectors.seeded_binseg import SeededBinarySegmentation @@ -8,9 +10,8 @@ detector = SeededBinarySegmentation(score="mean", growth_factor=2) detector.fit_predict(df) -df.plot(kind="line", backend="plotly") - -px.scatter(detector.scores, x="maximizer", y="score", hover_data=["start", "end"]) +px.line(df) +px.scatter(detector.scores, x="argmax_cpt", y="score", hover_data=["start", "end"]) # Profiling diff --git a/interactive/explore_stat_threshold_anomaliser.py b/interactive/explore_stat_threshold_anomaliser.py index f1b7b094..74966bd9 100644 --- a/interactive/explore_stat_threshold_anomaliser.py +++ b/interactive/explore_stat_threshold_anomaliser.py @@ -1,3 +1,5 @@ +"""Interactive exploration of the StatThresholdAnomaliser.""" + import numpy as np from skchange.anomaly_detectors.anomalisers import StatThresholdAnomaliser @@ -16,3 +18,4 @@ change_detector, stat=np.mean, stat_lower=-1.0, stat_upper=1.0 ) anomalies = detector.fit_predict(df) +print(anomalies) diff --git 
a/pyproject.toml b/pyproject.toml index bfc24b29..71f105a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,10 +34,10 @@ classifiers = [ ] requires-python = ">=3.9,<3.13" dependencies = [ - "numpy<1.27,>=1.21", # required for framework layer and base class logic - "pandas<2.2.0,>=1.3", # pandas is the main in-memory data container + "numpy>=1.21", + "pandas>=1.1", "numba>=0.56", # numba is used for fast computation throughout - "sktime>=0.23.0,<0.30.0", + "sktime>=0.30", ] [project.urls] diff --git a/skchange/anomaly_detectors/__init__.py b/skchange/anomaly_detectors/__init__.py index e406f81f..04d55ac4 100644 --- a/skchange/anomaly_detectors/__init__.py +++ b/skchange/anomaly_detectors/__init__.py @@ -1 +1,24 @@ """Anomaly detection algorithms.""" + +from skchange.anomaly_detectors.anomalisers import StatThresholdAnomaliser +from skchange.anomaly_detectors.base import ( + CollectiveAnomalyDetector, + PointAnomalyDetector, +) +from skchange.anomaly_detectors.capa import Capa +from skchange.anomaly_detectors.circular_binseg import CircularBinarySegmentation +from skchange.anomaly_detectors.moscore_anomaly import MoscoreAnomaly +from skchange.anomaly_detectors.mvcapa import Mvcapa + +BASE_ANOMALY_DETECTORS = [CollectiveAnomalyDetector, PointAnomalyDetector] +COLLECTIVE_ANOMALY_DETECTORS = [ + Capa, + CircularBinarySegmentation, + MoscoreAnomaly, + Mvcapa, + StatThresholdAnomaliser, +] +POINT_ANOMALY_DETECTORS = [] +ANOMALY_DETECTORS = COLLECTIVE_ANOMALY_DETECTORS + POINT_ANOMALY_DETECTORS + +__all__ = BASE_ANOMALY_DETECTORS + ANOMALY_DETECTORS diff --git a/skchange/anomaly_detectors/anomalisers.py b/skchange/anomaly_detectors/anomalisers.py index 7500fe43..b9ce8bb8 100644 --- a/skchange/anomaly_detectors/anomalisers.py +++ b/skchange/anomaly_detectors/anomalisers.py @@ -4,17 +4,17 @@ import numpy as np import pandas as pd -from sktime.annotation.base import BaseSeriesAnnotator -from skchange.anomaly_detectors.utils import format_anomaly_output +from skchange.anomaly_detectors.base import CollectiveAnomalyDetector +from skchange.change_detectors.base import ChangeDetector -class StatThresholdAnomaliser(BaseSeriesAnnotator): +class StatThresholdAnomaliser(CollectiveAnomalyDetector): """Anomaly detection based on thresholding the values of segment statistics. Parameters ---------- - change_detector : BaseSeriesAnnotator + change_detector : ChangeDetector Change detector to use for detecting segments. stat : callable, optional (default=np.mean) Statistic to calculate per segment. @@ -32,32 +32,30 @@ class StatThresholdAnomaliser(BaseSeriesAnnotator): def __init__( self, - change_detector: BaseSeriesAnnotator, + change_detector: ChangeDetector, stat: Callable = np.mean, stat_lower: float = -1.0, stat_upper: float = 1.0, - fmt: str = "sparse", - labels: str = "indicator", ): self.change_detector = change_detector self.stat = stat self.stat_lower = stat_lower self.stat_upper = stat_upper - super().__init__(fmt=fmt, labels=labels) + super().__init__() if self.stat_lower > self.stat_upper: message = f"stat_lower ({self.stat_lower}) must be less" +f" than or equal to stat_upper ({self.stat_upper})." raise ValueError(message) - def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): + def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None): """Fits the change detector to training data. Parameters ---------- X : pd.DataFrame training data to fit the threshold to. - Y : pd.Series, optional + y : pd.Series, optional Does nothing. 
Only here to make the fit method compatible with sktime and scikit-learn. @@ -65,7 +63,8 @@ def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): ------- self : returns a reference to self """ - self.change_detector.fit(X, Y) + self.change_detector_ = self.change_detector.clone() + self.change_detector_.fit(X, y) return self def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: @@ -77,28 +76,19 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: Returns ------- - Y : pd.Series - annotations for sequence X + y : pd.Series - annotations for sequence X exact format depends on annotation type """ # This is the required output format for the rest of the code to work. - self.change_detector.fmt = "dense" - self.change_detector.labels = "int_label" - self.segments = self.change_detector.predict(X) - - df = pd.concat([X, self.segments], axis=1) - self.anomalies = [] - for _, segment in df.reset_index(drop=True).groupby("int_label"): + segments = self.change_detector_.transform(X) + df = pd.concat([X, segments], axis=1) + anomalies = [] + for _, segment in df.reset_index(drop=True).groupby("segment_label"): segment_stat = self.stat(segment.iloc[:, 0].values) if (segment_stat < self.stat_lower) | (segment_stat > self.stat_upper): - self.anomalies.append((int(segment.index[0]), int(segment.index[-1]))) + anomalies.append((int(segment.index[0]), int(segment.index[-1]))) - return format_anomaly_output( - self.fmt, - self.labels, - X.index, - self.anomalies, - scores=self.change_detector.scores, - ) + return CollectiveAnomalyDetector._format_sparse_output(anomalies) @classmethod def get_test_params(cls, parameter_set="default"): @@ -123,10 +113,16 @@ def get_test_params(cls, parameter_set="default"): params = [ { - "change_detector": Moscore(bandwidth=10), + "change_detector": Moscore(bandwidth=3), "stat": np.mean, "stat_lower": -1.0, "stat_upper": 1.0, }, + { + "change_detector": Moscore(bandwidth=5), + "stat": np.median, + "stat_lower": -2.0, + "stat_upper": 2.0, + }, ] return params diff --git a/skchange/anomaly_detectors/base.py b/skchange/anomaly_detectors/base.py new file mode 100644 index 00000000..fd4ce347 --- /dev/null +++ b/skchange/anomaly_detectors/base.py @@ -0,0 +1,306 @@ +"""Base classes for anomaly detectors.""" + +import numpy as np +import pandas as pd + +from skchange.base import BaseDetector + + +class PointAnomalyDetector(BaseDetector): + """Base class for anomaly detectors. + + Anomaly detectors detect individual data points that are considered anomalous. + + Output format of the predict method: See the dense_to_sparse method. + Output format of the transform method: See the sparse_to_dense method. + + Needs to be implemented: + - _fit(self, X, y=None) -> self + - _predict(self, X) -> pd.Series + + Optional to implement: + - _score_transform(self, X) -> pd.Series + - _update(self, X, y=None) -> self + """ + + @staticmethod + def sparse_to_dense( + y_sparse: pd.Series, index: pd.Index, columns: pd.Index = None + ) -> pd.Series: + """Convert the sparse output from the predict method to a dense format. + + Parameters + ---------- + y_sparse : pd.Series + The sparse output from an anomaly detector's predict method. + index : array-like + Indices that are to be annotated according to ``y_sparse``. + columns: array-like + Not used. Only for API compatibility. + + Returns + ------- + pd.Series where 0-entries are normal and 1-entries are anomalous. 
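+
+        Examples
+        --------
+        A minimal sketch; the anomaly positions are made up for illustration:
+
+        >>> import pandas as pd
+        >>> from skchange.anomaly_detectors.base import PointAnomalyDetector
+        >>> y_sparse = pd.Series([2, 4], name="anomaly", dtype="int64")
+        >>> PointAnomalyDetector.sparse_to_dense(y_sparse, pd.RangeIndex(6))
+        0    0
+        1    0
+        2    1
+        3    0
+        4    1
+        5    0
+        Name: anomaly_label, dtype: int64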
+ """ + y_dense = pd.Series(0, index=index, name="anomaly_label", dtype="int64") + y_dense.iloc[y_sparse.values] = 1 + return y_dense + + @staticmethod + def dense_to_sparse(y_dense: pd.Series) -> pd.Series: + """Convert the dense output from the transform method to a sparse format. + + Parameters + ---------- + y_dense : pd.Series + The dense output from an anomaly detector's transform method. + 0-entries are normal and >0-entries are anomalous. + + Returns + ------- + pd.Series of the integer locations of the anomalous data points. + """ + # The sparse format only uses integer positions, so we reset the index. + y_dense = y_dense.reset_index(drop=True) + + anomalies = y_dense.iloc[y_dense.values > 0].index + return PointAnomalyDetector._format_sparse_output(anomalies) + + @staticmethod + def _format_sparse_output(anomalies) -> pd.Series: + """Format the sparse output of anomaly detectors. + + Can be reused by subclasses to format the output of the _predict method. + """ + return pd.Series(anomalies, name="anomaly", dtype="int64") + + +class CollectiveAnomalyDetector(BaseDetector): + """Base class for collective anomaly detectors. + + Collective anomaly detectors detect segments of data points that are considered + anomalous. + + Output format of the predict method: See the dense_to_sparse method. + Output format of the transform method: See the sparse_to_dense method. + + Needs to be implemented: + - _fit(self, X, y=None) -> self + - _predict(self, X) -> pd.Series + + Optional to implement: + - _score_transform(self, X) -> pd.Series + - _update(self, X, y=None) -> self + """ + + @staticmethod + def sparse_to_dense( + y_sparse: pd.Series, index: pd.Index, columns: pd.Index = None + ) -> pd.Series: + """Convert the sparse output from the predict method to a dense format. + + Parameters + ---------- + y_sparse : pd.Series[pd.Interval] + The collective anomaly intervals. + index : array-like + Indices that are to be annotated according to ``y_sparse``. + columns: array-like + Not used. Only for API compatibility. + + Returns + ------- + pd.Series where 0-entries are normal and each collective anomaly are labelled + from 1, ..., K. + """ + labels = pd.IntervalIndex(y_sparse).get_indexer(index) + # get_indexer return values 0 for the values inside the first interval, 1 to + # the values within the next interval and so on, and -1 for values outside any + # interval. The skchange convention is that 0 is normal and > 0 is anomalous, + # so we add 1 to the result. + labels += 1 + return pd.Series(labels, index=index, name="anomaly_label", dtype="int64") + + @staticmethod + def dense_to_sparse(y_dense: pd.Series) -> pd.Series: + """Convert the dense output from the transform method to a sparse format. + + Parameters + ---------- + y_dense : pd.Series + The dense output from a collective anomaly detector's transform method: + An integer series where 0-entries are normal and each collective anomaly + are labelled from 1, ..., K. + + Returns + ------- + pd.Series[pd.Interval] containing the collective anomaly intervals. + + Notes + ----- + The start and end points of the intervals can be accessed by + output.array.left and output.array.right, respectively. + """ + # The sparse format only uses integer positions, so we reset the index. 
+        y_dense = y_dense.reset_index(drop=True)
+
+        y_anomaly = y_dense.loc[y_dense.values > 0]
+        anomaly_locations_diff = y_anomaly.index.diff()
+
+        first_anomaly_start = y_anomaly.index[:1].to_numpy()
+        anomaly_starts = y_anomaly.index[anomaly_locations_diff > 1]
+        anomaly_starts = np.insert(anomaly_starts, 0, first_anomaly_start)
+
+        last_anomaly_end = y_anomaly.index[-1:].to_numpy()
+        anomaly_ends = y_anomaly.index[np.roll(anomaly_locations_diff > 1, -1)]
+        anomaly_ends = np.insert(anomaly_ends, len(anomaly_ends), last_anomaly_end)
+
+        anomaly_intervals = list(zip(anomaly_starts, anomaly_ends))
+        return CollectiveAnomalyDetector._format_sparse_output(
+            anomaly_intervals, closed="both"
+        )
+
+    @staticmethod
+    def _format_sparse_output(
+        anomaly_intervals: list[tuple[int, int]], closed: str = "both"
+    ) -> pd.Series:
+        """Format the sparse output of collective anomaly detectors.
+
+        Can be reused by subclasses to format the output of the _predict method.
+        """
+        return pd.Series(
+            pd.IntervalIndex.from_tuples(anomaly_intervals, closed=closed),
+            name="anomaly_interval",
+        )
+
+
+class SubsetCollectiveAnomalyDetector(BaseDetector):
+    """Base class for subset collective anomaly detectors.
+
+    Subset collective anomaly detectors detect segments of multivariate time series
+    data that are considered anomalous, and also provide information on which
+    components of the data are affected.
+
+    Output format of the predict method: See the dense_to_sparse method.
+    Output format of the transform method: See the sparse_to_dense method.
+
+    Needs to be implemented:
+    - _fit(self, X, y=None) -> self
+    - _predict(self, X) -> pd.DataFrame
+
+    Optional to implement:
+    - _score_transform(self, X) -> pd.Series
+    - _update(self, X, y=None) -> self
+    """
+
+    @staticmethod
+    def sparse_to_dense(
+        y_sparse: pd.DataFrame, index: pd.Index, columns: pd.Index
+    ) -> pd.DataFrame:
+        """Convert the sparse output from the predict method to a dense format.
+
+        Parameters
+        ----------
+        y_sparse : pd.DataFrame
+            The sparse output from the predict method. The first column must contain
+            the anomaly intervals, and the second column must contain a list of the
+            affected columns.
+        index : array-like
+            Indices that are to be annotated according to ``y_sparse``.
+        columns : array-like
+            Columns that are to be annotated according to ``y_sparse``.
+
+        Returns
+        -------
+        pd.DataFrame where 0-entries are normal and each collective anomaly is
+        labelled from 1, ..., K.
+        """
+        anomaly_intervals = y_sparse.iloc[:, 0].array
+        anomaly_starts = anomaly_intervals.left
+        anomaly_ends = anomaly_intervals.right
+        anomaly_columns = y_sparse.iloc[:, 1]
+
+        start_is_open = anomaly_intervals.closed in ["neither", "right"]
+        if start_is_open:
+            anomaly_starts += 1  # Exclude the start index in the for loop below.
+        end_is_closed = anomaly_intervals.closed in ["both", "right"]
+        if end_is_closed:
+            anomaly_ends += 1  # Include the end index in the for loop below.
+
+        labels = np.zeros((len(index), len(columns)), dtype="int64")
+        anomalies = zip(anomaly_starts, anomaly_ends, anomaly_columns)
+        for i, (start, end, affected_columns) in enumerate(anomalies):
+            labels[start:end, affected_columns] = i + 1
+
+        return pd.DataFrame(labels, index=index, columns=columns)
+
+    @staticmethod
+    def dense_to_sparse(y_dense: pd.DataFrame):
+        """Convert the dense output from the transform method to a sparse format.
+
+        Parameters
+        ----------
+        y_dense : pd.DataFrame
+            The dense output from the transform method.
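+            An integer DataFrame where 0-entries are normal and entries equal to
+            i > 0 mark the rows and columns affected by the i-th collective anomaly
+            (cf. ``sparse_to_dense`` above).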
+
+        Returns
+        -------
+        pd.DataFrame with columns
+            anomaly_interval: Intervals of the collective anomalies.
+            anomaly_columns: Affected columns of the collective anomalies.
+        """
+        # The sparse format only uses integer positions, so we reset index and columns.
+        y_dense = y_dense.reset_index(drop=True)
+        y_dense.columns = range(y_dense.columns.size)
+
+        anomaly_intervals = []
+        unique_labels = np.unique(y_dense.values)
+        for i in unique_labels[unique_labels > 0]:
+            anomaly_mask = y_dense == i
+            which_columns = anomaly_mask.any(axis=0)
+            which_rows = anomaly_mask.any(axis=1)
+            anomaly_columns = anomaly_mask.columns[which_columns].to_list()
+            anomaly_start = anomaly_mask.index[which_rows][0]
+            anomaly_end = anomaly_mask.index[which_rows][-1]
+            anomaly_intervals.append((anomaly_start, anomaly_end, anomaly_columns))
+
+        return SubsetCollectiveAnomalyDetector._format_sparse_output(
+            anomaly_intervals, closed="both"
+        )
+
+    @staticmethod
+    def _format_sparse_output(
+        collective_anomalies: list[tuple[int, int, np.ndarray]],
+        closed: str = "both",
+    ) -> pd.DataFrame:
+        """Format the sparse output of subset collective anomaly detectors.
+
+        Can be reused by subclasses to format the output of the _predict method.
+
+        Parameters
+        ----------
+        collective_anomalies : list
+            List of tuples containing start and end indices of collective
+            anomalies and a np.array of the affected components/columns.
+        closed : str
+            Whether the (start, end) tuples correspond to intervals that are closed
+            on the left, right, both, or neither.
+
+        Returns
+        -------
+        pd.DataFrame with columns
+            anomaly_interval: Intervals of the collective anomalies.
+            anomaly_columns: Affected columns of the collective anomalies.
+        """
+        anomaly_intervals = [(start, end) for start, end, _ in collective_anomalies]
+        affected_components = [components for _, _, components in collective_anomalies]
+        return pd.DataFrame(
+            {
+                "anomaly_interval": pd.IntervalIndex.from_tuples(
+                    anomaly_intervals, closed=closed
+                ),
+                "anomaly_columns": affected_components,
+            }
+        )
diff --git a/skchange/anomaly_detectors/capa.py b/skchange/anomaly_detectors/capa.py
index 13e67db1..e0e7ef8b 100644
--- a/skchange/anomaly_detectors/capa.py
+++ b/skchange/anomaly_detectors/capa.py
@@ -8,10 +8,9 @@
 import numpy as np
 import pandas as pd
 from numba import njit
-from sktime.annotation.base import BaseSeriesAnnotator
 
+from skchange.anomaly_detectors.base import CollectiveAnomalyDetector
 from skchange.anomaly_detectors.mvcapa import dense_capa_penalty, run_base_capa
-from skchange.anomaly_detectors.utils import format_anomaly_output
 from skchange.costs.saving_factory import saving_factory
 from skchange.utils.validation.data import check_data
 from skchange.utils.validation.parameters import check_larger_than
@@ -43,7 +42,7 @@ def run_capa(
     )
 
 
-class Capa(BaseSeriesAnnotator):
+class Capa(CollectiveAnomalyDetector):
     """Collective and point anomaly detection.
 
     An efficient implementation of the CAPA algorithm [1]_ for anomaly detection.
@@ -67,19 +66,8 @@ class Capa(BaseSeriesAnnotator):
         Maximum length of a segment.
     ignore_point_anomalies : bool, optional (default=False)
         If True, detected point anomalies are not returned by .predict(). I.e., only
-        collective anomalies are returned.
-    fmt : str {"dense", "sparse"}, optional (default="sparse")
-        Annotation output format:
-        * If "sparse", a sub-series of labels for only the outliers in X is returned,
-        * If "dense", a series of labels for all values in X is returned.
- labels : str {"indicator", "score", "int_label"}, optional (default="int_label") - Annotation output labels: - * If "indicator", returned values are boolean, indicating whether a value is - an outlier, - * If "score", returned values are floats, giving the outlier score. - * If "int_label", returned values are integer, indicating which segment a - value belongs to. - + collective anomalies are returned. If False, point anomalies are included in the + output as collective anomalies of length 1. References ---------- @@ -113,8 +101,6 @@ def __init__( min_segment_length: int = 2, max_segment_length: int = 1000, ignore_point_anomalies: bool = False, - fmt: str = "sparse", - labels: str = "int_label", ): self.saving = saving self.collective_penalty_scale = collective_penalty_scale @@ -122,7 +108,7 @@ def __init__( self.min_segment_length = min_segment_length self.max_segment_length = max_segment_length self.ignore_point_anomalies = ignore_point_anomalies - super().__init__(fmt=fmt, labels=labels) + super().__init__() self.saving_func, self.saving_init_func = saving_factory(self.saving) @@ -149,7 +135,7 @@ def _get_penalty_components(self, X: pd.DataFrame) -> tuple[np.ndarray, float]: point_penalty = self.point_penalty_scale * n_params * p * np.log(n) return collective_penalty, point_penalty - def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): + def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None): """Fit to training data. Sets the penalty of the detector. @@ -164,7 +150,7 @@ def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): ---------- X : pd.DataFrame training data to fit the threshold to. - Y : pd.Series, optional + y : pd.Series, optional Does nothing. Only here to make the fit method compatible with sktime and scikit-learn. @@ -185,23 +171,30 @@ def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): return self def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: - """Create annotations on test/deployment data. + """Detect events in test/deployment data. + + core logic Parameters ---------- - X : pd.DataFrame - data to annotate, time series + X : pd.DataFrame + Data to detect events in (time series). Returns ------- - Y : pd.Series - annotations for sequence X - exact format depends on annotation type + pd.Series[pd.Interval] containing the collective anomaly intervals. + + Notes + ----- + The start and end points of the intervals can be accessed by + output.array.left and output.array.right, respectively. """ X = check_data( X, min_length=self.min_segment_length, min_length_name="min_segment_length", ) - opt_savings, self.collective_anomalies, self.point_anomalies = run_capa( + opt_savings, collective_anomalies, point_anomalies = run_capa( X.values, self.saving_func, self.saving_init_func, @@ -211,15 +204,32 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: self.max_segment_length, ) self.scores = pd.Series(opt_savings, index=X.index, name="score") - anomalies = format_anomaly_output( - self.fmt, - self.labels, - X.index, - self.collective_anomalies, - self.point_anomalies if not self.ignore_point_anomalies else None, - scores=self.scores, - ) - return anomalies + + anomalies = collective_anomalies + if not self.ignore_point_anomalies: + anomalies += point_anomalies + anomalies = sorted(anomalies) + + return CollectiveAnomalyDetector._format_sparse_output(anomalies) + + def _score_transform(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: + """Compute the CAPA scores for the input data. 
+ + Parameters + ---------- + X : pd.DataFrame - data to compute scores for, time series + + Returns + ------- + scores : pd.Series - scores for sequence X + + Notes + ----- + The CAPA scores are the cumulative optimal savings, so the scores are increasing + and are not per observation scores. + """ + self.predict(X) + return self.scores @classmethod def get_test_params(cls, parameter_set="default"): @@ -242,5 +252,6 @@ def get_test_params(cls, parameter_set="default"): """ params = [ {"saving": "mean", "min_segment_length": 5, "max_segment_length": 100}, + {"saving": "mean", "min_segment_length": 2, "max_segment_length": 20}, ] return params diff --git a/skchange/anomaly_detectors/circular_binseg.py b/skchange/anomaly_detectors/circular_binseg.py index 3543df9b..bfdf7d36 100644 --- a/skchange/anomaly_detectors/circular_binseg.py +++ b/skchange/anomaly_detectors/circular_binseg.py @@ -8,9 +8,8 @@ import numpy as np import pandas as pd from numba import njit -from sktime.annotation.base import BaseSeriesAnnotator -from skchange.anomaly_detectors.utils import format_anomaly_output +from skchange.anomaly_detectors.base import CollectiveAnomalyDetector from skchange.change_detectors.seeded_binseg import make_seeded_intervals from skchange.scores.score_factory import anomaly_score_factory from skchange.utils.validation.data import check_data @@ -97,7 +96,7 @@ def run_circular_binseg( return anomalies, anomaly_scores, maximizers, starts, ends -class CircularBinarySegmentation(BaseSeriesAnnotator): +class CircularBinarySegmentation(CollectiveAnomalyDetector): """Circular binary segmentation algorithm for multiple collective anomaly detection. Binary segmentation type changepoint detection algorithms recursively split the data @@ -137,17 +136,6 @@ class CircularBinarySegmentation(BaseSeriesAnnotator): starting at 'interval_len'='min_interval_length'. It also governs the amount of overlap between intervals of the same length, as the start of each interval is shifted by a factor of '1 + 1 / growth_factor'. Must be a float in (1, 2]. - fmt : str {"dense", "sparse"}, optional (default="sparse") - Annotation output format: - * If "sparse", a sub-series of labels for only the outliers in X is returned, - * If "dense", a series of labels for all values in X is returned. - labels : str {"indicator", "score", "int_label"}, optional (default="int_label") - Annotation output labels: - * If "indicator", returned values are boolean, indicating whether a value is an - outlier, - * If "score", returned values are floats, giving the outlier score. - * If "int_label", returned values are integer, indicating which segment a value - belongs to. References ---------- @@ -184,8 +172,6 @@ def __init__( min_segment_length: int = 5, max_interval_length: int = 100, growth_factor: float = 1.5, - fmt: str = "sparse", - labels: str = "int_label", ): self.score = score self.threshold_scale = threshold_scale # Just holds the input value. 
@@ -193,7 +179,7 @@
         self.min_segment_length = min_segment_length
         self.max_interval_length = max_interval_length
         self.growth_factor = growth_factor
-        super().__init__(fmt=fmt, labels=labels)
+        super().__init__()
 
         self.score_f, self.score_init_f = anomaly_score_factory(self.score)
         check_larger_than(0.0, self.threshold_scale, "threshold_scale", allow_none=True)
@@ -262,7 +248,7 @@
         p = X.shape[1]
         return self.threshold_scale * self.get_default_threshold(n, p)
 
-    def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None):
+    def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
         """Fit to training data.
 
         Sets the threshold of the detector.
@@ -277,7 +263,7 @@
         ----------
         X : pd.DataFrame
             training data to fit the threshold to.
-        Y : pd.Series, optional
+        y : pd.Series, optional
             Does nothing. Only here to make the fit method compatible with sktime
             and scikit-learn.
@@ -294,16 +280,23 @@
         return self
 
     def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
-        """Create annotations on test/deployment data.
+        """Detect events in test/deployment data.
 
         Parameters
         ----------
-        X : pd.DataFrame - data to annotate, time series
+        X : pd.DataFrame
+            Data to detect events in (time series).
 
         Returns
        -------
-        Y : pd.Series - annotations for sequence X
-            exact format depends on annotation type
+        pd.Series[pd.Interval] containing the collective anomaly intervals.
+
+        Notes
+        -----
+        The start and end points of the intervals can be accessed by
+        output.array.left and output.array.right, respectively.
         """
         X = check_data(
             X,
@@ -319,7 +312,6 @@
             self.max_interval_length,
             self.growth_factor,
         )
-        self.anomalies = anomalies
         self.scores = pd.DataFrame(
             {
                 "interval_start": starts,
@@ -329,9 +321,7 @@
                 "score": scores,
             }
         )
-        return format_anomaly_output(
-            self.fmt, self.labels, X.index, self.anomalies, scores=self.scores
-        )
+        return CollectiveAnomalyDetector._format_sparse_output(anomalies)
 
     @classmethod
     def get_test_params(cls, parameter_set="default"):
@@ -353,6 +343,7 @@
         `create_test_instance` uses the first (or only) dictionary in `params`
         """
         params = [
-            {"score": "mean", "min_segment_length": 5, "max_interval_length": 100},
+            {"score": "mean", "min_segment_length": 5, "max_interval_length": 50},
+            {"score": "mean", "min_segment_length": 2, "max_interval_length": 20},
         ]
         return params
diff --git a/skchange/anomaly_detectors/moscore_anomaly.py b/skchange/anomaly_detectors/moscore_anomaly.py
index b9079170..f51282ed 100644
--- a/skchange/anomaly_detectors/moscore_anomaly.py
+++ b/skchange/anomaly_detectors/moscore_anomaly.py
@@ -7,10 +7,9 @@
 import numpy as np
 import pandas as pd
 
-from sktime.annotation.base import BaseSeriesAnnotator
 
+from skchange.anomaly_detectors.base import CollectiveAnomalyDetector
 from skchange.anomaly_detectors.circular_binseg import greedy_anomaly_selection
-from skchange.anomaly_detectors.utils import format_anomaly_output
 from skchange.scores.score_factory import anomaly_score_factory
 from skchange.utils.validation.data import check_data
 from skchange.utils.validation.parameters import check_larger_than, check_smaller_than
@@ -53,7 +52,7 @@ def run_moscore_anomaly(
     return anomalies, scores, starts, ends
 
 
-class MoscoreAnomaly(BaseSeriesAnnotator):
+class MoscoreAnomaly(CollectiveAnomalyDetector):
     """Moving score algorithm for multiple collective anomaly detection.
 
     A custom version of the MOSUM (moving sum) algorithm [1]_ for collective anomaly
@@ -62,7 +61,7 @@
     `left_bandwidth` values to the left and `right_bandwidth` samples to the right of
     the anomaly window.
 
-    Experimental for now.
+    Experimental.
 
     Efficiently implemented using numba.
 
@@ -102,17 +101,6 @@
     `min_anomaly_length` and `max_anomaly_length` are considered. If it is not
     important to consider all candidates, just a sparse subset for example,
     customising the anomaly lengths can significantly speed up the algorithm.
-    fmt : str {"dense", "sparse"}, optional (default="sparse")
-        Annotation output format:
-        * If "sparse", a sub-series of labels for only the outliers in X is returned,
-        * If "dense", a series of labels for all values in X is returned.
-    labels : str {"indicator", "score", "int_label"}, optional (default="int_label")
-        Annotation output labels:
-        * If "indicator", returned values are boolean, indicating whether a value is an
-        outlier,
-        * If "score", returned values are floats, giving the outlier score.
-        * If "int_label", returned values are integer, indicating which segment a value
-        belongs to.
 
     References
     ----------
@@ -146,8 +134,6 @@
         threshold_scale: Optional[float] = 2.0,
         level: float = 0.01,
         anomaly_lengths: np.ndarray = None,
-        fmt: str = "sparse",
-        labels: str = "int_label",
     ):
         self.score = score
         self.min_anomaly_length = min_anomaly_length
@@ -157,7 +143,7 @@
         self.threshold_scale = threshold_scale
         self.level = level
         self.anomaly_lengths = anomaly_lengths
-        super().__init__(fmt=fmt, labels=labels)
+        super().__init__()
 
         self.score_f, self.score_init_f = anomaly_score_factory(score)
         self._right_bandwidth = right_bandwidth if right_bandwidth else left_bandwidth
@@ -247,7 +233,7 @@
         p = X.shape[1]
         return self.threshold_scale * self.get_default_threshold(n, p)
 
-    def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None):
+    def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
         """Fit to training data.
 
         Sets the threshold of the detector.
@@ -262,7 +248,7 @@
         ----------
         X : pd.DataFrame
             training data to fit the threshold to.
-        Y : pd.Series, optional
+        y : pd.Series, optional
             Does nothing. Only here to make the fit method compatible with sktime
             and scikit-learn.
@@ -282,16 +268,21 @@
         return self
 
     def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
-        """Create annotations on test/deployment data.
+        """Detect events in test/deployment data.
 
         Parameters
        ----------
-        X : pd.DataFrame - data to annotate, time series
+        X : pd.DataFrame
+            Data to detect events in (time series).
 
         Returns
        -------
-        Y : pd.Series - annotations for sequence X
-            exact format depends on annotation type
+        pd.Series[pd.Interval] containing the collective anomaly intervals.
+
+        Notes
+        -----
+        The start and end points of the intervals can be accessed by
+        output.array.left and output.array.right, respectively.
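+
+        A minimal sketch of consuming the output (assuming ``detector`` is a
+        fitted ``MoscoreAnomaly`` instance)::
+
+            intervals = detector.fit_predict(df)
+            starts = intervals.array.left  # inclusive anomaly start positions
+            ends = intervals.array.right  # inclusive anomaly end positions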
""" min_length = ( self.left_bandwidth + self._right_bandwidth + self._min_anomaly_length @@ -301,7 +292,7 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: min_length=min_length, min_length_name="left_bandwidth + _right_bandwidth + _min_anomaly_length", ) - self.anomalies, scores, starts, ends = run_moscore_anomaly( + anomalies, scores, starts, ends = run_moscore_anomaly( X.values, self.score_f, self.score_init_f, @@ -313,9 +304,7 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: self.scores = pd.DataFrame( {"anomaly_start": starts, "anomaly_end": ends, "score": scores} ) - return format_anomaly_output( - self.fmt, self.labels, X.index, self.anomalies, scores=self.scores - ) + return CollectiveAnomalyDetector._format_sparse_output(anomalies) @classmethod def get_test_params(cls, parameter_set="default"): @@ -339,9 +328,15 @@ def get_test_params(cls, parameter_set="default"): params = [ { "score": "mean", - "min_anomaly_length": 5, - "max_anomaly_length": 100, - "left_bandwidth": 50, + "min_anomaly_length": 2, + "max_anomaly_length": 8, + "left_bandwidth": 4, + }, + { + "score": "mean", + "min_anomaly_length": 2, + "max_anomaly_length": 6, + "left_bandwidth": 3, }, ] return params diff --git a/skchange/anomaly_detectors/mvcapa.py b/skchange/anomaly_detectors/mvcapa.py index 611ead02..a8a010ab 100644 --- a/skchange/anomaly_detectors/mvcapa.py +++ b/skchange/anomaly_detectors/mvcapa.py @@ -9,9 +9,8 @@ import pandas as pd from numba import njit from scipy.stats import chi2 -from sktime.annotation.base import BaseSeriesAnnotator -from skchange.anomaly_detectors.utils import format_multivariate_anomaly_output +from skchange.anomaly_detectors.base import SubsetCollectiveAnomalyDetector from skchange.costs.saving_factory import saving_factory from skchange.utils.validation.data import check_data from skchange.utils.validation.parameters import check_larger_than @@ -354,7 +353,7 @@ def run_mvcapa( return opt_savings, collective_anomalies, point_anomalies -class Mvcapa(BaseSeriesAnnotator): +class Mvcapa(SubsetCollectiveAnomalyDetector): """Subset multivariate collective and point anomaly detection. An efficient implementation of the MVCAPA algorithm [1]_ for anomaly detection. @@ -381,18 +380,6 @@ class Mvcapa(BaseSeriesAnnotator): ignore_point_anomalies : bool, optional (default=False) If True, detected point anomalies are not returned by .predict(). I.e., only collective anomalies are returned. - fmt : str {"dense", "sparse"}, optional (default="sparse") - Annotation output format: - * If "sparse", a sub-series of labels for only the outliers in X is returned, - * If "dense", a series of labels for all values in X is returned. - labels : str {"indicator", "score", "int_label"}, optional (default="int_label") - Annotation output labels: - * If "indicator", returned values are boolean, indicating whether a value is - an outlier, - * If "score", returned values are floats, giving the outlier score. - * If "int_label", returned values are integer, indicating which segment a - value belongs to. 
-
     References
     ----------
@@ -406,7 +393,7 @@ class Mvcapa(BaseSeriesAnnotator):
     from skchange.datasets.generate import generate_teeth_data
 
     df = generate_teeth_data(5, 10, p=10, mean=10, affected_proportion=0.2)
-    capa = Capa(collective_penalty_scale=5, fmt="sparse", max_segment_length=20)
+    capa = Mvcapa(collective_penalty_scale=5, max_segment_length=20)
     capa.fit_predict(df)
     """
 
@@ -426,8 +413,6 @@
         min_segment_length: int = 2,
         max_segment_length: int = 1000,
         ignore_point_anomalies: bool = False,
-        fmt: str = "sparse",
-        labels: str = "int_label",
     ):
         self.saving = saving
         self.collective_penalty = collective_penalty
@@ -437,7 +422,7 @@
         self.min_segment_length = min_segment_length
         self.max_segment_length = max_segment_length
         self.ignore_point_anomalies = ignore_point_anomalies
-        super().__init__(fmt=fmt, labels=labels)
+        super().__init__()
 
         self.saving_func, self.saving_init_func = saving_factory(self.saving)
 
@@ -463,7 +448,7 @@
         )
         return collective_alpha, collective_betas, point_alpha, point_betas
 
-    def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None):
+    def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
         """Fit to training data.
 
         Sets the penalty of the detector.
@@ -478,7 +463,7 @@
         ----------
         X : pd.DataFrame
             training data to fit the threshold to.
-        Y : pd.Series, optional
+        y : pd.Series, optional
             Does nothing. Only here to make the fit method compatible with sktime
             and scikit-learn.
@@ -511,7 +496,7 @@
 
         Returns
         -------
-        Y : pd.Series or pd.DataFrame
+        y : pd.Series or pd.DataFrame
             Annotations for sequence X, exact format depends on annotation type.
         """
         X = check_data(
@@ -519,7 +504,7 @@
             min_length=self.min_segment_length,
             min_length_name="min_segment_length",
         )
-        opt_savings, self.collective_anomalies, self.point_anomalies = run_mvcapa(
+        opt_savings, collective_anomalies, point_anomalies = run_mvcapa(
             X.values,
             self.saving_func,
             self.saving_init_func,
@@ -531,16 +516,32 @@
             self.max_segment_length,
         )
         self.scores = pd.Series(opt_savings, index=X.index, name="score")
-        anomalies = format_multivariate_anomaly_output(
-            self.fmt,
-            self.labels,
-            X.index,
-            X.columns,
-            self.collective_anomalies,
-            self.point_anomalies if not self.ignore_point_anomalies else None,
-            self.scores,
-        )
-        return anomalies
+
+        anomalies = collective_anomalies
+        if not self.ignore_point_anomalies:
+            anomalies += point_anomalies
+        anomalies = sorted(anomalies)
+
+        return SubsetCollectiveAnomalyDetector._format_sparse_output(anomalies)
+
+    def _score_transform(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
+        """Compute the MVCAPA scores for the input data.
+
+        Parameters
+        ----------
+        X : pd.DataFrame - data to compute scores for, time series
+
+        Returns
+        -------
+        scores : pd.Series - scores for sequence X
+
+        Notes
+        -----
+        The MVCAPA scores are the cumulative optimal savings, so the scores are
+        increasing and are not per observation scores.
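+
+        A usage sketch (assuming ``detector`` is an ``Mvcapa`` instance and ``df``
+        a suitable data frame)::
+
+            scores = detector.score_transform(df)  # cumulative optimal savings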
+ """ + self.predict(X) + return self.scores @classmethod def get_test_params(cls, parameter_set="default"): @@ -563,5 +564,6 @@ def get_test_params(cls, parameter_set="default"): """ params = [ {"saving": "mean", "min_segment_length": 5, "max_segment_length": 100}, + {"saving": "mean", "min_segment_length": 2, "max_segment_length": 20}, ] return params diff --git a/skchange/anomaly_detectors/tests/test_anomaly_detectors.py b/skchange/anomaly_detectors/tests/test_anomaly_detectors.py index 2a4fc418..28a1847c 100644 --- a/skchange/anomaly_detectors/tests/test_anomaly_detectors.py +++ b/skchange/anomaly_detectors/tests/test_anomaly_detectors.py @@ -2,121 +2,73 @@ import pandas as pd import pytest -from sktime.tests.test_switch import run_test_for_class -from sktime.utils._testing.annotation import make_annotation_problem -from skchange.anomaly_detectors.anomalisers import StatThresholdAnomaliser -from skchange.anomaly_detectors.capa import Capa -from skchange.anomaly_detectors.circular_binseg import CircularBinarySegmentation -from skchange.anomaly_detectors.moscore_anomaly import MoscoreAnomaly -from skchange.anomaly_detectors.mvcapa import Mvcapa +from skchange.anomaly_detectors import ANOMALY_DETECTORS, COLLECTIVE_ANOMALY_DETECTORS +from skchange.anomaly_detectors.base import CollectiveAnomalyDetector from skchange.datasets.generate import generate_anomalous_data -anomaly_detectors = [ - Capa, - CircularBinarySegmentation, - MoscoreAnomaly, - Mvcapa, - StatThresholdAnomaliser, -] - -true_anomalies = [(50, 59), (120, 129)] +true_anomalies = [(30, 34), (70, 75)] anomaly_data = generate_anomalous_data( - 200, anomalies=true_anomalies, means=[10.0, 5.0], random_state=39 + 100, anomalies=true_anomalies, means=[10.0, 15.0], random_state=2 ) -@pytest.mark.parametrize("Estimator", anomaly_detectors) -def test_output_type(Estimator): - """Test annotator output type.""" - estimator = Estimator.create_test_instance() - if not run_test_for_class(Estimator): - return None - - arg = make_annotation_problem( - n_timepoints=500, estimator_type=estimator.get_tag("distribution_type") - ) - estimator.fit(arg) - arg = make_annotation_problem( - n_timepoints=200, estimator_type=estimator.get_tag("distribution_type") - ) - y_pred = estimator.predict(arg) - assert isinstance(y_pred, (pd.DataFrame, pd.Series)) - - -@pytest.mark.parametrize("Estimator", anomaly_detectors) -def test_anomaly_detector_sparse_int(Estimator): - """Test sparse int label anomaly detector output. - - Check if the predicted anomalies match. - """ +@pytest.mark.parametrize("Estimator", COLLECTIVE_ANOMALY_DETECTORS) +def test_collective_anomaly_detector_predict(Estimator): + """Test collective anomaly detector's predict method (sparse output).""" detector = Estimator.create_test_instance() - detector.set_params(fmt="sparse", labels="int_label") anomalies = detector.fit_predict(anomaly_data) - assert len(anomalies) == len(true_anomalies) - for i, (start, end) in enumerate(true_anomalies): - assert anomalies.loc[i, "start"] == start and anomalies.loc[i, "end"] == end - + if isinstance(anomalies, pd.DataFrame): + anomalies = anomalies.iloc[:, 0] -@pytest.mark.parametrize("Estimator", anomaly_detectors) -def test_anomaly_detector_sparse_indicator(Estimator): - """Test sparse indicator anomaly detector output. - - Check if the predicted anomalies match. 
- """ - detector = Estimator.create_test_instance() - detector.set_params(fmt="sparse", labels="indicator") - anomalies = detector.fit_predict(anomaly_data) assert len(anomalies) == len(true_anomalies) for i, (start, end) in enumerate(true_anomalies): - assert anomalies.loc[i, "start"] == start and anomalies.loc[i, "end"] == end + assert anomalies.array.left[i] == start and anomalies.array.right[i] == end -@pytest.mark.parametrize("Estimator", anomaly_detectors) -def test_anomaly_detector_score(Estimator): - """Test score anomaly detector output.""" - sparse_detector = Estimator.create_test_instance() - sparse_detector.set_params(fmt="sparse", labels="score") - dense_detector = Estimator.create_test_instance() - dense_detector.set_params(fmt="dense", labels="score") - sparse_scores = sparse_detector.fit_predict(anomaly_data) - dense_scores = dense_detector.fit_predict(anomaly_data) - assert (sparse_scores == dense_scores).all(axis=None) - if isinstance(sparse_scores, pd.DataFrame): - assert "score" in sparse_scores.columns - else: - assert sparse_scores.name == "score" - - -@pytest.mark.parametrize("Estimator", anomaly_detectors) -def test_anomaly_detector_dense_int(Estimator): - """Tests dense int label anomaly detector output. - - Check if the predicted anomalies matches. - """ +@pytest.mark.parametrize("Estimator", COLLECTIVE_ANOMALY_DETECTORS) +def test_collective_anomaly_detector_transform(Estimator): + """Test collective anomaly detector's transform method (dense output).""" detector = Estimator.create_test_instance() - detector.set_params(fmt="dense", labels="int_label") - labels = detector.fit_predict(anomaly_data) + labels = detector.fit_transform(anomaly_data) if isinstance(labels, pd.DataFrame): labels = labels.iloc[:, 0] + true_collective_anomalies = pd.IntervalIndex.from_tuples( + true_anomalies, closed="both" + ) + true_anomaly_labels = CollectiveAnomalyDetector.sparse_to_dense( + true_collective_anomalies, anomaly_data.index + ) + labels.equals(true_anomaly_labels) + + # Similar test that does not depend on sparse_to_dense, just to be sure. assert labels.nunique() == len(true_anomalies) + 1 for i, (start, end) in enumerate(true_anomalies): assert (labels.iloc[start : end + 1] == i + 1).all() -@pytest.mark.parametrize("Estimator", anomaly_detectors) -def test_anomaly_detector_dense_indicator(Estimator): - """Tests dense indicator anomaly detector output. - - Check if the predicted anomalies matches. - """ +@pytest.mark.parametrize("Estimator", ANOMALY_DETECTORS) +def test_anomaly_detector_sparse_to_dense(Estimator): + """Test that predict + sparse_to_dense == transform.""" detector = Estimator.create_test_instance() - detector.set_params(fmt="dense", labels="indicator") - labels = detector.fit_predict(anomaly_data) - if isinstance(labels, pd.DataFrame): - labels = labels.iloc[:, 0] + anomalies = detector.fit_predict(anomaly_data) + labels_predict_convert = detector.sparse_to_dense( + anomalies, anomaly_data.index, anomaly_data.columns + ) + if isinstance(labels_predict_convert, pd.Series): + # transforms does output conversion to match the input. This is not required of + # spare_to_dense. 
+ labels_predict_convert = labels_predict_convert.to_frame() + labels_transform = detector.fit_transform(anomaly_data) + assert labels_predict_convert.equals(labels_transform) + - for start, end in true_anomalies: - assert labels.iloc[start : end + 1].all() - assert not labels.iloc[start - 1] and not labels.iloc[end + 1] +@pytest.mark.parametrize("Estimator", ANOMALY_DETECTORS) +def test_anomaly_detector_dense_to_sparse(Estimator): + """Test that transform + dense_to_sparse == predict.""" + detector = Estimator.create_test_instance() + labels = detector.fit_transform(anomaly_data) + anomalies_transform_convert = detector.dense_to_sparse(labels) + anomalies_predict = detector.fit_predict(anomaly_data) + assert anomalies_transform_convert.equals(anomalies_predict) diff --git a/skchange/anomaly_detectors/tests/test_capa.py b/skchange/anomaly_detectors/tests/test_capa.py index 7c8b7305..b21406c1 100644 --- a/skchange/anomaly_detectors/tests/test_capa.py +++ b/skchange/anomaly_detectors/tests/test_capa.py @@ -1,5 +1,6 @@ """Tests for CAPA and all available savings.""" +import pandas as pd import pytest from skchange.anomaly_detectors.capa import Capa @@ -9,21 +10,30 @@ @pytest.mark.parametrize("saving", VALID_SAVINGS) -def test_capa_anomalies(saving): +@pytest.mark.parametrize("detector_class", [Capa, Mvcapa]) +def test_capa_anomalies(detector_class, saving): """Test Capa anomalies.""" n_segments = 2 seg_len = 20 df = generate_teeth_data( - n_segments=n_segments, mean=10, segment_length=seg_len, p=5, random_state=8 + n_segments=n_segments, + mean=20, + segment_length=seg_len, + p=5, + affected_proportion=0.2, + random_state=8, + ) + detector = detector_class( + saving=saving, + collective_penalty_scale=2.0, + ignore_point_anomalies=True, # To get test coverage. 
+ ) + anomalies = detector.fit_predict(df) + if isinstance(anomalies, pd.DataFrame): + anomalies = anomalies.iloc[:, 0] + # End point also included as a changepoint + assert ( + len(anomalies) == 1 + and anomalies.array.left[0] == seg_len + and anomalies.array.right[0] == 2 * seg_len - 1 ) - for detector_class in [Capa, Mvcapa]: - detector = detector_class( - saving=saving, fmt="sparse", collective_penalty_scale=2.0 - ) - anomalies = detector.fit_predict(df) - # End point also included as a changepoint - assert ( - len(anomalies) == 1 - and anomalies.loc[0, "start"] == seg_len - and anomalies.loc[0, "end"] == 2 * seg_len - 1 - ) diff --git a/skchange/anomaly_detectors/tests/test_moscore_anomaly.py b/skchange/anomaly_detectors/tests/test_moscore_anomaly.py index 8d78f147..d9173d8a 100644 --- a/skchange/anomaly_detectors/tests/test_moscore_anomaly.py +++ b/skchange/anomaly_detectors/tests/test_moscore_anomaly.py @@ -7,38 +7,38 @@ from skchange.datasets.generate import generate_anomalous_data from skchange.scores.score_factory import VALID_ANOMALY_SCORES -true_anomalies = [(50, 59), (120, 129)] +true_anomalies = [(30, 34), (70, 75)] anomaly_data = generate_anomalous_data( - 200, anomalies=true_anomalies, means=[10.0, 5.0], random_state=5 + 100, anomalies=true_anomalies, means=[10.0, 15.0], random_state=103 ) @pytest.mark.parametrize("score", VALID_ANOMALY_SCORES) def test_moscore_anomalies(score): """Test Moscore anomalies.""" - detector = MoscoreAnomaly.create_test_instance() - detector.set_params(score=score, fmt="sparse", labels="int_label") + detector = MoscoreAnomaly( + score, min_anomaly_length=4, max_anomaly_length=10, left_bandwidth=20 + ) + detector.set_params(score=score) anomalies = detector.fit_predict(anomaly_data) assert len(anomalies) == len(true_anomalies) for i, (start, end) in enumerate(true_anomalies): - assert anomalies.loc[i, "start"] == start and anomalies.loc[i, "end"] == end + assert anomalies.array.left[i] == start and anomalies.array.right[i] == end @pytest.mark.parametrize("score", VALID_ANOMALY_SCORES) def test_moscore_scores(score): - """Test Moscore scores.""" + """Test MoscoreAnomaly scores.""" detector = MoscoreAnomaly.create_test_instance() - detector.set_params(score=score, fmt="sparse", labels="int_label") - scores = detector.fit_predict(anomaly_data) - assert np.all(scores >= 0.0) + detector.set_params(score=score) + detector.fit_predict(anomaly_data) + assert np.all(detector.scores >= 0.0) @pytest.mark.parametrize("score", VALID_ANOMALY_SCORES) def test_moscore_tuning(score): """Test Moscore tuning.""" detector = MoscoreAnomaly.create_test_instance() - detector.set_params( - score=score, threshold_scale=None, fmt="dense", labels="indicator" - ) + detector.set_params(score=score, threshold_scale=None) detector.fit(anomaly_data) assert detector.threshold_ > 0.0 diff --git a/skchange/anomaly_detectors/utils.py b/skchange/anomaly_detectors/utils.py deleted file mode 100644 index 6ab6c12f..00000000 --- a/skchange/anomaly_detectors/utils.py +++ /dev/null @@ -1,205 +0,0 @@ -"""Utility functions for anomaly detection.""" - -from typing import Union - -import numpy as np -import pandas as pd - - -def merge_anomalies( - collective_anomalies: Union[ - list[tuple[int, int]], list[tuple[int, int, np.ndarray]] - ] = None, - point_anomalies: Union[ - list[int], - list[tuple[int, int]], - list[tuple[int, np.ndarray]], - list[tuple[int, int, np.ndarray]], - ] = None, -) -> list[tuple[int, int, np.ndarray]]: - """Merge collective and point anomalies into a single list of 
intervals. - - Parameters - ---------- - collective_anomalies : list, optional (default=None) - List of tuples containing inclusive start and end indices of collective - anomalies. - point_anomalies : list, optional (default=None) - List of point anomaly indices. - - Returns - ------- - list - List of tuples containing inclusive start and end indices of collective - anomalies and point anomalies. - """ - if collective_anomalies is None and point_anomalies is None: - raise ValueError( - "Either collective_anomalies or point_anomalies must be given." - ) - - anomalies = [] - if collective_anomalies: - anomalies += collective_anomalies - if point_anomalies: - # Convert point anomalies to the same format as collective anomalies - if isinstance(point_anomalies[0], int): - anomalies += [(i, i) for i in point_anomalies] - elif len(point_anomalies[0]) == 2 and isinstance( - point_anomalies[0][-1], np.ndarray - ): - anomalies += [(i, i, components) for (i, components) in point_anomalies] - else: - anomalies += point_anomalies - - anomalies = sorted(anomalies) - return anomalies - - -def anomalies_to_labels( - anomalies: list[tuple[int, int]], n: int, p: int = None -) -> np.ndarray: - """Convert anomaly indices to labels. - - Parameters - ---------- - anomalies : list - List of tuples containing inclusive start and end indices of collective - anomalies and point anomalies. - n : int - Sample size. - p : int - Dimensionality of the data input to the anomaly detector. - - Returns - ------- - np.ndarray - Array of labels, where 0 is the normal class, and 1, 2, ... are labels for each - distinct collective and/or point_anomaly. - """ - labels = np.zeros(n, dtype=int) if p is None else np.zeros((n, p), dtype=int) - if len(anomalies) == 0: - return labels - - if len(anomalies[0]) == 2: - for i, (start, end) in enumerate(anomalies): - labels[start : end + 1] = i + 1 - elif len(anomalies[0]) == 3: - # Multivariate - for i, (start, end, components) in enumerate(anomalies): - labels[start : end + 1, components] = i + 1 - return labels - - -def format_anomaly_output( - fmt: str, - labels: str, - X_index: pd.Index, - collective_anomalies: list[tuple] = None, - point_anomalies: list[tuple] = None, - scores: Union[pd.Series, pd.DataFrame] = None, -) -> pd.Series: - """Format the predict method output of change detectors. - - Parameters - ---------- - fmt : str - Format of the output. Either "sparse" or "dense". - labels : str - Labels of the output. Either "indicator", "score" or "int_label". - X_index : pd.Index - Index of the input data. - collective_anomalies : list, optional (default=None) - List of tuples containing inclusive start and end indices of collective - anomalies. - point_anomalies : list, optional (default=None) - List of point anomaly indices. - scores : pd.Series or pd.DataFrame, optional (default=None) - Series or DataFrame of scores. If Series, it must be named 'score', and if - DataFrame, it must have a column named 'score'. - - Returns - ------- - pd.Series - Either a sparse or dense pd.Series of boolean labels, integer labels or scores. 
- """ - n = X_index.size - anomalies = merge_anomalies(collective_anomalies, point_anomalies) - if labels == "int_label": - if fmt == "dense": - anomaly_labels = anomalies_to_labels(anomalies, n) - out = pd.Series(anomaly_labels, index=X_index, name="int_label", dtype=int) - elif fmt == "sparse": - out = pd.DataFrame(anomalies, columns=["start", "end"]) - elif labels == "indicator": - if fmt == "dense": - anomaly_labels = anomalies_to_labels(anomalies, n) - out = pd.Series(anomaly_labels > 0, index=X_index, name="indicator") - elif fmt == "sparse": - out = pd.DataFrame(anomalies, columns=["start", "end"]) - elif labels == "score": - # There is no sparse version of 'score'. - # The scores are formatted in each class' _predict method, as what is a good - # format for the scores is method dependent. - out = scores - return out - - -def format_multivariate_anomaly_output( - fmt: str, - labels: str, - X_index: pd.Index, - X_columns: pd.Index, - collective_anomalies: list[dict] = None, - point_anomalies: list[dict] = None, - scores: Union[pd.Series, pd.DataFrame] = None, -) -> pd.Series: - """Format the predict method output of change detectors. - - Parameters - ---------- - fmt : str - Format of the output. Either "sparse" or "dense". - labels : str - Labels of the output. Either "indicator", "score" or "int_label". - X_index : pd.Index - Index of the input data. - X_columns : pd.Index - Columns of the input data. - collective_anomalies : list, optional (default=None) - List of tuples containing inclusive start and end indices of collective - anomalies. - point_anomalies : list, optional (default=None) - List of point anomaly indices. - scores : pd.Series or pd.DataFrame, optional (default=None) - Series or DataFrame of scores. If Series, it must be named 'score', and if - DataFrame, it must have a column named 'score'. - - Returns - ------- - pd.Series - Either a sparse or dense pd.Series of boolean labels, integer labels or scores. - """ - n = X_index.size - p = X_columns.size - anomalies = merge_anomalies(collective_anomalies, point_anomalies) - if labels == "int_label": - if fmt == "dense": - anomaly_labels = anomalies_to_labels(anomalies, n, p) - out = pd.DataFrame( - anomaly_labels, index=X_index, columns=X_columns, dtype=int - ) - elif fmt == "sparse": - out = pd.DataFrame(anomalies, columns=["start", "end", "components"]) - elif labels == "indicator": - if fmt == "dense": - anomaly_labels = anomalies_to_labels(anomalies, n, p) - out = pd.DataFrame(anomaly_labels > 0, index=X_index, columns=X_columns) - elif fmt == "sparse": - out = pd.DataFrame(anomalies, columns=["start", "end", "components"]) - elif labels == "score": - # There is no sparse version of 'score'. - # The scores are formatted in each class' _predict method, as what is a good - # format for the scores is method dependent. - out = scores - return out diff --git a/skchange/base.py b/skchange/base.py new file mode 100644 index 00000000..429e289a --- /dev/null +++ b/skchange/base.py @@ -0,0 +1,401 @@ +"""Detector base class. + + class name: BaseDetector + + Adapted from the BaseSeriesAnnotator and BaseTransformer class in sktime. + +Scitype defining methods: + fitting - fit(self, X, y=None) + detecting, sparse format - predict(self, X) + detecting, dense format - transform(self, X, y=None) + detection scores, dense - score_transform(self, X) [optional] + updating (temporal) - update(self, X, y=None) [optional] + +Each detector type (e.g. 
point anomaly detector, collective anomaly detector,
+changepoint detector) is a subclass of BaseDetector (task tag in sktime).
+A detector type is defined by the content and format of the output of the predict
+method. Each detector type therefore has the following methods for converting between
+sparse and dense output formats:
+    converting sparse output to dense - sparse_to_dense(y_sparse, index, columns)
+    converting dense output to sparse - dense_to_sparse(y_dense) [optional]
+
+Convenience methods:
+    update&detect - update_predict(self, X)
+    fit&detect - fit_predict(self, X, y=None)
+    fit&transform - fit_transform(self, X, y=None)
+
+Inspection methods:
+    hyper-parameter inspection - get_params()
+    fitted parameter inspection - get_fitted_params()
+
+State:
+    fitted model/strategy - by convention, any attributes ending in "_"
+    fitted state flag - check_is_fitted()
+"""
+
+__author__ = ["mtveten"]
+__all__ = ["BaseDetector"]
+
+from sktime.transformations.base import BaseTransformer
+from sktime.utils.validation.series import check_series
+
+
+class BaseDetector(BaseTransformer):
+    """Base detector.
+
+    An alternative implementation to the BaseSeriesAnnotator class from sktime,
+    more focused on the detection of events of interest.
+    Safer for now since the annotation module is still experimental.
+
+    All detectors share the common feature that each element of the output from .predict
+    indicates the detection of a specific event of interest, such as an anomaly, a
+    changepoint, or something else.
+
+    Needs to be implemented:
+    - _fit(self, X, y=None) -> self
+    - _predict(self, X) -> pd.Series or pd.DataFrame
+    - sparse_to_dense(y_sparse, index) -> pd.Series or pd.DataFrame
+        * Enables the transform method to work.
+
+    Optional to implement:
+    - dense_to_sparse(y_dense) -> pd.Series or pd.DataFrame
+    - _score_transform(self, X) -> pd.Series or pd.DataFrame
+    - _update(self, X, y=None) -> self
+    """
+
+    # _tags are adapted from BaseTransformer in sktime.
+    _tags = {
+        "object_type": "transformer",  # type of object
+        "scitype:transform-input": "Series",
+        # what is the scitype of X: Series, or Panel
+        "scitype:transform-output": "Series",
+        # what scitype is returned: Primitives, Series, Panel
+        "scitype:transform-labels": "None",
+        # what is the scitype of y: None (not needed), Primitives, Series, Panel
+        "scitype:instancewise": True,  # is this an instance-wise transform?
+        "capability:inverse_transform": False,  # can the transformer inverse transform?
+        "capability:inverse_transform:range": None,
+        "capability:inverse_transform:exact": True,
+        # inverting range of inverse transform = domain of invertibility of transform
+        "univariate-only": False,  # can the transformer handle multivariate X?
+        "X_inner_mtype": "pd.DataFrame",  # which mtypes do _fit/_predict support for X?
+        # this can be a Panel mtype even if transform-input is Series, vectorized
+        "y_inner_mtype": "None",  # which mtypes do _fit/_predict support for y?
+        "requires_X": True,  # does X need to be passed in fit?
+        "requires_y": False,  # does y need to be passed in fit?
+        "enforce_index_type": None,  # index type that needs to be enforced in X/y
+        "fit_is_empty": False,  # is fit empty and can be skipped? Yes = True
+        "X-y-must-have-same-index": False,  # can estimator handle different X/y index?
+        "transform-returns-same-time-index": True,
+        # does transform return have the same time index as input X
+        "skip-inverse-transform": False,  # is inverse-transform skipped when called?
+        "capability:unequal_length": True,
+        # can the transformer handle unequal length time series (if passed Panel)?
+        "capability:unequal_length:removes": False,
+        # is transform result always guaranteed to be equal length (and series)?
+        "handles-missing-data": False,  # can estimator handle missing data?
+        # todo: rename to capability:missing_values
+        "capability:missing_values": False,
+        # is transform result always guaranteed to contain no missing values?
+        "remember_data": True,  # whether all data seen is remembered as self._X
+        "python_version": None,  # PEP 440 python version specifier to limit versions
+        "authors": "mtveten",  # author(s) of the object
+        "maintainers": "mtveten",  # current maintainer(s) of the object
+    }
+
+    def __init__(self):
+        self.task = self.get_class_tag("task")
+        self.learning_type = self.get_class_tag("learning_type")
+
+        self._is_fitted = False
+
+        self._X = None
+        self._y = None
+
+        super().__init__()
+
+    def _fit(self, X, y=None):
+        """Fit to training data.
+
+        core logic
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Training data to fit the model to (time series).
+        y : pd.Series, optional
+            Ground truth annotations for training if annotator is supervised.
+
+        Returns
+        -------
+        self :
+            Reference to self.
+
+        Notes
+        -----
+        Updates fitted model that updates attributes ending in "_".
+        """
+        raise NotImplementedError("abstract method")
+
+    def predict(self, X):
+        """Detect events in test/deployment data.
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Data to detect events in (time series).
+
+        Returns
+        -------
+        y : pd.Series or pd.DataFrame
+            Each element or row corresponds to a detected event. Exact format depends on
+            the specific detector type.
+        """
+        self.check_is_fitted()
+
+        X = check_series(X, allow_index_names=True)
+
+        # fkiraly: insert checks/conversions here, after PR #1012 I suggest
+
+        y = self._predict(X=X)
+        return y
+
+    def _predict(self, X):
+        """Detect events in test/deployment data.
+
+        core logic
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Data to detect events in (time series).
+
+        Returns
+        -------
+        y : pd.Series or pd.DataFrame
+            Each element or row corresponds to a detected event. Exact format depends on
+            the specific detector type.
+        """
+        raise NotImplementedError("abstract method")
+
+    def _transform(self, X, y=None):
+        """Detect events and return the result in a dense format.
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Data to detect events in (time series).
+
+        Returns
+        -------
+        y : pd.Series or pd.DataFrame
+            Detections for sequence X. The returned detections will be in the dense
+            format, meaning that each element in X will be annotated according to the
+            detection results in some meaningful way depending on the detector type.
+        """
+        y = self.predict(X)
+        y_dense = self.sparse_to_dense(y, X.index, X.columns)
+
+        # sktime does not support transformations that change the state of the object.
+        # Some detectors store detection score information as self.scores during predict.
+        # For now, remove self.scores in transform to pass tests.
+        if hasattr(self, "scores"):
+            del self.scores
+
+        return y_dense
+
+    @staticmethod
+    def sparse_to_dense(y_sparse, index, columns=None):
+        """Convert the sparse output from a detector to a dense format.
+
+        Parameters
+        ----------
+        y_sparse : pd.Series
+            The sparse output from a detector's predict method. The format of the
+            series depends on the task and capability of the annotator.
+        index : array-like
+            Indices that are to be annotated according to ``y_sparse``.
+        columns : array-like, optional
+            Columns that are to be annotated according to ``y_sparse``.
+
+        Returns
+        -------
+        pd.Series or pd.DataFrame of detection labels.
+        """
+        raise NotImplementedError("abstract method")
+
+    @staticmethod
+    def dense_to_sparse(y_dense):
+        """Convert the dense output from a detector to a sparse format.
+
+        Parameters
+        ----------
+        y_dense : pd.Series
+            The dense output from a detector's transform method. The format of the
+            series depends on the task and capability of the annotator.
+
+        Returns
+        -------
+        pd.Series
+        """
+        raise NotImplementedError("abstract method")
+
+    def score_transform(self, X):
+        """Return scores for predicted annotations on test/deployment data.
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Data to annotate (time series).
+
+        Returns
+        -------
+        y : pd.Series
+            Scores for sequence X; exact format depends on annotation type.
+        """
+        self.check_is_fitted()
+        X = check_series(X, allow_index_names=True)
+        return self._score_transform(X)
+
+    def _score_transform(self, X):
+        """Return scores for predicted annotations on test/deployment data.
+
+        core logic
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Data to annotate, time series.
+
+        Returns
+        -------
+        y : pd.Series
+            One score for each element in X.
+            Exact format depends on annotation type.
+        """
+        raise NotImplementedError("abstract method")
+
+    def update(self, X, y=None):
+        """Update model with new data and optional ground truth annotations.
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Training data to update model with (time series).
+        y : pd.Series, optional
+            Ground truth annotations for training if annotator is supervised.
+
+        Returns
+        -------
+        self :
+            Reference to self.
+
+        Notes
+        -----
+        Updates fitted model that updates attributes ending in "_".
+        """
+        self.check_is_fitted()
+
+        X = check_series(X, allow_index_names=True)
+
+        if y is not None:
+            y = check_series(y, allow_index_names=True)
+
+        self._X = X.combine_first(self._X)
+
+        if y is not None:
+            self._y = y.combine_first(self._y)
+
+        self._update(X=X, y=y)
+
+        return self
+
+    def _update(self, X, y=None):
+        """Update model with new data and optional ground truth annotations.
+
+        core logic
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Training data to update model with (time series).
+        y : pd.Series, optional
+            Ground truth annotations for training if annotator is supervised.
+
+        Returns
+        -------
+        self :
+            Reference to self.
+
+        Notes
+        -----
+        Updates fitted model that updates attributes ending in "_".
+        """
+        # default/fallback: re-fit to all data
+        self._fit(self._X, self._y)
+
+        return self
+
+    def update_predict(self, X):
+        """Update model with new data and create annotations for it.
+
+        Parameters
+        ----------
+        X : pd.DataFrame
+            Training data to update model with (time series).
+
+        Returns
+        -------
+        y : pd.Series
+            Annotations for sequence X; exact format depends on annotation type.
+
+        Notes
+        -----
+        Updates fitted model that updates attributes ending in "_".
+        """
+        self.update(X=X)
+        y = self.predict(X=X)
+
+        return y
+
+    def fit_predict(self, X, y=None):
+        """Fit to data, then predict it.
+
+        Fits model to X and y with given annotation parameters
+        and returns the annotations made by the model.
+
+        Parameters
+        ----------
+        X : pd.DataFrame, pd.Series or np.ndarray
+            Data to be transformed.
+        y : pd.Series or np.ndarray, optional (default=None)
+            Target values of data to be predicted.
+
+        Returns
+        -------
+        y : pd.Series
+            Annotations for sequence X; exact format depends on annotation type.
+        """
+        # Non-optimized default implementation; override when a better
+        # method is possible for a given algorithm.
+        return self.fit(X, y).predict(X)
+
+    def fit_transform(self, X, y=None):
+        """Fit to data, then transform it.
+
+        Fits model to X and y with given annotation parameters
+        and returns the annotations made by the model.
+
+        Parameters
+        ----------
+        X : pd.DataFrame, pd.Series or np.ndarray
+            Data to be transformed.
+        y : pd.Series or np.ndarray, optional (default=None)
+            Target values of data to be predicted.
+
+        Returns
+        -------
+        y : pd.Series
+            Annotations for sequence X; exact format depends on annotation type.
+        """
+        return self.fit(X).transform(X)
diff --git a/skchange/change_detectors/__init__.py b/skchange/change_detectors/__init__.py
index 3064220c..f081b171 100644
--- a/skchange/change_detectors/__init__.py
+++ b/skchange/change_detectors/__init__.py
@@ -1 +1,11 @@
 """Change detection algorithms."""
+
+from skchange.change_detectors.base import ChangeDetector
+from skchange.change_detectors.moscore import Moscore
+from skchange.change_detectors.pelt import Pelt
+from skchange.change_detectors.seeded_binseg import SeededBinarySegmentation
+
+BASE_CHANGE_DETECTORS = [ChangeDetector]
+CHANGE_DETECTORS = [Moscore, Pelt, SeededBinarySegmentation]
+
+__all__ = BASE_CHANGE_DETECTORS + CHANGE_DETECTORS
diff --git a/skchange/change_detectors/base.py b/skchange/change_detectors/base.py
new file mode 100644
index 00000000..d03d9a81
--- /dev/null
+++ b/skchange/change_detectors/base.py
@@ -0,0 +1,94 @@
+"""Base classes for changepoint detectors."""
+
+import numpy as np
+import pandas as pd
+
+from skchange.base import BaseDetector
+
+
+class ChangeDetector(BaseDetector):
+    """Base class for changepoint detectors.
+
+    Changepoint detectors detect points in time where a change in the data occurs.
+    Data between two changepoints is a segment where the data is considered to be
+    homogeneous, i.e., of the same distribution. A changepoint is defined as the
+    location of the last element of a segment.
+
+    Output format of the predict method: See the dense_to_sparse method.
+    Output format of the transform method: See the sparse_to_dense method.
+
+    Subclasses should set the following tags for sktime compatibility:
+    - task: "change_point_detection"
+    - learning_type: "unsupervised" or "supervised"
+    - And possibly other tags, such as
+        * "capability:missing_values": False,
+        * "capability:multivariate": True,
+        * "fit_is_empty": False,
+
+    Needs to be implemented:
+    - _fit(self, X, y=None) -> self
+    - _predict(self, X) -> pd.Series
+
+    Optional to implement:
+    - _score_transform(self, X) -> pd.Series
+    - _update(self, X, y=None) -> self
+    """
+
+    @staticmethod
+    def sparse_to_dense(
+        y_sparse: pd.Series, index: pd.Index, columns: pd.Index = None
+    ) -> pd.Series:
+        """Convert the sparse output from the predict method to a dense format.
+
+        Parameters
+        ----------
+        y_sparse : pd.Series
+            The sparse output from a changepoint detector's predict method.
+        index : array-like
+            Indices that are to be annotated according to ``y_sparse``.
+        columns : array-like
+            Not used. Only for API compatibility.
+
+        Returns
+        -------
+        pd.Series with integer labels 0, ..., K for each segment between two
+        changepoints.
+        """
+        changepoints = y_sparse.to_list()
+        n = len(index)
+        changepoints = [-1] + changepoints + [n - 1]
+        segment_labels = np.zeros(n)
+        for i in range(len(changepoints) - 1):
+            segment_labels[changepoints[i] + 1 : changepoints[i + 1] + 1] = i
+
+        return pd.Series(
+            segment_labels, index=index, name="segment_label", dtype="int64"
+        )
+
+    @staticmethod
+    def dense_to_sparse(y_dense: pd.Series) -> pd.Series:
+        """Convert the dense output from the transform method to a sparse format.
+
+        Parameters
+        ----------
+        y_dense : pd.Series
+            The dense output from a changepoint detector's transform method.
+
+        Returns
+        -------
+        pd.Series of changepoint locations. Changepoints are defined as the last element
+        of a segment.
+        """
+        y_dense = y_dense.reset_index(drop=True)
+        # changepoint = end of segment, so the label diffs > 0 must be shifted by -1.
+        is_changepoint = np.roll(y_dense.diff().abs() > 0, -1)
+        changepoints = y_dense.index[is_changepoint]
+        return ChangeDetector._format_sparse_output(changepoints)
+
+    @staticmethod
+    def _format_sparse_output(changepoints) -> pd.Series:
+        """Format the sparse output of changepoint detectors.
+
+        Can be reused by subclasses to format the output of the _predict method.
+        """
+        return pd.Series(changepoints, name="changepoint", dtype="int64")
diff --git a/skchange/change_detectors/moscore.py b/skchange/change_detectors/moscore.py
index ca501c7a..4c50313a 100644
--- a/skchange/change_detectors/moscore.py
+++ b/skchange/change_detectors/moscore.py
@@ -8,9 +8,8 @@
 import numpy as np
 import pandas as pd
 from numba import njit
-from sktime.annotation.base import BaseSeriesAnnotator
 
-from skchange.change_detectors.utils import format_changepoint_output
+from skchange.change_detectors.base import ChangeDetector
 from skchange.scores.score_factory import score_factory
 from skchange.utils.numba.general import where
 from skchange.utils.validation.data import check_data
@@ -49,7 +48,7 @@ def moscore_transform(
     return scores
 
 
-class Moscore(BaseSeriesAnnotator):
+class Moscore(ChangeDetector):
     """Moving score algorithm for multiple changepoint detection.
 
     A generalized version of the MOSUM (moving sum) algorithm [1]_ for changepoint
@@ -85,17 +84,6 @@ class Moscore(ChangeDetector):
     min_detection_interval : int, optional (default=1)
         Minimum number of consecutive scores above the threshold to be considered a
        changepoint. Must be between 1 and `bandwidth`/2.
-    fmt : str {"dense", "sparse"}, optional (default="sparse")
-        Annotation output format:
-        * If "sparse", a sub-series of labels for only the outliers in X is returned,
-        * If "dense", a series of labels for all values in X is returned.
-    labels : str {"indicator", "score", "int_label"}, optional (default="int_label")
-        Annotation output labels:
-        * If "indicator", returned values are boolean, indicating whether a value is an
-        outlier,
-        * If "score", returned values are floats, giving the outlier score.
-        * If "int_label", returned values are integer, indicating which segment a value
-        belongs to.
 
     References
     ----------
@@ -125,22 +113,20 @@ def __init__(
         threshold_scale: Optional[float] = 2.0,
         level: float = 0.01,
         min_detection_interval: int = 1,
-        fmt: str = "sparse",
-        labels: str = "int_label",
     ):
         self.score = score
         self.bandwidth = bandwidth
         self.threshold_scale = threshold_scale  # Just holds the input value.
        self.level = level
         self.min_detection_interval = min_detection_interval
-        super().__init__(fmt=fmt, labels=labels)
+        super().__init__()
 
         self.score_f, self.score_init_f = score_factory(self.score)
         check_larger_than(1, self.bandwidth, "bandwidth")
         check_larger_than(0, threshold_scale, "threshold_scale", allow_none=True)
         check_larger_than(0, self.level, "level")
         check_in_interval(
-            pd.Interval(1, self.bandwidth / 2 - 1, closed="both"),
+            pd.Interval(1, max(1, self.bandwidth / 2 - 1), closed="both"),
             self.min_detection_interval,
             "min_detection_interval",
         )
@@ -217,7 +203,7 @@ def _get_threshold(self, X: pd.DataFrame) -> float:
             n, p, self.bandwidth, self.level
         )
 
-    def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None):
+    def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None):
         """Fit to training data.
 
         Sets the threshold of the detector.
@@ -234,7 +220,7 @@ def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None):
         ----------
         X : pd.DataFrame
             training data to fit the threshold to.
-        Y : pd.Series, optional
+        y : pd.Series, optional
             Does nothing. Only here to make the fit method compatible with sktime
             and scikit-learn.
@@ -254,17 +240,18 @@
         self.threshold_ = self._get_threshold(X)
         return self
 
-    def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
-        """Create annotations on test/deployment data.
+    def _score_transform(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
+        """Return scores for predicted annotations on test/deployment data.
 
         Parameters
         ----------
-        X : pd.DataFrame - data to annotate, time series
+        X : pd.DataFrame
+            Data to annotate, time series.
 
         Returns
        -------
-        Y : pd.Series - annotations for sequence X
-            exact format depends on annotation type
+        y : pd.Series
+            Scores for sequence X; exact format depends on annotation type.
         """
         X = check_data(
             X,
@@ -277,13 +264,25 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
             self.score_init_f,
             self.bandwidth,
         )
-        self.changepoints = get_moscore_changepoints(
-            scores, self.threshold_, self.min_detection_interval
-        )
-        self.scores = pd.Series(scores, index=X.index, name="score")
-        return format_changepoint_output(
-            self.fmt, self.labels, self.changepoints, X.index, self.scores
+        return pd.Series(scores, index=X.index, name="score")
+
+    def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
+        """Create annotations on test/deployment data.
+ + Parameters + ---------- + X : pd.DataFrame - data to annotate, time series + + Returns + ------- + y : pd.Series - annotations for sequence X + exact format depends on annotation type + """ + self.scores = self.score_transform(X) + changepoints = get_moscore_changepoints( + self.scores.values, self.threshold_, self.min_detection_interval ) + return ChangeDetector._format_sparse_output(changepoints) @classmethod def get_test_params(cls, parameter_set="default"): @@ -305,6 +304,7 @@ def get_test_params(cls, parameter_set="default"): `create_test_instance` uses the first (or only) dictionary in `params` """ params = [ - {"score": "mean", "bandwidth": 10}, + {"score": "mean", "bandwidth": 5}, + {"score": "meanvar", "bandwidth": 5}, ] return params diff --git a/skchange/change_detectors/pelt.py b/skchange/change_detectors/pelt.py index 5bc29238..479f1c77 100644 --- a/skchange/change_detectors/pelt.py +++ b/skchange/change_detectors/pelt.py @@ -9,9 +9,8 @@ import numpy as np import pandas as pd from numba import njit -from sktime.annotation.base import BaseSeriesAnnotator -from skchange.change_detectors.utils import format_changepoint_output +from skchange.change_detectors.base import ChangeDetector from skchange.costs.cost_factory import cost_factory from skchange.utils.validation.data import check_data from skchange.utils.validation.parameters import check_larger_than @@ -62,7 +61,7 @@ def run_pelt( return opt_cost[1:], get_changepoints(prev_cpts) -class Pelt(BaseSeriesAnnotator): +class Pelt(ChangeDetector): """Pruned exact linear time changepoint detection. An efficient implementation of the PELT algorithm [1]_ for changepoint detection. @@ -80,18 +79,6 @@ class Pelt(BaseSeriesAnnotator): input to .fit() (not supported yet). min_segment_length : int, optional (default=2) Minimum length of a segment. - fmt : str {"dense", "sparse"}, optional (default="sparse") - Annotation output format: - * If "sparse", a sub-series of labels for only the outliers in X is returned, - * If "dense", a series of labels for all values in X is returned. - labels : str {"indicator", "score", "int_label"}, optional (default="int_label") - Annotation output labels: - * If "indicator", returned values are boolean, indicating whether a value is an - outlier, - * If "score", returned values are floats, giving the outlier score. - * If "int_label", returned values are integer, indicating which segment a value - belongs to. - References ---------- @@ -120,13 +107,11 @@ def __init__( cost: Union[str, Callable] = "mean", penalty_scale: Optional[float] = 2.0, min_segment_length: int = 2, - fmt: str = "sparse", - labels: str = "int_label", ): self.cost = cost self.penalty_scale = penalty_scale self.min_segment_length = min_segment_length - super().__init__(fmt=fmt, labels=labels) + super().__init__() self.cost_func, self.cost_init_func = cost_factory(self.cost) check_larger_than(0, penalty_scale, "penalty_scale", allow_none=True) @@ -175,7 +160,11 @@ def _get_penalty(self, X: pd.DataFrame) -> float: p = X.shape[1] return self.penalty_scale * self.get_default_penalty(n, p) - def _fit(self, X: Union[pd.Series, pd.DataFrame], Y: Optional[pd.DataFrame] = None): + def _fit( + self, + X: Union[pd.Series, pd.DataFrame], + y: Optional[Union[pd.Series, pd.DataFrame]] = None, + ): """Fit to training data. Sets the penalty of the detector. @@ -190,7 +179,7 @@ def _fit(self, X: Union[pd.Series, pd.DataFrame], Y: Optional[pd.DataFrame] = No ---------- X : pd.DataFrame training data to fit the penalty to. 
-        Y : pd.Series, optional
+        y : pd.Series, optional
             Does nothing. Only here to make the fit method compatible with sktime
             and scikit-learn.
@@ -215,7 +204,7 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
 
         Returns
         -------
-        Y : pd.Series - annotations for sequence X
+        y : pd.Series - annotations for sequence X
             exact format depends on annotation type
         """
         X = check_data(
@@ -223,17 +212,35 @@
             min_length=2 * self.min_segment_length,
             min_length_name="2*min_segment_length",
         )
-        opt_costs, self.changepoints = run_pelt(
+        opt_costs, changepoints = run_pelt(
             X.values,
             self.cost_func,
             self.cost_init_func,
             self.penalty_,
             self.min_segment_length,
         )
+        # Store the scores for introspection without recomputing in score_transform.
         self.scores = pd.Series(opt_costs, index=X.index, name="score")
-        return format_changepoint_output(
-            self.fmt, self.labels, self.changepoints, X.index, self.scores
-        )
+        return ChangeDetector._format_sparse_output(changepoints)
+
+    def _score_transform(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series:
+        """Compute the PELT scores for the input data.
+
+        Parameters
+        ----------
+        X : pd.DataFrame - data to compute scores for, time series
+
+        Returns
+        -------
+        scores : pd.Series - scores for sequence X
+
+        Notes
+        -----
+        The PELT scores are the cumulative optimal costs, so the scores are increasing
+        and are not per-observation scores.
+        """
+        self.predict(X)
+        return self.scores
 
     @classmethod
     def get_test_params(cls, parameter_set="default"):
@@ -256,5 +263,6 @@
         """
         params = [
             {"cost": "mean", "min_segment_length": 5},
+            {"cost": "mean", "penalty_scale": 0.0, "min_segment_length": 1},
         ]
         return params
diff --git a/skchange/change_detectors/seeded_binseg.py b/skchange/change_detectors/seeded_binseg.py
index bbbb6585..5d5b3312 100644
--- a/skchange/change_detectors/seeded_binseg.py
+++ b/skchange/change_detectors/seeded_binseg.py
@@ -8,9 +8,8 @@
 import numpy as np
 import pandas as pd
 from numba import njit
-from sktime.annotation.base import BaseSeriesAnnotator
 
-from skchange.change_detectors.utils import format_changepoint_output
+from skchange.change_detectors.base import ChangeDetector
 from skchange.scores.score_factory import score_factory
 from skchange.utils.validation.data import check_data
 from skchange.utils.validation.parameters import check_in_interval, check_larger_than
@@ -95,7 +94,7 @@ def run_seeded_binseg(
     return cpts, amoc_scores, maximizers, starts, ends
 
 
-class SeededBinarySegmentation(BaseSeriesAnnotator):
+class SeededBinarySegmentation(ChangeDetector):
     """Seeded binary segmentation algorithm for multiple changepoint detection.
 
     Binary segmentation type changepoint detection algorithms recursively split the data
@@ -138,17 +137,6 @@ class SeededBinarySegmentation(ChangeDetector):
        starting at 'interval_len'='min_interval_length'. It also governs the amount
        of overlap between intervals of the same length, as the start of each interval
        is shifted by a factor of '1 + 1 / growth_factor'. Must be a float in (1, 2].
-    fmt : str {"dense", "sparse"}, optional (default="sparse")
-        Annotation output format:
-        * If "sparse", a sub-series of labels for only the outliers in X is returned,
-        * If "dense", a series of labels for all values in X is returned.
- labels : str {"indicator", "score", "int_label"}, optional (default="int_label") - Annotation output labels: - * If "indicator", returned values are boolean, indicating whether a value is an - outlier, - * If "score", returned values are floats, giving the outlier score. - * If "int_label", returned values are integer, indicating which segment a value - belongs to. References ---------- @@ -180,8 +168,6 @@ def __init__( min_segment_length: int = 5, max_interval_length: int = 200, growth_factor: float = 1.5, - fmt: str = "sparse", - labels: str = "int_label", ): self.score = score self.threshold_scale = threshold_scale # Just holds the input value. @@ -189,7 +175,7 @@ def __init__( self.min_segment_length = min_segment_length self.max_interval_length = max_interval_length self.growth_factor = growth_factor - super().__init__(fmt=fmt, labels=labels) + super().__init__() self.score_f, self.score_init_f = score_factory(self.score) check_larger_than(0.0, self.threshold_scale, "threshold_scale", allow_none=True) @@ -258,7 +244,7 @@ def _get_threshold(self, X: pd.DataFrame) -> float: p = X.shape[1] return self.threshold_scale * self.get_default_threshold(n, p) - def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): + def _fit(self, X: pd.DataFrame, y: Optional[pd.DataFrame] = None): """Fit to training data. Sets the threshold of the detector. @@ -273,7 +259,7 @@ def _fit(self, X: pd.DataFrame, Y: Optional[pd.DataFrame] = None): ---------- X : pd.DataFrame training data to fit the threshold to. - Y : pd.Series, optional + y : pd.Series, optional Does nothing. Only here to make the fit method compatible with sktime and scikit-learn. @@ -298,7 +284,7 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: Returns ------- - Y : pd.Series - annotations for sequence X + y : pd.Series - annotations for sequence X exact format depends on annotation type """ X = check_data( @@ -315,13 +301,10 @@ def _predict(self, X: Union[pd.DataFrame, pd.Series]) -> pd.Series: self.max_interval_length, self.growth_factor, ) - self.changepoints = cpts self.scores = pd.DataFrame( {"start": starts, "end": ends, "argmax_cpt": maximizers, "score": scores} ) - return format_changepoint_output( - self.fmt, self.labels, self.changepoints, X.index, self.scores - ) + return ChangeDetector._format_sparse_output(cpts) @classmethod def get_test_params(cls, parameter_set="default"): @@ -344,5 +327,6 @@ def get_test_params(cls, parameter_set="default"): """ params = [ {"score": "mean", "min_segment_length": 5, "max_interval_length": 100}, + {"score": "mean", "min_segment_length": 1, "max_interval_length": 20}, ] return params diff --git a/skchange/change_detectors/tests/test_change_detectors.py b/skchange/change_detectors/tests/test_change_detectors.py index 8e9c1411..c9ea7b07 100644 --- a/skchange/change_detectors/tests/test_change_detectors.py +++ b/skchange/change_detectors/tests/test_change_detectors.py @@ -1,110 +1,49 @@ """Basic tests for all change detectors.""" -import pandas as pd import pytest -from sktime.tests.test_switch import run_test_for_class -from sktime.utils._testing.annotation import make_annotation_problem -from skchange.change_detectors.moscore import Moscore -from skchange.change_detectors.pelt import Pelt -from skchange.change_detectors.seeded_binseg import SeededBinarySegmentation +from skchange.change_detectors import CHANGE_DETECTORS from skchange.datasets.generate import generate_teeth_data -change_detectors = [Moscore, Pelt, SeededBinarySegmentation] +n_segments = 2 +seg_len 
= 50 +changepoint_data = generate_teeth_data( + n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=2 +)[0] -@pytest.mark.parametrize("Estimator", change_detectors) -def test_output_type(Estimator): - """Test annotator output type.""" - estimator = Estimator.create_test_instance() - if not run_test_for_class(Estimator): - return None - - arg = make_annotation_problem( - n_timepoints=50, estimator_type=estimator.get_tag("distribution_type") - ) - estimator.fit(arg) - arg = make_annotation_problem( - n_timepoints=30, estimator_type=estimator.get_tag("distribution_type") - ) - y_pred = estimator.predict(arg) - assert isinstance(y_pred, (pd.DataFrame, pd.Series)) - - -@pytest.mark.parametrize("Estimator", change_detectors) -def test_change_detector_sparse_int(Estimator): - """Test sparse int_label segmentation.""" - n_segments = 2 - seg_len = 50 - df = generate_teeth_data( - n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=2 - ) +@pytest.mark.parametrize("Estimator", CHANGE_DETECTORS) +def test_change_detector_predict(Estimator): + """Test changepoint detector predict (sparse output).""" detector = Estimator.create_test_instance() - detector.set_params(fmt="sparse", labels="int_label") - changepoints = detector.fit_predict(df) + changepoints = detector.fit_predict(changepoint_data) assert len(changepoints) == n_segments - 1 and changepoints[0] == seg_len - 1 -@pytest.mark.parametrize("Estimator", change_detectors) -def test_change_detector_sparse_indicator(Estimator): - """Test sparse indicator segmentation.""" - n_segments = 2 - seg_len = 50 - df = generate_teeth_data( - n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=3 - ) +@pytest.mark.parametrize("Estimator", CHANGE_DETECTORS) +def test_change_detector_transform(Estimator): + """Test changepoint detector transform (dense output).""" detector = Estimator.create_test_instance() - detector.set_params(fmt="sparse", labels="indicator") - changepoints = detector.fit_predict(df) - assert len(changepoints) == n_segments - 1 and changepoints[0] == seg_len - 1 - - -@pytest.mark.parametrize("Estimator", change_detectors) -def test_change_detector_score(Estimator): - """Test sparse score segmentation.""" - n_segments = 2 - seg_len = 50 - df = generate_teeth_data( - n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=4 - ) - sparse_detector = Estimator.create_test_instance() - sparse_detector.set_params(fmt="sparse", labels="score") - dense_detector = Estimator.create_test_instance() - dense_detector.set_params(fmt="dense", labels="score") - sparse_scores = sparse_detector.fit_predict(df) - dense_scores = dense_detector.fit_predict(df) - assert (sparse_scores == dense_scores).all(axis=None) - if isinstance(sparse_scores, pd.DataFrame): - assert "score" in sparse_scores.columns - else: - assert sparse_scores.name == "score" + labels = detector.fit_transform(changepoint_data) + assert labels.nunique() == n_segments + assert labels[seg_len - 1] == 0.0 and labels[seg_len] == 1.0 -@pytest.mark.parametrize("Estimator", change_detectors) -def test_change_detector_dense_int(Estimator): - """Tests dense int_label segmentation.""" - n_segments = 2 - seg_len = 50 - df = generate_teeth_data( - n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=2 - ) +@pytest.mark.parametrize("Estimator", CHANGE_DETECTORS) +def test_change_detector_sparse_to_dense(Estimator): + """Test that predict + sparse_to_dense == transform.""" detector = 
Estimator.create_test_instance() - detector.set_params(fmt="dense", labels="int_label") - labels = detector.fit_predict(df) - assert labels.nunique() == n_segments - assert labels[seg_len - 1] == 0.0 and labels[seg_len] == 1.0 + changepoints = detector.fit_predict(changepoint_data) + labels = detector.sparse_to_dense(changepoints, changepoint_data.index) + labels_transform = detector.fit_transform(changepoint_data) + assert labels.equals(labels_transform) -@pytest.mark.parametrize("Estimator", change_detectors) -def test_change_detector_dense_indicator(Estimator): - """Tests dense indicator segmentation.""" - n_segments = 2 - seg_len = 50 - df = generate_teeth_data( - n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=8 - ) +@pytest.mark.parametrize("Estimator", CHANGE_DETECTORS) +def test_change_detector_dense_to_sparse(Estimator): + """Test that transform + dense_to_sparse == predict.""" detector = Estimator.create_test_instance() - detector.set_params(fmt="dense", labels="indicator") - cpt_indicator = detector.fit_predict(df) - assert cpt_indicator.sum() == n_segments - 1 - assert cpt_indicator[seg_len - 1] + labels = detector.fit_transform(changepoint_data) + changepoints = detector.dense_to_sparse(labels) + changepoints_predict = detector.fit_predict(changepoint_data) + assert changepoints.equals(changepoints_predict) diff --git a/skchange/change_detectors/tests/test_moscore.py b/skchange/change_detectors/tests/test_moscore.py index 7d4ecbab..f090403c 100644 --- a/skchange/change_detectors/tests/test_moscore.py +++ b/skchange/change_detectors/tests/test_moscore.py @@ -16,7 +16,7 @@ def test_moscore_changepoint(score): df = generate_teeth_data( n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=2 ) - detector = Moscore(score, fmt="sparse", labels="int_label") + detector = Moscore(score) changepoints = detector.fit_predict(df) assert len(changepoints) == n_segments - 1 and changepoints[0] == seg_len - 1 @@ -29,9 +29,10 @@ def test_moscore_scores(score): df = generate_teeth_data( n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=3 ) - detector = Moscore(score, fmt="dense", labels="score") - scores = detector.fit_predict(df) + detector = Moscore(score) + scores = detector.fit(df).score_transform(df) assert np.all(scores >= 0.0) + assert len(scores) == len(df) @pytest.mark.parametrize("score", VALID_CHANGE_SCORES) @@ -42,6 +43,6 @@ def test_moscore_tuning(score): df = generate_teeth_data( n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=4 ) - detector = Moscore(score, threshold_scale=None, fmt="dense", labels="indicator") + detector = Moscore(score, threshold_scale=None) detector.fit(df) assert detector.threshold_ > 0.0 diff --git a/skchange/change_detectors/tests/test_seeded_binseg.py b/skchange/change_detectors/tests/test_seeded_binseg.py index d4a49ff3..7bc69295 100644 --- a/skchange/change_detectors/tests/test_seeded_binseg.py +++ b/skchange/change_detectors/tests/test_seeded_binseg.py @@ -51,10 +51,7 @@ def test_binseg_tuning(score): df = generate_teeth_data( n_segments=n_segments, mean=10, segment_length=seg_len, p=1, random_state=4 ) - detector = SeededBinarySegmentation( - score, threshold_scale=None, fmt="dense", labels="score" - ) - detector.fit(df) - scores = detector.predict(df) - assert detector.threshold_ >= scores["score"].mean() - assert detector.threshold_ <= scores["score"].max() + detector = SeededBinarySegmentation(score, threshold_scale=None) + detector.fit_predict(df) + 
assert detector.threshold_ >= detector.scores["score"].mean() + assert detector.threshold_ <= detector.scores["score"].max() diff --git a/skchange/change_detectors/utils.py b/skchange/change_detectors/utils.py deleted file mode 100644 index ab667cea..00000000 --- a/skchange/change_detectors/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Utility functions for change detection.""" - -from typing import Union - -import numpy as np -import pandas as pd - - -def changepoints_to_labels(changepoints: list, n) -> np.ndarray: - """Convert a list of changepoints to a list of labels. - - Parameters - ---------- - changepoints : list - List of changepoint indices. - n: int - Sample size. - - Returns - ------- - labels : np.ndarray - 1D array of labels: 0 for the first segment, 1 for the second, etc. - """ - changepoints = [-1] + changepoints + [n - 1] - labels = np.zeros(n) - for i in range(len(changepoints) - 1): - labels[changepoints[i] + 1 : changepoints[i + 1] + 1] = i - return labels - - -def format_changepoint_output( - fmt: str, - labels: str, - changepoints: list, - X_index: pd.Index, - scores: Union[pd.Series, pd.DataFrame] = None, -) -> pd.Series: - """Format the predict method output of change detectors. - - Parameters - ---------- - fmt : str - Format of the output. Either "sparse" or "dense". - labels : str - Labels of the output. Either "indicator", "score" or "int_label". - changepoints : list - List of changepoint indices. - X_index : pd.Index - Index of the input data. - scores : pd.Series or pd.DataFrame, optional (default=None) - Series or DataFrame of scores. If Series, it must be named 'score', and if - DataFrame, it must have a column named 'score'. - - Returns - ------- - pd.Series - Either a sparse or dense pd.Series of boolean labels, integer labels or scores. - """ - if fmt == "sparse" and labels in ["int_label", "indicator"]: - out = pd.Series(changepoints, name="changepoints", dtype=int) - elif fmt == "dense" and labels == "int_label": - out = changepoints_to_labels(changepoints, len(X_index)) - out = pd.Series(out, index=X_index, name="int_label", dtype=int) - elif fmt == "dense" and labels == "indicator": - out = pd.Series(False, index=X_index, name="indicator", dtype=bool) - out.iloc[changepoints] = True - elif labels == "score": - # There is no sparse version of 'score'. - # The scores are formatted in each class' _predict method, as what is a good - # format for the scores is method dependent. 
- out = scores - return out diff --git a/skchange/tests/test_all_detectors.py b/skchange/tests/test_all_detectors.py index d053cd1d..3f99e4cb 100644 --- a/skchange/tests/test_all_detectors.py +++ b/skchange/tests/test_all_detectors.py @@ -1,19 +1,92 @@ """Tests for all annotators/detectors in skchange.""" +import pandas as pd import pytest -from sktime.tests.test_switch import run_test_for_class -from sktime.utils.estimator_checks import check_estimator +from sktime.utils._testing.annotation import make_annotation_problem +from sktime.utils.estimator_checks import check_estimator, parametrize_with_checks -from skchange.anomaly_detectors.tests.test_anomaly_detectors import anomaly_detectors -from skchange.change_detectors.tests.test_change_detectors import change_detectors +from skchange.anomaly_detectors import ANOMALY_DETECTORS +from skchange.base import BaseDetector +from skchange.change_detectors import CHANGE_DETECTORS +from skchange.datasets.generate import generate_anomalous_data -all_annotators = anomaly_detectors + change_detectors +ALL_DETECTORS = ANOMALY_DETECTORS + CHANGE_DETECTORS -@pytest.mark.parametrize("Estimator", all_annotators) -def test_sktime_annotator_compatibility(Estimator): - """Check compatibility with sktime annotator interface.""" - if not run_test_for_class(Estimator): - return None +@parametrize_with_checks(ALL_DETECTORS) +def test_sktime_compatible_estimators(obj, test_name): + check_estimator(obj, tests_to_run=test_name, raise_exceptions=True) - check_estimator(Estimator, raise_exceptions=True) + +@pytest.mark.parametrize("Detector", ALL_DETECTORS) +def test_detector_fit(Detector): + """Test fit method output.""" + detector = Detector.create_test_instance() + x = make_annotation_problem(n_timepoints=50, estimator_type="None") + fit_detector = detector.fit(x) + assert issubclass(detector.__class__, BaseDetector) + assert issubclass(fit_detector.__class__, BaseDetector) + assert isinstance(fit_detector, Detector) + + +@pytest.mark.parametrize("Detector", ALL_DETECTORS) +def test_detector_predict(Detector): + """Test predict method output.""" + detector = Detector.create_test_instance() + x = generate_anomalous_data(means=10, random_state=60) + y = detector.fit_predict(x) + assert isinstance(y, (pd.Series, pd.DataFrame)) + + +@pytest.mark.parametrize("Detector", ALL_DETECTORS) +def test_detector_transform(Detector): + """Test transform method output.""" + detector = Detector.create_test_instance() + x = generate_anomalous_data(means=10, random_state=61) + y = detector.fit_transform(x) + assert isinstance(y, (pd.Series, pd.DataFrame)) + assert len(x) == len(y) + + +@pytest.mark.parametrize("Detector", ALL_DETECTORS) +def test_detector_score_transform(Detector): + """Test score_transform method output.""" + detector = Detector.create_test_instance() + x = generate_anomalous_data(means=10, random_state=62) + try: + y = detector.fit(x).score_transform(x) + assert isinstance(y, (pd.Series, pd.DataFrame)) + except NotImplementedError: + pass + + +@pytest.mark.parametrize("Detector", ALL_DETECTORS) +def test_detector_update(Detector): + """Test update method output.""" + detector = Detector.create_test_instance() + x = make_annotation_problem(n_timepoints=30, estimator_type="None") + x_train = x[:20].to_frame() + x_next = x[20:].to_frame() + detector.fit(x_train) + detector.update_predict(x_next) + assert issubclass(detector.__class__, BaseDetector) + assert isinstance(detector, Detector) + + +def test_detector_not_implemented_methods(): + detector = BaseDetector() + 
x = make_annotation_problem(n_timepoints=20, estimator_type="None") + with pytest.raises(NotImplementedError): + detector.fit(x) + + detector._is_fitted = True # Required for the following functions to run + with pytest.raises(NotImplementedError): + detector.predict(x) + with pytest.raises(NotImplementedError): + detector.transform(x) + with pytest.raises(NotImplementedError): + detector.score_transform(x) + with pytest.raises(NotImplementedError): + detector.dense_to_sparse(x) + with pytest.raises(NotImplementedError): + detector.sparse_to_dense(x, x.index, pd.Index(["a"]))
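A minimal usage sketch of the refactored detector API introduced by this patch (illustrative, not part of the diff). It mirrors the round trips exercised by `test_change_detector_sparse_to_dense` and `test_change_detector_dense_to_sparse`; the data generation and `bandwidth=5` are taken from the test files above, so treat the exact numbers as assumptions.

```python
from skchange.change_detectors import Moscore
from skchange.datasets.generate import generate_teeth_data

# Two segments of length 50 -> a single changepoint at index 49.
df = generate_teeth_data(
    n_segments=2, mean=10, segment_length=50, p=1, random_state=2
)[0]
detector = Moscore(bandwidth=5)

changepoints = detector.fit_predict(df)  # sparse: pd.Series of changepoint indices
labels = detector.fit_transform(df)  # dense: one segment label per data point

# The sparse and dense output formats are interchangeable via the static converters.
assert Moscore.sparse_to_dense(changepoints, df.index).equals(labels)
assert Moscore.dense_to_sparse(labels).equals(changepoints)
```

The converters are static methods, so they can be used without a fitted detector; this is what lets `transform` be implemented once in `BaseDetector` on top of each subclass's `_predict`.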