-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Faster access to INITIALIZING/RELOCATING shards #47817
Changes from 4 commits
990cabb
48d7338
896fffb
be79a2d
bc5a058
28edf71
f969808
917655d
2cdd65a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -28,6 +28,9 @@ | |||||
import java.util.Iterator; | ||||||
import java.util.LinkedHashMap; | ||||||
import java.util.List; | ||||||
import java.util.LinkedHashSet; | ||||||
import java.util.Collection; | ||||||
import java.util.stream.Collectors; | ||||||
|
||||||
/** | ||||||
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards | ||||||
|
@@ -41,6 +44,10 @@ public class RoutingNode implements Iterable<ShardRouting> { | |||||
|
||||||
private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order | ||||||
|
||||||
private final LinkedHashSet<ShardRouting> initializingShards; | ||||||
|
||||||
private final LinkedHashSet<ShardRouting> relocatingShards; | ||||||
|
||||||
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { | ||||||
this(nodeId, node, buildShardRoutingMap(shards)); | ||||||
} | ||||||
|
@@ -49,6 +56,17 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) { | |||||
this.nodeId = nodeId; | ||||||
this.node = node; | ||||||
this.shards = shards; | ||||||
this.relocatingShards = new LinkedHashSet<>(); | ||||||
this.initializingShards = new LinkedHashSet<>(); | ||||||
for (ShardRouting shardRouting : shards.values()) { | ||||||
if (shardRouting.initializing()) { | ||||||
initializingShards.add(shardRouting); | ||||||
} else if (shardRouting.relocating()) { | ||||||
relocatingShards.add(shardRouting); | ||||||
} | ||||||
} | ||||||
|
||||||
assert invariant(); | ||||||
} | ||||||
|
||||||
private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) { | ||||||
|
@@ -104,6 +122,14 @@ void add(ShardRouting shard) { | |||||
+ "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]"); | ||||||
} | ||||||
shards.put(shard.shardId(), shard); | ||||||
|
||||||
if (shard.initializing()) { | ||||||
initializingShards.add(shard); | ||||||
} else if(shard.relocating()) { | ||||||
relocatingShards.add(shard); | ||||||
} | ||||||
|
||||||
assert invariant(); | ||||||
} | ||||||
|
||||||
void update(ShardRouting oldShard, ShardRouting newShard) { | ||||||
|
@@ -114,11 +140,36 @@ void update(ShardRouting oldShard, ShardRouting newShard) { | |||||
} | ||||||
ShardRouting previousValue = shards.put(newShard.shardId(), newShard); | ||||||
assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard; | ||||||
|
||||||
if (oldShard.initializing()) { | ||||||
boolean exist = initializingShards.remove(oldShard); | ||||||
assert exist : "expected shard " + oldShard + "exists in initializingShards, but not"; | ||||||
} else if (oldShard.relocating()) { | ||||||
boolean exist = relocatingShards.remove(oldShard); | ||||||
assert exist : "expected shard " + oldShard + "exists in relocatingShards, but not"; | ||||||
} | ||||||
|
||||||
if (newShard.initializing()) { | ||||||
initializingShards.add(newShard); | ||||||
} else if (newShard.relocating()) { | ||||||
relocatingShards.add(newShard); | ||||||
} | ||||||
|
||||||
assert invariant(); | ||||||
} | ||||||
|
||||||
void remove(ShardRouting shard) { | ||||||
ShardRouting previousValue = shards.remove(shard.shardId()); | ||||||
assert previousValue == shard : "expected shard " + previousValue + " but was " + shard; | ||||||
if (shard.initializing()) { | ||||||
boolean exist = initializingShards.remove(shard); | ||||||
assert exist : "expected shard " + shard + "exists in initializingShards, but not "; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wording/whitespace suggestion:
Suggested change
|
||||||
} else if (shard.relocating()) { | ||||||
boolean exist = relocatingShards.remove(shard); | ||||||
assert exist : "expected shard " + shard + "exists in relocatingShards, but not "; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wording/whitespace suggestion:
Suggested change
|
||||||
} | ||||||
|
||||||
assert invariant(); | ||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -127,6 +178,14 @@ void remove(ShardRouting shard) { | |||||
* @return number of shards | ||||||
*/ | ||||||
public int numberOfShardsWithState(ShardRoutingState... states) { | ||||||
if (states.length == 1) { | ||||||
if(states[0] == ShardRoutingState.INITIALIZING) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whitespace nit:
Suggested change
|
||||||
return initializingShards.size(); | ||||||
} else if (states[0] == ShardRoutingState.RELOCATING) { | ||||||
return relocatingShards.size(); | ||||||
} | ||||||
} | ||||||
|
||||||
int count = 0; | ||||||
for (ShardRouting shardEntry : this) { | ||||||
for (ShardRoutingState state : states) { | ||||||
|
@@ -144,6 +203,14 @@ public int numberOfShardsWithState(ShardRoutingState... states) { | |||||
* @return List of shards | ||||||
*/ | ||||||
public List<ShardRouting> shardsWithState(ShardRoutingState... states) { | ||||||
if (states.length == 1) { | ||||||
if(states[0] == ShardRoutingState.INITIALIZING) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whitespace nit:
Suggested change
|
||||||
return new ArrayList<>(initializingShards); | ||||||
} else if (states[0] == ShardRoutingState.RELOCATING) { | ||||||
return new ArrayList<>(relocatingShards); | ||||||
} | ||||||
} | ||||||
|
||||||
List<ShardRouting> shards = new ArrayList<>(); | ||||||
for (ShardRouting shardEntry : this) { | ||||||
for (ShardRoutingState state : states) { | ||||||
|
@@ -164,6 +231,26 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) { | |||||
public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) { | ||||||
List<ShardRouting> shards = new ArrayList<>(); | ||||||
|
||||||
if (states.length == 1) { | ||||||
if(states[0] == ShardRoutingState.INITIALIZING) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whitespace nit:
Suggested change
|
||||||
for (ShardRouting shardEntry : initializingShards) { | ||||||
if (shardEntry.getIndexName().equals(index) == false) { | ||||||
continue; | ||||||
} | ||||||
shards.add(shardEntry); | ||||||
} | ||||||
return shards; | ||||||
} else if (states[0] == ShardRoutingState.RELOCATING) { | ||||||
for (ShardRouting shardEntry : relocatingShards) { | ||||||
if (shardEntry.getIndexName().equals(index) == false) { | ||||||
continue; | ||||||
} | ||||||
shards.add(shardEntry); | ||||||
} | ||||||
return shards; | ||||||
} | ||||||
} | ||||||
|
||||||
for (ShardRouting shardEntry : this) { | ||||||
if (!shardEntry.getIndexName().equals(index)) { | ||||||
continue; | ||||||
|
@@ -181,14 +268,7 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta | |||||
* The number of shards on this node that will not be eventually relocated. | ||||||
*/ | ||||||
public int numberOfOwningShards() { | ||||||
int count = 0; | ||||||
for (ShardRouting shardEntry : this) { | ||||||
if (shardEntry.state() != ShardRoutingState.RELOCATING) { | ||||||
count++; | ||||||
} | ||||||
} | ||||||
|
||||||
return count; | ||||||
return shards.size() - relocatingShards.size(); | ||||||
} | ||||||
|
||||||
public String prettyPrint() { | ||||||
|
@@ -223,4 +303,21 @@ public List<ShardRouting> copyShards() { | |||||
public boolean isEmpty() { | ||||||
return shards.isEmpty(); | ||||||
} | ||||||
|
||||||
private boolean invariant() { | ||||||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
// initializingShards must consistent with that in shards | ||||||
Collection<ShardRouting> shardRoutingsInitializing = | ||||||
shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList()); | ||||||
assert initializingShards.size() == shardRoutingsInitializing.size(); | ||||||
assert initializingShards.containsAll(shardRoutingsInitializing); | ||||||
kkewwei marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
// relocatingShards must consistent with that in shards | ||||||
Collection<ShardRouting> shardRoutingsRelocating = | ||||||
shards.values().stream().filter(ShardRouting::relocating).collect(Collectors.toList()); | ||||||
assert relocatingShards.size() == shardRoutingsRelocating.size(); | ||||||
assert relocatingShards.containsAll(shardRoutingsRelocating); | ||||||
kkewwei marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
return true; | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.cluster.routing; | ||
|
||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.transport.TransportAddress; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.test.ESTestCase; | ||
|
||
import java.net.InetAddress; | ||
|
||
import static java.util.Collections.emptyMap; | ||
import static java.util.Collections.emptySet; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.nullValue; | ||
|
||
public class RoutingNodeTests extends ESTestCase { | ||
private ShardRouting unassignedShard0 = | ||
TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); | ||
private ShardRouting initializingShard0 = | ||
TestShardRouting.newShardRouting("test", 1, "node-1", false, ShardRoutingState.INITIALIZING); | ||
private ShardRouting relocatingShard0 = | ||
TestShardRouting.newShardRouting("test", 2, "node-1","node-2", false, ShardRoutingState.RELOCATING); | ||
private RoutingNode routingNode; | ||
|
||
@Override | ||
public void setUp() throws Exception { | ||
super.setUp(); | ||
InetAddress inetAddress = InetAddress.getByAddress("name1", new byte[] { (byte) 192, (byte) 168, (byte) 0, (byte) 1}); | ||
TransportAddress transportAddress = new TransportAddress(inetAddress, randomIntBetween(0, 65535)); | ||
DiscoveryNode discoveryNode = new DiscoveryNode("name1", "node-1", transportAddress, emptyMap(), emptySet(), Version.CURRENT); | ||
routingNode = new RoutingNode("node1", discoveryNode, unassignedShard0, initializingShard0, relocatingShard0); | ||
} | ||
|
||
public void testAdd() { | ||
ShardRouting initializingShard1 = | ||
TestShardRouting.newShardRouting("test", 3, "node-1", false, ShardRoutingState.INITIALIZING); | ||
ShardRouting relocatingShard0 = | ||
TestShardRouting.newShardRouting("test", 4, "node-1", "node-2",false, ShardRoutingState.RELOCATING); | ||
routingNode.add(initializingShard1); | ||
routingNode.add(relocatingShard0); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 3)), equalTo(initializingShard1)); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 4)), equalTo(relocatingShard0)); | ||
} | ||
|
||
public void testUpdate() { | ||
ShardRouting startedShard0 = | ||
TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); | ||
ShardRouting startedShard1 = | ||
TestShardRouting.newShardRouting("test", 1, "node-1", "node-2",false, ShardRoutingState.RELOCATING); | ||
ShardRouting startedShard2 = | ||
TestShardRouting.newShardRouting("test", 2, "node-1", false, ShardRoutingState.INITIALIZING); | ||
routingNode.update(unassignedShard0, startedShard0); | ||
routingNode.update(initializingShard0, startedShard1); | ||
routingNode.update(relocatingShard0, startedShard2); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)).state(), | ||
equalTo(ShardRoutingState.STARTED)); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)).state(), | ||
equalTo(ShardRoutingState.RELOCATING)); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)).state(), | ||
equalTo(ShardRoutingState.INITIALIZING)); | ||
} | ||
|
||
public void testRemove() { | ||
routingNode.remove(unassignedShard0); | ||
routingNode.remove(initializingShard0); | ||
routingNode.remove(relocatingShard0); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 0)), is(nullValue())); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 1)), is(nullValue())); | ||
assertThat(routingNode.getByShardId(new ShardId("test", IndexMetaData.INDEX_UUID_NA_VALUE, 2)), is(nullValue())); | ||
} | ||
|
||
public void testNumberOfShardsWithState() { | ||
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED), equalTo(2)); | ||
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1)); | ||
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1)); | ||
assertThat(routingNode.numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); | ||
} | ||
|
||
public void testShardsWithState() { | ||
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2)); | ||
assertThat(routingNode.shardsWithState(ShardRoutingState.STARTED).size(), equalTo(1)); | ||
assertThat(routingNode.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1)); | ||
assertThat(routingNode.shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); | ||
} | ||
|
||
public void testShardsWithStateInIndex() { | ||
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING, ShardRoutingState.STARTED).size(), equalTo(2)); | ||
assertThat(routingNode.shardsWithState("test", ShardRoutingState.STARTED).size(), equalTo(1)); | ||
assertThat(routingNode.shardsWithState("test", ShardRoutingState.RELOCATING).size(), equalTo(1)); | ||
assertThat(routingNode.shardsWithState("test", ShardRoutingState.INITIALIZING).size(), equalTo(1)); | ||
} | ||
|
||
public void testNumberOfOwningShards() { | ||
assertThat(routingNode.numberOfOwningShards(), equalTo(2)); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace nit: