Skip to content

Commit

Permalink
Some structural changes and added UTs to cover all possible configuration
Browse files Browse the repository at this point in the history

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
  • Loading branch information
Shubh Sahu committed Mar 18, 2024
1 parent 4548951 commit 6672cbe
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ protected void clusterManagerOperation(
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
IndexMetadata indexMetadata = state.metadata().index(sourceIndex);
ClusterSettings clusterSettings = clusterService.getClusterSettings();
validateClusterModeSettings(resizeRequest.getResizeType(),indexMetadata,clusterSettings);

if (resizeRequest.getResizeType().equals(ResizeType.SHRINK)
&& state.metadata().isSegmentReplicationEnabled(sourceIndex)
&& indexMetadata != null
Expand All @@ -166,7 +164,7 @@ protected void clusterManagerOperation(
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex);
}, indicesStatsResponse.getPrimaries().store, clusterSettings,sourceIndex, targetIndex);

if (indicesStatsResponse.getIndex(sourceIndex)
.getTotal()
Expand Down Expand Up @@ -205,7 +203,7 @@ protected void clusterManagerOperation(
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex);
}, indicesStatsResponse.getPrimaries().store, clusterSettings, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest,
ActionListener.map(
Expand All @@ -228,6 +226,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
final ClusterState state,
final IntFunction<DocsStats> perShardDocStats,
final StoreStats primaryShardsStoreStats,
final ClusterSettings clusterSettings,
String sourceIndexName,
String targetIndexName
) {
Expand All @@ -236,6 +235,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
if (metadata == null) {
throw new IndexNotFoundException(sourceIndexName);
}
validateClusterModeSettings(resizeRequest.getResizeType(),metadata,clusterSettings);
final Settings.Builder targetIndexSettingsBuilder = Settings.builder()
.put(targetIndex.settings())
.normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX);
Expand Down Expand Up @@ -379,10 +379,7 @@ private static void validateClusterModeSettings(final ResizeType type,IndexMetad
.equals(RemoteStoreNodeService.CompatibilityMode.MIXED);
boolean isRemoteStoreMigrationDirection = clusterSettings.get(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING)
.equals(RemoteStoreNodeService.Direction.REMOTE_STORE);
boolean isRemoteStoreEnabled = false;
if(sourceIndexMetadata!=null) {
isRemoteStoreEnabled = sourceIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false);
}
boolean isRemoteStoreEnabled = sourceIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false);
if (isMixed && isRemoteStoreMigrationDirection && !isRemoteStoreEnabled) {
throw new IllegalStateException("index Resizing for type [" + type + "] is not allowed as Cluster mode is [Mixed]"
+ " and migration direction is [Remote Store]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,34 @@
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.Rewriteable;
import org.opensearch.index.query.RewriteableTests;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.StoreStats;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.gateway.TestGatewayAllocator;

import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.Matchers.*;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;


import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;

public class TransportResizeActionTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -95,13 +108,23 @@ private ClusterState createClusterState(String name, int numShards, int numRepli
return clusterState;
}

/**
 * Builds a {@link ClusterSettings} instance whose remote-store compatibility mode and
 * migration direction are set to the given values, on top of the built-in defaults.
 */
private ClusterSettings createClusterSettings(CompatibilityMode compatibilityMode, RemoteStoreNodeService.Direction migrationDirection) {
    final Settings remoteStoreSettings = Settings.builder()
        .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), compatibilityMode)
        .put(MIGRATION_DIRECTION_SETTING.getKey(), migrationDirection)
        .build();
    final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    clusterSettings.applySettings(remoteStoreSettings);
    return clusterSettings;
}

