From cb034a6c519617ad4591e29da1fbf1d8c72bceff Mon Sep 17 00:00:00 2001
From: Bukhtawar
Date: Sat, 6 Jan 2018 00:21:54 +0530
Subject: [PATCH] Make move decisions for shards at the node level to prevent a build-up of pending tasks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Today we iterate through every shard and take a move decision per shard, although most of these
decisions can be taken once per node: if all shards on a node can remain, the node can be skipped
entirely, because a "don't move" decision would be reached for each of its shards anyway (#27427).
To support this, each decider now implements a `canRemainOnNode` method, which is equivalent to
`canRemain` except that the decision is taken at the node level. This PR primarily addresses the
review comments on the previous PR #27628. A short standalone sketch of the resulting control flow
is appended after the diff for reviewers.
---
 .../allocation/AllocationBenchmark.java | 62 ++++++++++++-
 .../cluster/routing/RoutingNode.java | 49 +++-------
 .../cluster/routing/RoutingNodes.java | 93 +++++++------
 .../allocator/BalancedShardsAllocator.java | 12 ++-
 .../allocation/decider/AllocationDecider.java | 11 +++
 .../decider/AllocationDeciders.java | 43 ++++-----
 .../decider/AwarenessAllocationDecider.java | 32 ++-----
 .../ConcurrentRebalanceAllocationDecider.java | 16 ----
 .../decider/DiskThresholdDecider.java | 83 ++++++++---------
 .../decider/FilterAllocationDecider.java | 16 ++--
 .../decider/ShardsLimitAllocationDecider.java | 73 +++++++++------
 .../decider/ThrottlingAllocationDecider.java | 13 +++
 12 files changed, 264 insertions(+), 239 deletions(-)

diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java
index 4c93e7905bb21..ede3e84e852fa 100644
--- a/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java
+++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/routing/allocation/AllocationBenchmark.java
@@ -66,9 +66,65 @@ public class AllocationBenchmark {
     // support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
     // need its own main method and we cannot execute more than one class with a main method per JAR.
@Param({ - //indices| shards| replicas| nodes - " 50| 60| 1| 200", - " 500| 60| 1| 400" + // indices| shards| replicas| nodes + " 10| 1| 0| 1", + " 10| 3| 0| 1", + " 10| 10| 0| 1", + " 100| 1| 0| 1", + " 100| 3| 0| 1", + " 100| 10| 0| 1", + + " 10| 1| 0| 10", + " 10| 3| 0| 10", + " 10| 10| 0| 10", + " 100| 1| 0| 10", + " 100| 3| 0| 10", + " 100| 10| 0| 10", + + " 10| 1| 1| 10", + " 10| 3| 1| 10", + " 10| 10| 1| 10", + " 100| 1| 1| 10", + " 100| 3| 1| 10", + " 100| 10| 1| 10", + + " 10| 1| 2| 10", + " 10| 3| 2| 10", + " 10| 10| 2| 10", + " 100| 1| 2| 10", + " 100| 3| 2| 10", + " 100| 10| 2| 10", + + " 10| 1| 1| 20", + " 10| 3| 1| 20", + " 10| 10| 1| 20", + " 100| 1| 1| 20", + " 100| 3| 1| 20", + " 100| 10| 1| 20", + + " 10| 1| 0| 50", + " 10| 3| 0| 50", + " 10| 10| 0| 50", + " 100| 1| 0| 50", + " 100| 3| 0| 50", + " 100| 10| 0| 50", + + " 10| 1| 1| 50", + " 10| 3| 1| 50", + " 10| 10| 1| 50", + " 100| 1| 1| 50", + " 100| 3| 1| 50", + " 100| 10| 1| 50", + + " 10| 1| 2| 50", + " 10| 3| 2| 50", + " 10| 10| 2| 50", + " 100| 1| 2| 50", + " 100| 3| 2| 50", + " 100| 10| 2| 50", + + " 50| 60| 1| 200", + " 500| 60| 1| 400" }) public String indicesShardsReplicasNodes = "10|1|0|1"; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 93e7f9c302c0d..3ae48a9205d22 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; @@ -47,7 +48,7 @@ public class RoutingNode implements Iterable { private final Map> shardPerState = new LinkedHashMap<>(); - private final Map>> shardPerIndexPerState = new LinkedHashMap<>(); + private final Map> shardPerIndex = new HashMap<>(); public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { this(nodeId, node, buildShardRoutingMap(shards)); @@ -59,8 +60,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... 
shards) { this.shards = shards; for (Map.Entry shardMap : shards.entrySet()) { shardPerState.computeIfAbsent(shardMap.getValue().state(), k -> new HashSet<>()).add(shardMap.getValue()); - shardPerIndexPerState.computeIfAbsent(shardMap.getKey().getIndex(), k -> new LinkedHashMap<>()) - .computeIfAbsent(shardMap.getValue().state(), k -> new HashSet<>()).add(shardMap.getValue()); + shardPerIndex.computeIfAbsent(shardMap.getKey().getIndex(), k -> new HashSet<>()).add(shardMap.getValue()); } } @@ -76,8 +76,8 @@ private static LinkedHashMap buildShardRoutingMap(ShardRo return shards; } - public Map>> getShardsPerIndexPerState() { - return Collections.unmodifiableMap(shardPerIndexPerState); + public Set indices() { + return Collections.unmodifiableSet(shardPerIndex.keySet()); } @Override @@ -122,8 +122,7 @@ void add(ShardRouting shard) { } shards.put(shard.shardId(), shard); shardPerState.computeIfAbsent(shard.state(), k -> new HashSet<>()).add(shard); - shardPerIndexPerState.computeIfAbsent(shard.index(), k -> new LinkedHashMap<>()) - .computeIfAbsent(shard.state(), k -> new HashSet<>()).add(shard); + shardPerIndex.computeIfAbsent(shard.index(), k -> new HashSet<>()).add(shard); } @@ -137,15 +136,14 @@ void update(ShardRouting oldShard, ShardRouting newShard) { c.remove(oldShard); return c; }); - shardPerIndexPerState.computeIfPresent(oldShard.index(), (k, c) -> { - if (c.get(oldShard.state()) != null) { - c.get(oldShard.state()).remove(oldShard); - } + shardPerIndex.computeIfPresent(oldShard.index(), (k, c) -> { + c.remove(oldShard); + if (c.isEmpty()) + shardPerIndex.remove(oldShard.index()); return c; }); shardPerState.computeIfAbsent(newShard.state(), k -> new HashSet<>()).add(newShard); - shardPerIndexPerState.computeIfAbsent(newShard.index(), k -> new LinkedHashMap<>()) - .computeIfAbsent(newShard.state(), k -> new HashSet<>()).add(newShard); + shardPerIndex.computeIfAbsent(newShard.index(), k -> new HashSet<>()).add(newShard); ShardRouting previousValue = shards.put(newShard.shardId(), newShard); assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard; } @@ -157,10 +155,10 @@ void remove(ShardRouting shard) { c.remove(previousValue); return c.isEmpty() ? null : c; }); - shardPerIndexPerState.computeIfPresent(shard.index(), (k, c) -> { - if (c.get(shard.state()) != null) { - c.get(shard.state()).remove(shard); - } + shardPerIndex.computeIfPresent(shard.index(), (k, c) -> { + c.remove(previousValue); + if (c.isEmpty()) + shardPerIndex.remove(shard.index()); return c; }); } @@ -194,23 +192,6 @@ public List shardsWithState(ShardRoutingState... states) { return shards; } - /** - * Determine the shards of an index with a specific state - * @param index id of the index - * @param states set of states which should be listed - * @return a list of shards - */ - public List shardsWithState(Index index, ShardRoutingState... states) { - List shards = new ArrayList<>(); - Map> shardPerIndexMap = shardPerIndexPerState.get(index); - for (ShardRoutingState state : states) { - if (shardPerIndexMap != null && shardPerIndexMap.get(state) != null) { - shards.addAll(shardPerIndexMap.get(state)); - } - } - return shards; - } - public List shardsWithState(String index, ShardRoutingState... 
states) { List shards = new ArrayList<>(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index d156cf6a76b6f..a48bf3192cd8e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -50,9 +50,6 @@ import java.util.Queue; import java.util.Set; import java.util.function.Predicate; -import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.decider.Decision; - /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. @@ -762,10 +759,10 @@ private void assignedShardsRemove(ShardRouting shard) { // yes we check identity here if (shard == iterator.next()) { iterator.remove(); + removeFromShardPerAttributeCache(shard); return; } } - removeFromShardPerAttributeCache(shard); } assert false : "No shard found to remove"; } @@ -822,19 +819,10 @@ public int size() { return nodesToShards.size(); } - private void updateShardPerAttributeCache(ShardRouting oldShard, ShardRouting newShard) { + private void updateShardPerAttributeCache(ShardRouting oldShard, ShardRouting newShard) { if (attributes.size() > 0) { - for (String awarenessAttribute : attributes) { - Map> shardPerAttributeMap = shardIdPerAttributes.get(oldShard.shardId()); - if (shardPerAttributeMap != null && shardPerAttributeMap.containsKey(awarenessAttribute)) { - shardPerAttributeMap.get(awarenessAttribute).putOrAdd(node(oldShard.currentNodeId()).node().getAttributes().get(awarenessAttribute), 0, -1); - } - if (newShard.initializing() || newShard.started()) { - ObjectIntHashMap attributeMap = shardIdPerAttributes.computeIfAbsent(newShard.shardId(), - k -> new HashMap<>()).computeIfAbsent(awarenessAttribute, k-> new ObjectIntHashMap<>()); - attributeMap.addTo(node(newShard.currentNodeId()).node().getAttributes().get(awarenessAttribute), 1); - } - } + addToShardPerAttributeCache(newShard); + removeFromShardPerAttributeCache(oldShard); } } @@ -842,8 +830,9 @@ private void addToShardPerAttributeCache(ShardRouting shardRouting) { if (attributes.size() > 0) { for (String awarenessAttribute : attributes) { if (shardRouting.initializing() || shardRouting.started()) { - ObjectIntHashMap attributeMap = shardIdPerAttributes.computeIfAbsent(shardRouting.shardId(), - k -> new HashMap<>()).computeIfAbsent(awarenessAttribute, k-> new ObjectIntHashMap<>()); + ObjectIntHashMap attributeMap = shardIdPerAttributes + .computeIfAbsent(shardRouting.shardId(), k -> new HashMap<>()).computeIfAbsent(awarenessAttribute, + k -> new ObjectIntHashMap<>()); attributeMap.addTo(node(shardRouting.currentNodeId()).node().getAttributes().get(awarenessAttribute), 1); } } @@ -854,9 +843,12 @@ private void addToShardPerAttributeCache(ShardRouting shardRouting) { private void removeFromShardPerAttributeCache(ShardRouting shardRouting) { if (attributes.size() > 0) { for (String awarenessAttribute : attributes) { - Map> attributeMap = shardIdPerAttributes.get(shardRouting.shardId()); - if (attributeMap != null && attributeMap.containsKey(awarenessAttribute)) { - attributeMap.get(awarenessAttribute).putOrAdd(node(shardRouting.currentNodeId()).node().getAttributes().get(awarenessAttribute), 0, -1); + if (shardRouting.initializing() || shardRouting.started()) { + Map> attributeMap = shardIdPerAttributes.get(shardRouting.shardId()); + if 
(attributeMap != null && attributeMap.containsKey(awarenessAttribute)) { + attributeMap.get(awarenessAttribute) + .putOrAdd(node(shardRouting.currentNodeId()).node().getAttributes().get(awarenessAttribute), 0, -1); + } } } } @@ -1187,62 +1179,43 @@ private void ensureMutable() { } /** - * Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from - * the first node, then the first shard of the second node, etc. until one shard from each node has been returned. - * The iterator then resumes on the first node by returning the second shard and continues until all shards from - * all the nodes have been returned. + * Creates an iterator over nodes iterating in a circular fashion: The iterator returns the first shard iterator + * from the first node, then the first shard iterator of the second node, etc. The iterator removes a node(including + * all the shards) if all the shards on the node can stay. */ - public Iterator nodeInterleavedShardIterator(RoutingAllocation allocation) { - // This iterator should eliminate nodes based on node throttling criteria. - final Queue queue = new ArrayDeque<>(); + + public Iterator> nodeShardIterator() { + final Queue> queue = new ArrayDeque<>(); for (Map.Entry entry : nodesToShards.entrySet()) { - queue.add(new Pair(entry.getKey(), entry.getValue().copyShards().iterator())); + queue.add(entry.getValue().copyShards().iterator()); } - return new Iterator() { - private Iterator nextIter; - + return new Iterator>() { public boolean hasNext() { - nextIter = null; while (!queue.isEmpty()) { - if (queue.peek().iter.hasNext()) { - Pair pair = queue.poll(); - RoutingNode routingNode = nodesToShards.get(pair.nodeId); - Decision decision = allocation.deciders().canRemainOnNode(routingNode, allocation); - // Iterate through all the shards only when the best decision is NO - if (decision == Decision.NO) { - ((ArrayDeque) queue).offerFirst(pair); - nextIter = pair.iter; - return true; - } - } else { - queue.poll(); + if (queue.peek().hasNext()) { + return true; } + queue.poll(); } return false; } - public ShardRouting next() { - if (nextIter == null && !nextIter.hasNext() && hasNext() == false) { + public Iterator next() { + if (hasNext() == false) { throw new NoSuchElementException(); } - return nextIter.next(); + Iterator iter = queue.poll(); + queue.offer(iter); + return iter; } + @SuppressWarnings("rawtypes") public void remove() { - throw new UnsupportedOperationException(); + ((ArrayDeque)queue).pollLast(); } }; } - - private static final class Pair { - private String nodeId; - private Iterator iter; - - Pair(String nodeId, Iterator iter) { - this.nodeId = nodeId; - this.iter = iter; - } - } + private static final class Recoveries { private static final Recoveries EMPTY = new Recoveries(); private int incoming = 0; @@ -1279,4 +1252,4 @@ public static Recoveries getOrAdd(Map map, String key) { return recoveries; } } -} +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c1f534b34f82a..cb59abdd36348 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -643,9 +643,15 @@ public void moveShards() { // Iterate over the started shards interleaving between 
nodes, and check if they can remain. In the presence of throttling // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. - for (Iterator it = allocation.routingNodes().nodeInterleavedShardIterator(allocation); it - .hasNext();) { - ShardRouting shardRouting = it.next(); + for (Iterator> it = allocation.routingNodes().nodeShardIterator(); it.hasNext();) { + Iterator shardRoutingIter = it.next(); + ShardRouting shardRouting = shardRoutingIter.next(); + RoutingNode routingNode = allocation.routingNodes().node(shardRouting.currentNodeId()); + Decision decision = allocation.deciders().canRemainOnNode(routingNode, allocation); + if (decision != Decision.NO) { + it.remove(); + continue; + } final MoveDecision moveDecision = decideMove(shardRouting); if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java index 709ccf6b083c5..56114e797df0f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java @@ -109,6 +109,17 @@ public Decision canRebalance(RoutingAllocation allocation) { public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation){ return Decision.ALWAYS; } + + /** + * Returns a {@link Decision} whether all the shards on the given + * {@link RoutingNode}} can be remain The default is {@link Decision#ALWAYS}. + * All implementations that override this behaviour must take a + * {@link Decision}} whether or not to skip iterating over the remaining + * deciders over all the shards of this node. + */ + public Decision canMoveAnyShardFromNode(RoutingNode node, RoutingAllocation allocation) { + return Decision.ALWAYS; + } /** * Returns a {@link Decision} whether the given primary shard can be diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java index ea66c907d8130..94f81864664b5 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -230,37 +230,38 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n return ret; } - /** - * Aggregates {@link Decision}} from various {@link AllocationDecider} and selects the best {@link Decision}} based - * on the below criteria - *
-     * <ul>
-     * <li> {@link Decision#THROTTLE}} would always be the best {@link Decision}} to skip iteration of the current node
-     * and wouldn't need to check on the {@link Decision} returned by other {@link AllocationDeciders}
-     * <li> {@link Decision#NO}} for a particular {@link AllocationDecider}} would still want to wait on the best decision {@link Decision#THROTTLE}}
-     * <li> {@link Decision#YES}} for a particular {@link AllocationDecider}} would need all the other {@link AllocationDeciders}} to return
-     * {@link Decision#YES}} for a {@link RoutingNode}} to skip iteration for all the {@link ShardRouting}}
-     * </ul>
-     */
+
+    /**
+     * Aggregates the {@link Decision} from each {@link AllocationDecider} and selects the best {@link Decision} based
+     * on the criteria below:
+     * <ul>
+     * <li> {@link Decision#NO} from any single {@link AllocationDecider} is the best decision and is returned for the node.
+     * <li> {@link Decision#YES} requires all the other {@link AllocationDeciders} to also return
+     * {@link Decision#YES} before iteration over all the {@link ShardRouting}s of the {@link RoutingNode} can be skipped.
+     * </ul>
+ */ @Override public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) { - Decision bestDecision = Decision.ALWAYS; + Decision decision = Decision.ALWAYS; for (AllocationDecider decider : allocations) { - Decision decision = decider.canRemainOnNode(node, allocation); - if (decision == Decision.THROTTLE) { + Decision throttlingDecision = decider.canMoveAnyShardFromNode(node, allocation); + if (throttlingDecision == Decision.NO) { if (logger.isTraceEnabled()) { - logger.trace("Pre-emptively returning decision [{}] from decider [{}] for node [{}]", decision.type(), + logger.trace("Pre-emptively returning decision [{}] from decider [{}] for node [{}]", throttlingDecision.type(), decider.getClass().getSimpleName(), node.nodeId()); } - return decision; - } else if (decision == Decision.NO) { - bestDecision = decision; + return Decision.THROTTLE; } } + for (AllocationDecider decider : allocations) { + decision = decider.canRemainOnNode(node, allocation); + if (decision == Decision.NO) + break; + } if (logger.isTraceEnabled()) { - logger.trace("Returning decision after iterating all the deciders best decision [{}] for node [{}]", bestDecision.type(), + logger.trace("Returning decision after iterating all the deciders best decision [{}] for node [{}]", decision.type(), node.nodeId()); } - return bestDecision; - + return decision; } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index 5ac22fb1177e4..1cfe8054aefde 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -117,16 +117,15 @@ private void setAwarenessAttributes(String[] awarenessAttributes) { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return underCapacity(shardRouting, node, allocation, true, false); + return underCapacity(shardRouting, node, allocation, true); } @Override public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return underCapacity(shardRouting, node, allocation, false, false); + return underCapacity(shardRouting, node, allocation, false); } - private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode, - boolean useCache) { + private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) { if (awarenessAttributes.length == 0) { return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled, set cluster setting [%s] to enable it", @@ -146,24 +145,13 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout // build attr_value -> nodes map ObjectIntHashMap nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute); + + // build the count of shards per attribute value ObjectIntHashMap shardPerAttribute = new ObjectIntHashMap<>(); - if (useCache) { - Map> shardIdMap = allocation.routingNodes().getShardIdPerAttributeMap() - .get(shardRouting.shardId()); - if (shardIdMap != null && shardIdMap.get(awarenessAttribute) != null) { - shardPerAttribute = shardIdMap.get(awarenessAttribute); - } - } else { - // build the count of shards per 
attribute value - for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) { - if (assignedShard.started() || assignedShard.initializing()) { - // Note: this also counts relocation targets as that will be the new location of - // the shard. - // Relocation sources should not be counted as the shard is moving away - RoutingNode routingNode = allocation.routingNodes().node(assignedShard.currentNodeId()); - shardPerAttribute.addTo(routingNode.node().getAttributes().get(awarenessAttribute), 1); - } - } + Map> shardIdMap = allocation.routingNodes().getShardIdPerAttributeMap() + .get(shardRouting.shardId()); + if (shardIdMap != null && shardIdMap.get(awarenessAttribute) != null) { + shardPerAttribute = shardIdMap.get(awarenessAttribute).clone(); } if (moveToNode) { @@ -242,7 +230,7 @@ public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) return allocation.decision(Decision.YES, NAME, "allocation awareness is not enabled"); } for (ShardRouting shardRouting : node) { - decision = underCapacity(shardRouting, node, allocation, false, true); + decision = underCapacity(shardRouting, node, allocation, false); if (decision == Decision.NO) { return decision; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index d284048ee816a..63fbad59b922a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -76,20 +76,4 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", clusterConcurrentRebalance, relocatingShards); } - - @Override - public Decision canRebalance(RoutingAllocation allocation) { - if (clusterConcurrentRebalance == -1) { - return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); - } - int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); - if (relocatingShards >= clusterConcurrentRebalance) { - return allocation.decision(Decision.NO, NAME, - "too many shards are concurrently rebalancing [%d], limit: [%d]", - relocatingShards, clusterConcurrentRebalance); - } - return allocation.decision(Decision.YES, NAME, - "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", - clusterConcurrentRebalance, relocatingShards); - } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 16c6ed37e3c44..d27396053eea6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -250,68 +250,67 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing @Override public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - return canRemain(shardRouting, node, allocation, false); - } - - private Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, 
boolean remainOnNode) { - if (!remainOnNode && shardRouting.currentNodeId().equals(node.nodeId()) == false) { + if (shardRouting.currentNodeId().equals(node.nodeId()) == false) { throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]"); } + final ClusterInfo clusterInfo = allocation.clusterInfo(); + final ImmutableOpenMap usages = clusterInfo.getNodeLeastAvailableDiskUsages(); + final DiskUsage usage = getDiskUsage(node, allocation, usages, true); + // If this node is already above the high threshold, the shard cannot remain + // (get it off!) + final String dataPath = clusterInfo.getDataPath(shardRouting); + if (dataPath == null || usage.getPath().equals(dataPath) == false) { + return allocation.decision(Decision.YES, NAME, "this shard is not allocated on the most utilized disk and can remain"); + } + return canRemainOnNode(node, allocation); + } + + + @Override + public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) { final ClusterInfo clusterInfo = allocation.clusterInfo(); final ImmutableOpenMap usages = clusterInfo.getNodeLeastAvailableDiskUsages(); final Decision decision = earlyTerminate(allocation, usages); if (decision != null) { return decision; } - - // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk + // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check. final DiskUsage usage = getDiskUsage(node, allocation, usages, true); - // If this node is already above the high threshold, the shard cannot remain (get it off!) 
final double freeDiskPercentage = usage.getFreeDiskAsPercentage(); final long freeBytes = usage.getFreeBytes(); if (logger.isTraceEnabled()) { logger.trace("node [{}] has {}% free disk ({} bytes)", node.nodeId(), freeDiskPercentage, freeBytes); } - if (!remainOnNode) { - final String dataPath = clusterInfo.getDataPath(shardRouting); - if (dataPath == null || usage.getPath().equals(dataPath) == false) { - return allocation.decision(Decision.YES, NAME, "this shard is not allocated on the most utilized disk and can remain"); - } - } if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { if (logger.isDebugEnabled()) { logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain", - diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId()); - } - return allocation.decision(Decision.NO, NAME, - "the shard cannot remain on this node because it is above the high watermark cluster setting [%s=%s] " + - "and there is less than the required [%s] free space on node, actual free: [%s]", - CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), - diskThresholdSettings.getHighWatermarkRaw(), - diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes)); - } - if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) { - if (logger.isDebugEnabled()) { - logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain", - diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage, node.nodeId()); - } - return allocation.decision(Decision.NO, NAME, - "the shard cannot remain on this node because it is above the high watermark cluster setting [%s=%s] " + - "and there is less than the required [%s%%] free disk on node, actual free: [%s%%]", - CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), - diskThresholdSettings.getHighWatermarkRaw(), - diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage); - } + diskThresholdSettings.getFreeBytesThresholdHigh(), freeBytes, node.nodeId()); + } + return allocation.decision(Decision.NO, NAME, + "the shard cannot remain on this node because it is above the high watermark cluster setting [%s=%s] " + + "and there is less than the required [%s] free space on node, actual free: [%s]", + CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), + diskThresholdSettings.getHighWatermarkRaw(), + diskThresholdSettings.getFreeBytesThresholdHigh(), new ByteSizeValue(freeBytes)); + } + if (freeDiskPercentage < diskThresholdSettings.getFreeDiskThresholdHigh()) { + if (logger.isDebugEnabled()) { + logger.debug("less than the required {}% free disk threshold ({}% free) on node {}, shard cannot remain", + diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage, node.nodeId()); + } + return allocation.decision(Decision.NO, NAME, + "the shard cannot remain on this node because it is above the high watermark cluster setting [%s=%s] " + + "and there is less than the required [%s%%] free disk on node, actual free: [%s%%]", + CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), + diskThresholdSettings.getHighWatermarkRaw(), + diskThresholdSettings.getFreeDiskThresholdHigh(), freeDiskPercentage); + } - return allocation.decision(Decision.YES, NAME, - "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes)); - } - - @Override - public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) { 
- return canRemain(null, node, allocation, true); + return allocation.decision(Decision.YES, NAME, + "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes)); } + private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap usages, boolean subtractLeavingShards) { DiskUsage usage = usages.get(node.nodeId()); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index 17d47c2c27c67..eba6391b47afd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -198,15 +198,13 @@ public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) private Decision shouldIndexNodeFilter(RoutingNode node, RoutingAllocation allocation) { Decision decision = null; - Map>> shardsPerIndexPerState = node.getShardsPerIndexPerState(); - ImmutableOpenMap indexMd = allocation.metaData().getIndices(); - for (ObjectObjectCursor indexMdEntry : indexMd) { - Set keySet = shardsPerIndexPerState.keySet(); - if (keySet != null && keySet.contains(indexMdEntry.value.getIndex())) { - decision = shouldIndexFilter(indexMdEntry.value, node, allocation); - if (decision != null) { - return decision; - } + Set indices = node.indices(); + for (Index index : indices) { + ImmutableOpenMap indexMd = allocation.metaData().getIndices(); + IndexMetaData indexMetaData = indexMd.get(index.getName()); + decision = shouldIndexFilter(indexMetaData, node, allocation); + if (decision != null) { + return decision; } } return null; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java index dbec56851e90a..90bb579c4af8f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java @@ -124,19 +124,35 @@ private Decision doDecide(ShardRouting shardRouting, RoutingNode node, RoutingAl } } - if (clusterShardLimit > 0 && decider.test(nodeShardCount, clusterShardLimit)) { - return allocation.decision(Decision.NO, NAME, - "too many shards [%d] allocated to this node, cluster setting [%s=%d]", - nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit); - } - if (indexShardLimit > 0 && decider.test(indexShardCount, indexShardLimit)) { - return allocation.decision(Decision.NO, NAME, - "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]", - indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit); - } + Decision clusterShardLimitDecision = decideClusterShardLimit(nodeShardCount, clusterShardLimit, decider, allocation); + if (clusterShardLimitDecision == Decision.NO) + return clusterShardLimitDecision; + + Decision indexShardLimitDecision = decideIndexShardLimit(indexShardCount, indexShardLimit, decider, allocation, + shardRouting.getIndexName()); + if (indexShardLimitDecision == Decision.NO) + return indexShardLimitDecision; + return allocation.decision(Decision.YES, NAME, - "the shard count [%d] 
for this node is under the index limit [%d] and cluster level node limit [%d]", - nodeShardCount, indexShardLimit, clusterShardLimit); + "the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]", nodeShardCount, + indexShardLimit, clusterShardLimit); + } + + private Decision decideClusterShardLimit(int nodeShardCount, int clusterShardLimit, BiPredicate decider, + RoutingAllocation allocation) { + if (clusterShardLimit > 0 && decider.test(nodeShardCount, clusterShardLimit)) + return allocation.decision(Decision.NO, NAME, "too many shards [%d] allocated to this node, cluster setting [%s=%d]", + nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit); + return Decision.ALWAYS; + } + + private Decision decideIndexShardLimit(int indexShardCount, int indexShardLimit, BiPredicate decider, + RoutingAllocation allocation, String indexName) { + if (indexShardLimit > 0 && decider.test(indexShardCount, indexShardLimit)) + return allocation.decision(Decision.NO, NAME, + "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]", indexShardCount, indexName, + INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit); + return Decision.ALWAYS; } @Override @@ -145,25 +161,24 @@ public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) int nodeShardCount = node.numberOfShardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED, ShardRoutingState.UNASSIGNED); - if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], cluster-level limit per node: [%d]", - nodeShardCount, clusterShardLimit); - } + Decision clusterShardLimitDecision = decideClusterShardLimit(nodeShardCount, clusterShardLimit, (count, limit) -> count > limit, + allocation); + if (clusterShardLimitDecision == Decision.NO) + return clusterShardLimitDecision; + ImmutableOpenMap indexMd = allocation.metaData().getIndices(); for (ObjectObjectCursor indexMdEntry : indexMd) { final int indexShardLimit = INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(indexMdEntry.value.getSettings(), settings); if (indexShardLimit > 0) { List shardPerIndex = node.shardsWithState(indexMdEntry.key, ShardRoutingState.INITIALIZING, - ShardRoutingState.STARTED, ShardRoutingState.UNASSIGNED); - if (indexShardLimit > 0 && shardPerIndex.size() > indexShardLimit) { - return allocation.decision(Decision.NO, NAME, - "too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]", shardPerIndex.size(), - indexMdEntry.key, INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit); - } + ShardRoutingState.STARTED); + Decision indexShardLimitDecision = decideIndexShardLimit(shardPerIndex.size(), indexShardLimit, + (count, limit) -> count > limit, allocation, indexMdEntry.key); + if (indexShardLimitDecision == Decision.NO) + return indexShardLimitDecision; } } - return allocation.decision(Decision.YES, NAME, - "the shard count is under index limit [%d] and cluster level node limit [%d] of total shards per node", clusterShardLimit); + return allocation.decision(Decision.YES, NAME, "the shard count is under index and cluster limit per node"); } @Override @@ -186,11 +201,11 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { } nodeShardCount++; } - if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) { - return allocation.decision(Decision.NO, NAME, - "too many shards [%d] allocated to this node, 
cluster setting [%s=%d]", - nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit); - } + Decision clusterShardLimitDecision = decideClusterShardLimit(nodeShardCount, clusterShardLimit, (count, limit) -> count >= limit, + allocation); + if (clusterShardLimitDecision == Decision.NO) + return clusterShardLimitDecision; + return allocation.decision(Decision.YES, NAME, "the shard count [%d] for this node is under the cluster level node limit [%d]", nodeShardCount, clusterShardLimit); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index e7e387a0804ee..28ef9466b034d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -217,4 +217,17 @@ public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) concurrentOutgoingRecoveries); } } + + @Override + public Decision canMoveAnyShardFromNode(RoutingNode node, RoutingAllocation allocation) { + int outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId()); + if (outgoingRecoveries >= concurrentOutgoingRecoveries) { + return allocation.decision(Decision.NO, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]", + outgoingRecoveries, concurrentOutgoingRecoveries); + } else { + return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d]", outgoingRecoveries, + concurrentOutgoingRecoveries); + } + } + }
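
For reviewers: below is a minimal, self-contained sketch of the control flow this change introduces.
It deliberately does not use the Elasticsearch classes; Decision, Decider, the String node and shard
identifiers, and the canRemainOnNode/moveShards methods here are simplified stand-ins for
AllocationDecider/AllocationDeciders, RoutingNode, RoutingNodes.nodeShardIterator() and
BalancedShardsAllocator#moveShards, chosen only to illustrate how the node-level short-circuit
avoids the per-shard work.

import java.util.List;
import java.util.Map;

// Standalone illustration only: simplified stand-ins, not the Elasticsearch types.
public class NodeLevelMoveSketch {

    enum Decision { YES, THROTTLE, NO }

    // Hypothetical minimal decider mirroring canMoveAnyShardFromNode/canRemainOnNode.
    interface Decider {
        default Decision canMoveAnyShardFromNode(String nodeId) { return Decision.YES; }
        default Decision canRemainOnNode(String nodeId) { return Decision.YES; }
    }

    // Aggregation in the spirit of AllocationDeciders#canRemainOnNode in this patch:
    // a NO from canMoveAnyShardFromNode means the node cannot be worked on right now -> THROTTLE;
    // otherwise the first NO from canRemainOnNode wins; else YES.
    static Decision canRemainOnNode(List<Decider> deciders, String nodeId) {
        for (Decider decider : deciders) {
            if (decider.canMoveAnyShardFromNode(nodeId) == Decision.NO) {
                return Decision.THROTTLE;
            }
        }
        for (Decider decider : deciders) {
            if (decider.canRemainOnNode(nodeId) == Decision.NO) {
                return Decision.NO;
            }
        }
        return Decision.YES;
    }

    // Consumer loop in the spirit of BalancedShardsAllocator#moveShards: only nodes whose aggregate
    // decision is NO have their shards examined one by one; every other node is skipped outright.
    static void moveShards(Map<String, List<String>> shardsByNode, List<Decider> deciders) {
        for (Map.Entry<String, List<String>> node : shardsByNode.entrySet()) {
            if (canRemainOnNode(deciders, node.getKey()) != Decision.NO) {
                continue; // all shards on this node can stay, skip the per-shard move decisions
            }
            for (String shard : node.getValue()) {
                System.out.println("evaluating move decision for " + shard + " on " + node.getKey());
            }
        }
    }

    public static void main(String[] args) {
        // Example decider: pretend node-2 is above the disk high watermark.
        Decider disk = new Decider() {
            @Override
            public Decision canRemainOnNode(String nodeId) {
                return "node-2".equals(nodeId) ? Decision.NO : Decision.YES;
            }
        };
        moveShards(Map.of("node-1", List.of("[index][0]"),
                          "node-2", List.of("[index][1]", "[index][2]")),
                   List.of(disk));
    }
}

With the disk stand-in above, node-1 is skipped without looking at its shard, while both shards on
node-2 are evaluated individually; this mirrors the saving the patch makes when iterating nodes in
moveShards() instead of iterating every shard in the cluster.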