Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster access to INITIALIZING/RELOCATING shards #47817

Merged
merged 9 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
Expand All @@ -41,6 +44,10 @@ public class RoutingNode implements Iterable<ShardRouting> {

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order

private final LinkedHashSet<ShardRouting> initializingShards;

private final LinkedHashSet<ShardRouting> relocatingShards;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}
Expand All @@ -49,6 +56,16 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
for (ShardRouting shardRouting : shards.values()) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
}
assert invariant();
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
Expand Down Expand Up @@ -99,26 +116,58 @@ public int size() {
* @param shard Shard to crate on this Node
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.containsKey(shard.shardId())) {
throw new IllegalStateException("Trying to add a shard " + shard.shardId() + " to a node [" + nodeId
+ "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]");
}
shards.put(shard.shardId(), shard);

if (shard.initializing()) {
initializingShards.add(shard);
} else if (shard.relocating()) {
relocatingShards.add(shard);
}
assert invariant();
}

void update(ShardRouting oldShard, ShardRouting newShard) {
assert invariant();
if (shards.containsKey(oldShard.shardId()) == false) {
// Shard was already removed by routing nodes iterator
// TODO: change caller logic in RoutingNodes so that this check can go away
return;
}
ShardRouting previousValue = shards.put(newShard.shardId(), newShard);
assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard;

if (oldShard.initializing()) {
boolean exist = initializingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in initializingShards";
} else if (oldShard.relocating()) {
boolean exist = relocatingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
}
if (newShard.initializing()) {
initializingShards.add(newShard);
} else if (newShard.relocating()) {
relocatingShards.add(newShard);
}
assert invariant();
}

void remove(ShardRouting shard) {
assert invariant();
ShardRouting previousValue = shards.remove(shard.shardId());
assert previousValue == shard : "expected shard " + previousValue + " but was " + shard;
if (shard.initializing()) {
boolean exist = initializingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in initializingShards";
} else if (shard.relocating()) {
boolean exist = relocatingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in relocatingShards";
}
assert invariant();
}

/**
Expand All @@ -127,6 +176,14 @@ void remove(ShardRouting shard) {
* @return number of shards
*/
public int numberOfShardsWithState(ShardRoutingState... states) {
if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
return initializingShards.size();
} else if (states[0] == ShardRoutingState.RELOCATING) {
return relocatingShards.size();
}
}

int count = 0;
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
Expand All @@ -144,6 +201,14 @@ public int numberOfShardsWithState(ShardRoutingState... states) {
* @return List of shards
*/
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
return new ArrayList<>(initializingShards);
} else if (states[0] == ShardRoutingState.RELOCATING) {
return new ArrayList<>(relocatingShards);
}
}

List<ShardRouting> shards = new ArrayList<>();
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
Expand All @@ -164,6 +229,26 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
List<ShardRouting> shards = new ArrayList<>();

if (states.length == 1) {
if (states[0] == ShardRoutingState.INITIALIZING) {
for (ShardRouting shardEntry : initializingShards) {
if (shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
} else if (states[0] == ShardRoutingState.RELOCATING) {
for (ShardRouting shardEntry : relocatingShards) {
if (shardEntry.getIndexName().equals(index) == false) {
continue;
}
shards.add(shardEntry);
}
return shards;
}
}

for (ShardRouting shardEntry : this) {
if (!shardEntry.getIndexName().equals(index)) {
continue;
Expand All @@ -181,14 +266,7 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta
* The number of shards on this node that will not be eventually relocated.
*/
public int numberOfOwningShards() {
int count = 0;
for (ShardRouting shardEntry : this) {
if (shardEntry.state() != ShardRoutingState.RELOCATING) {
count++;
}
}

return count;
return shards.size() - relocatingShards.size();
}

public String prettyPrint() {
Expand Down Expand Up @@ -223,4 +301,21 @@ public List<ShardRouting> copyShards() {
public boolean isEmpty() {
return shards.isEmpty();
}

private boolean invariant() {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved

// initializingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing =
shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList());
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);
kkewwei marked this conversation as resolved.
Show resolved Hide resolved

// relocatingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating =
shards.values().stream().filter(ShardRouting::relocating).collect(Collectors.toList());
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);
kkewwei marked this conversation as resolved.
Show resolved Hide resolved

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -122,10 +123,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries

int primariesInRecovery = 0;
for (ShardRouting shard : node) {
for (ShardRouting shard : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) {
if (shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

import java.net.InetAddress;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class RoutingNodeTests extends ESTestCase {
private ShardRouting unassignedShard0 =
TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED);
private ShardRouting initializingShard0 =
TestShardRouting.newShardRouting("test", 1, "node-1", false, ShardRoutingState.INITIALIZING);
private ShardRouting relocatingShard0 =
TestShardRouting.newShardRouting("test", 2, "node-1", "node-2", false, ShardRoutingState.RELOCATING);
private RoutingNode routingNode;

@Override
public void setUp() throws Exception {
super.setUp();
InetAddress inetAddress = InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1});
TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535));
DiscoveryNode discoveryNode = new DiscoveryNode("name1", "node-1", transportAddress, emptyMap(), emptySet(), Version.CURRENT);
routingNode = new RoutingNode("node1", discoveryNode, unassignedShard0, initializingShard0, relocatingShard0);
}

public void testAdd() {
ShardRouting initializingShard1 =
TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING);
ShardRouting relocatingShard0 =
TestShardRouting.newShardRouting("test", 4, "node-1", "node-2",false, ShardRoutingState.RELOCATING);
routingNode.add(initializingShard1);
routingNode.add(relocatingShard0);
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 3)), equalTo(initializingShard1));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0));
}

public void testUpdate() {
ShardRouting startedShard0 =
TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED);
ShardRouting startedShard1 =
TestShardRouting.newShardRouting("test", 1, "node-1", "node-2",false, ShardRoutingState.RELOCATING);
ShardRouting startedShard2 =
TestShardRouting.newShardRouting("test", 2, "node-1", false, ShardRoutingState.INITIALIZING);
routingNode.update(unassignedShard0, startedShard0);
routingNode.update(initializingShard0, startedShard1);
routingNode.update(relocatingShard0, startedShard2);
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)).state(),
equalTo(ShardRoutingState.STARTED));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)).state(),
equalTo(ShardRoutingState.RELOCATING));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)).state(),
equalTo(ShardRoutingState.INITIALIZING));
}

public void testRemove() {
routingNode.remove(unassignedShard0);
routingNode.remove(initializingShard0);
routingNode.remove(relocatingShard0);
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)), is(nullValue()));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)), is(nullValue()));
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)), is(nullValue()));
}

public void testNumberOfShardsWithState() {
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED), equalTo(2));
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1));
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
}

public void testShardsWithState() {
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
}

public void testShardsWithStateInIndex() {
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).size(), equalTo(1));
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).size(), equalTo(1));
}

public void testNumberOfOwningShards() {
assertThat(routingNode.numberOfOwningShards(), equalTo(2));
}

}