Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
start historical detector
Browse files Browse the repository at this point in the history
  • Loading branch information
ylwu-amzn committed Jan 5, 2021
1 parent 21418a4 commit bb1967b
Show file tree
Hide file tree
Showing 50 changed files with 3,612 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.task.ADBatchTaskRunner;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskCacheManager;
import com.amazon.opendistroforelasticsearch.ad.task.ADTaskManager;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchAnomalyResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADStatsNodesAction;
Expand Down Expand Up @@ -148,6 +155,7 @@
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ThresholdResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.DetectionStateHandler;
import com.amazon.opendistroforelasticsearch.ad.transport.handler.MultiEntityResultHandler;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
Expand All @@ -173,7 +181,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip

public static final String AD_BASE_URI = "/_opendistro/_anomaly_detection";
public static final String AD_BASE_DETECTORS_URI = AD_BASE_URI + "/detectors";
public static final String AD_THREAD_POOL_PREFIX = "opendistro.ad.";
public static final String AD_THREAD_POOL_NAME = "ad-threadpool";
public static final String AD_BATCH_TASK_THREAD_POOL_NAME = "ad-batch-task-threadpool";
public static final String AD_JOB_TYPE = "opendistro_anomaly_detector";
private static Gson gson;
private AnomalyDetectionIndices anomalyDetectionIndices;
Expand All @@ -186,6 +196,9 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private DetectionStateHandler detectorStateHandler;
private ADTaskCacheManager adTaskCacheManager;
private ADTaskManager adTaskManager;
private ADBatchTaskRunner adBatchTaskRunner;

static {
SpecialPermission.check();
Expand Down Expand Up @@ -473,7 +486,7 @@ public Collection<Object> createComponents(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectorStateIndex),
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initDetectionStateIndex),
anomalyDetectionIndices::doesDetectorStateIndexExist,
this.clientUtil,
this.indexUtils,
Expand All @@ -493,6 +506,35 @@ public Collection<Object> createComponents(
stateManager
);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(settings, clusterService, client, xContentRegistry, anomalyDetectionIndices);
AnomalyResultBulkIndexHandler anomalyResultBulkIndexHandler = new AnomalyResultBulkIndexHandler(
client,
settings,
threadPool,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
this.clientUtil,
this.indexUtils,
clusterService,
anomalyDetectionIndices
);
adBatchTaskRunner = new ADBatchTaskRunner(
settings,
threadPool,
clusterService,
client,
nodeFilter,
indexNameExpressionResolver,
adCircuitBreakerService,
featureManager,
adTaskManager,
anomalyDetectionIndices,
adStats,
anomalyResultBulkIndexHandler,
adTaskCacheManager
);

// return objects used by Guice to inject dependencies for e.g.,
// transport action handler constructors
return ImmutableList
Expand All @@ -517,7 +559,9 @@ public Collection<Object> createComponents(
multiEntityResultHandler,
checkpoint,
modelPartitioner,
cacheProvider
cacheProvider,
adTaskManager,
adBatchTaskRunner
);
}

