Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate routing commands using updated routing state #42066

Merged
merged 7 commits into from
May 28, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}

final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {

ShardRouting shardRouting = null;
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
shardRouting = shard;
break;
}
}
if (shardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
Expand All @@ -35,6 +34,7 @@
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -101,20 +101,34 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}

final ShardRouting primaryShardRouting;
try {
primaryShardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (primaryShardRouting.unassigned()) {

ShardRouting primaryShardRouting = null;
for (RoutingNode node : allocation.routingNodes()) {
for (ShardRouting shard : node) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
primaryShardRouting = shard;
break;
}
}
}
if (primaryShardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation,
"trying to allocate a replica shard [" + index + "][" + shardId +
"], while corresponding primary shard is still unassigned");
}

List<ShardRouting> replicaShardRoutings =
allocation.routingTable().shardRoutingTable(index, shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED);
List<ShardRouting> replicaShardRoutings = new ArrayList<>();
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary() == false) {
replicaShardRoutings.add(shard);
}
}

ShardRouting shardRouting;
if (replicaShardRoutings.isEmpty()) {
return explainOrThrowRejectedCommand(explain, allocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
}

final ShardRouting shardRouting;
try {
shardRouting = allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
allocation.routingTable().shardRoutingTable(index, shardId).primaryShard();
} catch (IndexNotFoundException | ShardNotFoundException e) {
return explainOrThrowRejectedCommand(explain, allocation, e);
}
if (shardRouting.unassigned() == false) {

ShardRouting shardRouting = null;
for (ShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.getIndexName().equals(index) && shard.getId() == shardId && shard.primary()) {
shardRouting = shard;
break;
}
}
if (shardRouting == null) {
return explainOrThrowRejectedCommand(explain, allocation, "primary [" + index + "][" + shardId + "] is already assigned");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,4 +677,76 @@ public void testMoveShardFromNonDataNode() {
assertEquals("[move_allocation] can't move [test][0] from " + node2 + " to " + node1 +
": source [" + node2.getName() + "] is not a data node.", e.getMessage());
}

public void testConflictingCommandsInSingleRequest() {
AllocationService allocation = createAllocationService(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
.build());

final String index1 = "test1";
final String index2 = "test2";
final String index3 = "test3";
logger.info("--> building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(index1).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
.putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
.putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
.put(IndexMetaData.builder(index2).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
.putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
.putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
.put(IndexMetaData.builder(index3).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)
.putInSyncAllocationIds(0, Collections.singleton("randomAllocID"))
.putInSyncAllocationIds(1, Collections.singleton("randomAllocID2")))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsRecovery(metaData.index(index1))
.addAsRecovery(metaData.index(index2))
.addAsRecovery(metaData.index(index3))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).build();

final String node1 = "node1";
final String node2 = "node2";
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.add(newNode(node1))
.add(newNode(node2))
).build();
final ClusterState finalClusterState = allocation.reroute(clusterState, "reroute");

logger.info("--> allocating same index primary in multiple commands should fail");
assertThat(expectThrows(IllegalArgumentException.class, () -> {
allocation.reroute(finalClusterState,
new AllocationCommands(
new AllocateStalePrimaryAllocationCommand(index1, 0, node1, true),
new AllocateStalePrimaryAllocationCommand(index1, 0, node2, true)
), false, false);
}).getMessage(), containsString("primary [" + index1 + "][0] is already assigned"));

assertThat(expectThrows(IllegalArgumentException.class, () -> {
allocation.reroute(finalClusterState,
new AllocationCommands(
new AllocateEmptyPrimaryAllocationCommand(index2, 0, node1, true),
new AllocateEmptyPrimaryAllocationCommand(index2, 0, node2, true)
), false, false);
}).getMessage(), containsString("primary [" + index2 + "][0] is already assigned"));


clusterState = allocation.reroute(clusterState,
new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(index3, 0, node1, true)), false, false).getClusterState();
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

final ClusterState updatedClusterState = clusterState;
assertThat(updatedClusterState.getRoutingNodes().node(node1).shardsWithState(STARTED).size(), equalTo(1));

logger.info("--> subsequent replica allocation fails as all configured replicas have been allocated");
assertThat(expectThrows(IllegalArgumentException.class, () -> {
allocation.reroute(updatedClusterState,
new AllocationCommands(
new AllocateReplicaAllocationCommand(index3, 0, node2),
new AllocateReplicaAllocationCommand(index3, 0, node2)
), false, false);
}).getMessage(), containsString("all copies of [" + index3 + "][0] are already assigned. Use the move allocation command instead"));
}
}