Add bulk stats to track bulk operations per shard #52208

Merged
merged 18 commits on Apr 20, 2020
Changes from 17 commits

@@ -37,7 +37,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}
@@ -83,7 +84,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}

@@ -120,7 +120,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
}
@@ -167,7 +168,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
},
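
(Note: with "bulk" accepted as a metric in both of these API specs, bulk stats can be requested like any other stats metric, for example GET /my-index/_stats/bulk for the indices stats API or GET /_nodes/stats/indices/bulk for the nodes stats API. These paths and the index name are illustrative, based on the existing stats endpoints rather than anything added in this diff.)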

@@ -1,8 +1,8 @@
---
"Help":
- skip:
version: " - 7.1.99"
reason: external refresh stats were added in 7.2.0
version: " - 7.9.99"
reason: bulk stats were added in 8.0.0
- do:
cat.shards:
help: true
@@ -78,6 +78,11 @@
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
bulk.total_operations .+ \n
bulk.total_time .+ \n
bulk.total_size_in_bytes .+ \n
bulk.avg_time .+ \n
bulk.avg_size_in_bytes .+ \n
$/
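
(Note: these bulk.* columns extend the cat shards help output; like the existing columns they can be selected explicitly, for example GET _cat/shards?h=index,shard,bulk.total_operations,bulk.avg_size_in_bytes. The request is illustrative; the column definitions themselves live in the cat shards REST handler, outside the hunks shown here.)
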
---
"Test cat shards output":

@@ -20,6 +20,8 @@
package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.index.bulk.stats.BulkStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -102,6 +104,9 @@ public class CommonStats implements Writeable, ToXContentFragment {
@Nullable
public RecoveryStats recoveryStats;

@Nullable
public BulkStats bulk;

public CommonStats() {
this(CommonStatsFlags.NONE);
}
@@ -159,6 +164,9 @@ public CommonStats(CommonStatsFlags flags) {
case Recovery:
recoveryStats = new RecoveryStats();
break;
case Bulk:
bulk = new BulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
@@ -218,6 +226,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
case Bulk:
bulk = indexShard.bulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
@@ -244,6 +255,9 @@ public CommonStats(StreamInput in) throws IOException {
translog = in.readOptionalWriteable(TranslogStats::new);
requestCache = in.readOptionalWriteable(RequestCacheStats::new);
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
bulk = in.readOptionalWriteable(BulkStats::new);
}
}

@Override
@@ -264,6 +278,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(translog);
out.writeOptionalWriteable(requestCache);
out.writeOptionalWriteable(recoveryStats);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(bulk);
}
}

public void add(CommonStats stats) {
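
(Note on the serialization hunks above: both the StreamInput constructor and writeTo gate the new bulk field on Version.V_8_0_0, so when the other side of the connection is older the field is simply not written or read; mixed-version clusters keep working and just omit bulk stats on the wire.)
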
@@ -396,6 +413,14 @@ public void add(CommonStats stats) {
} else {
recoveryStats.add(stats.getRecoveryStats());
}
if (bulk == null) {
if (stats.getBulk() != null) {
bulk = new BulkStats();
bulk.add(stats.getBulk());
}
} else {
bulk.add(stats.getBulk());
}
}

@Nullable
@@ -478,6 +503,11 @@ public RecoveryStats getRecoveryStats() {
return recoveryStats;
}

@Nullable
public BulkStats getBulk() {
return bulk;
}

/**
* Utility method which computes total memory by adding
* FieldData, PercolatorCache, Segments (memory, index writer, version map)
@@ -504,7 +534,7 @@ public ByteSizeValue getTotalMemory() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final Stream<ToXContent> stream = Arrays.stream(new ToXContent[] {
docs, store, indexing, get, search, merge, refresh, flush, warmer, queryCache,
fieldData, completion, segments, translog, requestCache, recoveryStats})
fieldData, completion, segments, translog, requestCache, recoveryStats, bulk})
.filter(Objects::nonNull);
for (ToXContent toXContent : ((Iterable<ToXContent>)stream::iterator)) {
toXContent.toXContent(builder, params);

@@ -227,7 +227,8 @@ public enum Flag {
Translog("translog", 13),
// 14 was previously used for Suggest
RequestCache("request_cache", 15),
Recovery("recovery", 16);
Recovery("recovery", 16),
Bulk("bulk", 17);

private final String restName;
private final int index;

@@ -254,6 +254,14 @@ public boolean recovery() {
return flags.isSet(Flag.Recovery);
}

public IndicesStatsRequest bulk(boolean bulk) {
flags.set(Flag.Bulk, bulk);
return this;
}

public boolean bulk() {
return flags.isSet(Flag.Bulk);
}

public boolean includeSegmentFileSizes() {
return flags.includeSegmentFileSizes();
}

@@ -149,6 +149,11 @@ public IndicesStatsRequestBuilder setRecovery(boolean recovery) {
return this;
}

public IndicesStatsRequestBuilder setBulk(boolean bulk) {
request.bulk(bulk);
return this;
}

public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegmentFileSizes) {
request.includeSegmentFileSizes(includeSegmentFileSizes);
return this;
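
For reference, a minimal sketch of how the new builder flag could be exercised from client code or a test; the index name, the client variable, and the surrounding setup are placeholders and not part of this change:

    // Sketch only: collect just the bulk stats for one index and read them back.
    IndicesStatsResponse response = client.admin().indices()
        .prepareStats("my-index")   // "my-index" is a placeholder
        .clear()                    // drop the default metrics
        .setBulk(true)              // the flag added in this PR
        .get();
    BulkStats bulkStats = response.getIndex("my-index").getTotal().getBulk();

Calling clear() first disables the default flags so the response carries only the bulk section.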

@@ -19,8 +19,11 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
@@ -49,6 +52,24 @@ public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRe
setRefreshPolicy(refreshPolicy);
}

public long totalSizeInBytes() {
long totalSizeInBytes = 0;
for (int i = 0; i < items.length; i++) {
DocWriteRequest<?> request = items[i].request();
if (request instanceof IndexRequest) {
if (((IndexRequest) request).source() != null) {
totalSizeInBytes += ((IndexRequest) request).source().length();
}
} else if (request instanceof UpdateRequest) {
IndexRequest doc = ((UpdateRequest) request).doc();
if (doc != null && doc.source() != null) {
totalSizeInBytes += doc.source().length();
}
}
}
return totalSizeInBytes;
}

public BulkItemRequest[] items() {
return items;
}
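
(Note: totalSizeInBytes() only counts the source bytes of index requests and of update requests that carry a doc, so deletes and scripted updates without a doc contribute zero to the reported bulk size.)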

@@ -153,6 +153,8 @@ public static void performOnPrimary(

private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

final long startBulkTime = System.nanoTime();

@Override
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
@@ -164,6 +166,7 @@ protected void doRun() throws Exception {
}
assert context.isInitial(); // either completed and moved to next or reset
}
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
// We're done, there's no more operations to execute so we resolve the wrapped listener
finishRequest();
}
@@ -405,7 +408,9 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update

@Override
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
final long startBulkTime = System.nanoTime();
final Translog.Location location = performOnReplica(request, replica);
replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
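
(Note: afterBulk is invoked once per shard-level bulk on the primary, after the last item has been processed, and once on the replica, so a shard's bulk stats reflect the work it performed in either role; the reported time is the System.nanoTime() delta around executing the whole shard bulk and the size is request.totalSizeInBytes().)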

@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.bulk.stats;

/**
* A listener for bulk operation events.
*/
public interface BulkOperationListener {
/**
* Called after the bulk operation occurred.
*/
default void afterBulk(long bulkShardSizeInBytes, long tookInNanos) {
}
}
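
For context, a minimal hypothetical implementation of this listener could simply accumulate the values it is handed; the class and field names below are illustrative and not part of this PR:

    package org.elasticsearch.index.bulk.stats;

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: count bulk executions and sum their size and latency.
    public class SimpleBulkMetrics implements BulkOperationListener {

        private final AtomicLong totalOperations = new AtomicLong();
        private final AtomicLong totalSizeInBytes = new AtomicLong();
        private final AtomicLong totalTimeInNanos = new AtomicLong();

        @Override
        public void afterBulk(long bulkShardSizeInBytes, long tookInNanos) {
            totalOperations.incrementAndGet();
            totalSizeInBytes.addAndGet(bulkShardSizeInBytes);
            totalTimeInNanos.addAndGet(tookInNanos);
        }
    }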
