From 5b541b7a28c40e78ababdea8ecc35aae233e611f Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Tue, 1 Oct 2024 16:47:43 -0700 Subject: [PATCH 1/5] Add test to prove and preserve the behavior of allowing a topology migration which can isolate shuffling to a single resource group. --- .../controller/TestTopologyMigration.java | 351 ++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java new file mode 100644 index 0000000000..37da21e604 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java @@ -0,0 +1,351 @@ +package org.apache.helix.integration.controller; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; +import org.apache.helix.examples.LeaderStandbyStateModelFactory; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBucketDataAccessor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestTopologyMigration extends ZkTestBase { + private static final int START_PORT = 12918; // Starting port for mock participants + private static int _nextStartPort = START_PORT; // Incremental port for participants + private static final String TEST_CAPACITY_KEY = "TestCapacityKey"; + private static final int TEST_CAPACITY_VALUE = 100; // Default instance capacity for testing + private static final String RACK = "rack"; // Rack identifier in topology + private static final String HOST = "host"; // Host identifier in topology + private 
static final String APPLICATION_INSTANCE_ID = "applicationInstanceId"; + private static final String MZ = "mz"; // Migrated zone identifier + private static final String INIT_TOPOLOGY = String.format("/%s/%s", RACK, HOST); + // Initial topology format + private static final String MIGRATED_TOPOLOGY = + String.format("/%s/%s/%s", MZ, HOST, APPLICATION_INSTANCE_ID); // New topology format + private static final int INIT_ZONE_COUNT = 12; // Initial zone count + private static final int MIGRATE_ZONE_COUNT = 6; // Zone count post-migration + private static final int RESOURCE_COUNT = 2; // Number of resources in the cluster + private static final int INSTANCES_PER_RESOURCE = 12; // Number of instances per resource + private static final int PARTITIONS = 3; // Number of partitions + private static final int REPLICA = 6; // Number of replicas + private static final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L; + // Delay time for resource rebalance + + private final String CLASS_NAME = getShortClassName(); // Test class name + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; // Cluster name for testing + + protected ClusterControllerManager _controller; // Cluster controller instance + private final List<MockParticipantManager> _participants = new ArrayList<>(); + // List of participant managers + private final Set<String> _allDBs = new HashSet<>(); // Set of all databases + private ZkHelixClusterVerifier _clusterVerifier; // Cluster verifier + private ConfigAccessor _configAccessor; // Config accessor + private HelixDataAccessor _dataAccessor; // Data accessor + protected AssignmentMetadataStore _assignmentMetadataStore; // Metadata store for assignments + + /** + * Sets up the test cluster and initializes participants before running tests.
+ */ + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + // Start cluster controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + _configAccessor = new ConfigAccessor(_gZkClient); + _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + + // Set up cluster configuration and participants + setupClusterConfig(INIT_TOPOLOGY, RACK); + setupInitResourcesAndParticipants(); + setUpWagedBaseline(); + + // Initialize cluster verifier for validating state + _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setDeactivatedNodeAwareness(true).setResources(_allDBs) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + } + + /** + * Cleans up after the test by stopping participants and dropping resources. + */ + @AfterClass + public void afterClass() { + // Drop all databases from the cluster + for (String db : _allDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Stop all participants and controller + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _controller.syncStop(); + } + + /** + * Sets up the cluster configuration with the given topology and fault zone type. 
+ */ + private void setupClusterConfig(String topology, String faultZoneType) { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.stateTransitionCancelEnabled(true); + clusterConfig.setDelayRebalaceEnabled(true); + clusterConfig.setRebalanceDelayTime(DEFAULT_RESOURCE_DELAY_TIME); + clusterConfig.setTopology(topology); + clusterConfig.setFaultZoneType(faultZoneType); + clusterConfig.setTopologyAwareEnabled(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } + + /** + * Sets up initial resources and mock participants for the cluster. + */ + private void setupInitResourcesAndParticipants() throws Exception { + for (int i = 0; i < RESOURCE_COUNT; i++) { + String dbName = "TestDB_" + i; + + // Create and start participants for the resource + for (int j = 0; j < INSTANCES_PER_RESOURCE; j++) { + String participantName = "localhost_" + _nextStartPort; + + InstanceConfig instanceConfig = new InstanceConfig.Builder().setDomain( + String.format("%s=%s, %s=%s", RACK, j % INIT_ZONE_COUNT, HOST, participantName)) + .addTag(dbName).build(participantName); + + _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig); + + MockParticipantManager participant = createParticipant(participantName); + participant.syncStart(); + _nextStartPort++; + _participants.add(participant); + } + + // Set up IdealState for the resource + IdealState is = createResourceWithWagedRebalance(CLUSTER_NAME, dbName, + BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1); + is.setResourceGroupName(dbName); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); + + _allDBs.add(dbName); + } + } + + /** + * Creates and starts a mock participant with a registered state model factory. 
+ */ + private MockParticipantManager createParticipant(String participantName) throws Exception { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, null); + participant.getStateMachineEngine() + .registerStateModelFactory("LeaderStandby", new LeaderStandbyStateModelFactory()); + return participant; + } + + /** + * Tests topology migration with and without domain updates, ensuring no shuffling occurs. + */ + @Test + public void testTopologyMigration() { + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Step 1: Migrate to new topology in maintenance mode + Map<String, ExternalView> originalEVs = getEVs(); + List<InstanceConfig> instanceConfigs = + _gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME).stream().map( + instanceName -> _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceName)).collect(Collectors.toList()); + + _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true); + setupClusterConfig(MIGRATED_TOPOLOGY, MZ); + migrateInstanceConfigTopology(instanceConfigs); + _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false); + + // Verify cluster state after topology migration + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateNoShufflingOccurred(originalEVs, null); + + // Step 2: Update domain values for one resource group at a time + for (String updatingDb : _allDBs) { + Map<String, ExternalView> preMigrationEVs = getEVs(); + migrateDomainForResourceGroup(updatingDb); + validateNoShufflingOccurred(preMigrationEVs, updatingDb); + } + } + + /** + * Retrieves the ExternalViews for all databases in the cluster. + */ + private Map<String, ExternalView> getEVs() { + Map<String, ExternalView> externalViews = new HashMap<>(); + for (String db : _allDBs) { + externalViews.put(db, + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db)); + } + return externalViews; + } + + /** + * Compares the contents of two ExternalViews to determine if they are equal.
+ */ + private boolean compareExternalViews(ExternalView oldEV, ExternalView newEV) { + if (oldEV == null || newEV == null) { + return false; + } + + Map<String, Map<String, String>> oldEVMap = oldEV.getRecord().getMapFields(); + Map<String, Map<String, String>> newEVMap = newEV.getRecord().getMapFields(); + + if (oldEVMap.size() != newEVMap.size()) { + return false; + } + + for (String partition : oldEVMap.keySet()) { + if (!oldEVMap.get(partition).equals(newEVMap.get(partition))) { + return false; + } + } + return true; + } + + private void setUpWagedBaseline() { + _assignmentMetadataStore = + new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) { + public Map<String, ResourceAssignment> getBaseline() { + // Ensure this metadata store always read from the ZK without using cache. + super.reset(); + return super.getBaseline(); + } + + public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() { + // Ensure this metadata store always read from the ZK without using cache. + super.reset(); + return super.getBestPossibleAssignment(); + } + }; + + // Set test instance capacity and partition weights + ClusterConfig clusterConfig = + _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig()); + clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY)); + clusterConfig.setDefaultInstanceCapacityMap( + Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE)); + clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY, 1)); + _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig); + } + + private void validateNoShufflingOccurred(Map<String, ExternalView> originalEVs, + String shouldShuffleDB) { + Map<String, ExternalView> updatedEVs = getEVs(); + for (String db : _allDBs) { + if (db.equals(shouldShuffleDB)) { + Assert.assertFalse(compareExternalViews(originalEVs.get(db), updatedEVs.get(db)), + String.format("Expected shuffling didn't occur for database %s", db)); + } else { + Assert.assertTrue(compareExternalViews(originalEVs.get(db), updatedEVs.get(db)), +
String.format("Unexpected shuffling occurred for database %s", db)); + } + } + } + + private void migrateInstanceConfigTopology(List<InstanceConfig> instanceConfigs) { + // Enter maintenance mode to update instance configurations + _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true); + + for (InstanceConfig instanceConfig : instanceConfigs) { + String rackId = instanceConfig.getDomainAsMap().get(RACK); + String hostId = instanceConfig.getDomainAsMap().get(HOST); + + // Set new domain based on the new topology format + String newDomain = + String.format("%s=%s, %s=%s, %s=%s", MZ, rackId, HOST, hostId, APPLICATION_INSTANCE_ID, + hostId); + instanceConfig.setDomain(newDomain); + + // Update the instance configuration in the cluster + _gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, instanceConfig.getInstanceName(), instanceConfig); + } + + // Exit maintenance mode after updating instance configurations + _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + private void migrateDomainForResourceGroup(String resourceGroup) { + // Enter maintenance mode to update domain values + _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true); + + int instanceIndex = 0; + for (MockParticipantManager participant : _participants) { + InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, participant.getInstanceName()); + if (instanceConfig.containsTag(resourceGroup)) { + Map<String, String> newDomain = instanceConfig.getDomainAsMap(); + newDomain.put(MZ, String.valueOf(instanceIndex % MIGRATE_ZONE_COUNT)); + newDomain.put(APPLICATION_INSTANCE_ID, UUID.randomUUID().toString()); + instanceConfig.setDomain(newDomain); + _gSetupTool.getClusterManagementTool() + .setInstanceConfig(CLUSTER_NAME, participant.getInstanceName(), instanceConfig); + instanceIndex++; + } + } + + // Exit maintenance mode
after updating domain values + _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false); + + // Verify cluster state after domain update + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } +} From d1d5ffdffd0c8074e8e0ebb1d0ff0f4532da4248 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Tue, 1 Oct 2024 16:49:49 -0700 Subject: [PATCH 2/5] Fix test naming --- .../helix/integration/controller/TestTopologyMigration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java index 37da21e604..50d2d02e2b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java @@ -197,7 +197,7 @@ private MockParticipantManager createParticipant(String participantName) throws * Tests topology migration with and without domain updates, ensuring no shuffling occurs. */ @Test - public void testTopologyMigration() { + public void testTopologyMigrationByResourceGroup() { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // Step 1: Migrate to new topology in maintenance mode From 29f7f70eac40d2b027391e9b9a5921e5019b830f Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Tue, 1 Oct 2024 16:55:22 -0700 Subject: [PATCH 3/5] Remove unused method. 
--- .../controller/TestTopologyMigration.java | 42 +++---------------- 1 file changed, 5 insertions(+), 37 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java index 50d2d02e2b..fb02570473 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java @@ -31,21 +31,16 @@ import java.util.stream.Collectors; import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; -import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; import org.apache.helix.examples.LeaderStandbyStateModelFactory; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.ResourceAssignment; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -84,8 +79,6 @@ public class TestTopologyMigration extends ZkTestBase { private final Set _allDBs = new HashSet<>(); // Set of all databases private ZkHelixClusterVerifier _clusterVerifier; // Cluster verifier private ConfigAccessor _configAccessor; // Config accessor - private HelixDataAccessor _dataAccessor; // Data accessor - protected AssignmentMetadataStore 
_assignmentMetadataStore; // Metadata store for assignments /** * Sets up the test cluster and initializes participants before running tests. @@ -100,15 +93,12 @@ public void beforeClass() throws Exception { String controllerName = CONTROLLER_PREFIX + "_0"; _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); _controller.syncStart(); - - enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); _configAccessor = new ConfigAccessor(_gZkClient); - _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); // Set up cluster configuration and participants + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); setupClusterConfig(INIT_TOPOLOGY, RACK); setupInitResourcesAndParticipants(); - setUpWagedBaseline(); // Initialize cluster verifier for validating state _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) @@ -146,6 +136,10 @@ private void setupClusterConfig(String topology, String faultZoneType) { clusterConfig.setTopology(topology); clusterConfig.setFaultZoneType(faultZoneType); clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY)); + clusterConfig.setDefaultInstanceCapacityMap( + Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE)); + clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY, 1)); _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); } @@ -259,32 +253,6 @@ private boolean compareExternalViews(ExternalView oldEV, ExternalView newEV) { return true; } - private void setUpWagedBaseline() { - _assignmentMetadataStore = - new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) { - public Map getBaseline() { - // Ensure this metadata store always read from the ZK without using cache. 
- super.reset(); - return super.getBaseline(); - } - - public synchronized Map getBestPossibleAssignment() { - // Ensure this metadata store always read from the ZK without using cache. - super.reset(); - return super.getBestPossibleAssignment(); - } - }; - - // Set test instance capacity and partition weights - ClusterConfig clusterConfig = - _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig()); - clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY)); - clusterConfig.setDefaultInstanceCapacityMap( - Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE)); - clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY, 1)); - _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig); - } - private void validateNoShufflingOccurred(Map originalEVs, String shouldShuffleDB) { Map updatedEVs = getEVs(); From 8f1e6864bc4f43174491837f7efc90c23cedd196 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Thu, 3 Oct 2024 16:08:52 -0700 Subject: [PATCH 4/5] Address comments about validating that MM is enabled before proceeding with test logic. --- .../controller/TestTopologyMigration.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java index fb02570473..c68830efa6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java @@ -191,7 +191,7 @@ private MockParticipantManager createParticipant(String participantName) throws * Tests topology migration with and without domain updates, ensuring no shuffling occurs. 
*/ @Test - public void testTopologyMigrationByResourceGroup() { + public void testTopologyMigrationByResourceGroup() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // Step 1: Migrate to new topology in maintenance mode @@ -201,10 +201,10 @@ public void testTopologyMigrationByResourceGroup() { instanceName -> _gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceName)).collect(Collectors.toList()); - _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true); + setAndVerifyMaintenanceMode(true); setupClusterConfig(MIGRATED_TOPOLOGY, MZ); migrateInstanceConfigTopology(instanceConfigs); - _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false); + setAndVerifyMaintenanceMode(false); // Verify cluster state after topology migration Assert.assertTrue(_clusterVerifier.verifyByPolling()); @@ -218,6 +218,17 @@ public void testTopologyMigrationByResourceGroup() { } } + /** + * Set MaintenanceMode and verify that controller has processed it. + */ + private void setAndVerifyMaintenanceMode(boolean enable) throws Exception { + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, enable, "", Collections.emptyMap()); + TestHelper.verify( + () -> _gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME) == enable, + 2000L); + } + /** * Retrieves the ExternalViews for all databases in the cluster. 
*/ @@ -267,9 +278,10 @@ private void validateNoShufflingOccurred(Map originalEVs, } } - private void migrateInstanceConfigTopology(List instanceConfigs) { + private void migrateInstanceConfigTopology(List instanceConfigs) + throws Exception { // Enter maintenance mode to update instance configurations - _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true); + setAndVerifyMaintenanceMode(true); for (InstanceConfig instanceConfig : instanceConfigs) { String rackId = instanceConfig.getDomainAsMap().get(RACK); @@ -287,13 +299,13 @@ private void migrateInstanceConfigTopology(List instanceConfigs) } // Exit maintenance mode after updating instance configurations - _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false); + setAndVerifyMaintenanceMode(false); Assert.assertTrue(_clusterVerifier.verifyByPolling()); } - private void migrateDomainForResourceGroup(String resourceGroup) { + private void migrateDomainForResourceGroup(String resourceGroup) throws Exception { // Enter maintenance mode to update domain values - _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true); + setAndVerifyMaintenanceMode(true); int instanceIndex = 0; for (MockParticipantManager participant : _participants) { @@ -311,7 +323,7 @@ private void migrateDomainForResourceGroup(String resourceGroup) { } // Exit maintenance mode after updating domain values - _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false); + setAndVerifyMaintenanceMode(false); // Verify cluster state after domain update Assert.assertTrue(_clusterVerifier.verifyByPolling()); From 56b4cbe0b7d11ed2facf62da9c6a8307a595c64b Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Fri, 4 Oct 2024 13:44:59 -0700 Subject: [PATCH 5/5] Use best possible external view verifier instead of strict external view verifier to ensure that the cluster is converged with the bps that should eventually be calculated by the controller once it 
processes current state of the cluster. --- .../controller/TestTopologyMigration.java | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java index c68830efa6..9a076ece34 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTopologyMigration.java @@ -31,6 +31,8 @@ import java.util.stream.Collectors; import org.apache.helix.ConfigAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.examples.LeaderStandbyStateModelFactory; @@ -38,9 +40,11 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ControllerHistory; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -101,9 +105,9 @@ public void beforeClass() throws Exception { setupInitResourcesAndParticipants(); // Initialize cluster verifier for validating state - _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) - .setDeactivatedNodeAwareness(true).setResources(_allDBs) - .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(_allDBs).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); } /** @@ -206,14 +210,18 @@ public void testTopologyMigrationByResourceGroup() throws Exception { migrateInstanceConfigTopology(instanceConfigs); setAndVerifyMaintenanceMode(false); - // Verify cluster state after topology migration - Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // Verify cluster did not have shuffling anywhere after + // the migration to the new topology validateNoShufflingOccurred(originalEVs, null); // Step 2: Update domain values for one resource group at a time for (String updatingDb : _allDBs) { Map preMigrationEVs = getEVs(); + setAndVerifyMaintenanceMode(true); migrateDomainForResourceGroup(updatingDb); + setAndVerifyMaintenanceMode(false); + + // Verify cluster only had shuffling in the resource group that was updated validateNoShufflingOccurred(preMigrationEVs, updatingDb); } } @@ -222,11 +230,20 @@ public void testTopologyMigrationByResourceGroup() throws Exception { * Set MaintenanceMode and verify that controller has processed it. */ private void setAndVerifyMaintenanceMode(boolean enable) throws Exception { + if (enable) { + // Check that the cluster converged to the best possible state that should be calculated + // by the controller before we change the maintenance mode. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + _gSetupTool.getClusterManagementTool() .manuallyEnableMaintenanceMode(CLUSTER_NAME, enable, "", Collections.emptyMap()); - TestHelper.verify( - () -> _gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME) == enable, - 2000L); + + if (!enable) { + // Check that the cluster converged to the best possible state that should be calculated + // by the controller after we have changed the maintenance mode. 
+ Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } } /** @@ -280,8 +297,6 @@ private void validateNoShufflingOccurred(Map originalEVs, private void migrateInstanceConfigTopology(List instanceConfigs) throws Exception { - // Enter maintenance mode to update instance configurations - setAndVerifyMaintenanceMode(true); for (InstanceConfig instanceConfig : instanceConfigs) { String rackId = instanceConfig.getDomainAsMap().get(RACK); @@ -297,16 +312,9 @@ private void migrateInstanceConfigTopology(List instanceConfigs) _gSetupTool.getClusterManagementTool() .setInstanceConfig(CLUSTER_NAME, instanceConfig.getInstanceName(), instanceConfig); } - - // Exit maintenance mode after updating instance configurations - setAndVerifyMaintenanceMode(false); - Assert.assertTrue(_clusterVerifier.verifyByPolling()); } private void migrateDomainForResourceGroup(String resourceGroup) throws Exception { - // Enter maintenance mode to update domain values - setAndVerifyMaintenanceMode(true); - int instanceIndex = 0; for (MockParticipantManager participant : _participants) { InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool() @@ -321,11 +329,5 @@ private void migrateDomainForResourceGroup(String resourceGroup) throws Exceptio instanceIndex++; } } - - // Exit maintenance mode after updating domain values - setAndVerifyMaintenanceMode(false); - - // Verify cluster state after domain update - Assert.assertTrue(_clusterVerifier.verifyByPolling()); } }