Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
add ad task stats (#332)
Browse files Browse the repository at this point in the history
* add ad task stats

* change historical detector stats name to historical single entity detector; change getAdStatsResponse to a protected method

* move jvm heap stats to internal stats
  • Loading branch information
ylwu-amzn authored Dec 17, 2020
1 parent 9c3b972 commit e0dcaec
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ public Collection<Object> createComponents(
new ADStat<>(true, new IndexStatusSupplier(indexUtils, DetectorInternalState.DETECTOR_STATE_INDEX))
)
.put(StatNames.DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName(), new ADStat<>(true, new SettableSupplier()))
.put(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_CANCELED_BATCH_TASK_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_TOTAL_BATCH_TASK_EXECUTION_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.put(StatNames.AD_BATCH_TASK_FAILURE_COUNT.getName(), new ADStat<>(false, new CounterSupplier()))
.build();

adStats = new ADStats(indexUtils, modelManager, stats);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.stats;

/**
 * Enum containing the names of all internal stats, i.e. stats that are not
 * returned in the AD stats REST API.
 */
public enum InternalStatNames {
    JVM_HEAP_USAGE("jvm_heap_usage");

    // Assigned once in the constructor; final so an enum constant's name can never be reassigned.
    private final String name;

    InternalStatNames(String name) {
        this.name = name;
    }

    /**
     * Get internal stat name
     *
     * @return name
     */
    public String getName() {
        return name;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import java.util.Set;

/**
* Enum containing names of all stats
* Enum containing names of all external stats which will be returned in
* AD stats REST API.
*/
public enum StatNames {
AD_EXECUTE_REQUEST_COUNT("ad_execute_request_count"),
Expand All @@ -32,7 +33,12 @@ public enum StatNames {
MODELS_CHECKPOINT_INDEX_STATUS("models_checkpoint_index_status"),
ANOMALY_DETECTION_JOB_INDEX_STATUS("anomaly_detection_job_index_status"),
ANOMALY_DETECTION_STATE_STATUS("anomaly_detection_state_status"),
MODEL_INFORMATION("models");
MODEL_INFORMATION("models"),
HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT("historical_single_entity_detector_count"),
AD_EXECUTING_BATCH_TASK_COUNT("ad_executing_batch_task_count"),
AD_CANCELED_BATCH_TASK_COUNT("ad_canceled_batch_task_count"),
AD_TOTAL_BATCH_TASK_EXECUTION_COUNT("ad_total_batch_task_execution_count"),
AD_BATCH_TASK_FAILURE_COUNT("ad_batch_task_failure_count");

private String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.InternalStatNames;

/**
* ADStatsNodesTransportAction contains the logic to extract the stats from the nodes
Expand All @@ -39,6 +41,7 @@ public class ADStatsNodesTransportAction extends
TransportNodesAction<ADStatsRequest, ADStatsNodesResponse, ADStatsNodeRequest, ADStatsNodeResponse> {

private ADStats adStats;
private final JvmService jvmService;

/**
* Constructor
Expand All @@ -55,7 +58,8 @@ public ADStatsNodesTransportAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
ADStats adStats
ADStats adStats,
JvmService jvmService
) {
super(
ADStatsNodesAction.NAME,
Expand All @@ -69,6 +73,7 @@ public ADStatsNodesTransportAction(
ADStatsNodeResponse.class
);
this.adStats = adStats;
this.jvmService = jvmService;
}

@Override
Expand Down Expand Up @@ -99,6 +104,11 @@ private ADStatsNodeResponse createADStatsNodeResponse(ADStatsRequest adStatsRequ
Map<String, Object> statValues = new HashMap<>();
Set<String> statsToBeRetrieved = adStatsRequest.getStatsToBeRetrieved();

if (statsToBeRetrieved.contains(InternalStatNames.JVM_HEAP_USAGE.getName())) {
long heapUsedPercent = jvmService.stats().getMem().getHeapUsedPercent();
statValues.put(InternalStatNames.JVM_HEAP_USAGE.getName(), heapUsedPercent);
}

for (String statName : adStats.getNodeStats().keySet()) {
if (statsToBeRetrieved.contains(statName)) {
statValues.put(statName, adStats.getStats().get(statName).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
adStatsResponse.toXContent(builder, params);
return builder;
}

/**
 * Get the AD stats response held by this object.
 *
 * @return the {@code adStatsResponse} field
 */
protected ADStatsResponse getAdStatsResponse() {
    return this.adStatsResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.ad.transport;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -31,16 +32,22 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorType;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStatsResponse;
import com.amazon.opendistroforelasticsearch.ad.stats.StatNames;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;

public class StatsAnomalyDetectorTransportAction extends HandledTransportAction<ADStatsRequest, StatsAnomalyDetectorResponse> {
public static final String DETECTOR_TYPE_AGG = "detector_type_agg";
private final Logger logger = LogManager.getLogger(StatsAnomalyDetectorTransportAction.class);

private final Client client;
Expand Down Expand Up @@ -120,23 +127,36 @@ private void getClusterStats(
ADStatsRequest adStatsRequest
) {
ADStatsResponse adStatsResponse = new ADStatsResponse();
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
if (clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {
final SearchRequest request = client
.prepareSearch(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
.setSize(0)
.setTrackTotalHits(true)
.request();
client.search(request, ActionListener.wrap(indicesStatsResponse -> {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(indicesStatsResponse.getHits().getTotalHits().value);
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}, e -> listener.onFailure(new RuntimeException("Failed to get AD cluster stats", e))));
} else {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(0L);
if ((adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())
|| adStatsRequest.getStatsToBeRetrieved().contains(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName()))
&& clusterService.state().getRoutingTable().hasIndex(AnomalyDetector.ANOMALY_DETECTORS_INDEX)) {

TermsAggregationBuilder termsAgg = AggregationBuilders.terms(DETECTOR_TYPE_AGG).field(AnomalyDetector.DETECTOR_TYPE_FIELD);
SearchRequest request = new SearchRequest()
.indices(AnomalyDetector.ANOMALY_DETECTORS_INDEX)
.source(new SearchSourceBuilder().aggregation(termsAgg).size(0).trackTotalHits(true));

client.search(request, ActionListener.wrap(r -> {
StringTerms aggregation = r.getAggregations().get(DETECTOR_TYPE_AGG);
List<StringTerms.Bucket> buckets = aggregation.getBuckets();
long totalDetectors = r.getHits().getTotalHits().value;
long totalHistoricalSingleEntityDetectors = 0;
for (StringTerms.Bucket b : buckets) {
if (AnomalyDetectorType.HISTORICAL_SINGLE_ENTITY.name().equals(b.getKeyAsString())) {
totalHistoricalSingleEntityDetectors += b.getDocCount();
}
}
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.DETECTOR_COUNT.getName())) {
adStats.getStat(StatNames.DETECTOR_COUNT.getName()).setValue(totalDetectors);
}
if (adStatsRequest.getStatsToBeRetrieved().contains(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName())) {
adStats
.getStat(StatNames.HISTORICAL_SINGLE_ENTITY_DETECTOR_COUNT.getName())
.setValue(totalHistoricalSingleEntityDetectors);
}
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
}
}, e -> listener.onFailure(e)));
} else {
adStatsResponse.setClusterStats(getClusterStatsMap(adStatsRequest));
listener.onResponse(adStatsResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.ad;

import static org.apache.http.entity.ContentType.APPLICATION_JSON;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand All @@ -41,6 +42,7 @@
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -49,11 +51,13 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetadata;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
Expand Down Expand Up @@ -627,6 +631,11 @@ public static ThreadPool createThreadPool() {
return pool;
}

/**
 * Create an index through the admin client and wait for the response.
 *
 * @param adminClient  admin client used to issue the create-index request
 * @param indexName    name of the index to create
 * @param indexMapping JSON mapping source, registered under {@code AnomalyDetector.TYPE}
 * @return response of the create-index call
 */
public static CreateIndexResponse createIndex(AdminClient adminClient, String indexName, String indexMapping) {
    CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
    createRequest.mapping(AnomalyDetector.TYPE, indexMapping, XContentType.JSON);
    // actionGet is given a 5_000 timeout, so the call does not wait indefinitely.
    return adminClient.indices().create(createRequest).actionGet(5_000);
}

public static void createIndex(RestClient client, String indexName, HttpEntity data) throws IOException {
TestHelpers
.makeRequest(
Expand Down Expand Up @@ -757,4 +766,17 @@ public static ADTask randomAdTask(String taskId, ADTaskState state, Instant exec
.build();
return task;
}

/**
 * Convert an XContent object to an HTTP entity with JSON content type.
 *
 * @param object object to serialize
 * @return string entity holding the object's JSON form
 * @throws IOException if serializing the object fails
 */
public static HttpEntity toHttpEntity(ToXContentObject object) throws IOException {
    String json = toJsonString(object);
    return new StringEntity(json, APPLICATION_JSON);
}

/**
 * Wrap a JSON string in an HTTP entity with JSON content type.
 *
 * @param jsonString JSON payload
 * @return string entity holding the payload
 * @throws IOException declared for signature compatibility with existing callers
 */
public static HttpEntity toHttpEntity(String jsonString) throws IOException {
    HttpEntity entity = new StringEntity(jsonString, APPLICATION_JSON);
    return entity;
}

/**
 * Serialize an XContent object to its JSON string form.
 *
 * @param object object to serialize
 * @return JSON string produced with a JSON builder and empty params
 * @throws IOException if building the content fails
 */
public static String toJsonString(ToXContentObject object) throws IOException {
    XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
    XContentBuilder rendered = object.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
    return TestHelpers.xContentBuilderToString(rendered);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.monitor.jvm.JvmService;
import org.elasticsearch.monitor.jvm.JvmStats;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -40,9 +42,11 @@
import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStat;
import com.amazon.opendistroforelasticsearch.ad.stats.ADStats;
import com.amazon.opendistroforelasticsearch.ad.stats.InternalStatNames;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.CounterSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.IndexStatusSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.ModelsOnNodeSupplier;
import com.amazon.opendistroforelasticsearch.ad.stats.suppliers.SettableSupplier;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.Throttler;
Expand Down Expand Up @@ -87,17 +91,26 @@ public void setUp() throws Exception {
put(nodeStatName2, new ADStat<>(false, new ModelsOnNodeSupplier(modelManager, cacheProvider)));
put(clusterStatName1, new ADStat<>(true, new IndexStatusSupplier(indexUtils, "index1")));
put(clusterStatName2, new ADStat<>(true, new IndexStatusSupplier(indexUtils, "index2")));
put(InternalStatNames.JVM_HEAP_USAGE.getName(), new ADStat<>(true, new SettableSupplier()));
}
};

adStats = new ADStats(indexUtils, modelManager, statsMap);
JvmService jvmService = mock(JvmService.class);
JvmStats jvmStats = mock(JvmStats.class);
JvmStats.Mem mem = mock(JvmStats.Mem.class);

when(jvmService.stats()).thenReturn(jvmStats);
when(jvmStats.getMem()).thenReturn(mem);
when(mem.getHeapUsedPercent()).thenReturn(randomShort());

action = new ADStatsNodesTransportAction(
client().threadPool(),
clusterService(),
mock(TransportService.class),
mock(ActionFilters.class),
adStats
adStats,
jvmService
);
}

Expand Down Expand Up @@ -133,4 +146,26 @@ public void testNodeOperation() {
assertTrue(statsToBeRetrieved.contains(statName));
}
}

/**
 * Requests one node stat plus the internal JVM heap usage stat and checks
 * that the node response contains exactly the requested stat names.
 */
@Test
public void testNodeOperationWithJvmHeapUsage() {
    String localNodeId = clusterService().localNode().getId();
    ADStatsRequest adStatsRequest = new ADStatsRequest(localNodeId);
    adStatsRequest.clear();

    Set<String> requestedStats = new HashSet<>(Arrays.asList(nodeStatName1, InternalStatNames.JVM_HEAP_USAGE.getName()));
    requestedStats.forEach(statName -> adStatsRequest.addStat(statName));

    ADStatsNodeRequest nodeRequest = new ADStatsNodeRequest(adStatsRequest);
    ADStatsNodeResponse nodeResponse = action.nodeOperation(nodeRequest);
    Map<String, Object> returnedStats = nodeResponse.getStatsMap();

    // Same size plus subset check: the returned names are exactly the requested ones.
    assertEquals(requestedStats.size(), returnedStats.size());
    assertTrue(requestedStats.containsAll(returnedStats.keySet()));
}
}
Loading

0 comments on commit e0dcaec

Please sign in to comment.