From 555a56dba55da5be98abbff5ed2fad4d7320f073 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 9 Aug 2024 00:44:57 +0530 Subject: [PATCH] Optimise unassigned shards iteration after allocator timeout (#14977) * Optimise unassigned shards iteration after allocator timeout Signed-off-by: Rishab Nahata --- .../common/util/BatchRunnableExecutor.java | 7 ++++ .../gateway/BaseGatewayShardAllocator.java | 11 ++---- .../gateway/ShardsBatchGatewayAllocator.java | 30 ++++++++++----- .../util/BatchRunnableExecutorTests.java | 37 +++++++++++++++++-- .../gateway/GatewayAllocatorTests.java | 21 +++++++++++ .../PrimaryShardBatchAllocatorTests.java | 33 +++++++++++++---- .../ReplicaShardBatchAllocatorTests.java | 8 ++-- .../TestShardBatchGatewayAllocator.java | 21 +++++++++++ 8 files changed, 136 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java index d3d3304cb909a..cfe2bbb85bda4 100644 --- a/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/BatchRunnableExecutor.java @@ -61,6 +61,13 @@ public void run() { "Time taken to execute timed runnables in this cycle:[{}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) ); + onComplete(); } + /** + * Callback method that is invoked after all {@link TimeoutAwareRunnable} instances in the batch have been processed. + * By default, this method does nothing, but it can be overridden by subclasses or modified in the implementation if + * there is a need to perform additional actions once the batch execution is completed. + */ + public void onComplete() {} } diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index 0d6af943d39e0..41704545c7a6f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -47,7 +47,6 @@ import org.opensearch.core.index.shard.ShardId; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -82,17 +81,15 @@ public void allocateUnassigned( executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler); } - protected void allocateUnassignedBatchOnTimeout(List shardRoutings, RoutingAllocation allocation, boolean primary) { - Set shardIdsFromBatch = new HashSet<>(); - for (ShardRouting shardRouting : shardRoutings) { - ShardId shardId = shardRouting.shardId(); - shardIdsFromBatch.add(shardId); + protected void allocateUnassignedBatchOnTimeout(Set shardIds, RoutingAllocation allocation, boolean primary) { + if (shardIds.isEmpty()) { + return; } RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting unassignedShard = iterator.next(); AllocateUnassignedDecision allocationDecision; - if (unassignedShard.primary() == primary && shardIdsFromBatch.contains(unassignedShard.shardId())) { + if (unassignedShard.primary() == primary && shardIds.contains(unassignedShard.shardId())) { allocationDecision = AllocateUnassignedDecision.throttle(null); executeDecision(unassignedShard, allocationDecision, allocation, iterator); } diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 6c6b1126a78d6..d18304ea73ed0 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -277,17 +277,14 @@ protected BatchRunnableExecutor innerAllocateUnassignedBatch( } List runnables = new ArrayList<>(); if (primary) { + Set timedOutPrimaryShardIds = new HashSet<>(); batchIdToStartedShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) .forEach(shardsBatch -> runnables.add(new TimeoutAwareRunnable() { @Override public void onTimeout() { - primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout( - shardsBatch.getBatchedShardRoutings(), - allocation, - true - ); + timedOutPrimaryShardIds.addAll(shardsBatch.getBatchedShards()); } @Override @@ -295,15 +292,22 @@ public void run() { primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); } })); - return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout); + return new BatchRunnableExecutor(runnables, () -> primaryShardsBatchGatewayAllocatorTimeout) { + @Override + public void onComplete() { + logger.trace("Triggering oncomplete after timeout for [{}] primary shards", timedOutPrimaryShardIds.size()); + primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutPrimaryShardIds, allocation, true); + } + }; } else { + Set timedOutReplicaShardIds = new HashSet<>(); batchIdToStoreShardBatch.values() .stream() .filter(batch -> batchesToAssign.contains(batch.batchId)) .forEach(batch -> runnables.add(new TimeoutAwareRunnable() { @Override public void onTimeout() { - replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation, false); + timedOutReplicaShardIds.addAll(batch.getBatchedShards()); } @Override @@ -311,7 +315,13 @@ public void run() { replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation); } })); - return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout); + return new BatchRunnableExecutor(runnables, () -> replicaShardsBatchGatewayAllocatorTimeout) { + @Override + public void onComplete() { + logger.trace("Triggering oncomplete after timeout for [{}] replica shards", timedOutReplicaShardIds.size()); + replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(timedOutReplicaShardIds, allocation, false); + } + }; } } @@ -846,11 +856,11 @@ public int getNumberOfStoreShardBatches() { return batchIdToStoreShardBatch.size(); } - private void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) { + protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) { this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout; } - private void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { + protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } } diff --git a/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java b/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java index 269f89faec54d..2f63ae43d0ded 100644 --- a/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java +++ b/server/src/test/java/org/opensearch/common/util/BatchRunnableExecutorTests.java @@ -15,6 +15,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; import static org.mockito.Mockito.atMost; @@ -42,7 +43,13 @@ public void setupRunnables() { public void testRunWithoutTimeout() { setupRunnables(); timeoutSupplier = () -> TimeValue.timeValueSeconds(1); - BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; executor.run(); verify(runnable1, times(1)).run(); verify(runnable2, times(1)).run(); @@ -50,12 +57,19 @@ public void testRunWithoutTimeout() { verify(runnable1, never()).onTimeout(); verify(runnable2, never()).onTimeout(); verify(runnable3, never()).onTimeout(); + assertEquals(0, countDownLatch.getCount()); } public void testRunWithTimeout() { setupRunnables(); timeoutSupplier = () -> TimeValue.timeValueNanos(1); - BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; executor.run(); verify(runnable1, times(1)).onTimeout(); verify(runnable2, times(1)).onTimeout(); @@ -63,12 +77,19 @@ public void testRunWithTimeout() { verify(runnable1, never()).run(); verify(runnable2, never()).run(); verify(runnable3, never()).run(); + assertEquals(0, countDownLatch.getCount()); } public void testRunWithPartialTimeout() { setupRunnables(); timeoutSupplier = () -> TimeValue.timeValueMillis(50); - BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(runnableList, timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; doAnswer(invocation -> { Thread.sleep(100); return null; @@ -81,11 +102,18 @@ public void testRunWithPartialTimeout() { verify(runnable3, atMost(1)).onTimeout(); verify(runnable2, atMost(1)).onTimeout(); verify(runnable3, atMost(1)).onTimeout(); + assertEquals(0, countDownLatch.getCount()); } public void testRunWithEmptyRunnableList() { setupRunnables(); - BatchRunnableExecutor executor = new BatchRunnableExecutor(Collections.emptyList(), timeoutSupplier); + CountDownLatch countDownLatch = new CountDownLatch(1); + BatchRunnableExecutor executor = new BatchRunnableExecutor(Collections.emptyList(), timeoutSupplier) { + @Override + public void onComplete() { + countDownLatch.countDown(); + } + }; executor.run(); verify(runnable1, never()).onTimeout(); verify(runnable2, never()).onTimeout(); @@ -93,5 +121,6 @@ public void testRunWithEmptyRunnableList() { verify(runnable1, never()).run(); verify(runnable2, never()).run(); verify(runnable3, never()).run(); + assertEquals(1, countDownLatch.getCount()); } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index 1596a0b566b28..c7eae77d6deba 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -32,6 +32,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BatchRunnableExecutor; import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; @@ -45,6 +46,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; @@ -423,6 +426,24 @@ public void testReplicaAllocatorTimeout() { assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis()); } + public void testCollectTimedOutShards() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + CountDownLatch latch = new CountDownLatch(10); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(latch); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + executor.run(); + assertTrue(latch.await(1, TimeUnit.MINUTES)); + latch = new CountDownLatch(10); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(latch); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + executor.run(); + assertTrue(latch.await(1, TimeUnit.MINUTES)); + } + private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) { if (count == 0) return; Metadata.Builder metadata = Metadata.builder(); diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index 270cf465d0f80..48183fed66671 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -41,7 +41,6 @@ import org.junit.Before; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -264,8 +263,9 @@ public void testAllocateUnassignedBatchOnTimeoutWithMatchingPrimaryShards() { final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0"); ShardRouting shardRouting = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - List shardRoutings = Arrays.asList(shardRouting); - batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation, true); + Set shardIds = new HashSet<>(); + shardIds.add(shardRouting.shardId()); + batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, true); List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); assertEquals(1, ignoredShards.size()); @@ -277,8 +277,7 @@ public void testAllocateUnassignedBatchOnTimeoutWithNoMatchingPrimaryShards() { AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random()); setUpShards(1); final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0"); - List shardRoutings = new ArrayList<>(); - batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation, true); + batchAllocator.allocateUnassignedBatchOnTimeout(new HashSet<>(), routingAllocation, true); List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); assertEquals(0, ignoredShards.size()); @@ -296,13 +295,33 @@ public void testAllocateUnassignedBatchOnTimeoutWithNonPrimaryShards() { .shard(shardId.id()) .replicaShards() .get(0); - List shardRoutings = Arrays.asList(shardRouting); - batchAllocator.allocateUnassignedBatchOnTimeout(shardRoutings, routingAllocation, false); + Set shardIds = new HashSet<>(); + shardIds.add(shardRouting.shardId()); + batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false); List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); assertEquals(1, ignoredShards.size()); } + public void testAllocateUnassignedBatchOnTimeoutWithNoShards() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random()); + setUpShards(1); + final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0"); + + ShardRouting shardRouting = routingAllocation.routingTable() + .getIndicesRouting() + .get("test") + .shard(shardId.id()) + .replicaShards() + .get(0); + Set shardIds = new HashSet<>(); + batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false); + + List ignoredShards = routingAllocation.routingNodes().unassigned().ignored(); + assertEquals(0, ignoredShards.size()); + } + private RoutingAllocation routingAllocationWithOnePrimary( AllocationDeciders deciders, UnassignedInfo.Reason reason, diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java index 435fd78be2bcd..78ed3f2c7d38c 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTests.java @@ -720,9 +720,9 @@ public void testAllocateUnassignedBatchThrottlingAllocationDeciderIsHonoured() t public void testAllocateUnassignedBatchOnTimeoutWithUnassignedReplicaShard() { RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders()); final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - List shards = new ArrayList<>(); + Set shards = new HashSet<>(); while (iterator.hasNext()) { - shards.add(iterator.next()); + shards.add(iterator.next().shardId()); } testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation, false); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -736,9 +736,9 @@ public void testAllocateUnassignedBatchOnTimeoutWithUnassignedReplicaShard() { public void testAllocateUnassignedBatchOnTimeoutWithAlreadyRecoveringReplicaShard() { RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - List shards = new ArrayList<>(); + Set shards = new HashSet<>(); while (iterator.hasNext()) { - shards.add(iterator.next()); + shards.add(iterator.next().shardId()); } testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation, false); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); diff --git a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java index 0eb4bb6935bac..156b1d7c620e6 100644 --- a/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java +++ b/test/framework/src/main/java/org/opensearch/test/gateway/TestShardBatchGatewayAllocator.java @@ -29,13 +29,20 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; public class TestShardBatchGatewayAllocator extends ShardsBatchGatewayAllocator { + CountDownLatch latch; + public TestShardBatchGatewayAllocator() { } + public TestShardBatchGatewayAllocator(CountDownLatch latch) { + this.latch = latch; + } + public TestShardBatchGatewayAllocator(long maxBatchSize) { super(maxBatchSize); } @@ -83,6 +90,13 @@ protected AsyncShardFetch.FetchResult(foundShards, shardsToIgnoreNodes); } + + @Override + protected void allocateUnassignedBatchOnTimeout(Set shardIds, RoutingAllocation allocation, boolean primary) { + for (int i = 0; i < shardIds.size(); i++) { + latch.countDown(); + } + } }; ReplicaShardBatchAllocator replicaBatchShardAllocator = new ReplicaShardBatchAllocator() { @@ -100,6 +114,13 @@ protected AsyncShardFetch.FetchResult shardIds, RoutingAllocation allocation, boolean primary) { + for (int i = 0; i < shardIds.size(); i++) { + latch.countDown(); + } + } }; @Override