From 7c9d4b9e9dcf71f6b175570c5d38067e91c14bf3 Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Thu, 7 Jan 2021 05:46:01 +0000 Subject: [PATCH] return get feature failure;add todos --- .../ad/constant/CommonName.java | 1 + .../ad/feature/FeatureManager.java | 26 ++-------- .../ad/task/ADBatchTaskRunner.java | 49 ++++++++----------- .../ad/task/ADTaskManager.java | 1 - .../ad/util/ParseUtils.java | 1 + ...nomalyDetectorJobTransportActionTests.java | 2 +- 6 files changed, 28 insertions(+), 52 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java index 9c022a59..b57da50f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonName.java @@ -23,6 +23,7 @@ public class CommonName { public static final String CHECKPOINT_INDEX_NAME = ".opendistro-anomaly-checkpoints"; // index name for anomaly detection state. Will store AD task in this index as well. public static final String DETECTION_STATE_INDEX = ".opendistro-anomaly-detection-state"; + // TODO: move other index name here // The alias of the index in which to write AD result history public static final String ANOMALY_RESULT_INDEX_ALIAS = ".opendistro-anomaly-results"; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java index 3b4f8dca..0686d83f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/feature/FeatureManager.java @@ -172,7 +172,7 @@ public void getCurrentFeatures(AnomalyDetector detector, long startTime, long en listener.onFailure(new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true)); } } else { - getProcessedFeatures(shingle, detector, endTime, listener); + listener.onResponse(getProcessedFeatures(shingle, detector, endTime)); } } @@ -220,26 +220,7 @@ private void updateUnprocessedFeatures( .mapToObj(time -> featuresMap.getOrDefault(time, new SimpleImmutableEntry<>(time, Optional.empty()))) .forEach(e -> shingle.add(e)); - getProcessedFeatures(shingle, detector, endTime, listener); - } - - private void getProcessedFeatures( - Deque>> shingle, - AnomalyDetector detector, - long endTime, - ActionListener listener - ) { - int shingleSize = detector.getShingleSize(); - Optional currentPoint = shingle.peekLast().getValue(); - listener - .onResponse( - new SinglePointFeatures( - currentPoint, - Optional - .ofNullable(currentPoint.isPresent() ? filterAndFill(shingle, endTime, detector) : null) - .map(points -> batchShingle(points, shingleSize)[0]) - ) - ); + listener.onResponse(getProcessedFeatures(shingle, detector, endTime)); } private double[][] filterAndFill(Deque>> shingle, long endTime, AnomalyDetector detector) { @@ -708,11 +689,12 @@ public void getFeatureDataPointsByBatch( ) { try { searchFeatureDao.getFeaturesForPeriodByBatch(detector, startTime, endTime, ActionListener.wrap(points -> { - logger.info("features size: {}", points.size()); + logger.debug("features size: {}", points.size()); listener.onResponse(points); }, listener::onFailure)); } catch (Exception e) { logger.error("Failed to get features for detector: " + detector.getDetectorId()); + listener.onFailure(e); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADBatchTaskRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADBatchTaskRunner.java index ff353cd8..0fc18fd1 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADBatchTaskRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADBatchTaskRunner.java @@ -187,7 +187,7 @@ public void run(ADTask adTask, TransportService transportService, ActionListener }); adTaskManager - .updateADTask(adTask.getTaskId(), updatedFields, ActionListener.wrap(r -> getNodeStats(adTask, ActionListener.wrap(node -> { + .updateADTask(adTask.getTaskId(), updatedFields, ActionListener.wrap(r -> dispatchTask(adTask, ActionListener.wrap(node -> { if (clusterService.localNode().getId().equals(node.getId())) { // Execute batch task locally logger @@ -219,7 +219,7 @@ public void run(ADTask adTask, TransportService transportService, ActionListener }, e -> delegatedListener.onFailure(e))), e -> delegatedListener.onFailure(e))); } - private void getNodeStats(ADTask adTask, ActionListener listener) { + private void dispatchTask(ADTask adTask, ActionListener listener) { DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); ADStatsRequest adStatsRequest = new ADStatsRequest(dataNodes); adStatsRequest.addAll(ImmutableSet.of(AD_EXECUTING_BATCH_TASK_COUNT.getName(), JVM_HEAP_USAGE.getName())); @@ -251,31 +251,21 @@ private void getNodeStats(ADTask adTask, ActionListener listener) listener.onFailure(new LimitExceededException(adTask.getDetectorId(), errorMessage)); return; } - candidateNodeResponse = candidateNodeResponse + Optional targetNode = candidateNodeResponse .stream() - .sorted( - (ADStatsNodeResponse r1, ADStatsNodeResponse r2) -> ((Long) r1 - .getStatsMap() - .get(AD_EXECUTING_BATCH_TASK_COUNT.getName())) - .compareTo((Long) r2.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName())) - ) - .collect(Collectors.toList()); - - if (candidateNodeResponse.size() == 1) { - listener.onResponse(candidateNodeResponse.get(0).getNode()); - } else { - // if multiple nodes have same running task count, choose the one with least JVM heap usage. - Long minTaskCount = (Long) candidateNodeResponse.get(0).getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName()); - Optional first = candidateNodeResponse - .stream() - .filter(c -> minTaskCount.equals(c.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName()))) - .sorted( - (ADStatsNodeResponse r1, ADStatsNodeResponse r2) -> ((Long) r1.getStatsMap().get(JVM_HEAP_USAGE.getName())) - .compareTo((Long) r2.getStatsMap().get(JVM_HEAP_USAGE.getName())) - ) - .findFirst(); - listener.onResponse(first.get().getNode()); - } + .sorted((ADStatsNodeResponse r1, ADStatsNodeResponse r2) -> { + int result = ((Long) r1.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName())) + .compareTo((Long) r2.getStatsMap().get(AD_EXECUTING_BATCH_TASK_COUNT.getName())); + if (result == 0) { + // if multiple nodes have same running task count, choose the one with least + // JVM heap usage. + return ((Long) r1.getStatsMap().get(JVM_HEAP_USAGE.getName())) + .compareTo((Long) r2.getStatsMap().get(JVM_HEAP_USAGE.getName())); + } + return result; + }) + .findFirst(); + listener.onResponse(targetNode.get().getNode()); }, exception -> { logger.error("Failed to get node's task stats", exception); listener.onFailure(exception); @@ -347,6 +337,7 @@ private void executeADBatchTask(ADTask adTask, ActionListener internalLi // start to run first piece Instant executeStartTime = Instant.now(); + // TODO: refactor to make the workflow more clear runFirstPiece(adTask, executeStartTime, internalListener); } @@ -518,10 +509,12 @@ private void getFeatureData( internalListener.onFailure(e); } }, exception -> { - logger.error("Fail to execute onFeatureResponseLocalRCF", exception); + logger.debug("Fail to get feature data by batch for this piece with end time: " + pieceEndTime); + // TODO: Exception may be caused by wrong feature query or some bad data. Differentiate these + // and skip current piece if error caused by bad data. internalListener.onFailure(exception); }); - ThreadedActionListener threadedActionListener = new ThreadedActionListener<>( + ThreadedActionListener>> threadedActionListener = new ThreadedActionListener<>( logger, threadPool, AD_BATCH_TASK_THREAD_POOL_NAME, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java index 9417b40d..1391bdac 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java @@ -335,7 +335,6 @@ private void cleanOldAdTaskDocs(IndexResponse response, ADTask adTask, ActionLis .from(maxAdTaskDocsPerDetector - 1) .trackTotalHits(true) .size(1); - String s = sourceBuilder.toString(); searchRequest.source(sourceBuilder).indices(CommonName.DETECTION_STATE_INDEX); String detectorId = adTask.getDetectorId(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java index 5da90f3a..33facdce 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ParseUtils.java @@ -616,6 +616,7 @@ public static Optional getLatestDataTime(SearchResponse searchResponse) { * @param xContentRegistry content registry * @return search source builder * @throws IOException throw IO exception if fail to parse feature aggregation + * @throws AnomalyDetectionException throw AD exception if no enabled feature */ public static SearchSourceBuilder batchFeatureQuery( AnomalyDetector detector, diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index 87865874..572b1ff3 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -181,7 +181,7 @@ public void testCleanOldTaskDocs() throws IOException, InterruptedException { response.set(r); }, e -> { latch.countDown(); })); latch.await(); - + Thread.sleep(10000); count = countDocs(CommonName.DETECTION_STATE_INDEX); // we have one latest task, so total count should add 1 assertEquals(maxOldAdTaskDocsPerDetector + 1, count);