Skip to content

Commit

Permalink
[Batch Fetch] Fix for hasInitiatedFetching to fix allocation explain …
Browse files Browse the repository at this point in the history
…and manual reroute APIs (#14972)

* Fix for hasInitiatedFetching() in batch mode

Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
  • Loading branch information
rahulkarajgikar authored Jul 29, 2024
1 parent 59302a3 commit d08c425
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))

### Dependencies
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -797,11 +798,26 @@ public void testBatchModeEnabledWithoutTimeout() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
// Replica shard would be marked ineligible since there are no data nodes.
// It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
// Now start one data node
logger.info("--> restarting the first stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
ensureStableCluster(2);
ensureYellow("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());

// calling reroute and asserting on reroute response
logger.info("--> calling reroute while cluster is yellow");
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

// Now start last data node and ensure batch mode is working and cluster goes green
logger.info("--> restarting the second stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
Expand Down Expand Up @@ -842,11 +858,26 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
// Replica shard would be marked ineligible since there are no data nodes.
// It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
// Now start one data nodes and ensure batch mode is working
logger.info("--> restarting the first stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
ensureStableCluster(2);
ensureYellow("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());

// calling reroute and asserting on reroute response
logger.info("--> calling reroute while cluster is yellow");
clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

// Now start last data node and ensure batch mode is working and cluster goes green
logger.info("--> restarting the second stopped node");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
Expand Down Expand Up @@ -907,7 +938,9 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws

assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
// All replica shards would be marked ineligible since there are no data nodes.
// They would then be removed from any batch and batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
Expand Down Expand Up @@ -1051,6 +1084,18 @@ public void testMultipleReplicaShardAssignmentWithDelayedAllocationAndDifferentN
ensureGreen("test");
}

public void testAllocationExplainReturnsNoWhenExtraReplicaShardInNonBatchMode() throws Exception {
// Non batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are
// returning NO
this.allocationExplainReturnsNoWhenExtraReplicaShard(false);
}

public void testAllocationExplainReturnsNoWhenExtraReplicaShardInBatchMode() throws Exception {
// Batch mode - This test is to validate that we don't return AWAITING_INFO in allocation explain API when the deciders are
// returning NO
this.allocationExplainReturnsNoWhenExtraReplicaShard(true);
}

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
Expand Down Expand Up @@ -1104,7 +1149,9 @@ public void testNBatchesCreationAndAssignment() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
// All replica shards would be marked ineligible since there are no data nodes.
// They would then be removed from any batch and batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
Expand Down Expand Up @@ -1193,7 +1240,9 @@ public void testCulpritShardInBatch() throws Exception {
);
assertTrue(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(internalCluster().clusterService().getSettings()));
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
// Replica shard would be marked ineligible since there are no data nodes.
// It would then be removed from any batch and batches would get deleted, so we would have 0 replica batches
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
assertTrue(clusterRerouteResponse.isAcknowledged());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
Expand Down Expand Up @@ -1511,4 +1560,97 @@ private List<String> findNodesWithShard(final boolean primary) {
Collections.shuffle(requiredStartedShards, random());
return requiredStartedShards.stream().map(shard -> state.nodes().get(shard.currentNodeId()).getName()).collect(Collectors.toList());
}

private void allocationExplainReturnsNoWhenExtraReplicaShard(boolean batchModeEnabled) throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), batchModeEnabled).build()
);
internalCluster().startDataOnlyNodes(5);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 4).build()
);
ensureGreen("test");
ensureStableCluster(6);

// Stop one of the nodes to make the cluster yellow
// We cannot directly create an index with replica = data node count because then the whole flow will get skipped due to
// INDEX_CREATED
List<String> nodesWithReplicaShards = findNodesWithShard(false);
Settings replicaNodeDataPathSettings = internalCluster().dataPathSettings(nodesWithReplicaShards.get(0));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodesWithReplicaShards.get(0)));

ensureStableCluster(5);
ensureYellow("test");