public void testErrorCondition() {
ClusterState state = createClusterState(
"source",
randomIntBetween(2, 42),
randomIntBetween(0, 10),
Settings.builder().put("index.blocks.write", true).build()
);
ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT,RemoteStoreNodeService.Direction.NONE);
assertTrue(
expectThrows(
IllegalStateException.class,
Expand All @@ -110,6 +133,7 @@ public void testErrorCondition() {
state,
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)),
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
)
Expand All @@ -125,6 +149,7 @@ public void testErrorCondition() {
clusterState,
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
);
Expand All @@ -144,6 +169,7 @@ public void testErrorCondition() {
clusterState,
(i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)),
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
);
Expand Down Expand Up @@ -173,6 +199,7 @@ public void testErrorCondition() {
clusterState,
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)),
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
);
Expand All @@ -189,7 +216,7 @@ public void testPassNumRoutingShards() {
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);

ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT,RemoteStoreNodeService.Direction.NONE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
Expand All @@ -204,6 +231,7 @@ public void testPassNumRoutingShards() {
clusterState,
null,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
);
Expand All @@ -217,6 +245,7 @@ public void testPassNumRoutingShards() {
clusterState,
null,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
);
Expand All @@ -235,6 +264,7 @@ public void testPassNumRoutingShardsAndFail() {
EmptySnapshotsInfoService.INSTANCE
);

ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT,RemoteStoreNodeService.Direction.NONE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
Expand All @@ -249,6 +279,7 @@ public void testPassNumRoutingShardsAndFail() {
clusterState,
null,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
);
Expand All @@ -265,6 +296,7 @@ public void testPassNumRoutingShardsAndFail() {
finalState,
null,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
"source",
"target"
)
Expand All @@ -286,6 +318,7 @@ public void testShrinkIndexSettings() {
EmptySnapshotsInfoService.INSTANCE
);

ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT,RemoteStoreNodeService.Direction.NONE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
Expand All @@ -301,6 +334,7 @@ public void testShrinkIndexSettings() {
clusterState,
(i) -> stats,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
indexName,
"target"
);
Expand All @@ -325,6 +359,8 @@ public void testShrinkWithMaxShardSize() {
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE
);

ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT,RemoteStoreNodeService.Direction.NONE);
RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
// now we start the shard
Expand All @@ -345,6 +381,7 @@ public void testShrinkWithMaxShardSize() {
clusterState,
(i) -> stats,
new StoreStats(100, between(1, 10000)),
clusterSettings,
indexName,
"target"
);
Expand All @@ -366,6 +403,7 @@ public void testShrinkWithMaxShardSize() {
clusterState,
(i) -> stats,
new StoreStats(100, between(1, 10000)),
clusterSettings,
indexName,
"target"
);
Expand All @@ -387,6 +425,7 @@ public void testShrinkWithMaxShardSize() {
clusterState,
(i) -> stats,
new StoreStats(100, between(1, 10000)),
clusterSettings,
indexName,
"target"
);
Expand Down Expand Up @@ -477,6 +516,7 @@ public void testIndexBlocks() {
createClusterState(indexName, 10, 0, 40, Settings.builder().put("index.blocks.read_only", true).build())
).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();

ClusterSettings clusterSettings = createClusterSettings(CompatibilityMode.STRICT,RemoteStoreNodeService.Direction.NONE);
// Target index will be blocked by [index.blocks.read_only=true] copied from the source index
ResizeRequest resizeRequest = new ResizeRequest("target", indexName);
ResizeType resizeType;
Expand All @@ -500,6 +540,7 @@ public void testIndexBlocks() {
finalState,
null,
new StoreStats(between(1, 10000), between(1, 10000)),
clusterSettings,
indexName,
"target"
)
Expand Down Expand Up @@ -551,6 +592,7 @@ public void testIndexBlocks() {
clusterState,
(i) -> stats,
new StoreStats(100, between(1, 10000)),
clusterSettings,
indexName,
"target"
);
Expand All @@ -561,6 +603,107 @@ public void testIndexBlocks() {
assertEquals(request.waitForActiveShards(), activeShardCount);
}

/**
 * Verifies the cluster-setting validation performed during index resize while a
 * remote-store migration may be in progress. Every other aspect of the resize
 * request is kept valid, so any failure can only come from the migration checks.
 * Expectation: resize is rejected (IllegalStateException) only when the cluster is
 * in MIXED compatibility mode, migrating towards REMOTE_STORE, and the source index
 * does not have remote store enabled; every other combination must succeed.
 */
