Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Bulk stats track the bulk per shard #52208

Merged
merged 18 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.action.admin.indices.stats;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.index.bulk.stats.BulkStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -102,6 +104,9 @@ public class CommonStats implements Writeable, ToXContentFragment {
@Nullable
public RecoveryStats recoveryStats;

@Nullable
public BulkStats bulk;

public CommonStats() {
this(CommonStatsFlags.NONE);
}
Expand Down Expand Up @@ -159,6 +164,9 @@ public CommonStats(CommonStatsFlags flags) {
case Recovery:
recoveryStats = new RecoveryStats();
break;
case Bulk:
bulk = new BulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand Down Expand Up @@ -218,6 +226,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C
case Recovery:
recoveryStats = indexShard.recoveryStats();
break;
case Bulk:
bulk = indexShard.bulkStats();
break;
default:
throw new IllegalStateException("Unknown Flag: " + flag);
}
Expand All @@ -244,6 +255,9 @@ public CommonStats(StreamInput in) throws IOException {
translog = in.readOptionalWriteable(TranslogStats::new);
requestCache = in.readOptionalWriteable(RequestCacheStats::new);
recoveryStats = in.readOptionalWriteable(RecoveryStats::new);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
bulk = in.readOptionalWriteable(BulkStats::new);
}
}

@Override
Expand All @@ -264,6 +278,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(translog);
out.writeOptionalWriteable(requestCache);
out.writeOptionalWriteable(recoveryStats);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(bulk);
}
}

public void add(CommonStats stats) {
Expand Down Expand Up @@ -396,6 +413,14 @@ public void add(CommonStats stats) {
} else {
recoveryStats.add(stats.getRecoveryStats());
}
if (bulk == null) {
if (stats.getBulk() != null) {
bulk = new BulkStats();
bulk.add(stats.getBulk());
}
} else {
bulk.add(stats.getBulk());
}
}

@Nullable
Expand Down Expand Up @@ -478,6 +503,11 @@ public RecoveryStats getRecoveryStats() {
return recoveryStats;
}

@Nullable
public BulkStats getBulk() {
return bulk;
}

/**
* Utility method which computes total memory by adding
* FieldData, PercolatorCache, Segments (memory, index writer, version map)
Expand All @@ -504,7 +534,7 @@ public ByteSizeValue getTotalMemory() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final Stream<ToXContent> stream = Arrays.stream(new ToXContent[] {
docs, store, indexing, get, search, merge, refresh, flush, warmer, queryCache,
fieldData, completion, segments, translog, requestCache, recoveryStats})
fieldData, completion, segments, translog, requestCache, recoveryStats, bulk})
.filter(Objects::nonNull);
for (ToXContent toXContent : ((Iterable<ToXContent>)stream::iterator)) {
toXContent.toXContent(builder, params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public enum Flag {
Translog("translog", 13),
// 14 was previously used for Suggest
RequestCache("request_cache", 15),
Recovery("recovery", 16);
Recovery("recovery", 16),
Bulk("bulk", 17);

private final String restName;
private final int index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ public boolean recovery() {
return flags.isSet(Flag.Recovery);
}

public IndicesStatsRequest bulk(boolean bulk) {
flags.set(Flag.Bulk, bulk);
return this;
}
public boolean bulk() {
return flags.isSet(Flag.Bulk);
}

public boolean includeSegmentFileSizes() {
return flags.includeSegmentFileSizes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public IndicesStatsRequestBuilder setRecovery(boolean recovery) {
return this;
}

public IndicesStatsRequestBuilder setBulk(boolean bulk) {
request.bulk(bulk);
return this;
}

public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegmentFileSizes) {
request.includeSegmentFileSizes(includeSegmentFileSizes);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -49,6 +52,24 @@ public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRe
setRefreshPolicy(refreshPolicy);
}

public long totalSizeInBytes() {
long totalSizeInBytes = 0;
for (int i = 0; i < items.length; i++) {
DocWriteRequest<?> request = items[i].request();
if (request instanceof IndexRequest) {
if (((IndexRequest) request).source() != null) {
totalSizeInBytes += ((IndexRequest) request).source().length();
}
} else if (request instanceof UpdateRequest) {
IndexRequest doc = ((UpdateRequest) request).doc();
if (doc != null && doc.source() != null) {
totalSizeInBytes += ((UpdateRequest) request).doc().source().length();
}
}
}
return totalSizeInBytes;
}

public BulkItemRequest[] items() {
return items;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ public static void performOnPrimary(

private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

final long startBulkTime = System.nanoTime();

@Override
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
Expand Down Expand Up @@ -187,6 +189,7 @@ private void finishRequest() {
() -> new WritePrimaryResult<>(
context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), null,
context.getPrimary(), logger));
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
zhichen marked this conversation as resolved.
Show resolved Hide resolved
}
}.run();
}
Expand Down Expand Up @@ -392,7 +395,9 @@ private static BulkItemResponse processUpdateResponse(final UpdateRequest update

@Override
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
final long startBulkTime = System.nanoTime();
final Translog.Location location = performOnReplica(request, replica);
replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
return new WriteReplicaResult<>(request, location, null, replica, logger);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.bulk.stats;

/**
* An bulk operation listener for bulk events.
*/
public interface BulkOperationListener {
/**
* Called after the bulk operation occurred.
*/
default void afterBulk(long bulkShardSizeInBytes, long tookInNanos) {
}
}

106 changes: 106 additions & 0 deletions server/src/main/java/org/elasticsearch/index/bulk/stats/BulkStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.bulk.stats;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

public class BulkStats implements Writeable, ToXContentFragment {
zhichen marked this conversation as resolved.
Show resolved Hide resolved

private long total = 0;
zhichen marked this conversation as resolved.
Show resolved Hide resolved
private long totalTimeInMillis = 0;
private long totalSizeInBytes = 0;

public BulkStats() {

}

public BulkStats(StreamInput in) throws IOException {
total = in.readVLong();
totalTimeInMillis = in.readVLong();
totalSizeInBytes = in.readVLong();
}

public BulkStats(long total, long totalTimeInMillis, long totalSizeInBytes) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
this.totalSizeInBytes = totalSizeInBytes;
}

public void add(BulkStats bulkStats) {
addTotals(bulkStats);
}

public void addTotals(BulkStats bulkStats) {
if (bulkStats == null) {
return;
}
this.total += bulkStats.total;
this.totalTimeInMillis += bulkStats.totalTimeInMillis;
this.totalSizeInBytes += bulkStats.totalSizeInBytes;
}

public long getTotalSizeInBytes() {
return totalSizeInBytes;
}

public long getTotal() {
return total;
}

public TimeValue getTotalTime() {
return new TimeValue(totalTimeInMillis);
}

public long getTotalTimeInMillis() {
return totalTimeInMillis;
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
out.writeVLong(totalSizeInBytes);
}

@Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.BULK);
builder.field(Fields.TOTAL, total);
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes);
builder.endObject();
return builder;
}

static final class Fields {
static final String BULK = "bulk";
static final String TOTAL = "total";
zhichen marked this conversation as resolved.
Show resolved Hide resolved
static final String TOTAL_TIME = "total_time";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
}
}

Loading