[1.x] An allocation constraint mechanism that de-prioritizes nodes from getting picked for allocation if they breach certain constraints #777

Merged 4 commits on Jun 4, 2021
AllocationConstraints.java (new file)
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
* Allocation constraints specify conditions which, if breached, reduce the
* priority of a node for receiving shard allocations.
*/
public class AllocationConstraints {
    public final long CONSTRAINT_WEIGHT = 1000000L;
    private List<Predicate<ConstraintParams>> constraintPredicates;

    public AllocationConstraints() {
        this.constraintPredicates = new ArrayList<>(1);
        this.constraintPredicates.add(isIndexShardsPerNodeBreached());
    }

    class ConstraintParams {
        private BalancedShardsAllocator.Balancer balancer;
        private BalancedShardsAllocator.ModelNode node;
        private String index;

        ConstraintParams(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node,
                         String index) {
            this.balancer = balancer;
            this.node = node;
            this.index = index;
        }
    }

    /**
     * Evaluates the configured allocation constraint predicates for a given node-index
     * combination, and returns a weight value based on the number of breached
     * constraints.
     *
     * The constraint weight should be added to the weight calculated via the weight
     * function, to reduce the priority of allocating on nodes with breached
     * constraints.
     *
     * This weight function is used only for unassigned shards, to avoid overloading a newly added node.
     * Weight calculation in other scenarios, such as shard movement and re-balancing, remains unaffected.
     */
    public long weight(BalancedShardsAllocator.Balancer balancer, BalancedShardsAllocator.ModelNode node,
                       String index) {
        int constraintsBreached = 0;
        ConstraintParams params = new ConstraintParams(balancer, node, index);
        for (Predicate<ConstraintParams> predicate : constraintPredicates) {
            if (predicate.test(params)) {
                constraintsBreached++;
            }
        }
        return constraintsBreached * CONSTRAINT_WEIGHT;
    }

    /**
     * Constraint to control the number of shards of an index allocated on a single
     * node.
     *
     * In the current weight function implementation, when a node has significantly
     * fewer shards than other nodes (e.g. during single new node addition or node
     * replacement), its weight is much less than other nodes. All shard allocations
     * at this time tend to land on the new node with skewed weight. This breaks
     * index-level balance in the cluster, by creating all shards of the same index
     * on one node, often resulting in a hotspot on that node.
     *
     * This constraint is breached when the balancer attempts to allocate more than
     * the average number of shards per index on a node.
     */
    private Predicate<ConstraintParams> isIndexShardsPerNodeBreached() {
        return (params) -> {
            int currIndexShardsOnNode = params.node.numShards(params.index);
            int allowedIndexShardsPerNode = (int) Math.ceil(params.balancer.avgShardsPerNode(params.index));
            return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
        };
    }

}
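To make the constraint arithmetic concrete, here is a small self-contained sketch (illustrative only, not part of the changed files) that mirrors the IndexShardsPerNode check and the weight() aggregation above; the class ConstraintSketch, its Params holder, and all numbers are hypothetical.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative stand-in for the constraint logic above; not the PR code. */
public class ConstraintSketch {

    static final long CONSTRAINT_WEIGHT = 1000000L;

    /** Hypothetical inputs: shards of one index already on a node, and the cluster-wide average for that index. */
    static class Params {
        final int shardsOfIndexOnNode;
        final float avgShardsPerNodeForIndex;

        Params(int shardsOfIndexOnNode, float avgShardsPerNodeForIndex) {
            this.shardsOfIndexOnNode = shardsOfIndexOnNode;
            this.avgShardsPerNodeForIndex = avgShardsPerNodeForIndex;
        }
    }

    /** Breached when the node already holds at least ceil(average) shards of the index. */
    static final Predicate<Params> INDEX_SHARDS_PER_NODE =
        p -> p.shardsOfIndexOnNode >= (int) Math.ceil(p.avgShardsPerNodeForIndex);

    /** Mirrors the weight() idea above: CONSTRAINT_WEIGHT per breached predicate. */
    static long constraintWeight(Params params) {
        List<Predicate<Params>> predicates = new ArrayList<>();
        predicates.add(INDEX_SHARDS_PER_NODE);
        return predicates.stream().filter(p -> p.test(params)).count() * CONSTRAINT_WEIGHT;
    }

    public static void main(String[] args) {
        // Average of 2.4 shards of the index per node => up to ceil(2.4) - 1 = 2 shards fit without breaching.
        System.out.println(constraintWeight(new Params(2, 2.4f))); // 0: constraint satisfied
        System.out.println(constraintWeight(new Params(3, 2.4f))); // 1000000: constraint breached
    }
}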
BalancedShardsAllocator.java
@@ -45,6 +45,7 @@
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
import org.opensearch.cluster.routing.allocation.MoveDecision;
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
@@ -192,7 +193,6 @@ public float getShardBalance() {
return weightFunction.shardBalance;
}


/**
* This class is the primary weight function used to create a balanced distribution of shards over the nodes in the cluster.
* Currently this function has 3 properties:
@@ -216,13 +216,16 @@ public float getShardBalance() {
* </li>
* </ul>
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
*
* package-private for testing
*/
- private static class WeightFunction {
+ static class WeightFunction {

private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
private AllocationConstraints constraints;

WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
Expand All @@ -233,6 +236,12 @@ private static class WeightFunction {
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
this.constraints = new AllocationConstraints();
}

public float weightWithAllocationConstraints(Balancer balancer, ModelNode node, String index) {
float balancerWeight = weight(balancer, node, index);
return balancerWeight + constraints.weight(balancer, node, index);
}

float weight(Balancer balancer, ModelNode node, String index) {
@@ -411,7 +420,10 @@ private MoveDecision decideRebalance(final ShardRouting shard) {
boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
// calculate the delta of the weights of the two nodes if we were to add the shard to the
// node in question and move it away from the node that currently holds it.
- boolean betterWeightWithShardAdded = nodeWeight + 1.0f < currentWeight;
+ // hence we add 2.0f to the weight delta
+ float proposedDelta = 2.0f + nodeWeight - currentWeight;
+ boolean betterWeightWithShardAdded = proposedDelta < currentDelta;

rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
// if the simulated weight delta with the shard moved away is better than the weight delta
// with the shard remaining on the current node, and we are allowed to allocate to the
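An aside on the 2.0f adjustment above (an illustrative sketch, not from the changed files): in the weight function each shard of the index contributes theta0 + theta1 = 1.0f to a node's weight, so simulating the move lowers the current node's weight by 1.0f and raises the candidate's by 1.0f, shifting the delta by 2.0f. With made-up weights:

public class RebalanceDeltaSketch {
    public static void main(String[] args) {
        float currentWeight = 7.0f;                                        // node currently holding the shard (hypothetical)
        float nodeWeight = 3.0f;                                           // candidate node (hypothetical)
        float currentDelta = currentWeight - nodeWeight;                   // weight gap before the move: 4.0f
        float proposedDelta = 2.0f + nodeWeight - currentWeight;           // (3.0 + 1.0) - (7.0 - 1.0) = -2.0f
        boolean betterWeightWithShardAdded = proposedDelta < currentDelta; // true: moving the shard improves balance
        System.out.println(betterWeightWithShardAdded);
    }
}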
@@ -964,7 +976,7 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
}

// weight of this index currently on the node
- float currentWeight = weight.weight(this, node, shard.getIndexName());
+ float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName());
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
if (currentWeight > minWeight && explain == false) {
continue;
@@ -1002,7 +1014,7 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
updateMinNode = currentDecision.type() == Type.YES;
}
} else {
- updateMinNode = true;
+ updateMinNode = currentWeight < minWeight;
}
if (updateMinNode) {
minNode = node;
@@ -1086,7 +1098,7 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String id

}

- static class ModelNode implements Iterable<ModelIndex> {
+ public static class ModelNode implements Iterable<ModelIndex> {
private final Map<String, ModelIndex> indices = new HashMap<>();
private int numShards = 0;
private final RoutingNode routingNode;
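Tying the two changes together (again an illustrative sketch with hypothetical numbers, not from the changed files): during unassigned-shard allocation the balancer now compares weightWithAllocationConstraints values, so a node that breaches a constraint carries the extra CONSTRAINT_WEIGHT and loses the minimum-weight comparison even when its plain balancer weight is the lowest.

public class UnassignedAllocationSketch {
    public static void main(String[] args) {
        final long CONSTRAINT_WEIGHT = 1000000L;   // same magnitude as in AllocationConstraints above
        float minWeight = 0.5f;                    // best candidate node seen so far (hypothetical)
        float balancerWeight = 0.1f;               // a new node still has the lowest plain weight...
        // ...but it already holds ceil(avgShardsPerNode) shards of this index, so the constraint is breached
        float currentWeight = balancerWeight + CONSTRAINT_WEIGHT;
        boolean updateMinNode = currentWeight < minWeight; // false: the skewed node is de-prioritized
        System.out.println(updateMinNode);
    }
}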
AllocationConstraintsTests.java (new file)
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AllocationConstraintsTests extends OpenSearchAllocationTestCase {

    public void testSettings() {
        Settings.Builder settings = Settings.builder();
        ClusterSettings service = new ClusterSettings(Settings.builder().build(),
            ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings.build(), service);

        settings = Settings.builder();
        float indexBalanceFactor = randomFloat();
        float shardBalance = randomFloat();
        float threshold = randomFloat();
        settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalanceFactor);
        settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance);
        settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), threshold);

        service.applySettings(settings.build());

        assertEquals(indexBalanceFactor, allocator.getIndexBalance(), 0.01);
        assertEquals(shardBalance, allocator.getShardBalance(), 0.01);
        assertEquals(threshold, allocator.getThreshold(), 0.01);

    }

    /**
     * Tests constraint evaluation logic for the IndexShardsPerNode constraint,
     * covering both the satisfied and the breached case.
     */
    public void testIndexShardsPerNodeConstraint() {
        BalancedShardsAllocator.Balancer balancer = mock(BalancedShardsAllocator.Balancer.class);
        BalancedShardsAllocator.ModelNode node = mock(BalancedShardsAllocator.ModelNode.class);
        AllocationConstraints constraints = new AllocationConstraints();

        int shardCount = randomIntBetween(1, 500);
        float avgShardsPerNode = 1.0f + (random().nextFloat()) * 999.0f;

        when(balancer.avgShardsPerNode(anyString())).thenReturn(avgShardsPerNode);
        when(node.numShards(anyString())).thenReturn(shardCount);
        when(node.getNodeId()).thenReturn("test-node");

        long expectedWeight = (shardCount >= avgShardsPerNode) ? constraints.CONSTRAINT_WEIGHT : 0;
        assertEquals(expectedWeight, constraints.weight(balancer, node, "index"));

    }

}