Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge helix-stickiness-rebalancer into helix-gateway-service branch #2907

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,25 @@ public boolean canAdd(String resource, String partition) {
&& _partitionMap.get(resource).contains(partition))) {
return false;
}

// Add the partition to the resource's set of partitions in this node
_partitionMap.computeIfAbsent(resource, k -> new HashSet<>()).add(partition);
_currentlyAssigned++;
return true;
}

/**
 * Determines whether the given partition of the given resource is currently
 * assigned to this node.
 *
 * @param resource the name of the resource
 * @param partition the partition
 * @return {@code true} if the resource + partition is assigned to this node, {@code false} otherwise
 */
public boolean hasPartition(String resource, String partition) {
  Set<String> assignedPartitions = _partitionMap.get(resource);
  if (assignedPartitions == null) {
    return false;
  }
  return assignedPartitions.contains(partition);
}

/**
* Set the capacity of this node
* @param capacity The capacity to set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.CapacityNode;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.MissingTopStateRecord;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
Expand All @@ -73,6 +75,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider {
// a map from customized state type to customized view cache
private final Map<String, CustomizedViewCache> _customizedViewCacheMap;

// maintain a cache of ideal state (preference list + best possible assignment) which will be managed ondemand in rebalancer
private final Map<String, ZNRecord> _ondemandIdealStateCache;

// maintain a cache of bestPossible assignment across pipeline runs
// TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
private Map<String, ResourceAssignment> _resourceAssignmentCache;
Expand Down Expand Up @@ -149,6 +154,7 @@ public String getObjName(ExternalView obj) {
_refreshedChangeTypes = ConcurrentHashMap.newKeySet();
_customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes);
_customizedViewCacheMap = new HashMap<>();
_ondemandIdealStateCache = new HashMap<>();
}

public synchronized void refresh(HelixDataAccessor accessor) {
Expand Down Expand Up @@ -182,16 +188,17 @@ public synchronized void refresh(HelixDataAccessor accessor) {
refreshStablePartitionList(getIdealStates());
refreshDisabledInstancesForAllPartitionsSet();

if (getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
if (getClusterConfig() != null
&& getClusterConfig().getGlobalMaxPartitionAllowedPerInstance() != -1) {
buildSimpleCapacityMap(getClusterConfig().getGlobalMaxPartitionAllowedPerInstance());
// Remove all cached IdealState because it is a global computation cannot partially be
// performed for some resources. The computation is simple as well not taking too much resource
// to recompute the assignments.
Set<String> cachedGreedyIdealStates = _idealMappingCache.values().stream().filter(
Set<String> cachedStickyIdealStates = _idealMappingCache.values().stream().filter(
record -> record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
.equals(StickyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
.collect(Collectors.toSet());
_idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
_idealMappingCache.keySet().removeAll(cachedStickyIdealStates);
}

LogUtil.logInfo(logger, getClusterEventId(), String.format(
Expand Down Expand Up @@ -388,6 +395,28 @@ public Map<String, Map<String, String>> getLastTopStateLocationMap() {
return _lastTopStateLocationMap;
}

/**
 * Get cached ideal state (preference list + best possible assignment) for a resource.
 *
 * @param resource the name of the resource
 * @return the cached {@link ZNRecord}, or {@code null} if nothing is cached for this resource
 */
public ZNRecord getCachedOndemandIdealState(String resource) {
  return _ondemandIdealStateCache.getOrDefault(resource, null);
}

/**
 * Cache ideal state (preference list + best possible assignment) for a resource.
 *
 * @param resource the name of the resource
 * @param idealState the {@link ZNRecord} carrying the preference list and best possible
 *          assignment to cache for this resource
 */
public void setCachedOndemandIdealState(String resource, ZNRecord idealState) {
  _ondemandIdealStateCache.put(resource, idealState);
}

/**
 * Clear all cached on-demand ideal states (preference lists + best possible assignments).
 */
public void clearCachedOndemandIdealStates() {
  _ondemandIdealStateCache.clear();
}

/**
* Get cached resourceAssignment (bestPossible mapping) for a resource
* @param resource
Expand Down Expand Up @@ -557,6 +586,35 @@ public Set<CapacityNode> getSimpleCapacitySet() {
return _simpleCapacitySet;
}

/**
 * Records current and pending partition placements for the given resources against the
 * nodes in the simple capacity set, so subsequent capacity checks reflect actual usage.
 *
 * @param resourceNameSet the resources whose placements should be accounted for
 * @param currentStateOutput source of current-state and pending-message mappings
 */
