Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

add AD task cache #337

Merged
merged 5 commits into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class MemoryTracker {

public enum Origin {
SINGLE_ENTITY_DETECTOR,
MULTI_ENTITY_DETECTOR
MULTI_ENTITY_DETECTOR,
HISTORICAL_SINGLE_ENTITY_DETECTOR,
}

// memory tracker for total consumption of bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ public LimitExceededException(String anomalyDetectorId, String message) {
super(anomalyDetectorId, message, true);
}

/**
* Constructor with error message.
*
* @param message explanation for the limit
*/
public LimitExceededException(String message) {
super(null, message, true);
}

/**
* Constructor with an anomaly detector ID and an explanation, and a flag for stopping.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,18 @@ private AnomalyDetectorSettings() {}
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// Maximum number of batch tasks running on one node.
// TODO: performance test and tune the setting.
public static final Setting<Integer> MAX_BATCH_TASK_PER_NODE = Setting
.intSetting(
"opendistro.anomaly_detection.max_batch_task_per_node",
2,
1,
100,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static int THRESHOLD_MODEL_TRAINING_SIZE = 1000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.task;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.NUM_MIN_SAMPLES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.NUM_SAMPLES_PER_TREE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.NUM_TREES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.THRESHOLD_MODEL_TRAINING_SIZE;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.TIME_DECAY;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.amazon.opendistroforelasticsearch.ad.ml.HybridThresholdingModel;
import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingModel;
import com.amazon.opendistroforelasticsearch.ad.model.ADTask;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.randomcutforest.RandomCutForest;

/**
* AD batch task cache which will hold RCF, threshold model, shingle and training data.
*/
public class ADBatchTaskCache {
kaituo marked this conversation as resolved.
Show resolved Hide resolved
private final String detectorId;
private RandomCutForest rcfModel;
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved
private ThresholdingModel thresholdModel;
private boolean thresholdModelTrained;
private Deque<Map.Entry<Long, Optional<double[]>>> shingle;
private AtomicInteger thresholdModelTrainingDataSize = new AtomicInteger(0);
private double[] thresholdModelTrainingData;
private AtomicBoolean cancelled = new AtomicBoolean(false);
private AtomicLong cacheMemorySize = new AtomicLong(0);
private String cancelReason;
private String cancelledBy;
kaituo marked this conversation as resolved.
Show resolved Hide resolved

protected ADBatchTaskCache(ADTask adTask) {
this.detectorId = adTask.getDetectorId();

AnomalyDetector detector = adTask.getDetector();
rcfModel = RandomCutForest
.builder()
.dimensions(detector.getShingleSize() * detector.getEnabledFeatureIds().size())
.numberOfTrees(NUM_TREES)
.lambda(TIME_DECAY)
.sampleSize(NUM_SAMPLES_PER_TREE)
.outputAfter(NUM_MIN_SAMPLES)
.parallelExecutionEnabled(false)
.build();

this.thresholdModel = new HybridThresholdingModel(
AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
AnomalyDetectorSettings.THRESHOLD_MAX_RANK_ERROR,
AnomalyDetectorSettings.THRESHOLD_MAX_SCORE,
AnomalyDetectorSettings.THRESHOLD_NUM_LOGNORMAL_QUANTILES,
AnomalyDetectorSettings.THRESHOLD_DOWNSAMPLES,
AnomalyDetectorSettings.THRESHOLD_MAX_SAMPLES
);
this.thresholdModelTrainingData = new double[THRESHOLD_MODEL_TRAINING_SIZE];
this.thresholdModelTrained = false;
this.shingle = new ArrayDeque<>(detector.getShingleSize());
}

protected String getDetectorId() {
return detectorId;
}

protected RandomCutForest getRcfModel() {
return rcfModel;
}

protected Deque<Map.Entry<Long, Optional<double[]>>> getShingle() {
return shingle;
}

protected ThresholdingModel getThresholdModel() {
return thresholdModel;
}

protected void setThresholdModelTrained(boolean thresholdModelTrained) {
this.thresholdModelTrained = thresholdModelTrained;
}

protected boolean isThresholdModelTrained() {
return thresholdModelTrained;
}

protected double[] getThresholdModelTrainingData() {
return thresholdModelTrainingData;
}

protected void clearTrainingData() {
this.thresholdModelTrainingData = null;
this.thresholdModelTrainingDataSize.set(0);
}

public AtomicInteger getThresholdModelTrainingDataSize() {
return thresholdModelTrainingDataSize;
}

protected AtomicLong getCacheMemorySize() {
return cacheMemorySize;
}

protected boolean isCancelled() {
return cancelled.get();
}

protected String getCancelReason() {
return cancelReason;
}

protected String getCancelledBy() {
return cancelledBy;
}

protected void cancel(String reason, String userName) {
this.cancelled.compareAndSet(false, true);
this.cancelReason = reason;
this.cancelledBy = userName;
}
}
Loading