Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay the request size calculation until required by the indexing pre… #1589

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,8 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(bulkRequest, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
Expand Down Expand Up @@ -631,7 +630,7 @@ protected void doRun() {
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId,
bulkShardRequest.ramBytesUsed(),
bulkShardRequest,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.index;

import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
Expand All @@ -25,16 +27,18 @@ public IndexingPressureService(Settings settings, ClusterService clusterService)
shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
}

public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
public Releasable markCoordinatingOperationStarted(BulkRequest bulkRequest, boolean forceExecution) {
if (isShardIndexingPressureEnabled() == false) {
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
final long indexingBytes = bulkRequest.ramBytesUsed();
return shardIndexingPressure.markCoordinatingOperationStarted(indexingBytes, forceExecution);
} else {
return () -> {};
}
}

public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
public Releasable markCoordinatingOperationStarted(ShardId shardId, BulkShardRequest bulkShardRequest, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
final long bytes = bulkShardRequest.ramBytesUsed();
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution);
} else {
return () -> {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
package org.opensearch.index;

import org.junit.Before;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Requests;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -43,11 +50,18 @@ public void testCoordinatingOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);

Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false);
BulkItemRequest[] items = new BulkItemRequest[1];
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id").source(
Requests.INDEX_CONTENT_TYPE,
"foo",
"bar"
);
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, WriteRequest.RefreshPolicy.NONE, items);
Releasable releasable = service.markCoordinatingOperationStarted(shardId, bulkShardRequest, false);

IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentCoordinatingBytes());
assertEquals(bulkShardRequest.ramBytesUsed(), shardStats.getCurrentCoordinatingBytes());
releasable.close();
}

Expand All @@ -64,11 +78,12 @@ public void testCoordinatingOperationForIndexingPressure() {
);
clusterSettings.applySettings(updated.build());

Releasable releasable = service.markCoordinatingOperationStarted(1024, false);
BulkRequest bulkRequest = new BulkRequest();
Releasable releasable = service.markCoordinatingOperationStarted(bulkRequest, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentCoordinatingBytes());
assertEquals(bulkRequest.ramBytesUsed(), nodeStats.getCurrentCoordinatingBytes());
releasable.close();
}

Expand Down