logger.info("--> calling allocation explain API");
// shard should have decision NO because there is no valid node for the extra replica to go to
AllocateUnassignedDecision aud = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision();

assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());

// Now creating a new index with too many replicas and trying again
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 5).build()
);

ensureYellowAndNoInitializingShards("test2");

logger.info("--> calling allocation explain API again");
// shard should have decision NO because there are 6 replicas and 4 data nodes
aud = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision();

assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());

logger.info("--> restarting the stopped node");
internalCluster().startDataOnlyNode(
Settings.builder().put("node.name", nodesWithReplicaShards.get(0)).put(replicaNodeDataPathSettings).build()
);

ensureStableCluster(6);
ensureGreen("test");

logger.info("--> calling allocation explain API 3rd time");
// shard should still have decision NO because there are 6 replicas and 5 data nodes
aud = client().admin()
.cluster()
.prepareAllocationExplain()
.setIndex("test2")
.setShard(0)
.setPrimary(false)
.get()
.getExplanation()
.getShardAllocationDecision()
.getAllocateDecision();

assertEquals(AllocationDecision.NO, aud.getAllocationDecision());
assertEquals("cannot allocate because allocation is not permitted to any of the nodes", aud.getExplanation());

internalCluster().startDataOnlyNodes(1);

ensureStableCluster(7);
ensureGreen("test2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ public synchronized void clearShard(ShardId shardId) {
this.cache.deleteShard(shardId);
}

public boolean hasEmptyCache() {
return this.cache.getCache().isEmpty();
}

public AsyncShardFetchCache<T> getCache() {
return this.cache;
}

/**
* Cache implementation of transport actions returning batch of shards related data in the response.
* Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shardRouting))) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shardRouting);
logger.trace("{}: ignoring allocation, can't be allocated on any node. Decision: {}", shardRouting, allocationDecision.type());
return AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,37 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.

@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
/**
* This function is to check if asyncFetch has happened before for this shard batch, or is ongoing.
* It should return false if there has never been a fetch for this batch.
* This function is currently only used in the case of replica shards when all deciders returned NO/THROTTLE, and explain mode is ON.
* Allocation explain and manual reroute APIs try to append shard store information (matching bytes) to the allocation decision.
* However, these APIs do not want to trigger a new asyncFetch for these ineligible shards, unless the data from nodes is already there.
* This function is used to see if a fetch has happened to decide if it is possible to append shard store info without a new async fetch.
* In the case when shard has a batch but no fetch has happened before, it would be because it is a new batch.
* In the case when shard has a batch, and a fetch has happened before, and no fetch is ongoing, it would be because we have already completed fetch for all nodes.
*
* In order to check if a fetch has ever happened, we check 2 things:
* 1. If the shard batch cache is empty, we know that fetch has never happened so we return false.
* 2. If we see that the list of nodes to fetch from is empty, we know that all nodes have data or are ongoing a fetch. So we return true.
* 3. Otherwise we return false.
*
* see {@link AsyncShardFetchCache#findNodesToFetch()}
*/
String batchId = getBatchId(shard, shard.primary());
return batchId != null;
if (batchId == null) {
return false;
}
logger.trace("Checking if fetching done for batch id {}", batchId);
ShardsBatch shardsBatch = shard.primary() ? batchIdToStartedShardBatch.get(batchId) : batchIdToStoreShardBatch.get(batchId);
// if fetchData has never been called, the per node cache will be empty and have no nodes
// this is because cache.fillShardCacheWithDataNodes(nodes) initialises this map and is called in AsyncShardFetch.fetchData
if (shardsBatch == null || shardsBatch.getAsyncFetcher().hasEmptyCache()) {
logger.trace("Batch cache is empty for batch {} ", batchId);
return false;
}
// this check below is to make sure we already have all the data and that we wouldn't create a new async fetchData call
return shardsBatch.getAsyncFetcher().getCache().findNodesToFetch().isEmpty();
}
}

Expand Down

0 comments on commit d08c425

Please sign in to comment.