public void testResizeFailuresDuringMigration() {
    // Enable the experimental migration feature flag so the migration settings are honored.
    final Settings directionEnabledNodeSettings = Settings.builder()
        .put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true")
        .build();
    FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings);
    boolean isRemoteStoreEnabled = randomBoolean();
    CompatibilityMode compatibilityMode = randomFrom(CompatibilityMode.values());
    RemoteStoreNodeService.Direction migrationDirection = randomFrom(RemoteStoreNodeService.Direction.values());
    // A non-NONE migration direction is only meaningful in MIXED compatibility mode.
    if (!compatibilityMode.equals(CompatibilityMode.MIXED)) {
        migrationDirection = RemoteStoreNodeService.Direction.NONE;
    }
    ClusterSettings clusterSettings = createClusterSettings(compatibilityMode, migrationDirection);

    ClusterState clusterState = ClusterState.builder(
        createClusterState(
            "source",
            10,
            0,
            40,
            Settings.builder()
                .put("index.blocks.write", true)
                .put(SETTING_REMOTE_STORE_ENABLED, isRemoteStoreEnabled)
                .build()
        )
    ).nodes(DiscoveryNodes.builder().add(newNode("node1"))).build();
    AllocationService service = new AllocationService(
        new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())),
        new TestGatewayAllocator(),
        new BalancedShardsAllocator(Settings.EMPTY),
        EmptyClusterInfoService.INSTANCE,
        EmptySnapshotsInfoService.INSTANCE
    );

    RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
    clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
    // now we start the shard
    routingTable = OpenSearchAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
    clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
    DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / 10), between(1, 1000), between(1, 10000));
    ResizeRequest resizeRequest = new ResizeRequest("target", "source");
    ResizeType resizeType;
    int expectedShardsNum;
    String cause;
    switch (randomIntBetween(0, 2)) {
        case 0:
            resizeType = ResizeType.SHRINK;
            expectedShardsNum = 5;
            cause = "shrink_index";
            break;
        case 1:
            resizeType = ResizeType.SPLIT;
            expectedShardsNum = 20;
            cause = "split_index";
            break;
        default:
            resizeType = ResizeType.CLONE;
            expectedShardsNum = 10;
            cause = "clone_index";
    }
    resizeRequest.setResizeType(resizeType);
    resizeRequest.getTargetIndexRequest()
        .settings(Settings.builder().put("index.number_of_shards", expectedShardsNum).put("index.blocks.read_only", false).build());
    final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
    resizeRequest.setWaitForActiveShards(activeShardCount);

    if (compatibilityMode == CompatibilityMode.MIXED
        && migrationDirection == RemoteStoreNodeService.Direction.REMOTE_STORE
        && !isRemoteStoreEnabled) {
        // Resizing a non-remote-store index must be rejected while migrating to remote store.
        // Use the configured resizeRequest (not a fresh one) so the randomized resize type
        // is the one actually validated.
        ClusterState finalState = clusterState;
        IllegalStateException ise = expectThrows(
            IllegalStateException.class,
            () -> TransportResizeAction.prepareCreateIndexRequest(
                resizeRequest,
                finalState,
                (i) -> stats,
                new StoreStats(between(1, 10000), between(1, 10000)),
                clusterSettings,
                "source",
                "target"
            )
        );
        assertThat(
            ise.getMessage(),
            allOf(
                startsWith("index Resizing for type"),
                endsWith("Cluster mode is [Mixed] and migration direction is [Remote Store]")
            )
        );
    } else {
        // Any other combination of migration settings must allow the resize to proceed.
        CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest(
            resizeRequest,
            clusterState,
            (i) -> stats,
            new StoreStats(100, between(1, 10000)),
            clusterSettings,
            "source",
            "target"
        );
        assertNotNull(request.recoverFrom());
        assertEquals("source", request.recoverFrom().getName());
        assertEquals(String.valueOf(expectedShardsNum), request.settings().get("index.number_of_shards"));
        assertEquals(cause, request.cause());
        assertEquals(request.waitForActiveShards(), activeShardCount);
    }
}

private DiscoveryNode newNode(String nodeId) {
final Set<DiscoveryNodeRole> roles = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE))
Expand Down

0 comments on commit 6672cbe

Please sign in to comment.