From 6eb8f6f307567892bbabbe37aff7cd42be486df0 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Thu, 20 Jan 2022 15:02:54 -0800 Subject: [PATCH] Prioritize primary shard movement during shard allocation (#1445) When some node or set of nodes is excluded (based on some cluster setting) BalancedShardsAllocator iterates over them in breadth first order picking 1 shard from each node and repeating the process until all shards are balanced. Since shards from each node are picked randomly it's possible the p and r of shard1 is relocated first leaving behind both p and r of shard2. If the excluded nodes were to go down the cluster becomes red. This commit introduces a new setting "cluster.routing.allocation.move.primary_first" that prioritizes the p of both shard1 and shard2 first so the cluster does not become red if the excluded nodes were to go down before relocating other shards. Note that with this setting enabled performance of this change is a direct function of number of indices, shards, replicas, and nodes. The larger the indices, replicas, and distribution scale, the slower the allocation becomes. This should be used with care. Signed-off-by: Ankit Jain --- .../cluster/routing/RoutingNode.java | 130 +++++++++++--- .../cluster/routing/RoutingNodes.java | 131 +++++++++----- .../allocator/BalancedShardsAllocator.java | 35 +++- .../common/settings/ClusterSettings.java | 1 + .../routing/MovePrimaryFirstTests.java | 117 +++++++++++++ .../cluster/routing/RoutingNodeTests.java | 24 +++ .../cluster/routing/RoutingNodesTests.java | 163 ++++++++++++++++++ 7 files changed, 530 insertions(+), 71 deletions(-) create mode 100644 server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java index 123d09246bb7b..1b1a9394ff306 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java @@ -34,6 +34,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.collect.Tuple; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; @@ -48,6 +49,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards @@ -55,11 +57,87 @@ */ public class RoutingNode implements Iterable { + static class BucketedShards implements Iterable { + private final Tuple, LinkedHashMap> shardTuple; // LinkedHashMap to + // preserve order + + BucketedShards(LinkedHashMap primaryShards, LinkedHashMap replicaShards) { + this.shardTuple = new Tuple(primaryShards, replicaShards); + } + + public boolean isEmpty() { + return this.shardTuple.v1().isEmpty() && this.shardTuple.v2().isEmpty(); + } + + public int size() { + return this.shardTuple.v1().size() + this.shardTuple.v2().size(); + } + + public boolean containsKey(ShardId shardId) { + return this.shardTuple.v1().containsKey(shardId) || this.shardTuple.v2().containsKey(shardId); + } + + public ShardRouting get(ShardId shardId) { + if (this.shardTuple.v1().containsKey(shardId)) { + return this.shardTuple.v1().get(shardId); + } + return this.shardTuple.v2().get(shardId); + } + + public ShardRouting add(ShardRouting shardRouting) { + return put(shardRouting.shardId(), shardRouting); + } + + public ShardRouting put(ShardId shardId, ShardRouting shardRouting) { + ShardRouting ret; + if (shardRouting.primary()) { + ret = this.shardTuple.v1().put(shardId, shardRouting); + if (this.shardTuple.v2().containsKey(shardId)) { + ret = this.shardTuple.v2().remove(shardId); + } + } else { + ret = this.shardTuple.v2().put(shardId, shardRouting); + if (this.shardTuple.v1().containsKey(shardId)) { + ret = this.shardTuple.v1().remove(shardId); + } + } + + return ret; + } + + public ShardRouting remove(ShardId shardId) { + if (this.shardTuple.v1().containsKey(shardId)) { + return this.shardTuple.v1().remove(shardId); + } + return this.shardTuple.v2().remove(shardId); + } + + @Override + public Iterator iterator() { + final Iterator primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator(); + final Iterator replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return primaryIterator.hasNext() || replicaIterator.hasNext(); + } + + @Override + public ShardRouting next() { + if (primaryIterator.hasNext()) { + return primaryIterator.next(); + } + return replicaIterator.next(); + } + }; + } + } + private final String nodeId; private final DiscoveryNode node; - private final LinkedHashMap shards; // LinkedHashMap to preserve order + private final BucketedShards shards; private final LinkedHashSet initializingShards; @@ -67,44 +145,44 @@ public class RoutingNode implements Iterable { private final HashMap> shardsByIndex; - public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { - this(nodeId, node, buildShardRoutingMap(shards)); - } - - RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap shards) { + public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) { this.nodeId = nodeId; this.node = node; - this.shards = shards; + final LinkedHashMap primaryShards = new LinkedHashMap<>(); + final LinkedHashMap replicaShards = new LinkedHashMap<>(); + this.shards = new BucketedShards(primaryShards, replicaShards); this.relocatingShards = new LinkedHashSet<>(); this.initializingShards = new LinkedHashSet<>(); this.shardsByIndex = new LinkedHashMap<>(); - for (ShardRouting shardRouting : shards.values()) { + + for (ShardRouting shardRouting : shardRoutings) { if (shardRouting.initializing()) { initializingShards.add(shardRouting); } else if (shardRouting.relocating()) { relocatingShards.add(shardRouting); } shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting); - } - assert invariant(); - } - private static LinkedHashMap buildShardRoutingMap(ShardRouting... shardRoutings) { - final LinkedHashMap shards = new LinkedHashMap<>(); - for (ShardRouting shardRouting : shardRoutings) { - ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting); + ShardRouting previousValue; + if (shardRouting.primary()) { + previousValue = primaryShards.put(shardRouting.shardId(), shardRouting); + } else { + previousValue = replicaShards.put(shardRouting.shardId(), shardRouting); + } + if (previousValue != null) { throw new IllegalArgumentException( "Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node " ); } } - return shards; + + assert invariant(); } @Override public Iterator iterator() { - return Collections.unmodifiableCollection(shards.values()).iterator(); + return shards.iterator(); } /** @@ -139,7 +217,7 @@ public int size() { */ void add(ShardRouting shard) { assert invariant(); - if (shards.containsKey(shard.shardId())) { + if (shards.add(shard) != null) { throw new IllegalStateException( "Trying to add a shard " + shard.shardId() @@ -152,7 +230,6 @@ void add(ShardRouting shard) { + "]" ); } - shards.put(shard.shardId(), shard); if (shard.initializing()) { initializingShards.add(shard); @@ -322,7 +399,7 @@ public int numberOfOwningShardsForIndex(final Index index) { public String prettyPrint() { StringBuilder sb = new StringBuilder(); sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n"); - for (ShardRouting entry : shards.values()) { + for (ShardRouting entry : shards) { sb.append("--------").append(entry.shortSummary()).append('\n'); } return sb.toString(); @@ -345,7 +422,9 @@ public String toString() { } public List copyShards() { - return new ArrayList<>(shards.values()); + List result = new ArrayList<>(); + shards.forEach(result::add); + return result; } public boolean isEmpty() { @@ -355,23 +434,20 @@ public boolean isEmpty() { private boolean invariant() { // initializingShards must consistent with that in shards - Collection shardRoutingsInitializing = shards.values() - .stream() + Collection shardRoutingsInitializing = StreamSupport.stream(shards.spliterator(), false) .filter(ShardRouting::initializing) .collect(Collectors.toList()); assert initializingShards.size() == shardRoutingsInitializing.size(); assert initializingShards.containsAll(shardRoutingsInitializing); // relocatingShards must consistent with that in shards - Collection shardRoutingsRelocating = shards.values() - .stream() + Collection shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false) .filter(ShardRouting::relocating) .collect(Collectors.toList()); assert relocatingShards.size() == shardRoutingsRelocating.size(); assert relocatingShards.containsAll(shardRoutingsRelocating); - final Map> shardRoutingsByIndex = shards.values() - .stream() + final Map> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false) .collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet())); assert shardRoutingsByIndex.equals(shardsByIndex); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index d81e7fa6e22d9..b5353382f06b8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -56,7 +56,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.ListIterator; import java.util.Map; @@ -108,10 +107,10 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); - Map> nodesToShards = new HashMap<>(); // fill in the nodeToShards with the "live" nodes for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { - nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order + String nodeId = cursor.value.getId(); + this.nodesToShards.put(cursor.value.getId(), new RoutingNode(nodeId, clusterState.nodes().get(nodeId))); } // fill in the inverse of node -> shards allocated @@ -125,27 +124,23 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { // by the ShardId, as this is common for primary and replicas. // A replica Set might have one (and not more) replicas with the state of RELOCATING. if (shard.assignedToNode()) { - Map entries = nodesToShards.computeIfAbsent( + RoutingNode routingNode = this.nodesToShards.computeIfAbsent( shard.currentNodeId(), - k -> new LinkedHashMap<>() - ); // LinkedHashMap to preserve order - ShardRouting previousValue = entries.put(shard.shardId(), shard); - if (previousValue != null) { - throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node"); - } + k -> new RoutingNode(shard.currentNodeId(), clusterState.nodes().get(shard.currentNodeId())) + ); + routingNode.add(shard); assignedShardsAdd(shard); if (shard.relocating()) { relocatingShards++; - // LinkedHashMap to preserve order. // Add the counterpart shard with relocatingNodeId reflecting the source from which // it's relocating from. - entries = nodesToShards.computeIfAbsent(shard.relocatingNodeId(), k -> new LinkedHashMap<>()); + routingNode = nodesToShards.computeIfAbsent( + shard.relocatingNodeId(), + k -> new RoutingNode(shard.relocatingNodeId(), clusterState.nodes().get(shard.relocatingNodeId())) + ); ShardRouting targetShardRouting = shard.getTargetRelocatingShard(); addInitialRecovery(targetShardRouting, indexShard.primary); - previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting); - if (previousValue != null) { - throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node"); - } + routingNode.add(targetShardRouting); assignedShardsAdd(targetShardRouting); } else if (shard.initializing()) { if (shard.primary()) { @@ -160,10 +155,6 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { } } } - for (Map.Entry> entry : nodesToShards.entrySet()) { - String nodeId = entry.getKey(); - this.nodesToShards.put(nodeId, new RoutingNode(nodeId, clusterState.nodes().get(nodeId), entry.getValue())); - } } private void addRecovery(ShardRouting routing) { @@ -1289,37 +1280,97 @@ private void ensureMutable() { * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. * The iterator then resumes on the first node by returning the second shard and continues until all shards from * all the nodes have been returned. + * @param movePrimaryFirst if true, all primary shards are iterated over before iterating replica for any node + * @return iterator of shard routings */ - public Iterator nodeInterleavedShardIterator() { + public Iterator nodeInterleavedShardIterator(boolean movePrimaryFirst) { final Queue> queue = new ArrayDeque<>(); for (Map.Entry entry : nodesToShards.entrySet()) { queue.add(entry.getValue().copyShards().iterator()); } - return new Iterator() { - public boolean hasNext() { - while (!queue.isEmpty()) { - if (queue.peek().hasNext()) { + if (movePrimaryFirst) { + return new Iterator() { + private Queue replicaShards = new ArrayDeque<>(); + private Queue> replicaIterators = new ArrayDeque<>(); + + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; + } + queue.poll(); + } + if (!replicaShards.isEmpty()) { return true; } - queue.poll(); + while (!replicaIterators.isEmpty()) { + if (replicaIterators.peek().hasNext()) { + return true; + } + replicaIterators.poll(); + } + return false; } - return false; - } - public ShardRouting next() { - if (hasNext() == false) { - throw new NoSuchElementException(); + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + while (!queue.isEmpty()) { + Iterator iter = queue.poll(); + if (iter.hasNext()) { + ShardRouting result = iter.next(); + if (result.primary()) { + queue.offer(iter); + return result; + } + replicaShards.offer(result); + replicaIterators.offer(iter); + } + } + if (!replicaShards.isEmpty()) { + return replicaShards.poll(); + } + Iterator replicaIterator = replicaIterators.poll(); + ShardRouting replicaShard = replicaIterator.next(); + replicaIterators.offer(replicaIterator); + + assert !replicaShard.primary(); + return replicaShard; } - Iterator iter = queue.poll(); - ShardRouting result = iter.next(); - queue.offer(iter); - return result; - } - public void remove() { - throw new UnsupportedOperationException(); - } - }; + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } else { + return new Iterator() { + @Override + public boolean hasNext() { + while (!queue.isEmpty()) { + if (queue.peek().hasNext()) { + return true; + } + queue.poll(); + } + return false; + } + + @Override + public ShardRouting next() { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + Iterator iter = queue.poll(); + queue.offer(iter); + return iter.next(); + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } } private static final class Recoveries { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 93c9df71656f9..b3a045af91952 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -109,6 +109,12 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.Dynamic, Property.NodeScope ); + public static final Setting SHARD_MOVE_PRIMARY_FIRST_SETTING = Setting.boolSetting( + "cluster.routing.allocation.move.primary_first", + false, + Property.Dynamic, + Property.NodeScope + ); public static final Setting THRESHOLD_SETTING = Setting.floatSetting( "cluster.routing.allocation.balance.threshold", 1.0f, @@ -117,6 +123,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + private volatile boolean movePrimaryFirst; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -128,10 +135,15 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } + private void setMovePrimaryFirst(boolean movePrimaryFirst) { + this.movePrimaryFirst = movePrimaryFirst; + } + private void setWeightFunction(float indexBalance, float shardBalanceFactor) { weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); } @@ -146,7 +158,7 @@ public void allocate(RoutingAllocation allocation) { failAllocationOfNewPrimaries(allocation); return; } - final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + final Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); balancer.allocateUnassigned(); balancer.moveShards(); balancer.balance(); @@ -154,7 +166,7 @@ public void allocate(RoutingAllocation allocation) { @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { - Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold); + Balancer balancer = new Balancer(logger, allocation, movePrimaryFirst, weightFunction, threshold); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { @@ -283,6 +295,7 @@ public static class Balancer { private final Map nodes; private final RoutingAllocation allocation; private final RoutingNodes routingNodes; + private final boolean movePrimaryFirst; private final WeightFunction weight; private final float threshold; @@ -291,9 +304,10 @@ public static class Balancer { private final NodeSorter sorter; private final Set inEligibleTargetNode; - public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { + public Balancer(Logger logger, RoutingAllocation allocation, boolean movePrimaryFirst, WeightFunction weight, float threshold) { this.logger = logger; this.allocation = allocation; + this.movePrimaryFirst = movePrimaryFirst; this.weight = weight; this.threshold = threshold; this.routingNodes = allocation.routingNodes(); @@ -725,7 +739,8 @@ public void moveShards() { for (ModelNode currentNode : sorter.modelNodes) { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); } - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext();) { + boolean primariesThrottled = false; + for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(movePrimaryFirst); it.hasNext();) { // Verify if the cluster concurrent recoveries have been reached. if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) { logger.info( @@ -745,11 +760,23 @@ public void moveShards() { ShardRouting shardRouting = it.next(); + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled + if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { + logger.info( + "Cannot move any replica shard in the cluster as movePrimaryFirst is enabled and primary shards" + + "are being throttled. Skipping shard iteration" + ); + return; + } + // Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard // is not being throttled. Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation); if (canMoveAwayDecision.type() != Decision.Type.YES) { if (logger.isDebugEnabled()) logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting); + if (shardRouting.primary() && canMoveAwayDecision.type() == Type.THROTTLE) { + primariesThrottled = true; + } continue; } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f1187f3344c83..3e666697fd317 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -219,6 +219,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java b/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java new file mode 100644 index 0000000000000..64e622888018f --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/MovePrimaryFirstTests.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Iterator; +import java.util.concurrent.CountDownLatch; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class MovePrimaryFirstTests extends OpenSearchIntegTestCase { + + protected String startDataOnlyNode(final String zone) { + final Settings settings = Settings.builder().put("node.attr.zone", zone).build(); + return internalCluster().startDataOnlyNode(settings); + } + + protected void createAndIndex(String index, int replicaCount, int shardCount) { + assertAcked( + prepareCreate( + index, + -1, + Settings.builder() + .put("number_of_shards", shardCount) + .put("number_of_replicas", replicaCount) + .put("max_result_window", 20000) + ) + ); + int startDocCountId = 0; + for (int i = 0; i < 10; i++) { + index(index, "_doc", Integer.toString(startDocCountId), "foo", "bar" + startDocCountId); + ++startDocCountId; + } + flushAndRefresh(index); + } + + /** + * Creates two nodes each in two zones and shuts down nodes in one zone + * after relocating half the number of shards. Since, primaries are relocated + * first, cluster should stay green as primary should have relocated + */ + public void testClusterGreenAfterPartialRelocation() throws InterruptedException { + internalCluster().startMasterOnlyNodes(1); + final String z1 = "zone-1", z2 = "zone-2"; + final int primaryShardCount = 100; + final String z1n1 = startDataOnlyNode(z1); + ensureGreen(); + createAndIndex("foo", 1, primaryShardCount); + ensureYellow(); + // Start second node in same zone only after yellow cluster to ensure + // that one gets all primaries and other all secondaries + final String z1n2 = startDataOnlyNode(z1); + ensureGreen(); + + // Enable cluster level setting for moving primaries first and keep new + // zone nodes excluded to prevent any shard relocation + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest.persistentSettings( + Settings.builder().put("cluster.routing.allocation.move.primary_first", true).put("cluster.routing.allocation.exclude.zone", z2) + ); + client().admin().cluster().updateSettings(settingsRequest).actionGet(); + + final String z2n1 = startDataOnlyNode(z2); + final String z2n2 = startDataOnlyNode(z2); + + // Create cluster state listener to compute number of shards on new zone + // nodes before counting down the latch + final CountDownLatch primaryMoveLatch = new CountDownLatch(1); + final ClusterStateListener listener = event -> { + if (event.routingTableChanged()) { + final RoutingNodes routingNodes = event.state().getRoutingNodes(); + int startedz2n1 = 0; + int startedz2n2 = 0; + for (Iterator it = routingNodes.iterator(); it.hasNext();) { + RoutingNode routingNode = it.next(); + final String nodeName = routingNode.node().getName(); + if (nodeName.equals(z2n1)) { + startedz2n1 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED); + } else if (nodeName.equals(z2n2)) { + startedz2n2 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED); + } + } + if (startedz2n1 >= primaryShardCount / 2 && startedz2n2 >= primaryShardCount / 2) { + primaryMoveLatch.countDown(); + } + } + }; + internalCluster().clusterService().addListener(listener); + + // Exclude zone1 nodes for allocation and await latch count down + settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest.persistentSettings(Settings.builder().put("cluster.routing.allocation.exclude.zone", z1)); + client().admin().cluster().updateSettings(settingsRequest); + primaryMoveLatch.await(); + + // Shutdown both nodes in zone and ensure cluster stays green + try { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n2)); + } catch (Exception e) {} + ensureGreen(); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java index 8451633710ce5..5bd5b7d9f6a67 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java @@ -41,6 +41,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.net.InetAddress; +import java.util.Iterator; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -86,6 +87,29 @@ public void testAdd() { assertThat(routingNode.getByShardId(new ShardId("test", IndexMetadata.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0)); } + public void testPrimaryFirstIterator() { + ShardRouting initializingShard3 = TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING); + ShardRouting relocatingShard4 = TestShardRouting.newShardRouting("test", 4, "node-1", "node-2", true, ShardRoutingState.RELOCATING); + ShardRouting initializingShard5 = TestShardRouting.newShardRouting("test", 5, "node-1", true, ShardRoutingState.INITIALIZING); + routingNode.add(initializingShard3); + routingNode.add(relocatingShard4); + routingNode.add(initializingShard5); + final Iterator shardRoutingIterator = routingNode.iterator(); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(relocatingShard4)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard5)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(unassignedShard0)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard0)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(relocatingShard0)); + assertTrue(shardRoutingIterator.hasNext()); + assertThat(shardRoutingIterator.next(), equalTo(initializingShard3)); + assertFalse(shardRoutingIterator.hasNext()); + } + public void testUpdate() { ShardRouting startedShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); ShardRouting startedShard1 = TestShardRouting.newShardRouting("test", 1, "node-1", "node-2", false, ShardRoutingState.RELOCATING); diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java new file mode 100644 index 0000000000000..3e9088d63cfb4 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing; + +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.common.settings.Settings; + +import java.util.Iterator; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +public class RoutingNodesTests extends OpenSearchAllocationTestCase { + private static final String TEST_INDEX_1 = "test1"; + private static final String TEST_INDEX_2 = "test2"; + private RoutingTable emptyRoutingTable; + private int numberOfShards; + private int numberOfReplicas; + private int shardsPerIndex; + private int totalNumberOfShards; + private static final Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build(); + private final AllocationService ALLOCATION_SERVICE = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries + .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE) + .put( + ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + Integer.MAX_VALUE + ) + .build() + ); + private ClusterState clusterState; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.numberOfShards = 5; + this.numberOfReplicas = 2; + this.shardsPerIndex = this.numberOfShards * (this.numberOfReplicas + 1); + this.totalNumberOfShards = this.shardsPerIndex * 2; + logger.info("Setup test with {} shards and {} replicas.", this.numberOfShards, this.numberOfReplicas); + this.emptyRoutingTable = new RoutingTable.Builder().build(); + Metadata metadata = Metadata.builder().put(createIndexMetadata(TEST_INDEX_1)).put(createIndexMetadata(TEST_INDEX_2)).build(); + + RoutingTable testRoutingTable = new RoutingTable.Builder().add( + new IndexRoutingTable.Builder(metadata.index(TEST_INDEX_1).getIndex()).initializeAsNew(metadata.index(TEST_INDEX_1)).build() + ) + .add( + new IndexRoutingTable.Builder(metadata.index(TEST_INDEX_2).getIndex()).initializeAsNew(metadata.index(TEST_INDEX_2)).build() + ) + .build(); + this.clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(testRoutingTable) + .build(); + } + + /** + * Puts primary shard index routings into initializing state + */ + private void initPrimaries() { + logger.info("adding {} nodes and performing rerouting", this.numberOfReplicas + 1); + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < this.numberOfReplicas + 1; i++) { + discoBuilder = discoBuilder.add(newNode("node" + i)); + } + this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build(); + ClusterState rerouteResult = ALLOCATION_SERVICE.reroute(clusterState, "reroute"); + assertThat(rerouteResult, not(equalTo(this.clusterState))); + this.clusterState = rerouteResult; + } + + /** + * Moves initializing shards into started state + */ + private void startInitializingShards(String index) { + clusterState = startInitializingShardsAndReroute(ALLOCATION_SERVICE, clusterState, index); + } + + private IndexMetadata.Builder createIndexMetadata(String indexName) { + return new IndexMetadata.Builder(indexName).settings(DEFAULT_SETTINGS) + .numberOfReplicas(this.numberOfReplicas) + .numberOfShards(this.numberOfShards); + } + + public void testInterleavedShardIterator() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + // Create primary shard count imbalance between two nodes + final RoutingNode node0 = this.clusterState.getRoutingNodes().node("node0"); + final RoutingNode node1 = this.clusterState.getRoutingNodes().node("node1"); + final List shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + for (ShardRouting routing : shardRoutingList) { + if (routing.primary()) { + node0.remove(routing); + ShardRouting swap = node1.getByShardId(routing.shardId()); + node0.add(swap); + node1.remove(swap); + node1.add(routing); + } + } + + // Get primary first shard iterator and assert primary shards are iterated over first + final Iterator iterator = this.clusterState.getRoutingNodes().nodeInterleavedShardIterator(true); + boolean iteratingPrimary = true; + int shardCount = 0; + while (iterator.hasNext()) { + final ShardRouting shard = iterator.next(); + if (iteratingPrimary) { + iteratingPrimary = shard.primary(); + } else { + assert shard.primary() == false; + } + shardCount++; + } + assert shardCount == this.totalNumberOfShards; + } +}