This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Merging from master
saratvemulapalli committed Jan 5, 2021
2 parents 84e0b85 + 21418a4 commit 4213a23
Showing 69 changed files with 4,908 additions and 479 deletions.
README.md: 3 changes (2 additions, 1 deletion)
@@ -49,8 +49,9 @@ Currently we just put RCF jar in lib as dependency. Plan to publish to Maven and

1. `./gradlew build` builds and tests
1. `./gradlew :run` launches a single node cluster with the AD (and job-scheduler) plugin installed
1. `./gradlew :integTest` launches a single node cluster with the AD (and job-scheduler) plugin installed and runs all integration tests
1. `./gradlew :integTest` launches a single node cluster with the AD (and job-scheduler) plugin installed and runs all integration tests except security
1. ` ./gradlew :integTest --tests="**.test execute foo"` runs a single integration test class or method
1. `./gradlew integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername="docker-cluster" -Dhttps=true -Duser=admin -Dpassword=admin` launches integration tests against a local cluster and run tests with security
1. `./gradlew spotlessApply` formats code. Alternatively, import the formatting rules in `.eclipseformat.xml` into your IDE.

When launching a cluster using one of the above commands, logs are placed in `/build/cluster/run node0/elasticsearch-<version>/logs`. Though the logs are teed to the console, in practice it's best to check the actual log file.
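The security-enabled invocation above passes everything the test JVM needs as plain system properties (`https`, `user`, `password`), and the build.gradle change further down uses the same `https` property to decide whether `SecureADRestIT` runs at all. As a purely hypothetical sketch (not the plugin's actual test code), a JUnit test can branch on those properties like this:

```java
import org.junit.Assume;
import org.junit.Test;

public class SecurityPropertySketch {

    // Assumption: the build passes -Dhttps=true only when targeting a security-enabled cluster.
    private static boolean securityEnabled() {
        return Boolean.parseBoolean(System.getProperty("https", "false"));
    }

    @Test
    public void runsOnlyAgainstSecureClusters() {
        // Skip (rather than fail) when the target cluster has no security plugin.
        Assume.assumeTrue("requires -Dhttps=true", securityEnabled());

        // Hypothetical defaults; real runs pass -Duser and -Dpassword explicitly.
        String user = System.getProperty("user", "admin");
        String password = System.getProperty("password", "admin");

        // ... build an authenticated REST client with user/password and exercise secured endpoints ...
    }
}
```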
build.gradle: 15 changes (7 additions, 8 deletions)
@@ -156,6 +156,12 @@ integTest {
        }
    }

    if (System.getProperty("https") == null) {
        filter {
            excludeTestsMatching "com.amazon.opendistroforelasticsearch.ad.rest.SecureADRestIT"
        }
    }

    // The 'doFirst' delays till execution time.
    doFirst {
        // Tell the test JVM if the cluster JVM is running under a debugger so that tests can
@@ -275,7 +281,6 @@ List<String> jacocoExclusions = [
'com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException',
'com.amazon.opendistroforelasticsearch.ad.util.ClientUtil',

'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorTransportAction',
@@ -291,14 +296,8 @@
'com.amazon.opendistroforelasticsearch.ad.transport.DeleteAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyResultTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.SearchAnomalyDetectorInfoTransportAction*',
'com.amazon.opendistroforelasticsearch.ad.transport.GetAnomalyDetectorRequest',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorResponse',
'com.amazon.opendistroforelasticsearch.ad.transport.IndexAnomalyDetectorTransportAction',
'com.amazon.opendistroforelasticsearch.ad.transport.PreviewAnomalyDetectorTransportAction*',

// TODO: hc caused coverage to drop
'com.amazon.opendistroforelasticsearch.ad.NodeStateManager',
@@ -347,7 +346,7 @@ dependencies {
compile "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.12.0.0"
compile "com.amazon.opendistroforelasticsearch:common-utils:1.12.0.0"
compile "com.amazon.opendistroforelasticsearch:common-utils:1.12.0.2"
compile group: 'com.google.guava', name: 'guava', version:'29.0-jre'
compile group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
@@ -19,3 +19,12 @@ Compatible with Elasticsearch 7.10.0
* Fix for upgrading mapping ([#309](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/309))
* fix double nan error when parse to json ([#310](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/310))
* Fix issue where data hole exists for Preview API ([#312](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/312))
* fix delete running detector bug ([#320](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/320))
* fix detector and feature serialization ([#322](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/322))
* Moving common-utils to 1.12.0.2 ([#323](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/323))

### Infrastructure
* Add multi node integration testing into CI workflow ([#318](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/318))

### Maintenance
* Support ES 7.10.0 ([#313](https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/313))
@@ -461,6 +461,11 @@ public Collection<Object> createComponents(
new ADStat<>(true, new IndexStatusSupplier(indexUtils, DetectorInternalState.DETECTOR_STATE_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_CANCELED_BATCH_TASK_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_TOTAL_BATCH_TASK_EXECUTION_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_BATCH_TASK_FAILURE_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.build();

adStats = new ADStats(indexUtils, modelManager, stats);
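The new stats above are registered with two different supplier flavors: `HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT` follows `DETECTOR_COUNT` in using a `SettableSupplier` (a value that gets recomputed and overwritten), while the batch-task stats use a `CounterSupplier` (a value the executing code bumps in place). A rough sketch of that distinction, using simplified stand-in classes rather than the plugin's actual `CounterSupplier`/`SettableSupplier` implementations:

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

// Counter flavor: incremented by the owning code path, e.g. each time a
// batch task starts executing, is cancelled, or fails.
final class CounterSupplierSketch implements Supplier<Long> {
    private final LongAdder counter = new LongAdder();

    public void increment() {
        counter.increment();
    }

    @Override
    public Long get() {
        return counter.sum();
    }
}

// Settable flavor: overwritten wholesale whenever the value is recalculated,
// e.g. a periodic job that re-counts detectors and stores the result.
final class SettableSupplierSketch implements Supplier<Long> {
    private volatile long value;

    public void set(long newValue) {
        this.value = newValue;
    }

    @Override
    public Long get() {
        return value;
    }
}
```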
@@ -566,7 +571,8 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND
AnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE
);
return unmodifiableList(Stream.concat(enabledSetting.stream(), systemSetting.stream()).collect(Collectors.toList()));
}
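Listing `AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE` here is what makes it a recognized, validated node setting; going by its name, it presumably caps how many historical-analysis batch tasks may run concurrently on one node. A hedged sketch of how such a dynamic, node-scoped integer setting is typically declared in an Elasticsearch plugin (the key, default, and bound below are assumptions, not the plugin's actual values):

```java
import org.elasticsearch.common.settings.Setting;

public final class BatchTaskSettingsSketch {

    // Assumed key and values; the real constant lives in AnomalyDetectorSettings.
    public static final Setting<Integer> MAX_BATCH_TASK_PER_NODE = Setting.intSetting(
        "opendistro.anomaly_detection.max_batch_task_per_node", // assumed setting key
        2,                                                       // assumed default
        1,                                                       // assumed minimum
        Setting.Property.NodeScope,                              // applies per node
        Setting.Property.Dynamic                                 // updatable via the cluster settings API
    );
}
```

Because the setting is registered through `getSettings()`, any value supplied at node startup or through the cluster settings API is parsed and range-checked before it takes effect.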
@@ -99,7 +99,6 @@ public void profile(String detectorId, ActionListener<DetectorProfile> listener,
listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT));
return;
}

calculateTotalResponsesToWait(detectorId, profilesToCollect, listener);
}

@@ -118,10 +117,38 @@ private void calculateTotalResponsesToWait(
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser);
AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId);

prepareProfile(detector, listener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
ActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isMultientityDetector();

int totalResponsesToWait = 0;

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
totalResponsesToWait++;
}
@@ -158,50 +185,19 @@ private void calculateTotalResponsesToWait(
new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
totalResponsesToWait,
"Fail to fetch profile for " + detectorId,
CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId,
false
);

prepareProfile(detector, delegateListener, profilesToCollect);
} catch (Exception e) {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e));
}
} else {
listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId));
}
}, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception))));
}

private void prepareProfile(
AnomalyDetector detector,
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profilesToCollect
) {
String detectorId = detector.getDetectorId();
GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (getResponse != null && getResponse.isExists()) {
try (
XContentParser parser = XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId);
client.get(getStateRequest, onGetDetectorState(listener, detectorId, enabledTimeMs));
client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs));
}

boolean isMultiEntityDetector = detector.isMultientityDetector();

// total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide
// when to consolidate results and return to users
if (isMultiEntityDetector) {
if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) {
profileEntityStats(listener, detector);
profileEntityStats(delegateListener, detector);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
@@ -210,24 +206,24 @@ private void prepareProfile(
|| profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)
|| profilesToCollect.contains(DetectorProfileName.STATE)) {
profileModels(detector, profilesToCollect, job, true, listener);
profileModels(detector, profilesToCollect, job, true, delegateListener);
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)
|| profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
profileStateRelated(detector, listener, job.isEnabled(), profilesToCollect);
profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect);
}
if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE)
|| profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE)
|| profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES)
|| profilesToCollect.contains(DetectorProfileName.MODELS)) {
profileModels(detector, profilesToCollect, job, false, listener);
profileModels(detector, profilesToCollect, job, false, delegateListener);
}
}

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
} catch (Exception e) {
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
onGetDetectorForPrepare(listener, profilesToCollect);
@@ -261,20 +257,19 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
DetectorProfile profile = profileBuilder.totalEntities(value).build();
listener.onResponse(profile);
}, searchException -> { listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); })
);
}, searchException -> {
logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId());
listener.onFailure(searchException);
}));
}
}

private void onGetDetectorForPrepare(
MultiResponsesDelegateActionListener<DetectorProfile> listener,
Set<DetectorProfileName> profiles
) {
private void onGetDetectorForPrepare(ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profiles) {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (profiles.contains(DetectorProfileName.STATE)) {
profileBuilder.state(DetectorState.DISABLED);
}
listener.respondImmediately(profileBuilder.build());
listener.onResponse(profileBuilder.build());
}

/**
Expand Down Expand Up @@ -340,8 +335,8 @@ private ActionListener<GetResponse> onGetDetectorState(
listener.onResponse(profileBuilder.build());

} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e);
listener.onFailure(e);
}
} else {
// detector state for this detector does not exist
@@ -463,8 +458,8 @@ private ActionListener<SearchResponse> onInittedEver(
processInitResponse(detector, profilesToCollect, totalUpdates, false, profileBuilder, listener);
} else {
createRunningStateAndInitProgress(profilesToCollect, profileBuilder);
listener.onResponse(profileBuilder.build());
}
listener.onResponse(profileBuilder.build());
}, exception -> {
if (exception instanceof IndexNotFoundException) {
// anomaly result index is not created yet
@@ -475,7 +470,7 @@
"Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}",
detector.getDetectorId()
);
listener.failImmediately(new RuntimeException("Fail to find detector state: " + detector.getDetectorId(), exception));
listener.onFailure(exception);
}
});
}
@@ -523,7 +518,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getDetectorId()),
exception
);
listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detector.getDetectorId(), exception);
listener.onFailure(exception);
}
});
}
@@ -558,7 +553,6 @@ private void processInitResponse(
} else {
long intervalMins = ((IntervalTimeConfiguration) detector.getDetectionInterval()).toDuration().toMinutes();
InitProgressProfile initProgress = computeInitProgressProfile(totalUpdates, intervalMins);

builder.initProgress(initProgress);
}
}
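Most of the edits in this file reroute partial results through `delegateListener`, the `MultiResponsesDelegateActionListener` constructed from `totalResponsesToWait`: the method first counts how many asynchronous profile pieces it is about to request, and the delegate consolidates once exactly that many responses have arrived. A minimal, self-contained sketch of that counting pattern, with simplified names that are not the plugin's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

// Illustrative only: waits for an expected number of partial responses,
// merges them, and answers the caller exactly once.
final class CountingDelegateSketch<T> {
    private final BinaryOperator<T> merge;     // how two partial profiles are combined
    private final Consumer<T> onComplete;      // caller's success callback
    private final Consumer<Exception> onError; // caller's failure callback
    private final List<T> received = new ArrayList<>();
    private int remaining;                     // totalResponsesToWait, computed up front

    CountingDelegateSketch(int totalResponsesToWait, BinaryOperator<T> merge,
                           Consumer<T> onComplete, Consumer<Exception> onError) {
        this.remaining = totalResponsesToWait;
        this.merge = merge;
        this.onComplete = onComplete;
        this.onError = onError;
    }

    synchronized void onResponse(T partial) {
        received.add(partial);
        if (--remaining == 0) {
            // All expected pieces arrived: consolidate and return to the caller.
            onComplete.accept(received.stream().reduce(merge).orElseThrow(IllegalStateException::new));
        }
    }

    synchronized void onFailure(Exception e) {
        // Simplification: fail fast; the real delegate can also tolerate partial failures.
        onError.accept(e);
    }
}
```

Getting the count right is the point of the up-front bookkeeping: too high and the delegate never answers the caller, too low and it answers before all profile pieces are in.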