Skip to content

Commit

Permalink
Fix Empty IdealState Calculation with Disabled Nodes in AutoRebalanceStrategy (
Browse files Browse the repository at this point in the history
#2877)

Fix IdealState Calculation in AutoRebalanceStrategy for Disabled Nodes
  • Loading branch information
MarkGaox authored Sep 5, 2024
1 parent 7b238a9 commit d6e5315
Show file tree
Hide file tree
Showing 6 changed files with 490 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.TreeSet;

import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,7 +84,6 @@ public void init(String resourceName, final List<String> partitions,
@Override
public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
int numReplicas = countStateReplicas();
ZNRecord znRecord = new ZNRecord(_resourceName);
if (liveNodes.size() == 0) {
return znRecord;
Expand All @@ -97,9 +98,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
Collections.sort(sortedLiveNodes, currentStateNodeComparator);

int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
_nodeMap = new HashMap<String, Node>();
_nodeMap = new HashMap<>();
_liveNodesList = new ArrayList<Node>();

for (String id : sortedAllNodes) {
Expand All @@ -108,6 +107,10 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
node.hasCeilingCapacity = false;
_nodeMap.put(id, node);
}

int numReplicas = calculateExpectedReplicaCount(clusterData);
int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
for (int i = 0; i < sortedLiveNodes.size(); i++) {
boolean usingCeiling = false;
int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
Expand All @@ -116,7 +119,8 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
distRemainder = distRemainder - 1;
usingCeiling = true;
}
Node node = _nodeMap.get(sortedLiveNodes.get(i));
String nodeName = sortedLiveNodes.get(i);
Node node = _nodeMap.get(nodeName);
node.isAlive = true;
node.capacity = targetSize;
node.hasCeilingCapacity = usingCeiling;
Expand All @@ -127,15 +131,16 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
_stateMap = generateStateMap();

// compute the preferred mapping if all nodes were up
_preferredAssignment = computePreferredPlacement(sortedAllNodes);
_preferredAssignment = computePreferredPlacement(sortedAllNodes, clusterData);

// logger.info("preferred mapping:"+ preferredAssignment);
// from current mapping derive the ones in preferred location
// this will update the nodes with their current fill status
_existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);

// from current mapping derive the ones not in preferred location
_existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
_existingNonPreferredAssignment =
computeExistingNonPreferredPlacement(currentMapping, clusterData);

// compute orphaned replicas that are not assigned to any node
_orphaned = computeOrphaned();
Expand All @@ -152,7 +157,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
forceToAssignOrphans();
}

prepareResult(znRecord);
prepareResult(znRecord, clusterData);
return znRecord;
}

