Skip to content

Commit

Permalink
Make lag detector setting dynamic
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
  • Loading branch information
Rahul Karajgikar committed Oct 17, 2024
1 parent 6b6d11d commit 0f29002
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public Coordinator(
);
this.lagDetector = new LagDetector(
settings,
clusterSettings,
transportService.getThreadPool(),
n -> removeNode(n, "lagging"),
transportService::getLocalNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -68,23 +69,26 @@ public class LagDetector {
"cluster.follower_lag.timeout",
TimeValue.timeValueMillis(90000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final TimeValue clusterStateApplicationTimeout;
private TimeValue clusterStateApplicationTimeout;
private final Consumer<DiscoveryNode> onLagDetected;
private final Supplier<DiscoveryNode> localNodeSupplier;
private final ThreadPool threadPool;
private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();

public LagDetector(
final Settings settings,
final ClusterSettings clusterSettings,
final ThreadPool threadPool,
final Consumer<DiscoveryNode> onLagDetected,
final Supplier<DiscoveryNode> localNodeSupplier
) {
this.threadPool = threadPool;
this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, this::setFollowerLagTimeout);
this.onLagDetected = onLagDetected;
this.localNodeSupplier = localNodeSupplier;
}
Expand Down Expand Up @@ -136,6 +140,10 @@ public String toString() {
}
}

/**
 * Overwrites the stored follower-lag timeout. The constructor registers this method as the
 * update consumer for {@code CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING}.
 *
 * NOTE(review): the backing field is declared without {@code volatile}; confirm that readers
 * on other threads are guaranteed to observe the new value.
 *
 * @param updatedTimeout the timeout value to store
 */
private void setFollowerLagTimeout(final TimeValue updatedTimeout) {
    clusterStateApplicationTimeout = updatedTimeout;
}

@Override
public String toString() {
return "LagDetector{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.opensearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -177,4 +178,37 @@ public void testClusterPublishTimeoutMinValue() {
);
}

/**
 * Sets the follower-lag timeout to 30s as a persistent cluster setting, verifies that the update
 * is acknowledged and that the response carries the 30s value, and always resets the setting
 * to null afterwards.
 */
public void testLagDetectorTimeoutUpdate() {
    final Setting<TimeValue> lagTimeoutSetting = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
    final String settingKey = lagTimeoutSetting.getKey();
    final Settings thirtySecondTimeout = Settings.builder().put(settingKey, "30s").build();
    try {
        final ClusterUpdateSettingsResponse updateResponse = client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(thirtySecondTimeout)
            .execute()
            .actionGet();

        assertAcked(updateResponse);
        final TimeValue echoedTimeout = lagTimeoutSetting.get(updateResponse.getPersistentSettings());
        assertEquals(timeValueSeconds(30), echoedTimeout);
    } finally {
        // cleanup: remove the persistent override regardless of the assertions' outcome
        final Settings clearedTimeout = Settings.builder().putNull(settingKey).build();
        client().admin().cluster().prepareUpdateSettings().setPersistentSettings(clearedTimeout).execute().actionGet();
    }
}

/**
 * Verifies that a follower-lag timeout of {@code 0s} is rejected with an
 * {@link IllegalArgumentException} whose message names the offending value, the setting key
 * and the {@code 1ms} lower bound.
 *
 * The three-argument {@code assertThrows(String, Class, ThrowingRunnable)} treats its String
 * as the failure message for the assertion itself, not as the expected exception text, so the
 * exception is captured and its message is compared explicitly.
 */
public void testLagDetectorTimeoutMinValue() {
    final Setting<TimeValue> lagTimeoutSetting = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
    final String settingKey = lagTimeoutSetting.getKey();
    final Settings belowMinimumTimeout = Settings.builder().put(settingKey, "0s").build();

    final IllegalArgumentException rejection = assertThrows(
        IllegalArgumentException.class,
        () -> {
            client().admin().cluster().prepareUpdateSettings().setPersistentSettings(belowMinimumTimeout).execute().actionGet();
        }
    );

    // Pin the validation text; previously this string was never compared against the exception.
    assertEquals(
        "failed to parse value [0s] for setting [" + settingKey + "], must be >= [1ms]",
        rejection.getMessage()
    );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -70,8 +71,9 @@ public void setupFixture() {
} else {
followerLagTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(Settings.EMPTY);
}

lagDetector = new LagDetector(settingsBuilder.build(), deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);
Settings settings = settingsBuilder.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
lagDetector = new LagDetector(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);

localNode = CoordinationStateTests.createNode("local");
node1 = CoordinationStateTests.createNode("node1");
Expand Down

0 comments on commit 0f29002

Please sign in to comment.