Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
validate detector only when start detector; fix flaky test case
Browse files Browse the repository at this point in the history
  • Loading branch information
ylwu-amzn committed Jan 29, 2021
1 parent b40e12a commit a532ead
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,13 @@ public void startDetector(
TransportService transportService,
ActionListener<AnomalyDetectorJobResponse> listener
) {
getDetector(
detectorId,
(detector) -> handler.startAnomalyDetectorJob(detector), // run realtime detector
(detector) -> {
getDetector(detectorId, (detector) -> {
if (validateDetector(detector, listener)) {
// run realtime detector
handler.startAnomalyDetectorJob(detector);
}
}, (detector) -> {
if (validateDetector(detector, listener)) {
// run historical detector
Optional<DiscoveryNode> owningNode = hashRing.getOwningNode(detector.getDetectorId());
if (!owningNode.isPresent()) {
Expand All @@ -194,9 +197,8 @@ public void startDetector(
return;
}
forwardToCoordinatingNode(detector, user, ADTaskAction.START, transportService, owningNode.get(), listener);
},
listener
);
}
}, listener);
}

/**
Expand Down Expand Up @@ -293,12 +295,6 @@ public <T> void getDetector(
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());

String error = validateDetector(detector);
if (error != null) {
listener.onFailure(new ElasticsearchStatusException(error, RestStatus.BAD_REQUEST));
return;
}

if (detector.isRealTimeDetector()) {
// run realtime detector
realTimeDetectorConsumer.accept(detector);
Expand Down Expand Up @@ -595,14 +591,18 @@ public ADTaskProfile getLocalADTaskProfileByDetectorId(String detectorId) {
return adTaskProfile;
}

private String validateDetector(AnomalyDetector detector) {
private boolean validateDetector(AnomalyDetector detector, ActionListener<AnomalyDetectorJobResponse> listener) {
String error = null;
if (detector.getFeatureAttributes().size() == 0) {
return "Can't start detector job as no features configured";
error = "Can't start detector job as no features configured";
} else if (detector.getEnabledFeatureIds().size() == 0) {
error = "Can't start detector job as no enabled features configured";
}
if (detector.getEnabledFeatureIds().size() == 0) {
return "Can't start detector job as no enabled features configured";
if (error != null) {
listener.onFailure(new ElasticsearchStatusException(error, RestStatus.BAD_REQUEST));
return false;
}
return null;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -304,7 +305,14 @@ public void testStopHistoricalDetector() throws IOException, InterruptedExceptio
assertNull(adTask.getUser());
AnomalyDetectorJobRequest request = stopDetectorJobRequest(adTask.getDetectorId());
client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000);
Thread.sleep(10000);
waitUntil(() -> {
try {
ADTask task = getADTask(adTask.getTaskId());
return !TestHelpers.historicalDetectorRunningStats.contains(task.getState());
} catch (IOException e) {
return false;
}
}, 20, TimeUnit.SECONDS);
ADTask stoppedTask = getADTask(adTask.getTaskId());
assertEquals(ADTaskState.STOPPED.name(), stoppedTask.getState());
assertEquals(0, getExecutingADTask());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.transport;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;

import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase;
import com.amazon.opendistroforelasticsearch.ad.TestHelpers;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.Feature;
import com.google.common.collect.ImmutableList;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2)
public class DeleteAnomalyDetectorTransportActionTests extends HistoricalDetectorIntegTestCase {
private Instant startTime;
private Instant endTime;
private String type = "error";

@Override
@Before
public void setUp() throws Exception {
super.setUp();
startTime = Instant.now().minus(10, ChronoUnit.DAYS);
endTime = Instant.now();
ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, 2000);
createDetectorIndex();
}

public void testDeleteAnomalyDetectorWithoutFeature() throws IOException {
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(null);
testDeleteDetector(detector);
}

public void testDeleteAnomalyDetectorWithoutEnabledFeature() throws IOException {
Feature feature = TestHelpers.randomFeature(false);
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableList.of(feature));
testDeleteDetector(detector);
}

public void testDeleteAnomalyDetectorWithEnabledFeature() throws IOException {
Feature feature = TestHelpers.randomFeature(true);
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableList.of(feature));
testDeleteDetector(detector);
}

private void testDeleteDetector(AnomalyDetector detector) throws IOException {
String detectorId = createDetector(detector);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest(detectorId);
DeleteResponse deleteResponse = client().execute(DeleteAnomalyDetectorAction.INSTANCE, request).actionGet(10000);
System.out.println(deleteResponse);
assertEquals("deleted", deleteResponse.getResult().getLowercase());
}
}

0 comments on commit a532ead

Please sign in to comment.