Make decision for shards at a node level to prevent increase in the pending tasks.

Currently we iterate through all shards and make a decision per shard, even though most of the decisions can be taken at the node level, in which case all shards on that node can be skipped: a "don't move" decision would ultimately be taken for every shard of that node anyway (see elastic#27427). We have therefore implemented a `canRemainOnNode` method per decider, which is equivalent to the `canRemain` method except that the decision is made at the node level. This PR primarily addresses the review comments from the previous PR, elastic#27628.
Bukhtawar committed Jan 5, 2018
1 parent e13f1e2 commit cb034a6
Showing 12 changed files with 264 additions and 239 deletions.
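For orientation before the diffs: the sketch below is a self-contained toy in plain Java (hypothetical names only, none of the Elasticsearch classes touched by this commit) showing the short-circuit being introduced, where a single node-level check replaces a decision per shard whenever every shard on the node may remain.

import java.util.List;
import java.util.Map;

// Toy illustration only; every name below is hypothetical and not part of the
// Elasticsearch API touched by this commit.
public class NodeLevelSkipSketch {

    enum Decision { YES, NO }

    interface Decider {
        // Per-shard decision, analogous to canRemain.
        Decision canRemain(String shard, String node);
        // Node-level decision, analogous to canRemainOnNode: if it is not NO,
        // no shard on this node needs to move and the per-shard loop is skipped.
        Decision canRemainOnNode(String node);
    }

    static void moveShards(Map<String, List<String>> shardsByNode, Decider decider) {
        for (Map.Entry<String, List<String>> entry : shardsByNode.entrySet()) {
            String node = entry.getKey();
            if (decider.canRemainOnNode(node) != Decision.NO) {
                continue; // one node-level call replaces a decision per shard
            }
            for (String shard : entry.getValue()) {
                if (decider.canRemain(shard, node) == Decision.NO) {
                    System.out.println("would move " + shard + " off " + node);
                }
            }
        }
    }

    public static void main(String[] args) {
        Decider decider = new Decider() {
            @Override
            public Decision canRemain(String shard, String node) {
                return "node-2".equals(node) ? Decision.NO : Decision.YES;
            }

            @Override
            public Decision canRemainOnNode(String node) {
                return "node-2".equals(node) ? Decision.NO : Decision.YES;
            }
        };
        moveShards(Map.of(
            "node-1", List.of("idx[0]", "idx[1]"),
            "node-2", List.of("idx[2]")), decider);
    }
}

With this toy decider only node-2 ever reaches the per-shard loop; node-1 and its shards are dismissed by one node-level call, which is the effect the commit aims for in the real moveShards() loop shown further down.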
@@ -66,9 +66,65 @@ public class AllocationBenchmark {
// support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
// need its own main method and we cannot execute more than one class with a main method per JAR.
@Param({
//indices| shards| replicas| nodes
" 50| 60| 1| 200",
" 500| 60| 1| 400"
// indices| shards| replicas| nodes
" 10| 1| 0| 1",
" 10| 3| 0| 1",
" 10| 10| 0| 1",
" 100| 1| 0| 1",
" 100| 3| 0| 1",
" 100| 10| 0| 1",

" 10| 1| 0| 10",
" 10| 3| 0| 10",
" 10| 10| 0| 10",
" 100| 1| 0| 10",
" 100| 3| 0| 10",
" 100| 10| 0| 10",

" 10| 1| 1| 10",
" 10| 3| 1| 10",
" 10| 10| 1| 10",
" 100| 1| 1| 10",
" 100| 3| 1| 10",
" 100| 10| 1| 10",

" 10| 1| 2| 10",
" 10| 3| 2| 10",
" 10| 10| 2| 10",
" 100| 1| 2| 10",
" 100| 3| 2| 10",
" 100| 10| 2| 10",

" 10| 1| 1| 20",
" 10| 3| 1| 20",
" 10| 10| 1| 20",
" 100| 1| 1| 20",
" 100| 3| 1| 20",
" 100| 10| 1| 20",

" 10| 1| 0| 50",
" 10| 3| 0| 50",
" 10| 10| 0| 50",
" 100| 1| 0| 50",
" 100| 3| 0| 50",
" 100| 10| 0| 50",

" 10| 1| 1| 50",
" 10| 3| 1| 50",
" 10| 10| 1| 50",
" 100| 1| 1| 50",
" 100| 3| 1| 50",
" 100| 10| 1| 50",

" 10| 1| 2| 50",
" 10| 3| 2| 50",
" 10| 10| 2| 50",
" 100| 1| 2| 50",
" 100| 3| 2| 50",
" 100| 10| 2| 50",

" 50| 60| 1| 200",
" 500| 60| 1| 400"

})
public String indicesShardsReplicasNodes = "10|1|0|1";
@@ -26,6 +26,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -47,7 +48,7 @@ public class RoutingNode implements Iterable<ShardRouting> {

private final Map<ShardRoutingState, Set<ShardRouting>> shardPerState = new LinkedHashMap<>();

private final Map<Index, Map<ShardRoutingState, Set<ShardRouting>>> shardPerIndexPerState = new LinkedHashMap<>();
private final Map<Index, Set<ShardRouting>> shardPerIndex = new HashMap<>();

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
@@ -59,8 +60,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this.shards = shards;
for (Map.Entry<ShardId, ShardRouting> shardMap : shards.entrySet()) {
shardPerState.computeIfAbsent(shardMap.getValue().state(), k -> new HashSet<>()).add(shardMap.getValue());
shardPerIndexPerState.computeIfAbsent(shardMap.getKey().getIndex(), k -> new LinkedHashMap<>())
.computeIfAbsent(shardMap.getValue().state(), k -> new HashSet<>()).add(shardMap.getValue());
shardPerIndex.computeIfAbsent(shardMap.getKey().getIndex(), k -> new HashSet<>()).add(shardMap.getValue());
}
}

@@ -76,8 +76,8 @@ private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRo
return shards;
}

public Map<Index, Map<ShardRoutingState, Set<ShardRouting>>> getShardsPerIndexPerState() {
return Collections.unmodifiableMap(shardPerIndexPerState);
public Set<Index> indices() {
return Collections.unmodifiableSet(shardPerIndex.keySet());
}

@Override
@@ -122,8 +122,7 @@ void add(ShardRouting shard) {
}
shards.put(shard.shardId(), shard);
shardPerState.computeIfAbsent(shard.state(), k -> new HashSet<>()).add(shard);
shardPerIndexPerState.computeIfAbsent(shard.index(), k -> new LinkedHashMap<>())
.computeIfAbsent(shard.state(), k -> new HashSet<>()).add(shard);
shardPerIndex.computeIfAbsent(shard.index(), k -> new HashSet<>()).add(shard);

}

@@ -137,15 +136,14 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
c.remove(oldShard);
return c;
});
shardPerIndexPerState.computeIfPresent(oldShard.index(), (k, c) -> {
if (c.get(oldShard.state()) != null) {
c.get(oldShard.state()).remove(oldShard);
}
shardPerIndex.computeIfPresent(oldShard.index(), (k, c) -> {
c.remove(oldShard);
return c.isEmpty() ? null : c;
});
shardPerState.computeIfAbsent(newShard.state(), k -> new HashSet<>()).add(newShard);
shardPerIndexPerState.computeIfAbsent(newShard.index(), k -> new LinkedHashMap<>())
.computeIfAbsent(newShard.state(), k -> new HashSet<>()).add(newShard);
shardPerIndex.computeIfAbsent(newShard.index(), k -> new HashSet<>()).add(newShard);
ShardRouting previousValue = shards.put(newShard.shardId(), newShard);
assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard;
}
@@ -157,10 +155,10 @@ void remove(ShardRouting shard) {
c.remove(previousValue);
return c.isEmpty() ? null : c;
});
shardPerIndexPerState.computeIfPresent(shard.index(), (k, c) -> {
if (c.get(shard.state()) != null) {
c.get(shard.state()).remove(shard);
}
shardPerIndex.computeIfPresent(shard.index(), (k, c) -> {
c.remove(previousValue);
return c.isEmpty() ? null : c;
});
}
@@ -194,23 +192,6 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
return shards;
}

/**
* Determine the shards of an index with a specific state
* @param index id of the index
* @param states set of states which should be listed
* @return a list of shards
*/
public List<ShardRouting> shardsWithState(Index index, ShardRoutingState... states) {
List<ShardRouting> shards = new ArrayList<>();
Map<ShardRoutingState, Set<ShardRouting>> shardPerIndexMap = shardPerIndexPerState.get(index);
for (ShardRoutingState state : states) {
if (shardPerIndexMap != null && shardPerIndexMap.get(state) != null) {
shards.addAll(shardPerIndexMap.get(state));
}
}
return shards;
}

public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
List<ShardRouting> shards = new ArrayList<>();

@@ -50,9 +50,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;


/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
@@ -762,10 +759,10 @@ private void assignedShardsRemove(ShardRouting shard) {
// yes we check identity here
if (shard == iterator.next()) {
iterator.remove();
removeFromShardPerAttributeCache(shard);
return;
}
}
removeFromShardPerAttributeCache(shard);
}
assert false : "No shard found to remove";
}
@@ -822,28 +819,20 @@ public int size() {
return nodesToShards.size();
}

private void updateShardPerAttributeCache(ShardRouting oldShard, ShardRouting newShard) {
private void updateShardPerAttributeCache(ShardRouting oldShard, ShardRouting newShard) {
if (attributes.size() > 0) {
for (String awarenessAttribute : attributes) {
Map<String, ObjectIntHashMap<String>> shardPerAttributeMap = shardIdPerAttributes.get(oldShard.shardId());
if (shardPerAttributeMap != null && shardPerAttributeMap.containsKey(awarenessAttribute)) {
shardPerAttributeMap.get(awarenessAttribute).putOrAdd(node(oldShard.currentNodeId()).node().getAttributes().get(awarenessAttribute), 0, -1);
}
if (newShard.initializing() || newShard.started()) {
ObjectIntHashMap<String> attributeMap = shardIdPerAttributes.computeIfAbsent(newShard.shardId(),
k -> new HashMap<>()).computeIfAbsent(awarenessAttribute, k-> new ObjectIntHashMap<>());
attributeMap.addTo(node(newShard.currentNodeId()).node().getAttributes().get(awarenessAttribute), 1);
}
}
addToShardPerAttributeCache(newShard);
removeFromShardPerAttributeCache(oldShard);
}
}

private void addToShardPerAttributeCache(ShardRouting shardRouting) {
if (attributes.size() > 0) {
for (String awarenessAttribute : attributes) {
if (shardRouting.initializing() || shardRouting.started()) {
ObjectIntHashMap<String> attributeMap = shardIdPerAttributes.computeIfAbsent(shardRouting.shardId(),
k -> new HashMap<>()).computeIfAbsent(awarenessAttribute, k-> new ObjectIntHashMap<>());
ObjectIntHashMap<String> attributeMap = shardIdPerAttributes
.computeIfAbsent(shardRouting.shardId(), k -> new HashMap<>()).computeIfAbsent(awarenessAttribute,
k -> new ObjectIntHashMap<>());
attributeMap.addTo(node(shardRouting.currentNodeId()).node().getAttributes().get(awarenessAttribute), 1);
}
}
@@ -854,9 +843,12 @@ private void addToShardPerAttributeCache(ShardRouting shardRouting) {
private void removeFromShardPerAttributeCache(ShardRouting shardRouting) {
if (attributes.size() > 0) {
for (String awarenessAttribute : attributes) {
Map<String, ObjectIntHashMap<String>> attributeMap = shardIdPerAttributes.get(shardRouting.shardId());
if (attributeMap != null && attributeMap.containsKey(awarenessAttribute)) {
attributeMap.get(awarenessAttribute).putOrAdd(node(shardRouting.currentNodeId()).node().getAttributes().get(awarenessAttribute), 0, -1);
if (shardRouting.initializing() || shardRouting.started()) {
Map<String, ObjectIntHashMap<String>> attributeMap = shardIdPerAttributes.get(shardRouting.shardId());
if (attributeMap != null && attributeMap.containsKey(awarenessAttribute)) {
attributeMap.get(awarenessAttribute)
.putOrAdd(node(shardRouting.currentNodeId()).node().getAttributes().get(awarenessAttribute), 0, -1);
}
}
}
}
@@ -1187,62 +1179,43 @@ private void ensureMutable() {
}

/**
* Creates an iterator over shards interleaving between nodes: The iterator returns the first shard from
* the first node, then the first shard of the second node, etc. until one shard from each node has been returned.
* The iterator then resumes on the first node by returning the second shard and continues until all shards from
* all the nodes have been returned.
* Creates an iterator over nodes iterating in a circular fashion: The iterator returns the first shard iterator
* from the first node, then the first shard iterator of the second node, etc. The iterator removes a node (including
* all the shards) if all the shards on the node can stay.
*/
public Iterator<ShardRouting> nodeInterleavedShardIterator(RoutingAllocation allocation) {
// This iterator should eliminate nodes based on node throttling criteria.
final Queue<Pair> queue = new ArrayDeque<>();

public Iterator<Iterator<ShardRouting>> nodeShardIterator() {
final Queue<Iterator<ShardRouting>> queue = new ArrayDeque<>();
for (Map.Entry<String, RoutingNode> entry : nodesToShards.entrySet()) {
queue.add(new Pair(entry.getKey(), entry.getValue().copyShards().iterator()));
queue.add(entry.getValue().copyShards().iterator());
}
return new Iterator<ShardRouting>() {
private Iterator<ShardRouting> nextIter;

return new Iterator<Iterator<ShardRouting>>() {
public boolean hasNext() {
nextIter = null;
while (!queue.isEmpty()) {
if (queue.peek().iter.hasNext()) {
Pair pair = queue.poll();
RoutingNode routingNode = nodesToShards.get(pair.nodeId);
Decision decision = allocation.deciders().canRemainOnNode(routingNode, allocation);
// Iterate through all the shards only when the best decision is NO
if (decision == Decision.NO) {
((ArrayDeque) queue).offerFirst(pair);
nextIter = pair.iter;
return true;
}
} else {
queue.poll();
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
return false;
}

public ShardRouting next() {
if (nextIter == null && !nextIter.hasNext() && hasNext() == false) {
public Iterator<ShardRouting> next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
return nextIter.next();
Iterator<ShardRouting> iter = queue.poll();
queue.offer(iter);
return iter;
}

@SuppressWarnings("rawtypes")
public void remove() {
throw new UnsupportedOperationException();
((ArrayDeque)queue).pollLast();
}
};
}

private static final class Pair {
private String nodeId;
private Iterator<ShardRouting> iter;

Pair(String nodeId, Iterator<ShardRouting> iter) {
this.nodeId = nodeId;
this.iter = iter;
}
}

private static final class Recoveries {
private static final Recoveries EMPTY = new Recoveries();
private int incoming = 0;
@@ -1279,4 +1252,4 @@ public static Recoveries getOrAdd(Map<String, Recoveries> map, String key) {
return recoveries;
}
}
}
}
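The new nodeShardIterator() above hands out per-node shard iterators in round-robin order; next() re-queues the returned iterator at the tail and remove() drops it again, so a caller can discard an entire node in one step. The following self-contained toy (plain Java, hypothetical data, not the Elasticsearch classes) sketches that queue mechanic under those assumptions.

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Toy version of the round-robin "iterator of iterators" used above: next()
// re-queues the returned per-node iterator at the tail, and remove() drops it
// again (pollLast), which is how a whole node gets skipped in one step.
public class RoundRobinIteratorSketch {

    static <T> Iterator<Iterator<T>> roundRobin(List<List<T>> perNode) {
        ArrayDeque<Iterator<T>> queue = new ArrayDeque<>();
        for (List<T> shards : perNode) {
            queue.add(shards.iterator());
        }
        return new Iterator<Iterator<T>>() {
            @Override
            public boolean hasNext() {
                while (!queue.isEmpty()) {
                    if (queue.peek().hasNext()) {
                        return true;
                    }
                    queue.poll(); // exhausted node, drop it
                }
                return false;
            }

            @Override
            public Iterator<T> next() {
                if (hasNext() == false) {
                    throw new NoSuchElementException();
                }
                Iterator<T> iter = queue.poll();
                queue.offer(iter); // back of the queue: round-robin over nodes
                return iter;
            }

            @Override
            public void remove() {
                queue.pollLast(); // undo the offer above: node fully skipped
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Iterator<String>> nodes =
            roundRobin(List.of(List.of("a0", "a1"), List.of("b0")));
        while (nodes.hasNext()) {
            Iterator<String> shards = nodes.next();
            String shard = shards.next();
            if (shard.startsWith("a")) {   // pretend node "a" may keep all its shards
                nodes.remove();            // skip the rest of node "a" entirely
                continue;
            }
            System.out.println("consider moving " + shard);
        }
    }
}

Running it prints only "consider moving b0": the first node is dropped after a single look, mirroring how the allocator below skips nodes whose shards can all remain.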
@@ -643,9 +643,15 @@ public void moveShards() {
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(allocation); it
.hasNext();) {
ShardRouting shardRouting = it.next();
for (Iterator<Iterator<ShardRouting>> it = allocation.routingNodes().nodeShardIterator(); it.hasNext();) {
Iterator<ShardRouting> shardRoutingIter = it.next();
ShardRouting shardRouting = shardRoutingIter.next();
RoutingNode routingNode = allocation.routingNodes().node(shardRouting.currentNodeId());
Decision decision = allocation.deciders().canRemainOnNode(routingNode, allocation);
if (decision != Decision.NO) {
it.remove();
continue;
}
final MoveDecision moveDecision = decideMove(shardRouting);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
@@ -109,6 +109,17 @@ public Decision canRebalance(RoutingAllocation allocation) {
public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation){
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether all the shards on the given
* {@link RoutingNode} can remain. The default is {@link Decision#ALWAYS}.
* All implementations that override this behaviour must return a
* {@link Decision} indicating whether or not to skip iterating over the
* remaining deciders for all the shards of this node.
*/
public Decision canMoveAnyShardFromNode(RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether the given primary shard can be
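To make the new extension point concrete, here is a hedged sketch of a decider overriding the node-level hook. The class name, its constructor, and the "decommissioned" attribute check are invented for illustration; only the canRemainOnNode(RoutingNode, RoutingAllocation) signature and the Decision constants come from the diff above, and the Settings-taking super constructor is an assumption about this Elasticsearch version. The newly added canMoveAnyShardFromNode hook would be overridden in the same way.

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;

// Hypothetical decider, not part of this commit; the attribute check and the
// Settings-taking super constructor are assumptions made for the example.
public class ExampleNodeLevelDecider extends AllocationDecider {

    public ExampleNodeLevelDecider(Settings settings) {
        super(settings);
    }

    @Override
    public Decision canRemainOnNode(RoutingNode node, RoutingAllocation allocation) {
        // Force the per-shard deciders to run only for nodes marked with a
        // (hypothetical) "decommissioned" attribute; everything else is settled
        // with this single node-level call.
        boolean decommissioned = node.node().getAttributes().containsKey("decommissioned");
        return decommissioned ? Decision.NO : Decision.ALWAYS;
    }
}

Returning NO here is what sends the node's shards through the full per-shard decideMove() path in moveShards(); any other decision lets the allocator drop that node's iterator immediately.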
