Skip to content

Commit

Permalink
Add Bulk stats track the bulk sizes per shard and the time spent on t…
Browse files Browse the repository at this point in the history
…he bulk shard request (#50536)(#47345)
  • Loading branch information
zhichen committed Feb 11, 2020
1 parent 8cb8841 commit 270e8d5
Show file tree
Hide file tree
Showing 26 changed files with 512 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.index.bulk.stats.BulkStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -102,6 +104,9 @@ public class CommonStats implements Writeable, ToXContentFragment {
@Nullable
public RecoveryStats recoveryStats;

@Nullable
public BulkStats bulk;

public CommonStats() {
this(CommonStatsFlags.NONE);
}
Expand Down Expand Up @@ -159,6 +164,9 @@ public CommonStats(CommonStatsFlags flags) {
case Recovery:
recoveryStats = new RecoveryStats();
break;
case Bulk:
bulk = new BulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand Down Expand Up @@ -218,6 +226,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
case Bulk:
bulk = indexShard.bulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand All @@ -244,6 +255,9 @@ public CommonStats(StreamInput in) throws IOException {
translog = in.readOptionalWriteable(TranslogStats::new);
requestCache = in.readOptionalWriteable(RequestCacheStats::new);
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
bulk = in.readOptionalWriteable(BulkStats::new);
}
}

@Override
Expand All @@ -264,6 +278,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(translog);
out.writeOptionalWriteable(requestCache);
out.writeOptionalWriteable(recoveryStats);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(bulk);
}
}

public void add(CommonStats stats) {
Expand Down Expand Up @@ -396,6 +413,14 @@ public void add(CommonStats stats) {
} else {
recoveryStats.add(stats.getRecoveryStats());
}
if (bulk == null) {
if (stats.getBulk() != null) {
bulk = new BulkStats();
bulk.add(stats.getBulk());
}
} else {
bulk.add(stats.getBulk());
}
}

@Nullable
Expand Down Expand Up @@ -478,6 +503,11 @@ public RecoveryStats getRecoveryStats() {
return recoveryStats;
}

@Nullable
public BulkStats getBulk() {
return bulk;
}

/**
* Utility method which computes total memory by adding
* FieldData, PercolatorCache, Segments (memory, index writer, version map)
Expand All @@ -504,7 +534,7 @@ public ByteSizeValue getTotalMemory() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final Stream<ToXContent> stream = Arrays.stream(new ToXContent[] {
docs, store, indexing, get, search, merge, refresh, flush, warmer, queryCache,
fieldData, completion, segments, translog, requestCache, recoveryStats})
fieldData, completion, segments, translog, requestCache, recoveryStats, bulk})
.filter(Objects::nonNull);
for (ToXContent toXContent : ((Iterable<ToXContent>)stream::iterator)) {
toXContent.toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public enum Flag {
Translog("translog", 13),
// 14 was previously used for Suggest
RequestCache("request_cache", 15),
Recovery("recovery", 16);
Recovery("recovery", 16),
Bulk("bulk", 17);

private final String restName;
private final int index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ public boolean recovery() {
return flags.isSet(Flag.Recovery);
}

public IndicesStatsRequest bulk(boolean bulk) {
flags.set(Flag.Bulk, bulk);
return this;
}
public boolean bulk() {
return flags.isSet(Flag.Bulk);
}

public boolean includeSegmentFileSizes() {
return flags.includeSegmentFileSizes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public IndicesStatsRequestBuilder setRecovery(boolean recovery) {
return this;
}

public IndicesStatsRequestBuilder setBulk(boolean bulk) {
request.bulk(bulk);
return this;
}

public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegmentFileSizes) {
request.includeSegmentFileSizes(includeSegmentFileSizes);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -49,6 +52,24 @@ public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRe
setRefreshPolicy(refreshPolicy);
}

public long totalSizeInBytes() {
long totalSizeInBytes = 0;
for (int i = 0; i < items.length; i++) {
DocWriteRequest<?> request = items[i].request();
if (request instanceof IndexRequest) {
if (((IndexRequest) request).source() != null) {
totalSizeInBytes += ((IndexRequest) request).source().length();
}
} else if (request instanceof UpdateRequest) {
IndexRequest doc = ((UpdateRequest) request).doc();
if (doc != null && doc.source() != null) {
totalSizeInBytes += ((UpdateRequest) request).doc().source().length();
}
}
}
return totalSizeInBytes;
}

public BulkItemRequest[] items() {
return items;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ public static void performOnPrimary(

private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

final long startBulkTime = System.nanoTime();

@Override
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
Expand Down Expand Up @@ -187,6 +189,7 @@ private void finishRequest() {
() -> new WritePrimaryResult<>(
context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), null,
context.getPrimary(), logger));
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
}
}.run();
}
Expand Down Expand Up @@ -392,7 +395,9 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update

@Override
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
final long startBulkTime = System.nanoTime();
final Translog.Location location = performOnReplica(request, replica);
replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.bulk.stats;

/**
* An bulk operation listener for bulk events.
*/
public interface BulkOperationListener {
/**
* Called after the bulk operation occurred.
*/
default void afterBulk(long bulkShardSizeInBytes, long tookInNanos) {
}
}

106 changes: 106 additions & 0 deletions server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.bulk.stats;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

public class BulkStats implements Writeable, ToXContentFragment {

private long total = 0;
private long totalTimeInMillis = 0;
private long totalSizeInBytes = 0;

public BulkStats() {

}

public BulkStats(StreamInput in) throws IOException {
total = in.readVLong();
totalTimeInMillis = in.readVLong();
totalSizeInBytes = in.readVLong();
}

public BulkStats(long total, long totalTimeInMillis, long totalSizeInBytes) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
this.totalSizeInBytes = totalSizeInBytes;
}

public void add(BulkStats bulkStats) {
addTotals(bulkStats);
}

public void addTotals(BulkStats bulkStats) {
if (bulkStats == null) {
return;
}
this.total += bulkStats.total;
this.totalTimeInMillis += bulkStats.totalTimeInMillis;
this.totalSizeInBytes += bulkStats.totalSizeInBytes;
}

public long getTotalSizeInBytes() {
return totalSizeInBytes;
}

public long getTotal() {
return total;
}

public TimeValue getTotalTime() {
return new TimeValue(totalTimeInMillis);
}

public long getTotalTimeInMillis() {
return totalTimeInMillis;
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
out.writeVLong(totalSizeInBytes);
}

@Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.BULK);
builder.field(Fields.TOTAL, total);
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes);
builder.endObject();
return builder;
}

static final class Fields {
static final String BULK = "bulk";
static final String TOTAL = "total";
static final String TOTAL_TIME = "total_time";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
}
}

Loading

0 comments on commit 270e8d5

Please sign in to comment.