Expand Down Expand Up @@ -301,7 +306,7 @@ private void moveExcessReplicas() {
* Update a ZNRecord with the results of the rebalancing.
* @param znRecord
*/
private void prepareResult(ZNRecord znRecord) {
private void prepareResult(ZNRecord znRecord, ResourceControllerDataProvider clusterData) {
// The map fields are keyed on partition name to a pair of node and state, i.e. it
// indicates that the partition with given state is served by that node
//
Expand Down Expand Up @@ -336,7 +341,10 @@ private void prepareResult(ZNRecord znRecord) {
}
}
}
normalizePreferenceLists(znRecord.getListFields(), newPreferences);
normalizePreferenceLists(znRecord.getListFields(), newPreferences, clusterData);

String stateModelDef = clusterData.getIdealState(_resourceName).getStateModelDefRef();
StateModelDefinition stateModel = clusterData.getStateModelDef(stateModelDef);

// generate preference maps based on the preference lists
for (String partition : _partitions) {
Expand All @@ -359,6 +367,9 @@ private void forceToAssignOrphans() {
&& receiver.currentlyAssigned < _maximumPerNode && receiver
.canAddIfCapacity(replica)) {
nodeToAssign = receiver;
// Should update the minOverloadedCapacity to find the node with minimum overloaded capacity
minOverloadedCapacity =
Math.min(receiver.currentlyAssigned - receiver.capacity, minOverloadedCapacity);
}
}

Expand All @@ -380,15 +391,15 @@ private void forceToAssignOrphans() {
* assignment
*/
private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
Map<String, List<String>> newPreferences) {
Map<String, List<String>> newPreferences, ResourceControllerDataProvider clusterData) {

Map<String, Map<String, Integer>> nodeReplicaCounts =
new HashMap<String, Map<String, Integer>>();
for (String partition : preferenceLists.keySet()) {
normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts, clusterData);
}
for (String partition : newPreferences.keySet()) {
normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts, clusterData);
preferenceLists.get(partition).addAll(newPreferences.get(partition));
}
}
Expand All @@ -399,9 +410,13 @@ private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
* @param nodeReplicaCounts map of (node --> state --> count)
*/
private void normalizePreferenceList(List<String> preferenceList,
Map<String, Map<String, Integer>> nodeReplicaCounts) {
Map<String, Map<String, Integer>> nodeReplicaCounts,
ResourceControllerDataProvider clusterData) {
List<String> newPreferenceList = new ArrayList<String>();
int replicas = Math.min(countStateReplicas(), preferenceList.size());
// Use the expected replica count instead of relying on the _states map.
// This prevents the preference list from being truncated when ANY_LIVEINSTANCE
// is used as the replication factor.
int replicas = Math.min(calculateExpectedReplicaCount(clusterData), preferenceList.size());

// make this a LinkedHashSet to preserve iteration order
Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
Expand Down Expand Up @@ -463,14 +478,14 @@ private int getReplicaCountForNode(String state, String node,

/**
* Compute the subset of the current mapping where replicas are not mapped according to their
* preferred assignment.
* existing preferred assignment.
* @param currentMapping Current mapping of replicas to nodes
* @return The current assignments that do not conform to the preferred assignment
*/
private Map<Replica, Node> computeExistingNonPreferredPlacement(
Map<String, Map<String, String>> currentMapping) {
Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
int count = calculateExpectedReplicaCount(clusterData);
for (String partition : currentMapping.keySet()) {
Map<String, String> nodeStateMap = currentMapping.get(partition);
nodeStateMap.keySet().retainAll(_nodeMap.keySet());
Expand All @@ -496,12 +511,11 @@ private Map<Replica, Node> computeExistingNonPreferredPlacement(
throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
}

if (_preferredAssignment.get(replica).id != node.id
if (!_preferredAssignment.get(replica).id.equals(node.id)
&& !_existingPreferredAssignment.containsKey(replica)
&& !existingNonPreferredAssignment.containsKey(replica)) {
existingNonPreferredAssignment.put(replica, node);
node.nonPreferred.add(replica);

break;
}
}
Expand Down Expand Up @@ -548,7 +562,7 @@ private Set<Replica> computeOrphaned() {
private Map<Replica, Node> computeExistingPreferredPlacement(
final Map<String, Map<String, String>> currentMapping) {
Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
int count = calculateStatesReplicaCount();
for (String partition : currentMapping.keySet()) {
Map<String, String> nodeStateMap = currentMapping.get(partition);
nodeStateMap.keySet().retainAll(_nodeMap.keySet());
Expand All @@ -560,7 +574,7 @@ private Map<Replica, Node> computeExistingPreferredPlacement(
Replica replica = new Replica(partition, replicaId);
if (_preferredAssignment.containsKey(replica)
&& !existingPreferredAssignment.containsKey(replica)
&& _preferredAssignment.get(replica).id == node.id) {
&& _preferredAssignment.get(replica).id.equals(node.id)) {
existingPreferredAssignment.put(replica, node);
node.preferred.add(replica);
break;
Expand All @@ -576,16 +590,18 @@ private Map<Replica, Node> computeExistingPreferredPlacement(
* Given a predefined set of all possible nodes, compute an assignment of replicas to
* nodes that evenly assigns all replicas to nodes.
* @param allNodes Identifiers to all nodes, live and non-live
* @param clusterData
* @return Preferred assignment of replicas
*/
private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes,
ResourceControllerDataProvider clusterData) {
Map<Replica, Node> preferredMapping;
preferredMapping = new HashMap<Replica, Node>();
int partitionId = 0;
int numReplicas = countStateReplicas();
int count = countStateReplicas();
// Count the total number of replicas that should be assigned assuming all nodes are up
int numReplicas = calculateExpectedReplicaCount(clusterData);
for (String partition : _partitions) {
for (int replicaId = 0; replicaId < count; replicaId++) {
for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
Replica replica = new Replica(partition, replicaId);
String nodeName =
_placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
Expand All @@ -598,17 +614,40 @@ private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes
}

/**
* Counts the total number of replicas given a state-count mapping
* Calculates the total number of replicas based on the state-count mapping
* which only includes the states of live instances.
* @return
*/
private int countStateReplicas() {
private int calculateStatesReplicaCount() {
  // Sum the per-state replica counts. NOTE: _states is built from live instances
  // only, so this undercounts when instances are down or disabled — callers that
  // need the full-cluster expectation use calculateExpectedReplicaCount instead.
  return _states.values().stream().mapToInt(Integer::intValue).sum();
}

/**
 * Calculates the expected total number of replicas assuming full cluster availability.
 * Unlike {@link #calculateStatesReplicaCount()}, this does not depend on the
 * live-instance-only _states map, so resources configured with ANY_LIVEINSTANCE
 * resolve to the total instance count rather than the currently-live count.
 * @param clusterData the cache that stores all cluster data
 * @return The total number of replicas that should be assigned
 */
private int calculateExpectedReplicaCount(ResourceControllerDataProvider clusterData) {
  IdealState idealState = clusterData.getIdealState(_resourceName);
  int nodeCount = _nodeMap.size();
  // For ANY_LIVEINSTANCE the configured replica string resolves against the
  // number of nodes in the cluster, so derive the count from the IdealState.
  int resolvedReplicas = idealState.getReplicaCount(nodeCount);
  StateModelDefinition stateModelDef =
      clusterData.getStateModelDef(idealState.getStateModelDefRef());
  // Expand the resolved replica count into per-state counts and total them up.
  int total = 0;
  for (int stateCount : stateModelDef.getStateCountMap(nodeCount, resolvedReplicas).values()) {
    total += stateCount;
  }
  return total;
}

/**
* Compute a map of replica ids to state names
* @return Map: replica id -> state name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public ZNRecord computePartitionAssignment(final List<String> liveNodes,
// compute the preferred mapping if all nodes were up
_preferredAssignment = computePreferredPlacement(allNodes);

// logger.info("preferred mapping:"+ preferredAssignment);
// from current mapping derive the ones in preferred location
// this will update the nodes with their current fill status
_existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
Expand Down
29 changes: 29 additions & 0 deletions helix-core/src/test/java/org/apache/helix/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import org.apache.commons.io.FileUtils;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.integration.manager.ZkTestManager;
Expand All @@ -48,8 +50,10 @@
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
Expand All @@ -73,6 +77,9 @@
import org.slf4j.LoggerFactory;
import org.testng.Assert;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestHelper {
private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
Expand Down Expand Up @@ -864,4 +871,26 @@ public static void printZkListeners(HelixZkClient client) throws Exception {
}
System.out.println("}");
}

/**
 * Builds a Mockito-mocked ResourceControllerDataProvider preloaded with a FULL_AUTO
 * IdealState, the given state model definition, and the given disabled-instance set,
 * for exercising AutoRebalanceStrategy in unit tests.
 * @param resourceName name of the resource the mocked IdealState describes
 * @param numOfReplicas replica count string (may be a literal number or ANY_LIVEINSTANCE)
 * @param stateModelDef name of the state model definition to reference
 * @param stateModel the StateModelDefinition returned by the mock for that name
 * @param disabledInstances instances reported as disabled (globally and per-partition)
 * @return a mocked data provider wired with the above cluster state
 */
public static ResourceControllerDataProvider buildMockDataCache(String resourceName,
    String numOfReplicas, String stateModelDef, StateModelDefinition stateModel,
    Set<String> disabledInstances) {
  IdealState mockIdealState = new IdealState(resourceName);
  mockIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
  mockIdealState.setReplicas(numOfReplicas);
  mockIdealState.setStateModelDefRef(stateModelDef);
  mockIdealState.setRebalanceStrategy(
      "org.apache.helix.controller.rebalancer.strategy." + "AutoRebalanceStrategy");

  // Zero delay so rebalancing decisions are not deferred during tests.
  ClusterConfig clusterConfig = new ClusterConfig("cluster");
  clusterConfig.setRebalanceDelayTime(0);

  ResourceControllerDataProvider mockCache = mock(ResourceControllerDataProvider.class);
  when(mockCache.getIdealState(resourceName)).thenReturn(mockIdealState);
  when(mockCache.getStateModelDef(stateModelDef)).thenReturn(stateModel);
  when(mockCache.getClusterConfig()).thenReturn(clusterConfig);
  when(mockCache.getDisabledInstances()).thenReturn(disabledInstances);
  when(mockCache.getDisabledInstancesForPartition(any(), any())).thenReturn(disabledInstances);
  when(mockCache.getAbnormalStateResolver(any()))
      .thenReturn(MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
  return mockCache;
}
}
Loading

0 comments on commit d6e5315

Please sign in to comment.