diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java index 995be24dd3fcc..1494513dee94e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java @@ -24,10 +24,13 @@ import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.stream.Collectors; /** * A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards @@ -41,6 +44,10 @@ public class RoutingNode implements Iterable { private final LinkedHashMap shards; // LinkedHashMap to preserve order + private final LinkedHashSet initializingShards; + + private final LinkedHashSet relocatingShards; + public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { this(nodeId, node, buildShardRoutingMap(shards)); } @@ -49,6 +56,16 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { this.nodeId = nodeId; this.node = node; this.shards = shards; + this.relocatingShards = new LinkedHashSet<>(); + this.initializingShards = new LinkedHashSet<>(); + for (ShardRouting shardRouting : shards.values()) { + if (shardRouting.initializing()) { + initializingShards.add(shardRouting); + } else if (shardRouting.relocating()) { + relocatingShards.add(shardRouting); + } + } + assert invariant(); } private static LinkedHashMap buildShardRoutingMap(ShardRouting... 
shardRoutings) { @@ -99,14 +116,23 @@ public int size() { * @param shard Shard to crate on this Node */ void add(ShardRouting shard) { + assert invariant(); if (shards.containsKey(shard.shardId())) { throw new IllegalStateException("Trying to add a shard " + shard.shardId() + " to a node [" + nodeId + "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]"); } shards.put(shard.shardId(), shard); + + if (shard.initializing()) { + initializingShards.add(shard); + } else if (shard.relocating()) { + relocatingShards.add(shard); + } + assert invariant(); } void update(ShardRouting oldShard, ShardRouting newShard) { + assert invariant(); if (shards.containsKey(oldShard.shardId()) == false) { // Shard was already removed by routing nodes iterator // TODO: change caller logic in RoutingNodes so that this check can go away @@ -114,11 +140,34 @@ void update(ShardRouting oldShard, ShardRouting newShard) { } ShardRouting previousValue = shards.put(newShard.shardId(), newShard); assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard; + + if (oldShard.initializing()) { + boolean exist = initializingShards.remove(oldShard); + assert exist : "expected shard " + oldShard + " to exist in initializingShards"; + } else if (oldShard.relocating()) { + boolean exist = relocatingShards.remove(oldShard); + assert exist : "expected shard " + oldShard + " to exist in relocatingShards"; + } + if (newShard.initializing()) { + initializingShards.add(newShard); + } else if (newShard.relocating()) { + relocatingShards.add(newShard); + } + assert invariant(); } void remove(ShardRouting shard) { + assert invariant(); ShardRouting previousValue = shards.remove(shard.shardId()); assert previousValue == shard : "expected shard " + previousValue + " but was " + shard; + if (shard.initializing()) { + boolean exist = initializingShards.remove(shard); + assert exist : "expected shard " + shard + " to exist in 
initializingShards"; + } else if (shard.relocating()) { + boolean exist = relocatingShards.remove(shard); + assert exist : "expected shard " + shard + " to exist in relocatingShards"; + } + assert invariant(); } /** @@ -127,6 +176,14 @@ void remove(ShardRouting shard) { * @return number of shards */ public int numberOfShardsWithState(ShardRoutingState... states) { + if (states.length == 1) { + if (states[0] == ShardRoutingState.INITIALIZING) { + return initializingShards.size(); + } else if (states[0] == ShardRoutingState.RELOCATING) { + return relocatingShards.size(); + } + } + int count = 0; for (ShardRouting shardEntry : this) { for (ShardRoutingState state : states) { @@ -144,6 +201,14 @@ public int numberOfShardsWithState(ShardRoutingState... states) { * @return List of shards */ public List shardsWithState(ShardRoutingState... states) { + if (states.length == 1) { + if (states[0] == ShardRoutingState.INITIALIZING) { + return new ArrayList<>(initializingShards); + } else if (states[0] == ShardRoutingState.RELOCATING) { + return new ArrayList<>(relocatingShards); + } + } + List shards = new ArrayList<>(); for (ShardRouting shardEntry : this) { for (ShardRoutingState state : states) { @@ -164,6 +229,26 @@ public List shardsWithState(ShardRoutingState... states) { public List shardsWithState(String index, ShardRoutingState... 
states) { List shards = new ArrayList<>(); + if (states.length == 1) { + if (states[0] == ShardRoutingState.INITIALIZING) { + for (ShardRouting shardEntry : initializingShards) { + if (shardEntry.getIndexName().equals(index) == false) { + continue; + } + shards.add(shardEntry); + } + return shards; + } else if (states[0] == ShardRoutingState.RELOCATING) { + for (ShardRouting shardEntry : relocatingShards) { + if (shardEntry.getIndexName().equals(index) == false) { + continue; + } + shards.add(shardEntry); + } + return shards; + } + } + for (ShardRouting shardEntry : this) { if (!shardEntry.getIndexName().equals(index)) { continue; @@ -181,14 +266,7 @@ public List shardsWithState(String index, ShardRoutingState... sta * The number of shards on this node that will not be eventually relocated. */ public int numberOfOwningShards() { - int count = 0; - for (ShardRouting shardEntry : this) { - if (shardEntry.state() != ShardRoutingState.RELOCATING) { - count++; - } - } - - return count; + return shards.size() - relocatingShards.size(); } public String prettyPrint() { @@ -223,4 +301,21 @@ public List copyShards() { public boolean isEmpty() { return shards.isEmpty(); } + + private boolean invariant() { + + // initializingShards must be consistent with the initializing entries in shards + Collection shardRoutingsInitializing = + shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList()); + assert initializingShards.size() == shardRoutingsInitializing.size(); + assert initializingShards.containsAll(shardRoutingsInitializing); + + // relocatingShards must be consistent with the relocating entries in shards + Collection shardRoutingsRelocating = + shards.values().stream().filter(ShardRouting::relocating).collect(Collectors.toList()); + assert relocatingShards.size() == shardRoutingsRelocating.size(); + assert relocatingShards.containsAll(shardRoutingsRelocating); + + return true; + } } diff --git
a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 47ecc1b894b32..12f622880cc32 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -122,10 +123,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing // count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries int primariesInRecovery = 0; - for (ShardRouting shard : node) { + for (ShardRouting shard : node.shardsWithState(ShardRoutingState.INITIALIZING)) { // when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node* // we only count initial recoveries here, so we need to make sure that relocating node is null - if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) { + if (shard.primary() && shard.relocatingNodeId() == null) { primariesInRecovery++; } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java new file mode 100644 index 0000000000000..c88708a33f457 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java @@ -0,0 +1,118 
@@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; + +import java.net.InetAddress; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class RoutingNodeTests extends ESTestCase { + private ShardRouting unassignedShard0 = + TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); + private ShardRouting initializingShard0 = + TestShardRouting.newShardRouting("test", 1, "node-1", false, ShardRoutingState.INITIALIZING); + private ShardRouting relocatingShard0 = + TestShardRouting.newShardRouting("test", 2, "node-1", "node-2", false, ShardRoutingState.RELOCATING); + private RoutingNode routingNode; + + @Override + public void setUp() throws Exception { + super.setUp(); + 
InetAddress inetAddress = InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1}); + TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535)); + DiscoveryNode discoveryNode = new DiscoveryNode("name1", "node-1", transportAddress, emptyMap(), emptySet(), Version.CURRENT); + routingNode = new RoutingNode("node1", discoveryNode, unassignedShard0, initializingShard0, relocatingShard0); + } + + public void testAdd() { + ShardRouting initializingShard1 = + TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING); + ShardRouting relocatingShard0 = + TestShardRouting.newShardRouting("test", 4, "node-1", "node-2",false, ShardRoutingState.RELOCATING); + routingNode.add(initializingShard1); + routingNode.add(relocatingShard0); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 3)), equalTo(initializingShard1)); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0)); + } + + public void testUpdate() { + ShardRouting startedShard0 = + TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); + ShardRouting startedShard1 = + TestShardRouting.newShardRouting("test", 1, "node-1", "node-2",false, ShardRoutingState.RELOCATING); + ShardRouting startedShard2 = + TestShardRouting.newShardRouting("test", 2, "node-1", false, ShardRoutingState.INITIALIZING); + routingNode.update(unassignedShard0, startedShard0); + routingNode.update(initializingShard0, startedShard1); + routingNode.update(relocatingShard0, startedShard2); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)).state(), + equalTo(ShardRoutingState.STARTED)); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)).state(), + equalTo(ShardRoutingState.RELOCATING)); + 
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)).state(), + equalTo(ShardRoutingState.INITIALIZING)); + } + + public void testRemove() { + routingNode.remove(unassignedShard0); + routingNode.remove(initializingShard0); + routingNode.remove(relocatingShard0); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)), is(nullValue())); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)), is(nullValue())); + assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)), is(nullValue())); + } + + public void testNumberOfShardsWithState() { + assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED), equalTo(2)); + assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1)); + assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); + assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); + } + + public void testShardsWithState() { + assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); + } + + public void testShardsWithStateInIndex() { + assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2)); + assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1)); + assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).size(), equalTo(1)); + assertThat(routingNode.shardsWithState("test", 
ShardRoutingState.INITIALIZING).size(), equalTo(1)); + } + + public void testNumberOfOwningShards() { + assertThat(routingNode.numberOfOwningShards(), equalTo(2)); + } + +}