Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
start historical detector (#355)
Browse files Browse the repository at this point in the history
* start historical detector

* return get feature failure; add todos

* simplify shingle method for historical detector
  • Loading branch information
ylwu-amzn authored Jan 11, 2021
1 parent 76589fe commit 19b8f9d
Show file tree
Hide file tree
Showing 62 changed files with 3,791 additions and 249 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.task.ADBatchTaskRunner;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskCacheManager;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
Expand Down Expand Up @@ -151,6 +158,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.MultiEntityResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
Expand All @@ -176,7 +184,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip

public static final String AD_BASE_URI = "/_opendistro/_anomaly_detection";
public static final String AD_BASE_DETECTORS_URI = AD_BASE_URI + "/detectors";
public static final String AD_THREAD_POOL_PREFIX = "opendistro.ad.";
public static final String AD_THREAD_POOL_NAME = "ad-threadpool";
public static final String AD_BATCH_TASK_THREAD_POOL_NAME = "ad-batch-task-threadpool";
public static final String AD_JOB_TYPE = "opendistro_anomaly_detector";
private static Gson gson;
private AnomalyDetectionIndices anomalyDetectionIndices;
Expand All @@ -189,6 +199,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private DetectionStateHandler detectorStateHandler;
private ADTaskCacheManager adTaskCacheManager;
private ADTaskManager adTaskManager;
private ADBatchTaskRunner adBatchTaskRunner;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -458,7 +471,7 @@ public Collection<Object> createComponents(
)
.put(
StatNames.ANOMALY_DETECTION_STATE_STATUS.getName(),
new ADStat<>(true, new IndexStatusSupplier(indexUtils, DetectorInternalState.DETECTOR_STATE_INDEX))
new ADStat<>(true, new IndexStatusSupplier(indexUtils, CommonName.DETECTION_STATE_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
Expand All @@ -474,7 +487,7 @@ public Collection<Object> createComponents(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectorStateIndex),
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectionStateIndex),
anomalyDetectionIndices::doesDetectorStateIndexExist,
this.clientUtil,
this.indexUtils,
Expand All @@ -494,6 +507,35 @@ public Collection<Object> createComponents(
stateManager
);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(settings, clusterService, client, xContentRegistry, anomalyDetectionIndices);
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
this.clientUtil,
this.indexUtils,
clusterService,
anomalyDetectionIndices
);
adBatchTaskRunner = new ADBatchTaskRunner(
settings,
threadPool,
clusterService,
client,
nodeFilter,
indexNameExpressionResolver,
adCircuitBreakerService,
featureManager,
adTaskManager,
anomalyDetectionIndices,
adStats,
anomalyResultBulkIndexHandler,
adTaskCacheManager
);

// return objects used by Guice to inject dependencies for e.g.,
// transport action handler constructors
return ImmutableList
Expand All @@ -518,7 +560,9 @@ public Collection<Object> createComponents(
multiEntityResultHandler,
checkpoint,
modelPartitioner,
cacheProvider
cacheProvider,
adTaskManager,
adBatchTaskRunner
);
}

Expand All @@ -533,14 +577,21 @@ protected Clock getClock() {

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections
.singletonList(
return ImmutableList
.of(
new FixedExecutorBuilder(
settings,
AD_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 4),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
"opendistro.ad." + AD_THREAD_POOL_NAME
AD_THREAD_POOL_PREFIX + AD_THREAD_POOL_NAME
),
new FixedExecutorBuilder(
settings,
AD_BATCH_TASK_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 8),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
AD_THREAD_POOL_PREFIX + AD_BATCH_TASK_THREAD_POOL_NAME
)
);
}
Expand Down Expand Up @@ -572,7 +623,10 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE,
AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS,
AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR,
AnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down Expand Up @@ -615,7 +669,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class)
new ActionHandler<>(PreviewAnomalyDetectorAction.INSTANCE, PreviewAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class)
);
}