public void populateSimpleCapacitySetUsage(final Set<String> resourceNameSet,
    final CurrentStateOutput currentStateOutput) {
  // Index the capacity nodes by instance id for O(1) lookup below
  final Map<String, CapacityNode> nodesById = new HashMap<>();
  _simpleCapacitySet.forEach(node -> nodesById.put(node.getId(), node));
  for (String resource : resourceNameSet) {
    // Account for replicas already present in the current state
    populateCapacityNodeUsageFromStateMap(resource, nodesById,
        currentStateOutput.getCurrentStateMap(resource));
    // Account for replicas with in-flight (pending) messages
    populateCapacityNodeUsageFromStateMap(resource, nodesById,
        currentStateOutput.getPendingMessageMap(resource));
  }
}

/**
 * Applies one partition-to-instance state mapping to the capacity nodes: for each
 * (partition, instance) pair, registers the partition on the matching capacity node.
 *
 * @param resourceName the resource the state map belongs to
 * @param simpleCapacityMap capacity nodes keyed by instance name
 * @param stateMap per-partition map of instance name to state/message
 */
private <T> void populateCapacityNodeUsageFromStateMap(String resourceName,
    Map<String, CapacityNode> simpleCapacityMap, Map<Partition, Map<String, T>> stateMap) {
  stateMap.forEach((partition, instanceStates) -> {
    for (String instance : instanceStates.keySet()) {
      CapacityNode capacityNode = simpleCapacityMap.get(instance);
      if (capacityNode != null) {
        // canAdd records the (resource, partition) on the node as a side effect
        capacityNode.canAdd(resourceName, partition.getPartitionName());
      }
    }
  });
}

private void refreshDisabledInstancesForAllPartitionsSet() {
_disabledInstancesForAllPartitionsSet.clear();
Collection<InstanceConfig> allConfigs = getInstanceConfigMap().values();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
package org.apache.helix.controller.rebalancer;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.condition.RebalanceCondition;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@code ConditionBasedRebalancer} class extends the {@link AbstractRebalancer} and
* perform the rebalance operation based on specific list of conditions defined by the
* {@link RebalanceCondition} interface.
*/
public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(ConditionBasedRebalancer.class);
private final List<RebalanceCondition> _rebalanceConditions;

public ConditionBasedRebalancer() {
this._rebalanceConditions = new ArrayList<>();
}

public ConditionBasedRebalancer(List<RebalanceCondition> rebalanceConditions) {
this._rebalanceConditions = rebalanceConditions;
}

/**
* Compute new Ideal State iff all conditions are met, otherwise just return from cached Ideal State
*
* @param resourceName the name of the resource for which to compute the new ideal state.
* @param currentIdealState the current {@link IdealState} of the resource.
* @param currentStateOutput the current state output, containing the actual states of the
* partitions.
* @param clusterData the {@link ResourceControllerDataProvider} instance providing
* additional data required for the computation.
* @return the newly computed {@link IdealState} for the resource.
*/
@Override
public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState,
CurrentStateOutput currentStateOutput, ResourceControllerDataProvider clusterData) {
ZNRecord cachedIdealState = clusterData.getCachedOndemandIdealState(resourceName);
// If previous placement list exists in cache && all condition met -> return cached value
if (cachedIdealState != null && cachedIdealState.getListFields() != null
&& !cachedIdealState.getListFields().isEmpty() && !this._rebalanceConditions.stream()
.allMatch(condition -> condition.shouldPerformRebalance(clusterData))) {
return new IdealState(cachedIdealState);
}

LOG.info("Computing IdealState for " + resourceName);

List<String> partitions = getStablePartitionList(clusterData, currentIdealState);
String stateModelName = currentIdealState.getStateModelDefRef();
StateModelDefinition stateModelDef = clusterData.getStateModelDef(stateModelName);
if (stateModelDef == null) {
LOG.error("State Model Definition null for resource: " + resourceName);
throw new HelixException("State Model Definition null for resource: " + resourceName);
}
Map<String, LiveInstance> assignableLiveInstance = clusterData.getAssignableLiveInstances();
int replicas = currentIdealState.getReplicaCount(assignableLiveInstance.size());

LinkedHashMap<String, Integer> stateCountMap =
stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas);
Set<String> assignableLiveNodes = new HashSet<>(assignableLiveInstance.keySet());
Set<String> assignableNodes = new HashSet<>(clusterData.getAssignableInstances());
assignableNodes.removeAll(clusterData.getDisabledInstances());
assignableLiveNodes.retainAll(assignableNodes);

Map<String, Map<String, String>> currentMapping =
currentMapping(currentStateOutput, resourceName, partitions, stateCountMap);

