Skip to content

Commit

Permalink
Add Bulk stats track the bulk per shard (#52208)
Browse files Browse the repository at this point in the history
* Add Bulk stats track the bulk sizes per shard and the time spent on the bulk shard request (#50536)(#47345)
  • Loading branch information
zhichen authored Apr 20, 2020
1 parent d960a30 commit 05066ae
Show file tree
Hide file tree
Showing 30 changed files with 749 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}
Expand Down Expand Up @@ -83,7 +84,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
}
Expand Down Expand Up @@ -167,7 +168,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"Help":
- skip:
version: " - 7.99.99"
reason: shard path stats were added in 8.0.0
reason: bulk stats were added in 8.0.0
- do:
cat.shards:
help: true
Expand Down Expand Up @@ -80,6 +80,11 @@
warmer.total_time .+ \n
path.data .+ \n
path.state .+ \n
bulk.total_operations .+ \n
bulk.total_time .+ \n
bulk.total_size_in_bytes .+ \n
bulk.avg_time .+ \n
bulk.avg_size_in_bytes .+ \n
$/
---
"Test cat shards output":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.index.bulk.stats.BulkStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -102,6 +104,9 @@ public class CommonStats implements Writeable, ToXContentFragment {
@Nullable
public RecoveryStats recoveryStats;

@Nullable
public BulkStats bulk;

public CommonStats() {
this(CommonStatsFlags.NONE);
}
Expand Down Expand Up @@ -159,6 +164,9 @@ public CommonStats(CommonStatsFlags flags) {
case Recovery:
recoveryStats = new RecoveryStats();
break;
case Bulk:
bulk = new BulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand Down Expand Up @@ -218,6 +226,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
case Bulk:
bulk = indexShard.bulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand All @@ -244,6 +255,9 @@ public CommonStats(StreamInput in) throws IOException {
translog = in.readOptionalWriteable(TranslogStats::new);
requestCache = in.readOptionalWriteable(RequestCacheStats::new);
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
bulk = in.readOptionalWriteable(BulkStats::new);
}
}

@Override
Expand All @@ -264,6 +278,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(translog);
out.writeOptionalWriteable(requestCache);
out.writeOptionalWriteable(recoveryStats);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(bulk);
}
}

public void add(CommonStats stats) {
Expand Down Expand Up @@ -396,6 +413,14 @@ public void add(CommonStats stats) {
} else {
recoveryStats.add(stats.getRecoveryStats());
}
if (bulk == null) {
if (stats.getBulk() != null) {
bulk = new BulkStats();
bulk.add(stats.getBulk());
}
} else {
bulk.add(stats.getBulk());
}
}

@Nullable
Expand Down Expand Up @@ -478,6 +503,11 @@ public RecoveryStats getRecoveryStats() {
return recoveryStats;
}

@Nullable
public BulkStats getBulk() {
return bulk;
}

/**
* Utility method which computes total memory by adding
* FieldData, PercolatorCache, Segments (memory, index writer, version map)
Expand All @@ -504,7 +534,7 @@ public ByteSizeValue getTotalMemory() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final Stream<ToXContent> stream = Arrays.stream(new ToXContent[] {
docs, store, indexing, get, search, merge, refresh, flush, warmer, queryCache,
fieldData, completion, segments, translog, requestCache, recoveryStats})
fieldData, completion, segments, translog, requestCache, recoveryStats, bulk})
.filter(Objects::nonNull);
for (ToXContent toXContent : ((Iterable<ToXContent>)stream::iterator)) {
toXContent.toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public enum Flag {
Translog("translog", 13),
// 14 was previously used for Suggest
RequestCache("request_cache", 15),
Recovery("recovery", 16);
Recovery("recovery", 16),
Bulk("bulk", 17);

private final String restName;
private final int index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ public boolean recovery() {
return flags.isSet(Flag.Recovery);
}

public IndicesStatsRequest bulk(boolean bulk) {
flags.set(Flag.Bulk, bulk);
return this;
}
public boolean bulk() {
return flags.isSet(Flag.Bulk);
}

public boolean includeSegmentFileSizes() {
return flags.includeSegmentFileSizes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public IndicesStatsRequestBuilder setRecovery(boolean recovery) {
return this;
}

public IndicesStatsRequestBuilder setBulk(boolean bulk) {
request.bulk(bulk);
return this;
}

public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegmentFileSizes) {
request.includeSegmentFileSizes(includeSegmentFileSizes);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -49,6 +52,24 @@ public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRe
setRefreshPolicy(refreshPolicy);
}

public long totalSizeInBytes() {
long totalSizeInBytes = 0;
for (int i = 0; i < items.length; i++) {
DocWriteRequest<?> request = items[i].request();
if (request instanceof IndexRequest) {
if (((IndexRequest) request).source() != null) {
totalSizeInBytes += ((IndexRequest) request).source().length();
}
} else if (request instanceof UpdateRequest) {
IndexRequest doc = ((UpdateRequest) request).doc();
if (doc != null && doc.source() != null) {
totalSizeInBytes += ((UpdateRequest) request).doc().source().length();
}
}
}
return totalSizeInBytes;
}

public BulkItemRequest[] items() {
return items;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ public static void performOnPrimary(

private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

final long startBulkTime = System.nanoTime();

@Override
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
Expand All @@ -164,6 +166,7 @@ protected void doRun() throws Exception {
}
assert context.isInitial(); // either completed and moved to next or reset
}
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
// We're done, there's no more operations to execute so we resolve the wrapped listener
finishRequest();
}
Expand Down Expand Up @@ -405,7 +408,9 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update

@Override
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
final long startBulkTime = System.nanoTime();
final Translog.Location location = performOnReplica(request, replica);
replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.bulk.stats;

/**
* An bulk operation listener for bulk events.
*/
public interface BulkOperationListener {
/**
* Called after the bulk operation occurred.
*/
default void afterBulk(long bulkShardSizeInBytes, long tookInNanos) {
}
}

Loading

0 comments on commit 05066ae

Please sign in to comment.