From 34dad76402222aadd36cc609a54df5371343d5f7 Mon Sep 17 00:00:00 2001 From: frankmu Date: Thu, 25 Jul 2024 14:36:56 -0700 Subject: [PATCH 1/6] Create condition based rebalancer (#2846) Create condition based rebalancer --- .../ResourceControllerDataProvider.java | 26 +++ .../rebalancer/ConditionBasedRebalancer.java | 215 ++++++++++++++++++ .../condition/ConfigChangeBasedCondition.java | 11 + .../condition/RebalanceCondition.java | 19 ++ .../condition/RebalanceConditionsBuilder.java | 22 ++ .../TopologyChangeBasedCondition.java | 11 + 6 files changed, 304 insertions(+) create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 021aab6a84..cdfcb0f246 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -73,6 +73,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { // a map from customized state type to customized view cache private final Map _customizedViewCacheMap; + // maintain a cache of ideal state (preference list + best possible assignment) which will be managed ondemand in rebalancer + private final Map _ondemandIdealStateCache; + // maintain a cache of bestPossible assignment across pipeline runs // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache. private Map _resourceAssignmentCache; @@ -149,6 +152,7 @@ public String getObjName(ExternalView obj) { _refreshedChangeTypes = ConcurrentHashMap.newKeySet(); _customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes); _customizedViewCacheMap = new HashMap<>(); + _ondemandIdealStateCache = new HashMap<>(); } public synchronized void refresh(HelixDataAccessor accessor) { @@ -388,6 +392,28 @@ public Map> getLastTopStateLocationMap() { return _lastTopStateLocationMap; } + /** + * Get cached ideal state (preference list + best possible assignment) for a resource + * @param resource + * @return + */ + public ZNRecord getCachedOndemandIdealState(String resource) { + return _ondemandIdealStateCache.get(resource); + } + + /** + * Cache ideal state (preference list + best possible assignment) for a resource + * @param resource + * @return + */ + public void setCachedOndemandIdealState(String resource, ZNRecord idealState) { + _ondemandIdealStateCache.put(resource, idealState); + } + + public void clearCachedOndemandIdealStates() { + _ondemandIdealStateCache.clear(); + } + /** * Get cached resourceAssignment (bestPossible mapping) for a resource * @param resource diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java new file mode 100644 index 0000000000..158699a97d --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java @@ -0,0 +1,215 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixException; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.condition.RebalanceCondition; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@code ConditionBasedRebalancer} class extends the {@link AbstractRebalancer} and + * perform the rebalance operation based on specific list of conditions defined by the + * {@link RebalanceCondition} interface. + */ +public class ConditionBasedRebalancer extends AbstractRebalancer { + private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class); + private final List _rebalanceConditions; + + public ConditionBasedRebalancer() { + this._rebalanceConditions = new ArrayList<>(); + } + + public ConditionBasedRebalancer(List rebalanceConditions) { + this._rebalanceConditions = rebalanceConditions; + } + + /** + * Compute new Ideal State iff all conditions are met, otherwise just return from cached Ideal State + * + * @param resourceName the name of the resource for which to compute the new ideal state. + * @param currentIdealState the current {@link IdealState} of the resource. + * @param currentStateOutput the current state output, containing the actual states of the + * partitions. + * @param clusterData the {@link ResourceControllerDataProvider} instance providing + * additional data required for the computation. + * @return the newly computed {@link IdealState} for the resource. + */ + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) { + if (!this._rebalanceConditions.stream() + .allMatch(condition -> condition.shouldPerformRebalance(clusterData))) { + ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); + if (cachedIdealState != null) { + return new IdealState(cachedIdealState); + } + // In theory, the cache should be populated already if no rebalance is needed + LOG.warn( + "Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State", + resourceName); + } + + LOG.info("Computing IdealState for " + resourceName); + + List partitions = getStablePartitionList(clusterData, currentIdealState); + String stateModelName = currentIdealState.getStateModelDefRef(); + StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName); + if (stateModelDef == null) { + LOG.error("State Model Definition null for resource: " + resourceName); + throw new HelixException("State Model Definition null for resource: " + resourceName); + } + Map assignableLiveInstance = clusterData.getAssignableLiveInstances(); + int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size()); + + LinkedHashMap stateCountMap = + stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas); + List assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet()); + List assignableNodes = new ArrayList<>(clusterData.getAssignableInstances()); + assignableNodes.removeAll(clusterData.getDisabledInstances()); + assignableLiveNodes.retainAll(assignableNodes); + + Map> currentMapping = + currentMapping(currentStateOutput, resourceName, partitions, stateCountMap); + + // If there are nodes tagged with resource name, use only those nodes + Set taggedNodes = new HashSet(); + Set taggedLiveNodes = new HashSet(); + if (currentIdealState.getInstanceGroupTag() != null) { + for (String instanceName : assignableNodes) { + if (clusterData.getAssignableInstanceConfigMap().get(instanceName) + .containsTag(currentIdealState.getInstanceGroupTag())) { + taggedNodes.add(instanceName); + if (assignableLiveInstance.containsKey(instanceName)) { + taggedLiveNodes.add(instanceName); + } + } + } + if (!taggedLiveNodes.isEmpty()) { + // live nodes exist that have this tag + if (LOG.isInfoEnabled()) { + LOG.info( + "found the following participants with tag " + currentIdealState.getInstanceGroupTag() + + " for " + resourceName + ": " + taggedLiveNodes); + } + } else if (taggedNodes.isEmpty()) { + // no live nodes and no configured nodes have this tag + LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag() + + " but no configured participants have this tag"); + } else { + // configured nodes have this tag, but no live nodes have this tag + LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag() + + " but no live participants have this tag"); + } + assignableNodes = new ArrayList<>(taggedNodes); + assignableLiveNodes = new ArrayList<>(taggedLiveNodes); + } + + // sort node lists to ensure consistent preferred assignments + Collections.sort(assignableNodes); + Collections.sort(assignableLiveNodes); + + int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); + _rebalanceStrategy = + getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName, + stateCountMap, maxPartition); + ZNRecord newMapping = + _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes, + currentMapping, clusterData); + + if (LOG.isDebugEnabled()) { + LOG.debug("currentMapping: {}", currentMapping); + LOG.debug("stateCountMap: {}", stateCountMap); + LOG.debug("assignableLiveNodes: {}", assignableLiveNodes); + LOG.debug("assignableNodes: {}", assignableNodes); + LOG.debug("maxPartition: {}", maxPartition); + LOG.debug("newMapping: {}", newMapping); + } + + IdealState newIdealState = new IdealState(resourceName); + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + newIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + newIdealState.getRecord().setListFields(newMapping.getListFields()); + + clusterData.setCachedOndemandIdealState(resourceName, newIdealState.getRecord()); + + return newIdealState; + } + + /** + * Compute new assignment iff all conditions are met, otherwise just return from cached assignment + * + * @param cache the {@link ResourceControllerDataProvider} instance providing + * metadata and state information about the cluster. + * @param idealState the {@link IdealState} representing the current ideal state. + * @param resource the {@link Resource} for which to compute the best possible partition + * state. + * @param currentStateOutput the {@link CurrentStateOutput} containing the current states of the + * partitions. + * @return the {@link ResourceAssignment} representing the best possible state assignment for the + * partitions of the resource. + */ + @Override + public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache, + IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) { + ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resource.getResourceName()); + if (!this._rebalanceConditions.stream() + .allMatch(condition -> condition.shouldPerformRebalance(cache))) { + if (cachedIdealState != null && cachedIdealState.getMapFields() != null) { + ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName()); + for (Partition partition : resource.getPartitions()) { + partitionMapping.addReplicaMap(partition, cachedIdealState.getMapFields().get(partition)); + } + return new ResourceAssignment(cachedIdealState); + } + // In theory, the cache should be populated already if no rebalance is needed + LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment", + resource.getResourceName()); + } + + LOG.info("Computing BestPossibleMapping for " + resource.getResourceName()); + + // TODO: Change the logic to apply different assignment strategy + ResourceAssignment assignment = + super.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput); + // Cache the assignment so no need to recompute the result next time + cachedIdealState.setMapFields(assignment.getRecord().getMapFields()); + cache.setCachedOndemandIdealState(resource.getResourceName(), cachedIdealState); + + return assignment; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java new file mode 100644 index 0000000000..3089b4342f --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java @@ -0,0 +1,11 @@ +package org.apache.helix.controller.rebalancer.condition; + +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; + +public class ConfigChangeBasedCondition implements RebalanceCondition { + @Override + public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) { + // TODO: implement the condition check for config change + return false; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java new file mode 100644 index 0000000000..eafff4ef00 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java @@ -0,0 +1,19 @@ +package org.apache.helix.controller.rebalancer.condition; + +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; + +/** + * The {@code RebalanceCondition} interface defines a condition under which a rebalance operation + * should be performed. Implementations of this interface provide specific criteria to determine + * whether a rebalance is necessary based on the current state of the system. + */ +public interface RebalanceCondition { + /** + * Determines whether a rebalance should be performed based on the provided + * {@link ResourceControllerDataProvider} cache data. + * + * @param cache the {@code ResourceControllerDataProvider} cached data of the resources being managed. + * @return {@code true} if the rebalance should be performed, {@code false} otherwise. + */ + boolean shouldPerformRebalance(ResourceControllerDataProvider cache); +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java new file mode 100644 index 0000000000..a5b11e8d6f --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java @@ -0,0 +1,22 @@ +package org.apache.helix.controller.rebalancer.condition; + +import java.util.ArrayList; +import java.util.List; + +public class RebalanceConditionsBuilder { + private final List _rebalanceConditions = new ArrayList<>(); + + public RebalanceConditionsBuilder withConfigChangeBasedCondition() { + _rebalanceConditions.add(new ConfigChangeBasedCondition()); + return this; + } + + public RebalanceConditionsBuilder withTopologyChangeBasedCondition() { + _rebalanceConditions.add(new TopologyChangeBasedCondition()); + return this; + } + + public List build() { + return _rebalanceConditions; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java new file mode 100644 index 0000000000..99356a6d0a --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java @@ -0,0 +1,11 @@ +package org.apache.helix.controller.rebalancer.condition; + +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; + +public class TopologyChangeBasedCondition implements RebalanceCondition { + @Override + public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) { + // TODO: implement the condition check for topology change + return false; + } +} From 912f272700f5a9716b23fff67fac5f193e56413a Mon Sep 17 00:00:00 2001 From: frankmu Date: Fri, 26 Jul 2024 13:22:30 -0700 Subject: [PATCH 2/6] Add missing license for rebalance condition files (#2853) Add missing license for rebalance condition files Co-authored-by: Tengfei Mu --- .../condition/ConfigChangeBasedCondition.java | 19 +++++++++++++++++++ .../condition/RebalanceCondition.java | 19 +++++++++++++++++++ .../condition/RebalanceConditionsBuilder.java | 19 +++++++++++++++++++ .../TopologyChangeBasedCondition.java | 19 +++++++++++++++++++ 4 files changed, 76 insertions(+) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java index 3089b4342f..9441ed1b90 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/ConfigChangeBasedCondition.java @@ -1,5 +1,24 @@ package org.apache.helix.controller.rebalancer.condition; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; public class ConfigChangeBasedCondition implements RebalanceCondition { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java index eafff4ef00..cf04f00f4c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceCondition.java @@ -1,5 +1,24 @@ package org.apache.helix.controller.rebalancer.condition; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; /** diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java index a5b11e8d6f..c62b9d7d22 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/RebalanceConditionsBuilder.java @@ -1,5 +1,24 @@ package org.apache.helix.controller.rebalancer.condition; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import java.util.ArrayList; import java.util.List; diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java index 99356a6d0a..82e9398a8b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/condition/TopologyChangeBasedCondition.java @@ -1,5 +1,24 @@ package org.apache.helix.controller.rebalancer.condition; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; public class TopologyChangeBasedCondition implements RebalanceCondition { From 46962336ae740932d06865041d72ddcc6e10778b Mon Sep 17 00:00:00 2001 From: frankmu Date: Fri, 9 Aug 2024 12:13:41 -0700 Subject: [PATCH 3/6] Helix stickiness rebalancer (#2878) Create sticky assignment rebalance strategy --- .../helix/controller/common/CapacityNode.java | 14 ++ .../ResourceControllerDataProvider.java | 8 +- .../rebalancer/ConditionBasedRebalancer.java | 18 +- .../strategy/GreedyRebalanceStrategy.java | 103 --------- .../strategy/StickyRebalanceStrategy.java | 202 +++++++++++++++++ .../org/apache/helix/model/ClusterConfig.java | 2 +- .../TestGreedyRebalanceStrategy.java | 85 ------- .../TestStickyRebalanceStrategy.java | 214 ++++++++++++++++++ ...eWithGlobalPerInstancePartitionLimit.java} | 25 +- 9 files changed, 467 insertions(+), 204 deletions(-) delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java delete mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java rename helix-core/src/test/java/org/apache/helix/integration/rebalancer/{TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java => TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java} (77%) diff --git a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java index fa5068e132..208ee79139 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java @@ -52,11 +52,25 @@ public boolean canAdd(String resource, String partition) { && _partitionMap.get(resource).contains(partition))) { return false; } + + // Add the partition to the resource's set of partitions in this node _partitionMap.computeIfAbsent(resource, k -> new HashSet<>()).add(partition); _currentlyAssigned++; return true; } + /** + * Checks if a specific resource + partition is assigned to this node. + * + * @param resource the name of the resource + * @param partition the partition + * @return {@code true} if the resource + partition is assigned to this node, {@code false} otherwise + */ + public boolean hasPartition(String resource, String partition) { + Set partitions = _partitionMap.get(resource); + return partitions != null && partitions.contains(partition); + } + /** * Set the capacity of this node * @param capacity The capacity to set diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index cdfcb0f246..74b5aa8b2e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -41,7 +41,7 @@ import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.common.CapacityNode; import org.apache.helix.controller.pipeline.Pipeline; -import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity; import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider; import org.apache.helix.controller.stages.MissingTopStateRecord; @@ -191,11 +191,11 @@ public synchronized void refresh(HelixDataAccessor accessor) { // Remove all cached IdealState because it is a global computation cannot partially be // performed for some resources. The computation is simple as well not taking too much resource // to recompute the assignments. - Set cachedGreedyIdealStates = _idealMappingCache.values().stream().filter( + Set cachedStickyIdealStates = _idealMappingCache.values().stream().filter( record -> record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name()) - .equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId) + .equals(StickyRebalanceStrategy.class.getName())).map(ZNRecord::getId) .collect(Collectors.toSet()); - _idealMappingCache.keySet().removeAll(cachedGreedyIdealStates); + _idealMappingCache.keySet().removeAll(cachedStickyIdealStates); } LogUtil.logInfo(logger, getClusterEventId(), String.format( diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java index 158699a97d..84762ea51c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java @@ -20,12 +20,12 @@ */ import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.helix.HelixException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -98,8 +98,8 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId LinkedHashMap stateCountMap = stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas); - List assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet()); - List assignableNodes = new ArrayList<>(clusterData.getAssignableInstances()); + Set assignableLiveNodes = new HashSet<>(assignableLiveInstance.keySet()); + Set assignableNodes = new HashSet<>(clusterData.getAssignableInstances()); assignableNodes.removeAll(clusterData.getDisabledInstances()); assignableLiveNodes.retainAll(assignableNodes); @@ -135,20 +135,22 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag() + " but no live participants have this tag"); } - assignableNodes = new ArrayList<>(taggedNodes); - assignableLiveNodes = new ArrayList<>(taggedLiveNodes); + assignableNodes = new HashSet<>(taggedNodes); + assignableLiveNodes = new HashSet<>(taggedLiveNodes); } // sort node lists to ensure consistent preferred assignments - Collections.sort(assignableNodes); - Collections.sort(assignableLiveNodes); + List assignableNodesList = + assignableNodes.stream().sorted().collect(Collectors.toList()); + List assignableLiveNodesList = + assignableLiveNodes.stream().sorted().collect(Collectors.toList()); int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); _rebalanceStrategy = getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName, stateCountMap, maxPartition); ZNRecord newMapping = - _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes, + _rebalanceStrategy.computePartitionAssignment(assignableNodesList, assignableLiveNodesList, currentMapping, clusterData); if (LOG.isDebugEnabled()) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java deleted file mode 100644 index 60580c40a4..0000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java +++ /dev/null @@ -1,103 +0,0 @@ -package org.apache.helix.controller.rebalancer.strategy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.controller.common.CapacityNode; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GreedyRebalanceStrategy implements RebalanceStrategy { - private static Logger logger = LoggerFactory.getLogger(GreedyRebalanceStrategy.class); - private String _resourceName; - private List _partitions; - private LinkedHashMap _states; - - public GreedyRebalanceStrategy() { - } - - @Override - public void init(String resourceName, final List partitions, - final LinkedHashMap states, int maximumPerNode) { - _resourceName = resourceName; - _partitions = partitions; - _states = states; - } - - @Override - public ZNRecord computePartitionAssignment(final List allNodes, final List liveNodes, - final Map> currentMapping, ResourceControllerDataProvider clusterData) { - int numReplicas = countStateReplicas(); - ZNRecord znRecord = new ZNRecord(_resourceName); - if (liveNodes.size() == 0) { - return znRecord; - } - - if (clusterData.getSimpleCapacitySet() == null) { - logger.warn("No capacity set for resource: " + _resourceName); - return znRecord; - } - - // Sort the assignable nodes by id - List assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); - Collections.sort(assignableNodes, Comparator.comparing(CapacityNode::getId)); - - // Assign partitions to node by order. - for (int i = 0, index = 0; i < _partitions.size(); i++) { - int startIndex = index; - List preferenceList = new ArrayList<>(); - for (int j = 0; j < numReplicas; j++) { - while (index - startIndex < assignableNodes.size()) { - CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); - if (node.canAdd(_resourceName, _partitions.get(i))) { - preferenceList.add(node.getId()); - break; - } - } - - if (index - startIndex >= assignableNodes.size()) { - // If the all nodes have been tried out, then no node can be assigned. - logger.warn("No enough assignable nodes for resource: " + _resourceName); - } - } - znRecord.setListField(_partitions.get(i), preferenceList); - } - - return znRecord; - } - - private int countStateReplicas() { - int total = 0; - for (Integer count : _states.values()) { - total += count; - } - return total; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java new file mode 100644 index 0000000000..7def42d087 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java @@ -0,0 +1,202 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StickyRebalanceStrategy implements RebalanceStrategy { + private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private String _resourceName; + private List _partitions; + private LinkedHashMap _states; + private int _statesReplicaCount; + + public StickyRebalanceStrategy() { + } + + @Override + public void init(String resourceName, final List partitions, + final LinkedHashMap states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + if (_states != null) { + _statesReplicaCount = _states.values().stream().mapToInt(Integer::intValue).sum(); + } + } + + @Override + public ZNRecord computePartitionAssignment(final List allNodes, + final List liveNodes, final Map> currentMapping, + ResourceControllerDataProvider clusterData) { + ZNRecord znRecord = new ZNRecord(_resourceName); + if (liveNodes.isEmpty()) { + return znRecord; + } + + if (clusterData.getSimpleCapacitySet() == null) { + logger.warn("No capacity set for resource: {}", _resourceName); + return znRecord; + } + + // Sort the assignable nodes by id + List assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); + assignableNodes.sort(Comparator.comparing(CapacityNode::getId)); + + // Filter out the nodes if not in the liveNodes parameter + // Note the liveNodes parameter here might be processed within the rebalancer, e.g. filter based on tags + Set liveNodesSet = new HashSet<>(liveNodes); + assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId())); + + // Populate valid state map given current mapping + Map> stateMap = + populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes); + + if (logger.isDebugEnabled()) { + logger.debug("currentMapping: {}", currentMapping); + logger.debug("stateMap: {}", stateMap); + } + + // Assign partitions to node by order. + for (int i = 0, index = 0; i < _partitions.size(); i++) { + int startIndex = index; + for (Map.Entry entry : _states.entrySet()) { + String state = entry.getKey(); + int stateReplicaNumber = entry.getValue(); + // For this partition, compute existing number replicas + long existsReplicas = + stateMap.computeIfAbsent(_partitions.get(i), m -> new HashMap<>()).values().stream() + .filter(s -> s.equals(state)).count(); + for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) { + while (index - startIndex < assignableNodes.size()) { + CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); + if (node.canAdd(_resourceName, _partitions.get(i))) { + stateMap.get(_partitions.get(i)).put(node.getId(), state); + break; + } + } + + if (index - startIndex >= assignableNodes.size()) { + // If the all nodes have been tried out, then no node can be assigned. + logger.warn("No enough assignable nodes for resource: {}", _resourceName); + } + } + } + } + for (Map.Entry> entry : stateMap.entrySet()) { + znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue().keySet())); + } + if (logger.isDebugEnabled()) { + logger.debug("znRecord: {}", znRecord); + } + + return znRecord; + } + + /** + * Populates a valid state map from the current mapping, filtering out invalid nodes. + * + * @param currentMapping the current mapping of partitions to node states + * @param assignableNodes the list of nodes that can be assigned + * @return a map of partitions to valid node states + */ + private Map> populateValidStateMapFromCurrentMapping( + final Map> currentMapping, + final List assignableNodes) { + Map> validStateMap = new HashMap<>(); + // Convert the assignableNodes to map for quick lookup + Map assignableNodeMap = + assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node)); + if (currentMapping != null) { + for (Map.Entry> entry : currentMapping.entrySet()) { + String partition = entry.getKey(); + Map currentNodeStateMap = new HashMap<>(entry.getValue()); + // Skip if current node state is invalid with state model + if (!isValidStateMap(currentNodeStateMap)) { + continue; + } + // Filter out invalid node assignment + currentNodeStateMap.entrySet() + .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), assignableNodeMap)); + + validStateMap.put(partition, currentNodeStateMap); + } + } + return validStateMap; + } + + /** + * Validates whether the provided state mapping is valid according to the defined state model. + * + * @param currentNodeStateMap A map representing the actual state mapping where the key is the node ID and the value is the state. + * @return true if the state map is valid, false otherwise + */ + private boolean isValidStateMap(final Map currentNodeStateMap) { + // Check if the size of the current state map exceeds the total state count in state model + if (currentNodeStateMap.size() > _statesReplicaCount) { + return false; + } + + Map tmpStates = new HashMap<>(_states); + for (String state : currentNodeStateMap.values()) { + // Return invalid if: + // The state is not defined in the state model OR + // The state count exceeds the defined count in state model + if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) { + return false; + } + tmpStates.put(state, tmpStates.get(state) - 1); + } + + return true; + } + + /** + * Checks if a node assignment is valid for a given partition. + * + * @param partition the partition to be assigned + * @param nodeId the ID of the node to be checked + * @param assignableNodeMap the map of node IDs to CapacityNode objects + * @return true if the node is valid for the assignment, false otherwise + */ + private boolean isValidNodeAssignment(final String partition, final String nodeId, + final Map assignableNodeMap) { + CapacityNode node = assignableNodeMap.get(nodeId); + // Return valid when following conditions match: + // 1. Node is in assignableNodeMap + // 2. Node hold current partition or we can assign current partition to the node + return node != null && (node.hasPartition(_resourceName, partition) || node.canAdd( + _resourceName, partition)); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index edb7a76c6d..ab2c40b791 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -71,7 +71,7 @@ public enum ClusterConfigProperty { // The following concerns maintenance mode MAX_PARTITIONS_PER_INSTANCE, // The maximum number of partitions that an instance can serve in this cluster. - // This only works for GreedyRebalanceStrategy. + // This only works for StickyRebalanceStrategy. // TODO: if we want to support this for other rebalancers, we need to implement that logic GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE, // The following two include offline AND disabled instances diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java deleted file mode 100644 index d90e16136b..0000000000 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.apache.helix.controller.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.helix.controller.common.CapacityNode; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.when; - -public class TestGreedyRebalanceStrategy { - private static final String TEST_CLUSTER_NAME = "TestCluster"; - private static final String TEST_RESOURCE_PREFIX = "TestResource_"; - - @Test - public void testAssignmentWithGlobalPartitionLimit() { - - ResourceControllerDataProvider clusterDataCache = - Mockito.mock(ResourceControllerDataProvider.class); - LinkedHashMap states = new LinkedHashMap(2); - states.put("OFFLINE", 0); - states.put("ONLINE", 1); - - Set capacityNodeSet = new HashSet<>(); - for (int i = 0; i < 5; i++) { - CapacityNode capacityNode = new CapacityNode("Node-" + i); - capacityNode.setCapacity(1); - capacityNodeSet.add(capacityNode); - } - - List liveNodes = - capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); - - List partitions = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - partitions.add(TEST_RESOURCE_PREFIX + "0_" + i); - } - when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); - - GreedyRebalanceStrategy greedyRebalanceStrategy = new GreedyRebalanceStrategy(); - greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); - greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); - - partitions = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - partitions.add(TEST_RESOURCE_PREFIX + "1_" + i); - } - greedyRebalanceStrategy = new GreedyRebalanceStrategy(); - greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 1); - greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); - - Assert.assertEquals( - capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0); - } -} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java new file mode 100644 index 0000000000..45211df4e7 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java @@ -0,0 +1,214 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; + +public class TestStickyRebalanceStrategy { + private static final String TEST_CLUSTER_NAME = "TestCluster"; + private static final String TEST_RESOURCE_PREFIX = "TestResource_"; + + @Test + public void testAssignmentWithGlobalPartitionLimit() { + + ResourceControllerDataProvider clusterDataCache = + Mockito.mock(ResourceControllerDataProvider.class); + LinkedHashMap states = new LinkedHashMap(2); + states.put("OFFLINE", 0); + states.put("ONLINE", 1); + + Set capacityNodeSet = new HashSet<>(); + for (int i = 0; i < 5; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + List liveNodes = + capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + List partitions = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + partitions.add(TEST_RESOURCE_PREFIX + "0_" + i); + } + when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); + + StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); + + partitions = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + partitions.add(TEST_RESOURCE_PREFIX + "1_" + i); + } + greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 1); + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); + + Assert.assertEquals( + capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0); + } + + @Test + public void testStickyAssignment() { + final int nReplicas = 4; + final int nPartitions = 4; + final int nNode = 16; + + ResourceControllerDataProvider clusterDataCache = + Mockito.mock(ResourceControllerDataProvider.class); + LinkedHashMap states = new LinkedHashMap(2); + states.put("OFFLINE", 0); + states.put("ONLINE", nReplicas); + + Set capacityNodeSet = new HashSet<>(); + for (int i = 0; i < nNode; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + List liveNodes = + capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + List partitions = new ArrayList<>(); + for (int i = 0; i < nPartitions; i++) { + partitions.add(TEST_RESOURCE_PREFIX + i); + } + when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); + + // Populate previous assignment with currentMapping + Map> currentMapping = new HashMap<>(); + currentMapping.put(TEST_RESOURCE_PREFIX + "0", new HashMap<>()); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-0", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-2", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-4", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-6", "ONLINE"); + currentMapping.put(TEST_RESOURCE_PREFIX + "2", new HashMap<>()); + currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-1", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-5", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-8", "ONLINE"); + + StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); + ZNRecord shardAssignment = + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping, + clusterDataCache); + + // Assert the existing assignment won't be changed + Assert.assertEquals(currentMapping.get(TEST_RESOURCE_PREFIX + "0").keySet(), + new HashSet<>(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "0"))); + Assert.assertTrue(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "2") + .containsAll(currentMapping.get(TEST_RESOURCE_PREFIX + "2").keySet())); + } + + @Test + public void testStickyAssignmentMultipleTimes() { + final int nReplicas = 4; + final int nPartitions = 4; + final int nNode = 12; + + ResourceControllerDataProvider clusterDataCache = + Mockito.mock(ResourceControllerDataProvider.class); + LinkedHashMap states = new LinkedHashMap(2); + states.put("OFFLINE", 0); + states.put("ONLINE", nReplicas); + + Set capacityNodeSet = new HashSet<>(); + for (int i = 0; i < nNode; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + List liveNodes = + capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + List partitions = new ArrayList<>(); + for (int i = 0; i < nPartitions; i++) { + partitions.add(TEST_RESOURCE_PREFIX + i); + } + when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); + + StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); + // First round assignment computation: + // 1. Without previous assignment (currentMapping is null) + // 2. Without enough assignable nodes + ZNRecord firstRoundShardAssignment = + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); + + // Assert only 3 partitions are fulfilled with assignment + Assert.assertEquals(firstRoundShardAssignment.getListFields().entrySet().stream() + .filter(e -> e.getValue().size() == nReplicas).count(), 3); + + // Assign 4 more nodes which is used in second round assignment computation + for (int i = nNode; i < nNode + 4; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + liveNodes = capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + // Populate previous assignment (currentMapping) with first round assignment computation result + Map> currentMapping = new HashMap<>(); + firstRoundShardAssignment.getListFields().entrySet().stream() + .filter(e -> e.getValue().size() == nReplicas).forEach(e -> { + currentMapping.put(e.getKey(), new HashMap<>()); + for (String nodeId : e.getValue()) { + currentMapping.get(e.getKey()).put(nodeId, "ONLINE"); + } + }); + + // Second round assignment computation: + // 1. With previous assignment (currentMapping) + // 2. With enough assignable nodes + ZNRecord secondRoundShardAssignment = + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping, + clusterDataCache); + + // Assert all partitions have been assigned with enough replica + Assert.assertEquals(secondRoundShardAssignment.getListFields().entrySet().stream() + .filter(e -> e.getValue().size() == nReplicas).count(), nPartitions); + // For previously existing assignment, assert there is no assignment change + currentMapping.forEach((partition, nodeMapping) -> { + Assert.assertEquals(nodeMapping.keySet(), + new HashSet<>(secondRoundShardAssignment.getListField(partition))); + }); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java similarity index 77% rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java index 6b675a8a35..e0d160b78e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java @@ -1,5 +1,24 @@ package org.apache.helix.integration.rebalancer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -15,7 +34,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends TaskTestBase { +public class TestStickyRebalanceWithGlobalPerInstancePartitionLimit extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception { @@ -48,14 +67,14 @@ public void testGreedyRebalanceWithGlobalPerInstancePartitionLimit() throws Inte IdealState idealState = _gSetupTool.getClusterManagementTool() .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); idealState.setRebalanceStrategy( - "org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy"); + "org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy"); _gSetupTool.getClusterManagementTool() .setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState); Assert.assertTrue(_clusterVerifier.verifyByPolling()); _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB", 2, "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name(), - "org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy"); + "org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy"); _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1); Assert.assertTrue(_clusterVerifier.verifyByPolling()); From 510f5d84a3b659bab37671d275c312acb917c20e Mon Sep 17 00:00:00 2001 From: frankmu Date: Thu, 29 Aug 2024 16:12:41 -0700 Subject: [PATCH 4/6] Move existing assignments usage calculation to pre-process stage (#2888) Move existing assignments usage calculation to pre-process stage --- .../ResourceControllerDataProvider.java | 31 ++ .../rebalancer/ConditionBasedRebalancer.java | 36 +- .../strategy/StickyRebalanceStrategy.java | 96 ++--- .../stages/CurrentStateComputationStage.java | 8 + .../org/apache/helix/common/ZkTestBase.java | 18 +- .../TestStickyRebalanceStrategy.java | 353 ++++++++++++++++++ 6 files changed, 458 insertions(+), 84 deletions(-) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 74b5aa8b2e..37811d9a29 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -44,6 +44,7 @@ import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity; import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider; +import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.MissingTopStateRecord; import org.apache.helix.model.CustomizedState; import org.apache.helix.model.CustomizedStateConfig; @@ -51,6 +52,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.slf4j.Logger; @@ -583,6 +585,35 @@ public Set getSimpleCapacitySet() { return _simpleCapacitySet; } + public void populateSimpleCapacitySetUsage(final Set resourceNameSet, + final CurrentStateOutput currentStateOutput) { + // Convert the assignableNodes to map for quick lookup + Map simpleCapacityMap = new HashMap<>(); + for (CapacityNode node : _simpleCapacitySet) { + simpleCapacityMap.put(node.getId(), node); + } + for (String resourceName : resourceNameSet) { + // Process current state mapping + populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap, + currentStateOutput.getCurrentStateMap(resourceName)); + // Process pending state mapping + populateCapacityNodeUsageFromStateMap(resourceName, simpleCapacityMap, + currentStateOutput.getPendingMessageMap(resourceName)); + } + } + + private void populateCapacityNodeUsageFromStateMap(String resourceName, + Map simpleCapacityMap, Map> stateMap) { + for (Map.Entry> entry : stateMap.entrySet()) { + for (String instanceName : entry.getValue().keySet()) { + CapacityNode node = simpleCapacityMap.get(instanceName); + if (node != null) { + node.canAdd(resourceName, entry.getKey().getPartitionName()); + } + } + } + } + private void refreshDisabledInstancesForAllPartitionsSet() { _disabledInstancesForAllPartitionsSet.clear(); Collection allConfigs = getInstanceConfigMap().values(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java index 84762ea51c..6e68033c49 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java @@ -72,16 +72,12 @@ public ConditionBasedRebalancer(List rebalanceConditions) { @Override public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) { - if (!this._rebalanceConditions.stream() + ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); + // If previous placement list exists in cache && all condition met -> return cached value + if (cachedIdealState != null && cachedIdealState.getListFields() != null + && !cachedIdealState.getListFields().isEmpty() && !this._rebalanceConditions.stream() .allMatch(condition -> condition.shouldPerformRebalance(clusterData))) { - ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName); - if (cachedIdealState != null) { - return new IdealState(cachedIdealState); - } - // In theory, the cache should be populated already if no rebalance is needed - LOG.warn( - "Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State", - resourceName); + return new IdealState(cachedIdealState); } LOG.info("Computing IdealState for " + resourceName); @@ -189,18 +185,16 @@ public IdealState computeNewIdealState(String resourceName, IdealState currentId public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache, IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) { ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resource.getResourceName()); - if (!this._rebalanceConditions.stream() + // If previous assignment map exists in cache && all condition met -> return cached value + if (cachedIdealState.getMapFields() != null && !cachedIdealState.getMapFields().isEmpty() + && !this._rebalanceConditions.stream() .allMatch(condition -> condition.shouldPerformRebalance(cache))) { - if (cachedIdealState != null && cachedIdealState.getMapFields() != null) { - ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName()); - for (Partition partition : resource.getPartitions()) { - partitionMapping.addReplicaMap(partition, cachedIdealState.getMapFields().get(partition)); - } - return new ResourceAssignment(cachedIdealState); + ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName()); + for (Partition partition : resource.getPartitions()) { + partitionMapping.addReplicaMap(partition, + cachedIdealState.getMapFields().get(partition.getPartitionName())); } - // In theory, the cache should be populated already if no rebalance is needed - LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment", - resource.getResourceName()); + return partitionMapping; } LOG.info("Computing BestPossibleMapping for " + resource.getResourceName()); @@ -212,6 +206,10 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa cachedIdealState.setMapFields(assignment.getRecord().getMapFields()); cache.setCachedOndemandIdealState(resource.getResourceName(), cachedIdealState); + if (LOG.isDebugEnabled()) { + LOG.debug("Processed resource: {}", resource.getResourceName()); + LOG.debug("Final Mapping of resource : {}", assignment); + } return assignment; } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java index 7def42d087..3c3793cec2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; public class StickyRebalanceStrategy implements RebalanceStrategy { - private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private static final Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); private String _resourceName; private List _partitions; private LinkedHashMap _states; @@ -70,52 +70,50 @@ public ZNRecord computePartitionAssignment(final List allNodes, return znRecord; } - // Sort the assignable nodes by id - List assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); - assignableNodes.sort(Comparator.comparing(CapacityNode::getId)); - // Filter out the nodes if not in the liveNodes parameter // Note the liveNodes parameter here might be processed within the rebalancer, e.g. filter based on tags + Set assignableNodeSet = new HashSet<>(clusterData.getSimpleCapacitySet()); Set liveNodesSet = new HashSet<>(liveNodes); - assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId())); + assignableNodeSet.removeIf(n -> !liveNodesSet.contains(n.getId())); // Populate valid state map given current mapping - Map> stateMap = - populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes); + Map> stateMap = + populateValidAssignmentMapFromCurrentMapping(currentMapping, assignableNodeSet); if (logger.isDebugEnabled()) { logger.debug("currentMapping: {}", currentMapping); logger.debug("stateMap: {}", stateMap); } + // Sort the assignable nodes by id + List assignableNodeList = + assignableNodeSet.stream().sorted(Comparator.comparing(CapacityNode::getId)) + .collect(Collectors.toList()); + // Assign partitions to node by order. for (int i = 0, index = 0; i < _partitions.size(); i++) { int startIndex = index; - for (Map.Entry entry : _states.entrySet()) { - String state = entry.getKey(); - int stateReplicaNumber = entry.getValue(); - // For this partition, compute existing number replicas - long existsReplicas = - stateMap.computeIfAbsent(_partitions.get(i), m -> new HashMap<>()).values().stream() - .filter(s -> s.equals(state)).count(); - for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) { - while (index - startIndex < assignableNodes.size()) { - CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); - if (node.canAdd(_resourceName, _partitions.get(i))) { - stateMap.get(_partitions.get(i)).put(node.getId(), state); - break; - } + int remainingReplica = _statesReplicaCount; + if (stateMap.containsKey(_partitions.get(i))) { + remainingReplica = remainingReplica - stateMap.get(_partitions.get(i)).size(); + } + for (int j = 0; j < remainingReplica; j++) { + while (index - startIndex < assignableNodeList.size()) { + CapacityNode node = assignableNodeList.get(index++ % assignableNodeList.size()); + if (node.canAdd(_resourceName, _partitions.get(i))) { + stateMap.computeIfAbsent(_partitions.get(i), m -> new HashSet<>()).add(node.getId()); + break; } + } - if (index - startIndex >= assignableNodes.size()) { - // If the all nodes have been tried out, then no node can be assigned. - logger.warn("No enough assignable nodes for resource: {}", _resourceName); - } + if (index - startIndex >= assignableNodeList.size()) { + // If the all nodes have been tried out, then no node can be assigned. + logger.warn("No enough assignable nodes for resource: {}", _resourceName); } } } - for (Map.Entry> entry : stateMap.entrySet()) { - znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue().keySet())); + for (Map.Entry> entry : stateMap.entrySet()) { + znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue())); } if (logger.isDebugEnabled()) { logger.debug("znRecord: {}", znRecord); @@ -129,12 +127,12 @@ public ZNRecord computePartitionAssignment(final List allNodes, * * @param currentMapping the current mapping of partitions to node states * @param assignableNodes the list of nodes that can be assigned - * @return a map of partitions to valid node states + * @return a map of partitions to valid nodes */ - private Map> populateValidStateMapFromCurrentMapping( + private Map> populateValidAssignmentMapFromCurrentMapping( final Map> currentMapping, - final List assignableNodes) { - Map> validStateMap = new HashMap<>(); + final Set assignableNodes) { + Map> validAssignmentMap = new HashMap<>(); // Convert the assignableNodes to map for quick lookup Map assignableNodeMap = assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node)); @@ -142,44 +140,14 @@ private Map> populateValidStateMapFromCurrentMapping for (Map.Entry> entry : currentMapping.entrySet()) { String partition = entry.getKey(); Map currentNodeStateMap = new HashMap<>(entry.getValue()); - // Skip if current node state is invalid with state model - if (!isValidStateMap(currentNodeStateMap)) { - continue; - } // Filter out invalid node assignment currentNodeStateMap.entrySet() .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), assignableNodeMap)); - validStateMap.put(partition, currentNodeStateMap); - } - } - return validStateMap; - } - - /** - * Validates whether the provided state mapping is valid according to the defined state model. - * - * @param currentNodeStateMap A map representing the actual state mapping where the key is the node ID and the value is the state. - * @return true if the state map is valid, false otherwise - */ - private boolean isValidStateMap(final Map currentNodeStateMap) { - // Check if the size of the current state map exceeds the total state count in state model - if (currentNodeStateMap.size() > _statesReplicaCount) { - return false; - } - - Map tmpStates = new HashMap<>(_states); - for (String state : currentNodeStateMap.values()) { - // Return invalid if: - // The state is not defined in the state model OR - // The state count exceeds the defined count in state model - if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) { - return false; + validAssignmentMap.put(partition, new HashSet<>(currentNodeStateMap.keySet())); } - tmpStates.put(state, tmpStates.get(state) - 1); } - - return true; + return validAssignmentMap; } /** diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index da972d682c..64d113f0a7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -134,6 +134,14 @@ public void process(ClusterEvent event) throws Exception { handleResourceCapacityCalculation(event, (ResourceControllerDataProvider) cache, currentStateOutput); } + + // Populate the capacity for simple CapacityNode + if (cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1 + && cache instanceof ResourceControllerDataProvider) { + final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache; + dataProvider.populateSimpleCapacitySetUsage(resourceToRebalance.keySet(), + currentStateExcludingUnknown); + } } // update all pending messages to CurrentStateOutput. diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index a265605185..28fa8d491f 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -19,7 +19,6 @@ * under the License. */ -import com.google.common.base.Preconditions; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.reflect.Method; @@ -33,6 +32,7 @@ import javax.management.MBeanServerConnection; import javax.management.ObjectName; +import com.google.common.base.Preconditions; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; @@ -50,6 +50,7 @@ import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; +import org.apache.helix.controller.rebalancer.ConditionBasedRebalancer; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; @@ -365,6 +366,14 @@ protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient, configAccessor.setClusterConfig(clusterName, clusterConfig); } + protected void setGlobalMaxPartitionAllowedPerInstanceInCluster(HelixZkClient zkClient, + String clusterName, int maxPartitionAllowed) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setGlobalMaxPartitionAllowedPerInstance(maxPartitionAllowed); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica, @@ -384,6 +393,13 @@ protected IdealState createResourceWithWagedRebalance(String clusterName, String -1, WagedRebalancer.class.getName(), null); } + protected IdealState createResourceWithConditionBasedRebalance(String clusterName, String db, + String stateModel, int numPartition, int replica, int minActiveReplica, + String rebalanceStrategy) { + return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica, -1, + ConditionBasedRebalancer.class.getName(), rebalanceStrategy); + } + private IdealState createResource(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName, String rebalanceStrategy) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java new file mode 100644 index 0000000000..4e4717f39f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java @@ -0,0 +1,353 @@ +package org.apache.helix.integration.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.ConfigAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TestStickyRebalanceStrategy extends ZkTestBase { + static final int NUM_NODE = 18; + static final int ADDITIONAL_NUM_NODE = 2; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 2; + protected static final int REPLICAS = 3; + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + protected ClusterControllerManager _controller; + protected List _participants = new ArrayList<>(); + protected List _additionalParticipants = new ArrayList<>(); + protected int _minActiveReplica = 0; + protected ZkHelixClusterVerifier _clusterVerifier; + protected List _testDBs = new ArrayList<>(); + protected ConfigAccessor _configAccessor; + protected String[] TestStateModels = + {BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name(), BuiltInStateModelDefinitions.OnlineOffline.name()}; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + // start dummy participants + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + participant.syncStart(); + _participants.add(participant); + } + + for (int i = NUM_NODE; i < NUM_NODE + ADDITIONAL_NUM_NODE; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + _additionalParticipants.add(participant); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build(); + + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + } + + @AfterClass + public void afterClass() throws Exception { + if (_clusterVerifier != null) { + _clusterVerifier.close(); + } + /* + shutdown order: 1) disconnect the controller 2) disconnect participants + */ + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } + + @BeforeMethod + public void beforeTest() { + // Restart any participants that has been disconnected from last test + for (int i = 0; i < _participants.size(); i++) { + if (!_participants.get(i).isConnected()) { + _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _participants.get(i).getInstanceName())); + _participants.get(i).syncStart(); + } + } + + // Stop any additional participants that has been added from last test + for (MockParticipantManager additionalParticipant : _additionalParticipants) { + if (additionalParticipant.isConnected()) { + additionalParticipant.syncStop(); + } + } + } + + @AfterMethod + public void afterTest() throws InterruptedException { + // delete all DBs create in last test + for (String db : _testDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + _testDBs.clear(); + _clusterVerifier.verifyByPolling(); + } + + @Test + public void testFirstTimeAssignmentWithNoInitialLiveNodes() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + // Shut down all the nodes + for (int i = 0; i < NUM_NODE; i++) { + _participants.get(i).syncStop(); + } + // Create resource + Map externalViewsBefore = createTestDBs(); + // Start all the nodes + for (int i = 0; i < NUM_NODE; i++) { + _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _participants.get(i).getInstanceName())); + _participants.get(i).syncStart(); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + Map externalViewsAfter = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + } + validateAllPartitionAssigned(externalViewsAfter); + } + + @Test + public void testNoPartitionMovementWithNewInstanceAdd() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + Map externalViewsBefore = createTestDBs(); + + // Start more new instances + for (int i = 0; i < _additionalParticipants.size(); i++) { + _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _additionalParticipants.get(i).getInstanceName())); + _additionalParticipants.get(i).syncStart(); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // All partition assignment should remain the same + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev); + } + } + + @Test + public void testNoPartitionMovementWithInstanceDown() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + Map externalViewsBefore = createTestDBs(); + + // Shut down 2 instances + _participants.get(0).syncStop(); + _participants.get(_participants.size() - 1).syncStop(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // No movement for previous remaining assignment + Map externalViewsAfter = new HashMap<>(); + Map idealStates = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + idealStates.put(db, is); + } + validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, externalViewsAfter, 2); + } + + @Test + public void testFirstTimeAssignmentWithStackingPlacement() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); + Map externalViewsBefore = createTestDBs(); + validateAllPartitionAssigned(externalViewsBefore); + } + + @Test + public void testNoPartitionMovementWithNewInstanceAddWithStackingPlacement() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); + Map externalViewsBefore = createTestDBs(); + + // Start more new instances + for (int i = 0; i < _additionalParticipants.size(); i++) { + _additionalParticipants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _additionalParticipants.get(i).getInstanceName())); + _additionalParticipants.get(i).syncStart(); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // All partition assignment should remain the same + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev); + } + } + + @Test + public void testNoPartitionMovementWithInstanceDownWithStackingPlacement() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); + // Shut down half of the nodes given we allow stacking placement + for (int i = 0; i < NUM_NODE / 2; i++) { + _participants.get(i).syncStop(); + } + Map externalViewsBefore = createTestDBs(); + + // Shut down 2 instances + _participants.get(_participants.size() - 1).syncStop(); + _participants.get(_participants.size() - 2).syncStop(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // No movement for previous remaining assignment + Map externalViewsAfter = new HashMap<>(); + Map idealStates = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + idealStates.put(db, is); + } + validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, externalViewsAfter, 4); + } + + // create test DBs, wait it converged and return externalviews + protected Map createTestDBs() throws InterruptedException { + Map externalViews = new HashMap(); + int i = 0; + for (String stateModel : TestStateModels) { + String db = "Test-DB-" + i++; + createResourceWithConditionBasedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, REPLICAS, + _minActiveReplica, StickyRebalanceStrategy.class.getName()); + _testDBs.add(db); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + externalViews.put(db, ev); + } + return externalViews; + } + + protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, + ExternalView evAfter) { + for (String partition : is.getPartitionSet()) { + Map assignmentsBefore = evBefore.getRecord().getMapField(partition); + Map assignmentsAfter = evAfter.getRecord().getMapField(partition); + + Set instancesBefore = new HashSet<>(assignmentsBefore.keySet()); + Set instancesAfter = new HashSet<>(assignmentsAfter.keySet()); + + Assert.assertEquals(instancesBefore, instancesAfter, + String.format("%s has been moved to new instances, before: %s, after: %s", partition, + assignmentsBefore, assignmentsAfter)); + } + } + + protected void validateNoPartitionMoveWithDiffCount(Map idealStates, + Map externalViewsBefore, Map externalViewsAfter, + int diffCount) { + for (Map.Entry entry : idealStates.entrySet()) { + String resourceName = entry.getKey(); + IdealState is = entry.getValue(); + for (String partition : is.getPartitionSet()) { + Map assignmentsBefore = + externalViewsBefore.get(resourceName).getRecord().getMapField(partition); + Map assignmentsAfter = + externalViewsAfter.get(resourceName).getRecord().getMapField(partition); + + Set instancesBefore = new HashSet<>(assignmentsBefore.keySet()); + Set instancesAfter = new HashSet<>(assignmentsAfter.keySet()); + + if (instancesBefore.size() == instancesAfter.size()) { + Assert.assertEquals(instancesBefore, instancesAfter, + String.format("%s has been moved to new instances, before: %s, after: %s", partition, + assignmentsBefore, assignmentsAfter)); + } else { + Assert.assertTrue(instancesBefore.containsAll(instancesAfter), + String.format("%s has been moved to new instances, before: %s, after: %s", partition, + assignmentsBefore, assignmentsAfter)); + diffCount = diffCount - (instancesBefore.size() - instancesAfter.size()); + } + } + } + Assert.assertEquals(diffCount, 0, + String.format("Partition movement detected, before: %s, after: %s", externalViewsBefore, + externalViewsAfter)); + } + + private void validateAllPartitionAssigned(Map externalViewsBefore) { + for (ExternalView ev : externalViewsBefore.values()) { + Map> assignments = ev.getRecord().getMapFields(); + Assert.assertNotNull(assignments); + Assert.assertEquals(assignments.size(), PARTITIONS); + for (Map assignmentMap : assignments.values()) { + Assert.assertEquals(assignmentMap.keySet().size(), REPLICAS); + } + } + } +} From b0b99017cb69e88815024b92edf8862843f82751 Mon Sep 17 00:00:00 2001 From: frankmu Date: Fri, 30 Aug 2024 13:48:15 -0700 Subject: [PATCH 5/6] Add test to ensure no partition movement when nodes restart (#2896) Add test to ensure no partition movement when nodes restart --- .../TestStickyRebalanceStrategy.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java index 4e4717f39f..97a27017e3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceStrategy.java @@ -218,6 +218,51 @@ public void testNoPartitionMovementWithInstanceDown() throws Exception { validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, externalViewsAfter, 2); } + @Test + public void testNoPartitionMovementWithInstanceRestart() throws Exception { + setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 1); + // Create resource + Map externalViewsBefore = createTestDBs(); + // Shut down half of the nodes + for (int i = 0; i < _participants.size(); i++) { + if (i % 2 == 0) { + _participants.get(i).syncStop(); + } + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + Map externalViewsAfter = new HashMap<>(); + Map idealStates = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + externalViewsAfter.put(db, ev); + idealStates.put(db, is); + } + validateNoPartitionMoveWithDiffCount(idealStates, externalViewsBefore, externalViewsAfter, + NUM_NODE / 2); + + // Start all the nodes + for (int i = 0; i < _participants.size(); i++) { + if (!_participants.get(i).isConnected()) { + _participants.set(i, new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, + _participants.get(i).getInstanceName())); + _participants.get(i).syncStart(); + } + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev); + } + } + @Test public void testFirstTimeAssignmentWithStackingPlacement() throws Exception { setGlobalMaxPartitionAllowedPerInstanceInCluster(_gZkClient, CLUSTER_NAME, 2); From e433391418cb65c5bffa626d84be6b709671bda9 Mon Sep 17 00:00:00 2001 From: Tengfei Mu Date: Mon, 9 Sep 2024 14:46:37 -0700 Subject: [PATCH 6/6] Add null check for getClusterConfig() --- .../dataproviders/ResourceControllerDataProvider.java | 3 ++- .../helix/controller/stages/CurrentStateComputationStage.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 37811d9a29..057cee1556 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -188,7 +188,8 @@ public synchronized void refresh(HelixDataAccessor accessor) { refreshStablePartitionList(getIdealStates()); refreshDisabledInstancesForAllPartitionsSet(); - if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) { + if (getClusterConfig() != null + && getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) { buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance()); // Remove all cached IdealState because it is a global computation cannot partially be // performed for some resources. The computation is simple as well not taking too much resource diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 64d113f0a7..76b920874d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -136,7 +136,8 @@ public void process(ClusterEvent event) throws Exception { } // Populate the capacity for simple CapacityNode - if (cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1 + if (cache.getClusterConfig() != null + && cache.getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1 && cache instanceof ResourceControllerDataProvider) { final ResourceControllerDataProvider dataProvider = (ResourceControllerDataProvider) cache; dataProvider.populateSimpleCapacitySetUsage(resourceToRebalance.keySet(),