Expand All @@ -532,14 +576,21 @@ protected Clock getClock() {

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections
.singletonList(
return ImmutableList
.of(
new FixedExecutorBuilder(
settings,
AD_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 4),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
"opendistro.ad." + AD_THREAD_POOL_NAME
AD_THREAD_POOL_PREFIX + AD_THREAD_POOL_NAME
),
new FixedExecutorBuilder(
settings,
AD_BATCH_TASK_THREAD_POOL_NAME,
Math.max(1, EsExecutors.allocatedProcessors(settings) / 8),
AnomalyDetectorSettings.AD_THEAD_POOL_QUEUE_SIZE,
AD_THREAD_POOL_PREFIX + AD_BATCH_TASK_THREAD_POOL_NAME
)
);
}
Expand Down Expand Up @@ -571,7 +622,10 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE,
AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS,
AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR,
AnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
Expand Down Expand Up @@ -613,7 +667,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new ActionHandler<>(ADResultBulkAction.INSTANCE, ADResultBulkTransportAction.class),
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(EntityProfileAction.INSTANCE, EntityProfileTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class)
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(ADBatchAnomalyResultAction.INSTANCE, ADBatchAnomalyResultTransportAction.class),
new ActionHandler<>(ADBatchTaskRemoteExecutionAction.INSTANCE, ADBatchTaskRemoteExecutionTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(AnomalyResult.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {

private static final short defaultThreshold = 85;
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;

public MemoryCircuitBreaker(JvmService jvmService) {
super(defaultThreshold);
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
this.jvmService = jvmService;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.common.exception;

/**
 * Exception raised when an AD task is cancelled.
 *
 * <p>Carries the identity of whoever cancelled the task. The constructor calls
 * {@code countedInStats(false)}; presumably this keeps cancellations out of the
 * plugin's failure statistics (confirm against {@code AnomalyDetectionException}).
 */
public class ADTaskCancelledException extends AnomalyDetectionException {
    // Set once at construction time and never reassigned.
    private final String cancelledBy;

    /**
     * Creates a cancellation exception.
     *
     * @param msg  error message
     * @param user the user who cancelled the task
     */
    public ADTaskCancelledException(String msg, String user) {
        super(msg);
        this.cancelledBy = user;
        this.countedInStats(false);
    }

    /**
     * @return the user who cancelled the task, as passed to the constructor
     */
    public String getCancelledBy() {
        return cancelledBy;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,17 @@ public LimitExceededException(String anomalyDetectorId, String message) {
* @param message explanation for the limit
*/
public LimitExceededException(String message) {
super(null, message, true);
super(message, true);
}

/**
 * Constructor with error message and an explicit end-run flag.
 *
 * <p>Passes {@code null} as the first argument of the superclass constructor
 * (presumably the anomaly detector id, i.e. no specific detector is attached;
 * confirm against the parent class).
 *
 * @param message explanation for the limit
 * @param endRun end detector run or not
 */
public LimitExceededException(String message, boolean endRun) {
super(null, message, endRun);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,11 @@ public class CommonName {
// Query
// ======================================
// Used in finding the max timestamp
public static final String AGG_NAME_MAX = "max_timefield";
public static final String AGG_NAME_MAX_TIME = "max_timefield";
// Used in finding the min timestamp
public static final String AGG_NAME_MIN_TIME = "min_timefield";
// date histogram aggregation name
public static final String DATE_HISTOGRAM = "date_histogram";
// feature aggregation name
public static final String FEATURE_AGGS = "feature_aggs";
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,4 +699,63 @@ public int getShingleSize(String detectorId) {
return -1;
}
}

/**
 * Fetches feature data points for a detector over a time range and hands them
 * to the listener.
 *
 * <p>Delegates to {@code searchFeatureDao.getFeaturesForPeriodByBatch}. The listener
 * is notified on every path: with the points on success, with the failure reported
 * by the DAO, or with the exception if the DAO call itself throws synchronously.
 *
 * @param detector  detector whose features are queried
 * @param startTime start of the period, passed through to the DAO
 * @param endTime   end of the period, passed through to the DAO
 * @param listener  receives the map of data points, or the failure
 */
public void getFeatureDataPoints(
    AnomalyDetector detector,
    long startTime,
    long endTime,
    ActionListener<Map<Long, Optional<double[]>>> listener
) {
    try {
        searchFeatureDao.getFeaturesForPeriodByBatch(detector, startTime, endTime, ActionListener.wrap(points -> {
            logger.info("features size: {}", points.size());
            listener.onResponse(points);
        }, listener::onFailure));
    } catch (Exception e) {
        // Keep the stack trace in the log and make sure the caller is not left
        // waiting on a listener that would otherwise never be completed.
        logger.error("Failed to get features for detector: " + detector.getDetectorId(), e);
        listener.onFailure(e);
    }
}

/**
 * Rebuilds the shingle ending at {@code endTime} and returns its processed features.
 *
 * <p>Steps:
 * <ol>
 *   <li>collect the entries returned by {@code getNearbyPointsForShingle}, using a
 *       tolerance of half the detector interval;</li>
 *   <li>for each range reported by {@code getMissingRangesInShingle}, if
 *       {@code dataPoints} has an entry under the range's key, store that value
 *       under the range's value (presumably range start and range end; confirm
 *       against {@code getMissingRangesInShingle});</li>
 *   <li>clear {@code shingle} and refill it with one entry per timestamp from
 *       {@code getFullShingleEndTimes}, using an empty {@code Optional} where no
 *       feature is known;</li>
 *   <li>delegate to {@code getProcessedFeatures}.</li>
 * </ol>
 *
 * @param detector   detector supplying the interval and shingle size
 * @param shingle    shingle buffer; mutated in place (cleared and refilled)
 * @param dataPoints data points keyed by a timestamp, used to fill missing ranges
 * @param endTime    end time of the current data point
 * @return the processed features for the rebuilt shingle
 */
public SinglePointFeatures getShingledFeature(
    AnomalyDetector detector,
    Deque<Entry<Long, Optional<double[]>>> shingle,
    Map<Long, Optional<double[]>> dataPoints,
    long endTime
) {
    long maxTimeDifference = detector.getDetectorIntervalInMilliseconds() / 2;
    Map<Long, Entry<Long, Optional<double[]>>> featuresMap = getNearbyPointsForShingle(detector, shingle, endTime, maxTimeDifference)
        .collect(Collectors.toMap(Entry::getKey, Entry::getValue));

    List<Entry<Long, Long>> missingRanges = getMissingRangesInShingle(detector, featuresMap, endTime);
    missingRanges.forEach(r -> {
        if (dataPoints.containsKey(r.getKey())) {
            featuresMap.put(r.getValue(), new SimpleImmutableEntry<>(r.getValue(), dataPoints.get(r.getKey())));
        }
    });

    // Clear once, then rebuild the shingle from the merged feature map.
    shingle.clear();
    getFullShingleEndTimes(endTime, detector.getDetectorIntervalInMilliseconds(), detector.getShingleSize())
        .mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.empty())))
        .forEach(shingle::add);

    return getProcessedFeatures(shingle, detector, endTime);
}

/**
 * Builds the {@code SinglePointFeatures} for the newest entry of a shingle.
 *
 * <p>The unprocessed part is the value of the last shingle entry. The processed
 * part is empty when that value is absent or when {@code filterAndFill} yields
 * {@code null}; otherwise it is the first row of {@code batchShingle} applied to
 * the filled points.
 *
 * @param shingle  shingle whose last entry is the current point
 * @param detector detector supplying the shingle size
 * @param endTime  end time passed through to {@code filterAndFill}
 * @return features for the current point
 */
private SinglePointFeatures getProcessedFeatures(
    Deque<Entry<Long, Optional<double[]>>> shingle,
    AnomalyDetector detector,
    long endTime
) {
    final int size = detector.getShingleSize();
    final Optional<double[]> latest = shingle.peekLast().getValue();

    // Without a current point there is nothing to process.
    if (!latest.isPresent()) {
        return new SinglePointFeatures(latest, Optional.empty());
    }

    // filterAndFill may give back null (e.g. too many missing points in the
    // shingle); ofNullable turns that into an empty processed feature.
    return new SinglePointFeatures(
        latest,
        Optional.ofNullable(filterAndFill(shingle, endTime, detector)).map(filled -> batchShingle(filled, size)[0])
    );
}

}
Loading

0 comments on commit bb1967b

Please sign in to comment.