This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit 7c9d4b9
return get feature failure; add todos
ylwu-amzn committed Jan 7, 2021
1 parent 0257a36 commit 7c9d4b9
Showing 6 changed files with 28 additions and 52 deletions.
@@ -23,6 +23,7 @@ public class CommonName {
public static final String CHECKPOINT_INDEX_NAME = ".opendistro-anomaly-checkpoints";
// index name for anomaly detection state. Will store AD task in this index as well.
public static final String DETECTION_STATE_INDEX = ".opendistro-anomaly-detection-state";
+    // TODO: move other index name here

// The alias of the index in which to write AD result history
public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results";
@@ -172,7 +172,7 @@ public void getCurrentFeatures(AnomalyDetector detector, long startTime, long en
listener.onFailure(new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true));
}
} else {
-                getProcessedFeatures(shingle, detector, endTime, listener);
+                listener.onResponse(getProcessedFeatures(shingle, detector, endTime));
}
}

@@ -220,26 +220,7 @@ private void updateUnprocessedFeatures(
.mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.empty())))
.forEach(e -> shingle.add(e));

-        getProcessedFeatures(shingle, detector, endTime, listener);
-    }
-
-    private void getProcessedFeatures(
-        Deque<Entry<Long, Optional<double[]>>> shingle,
-        AnomalyDetector detector,
-        long endTime,
-        ActionListener<SinglePointFeatures> listener
-    ) {
-        int shingleSize = detector.getShingleSize();
-        Optional<double[]> currentPoint = shingle.peekLast().getValue();
-        listener
-            .onResponse(
-                new SinglePointFeatures(
-                    currentPoint,
-                    Optional
-                        .ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null)
-                        .map(points -> batchShingle(points, shingleSize)[0])
-                )
-            );
+        listener.onResponse(getProcessedFeatures(shingle, detector, endTime));
}

private double[][] filterAndFill(Deque<Entry<Long, Optional<double[]>>> shingle, long endTime, AnomalyDetector detector) {
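The three-argument replacement for getProcessedFeatures is not visible in this hunk. A minimal sketch of what it presumably looks like, derived mechanically from the removed listener-based version (a hypothetical reconstruction, not the committed code):

    // Hypothetical sketch: same computation as the removed method, but the
    // result is returned instead of being pushed through an ActionListener.
    private SinglePointFeatures getProcessedFeatures(
        Deque<Entry<Long, Optional<double[]>>> shingle,
        AnomalyDetector detector,
        long endTime
    ) {
        int shingleSize = detector.getShingleSize();
        Optional<double[]> currentPoint = shingle.peekLast().getValue();
        return new SinglePointFeatures(
            currentPoint,
            Optional
                .ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null)
                .map(points -> batchShingle(points, shingleSize)[0])
        );
    }

Returning a value keeps both call sites uniform: getCurrentFeatures above and updateUnprocessedFeatures here each wrap the result in listener.onResponse(...) themselves.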
@@ -708,11 +689,12 @@ public void getFeatureDataPointsByBatch(
) {
try {
searchFeatureDao.getFeaturesForPeriodByBatch(detector, startTime, endTime, ActionListener.wrap(points -> {
-            logger.info("features size: {}", points.size());
+            logger.debug("features size: {}", points.size());
listener.onResponse(points);
}, listener::onFailure));
} catch (Exception e) {
logger.error("Failed to get features for detector: " + detector.getDetectorId());
+            listener.onFailure(e);
}
}
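The added listener.onFailure(e) appears to be the "return get feature failure" half of the commit message: previously an exception thrown while issuing the batch search was only logged, so a caller waiting on the listener would never complete. A minimal sketch of the pattern, with hypothetical names (doAsyncWork is illustrative only, not part of the plugin):

    // Complete the listener on every path, including synchronous failures.
    try {
        doAsyncWork(ActionListener.wrap(listener::onResponse, listener::onFailure));
    } catch (Exception e) {
        logger.error("Failed to start async work", e);
        listener.onFailure(e); // without this, the caller never hears about the failure
    }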

@@ -187,7 +187,7 @@ public void run(ADTask adTask, TransportService transportService, ActionListener
});

adTaskManager
-            .updateADTask(adTask.getTaskId(), updatedFields, ActionListener.wrap(r -> getNodeStats(adTask, ActionListener.wrap(node -> {
+            .updateADTask(adTask.getTaskId(), updatedFields, ActionListener.wrap(r -> dispatchTask(adTask, ActionListener.wrap(node -> {
if (clusterService.localNode().getId().equals(node.getId())) {
// Execute batch task locally
logger
@@ -219,7 +219,7 @@ public void run(ADTask adTask, TransportService transportService, ActionListener
}, e -> delegatedListener.onFailure(e))), e -> delegatedListener.onFailure(e)));
}

-    private void getNodeStats(ADTask adTask, ActionListener<DiscoveryNode> listener) {
+    private void dispatchTask(ADTask adTask, ActionListener<DiscoveryNode> listener) {
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
ADStatsRequest adStatsRequest = new ADStatsRequest(dataNodes);
adStatsRequest.addAll(ImmutableSet.of(AD_EXECUTING_BATCH_TASK_COUNT.getName(), JVM_HEAP_USAGE.getName()));
@@ -251,31 +251,21 @@ private void getNodeStats(ADTask adTask, ActionListener<DiscoveryNode> listener)
listener.onFailure(new LimitExceededException(adTask.getDetectorId(), errorMessage));
return;
}
-                candidateNodeResponse = candidateNodeResponse
+                Optional<ADStatsNodeResponse> targetNode = candidateNodeResponse
                    .stream()
-                    .sorted(
-                        (ADStatsNodeResponse r1, ADStatsNodeResponse r2) -> ((Long) r1
-                            .getStatsMap()
-                            .get(AD_EXECUTING_BATCH_TASK_COUNT.getName()))
-                                .compareTo((Long) r2.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName()))
-                    )
-                    .collect(Collectors.toList());
-
-                if (candidateNodeResponse.size() == 1) {
-                    listener.onResponse(candidateNodeResponse.get(0).getNode());
-                } else {
-                    // if multiple nodes have same running task count, choose the one with least JVM heap usage.
-                    Long minTaskCount = (Long) candidateNodeResponse.get(0).getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName());
-                    Optional<ADStatsNodeResponse> first = candidateNodeResponse
-                        .stream()
-                        .filter(c -> minTaskCount.equals(c.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName())))
-                        .sorted(
-                            (ADStatsNodeResponse r1, ADStatsNodeResponse r2) -> ((Long) r1.getStatsMap().get(JVM_HEAP_USAGE.getName()))
-                                .compareTo((Long) r2.getStatsMap().get(JVM_HEAP_USAGE.getName()))
-                        )
-                        .findFirst();
-                    listener.onResponse(first.get().getNode());
-                }
+                    .sorted((ADStatsNodeResponse r1, ADStatsNodeResponse r2) -> {
+                        int result = ((Long) r1.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName()))
+                            .compareTo((Long) r2.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName()));
+                        if (result == 0) {
+                            // if multiple nodes have same running task count, choose the one with least
+                            // JVM heap usage.
+                            return ((Long) r1.getStatsMap().get(JVM_HEAP_USAGE.getName()))
+                                .compareTo((Long) r2.getStatsMap().get(JVM_HEAP_USAGE.getName()));
+                        }
+                        return result;
+                    })
+                    .findFirst();
+                listener.onResponse(targetNode.get().getNode());
}, exception -> {
logger.error("Failed to get node's task stats", exception);
listener.onFailure(exception);
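Folding the tie-break into a single comparator removes the old second pass and the special case for a lone candidate. The same ordering could also be written with the JDK's Comparator combinators; a sketch (not the committed code), assuming the stats map holds Long values as the casts above imply:

    // Least running batch tasks first; ties broken by lowest JVM heap usage.
    Comparator<ADStatsNodeResponse> leastBusy = Comparator
        .<ADStatsNodeResponse>comparingLong(r -> (Long) r.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName()))
        .thenComparingLong(r -> (Long) r.getStatsMap().get(JVM_HEAP_USAGE.getName()));

    // min(...) picks the first element under this order without sorting the whole list.
    candidateNodeResponse.stream().min(leastBusy).ifPresent(n -> listener.onResponse(n.getNode()));

The bare targetNode.get() in the committed code looks safe because the empty-response case already returned earlier with a LimitExceededException.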
@@ -347,6 +337,7 @@ private void executeADBatchTask(ADTask adTask, ActionListener<String> internalLi

// start to run first piece
Instant executeStartTime = Instant.now();
+        // TODO: refactor to make the workflow more clear
runFirstPiece(adTask, executeStartTime, internalListener);
}

@@ -518,10 +509,12 @@ private void getFeatureData(
internalListener.onFailure(e);
}
}, exception -> {
-            logger.error("Fail to execute onFeatureResponseLocalRCF", exception);
+            logger.debug("Fail to get feature data by batch for this piece with end time: " + pieceEndTime);
+            // TODO: Exception may be caused by wrong feature query or some bad data. Differentiate these
+            // and skip current piece if error caused by bad data.
internalListener.onFailure(exception);
});
-        ThreadedActionListener threadedActionListener = new ThreadedActionListener<>(
+        ThreadedActionListener<Map<Long, Optional<double[]>>> threadedActionListener = new ThreadedActionListener<>(
logger,
threadPool,
AD_BATCH_TASK_THREAD_POOL_NAME,
@@ -335,7 +335,6 @@ private void cleanOldAdTaskDocs(IndexResponse response, ADTask adTask, ActionLis
.from(maxAdTaskDocsPerDetector - 1)
.trackTotalHits(true)
.size(1);
-        String s = sourceBuilder.toString();
searchRequest.source(sourceBuilder).indices(CommonName.DETECTION_STATE_INDEX);
String detectorId = adTask.getDetectorId();

@@ -616,6 +616,7 @@ public static Optional<Long> getLatestDataTime(SearchResponse searchResponse) {
* @param xContentRegistry content registry
* @return search source builder
* @throws IOException throw IO exception if fail to parse feature aggregation
+     * @throws AnomalyDetectionException throw AD exception if no enabled feature
*/
public static SearchSourceBuilder batchFeatureQuery(
AnomalyDetector detector,
Expand Down
Original file line number Diff line number Diff line change
@@ -181,7 +181,7 @@ public void testCleanOldTaskDocs() throws IOException, InterruptedException {
response.set(r);
}, e -> { latch.countDown(); }));
latch.await();

+        Thread.sleep(10000);
count = countDocs(CommonName.DETECTION_STATE_INDEX);
// we have one latest task, so total count should add 1
assertEquals(maxOldAdTaskDocsPerDetector + 1, count);
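The fixed Thread.sleep(10000) gives the asynchronous cleanup time to finish before recounting, at the cost of ten seconds per run and some residual timing sensitivity. A sketch of a deadline-based poll under the same assumptions (countDocs and the constants come from the surrounding test):

    // Poll for the expected count instead of sleeping a fixed 10 seconds.
    long deadline = System.currentTimeMillis() + 10_000;
    long count = countDocs(CommonName.DETECTION_STATE_INDEX);
    while (count != maxOldAdTaskDocsPerDetector + 1 && System.currentTimeMillis() < deadline) {
        Thread.sleep(200); // brief pause between polls
        count = countDocs(CommonName.DETECTION_STATE_INDEX);
    }
    assertEquals(maxOldAdTaskDocsPerDetector + 1, count);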