diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json index 0a8960f2f9e89..2a3659dfb892a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.stats.json @@ -37,7 +37,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned the specific metrics." } @@ -83,7 +84,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned the specific metrics." } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json index 1aa57ee849c66..0dc5e159a4528 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json @@ -120,7 +120,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified." } @@ -167,7 +168,8 @@ "segments", "store", "warmer", - "suggest" + "suggest", + "bulk" ], "description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified." }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index aa4abc7a11eae..a2e54366eb793 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -2,7 +2,7 @@ "Help": - skip: version: " - 7.99.99" - reason: shard path stats were added in 8.0.0 + reason: bulk stats were added in 8.0.0 - do: cat.shards: help: true @@ -80,6 +80,11 @@ warmer.total_time .+ \n path.data .+ \n path.state .+ \n + bulk.total_operations .+ \n + bulk.total_time .+ \n + bulk.total_size_in_bytes .+ \n + bulk.avg_time .+ \n + bulk.avg_size_in_bytes .+ \n $/ --- "Test cat shards output": diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index e9309bf8ac9c8..ade4ddb316da9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -20,6 +20,8 @@ package org.elasticsearch.action.admin.indices.stats; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Version; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -102,6 +104,9 @@ public class CommonStats implements Writeable, ToXContentFragment { @Nullable public RecoveryStats recoveryStats; + @Nullable + public BulkStats bulk; + public CommonStats() { this(CommonStatsFlags.NONE); } @@ -159,6 +164,9 @@ public CommonStats(CommonStatsFlags flags) { case Recovery: recoveryStats = new RecoveryStats(); break; + case Bulk: + bulk = new BulkStats(); + break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -218,6 +226,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C case Recovery: recoveryStats = indexShard.recoveryStats(); break; + case Bulk: + bulk = indexShard.bulkStats(); + break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -244,6 +255,9 @@ public CommonStats(StreamInput in) throws IOException { translog = in.readOptionalWriteable(TranslogStats::new); requestCache = in.readOptionalWriteable(RequestCacheStats::new); recoveryStats = in.readOptionalWriteable(RecoveryStats::new); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + bulk = in.readOptionalWriteable(BulkStats::new); + } } @Override @@ -264,6 +278,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(translog); out.writeOptionalWriteable(requestCache); out.writeOptionalWriteable(recoveryStats); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalWriteable(bulk); + } } public void add(CommonStats stats) { @@ -396,6 +413,14 @@ public void add(CommonStats stats) { } else { recoveryStats.add(stats.getRecoveryStats()); } + if (bulk == null) { + if (stats.getBulk() != null) { + bulk = new BulkStats(); + bulk.add(stats.getBulk()); + } + } else { + bulk.add(stats.getBulk()); + } } @Nullable @@ -478,6 +503,11 @@ public RecoveryStats getRecoveryStats() { return recoveryStats; } + @Nullable + public BulkStats getBulk() { + return bulk; + } + /** * Utility method which computes total memory by adding * FieldData, PercolatorCache, Segments (memory, index writer, version map) @@ -504,7 +534,7 @@ public ByteSizeValue getTotalMemory() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { final Stream stream = Arrays.stream(new ToXContent[] { docs, store, indexing, get, search, merge, refresh, flush, warmer, queryCache, - fieldData, completion, segments, translog, requestCache, recoveryStats}) + fieldData, completion, segments, translog, requestCache, recoveryStats, bulk}) .filter(Objects::nonNull); for (ToXContent toXContent : ((Iterable)stream::iterator)) { toXContent.toXContent(builder, params); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java index a25f8e417a885..ee041235410ef 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java @@ -227,7 +227,8 @@ public enum Flag { Translog("translog", 13), // 14 was previously used for Suggest RequestCache("request_cache", 15), - Recovery("recovery", 16); + Recovery("recovery", 16), + Bulk("bulk", 17); private final String restName; private final int index; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java index 5f7eb2b3b95ad..6af50d5e2eacb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -254,6 +254,14 @@ public boolean recovery() { return flags.isSet(Flag.Recovery); } + public IndicesStatsRequest bulk(boolean bulk) { + flags.set(Flag.Bulk, bulk); + return this; + } + public boolean bulk() { + return flags.isSet(Flag.Bulk); + } + public boolean includeSegmentFileSizes() { return flags.includeSegmentFileSizes(); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index 8108299721d82..8fce7c79f5064 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -149,6 +149,11 @@ public IndicesStatsRequestBuilder setRecovery(boolean recovery) { return this; } + public IndicesStatsRequestBuilder setBulk(boolean bulk) { + request.bulk(bulk); + return this; + } + public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegmentFileSizes) { request.includeSegmentFileSizes(includeSegmentFileSizes); return this; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 0f05f071ad4f3..540db58da864c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -19,8 +19,11 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -49,6 +52,24 @@ public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRe setRefreshPolicy(refreshPolicy); } + public long totalSizeInBytes() { + long totalSizeInBytes = 0; + for (int i = 0; i < items.length; i++) { + DocWriteRequest request = items[i].request(); + if (request instanceof IndexRequest) { + if (((IndexRequest) request).source() != null) { + totalSizeInBytes += ((IndexRequest) request).source().length(); + } + } else if (request instanceof UpdateRequest) { + IndexRequest doc = ((UpdateRequest) request).doc(); + if (doc != null && doc.source() != null) { + totalSizeInBytes += ((UpdateRequest) request).doc().source().length(); + } + } + } + return totalSizeInBytes; + } + public BulkItemRequest[] items() { return items; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3eded2888e564..84c2e089d9a2c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -153,6 +153,8 @@ public static void performOnPrimary( private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); + final long startBulkTime = System.nanoTime(); + @Override protected void doRun() throws Exception { while (context.hasMoreOperationsToExecute()) { @@ -164,6 +166,7 @@ protected void doRun() throws Exception { } assert context.isInitial(); // either completed and moved to next or reset } + primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); // We're done, there's no more operations to execute so we resolve the wrapped listener finishRequest(); } @@ -405,7 +408,9 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update @Override public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + final long startBulkTime = System.nanoTime(); final Translog.Location location = performOnReplica(request, replica); + replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); return new WriteReplicaResult<>(request, location, null, replica, logger); } diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java new file mode 100644 index 0000000000000..c9164e800d199 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkOperationListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +/** + * An bulk operation listener for bulk events. + */ +public interface BulkOperationListener { + /** + * Called after the bulk operation occurred. + */ + default void afterBulk(long bulkShardSizeInBytes, long tookInNanos) { + } +} + diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java new file mode 100644 index 0000000000000..e0ce32b753bf3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Bulk related statistics, including the time and size of shard bulk requests, + * starting at the shard level and allowing aggregation to indices and node level + */ +public class BulkStats implements Writeable, ToXContentFragment { + + private long totalOperations = 0; + private long totalTimeInMillis = 0; + private long totalSizeInBytes = 0; + private long avgTimeInMillis = 0; + private long avgSizeInBytes = 0; + + public BulkStats() { + + } + + public BulkStats(StreamInput in) throws IOException { + totalOperations = in.readVLong(); + totalTimeInMillis = in.readVLong(); + totalSizeInBytes = in.readVLong(); + avgTimeInMillis = in.readVLong(); + avgSizeInBytes = in.readVLong(); + } + + public BulkStats(long totalOperations, long totalTimeInMillis, long totalSizeInBytes, long avgTimeInMillis, long avgSizeInBytes) { + this.totalOperations = totalOperations; + this.totalTimeInMillis = totalTimeInMillis; + this.totalSizeInBytes = totalSizeInBytes; + this.avgTimeInMillis = avgTimeInMillis; + this.avgSizeInBytes = avgSizeInBytes; + } + + public void add(BulkStats bulkStats) { + addTotals(bulkStats); + } + + public void addTotals(BulkStats bulkStats) { + if (bulkStats == null) { + return; + } + if (this.totalOperations > 0 || bulkStats.totalOperations > 0) { + this.avgTimeInMillis = + (avgTimeInMillis * totalOperations + bulkStats.avgTimeInMillis * bulkStats.totalOperations) / (totalOperations + + bulkStats.totalOperations); + this.avgSizeInBytes = + (avgSizeInBytes * totalOperations + bulkStats.avgSizeInBytes * bulkStats.totalOperations) / (totalOperations + + bulkStats.totalOperations); + } + this.totalOperations += bulkStats.totalOperations; + this.totalTimeInMillis += bulkStats.totalTimeInMillis; + this.totalSizeInBytes += bulkStats.totalSizeInBytes; + } + + public long getTotalSizeInBytes() { + return totalSizeInBytes; + } + + public long getTotalOperations() { + return totalOperations; + } + + public TimeValue getTotalTime() { + return new TimeValue(totalTimeInMillis); + } + + public TimeValue getAvgTime() { + return new TimeValue(avgTimeInMillis); + } + + public long getTotalTimeInMillis() { + return totalTimeInMillis; + } + + public long getAvgTimeInMillis() { + return avgTimeInMillis; + } + + public long getAvgSizeInBytes() { + return avgSizeInBytes; + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalOperations); + out.writeVLong(totalTimeInMillis); + out.writeVLong(totalSizeInBytes); + out.writeVLong(avgTimeInMillis); + out.writeVLong(avgSizeInBytes); + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(Fields.BULK); + builder.field(Fields.TOTAL_OPERATIONS, totalOperations); + builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime()); + builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes); + builder.humanReadableField(Fields.AVG_TIME_IN_MILLIS, Fields.AVG_TIME, getAvgTime()); + builder.field(Fields.AVG_SIZE_IN_BYTES, avgSizeInBytes); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final BulkStats that = (BulkStats) o; + return Objects.equals(this.totalOperations, that.totalOperations) && Objects.equals(this.totalTimeInMillis, that.totalTimeInMillis) + && Objects.equals(this.totalSizeInBytes, that.totalSizeInBytes) && Objects.equals(this.avgTimeInMillis, that.avgTimeInMillis) + && Objects.equals(this.avgSizeInBytes, that.avgSizeInBytes); + } + + @Override + public int hashCode() { + return Objects.hash(totalOperations, totalTimeInMillis, totalSizeInBytes, avgTimeInMillis, avgSizeInBytes); + } + + static final class Fields { + static final String BULK = "bulk"; + static final String TOTAL_OPERATIONS = "total_operations"; + static final String TOTAL_TIME = "total_time"; + static final String AVG_TIME = "avg_time"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes"; + static final String AVG_TIME_IN_MILLIS = "avg_time_in_millis"; + static final String AVG_SIZE_IN_BYTES = "avg_size_in_bytes"; + } +} + diff --git a/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java new file mode 100644 index 0000000000000..db09a98319877 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/bulk/stats/ShardBulkStats.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +import org.elasticsearch.common.ExponentiallyWeightedMovingAverage; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.index.shard.IndexShard; + +import java.util.concurrent.TimeUnit; + +/** + * Internal class that maintains relevant shard bulk statistics / metrics. + * @see IndexShard + */ +public class ShardBulkStats implements BulkOperationListener { + + private final StatsHolder totalStats = new StatsHolder(); + private static final double ALPHA = 0.1; + + public BulkStats stats() { + return totalStats.stats(); + } + + @Override + public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) { + totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes); + totalStats.shardBulkMetric.inc(tookInNanos); + totalStats.timeInMillis.addValue(tookInNanos); + totalStats.sizeInBytes.addValue(shardBulkSizeInBytes); + } + + static final class StatsHolder { + final MeanMetric shardBulkMetric = new MeanMetric(); + final CounterMetric totalSizeInBytes = new CounterMetric(); + ExponentiallyWeightedMovingAverage timeInMillis = new ExponentiallyWeightedMovingAverage(ALPHA, 0.0); + ExponentiallyWeightedMovingAverage sizeInBytes = new ExponentiallyWeightedMovingAverage(ALPHA, 0.0); + + BulkStats stats() { + return new BulkStats( + shardBulkMetric.count(), + TimeUnit.NANOSECONDS.toMillis(shardBulkMetric.sum()), + totalSizeInBytes.count(), + TimeUnit.NANOSECONDS.toMillis((long) timeInMillis.getAverage()), + (long) sizeInBytes.getAverage()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9b99c7be77eab..55d0342cde9e3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -46,6 +46,9 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.index.bulk.stats.BulkOperationListener; +import org.elasticsearch.index.bulk.stats.BulkStats; +import org.elasticsearch.index.bulk.stats.ShardBulkStats; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -208,6 +211,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final SearchOperationListener searchOperationListener; + private final ShardBulkStats bulkOperationListener; private final GlobalCheckpointListeners globalCheckpointListeners; private final ReplicationTracker replicationTracker; @@ -310,6 +314,7 @@ public IndexShard( final List listenersList = new ArrayList<>(listeners); listenersList.add(internalIndexingStats); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); + this.bulkOperationListener = new ShardBulkStats(); this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); final List searchListenersList = new ArrayList<>(searchOperationListener); @@ -399,6 +404,10 @@ public SearchOperationListener getSearchOperationListener() { return this.searchOperationListener; } + public BulkOperationListener getBulkOperationListener() { + return this.bulkOperationListener; + } + public ShardIndexWarmerService warmerService() { return this.shardWarmerService; } @@ -1028,6 +1037,10 @@ public CompletionStats completionStats(String... fields) { return getEngine().completionStats(fields); } + public BulkStats bulkStats() { + return bulkOperationListener.stats(); + } + /** * Executes the given flush request against the engine. * diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 9f0c88ff7fe1e..ddf512da54cbe 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -403,6 +404,9 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { case Flush: commonStats.flush.add(oldShardsStats.flushStats); break; + case Bulk: + commonStats.bulk.add(oldShardsStats.bulkStats); + break; } } @@ -817,6 +821,7 @@ static class OldShardsStats implements IndexEventListener { final RefreshStats refreshStats = new RefreshStats(); final FlushStats flushStats = new FlushStats(); final RecoveryStats recoveryStats = new RecoveryStats(); + final BulkStats bulkStats = new BulkStats(); @Override public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { @@ -829,6 +834,7 @@ public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable Index refreshStats.addTotals(indexShard.refreshStats()); flushStats.addTotals(indexShard.flushStats()); recoveryStats.addTotals(indexShard.recoveryStats()); + bulkStats.addTotals(indexShard.bulkStats()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index aa9e880180327..47fb58f3cae13 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -172,6 +173,11 @@ public RecoveryStats getRecoveryStats() { return stats.getRecoveryStats(); } + @Nullable + public BulkStats getBulk() { + return stats.getBulk(); + } + @Override public void writeTo(StreamOutput out) throws IOException { stats.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java index 36fe3b1233d10..e386eec005a64 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestIndicesAction.java @@ -495,6 +495,26 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled"); + table.addCell("bulk.total_operations", + "sibling:pri;alias:bto,bulkTotalOperation;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("pri.bulk.total_operations", "default:false;text-align:right;desc:number of bulk shard ops"); + + table.addCell("bulk.total_time", + "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("pri.bulk.total_time", "default:false;text-align:right;desc:time spend in shard bulk"); + + table.addCell("bulk.total_size_in_bytes", + "sibling:pri;alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("pri.bulk.total_size_in_bytes", "default:false;text-align:right;desc:total size in bytes of shard bulk"); + + table.addCell("bulk.avg_time", + "sibling:pri;alias:bati,bulkAvgTime;default:false;text-align:right;desc:average time spend in shard bulk"); + table.addCell("pri.bulk.avg_time", "default:false;text-align:right;desc:average time spend in shard bulk"); + + table.addCell("bulk.avg_size_in_bytes", + "sibling:pri;alias:basi,bulkAvgSizeInBytes;default:false;text-align:right;desc:average size in bytes of shard bulk"); + table.addCell("pri.bulk.avg_size_in_bytes", "default:false;text-align:right;desc:average size in bytes of shard bulk"); + table.endHeaders(); return table; } @@ -754,6 +774,21 @@ Table buildTable(final RestRequest request, table.addCell(searchThrottled); + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalOperations()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalOperations()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalTime()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalTime()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalSizeInBytes()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalSizeInBytes()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getAvgTime()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getAvgTime()); + + table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getAvgSizeInBytes()); + table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getAvgSizeInBytes()); + table.endRow(); }); diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java index 272f60dfc8a2e..b2004e455a7d0 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestNodesAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.http.HttpInfo; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.cache.request.RequestCacheStats; import org.elasticsearch.index.engine.SegmentsStats; @@ -254,6 +255,15 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("suggest.time", "alias:suti,suggestTime;default:false;text-align:right;desc:time spend in suggest"); table.addCell("suggest.total", "alias:suto,suggestTotal;default:false;text-align:right;desc:number of suggest ops"); + table.addCell("bulk.total_operations", + "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("bulk.total_size_in_bytes", + "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.avg_time", "alias:bati,bulkAvgTime;default:false;text-align:right;desc:average time spend in shard bulk"); + table.addCell("bulk.avg_size_in_bytes", + "alias:basi,bulkAvgSizeInBytes;default:false;text-align:right;desc:average size in bytes of shard bulk"); + table.endHeaders(); return table; } @@ -428,6 +438,13 @@ Table buildTable(boolean fullId, RestRequest req, ClusterStateResponse state, No table.addCell(searchStats == null ? null : searchStats.getTotal().getSuggestTime()); table.addCell(searchStats == null ? null : searchStats.getTotal().getSuggestCount()); + BulkStats bulkStats = indicesStats == null ? null : indicesStats.getBulk(); + table.addCell(bulkStats == null ? null : bulkStats.getTotalOperations()); + table.addCell(bulkStats == null ? null : bulkStats.getTotalTime()); + table.addCell(bulkStats == null ? null : bulkStats.getTotalSizeInBytes()); + table.addCell(bulkStats == null ? null : bulkStats.getAvgTime()); + table.addCell(bulkStats == null ? null : bulkStats.getAvgSizeInBytes()); + table.endRow(); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java index 35ca27573ae3a..04be1b920915a 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestShardsAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.Table; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; @@ -202,6 +203,15 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("path.data", "alias:pd,dataPath;default:false;text-align:right;desc:shard data path"); table.addCell("path.state", "alias:ps,statsPath;default:false;text-align:right;desc:shard state path"); + + table.addCell("bulk.total_operations", + "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops"); + table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk"); + table.addCell("bulk.total_size_in_bytes", + "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk"); + table.addCell("bulk.avg_time", "alias:bati,bulkAvgTime;default:false;text-align:right;desc:average time spend in shard bulk"); + table.addCell("bulk.avg_size_in_bytes", + "alias:basi,bulkAvgSizeInBytes;default:false;text-align:right;desc:avg size in bytes of shard bulk"); table.endHeaders(); return table; @@ -357,6 +367,12 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe table.addCell(getOrNull(shardStats, ShardStats::getDataPath, s -> s)); table.addCell(getOrNull(shardStats, ShardStats::getStatePath, s -> s)); + + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalOperations)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalTime)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalSizeInBytes)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgTime)); + table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getAvgSizeInBytes)); table.endRow(); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 25f1b9603ab8a..197fc3a42a98e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.bulk.stats.ShardBulkStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperService; @@ -787,6 +788,7 @@ public void testRetries() throws Exception { when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); when(shard.mapperService()).thenReturn(mock(MapperService.class)); + when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class)); UpdateHelper updateHelper = mock(UpdateHelper.class); when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( diff --git a/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java new file mode 100644 index 0000000000000..5086b8dd9bd09 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/bulk/stats/BulkStatsTests.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.bulk.stats; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +public class BulkStatsTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return BulkStats::new; + } + + @Override + protected BulkStats createTestInstance() { + return new BulkStats(randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()); + } + + @Override + protected BulkStats mutateInstance(BulkStats instance) { + switch (between(0, 4)) { + case 0: + return new BulkStats(randomValueOtherThan(instance.getTotalOperations(), ESTestCase::randomNonNegativeLong), + instance.getTotalTimeInMillis(), + instance.getTotalSizeInBytes(), + instance.getAvgTimeInMillis(), + instance.getAvgTimeInMillis()); + case 1: + return new BulkStats(instance.getTotalOperations(), + randomValueOtherThan(instance.getTotalTimeInMillis(), ESTestCase::randomNonNegativeLong), + instance.getTotalSizeInBytes(), + instance.getAvgTimeInMillis(), + instance.getAvgTimeInMillis()); + case 2: + return new BulkStats(instance.getTotalOperations(), + instance.getTotalTimeInMillis(), + randomValueOtherThan(instance.getTotalSizeInBytes(), ESTestCase::randomNonNegativeLong), + instance.getAvgTimeInMillis(), + instance.getAvgTimeInMillis()); + case 3: + return new BulkStats(instance.getTotalOperations(), + instance.getTotalTimeInMillis(), + instance.getTotalSizeInBytes(), + randomValueOtherThan(instance.getAvgTimeInMillis(), ESTestCase::randomNonNegativeLong), + instance.getAvgTimeInMillis()); + case 4: + return new BulkStats(instance.getTotalOperations(), + instance.getTotalTimeInMillis(), + instance.getTotalSizeInBytes(), + instance.getAvgTimeInMillis(), + randomValueOtherThan(instance.getAvgSizeInBytes(), ESTestCase::randomNonNegativeLong)); + default: + throw new AssertionError("failure, got illegal switch case"); + } + } + + public void testAddTotals() { + BulkStats bulkStats1 = new BulkStats(1, 1, 1, 2, 2); + BulkStats bulkStats2 = new BulkStats(1, 1, 1, 2, 2); + + // adding these two bulk stats and checking stats are correct + bulkStats1.add(bulkStats2); + assertStats(bulkStats1, 2); + + // another call, adding again ... + bulkStats1.add(bulkStats2); + assertStats(bulkStats1, 3); + + } + + private static void assertStats(BulkStats stats, long equalTo) { + assertEquals(equalTo, stats.getTotalOperations()); + assertEquals(equalTo, stats.getTotalTimeInMillis()); + assertEquals(equalTo, stats.getTotalSizeInBytes()); + } + +} + diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 8e748b22beb42..9ea1581bccc25 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -32,11 +32,16 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -726,7 +731,7 @@ public void testEncodeDecodeCommonStats() throws IOException { public void testFlagOrdinalOrder() { Flag[] flags = new Flag[]{Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Merge, Flag.Flush, Flag.Refresh, Flag.QueryCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Completion, Flag.Segments, - Flag.Translog, Flag.RequestCache, Flag.Recovery}; + Flag.Translog, Flag.RequestCache, Flag.Recovery, Flag.Bulk}; assertThat(flags.length, equalTo(Flag.values().length)); for (int i = 0; i < flags.length; i++) { @@ -902,6 +907,9 @@ private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean s case Recovery: builder.setRecovery(set); break; + case Bulk: + builder.setBulk(set); + break; default: fail("new flag? " + flag); break; @@ -942,6 +950,8 @@ private static boolean isSet(Flag flag, CommonStats response) { return response.getRequestCache() != null; case Recovery: return response.getRecoveryStats() != null; + case Bulk: + return response.getBulk() != null; default: fail("new flag? " + flag); return false; @@ -1067,6 +1077,37 @@ public void testFilterCacheStats() throws Exception { assertThat(response.getTotal().queryCache.getMemorySizeInBytes(), equalTo(0L)); } + public void testBulkStats() throws Exception { + final String index = "test"; + assertAcked(prepareCreate(index).setSettings(settingsBuilder().put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1))); + ensureGreen(); + final BulkRequest request1 = new BulkRequest(); + for (int i = 0; i < 20; ++i) { + request1.add(new IndexRequest(index).source(Collections.singletonMap("key", "value" + i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + BulkResponse bulkResponse = client().bulk(request1).get(); + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(20)); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + assertThat(bulkItemResponse.getIndex(), equalTo(index)); + } + IndicesStatsResponse stats = client().admin().indices().prepareStats(index).setBulk(true).get(); + + assertThat(stats.getTotal().bulk.getTotalOperations(), equalTo(4L)); + assertThat(stats.getTotal().bulk.getTotalTimeInMillis(), greaterThan(0L)); + assertThat(stats.getTotal().bulk.getTotalSizeInBytes(), greaterThan(0L)); + assertThat(stats.getTotal().bulk.getAvgTimeInMillis(), greaterThan(0L)); + assertThat(stats.getTotal().bulk.getAvgSizeInBytes(), greaterThan(0L)); + + assertThat(stats.getPrimaries().bulk.getTotalOperations(), equalTo(2L)); + assertThat(stats.getPrimaries().bulk.getTotalTimeInMillis(), greaterThan(0L)); + assertThat(stats.getPrimaries().bulk.getTotalSizeInBytes(), greaterThan(0L)); + assertThat(stats.getPrimaries().bulk.getAvgTimeInMillis(), greaterThan(0L)); + assertThat(stats.getPrimaries().bulk.getAvgSizeInBytes(), greaterThan(0L)); + } + /** * Test that we can safely concurrently index and get stats. This test was inspired by a serialization issue that arose due to a race * getting doc stats during heavy indexing. The race could lead to deleted docs being negative which would then be serialized as a diff --git a/x-pack/plugin/core/src/main/resources/monitoring-es.json b/x-pack/plugin/core/src/main/resources/monitoring-es.json index c937adb4174fb..9517c4a49c88b 100644 --- a/x-pack/plugin/core/src/main/resources/monitoring-es.json +++ b/x-pack/plugin/core/src/main/resources/monitoring-es.json @@ -84,6 +84,25 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total_operations": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" + } + } } } }, @@ -115,6 +134,25 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total_operations": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" + } + } } } } @@ -258,6 +296,25 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total_operations": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" + } + } } } }, @@ -392,6 +449,25 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total_operations": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" + } + } } } } @@ -560,6 +636,25 @@ "type": "long" } } + }, + "bulk": { + "properties": { + "total_operations": { + "type": "long" + }, + "total_time_in_millis": { + "type": "long" + }, + "total_size_in_bytes": { + "type": "long" + }, + "avg_time_in_millis": { + "type": "long" + }, + "avg_size_in_bytes": { + "type": "long" + } + } } } }, diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java index 6f21c02657bac..b41723a9c754f 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsCollector.java @@ -70,6 +70,7 @@ protected Collection doCollect(final MonitoringDoc.Node node, .setRefresh(true) .setQueryCache(true) .setRequestCache(true) + .setBulk(true) .get(getCollectionTimeout()); final long timestamp = timestamp(); diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java index 411fe857b003c..61ec7874f9968 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java @@ -163,6 +163,11 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.primaries.store.size_in_bytes", "index_stats.primaries.refresh.total_time_in_millis", "index_stats.primaries.refresh.external_total_time_in_millis", + "index_stats.primaries.bulk.total_operations", + "index_stats.primaries.bulk.total_time_in_millis", + "index_stats.primaries.bulk.total_size_in_bytes", + "index_stats.primaries.bulk.avg_time_in_millis", + "index_stats.primaries.bulk.avg_size_in_bytes", "index_stats.total.docs.count", "index_stats.total.fielddata.memory_size_in_bytes", "index_stats.total.fielddata.evictions", @@ -193,5 +198,10 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "index_stats.total.segments.fixed_bit_set_memory_in_bytes", "index_stats.total.store.size_in_bytes", "index_stats.total.refresh.total_time_in_millis", - "index_stats.total.refresh.external_total_time_in_millis"); + "index_stats.total.refresh.external_total_time_in_millis", + "index_stats.total.bulk.total_operations", + "index_stats.total.bulk.total_time_in_millis", + "index_stats.total.bulk.total_size_in_bytes", + "index_stats.total.bulk.avg_time_in_millis", + "index_stats.total.bulk.avg_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java index 6fbf26b7c39dd..86b9472532b9c 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDoc.java @@ -100,6 +100,11 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.primaries.search.query_time_in_millis", "indices_stats._all.primaries.search.query_total", "indices_stats._all.primaries.store.size_in_bytes", + "indices_stats._all.primaries.bulk.total_operations", + "indices_stats._all.primaries.bulk.total_time_in_millis", + "indices_stats._all.primaries.bulk.total_size_in_bytes", + "indices_stats._all.primaries.bulk.avg_time_in_millis", + "indices_stats._all.primaries.bulk.avg_size_in_bytes", "indices_stats._all.total.docs.count", "indices_stats._all.total.indexing.index_time_in_millis", "indices_stats._all.total.indexing.index_total", @@ -107,5 +112,10 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "indices_stats._all.total.indexing.throttle_time_in_millis", "indices_stats._all.total.search.query_time_in_millis", "indices_stats._all.total.search.query_total", - "indices_stats._all.total.store.size_in_bytes"); + "indices_stats._all.total.store.size_in_bytes", + "indices_stats._all.total.bulk.total_operations", + "indices_stats._all.total.bulk.total_time_in_millis", + "indices_stats._all.total.bulk.total_size_in_bytes", + "indices_stats._all.total.bulk.avg_time_in_millis", + "indices_stats._all.total.bulk.avg_size_in_bytes"); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java index 0354ecf43476b..35a40e2b00512 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsCollector.java @@ -44,7 +44,8 @@ public class NodeStatsCollector extends Collector { CommonStatsFlags.Flag.QueryCache, CommonStatsFlags.Flag.RequestCache, CommonStatsFlags.Flag.Search, - CommonStatsFlags.Flag.Segments); + CommonStatsFlags.Flag.Segments, + CommonStatsFlags.Flag.Bulk); private final Client client; diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java index 6ed4271b7b989..beb5fa220f11e 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java @@ -104,6 +104,11 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO "node_stats.indices.segments.index_writer_memory_in_bytes", "node_stats.indices.segments.version_map_memory_in_bytes", "node_stats.indices.segments.fixed_bit_set_memory_in_bytes", + "node_stats.indices.bulk.total_operations", + "node_stats.indices.bulk.total_time_in_millis", + "node_stats.indices.bulk.total_size_in_bytes", + "node_stats.indices.bulk.avg_time_in_millis", + "node_stats.indices.bulk.avg_size_in_bytes", "node_stats.fs.io_stats.total.operations", "node_stats.fs.io_stats.total.read_operations", "node_stats.fs.io_stats.total.write_operations", diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 97de35ed96ae3..ff8ac3439d77e 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -196,6 +196,13 @@ public void testToXContent() throws IOException { + " \"evictions\": 10," + " \"hit_count\": 11," + " \"miss_count\": 12" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }," + " \"primaries\": {" @@ -249,6 +256,13 @@ public void testToXContent() throws IOException { + " \"evictions\": 10," + " \"hit_count\": 11," + " \"miss_count\": 12" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }" + " }" diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index f27d48b014191..181f17009f66e 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.bulk.stats.BulkStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexingStats; @@ -117,6 +118,13 @@ public void testToXContent() throws IOException { + " \"search\": {" + " \"query_total\": 12," + " \"query_time_in_millis\": 14" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }," + " \"total\": {" @@ -135,6 +143,13 @@ public void testToXContent() throws IOException { + " \"search\": {" + " \"query_total\": 18," + " \"query_time_in_millis\": 21" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }" + " }" @@ -155,6 +170,9 @@ private CommonStats mockCommonStats() { final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); commonStats.getSearch().add(new SearchStats(searchStats, 0L, null)); + final BulkStats bulkStats = new BulkStats(0L, 0L, 0L, 0L, 0L); + commonStats.getBulk().add(bulkStats); + return commonStats; } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index eff7cb6552829..bb0416b013a1d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -167,6 +167,13 @@ public void testToXContent() throws IOException { + " \"evictions\": 14," + " \"hit_count\": 15," + " \"miss_count\": 16" + + " }," + + " \"bulk\": {" + + " \"total_operations\": 0," + + " \"total_time_in_millis\": 0," + + " \"total_size_in_bytes\": 0," + + " \"avg_time_in_millis\": 0," + + " \"avg_size_in_bytes\": 0" + " }" + " }," + " \"os\": {"