diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java index 1f331bbe..90a74fa7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/task/ADTaskManager.java @@ -181,10 +181,13 @@ public void startDetector( TransportService transportService, ActionListener listener ) { - getDetector( - detectorId, - (detector) -> handler.startAnomalyDetectorJob(detector), // run realtime detector - (detector) -> { + getDetector(detectorId, (detector) -> { + if (validateDetector(detector, listener)) { + // run realtime detector + handler.startAnomalyDetectorJob(detector); + } + }, (detector) -> { + if (validateDetector(detector, listener)) { // run historical detector Optional owningNode = hashRing.getOwningNode(detector.getDetectorId()); if (!owningNode.isPresent()) { @@ -194,9 +197,8 @@ public void startDetector( return; } forwardToCoordinatingNode(detector, user, ADTaskAction.START, transportService, owningNode.get(), listener); - }, - listener - ); + } + }, listener); } /** @@ -293,12 +295,6 @@ public void getDetector( ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); - String error = validateDetector(detector); - if (error != null) { - listener.onFailure(new ElasticsearchStatusException(error, RestStatus.BAD_REQUEST)); - return; - } - if (detector.isRealTimeDetector()) { // run realtime detector realTimeDetectorConsumer.accept(detector); @@ -595,14 +591,18 @@ public ADTaskProfile getLocalADTaskProfileByDetectorId(String detectorId) { return adTaskProfile; } - private String validateDetector(AnomalyDetector detector) { + private boolean validateDetector(AnomalyDetector detector, ActionListener listener) { + String error = null; if (detector.getFeatureAttributes().size() == 0) { - return "Can't start detector job as no features configured"; + error = "Can't start detector job as no features configured"; + } else if (detector.getEnabledFeatureIds().size() == 0) { + error = "Can't start detector job as no enabled features configured"; } - if (detector.getEnabledFeatureIds().size() == 0) { - return "Can't start detector job as no enabled features configured"; + if (error != null) { + listener.onFailure(new ElasticsearchStatusException(error, RestStatus.BAD_REQUEST)); + return false; } - return null; + return true; } /** diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index bc1d9d6b..9a4ed94c 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -304,7 +305,14 @@ public void testStopHistoricalDetector() throws IOException, InterruptedExceptio assertNull(adTask.getUser()); AnomalyDetectorJobRequest request = stopDetectorJobRequest(adTask.getDetectorId()); client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); - Thread.sleep(10000); + waitUntil(() -> { + try { + ADTask task = getADTask(adTask.getTaskId()); + return !TestHelpers.historicalDetectorRunningStats.contains(task.getState()); + } catch (IOException e) { + return false; + } + }, 20, TimeUnit.SECONDS); ADTask stoppedTask = getADTask(adTask.getTaskId()); assertEquals(ADTaskState.STOPPED.name(), stoppedTask.getState()); assertEquals(0, getExecutingADTask()); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportActionTests.java new file mode 100644 index 00000000..bbeba934 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/DeleteAnomalyDetectorTransportActionTests.java @@ -0,0 +1,72 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Before; + +import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.Feature; +import com.google.common.collect.ImmutableList; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2) +public class DeleteAnomalyDetectorTransportActionTests extends HistoricalDetectorIntegTestCase { + private Instant startTime; + private Instant endTime; + private String type = "error"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + startTime = Instant.now().minus(10, ChronoUnit.DAYS); + endTime = Instant.now(); + ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, 2000); + createDetectorIndex(); + } + + public void testDeleteAnomalyDetectorWithoutFeature() throws IOException { + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(null); + testDeleteDetector(detector); + } + + public void testDeleteAnomalyDetectorWithoutEnabledFeature() throws IOException { + Feature feature = TestHelpers.randomFeature(false); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableList.of(feature)); + testDeleteDetector(detector); + } + + public void testDeleteAnomalyDetectorWithEnabledFeature() throws IOException { + Feature feature = TestHelpers.randomFeature(true); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableList.of(feature)); + testDeleteDetector(detector); + } + + private void testDeleteDetector(AnomalyDetector detector) throws IOException { + String detectorId = createDetector(detector); + DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest(detectorId); + DeleteResponse deleteResponse = client().execute(DeleteAnomalyDetectorAction.INSTANCE, request).actionGet(10000); + System.out.println(deleteResponse); + assertEquals("deleted", deleteResponse.getResult().getLowercase()); + } +}