Skip to content

Commit

Permalink
Enhance search preference based routing for WRR (opensearch-project#6834
Browse files Browse the repository at this point in the history
)

Signed-off-by: Varun Bansal <bansvaru@amazon.com>
(cherry picked from commit d8d6e73)
  • Loading branch information
linuxpi committed Apr 3, 2023
1 parent 0c32a58 commit a9ba98f
Show file tree
Hide file tree
Showing 9 changed files with 771 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))
- Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464)
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;

import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
* This decides if retryable shard search requests can be tried on shard copies present in data
Expand Down Expand Up @@ -72,9 +74,13 @@ public SearchShardTarget findNext(
Runnable onShardSkipped
) {
SearchShardTarget next = shardIt.nextOrNull();
if (ignoreWeightedRouting(clusterState)) {
return next;
}

while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
Expand All @@ -98,10 +104,13 @@ public SearchShardTarget findNext(
*/
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) {
ShardRouting next = shardsIt.nextOrNull();
if (ignoreWeightedRouting(clusterState)) {
return next;
}

while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
Expand All @@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
* @return true if can fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*/
private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) {
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) {
return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
}

private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) {
Expand All @@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI
return false;
}

private boolean ignoreWeightedRouting(ClusterState clusterState) {
return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
}

public WeightedRoutingStats getWeightedRoutingStats() {
return WeightedRoutingStats.getInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,13 @@ public ShardIterator activeInitializingShardsWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
boolean isFailOpenEnabled
boolean isFailOpenEnabled,
@Nullable Integer seed
) {
final int seed = shufflerForWeightedRouting.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
List<ShardRouting> orderedListWithDistinctShards;
ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
if (seed == null) {
seed = shufflerForWeightedRouting.nextSeed();
}
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
// shard copies in case of request failure from other attribute values.
Expand All @@ -357,8 +353,26 @@ public ShardIterator activeInitializingShardsWeightedIt(
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}

return new PlainShardIterator(shardId, ordered);
}

private List<ShardRouting> activeInitializingShardsWithWeights(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
int seed
) {
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
}
List<ShardRouting> orderedListWithDistinctShards;
orderedListWithDistinctShards = ordered.stream().distinct().collect(Collectors.toList());
return new PlainShardIterator(shardId, orderedListWithDistinctShards);
return orderedListWithDistinctShards;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,30 @@ public class OperationRouting {

public static final Setting<Boolean> STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting(
"cluster.routing.weighted.strict",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Boolean> IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting(
"cluster.routing.ignore_weighted_routing",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private static final List<Preference> WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList(
Preference.ONLY_NODES,
Preference.PREFER_NODES
);

private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private volatile double weightedRoutingDefaultWeight;
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings);
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) {
this.isStrictWeightedShardRouting = strictWeightedShardRouting;
}

void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) {
this.ignoreWeightedRouting = isWeightedRoundRobinEnabled;
}

public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
Expand Down Expand Up @@ -281,11 +301,6 @@ private ShardIterator preferenceActiveShardIterator(
if (preference == null || preference.isEmpty()) {
return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata);
}
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference based routing not allowed with strict weighted shard routing setting"
);
}
if (preference.charAt(0) == '_') {
Preference preferenceType = Preference.parse(preference);
if (preferenceType == Preference.SHARDS) {
Expand Down Expand Up @@ -318,6 +333,7 @@ private ShardIterator preferenceActiveShardIterator(
}
}
preferenceType = Preference.parse(preference);
checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata);
switch (preferenceType) {
case PREFER_NODES:
final Set<String> nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(","))
Expand All @@ -343,7 +359,19 @@ private ShardIterator preferenceActiveShardIterator(
// for a different element in the list by also incorporating the
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
if (ignoreAwarenessAttributes()) {
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
isStrictWeightedShardRouting,
ignoreWeightedRouting,
weightedRoutingMetadata
)) {
return indexShard.activeInitializingShardsWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
isFailOpenEnabled,
routingHash
);
} else if (ignoreAwarenessAttributes()) {
return indexShard.activeInitializingShardsIt(routingHash);
} else {
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
Expand All @@ -357,12 +385,13 @@ private ShardIterator shardRoutings(
@Nullable Map<String, Long> nodeCounts,
@Nullable WeightedRoutingMetadata weightedRoutingMetadata
) {
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) {
if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) {
return indexShard.activeInitializingShardsWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
isFailOpenEnabled
isFailOpenEnabled,
null
);
} else if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
Expand Down Expand Up @@ -430,4 +459,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef
return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
}

private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) {
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
isStrictWeightedShardRouting,
ignoreWeightedRouting,
weightedRoutingMetadata
) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference type based routing not allowed with strict weighted shard routing enabled"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) {
}
return false;
}

public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) {
return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet();
}

public static boolean shouldPerformStrictWeightedRouting(
boolean isStrictWeightedShardRouting,
boolean ignoreWeightedRouting,
WeightedRoutingMetadata weightedRoutingMetadata
) {
return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Loading

0 comments on commit a9ba98f

Please sign in to comment.