Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for dynamically updating Leader/follower checker timeouts #10528

Merged
merged 21 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BUG] Disable sort optimization for HALF_FLOAT ([#10999](https://github.com/opensearch-project/OpenSearch/pull/10999))
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Use iterative approach to evaluate Regex.simpleMatch ([#11060](https://github.com/opensearch-project/OpenSearch/pull/11060))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,10 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -98,7 +99,9 @@ public class FollowersChecker {
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
TimeValue.timeValueMillis(60000),
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the number of failed checks that must happen before the follower is considered to have failed.
Expand All @@ -112,7 +115,7 @@ public class FollowersChecker {
private final Settings settings;

private final TimeValue followerCheckInterval;
private final TimeValue followerCheckTimeout;
private TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;
Expand All @@ -127,6 +130,7 @@ public class FollowersChecker {

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
Expand All @@ -141,7 +145,7 @@ public FollowersChecker(
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(
FOLLOWER_CHECK_ACTION_NAME,
Expand All @@ -159,6 +163,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
this.followerCheckTimeout = followerCheckTimeout;
}

/**
* Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -97,7 +98,9 @@ public class LeaderChecker {
"cluster.fault_detection.leader_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
TimeValue.timeValueMillis(60000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the number of failed checks that must happen before the leader is considered to have failed.
Expand All @@ -111,7 +114,7 @@ public class LeaderChecker {
private final Settings settings;

private final TimeValue leaderCheckInterval;
private final TimeValue leaderCheckTimeout;
private TimeValue leaderCheckTimeout;
private final int leaderCheckRetryCount;
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
Expand All @@ -123,6 +126,7 @@ public class LeaderChecker {

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
Expand All @@ -134,6 +138,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
LEADER_CHECK_ACTION_NAME,
Expand All @@ -155,6 +160,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) {
this.leaderCheckTimeout = leaderCheckTimeout;
}

public DiscoveryNode leader() {
CheckScheduler checkScheduler = currentChecker.get();
return checkScheduler == null ? null : checkScheduler.leader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static void sendErrorResponse(TransportChannel channel, String actionName, Trans

/**
* Returns the contextual property associated with this specific transport channel (the
* implementation of how such properties are managed depends on the the particular
* implementation of how such properties are managed depends on the particular
* transport engine).
*
* @param name the name of the property
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class CoordinationCheckerSettingsTests extends OpenSearchSingleNodeTestCase {
public void testFollowerCheckTimeoutValueUpdate() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(timeSettings1)
.execute()
.actionGet();

assertAcked(response);
assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
timeSettings1 = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
}

public void testFollowerCheckTimeoutMaxValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build();

assertThrows(
"failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testFollowerCheckTimeoutMinValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build();

assertThrows(
"failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testLeaderCheckTimeoutValueUpdate() {
Setting<TimeValue> setting1 = LEADER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(timeSettings1)
.execute()
.actionGet();
assertAcked(response);
assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
timeSettings1 = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
}

public void testLeaderCheckTimeoutMaxValue() {
Setting<TimeValue> setting1 = LEADER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build();

assertThrows(
"failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testLeaderCheckTimeoutMinValue() {
Setting<TimeValue> setting1 = LEADER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build();

assertThrows(
"failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Settings.Builder;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -96,7 +97,7 @@ public class FollowersCheckerTests extends OpenSearchTestCase {
public void testChecksExpectedNodes() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();

final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DiscoveryNodes[] discoveryNodesHolder = new DiscoveryNodes[] {
DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build() };

Expand Down Expand Up @@ -132,6 +133,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
Expand Down Expand Up @@ -257,6 +259,7 @@ public void testFailsNodeThatDisconnects() {
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand Down Expand Up @@ -297,6 +300,7 @@ public String toString() {

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
Expand Down Expand Up @@ -336,6 +340,7 @@ private void testBehaviourOfFailingNode(
final DiscoveryNode localNode = new DiscoveryNode("local-node", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode otherNode = new DiscoveryNode("other-node", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).put(testSettings).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand Down Expand Up @@ -384,6 +389,7 @@ public String toString() {

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
Expand Down Expand Up @@ -464,6 +470,7 @@ public void testUnhealthyNodeRejectsImmediately() {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand All @@ -488,7 +495,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();

final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
final FollowersChecker followersChecker = new FollowersChecker(settings, clusterSettings, transportService, fcr -> {
assertTrue(calledCoordinator.compareAndSet(false, true));
final RuntimeException exception = coordinatorException.get();
if (exception != null) {
Expand Down Expand Up @@ -536,6 +543,7 @@ public void testResponder() {
final DiscoveryNode leader = new DiscoveryNode("leader", buildNewFakeTransportAddress(), Version.CURRENT);
final DiscoveryNode follower = new DiscoveryNode("follower", buildNewFakeTransportAddress(), Version.CURRENT);
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), follower.getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());

final MockTransport mockTransport = new MockTransport() {
Expand All @@ -560,7 +568,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
final AtomicBoolean calledCoordinator = new AtomicBoolean();
final AtomicReference<RuntimeException> coordinatorException = new AtomicReference<>();

final FollowersChecker followersChecker = new FollowersChecker(settings, transportService, fcr -> {
final FollowersChecker followersChecker = new FollowersChecker(settings, clusterSettings, transportService, fcr -> {
assertTrue(calledCoordinator.compareAndSet(false, true));
final RuntimeException exception = coordinatorException.get();
if (exception != null) {
Expand Down Expand Up @@ -700,6 +708,7 @@ public void testPreferClusterManagerNodes() {
DiscoveryNodes discoveryNodes = discoNodesBuilder.localNodeId(nodes.get(0).getId()).build();
CapturingTransport capturingTransport = new CapturingTransport();
final Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), nodes.get(0).getName()).build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
TransportService transportService = capturingTransport.createTransportService(
Settings.EMPTY,
Expand All @@ -710,15 +719,9 @@ public void testPreferClusterManagerNodes() {
emptySet(),
NoopTracer.INSTANCE
);
final FollowersChecker followersChecker = new FollowersChecker(
Settings.EMPTY,
transportService,
fcr -> { assert false : fcr; },
(node, reason) -> {
assert false : node;
},
() -> new StatusInfo(HEALTHY, "healthy-info")
);
final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> {
assert false : fcr;
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"));
followersChecker.setCurrentNodes(discoveryNodes);
List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
.map(cr -> cr.node)
Expand Down Expand Up @@ -754,7 +757,7 @@ private static Settings randomSettings() {
settingsBuilder.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), randomIntBetween(100, 100000) + "ms");
}
if (randomBoolean()) {
settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 100000) + "ms");
settingsBuilder.put(FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), randomIntBetween(1, 60000) + "ms");
}
return settingsBuilder.build();
}
Expand Down
Loading
Loading