Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay the request size calculation until required by the indexing pressure framework #1592

Merged
merged 2 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,8 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(bulkRequest::ramBytesUsed, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
Expand Down Expand Up @@ -631,7 +630,7 @@ protected void doRun() {
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we delay this isOnlySystem call as well? Like the ramBytesUsed() it's not needed for anything other than the releasable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this consistent with the other interface for node-level accounting, markCoordinatingOperationStarted(LongSupplier bytes, boolean forceExecution), which is supplied with a precomputed boolean that is used elsewhere too.

Also, given that isOnlySystem streams over a precomputed collection (an unmodifiable set) of the distinct indices being updated in that bulk request, we don’t expect this number (indices in a single request) to grow large enough to become an overhead.

final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId,
bulkShardRequest.ramBytesUsed(),
bulkShardRequest::ramBytesUsed,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;

import java.util.function.LongSupplier;

/**
* Sets up classes for node/shard level indexing pressure.
* Provides abstraction and orchestration for indexing pressure interfaces when called from Transport Actions or for Stats.
Expand All @@ -25,22 +27,48 @@ public IndexingPressureService(Settings settings, ClusterService clusterService)
shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
}

public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
/**
* Marks the beginning of coordinating operation for an indexing request on the node. Rejects the operation if node's
* memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is disabled. Else empty releasable is returned.
* @param bytes supplier of the memory bytes to be tracked for the current operation; evaluated only when node level accounting is performed
* @param forceExecution permits operation even if the node level memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markCoordinatingOperationStarted(LongSupplier bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled() == false) {
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
return shardIndexingPressure.markCoordinatingOperationStarted(bytes.getAsLong(), forceExecution);
} else {
return () -> {};
}
}

public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
/**
* Marks the beginning of coordinating operation for an indexing request on the Shard. Rejects the operation if shard's
* memory limit is breached.
* Performs the shard level accounting only if shard indexing pressure is enabled. Else empty releasable is returned.
* @param shardId Shard ID at which the current indexing operation is targeted
* @param bytes supplier of the memory bytes to be tracked for the current operation; evaluated only when shard level accounting is performed
* @param forceExecution permits operation even if the node level memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markCoordinatingOperationStarted(ShardId shardId, LongSupplier bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution);
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes.getAsLong(), forceExecution);
} else {
return () -> {};
}
}

/**
* Marks the beginning of primary operation for an indexing request. Rejects the operation if memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is disabled. Else shard level accounting
* is performed.
* @param shardId Shard ID at which the current indexing operation is targeted
* @param bytes memory bytes to be tracked for the current operation
* @param forceExecution permits operation even if the memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markPrimaryOperationStarted(shardId, bytes, forceExecution);
Expand All @@ -49,6 +77,15 @@ public Releasable markPrimaryOperationStarted(ShardId shardId, long bytes, boole
}
}

/**
* Marks the beginning of primary operation for an indexing request, when primary shard is local to the coordinator node.
* Rejects the operation if memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is disabled. Else shard level accounting
* is performed.
* @param shardId Shard ID at which the current indexing operation is targeted
* @param bytes memory bytes to be tracked for the current operation
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId shardId, long bytes) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markPrimaryOperationLocalToCoordinatingNodeStarted(shardId, bytes);
Expand All @@ -57,6 +94,15 @@ public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(ShardId sha
}
}

/**
* Marks the beginning of replication operation for an indexing request. Rejects the operation if memory limit is breached.
* Performs the node level accounting only if shard indexing pressure is disabled. Else shard level accounting
* is performed.
* @param shardId Shard ID at which the current indexing operation is targeted
* @param bytes memory bytes to be tracked for the current operation
* @param forceExecution permits operation even if the memory limit is breached
* @return Releasable to mark the completion of operation and release the accounted bytes
*/
public Releasable markReplicaOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
return shardIndexingPressure.markReplicaOperationStarted(shardId, bytes, forceExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
package org.opensearch.index;

import org.junit.Before;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Requests;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -43,11 +50,18 @@ public void testCoordinatingOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);

Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false);
BulkItemRequest[] items = new BulkItemRequest[1];
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id").source(
Requests.INDEX_CONTENT_TYPE,
"foo",
"bar"
);
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, WriteRequest.RefreshPolicy.NONE, items);
Releasable releasable = service.markCoordinatingOperationStarted(shardId, bulkShardRequest::ramBytesUsed, false);

IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentCoordinatingBytes());
assertEquals(bulkShardRequest.ramBytesUsed(), shardStats.getCurrentCoordinatingBytes());
releasable.close();
}

Expand All @@ -64,11 +78,12 @@ public void testCoordinatingOperationForIndexingPressure() {
);
clusterSettings.applySettings(updated.build());

Releasable releasable = service.markCoordinatingOperationStarted(1024, false);
BulkRequest bulkRequest = new BulkRequest();
Releasable releasable = service.markCoordinatingOperationStarted(bulkRequest::ramBytesUsed, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentCoordinatingBytes());
assertEquals(bulkRequest.ramBytesUsed(), nodeStats.getCurrentCoordinatingBytes());
releasable.close();
}

Expand Down