Expand Down Expand Up @@ -652,7 +708,7 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
new SystemIndexDescriptor(AnomalyDetector.ANOMALY_DETECTORS_INDEX, "detector definition"),
new SystemIndexDescriptor(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, "detector job"),
new SystemIndexDescriptor(CommonName.CHECKPOINT_INDEX_NAME, "model checkpoint"),
new SystemIndexDescriptor(DetectorInternalState.DETECTOR_STATE_INDEX, "detector information like total rcf updates")
new SystemIndexDescriptor(CommonName.DETECTION_STATE_INDEX, "detector information like total rcf updates")
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void prepareProfile(
false
);
if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId);
GetRequest getStateRequest = new GetRequest(CommonName.DETECTION_STATE_INDEX, detectorId);
client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {

private static final short defaultThreshold = 85;
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;

public MemoryCircuitBreaker(JvmService jvmService) {
super(defaultThreshold);
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
this.jvmService = jvmService;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.common.exception;

/**
 * Exception thrown when an AD batch task is cancelled, carrying the identity
 * of the actor that requested the cancellation.
 */
public class ADTaskCancelledException extends AnomalyDetectionException {
    // Set once at construction and never mutated, so declared final.
    private final String cancelledBy;

    /**
     * Constructor with error message and cancelling user.
     *
     * @param msg explanation of why the task was cancelled
     * @param user the user (or system actor) that requested the cancellation
     */
    public ADTaskCancelledException(String msg, String user) {
        super(msg);
        this.cancelledBy = user;
        // Cancellation is a normal lifecycle event, not an error — exclude it from failure stats.
        this.countedInStats(false);
    }

    /**
     * @return the user that cancelled the task
     */
    public String getCancelledBy() {
        return cancelledBy;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,17 @@ public LimitExceededException(String anomalyDetectorId, String message) {
* @param message explanation for the limit
*/
public LimitExceededException(String message) {
    // endRun=true: with no further context, a limit breach ends the detector run.
    super(message, true);
}

/**
 * Constructor with an error message and explicit control over whether the
 * detector run should be ended.
 *
 * @param message explanation for the limit
 * @param endRun whether to end the detector's current run ({@code true}) or let it continue
 */
public LimitExceededException(String message, boolean endRun) {
    // First super arg is presumably the anomaly detector id — null here since
    // this limit is not tied to a specific detector; confirm against superclass.
    super(null, message, endRun);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class CommonName {
// ======================================
// index name for anomaly checkpoint of each model. One model one document.
public static final String CHECKPOINT_INDEX_NAME = ".opendistro-anomaly-checkpoints";
// index name for anomaly detection state. Will store AD task in this index as well.
public static final String DETECTION_STATE_INDEX = ".opendistro-anomaly-detection-state";
// TODO: move other index name here

// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";
Expand Down Expand Up @@ -83,5 +86,11 @@ public class CommonName {
// Query
// ======================================
// Used in finding the max timestamp
public static final String AGG_NAME_MAX = "max_timefield";
public static final String AGG_NAME_MAX_TIME = "max_timefield";
// Used in finding the min timestamp
public static final String AGG_NAME_MIN_TIME = "min_timefield";
// date histogram aggregation name
public static final String DATE_HISTOGRAM = "date_histogram";
// feature aggregation name
public static final String FEATURE_AGGS = "feature_aggs";
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void getCurrentFeatures(AnomalyDetector detector, long startTime, long en
listener.onFailure(new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true));
}
} else {
getProcessedFeatures(shingle, detector, endTime, listener);
listener.onResponse(getProcessedFeatures(shingle, detector, endTime));
}
}

Expand Down Expand Up @@ -220,26 +221,7 @@ private void updateUnprocessedFeatures(
.mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.empty())))
.forEach(e -> shingle.add(e));

getProcessedFeatures(shingle, detector, endTime, listener);
}

private void getProcessedFeatures(
Deque<Entry<Long, Optional<double[]>>> shingle,
AnomalyDetector detector,
long endTime,
ActionListener<SinglePointFeatures> listener
) {
int shingleSize = detector.getShingleSize();
Optional<double[]> currentPoint = shingle.peekLast().getValue();
listener
.onResponse(
new SinglePointFeatures(
currentPoint,
Optional
.ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null)
.map(points -> batchShingle(points, shingleSize)[0])
)
);
listener.onResponse(getProcessedFeatures(shingle, detector, endTime));
}

private double[][] filterAndFill(Deque<Entry<Long, Optional<double[]>>> shingle, long endTime, AnomalyDetector detector) {
Expand Down Expand Up @@ -699,4 +681,53 @@ public int getShingleSize(String detectorId) {
return -1;
}
}

public void getFeatureDataPointsByBatch(
AnomalyDetector detector,
long startTime,
long endTime,
ActionListener<Map<Long, Optional<double[]>>> listener
) {
try {
searchFeatureDao.getFeaturesForPeriodByBatch(detector, startTime, endTime, ActionListener.wrap(points -> {
logger.debug("features size: {}", points.size());
listener.onResponse(points);
}, listener::onFailure));
} catch (Exception e) {
logger.error("Failed to get features for detector: " + detector.getDetectorId());
listener.onFailure(e);
}
}

public SinglePointFeatures getShingledFeatureForHistoricalDetector(
AnomalyDetector detector,
Deque<Entry<Long, Optional<double[]>>> shingle,
Optional<double[]> dataPoint,
long endTime
) {
while (shingle.size() >= detector.getShingleSize()) {
shingle.poll();
}
shingle.add(new AbstractMap.SimpleEntry<>(endTime, dataPoint));

return getProcessedFeatures(shingle, detector, endTime);
}

private SinglePointFeatures getProcessedFeatures(
Deque<Entry<Long, Optional<double[]>>> shingle,
AnomalyDetector detector,
long endTime
) {
int shingleSize = detector.getShingleSize();
Optional<double[]> currentPoint = shingle.peekLast().getValue();
return new SinglePointFeatures(
currentPoint,
Optional
// if current point is not present or current shingle has more missing data points than
// max missing rate, will return null
.ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null)
.map(points -> batchShingle(points, shingleSize)[0])
);
}

}
Loading

0 comments on commit 19b8f9d

Please sign in to comment.