Skip to content

Commit

Permalink
Prioritizing primary shards movement
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>
  • Loading branch information
jainankitk committed Oct 26, 2021
1 parent be2dcf0 commit d85c55c
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 34 deletions.
135 changes: 105 additions & 30 deletions server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,63 +48,137 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
* that are hosted on that node. Each {@link RoutingNode} has a unique node id that can be used to identify the node.
*/
public class RoutingNode implements Iterable<ShardRouting> {

/**
 * Container for the shards of a node that keeps primaries and replicas in two separate,
 * insertion-ordered buckets. A given {@link ShardId} is kept in at most one bucket by
 * {@link #put(ShardId, ShardRouting)}, and iteration yields every primary shard before
 * any replica shard.
 */
static class BucketedShards implements Iterable<ShardRouting> {
    /** Index of the bucket holding primary shards. */
    private static final int PRIMARY_BUCKET = 0;

    /** Index of the bucket holding replica shards. */
    private static final int REPLICA_BUCKET = 1;

    private final LinkedHashMap<ShardId, ShardRouting>[] shards; // LinkedHashMap to preserve order

    /**
     * Creates a container backed by the two given maps. The maps are stored by reference,
     * not copied, so entries added to them by the caller are visible through this container.
     *
     * @param primaryShards bucket used for primary shards
     * @param replicaShards bucket used for replica shards
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    BucketedShards(LinkedHashMap<ShardId, ShardRouting> primaryShards, LinkedHashMap<ShardId, ShardRouting> replicaShards) {
        // Java cannot create a generic array directly; the raw array is safe here because
        // only the two correctly typed maps below are ever stored in it.
        this.shards = new LinkedHashMap[2];
        this.shards[PRIMARY_BUCKET] = primaryShards;
        this.shards[REPLICA_BUCKET] = replicaShards;
    }

    /**
     * Maps the primary flag of a shard to the index of the bucket that stores it.
     */
    private static int bucketIndex(boolean primary) {
        return primary ? PRIMARY_BUCKET : REPLICA_BUCKET;
    }

    /**
     * @return {@code true} if neither bucket contains a shard
     */
    public boolean isEmpty() {
        return this.shards[PRIMARY_BUCKET].isEmpty() && this.shards[REPLICA_BUCKET].isEmpty();
    }

    /**
     * @return the total number of shards across both buckets
     */
    public int size() {
        return this.shards[PRIMARY_BUCKET].size() + this.shards[REPLICA_BUCKET].size();
    }

    /**
     * @param shardId the shard id to look for
     * @return {@code true} if either bucket holds an entry for {@code shardId}
     */
    public boolean containsKey(ShardId shardId) {
        return this.shards[PRIMARY_BUCKET].containsKey(shardId) || this.shards[REPLICA_BUCKET].containsKey(shardId);
    }

    /**
     * Looks up a shard by id, checking the primary bucket first.
     *
     * @param shardId the shard id to look for
     * @return the stored routing, or {@code null} if neither bucket has an entry
     */
    public ShardRouting get(ShardId shardId) {
        if (this.shards[PRIMARY_BUCKET].containsKey(shardId)) {
            return this.shards[PRIMARY_BUCKET].get(shardId);
        }
        return this.shards[REPLICA_BUCKET].get(shardId);
    }

    /**
     * Stores the routing under its own shard id; shorthand for
     * {@code put(shardRouting.shardId(), shardRouting)}.
     *
     * @param shardRouting the routing to store
     * @return the routing previously stored for the same shard id, or {@code null} if there was none
     */
    public ShardRouting add(ShardRouting shardRouting) {
        return put(shardRouting.shardId(), shardRouting);
    }

    /**
     * Stores the routing in the bucket matching its primary flag and evicts any entry for
     * the same shard id from the other bucket.
     *
     * @param shardId      the key to store the routing under
     * @param shardRouting the routing to store
     * @return the entry evicted from the other bucket if there was one, otherwise the entry
     *         replaced in the target bucket, otherwise {@code null}
     */
    public ShardRouting put(ShardId shardId, ShardRouting shardRouting) {
        final boolean primary = shardRouting.primary();
        final LinkedHashMap<ShardId, ShardRouting> target = this.shards[bucketIndex(primary)];
        final LinkedHashMap<ShardId, ShardRouting> other = this.shards[bucketIndex(!primary)];

        final ShardRouting replaced = target.put(shardId, shardRouting);
        if (other.containsKey(shardId)) {
            // The shard changed role; drop the stale entry so the id lives in one bucket only.
            return other.remove(shardId);
        }

        return replaced;
    }

    /**
     * Removes the entry for the given shard id, checking the primary bucket first.
     *
     * @param shardId the shard id to remove
     * @return the removed routing, or {@code null} if neither bucket had an entry
     */
    public ShardRouting remove(ShardId shardId) {
        if (this.shards[PRIMARY_BUCKET].containsKey(shardId)) {
            return this.shards[PRIMARY_BUCKET].remove(shardId);
        }
        return this.shards[REPLICA_BUCKET].remove(shardId);
    }