// If there are nodes tagged with resource name, use only those nodes
Set<String> taggedNodes = new HashSet<String>();
Set<String> taggedLiveNodes = new HashSet<String>();
if (currentIdealState.getInstanceGroupTag() != null) {
for (String instanceName : assignableNodes) {
if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
.containsTag(currentIdealState.getInstanceGroupTag())) {
taggedNodes.add(instanceName);
if (assignableLiveInstance.containsKey(instanceName)) {
taggedLiveNodes.add(instanceName);
}
}
}
if (!taggedLiveNodes.isEmpty()) {
// live nodes exist that have this tag
if (LOG.isInfoEnabled()) {
LOG.info(
"found the following participants with tag " + currentIdealState.getInstanceGroupTag()
+ " for " + resourceName + ": " + taggedLiveNodes);
}
} else if (taggedNodes.isEmpty()) {
// no live nodes and no configured nodes have this tag
LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+ " but no configured participants have this tag");
} else {
// configured nodes have this tag, but no live nodes have this tag
LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag()
+ " but no live participants have this tag");
}
assignableNodes = new HashSet<>(taggedNodes);
assignableLiveNodes = new HashSet<>(taggedLiveNodes);
}

// sort node lists to ensure consistent preferred assignments
List<String> assignableNodesList =
assignableNodes.stream().sorted().collect(Collectors.toList());
List<String> assignableLiveNodesList =
assignableLiveNodes.stream().sorted().collect(Collectors.toList());

int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
_rebalanceStrategy =
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName,
stateCountMap, maxPartition);
ZNRecord newMapping =
_rebalanceStrategy.computePartitionAssignment(assignableNodesList, assignableLiveNodesList,
currentMapping, clusterData);

if (LOG.isDebugEnabled()) {
LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("assignableLiveNodes: {}", assignableLiveNodes);
LOG.debug("assignableNodes: {}", assignableNodes);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newMapping: {}", newMapping);
}

IdealState newIdealState = new IdealState(resourceName);
newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
newIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
newIdealState.getRecord().setListFields(newMapping.getListFields());

clusterData.setCachedOndemandIdealState(resourceName, newIdealState.getRecord());

return newIdealState;
}

/**
* Compute new assignment iff all conditions are met, otherwise just return from cached assignment
*
* @param cache the {@link ResourceControllerDataProvider} instance providing
* metadata and state information about the cluster.
* @param idealState the {@link IdealState} representing the current ideal state.
* @param resource the {@link Resource} for which to compute the best possible partition
* state.
* @param currentStateOutput the {@link CurrentStateOutput} containing the current states of the
* partitions.
* @return the {@link ResourceAssignment} representing the best possible state assignment for the
* partitions of the resource.
*/
@Override
public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
ZNRecord cachedIdealState = cache.getCachedOndemandIdealState(resource.getResourceName());
// If previous assignment map exists in cache && all condition met -> return cached value
if (cachedIdealState.getMapFields() != null && !cachedIdealState.getMapFields().isEmpty()
&& !this._rebalanceConditions.stream()
.allMatch(condition -> condition.shouldPerformRebalance(cache))) {
ResourceAssignment partitionMapping = new ResourceAssignment(resource.getResourceName());
for (Partition partition : resource.getPartitions()) {
partitionMapping.addReplicaMap(partition,
cachedIdealState.getMapFields().get(partition.getPartitionName()));
}
return partitionMapping;
}

LOG.info("Computing BestPossibleMapping for " + resource.getResourceName());

// TODO: Change the logic to apply different assignment strategy
ResourceAssignment assignment =
super.computeBestPossiblePartitionState(cache, idealState, resource, currentStateOutput);
// Cache the assignment so no need to recompute the result next time
cachedIdealState.setMapFields(assignment.getRecord().getMapFields());
cache.setCachedOndemandIdealState(resource.getResourceName(), cachedIdealState);

if (LOG.isDebugEnabled()) {
LOG.debug("Processed resource: {}", resource.getResourceName());
LOG.debug("Final Mapping of resource : {}", assignment);
}
return assignment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.helix.controller.rebalancer.condition;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;

/**
 * A {@link RebalanceCondition} intended to request a rebalance when cluster configuration
 * changes.
 * <p>
 * NOTE(review): the detection logic is not implemented yet — this condition currently always
 * returns {@code false}, i.e. it never requests a rebalance on its own.
 */
public class ConfigChangeBasedCondition implements RebalanceCondition {
  @Override
  public boolean shouldPerformRebalance(ResourceControllerDataProvider cache) {
    // TODO: implement the condition check for config change
    return false;
  }
}
Loading
Loading