Skip to content

Commit

Permalink
Create condition based rebalancer (#2846)
Browse files Browse the repository at this point in the history
Create condition based rebalancer
  • Loading branch information
frankmu authored Jul 25, 2024
1 parent 6850e09 commit 9cf76fd
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
// a map from customized state type to customized view cache
private final Map<String, CustomizedViewCache> _customizedViewCacheMap;

// maintain a cache of ideal states (preference list + best possible assignment) that is managed on demand by the rebalancer
private final Map<String, ZNRecord> _ondemandIdealStateCache;

// maintain a cache of bestPossible assignment across pipeline runs
// TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
private Map<String, ResourceAssignment> _resourceAssignmentCache;
Expand Down Expand Up @@ -145,6 +148,7 @@ public String getObjName(ExternalView obj) {
_refreshedChangeTypes = ConcurrentHashMap.newKeySet();
_customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes);
_customizedViewCacheMap = new HashMap<>();
_ondemandIdealStateCache = new HashMap<>();
}

public synchronized void refresh(HelixDataAccessor accessor) {
Expand Down Expand Up @@ -383,6 +387,28 @@ public Map<String, Map<String, String>> getLastTopStateLocationMap() {
return _lastTopStateLocationMap;
}

/**
 * Looks up the on-demand ideal state (preference list + best possible assignment) that was
 * cached for a resource.
 * @param resource name of the resource whose cached ideal state is requested
 * @return the cached record, or {@code null} if nothing is cached for the resource
 */
public ZNRecord getCachedOndemandIdealState(String resource) {
  final ZNRecord cachedRecord = _ondemandIdealStateCache.get(resource);
  return cachedRecord;
}

/**
 * Stores the on-demand ideal state (preference list + best possible assignment) for a resource,
 * replacing any record previously cached under the same resource name.
 * @param resource name of the resource the record belongs to
 * @param idealState the record to cache for the resource
 */
public void setCachedOndemandIdealState(String resource, ZNRecord idealState) {
  this._ondemandIdealStateCache.put(resource, idealState);
}

/**
 * Drops every cached on-demand ideal state, for all resources.
 */
public void clearCachedOndemandIdealStates() {
  this._ondemandIdealStateCache.clear();
}

/**
* Get cached resourceAssignment (bestPossible mapping) for a resource
* @param resource
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package org.apache.helix.controller.rebalancer;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.condition.RebalanceCondition;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@code ConditionBasedRebalancer} class extends the {@link AbstractRebalancer} and
 * performs the rebalance operation only when every configured {@link RebalanceCondition}
 * reports that a rebalance should be performed. Otherwise the result cached in the
 * {@link ResourceControllerDataProvider} is returned; if nothing is cached for the resource,
 * the result is recomputed and cached.
 *
 * <p>With no conditions configured, every call recomputes (an empty condition list trivially
 * satisfies "all conditions are met").
 */
public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
  private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class);
  private final List<RebalanceCondition> _rebalanceConditions;

  /**
   * Creates a rebalancer without any condition; it recomputes on every call.
   */
  public ConditionBasedRebalancer() {
    this._rebalanceConditions = new ArrayList<>();
  }

  /**
   * Creates a rebalancer guarded by the given conditions.
   *
   * @param rebalanceConditions the conditions that must all be met for a recomputation; the list
   *                            is copied, so later changes to it do not affect this rebalancer.
   */
  public ConditionBasedRebalancer(List<RebalanceCondition> rebalanceConditions) {
    // Defensive copy: do not share mutable state with the caller (e.g. a reused builder).
    this._rebalanceConditions = new ArrayList<>(rebalanceConditions);
  }

  /**
   * Compute new Ideal State iff all conditions are met, otherwise just return from cached Ideal
   * State. When the conditions are not met but nothing is cached for the resource, the Ideal
   * State is recomputed. A freshly computed Ideal State is stored in the on-demand cache.
   *
   * @param resourceName       the name of the resource for which to compute the new ideal state.
   * @param currentIdealState  the current {@link IdealState} of the resource.
   * @param currentStateOutput the current state output, containing the actual states of the
   *                           partitions.
   * @param clusterData        the {@link ResourceControllerDataProvider} instance providing
   *                           additional data required for the computation.
   * @return the cached or newly computed {@link IdealState} for the resource.
   * @throws HelixException if the state model definition referenced by the ideal state is not
   *                        available in {@code clusterData}.
   */
  @Override
  public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
      CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
    if (!shouldRebalance(clusterData)) {
      ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName);
      if (cachedIdealState != null) {
        return new IdealState(cachedIdealState);
      }
      // In theory, the cache should be populated already if no rebalance is needed
      LOG.warn(
          "Cannot fetch the cached Ideal State for resource: {}, will recompute the Ideal State",
          resourceName);
    }

    LOG.info("Computing IdealState for {}", resourceName);

    List<String> partitions = getStablePartitionList(clusterData, currentIdealState);
    String stateModelName = currentIdealState.getStateModelDefRef();
    StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
    if (stateModelDef == null) {
      LOG.error("State Model Definition null for resource: {}", resourceName);
      throw new HelixException("State Model Definition null for resource: " + resourceName);
    }
    Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances();
    int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size());

    LinkedHashMap<String, Integer> stateCountMap =
        stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas);
    List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet());
    List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances());
    // Disabled instances take part neither as configured nor as live candidates.
    assignableNodes.removeAll(clusterData.getDisabledInstances());
    assignableLiveNodes.retainAll(assignableNodes);

    Map<String, Map<String, String>> currentMapping =
        currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);

    // If there are nodes tagged with resource name, use only those nodes
    String instanceGroupTag = currentIdealState.getInstanceGroupTag();
    if (instanceGroupTag != null) {
      Set<String> taggedNodes = new HashSet<>();
      Set<String> taggedLiveNodes = new HashSet<>();
      for (String instanceName : assignableNodes) {
        if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
            .containsTag(instanceGroupTag)) {
          taggedNodes.add(instanceName);
          if (assignableLiveInstance.containsKey(instanceName)) {
            taggedLiveNodes.add(instanceName);
          }
        }
      }
      if (!taggedLiveNodes.isEmpty()) {
        // live nodes exist that have this tag
        LOG.info("found the following participants with tag {} for {}: {}", instanceGroupTag,
            resourceName, taggedLiveNodes);
      } else if (taggedNodes.isEmpty()) {
        // no live nodes and no configured nodes have this tag
        LOG.warn("Resource {} has tag {} but no configured participants have this tag",
            resourceName, instanceGroupTag);
      } else {
        // configured nodes have this tag, but no live nodes have this tag
        LOG.warn("Resource {} has tag {} but no live participants have this tag", resourceName,
            instanceGroupTag);
      }
      assignableNodes = new ArrayList<>(taggedNodes);
      assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
    }

    // sort node lists to ensure consistent preferred assignments
    Collections.sort(assignableNodes);
    Collections.sort(assignableLiveNodes);

    int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
    _rebalanceStrategy =
        getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
            stateCountMap, maxPartition);
    ZNRecord newMapping =
        _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes,
            currentMapping, clusterData);

    if (LOG.isDebugEnabled()) {
      LOG.debug("currentMapping: {}", currentMapping);
      LOG.debug("stateCountMap: {}", stateCountMap);
      LOG.debug("assignableLiveNodes: {}", assignableLiveNodes);
      LOG.debug("assignableNodes: {}", assignableNodes);
      LOG.debug("maxPartition: {}", maxPartition);
      LOG.debug("newMapping: {}", newMapping);
    }

    // Carry over the simple fields of the current ideal state; only the preference lists are new.
    IdealState newIdealState = new IdealState(resourceName);
    newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
    newIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    newIdealState.getRecord().setListFields(newMapping.getListFields());

    clusterData.setCachedOndemandIdealState(resourceName, newIdealState.getRecord());

    return newIdealState;
  }

  /**
   * Compute new assignment iff all conditions are met, otherwise just return from cached
   * assignment. When the conditions are not met but nothing is cached for the resource, the
   * assignment is recomputed. A freshly computed assignment is stored in the map fields of the
   * resource's on-demand cache record; the record is created if it does not exist yet.
   *
   * @param cache              the {@link ResourceControllerDataProvider} instance providing
   *                           metadata and state information about the cluster.
   * @param idealState         the {@link IdealState} representing the current ideal state.
   * @param resource           the {@link Resource} for which to compute the best possible
   *                           partition state.
   * @param currentStateOutput the {@link CurrentStateOutput} containing the current states of the
   *                           partitions.
   * @return the {@link ResourceAssignment} representing the best possible state assignment for the
   * partitions of the resource.
   */
  @Override
  public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
      IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
    String resourceName = resource.getResourceName();
    ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resourceName);
    if (!shouldRebalance(cache)) {
      if (cachedIdealState != null && cachedIdealState.getMapFields() != null) {
        return new ResourceAssignment(cachedIdealState);
      }
      // In theory, the cache should be populated already if no rebalance is needed
      LOG.warn("Cannot fetch the cached assignment for resource: {}, will recompute the assignment",
          resourceName);
    }

    LOG.info("Computing BestPossibleMapping for {}", resourceName);

    // TODO: Change the logic to apply different assignment strategy
    ResourceAssignment assignment =
        super.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
    // Cache the assignment so no need to recompute the result next time. The cache may hold no
    // record for this resource yet, so create one instead of dereferencing null.
    if (cachedIdealState == null) {
      cachedIdealState = new ZNRecord(resourceName);
    }
    cachedIdealState.setMapFields(assignment.getRecord().getMapFields());
    cache.setCachedOndemandIdealState(resourceName, cachedIdealState);

    return assignment;
  }

  /**
   * Evaluates the configured conditions against the given cluster data.
   *
   * @param clusterData the data provider handed to every condition.
   * @return {@code true} if all conditions ask for a rebalance (also when none is configured).
   */
  private boolean shouldRebalance(ResourceControllerDataProvider clusterData) {
    return _rebalanceConditions.stream()
        .allMatch(condition -> condition.shouldPerformRebalance(clusterData));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.helix.controller.rebalancer.condition;

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;

/**
 * {@link RebalanceCondition} intended to trigger a rebalance on config changes. The check is not
 * implemented yet, so this condition currently never asks for a rebalance.
 */
public class ConfigChangeBasedCondition implements RebalanceCondition {
  /**
   * @param cache the cluster data to inspect (currently unused).
   * @return always {@code false} until the config change check is implemented.
   */
  @Override
  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
    // TODO: implement the condition check for config change
    final boolean rebalanceRequired = false;
    return rebalanceRequired;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.apache.helix.controller.rebalancer.condition;

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;

/**
* The {@code RebalanceCondition} interface defines a condition under which a rebalance operation
* should be performed. Implementations of this interface provide specific criteria to determine
* whether a rebalance is necessary based on the current state of the system.
*/
public interface RebalanceCondition {
/**
* Determines whether a rebalance should be performed based on the provided
* {@link ResourceControllerDataProvider} cache data.
*
* @param cache the {@code ResourceControllerDataProvider} holding the cached cluster data to
* evaluate the condition against.
* @return {@code true} if the rebalance should be performed, {@code false} otherwise.
*/
boolean shouldPerformRebalance(ResourceControllerDataProvider cache);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.helix.controller.rebalancer.condition;

import java.util.ArrayList;
import java.util.List;

/**
 * Fluent builder that collects {@link RebalanceCondition} instances in the order the
 * {@code with...} methods are called.
 */
public class RebalanceConditionsBuilder {
  private final List<RebalanceCondition> _rebalanceConditions = new ArrayList<>();

  /**
   * Appends a {@link ConfigChangeBasedCondition}.
   *
   * @return this builder, for chaining.
   */
  public RebalanceConditionsBuilder withConfigChangeBasedCondition() {
    _rebalanceConditions.add(new ConfigChangeBasedCondition());
    return this;
  }

  /**
   * Appends a {@link TopologyChangeBasedCondition}.
   *
   * @return this builder, for chaining.
   */
  public RebalanceConditionsBuilder withTopologyChangeBasedCondition() {
    _rebalanceConditions.add(new TopologyChangeBasedCondition());
    return this;
  }

  /**
   * Returns the conditions collected so far.
   *
   * @return a new list holding the collected conditions; it is independent of this builder, so
   * further {@code with...} calls do not change lists that were already built.
   */
  public List<RebalanceCondition> build() {
    // Hand out a copy so that reusing the builder cannot mutate previously built lists.
    return new ArrayList<>(_rebalanceConditions);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.helix.controller.rebalancer.condition;

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;

/**
 * {@link RebalanceCondition} intended to trigger a rebalance on topology changes. The check is
 * not implemented yet, so this condition currently never asks for a rebalance.
 */
public class TopologyChangeBasedCondition implements RebalanceCondition {
  /**
   * @param cache the cluster data to inspect (currently unused).
   * @return always {@code false} until the topology change check is implemented.
   */
  @Override
  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
    // TODO: implement the condition check for topology change
    final boolean rebalanceRequired = false;
    return rebalanceRequired;
  }
}

0 comments on commit 9cf76fd

Please sign in to comment.