From 021f58588dd32aeaf4645a434fbfc7b7b765ad68 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 24 Oct 2024 17:30:05 +0530 Subject: [PATCH] Add Setting to adjust the primary constraint weights Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + .../allocation/AllocationConstraints.java | 4 +-- .../routing/allocation/Constraint.java | 13 ++++--- .../routing/allocation/ConstraintTypes.java | 10 ++++++ .../allocation/RebalanceConstraints.java | 4 +-- .../allocator/BalancedShardsAllocator.java | 35 ++++++++++++++++--- .../common/settings/ClusterSettings.java | 1 + .../AllocationConstraintsTests.java | 28 ++++++++------- 8 files changed, 71 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5dc69a9ec290..d6e76d946866b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938)) - [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289)) - Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111)) +- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 6702db4b43e91..4b71347e4071b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -39,8 +39,8 @@ public void updateAllocationConstraint(String constraint, boolean enable) { this.constraints.get(constraint).setEnable(enable); } - public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { - Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryThresholdWeight) { + Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight); return params.weight(constraints); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java index e9c3c0afcbe88..ce0bb70d7d0b7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -14,7 +14,7 @@ import java.util.Map; import java.util.function.Predicate; -import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.predicateKeyToWeightMap; /** * Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or @@ -44,11 +44,13 @@ static class ConstraintParams { private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; private String index; + private long PrimaryConstraintThreshold; - ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) { this.balancer = balancer; this.node = node; this.index = index; + this.PrimaryConstraintThreshold = primaryConstraintThreshold; } public ShardsBalancer getBalancer() { @@ -75,9 +77,12 @@ public String getIndex() { */ public long weight(Map constraints) { long totalConstraintWeight = 0; - for (Constraint constraint : constraints.values()) { + for (Map.Entry entry : constraints.entrySet()) { + String key = entry.getKey(); + Constraint constraint = entry.getValue(); if (constraint.test(this)) { - totalConstraintWeight += CONSTRAINT_WEIGHT; + double weight = predicateKeyToWeightMap(key, PrimaryConstraintThreshold); + totalConstraintWeight += weight; } } return totalConstraintWeight; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 28ad199218884..ff40556a4894b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -86,4 +86,14 @@ public static Predicate isPrimaryShardsPerNodeBreac return primaryShardCount >= allowedPrimaryShardCount; }; } + + public static long predicateKeyToWeightMap(String key, long primaryConstraintWeight) { + switch (key) { + case CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID: + case CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID: + return primaryConstraintWeight; + default: + return CONSTRAINT_WEIGHT; + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index 2c2138af18abc..803ef04ce1675 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -42,8 +42,8 @@ public void updateRebalanceConstraint(String constraint, boolean enable) { this.constraints.get(constraint).setEnable(enable); } - public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { - Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) { + Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold); return params.weight(constraints); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 785636fa7ff2a..cfbb4d34c3a38 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -139,6 +139,14 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting PRIMARY_CONSTRAINT_THRESHOLD_SETTING = Setting.longSetting( + "cluster.routing.allocation.primary_constraint.threshold", + 10, + 0, + Property.Dynamic, + Property.NodeScope + ); + /** * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()} * and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation @@ -201,6 +209,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; private volatile float threshold; + private volatile long primaryConstraintThreshold; private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; @@ -219,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings)); updateWeightFunction(); setThreshold(THRESHOLD_SETTING.get(settings)); + setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); @@ -231,6 +241,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); } @@ -294,7 +305,12 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan } private void updateWeightFunction() { - weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer); + weightFunction = new WeightFunction( + this.indexBalanceFactor, + this.shardBalanceFactor, + this.preferPrimaryShardRebalanceBuffer, + this.primaryConstraintThreshold + ); } /** @@ -317,6 +333,11 @@ private void setThreshold(float threshold) { this.threshold = threshold; } + private void setPrimaryConstraintThresholdSetting(long threshold) { + this.primaryConstraintThreshold = threshold; + this.weightFunction.updatePrimaryConstraintThreshold(threshold); + } + private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } @@ -489,10 +510,11 @@ static class WeightFunction { private final float shardBalance; private final float theta0; private final float theta1; + private long primaryConstraintThreshold; private AllocationConstraints constraints; private RebalanceConstraints rebalanceConstraints; - WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) { + WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) { float sum = indexBalance + shardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); @@ -501,6 +523,7 @@ static class WeightFunction { theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; + this.primaryConstraintThreshold = primaryConstraintThreshold; RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); this.constraints = new AllocationConstraints(); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); @@ -510,12 +533,12 @@ static class WeightFunction { public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { float balancerWeight = weight(balancer, node, index); - return balancerWeight + constraints.weight(balancer, node, index); + return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold); } public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) { float balancerWeight = weight(balancer, node, index); - return balancerWeight + rebalanceConstraints.weight(balancer, node, index); + return balancerWeight + rebalanceConstraints.weight(balancer, node, index, primaryConstraintThreshold); } float weight(ShardsBalancer balancer, ModelNode node, String index) { @@ -531,6 +554,10 @@ void updateAllocationConstraint(String constraint, boolean enable) { void updateRebalanceConstraint(String constraint, boolean add) { this.rebalanceConstraints.updateRebalanceConstraint(constraint, add); } + + void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) { + this.primaryConstraintThreshold = primaryConstraintThreshold; + } } /** diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f769f8729c25b..c1f4e52706465 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, + BalancedShardsAllocator.PRIMARY_CONSTRAINT_THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index 4c9fcd1650664..23d0b708441f8 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -25,6 +25,8 @@ public class AllocationConstraintsTests extends OpenSearchAllocationTestCase { + long constraintWeight = 20L; + public void testSettings() { Settings.Builder settings = Settings.builder(); ClusterSettings service = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -69,7 +71,7 @@ public void testIndexShardsPerNodeConstraint() { when(node.getNodeId()).thenReturn("test-node"); long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); + assertEquals(expectedWeight, constraints.weight(balancer, node, "index", constraintWeight)); } @@ -91,14 +93,14 @@ public void testPerIndexPrimaryShardsConstraint() { when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); perIndexPrimaryShardCount = 2; when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName, constraintWeight)); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); } /** @@ -118,14 +120,14 @@ public void testGlobalPrimaryShardsConstraint() { when(node.numPrimaryShards()).thenReturn(primaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); primaryShardCount = 3; when(node.numPrimaryShards()).thenReturn(primaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight)); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); } /** @@ -150,22 +152,22 @@ public void testPrimaryShardsConstraints() { when(node.numPrimaryShards()).thenReturn(primaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); // breaching global primary shard count but not per index primary shard count primaryShardCount = 5; when(node.numPrimaryShards()).thenReturn(primaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight)); // when per index primary shard count constraint is also breached perIndexPrimaryShardCount = 3; when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount); - assertEquals(2 * CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(CONSTRAINT_WEIGHT + constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight)); // disable both constraints constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); } /** @@ -202,8 +204,8 @@ public void testAllConstraints() { long expectedWeight = (shardCount >= (int) Math.ceil(avgPerIndexShardsPerNode)) ? CONSTRAINT_WEIGHT : 0; expectedWeight += perIndexPrimaryShardCount > (int) Math.ceil(avgPerIndexPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - assertEquals(expectedWeight, constraints.weight(balancer, node, indexName)); + expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? constraintWeight : 0; + assertEquals(expectedWeight, constraints.weight(balancer, node, indexName, constraintWeight)); } }