    /**
     * Returns a read-only iterator that walks all primary shards in insertion order and
     * then all replica shards in insertion order.
     */
    @Override
    public Iterator<ShardRouting> iterator() {
        final Iterator<ShardRouting> primaries = Collections.unmodifiableCollection(shards[PRIMARY_BUCKET].values()).iterator();
        final Iterator<ShardRouting> replicas = Collections.unmodifiableCollection(shards[REPLICA_BUCKET].values()).iterator();
        return new Iterator<ShardRouting>() {
            @Override
            public boolean hasNext() {
                return primaries.hasNext() || replicas.hasNext();
            }

            @Override
            public ShardRouting next() {
                if (primaries.hasNext()) {
                    return primaries.next();
                }
                // Delegating to the replica iterator also raises the exception it throws once
                // both buckets are exhausted.
                return replicas.next();
            }
        };
    }
}

private final String nodeId;

private final DiscoveryNode node;

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order
private final BucketedShards shards;

private final LinkedHashSet<ShardRouting> initializingShards;

private final LinkedHashSet<ShardRouting> relocatingShards;

private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}

RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRoutings) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
final LinkedHashMap<ShardId, ShardRouting> primaryShards = new LinkedHashMap<>();
final LinkedHashMap<ShardId, ShardRouting> replicaShards = new LinkedHashMap<>();
this.shards = new BucketedShards(primaryShards, replicaShards);
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
this.shardsByIndex = new LinkedHashMap<>();
for (ShardRouting shardRouting : shards.values()) {

for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
}
assert invariant();
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
final LinkedHashMap<ShardId, ShardRouting> shards = new LinkedHashMap<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardRouting previousValue = shards.put(shardRouting.shardId(), shardRouting);
ShardRouting previousValue;
if (shardRouting.primary()) {
previousValue = primaryShards.put(shardRouting.shardId(), shardRouting);
} else {
previousValue = replicaShards.put(shardRouting.shardId(), shardRouting);
}

if (previousValue != null) {
throw new IllegalArgumentException(
"Cannot have two different shards with same shard id " + shardRouting.shardId() + " on same node "
);
throw new IllegalArgumentException("Cannot have two different shards with same shard id " + shardRouting.shardId() +
" on same node ");
}
}
return shards;

assert invariant();
}

@Override
public Iterator<ShardRouting> iterator() {
return Collections.unmodifiableCollection(shards.values()).iterator();
return shards.iterator();
}

/**
Expand Down Expand Up @@ -139,7 +213,7 @@ public int size() {
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.containsKey(shard.shardId())) {
if (shards.add(shard) != null) {
throw new IllegalStateException(
"Trying to add a shard "
+ shard.shardId()
Expand All @@ -152,7 +226,6 @@ void add(ShardRouting shard) {
+ "]"
);
}
shards.put(shard.shardId(), shard);

if (shard.initializing()) {
initializingShards.add(shard);
Expand Down Expand Up @@ -322,7 +395,7 @@ public int numberOfOwningShardsForIndex(final Index index) {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
for (ShardRouting entry : shards.values()) {
for (ShardRouting entry : shards) {
sb.append("--------").append(entry.shortSummary()).append('\n');
}
return sb.toString();
Expand All @@ -345,7 +418,9 @@ public String toString() {
}

public List<ShardRouting> copyShards() {
return new ArrayList<>(shards.values());
List<ShardRouting> result = new ArrayList<>();
shards.forEach(result::add);
return result;
}

public boolean isEmpty() {
Expand All @@ -355,23 +430,23 @@ public boolean isEmpty() {
private boolean invariant() {

// initializingShards must be consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsInitializing = StreamSupport
.stream(shards.spliterator(), false)
.filter(ShardRouting::initializing)
.collect(Collectors.toList());
assert initializingShards.size() == shardRoutingsInitializing.size();
assert initializingShards.containsAll(shardRoutingsInitializing);

// relocatingShards must be consistent with that in shards
Collection<ShardRouting> shardRoutingsRelocating = shards.values()
.stream()
Collection<ShardRouting> shardRoutingsRelocating = StreamSupport
.stream(shards.spliterator(), false)
.filter(ShardRouting::relocating)
.collect(Collectors.toList());
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);

final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = shards.values()
.stream()
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = StreamSupport
.stream(shards.spliterator(), false)
.collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert shardRoutingsByIndex.equals(shardsByIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1284,24 +1284,50 @@ public Iterator<ShardRouting> nodeInterleavedShardIterator() {
queue.add(entry.getValue().copyShards().iterator());
}
return new Iterator<ShardRouting>() {
private Queue<ShardRouting> replicaShards = new ArrayDeque<>();
private Queue<Iterator<ShardRouting>> replicaIterators = new ArrayDeque<>();
public boolean hasNext() {
while (!queue.isEmpty()) {
if (queue.peek().hasNext()) {
return true;
}
queue.poll();
}
if (!replicaShards.isEmpty()) {
return true;
}
while (!replicaIterators.isEmpty()) {
if (replicaIterators.peek().hasNext()) {
return true;
}
replicaIterators.poll();
}
return false;
}

public ShardRouting next() {
if (hasNext() == false) {
throw new NoSuchElementException();
}
Iterator<ShardRouting> iter = queue.poll();
ShardRouting result = iter.next();
queue.offer(iter);
return result;
while (!queue.isEmpty()) {
Iterator<ShardRouting> iter = queue.poll();
if (iter.hasNext()) {
ShardRouting result = iter.next();
if (result.primary()) {
queue.offer(iter);
return result;
}
replicaShards.offer(result);
replicaIterators.offer(iter);
}
}
if (!replicaShards.isEmpty()) {
return replicaShards.poll();
}
Iterator<ShardRouting> replicaIterator = replicaIterators.poll();
ShardRouting replicaShard = replicaIterator.next();
replicaIterators.offer(replicaIterator);
return replicaShard;
}

public void remove() {
Expand Down

0 comments on commit d85c55c

Please sign in to comment.