Skip to content

Commit

Permalink
Initial changes for making settings dynamic
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
  • Loading branch information
Rahul Karajgikar committed Oct 16, 2024
1 parent d404359 commit 3ef5783
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,17 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
"cluster.publish.info_timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the timeout for the publication of each value
public static final Setting<TimeValue> PUBLISH_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.publish.timeout",
TimeValue.timeValueMillis(30000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Settings settings;
Expand All @@ -164,8 +166,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Random random;
private final ElectionSchedulerFactory electionSchedulerFactory;
private final SeedHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private final TimeValue publishInfoTimeout;
private TimeValue publishTimeout;
private TimeValue publishInfoTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
private final FollowersChecker followersChecker;
Expand Down Expand Up @@ -246,9 +248,11 @@ public Coordinator(
this.lastJoin = Optional.empty();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout);
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PUBLISH_INFO_TIMEOUT_SETTING, this::setPublishInfoTimeout);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, clusterSettings, random, transportService.getThreadPool());
this.preVoteCollector = new PreVoteCollector(
transportService,
this::startElection,
Expand All @@ -259,6 +263,7 @@ public Coordinator(
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
this.peerFinder = new CoordinatorPeerFinder(
settings,
clusterSettings,
transportService,
new HandshakingTransportAddressConnector(settings, transportService),
configuredHostsResolver
Expand Down Expand Up @@ -317,6 +322,14 @@ public Coordinator(
this.remoteClusterStateService = remoteClusterStateService;
}

/**
 * Replaces the cached publish timeout with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code PUBLISH_TIMEOUT_SETTING}, so it is invoked when that dynamic setting changes.
 * <p>
 * NOTE(review): the backing field is a plain (non-volatile) field; confirm that
 * threads reading it are guaranteed to observe the updated value.
 *
 * @param publishTimeout the new publish timeout
 */
protected void setPublishTimeout(TimeValue publishTimeout) {
this.publishTimeout = publishTimeout;
}

/**
 * Replaces the cached publish-info timeout with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code PUBLISH_INFO_TIMEOUT_SETTING}, so it is invoked when that dynamic setting changes.
 * <p>
 * NOTE(review): the backing field is a plain (non-volatile) field; confirm that
 * threads reading it are guaranteed to observe the updated value.
 *
 * @param publishInfoTimeout the new publish-info timeout
 */
protected void setPublishInfoTimeout(TimeValue publishInfoTimeout) {
this.publishInfoTimeout = publishInfoTimeout;
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(
settings,
Expand Down Expand Up @@ -1471,12 +1484,14 @@ private class CoordinatorPeerFinder extends PeerFinder {

CoordinatorPeerFinder(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver
) {
super(
settings,
clusterSettings,
transportService,
transportAddressConnector,
singleNodeDiscovery ? hostsResolver -> Collections.emptyList() : configuredHostsResolver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -87,7 +88,8 @@ public class ElectionSchedulerFactory {
TimeValue.timeValueMillis(100),
TimeValue.timeValueMillis(1),
TimeValue.timeValueSeconds(10),
Property.NodeScope
Property.NodeScope,
Property.Dynamic
);

public static final Setting<TimeValue> ELECTION_BACK_OFF_TIME_SETTING = Setting.timeSetting(
Expand All @@ -111,24 +113,27 @@ public class ElectionSchedulerFactory {
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(1),
TimeValue.timeValueSeconds(300),
Property.NodeScope
Property.NodeScope,
Property.Dynamic
);

private final TimeValue initialTimeout;
private TimeValue initialTimeout;
private final TimeValue backoffTime;
private final TimeValue maxTimeout;
private final TimeValue duration;
private TimeValue duration;
private final ThreadPool threadPool;
private final Random random;

public ElectionSchedulerFactory(Settings settings, Random random, ThreadPool threadPool) {
public ElectionSchedulerFactory(Settings settings, ClusterSettings clusterSettings, Random random, ThreadPool threadPool) {
this.random = random;
this.threadPool = threadPool;

initialTimeout = ELECTION_INITIAL_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(ELECTION_INITIAL_TIMEOUT_SETTING, this::setElectionInitialTimeout);
backoffTime = ELECTION_BACK_OFF_TIME_SETTING.get(settings);
maxTimeout = ELECTION_MAX_TIMEOUT_SETTING.get(settings);
duration = ELECTION_DURATION_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(ELECTION_DURATION_SETTING, this::setElectionDuration);

if (maxTimeout.millis() < initialTimeout.millis()) {
throw new IllegalArgumentException(
Expand All @@ -143,6 +148,14 @@ public ElectionSchedulerFactory(Settings settings, Random random, ThreadPool thr
}
}

/**
 * Replaces the cached initial election timeout with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code ELECTION_INITIAL_TIMEOUT_SETTING}.
 * <p>
 * NOTE(review): the constructor rejects configurations where the max timeout is
 * smaller than the initial timeout, but this setter performs no such check, so a
 * dynamic update can leave {@code initialTimeout} above {@code maxTimeout}. Confirm
 * whether a validator should be supplied when the consumer is registered.
 *
 * @param electionInitialTimeout the new initial election timeout
 */
protected void setElectionInitialTimeout(TimeValue electionInitialTimeout) {
this.initialTimeout = electionInitialTimeout;
}

/**
 * Replaces the cached election duration with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code ELECTION_DURATION_SETTING}.
 *
 * @param electionDuration the new election duration
 */
protected void setElectionDuration(TimeValue electionDuration) {
this.duration = electionDuration;
}

/**
* Start the process to schedule repeated election attempts.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,16 @@ public class FollowersChecker {
"cluster.fault_detection.follower_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the timeout for each check sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(150000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand All @@ -110,14 +111,15 @@ public class FollowersChecker {
"cluster.fault_detection.follower_check.retry_count",
3,
1,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Settings settings;

private final TimeValue followerCheckInterval;
private TimeValue followerCheckInterval;
private TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private int followerCheckRetryCount;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
private final Consumer<FollowerCheckRequest> handleRequestAndUpdateState;

Expand Down Expand Up @@ -148,7 +150,9 @@ public FollowersChecker(
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_INTERVAL_SETTING, this::setFollowerCheckInterval);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_RETRY_COUNT_SETTING, this::setFollowerCheckRetryCount);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(
FOLLOWER_CHECK_ACTION_NAME,
Expand All @@ -167,10 +171,18 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
this.clusterManagerMetrics = clusterManagerMetrics;
}

/**
 * Replaces the cached follower-check interval with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code FOLLOWER_CHECK_INTERVAL_SETTING}.
 *
 * @param followerCheckInterval the new interval between follower checks
 */
private void setFollowerCheckInterval(TimeValue followerCheckInterval) {
this.followerCheckInterval = followerCheckInterval;
}

/**
 * Replaces the cached follower-check timeout with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code FOLLOWER_CHECK_TIMEOUT_SETTING}.
 *
 * @param followerCheckTimeout the new timeout for each follower check
 */
private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
this.followerCheckTimeout = followerCheckTimeout;
}

/**
 * Replaces the cached follower-check retry count with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code FOLLOWER_CHECK_RETRY_COUNT_SETTING}.
 *
 * @param followerCheckRetryCount the new follower-check retry count
 */
private void setFollowerCheckRetryCount(int followerCheckRetryCount) {
this.followerCheckRetryCount = followerCheckRetryCount;
}

/**
* Update the set of known nodes, starting to check any new ones and stopping checking any previously-known-but-now-unknown ones.
*/
Expand Down
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -95,12 +96,13 @@ public abstract class PeerFinder {
"discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Settings settings;
private TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;
private TimeValue requestPeersTimeout;

private final Object mutex = new Object();
private final TransportService transportService;
Expand All @@ -116,13 +118,15 @@ public abstract class PeerFinder {

public PeerFinder(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver
) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, this::setRequestPeersTimeout);
this.transportService = transportService;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
Expand Down Expand Up @@ -235,6 +239,10 @@ private DiscoveryNode getLocalNode() {
return localNode;
}

/**
 * Replaces the cached request-peers timeout with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING}.
 * <p>
 * NOTE(review): this write does not take the class's {@code mutex} and the field is
 * not volatile; confirm that readers of {@code requestPeersTimeout} tolerate that.
 *
 * @param requestPeersTimeout the new timeout for peer requests
 */
private void setRequestPeersTimeout(TimeValue requestPeersTimeout) {
this.requestPeersTimeout = requestPeersTimeout;
}

/**
* Invoked on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
* Note that invocations of this method are not synchronised. By the time it is called we may have been deactivated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator";
private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class);
private final long maxBatchSize;
private long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
Expand All @@ -93,7 +93,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
DEFAULT_SHARD_BATCH_SIZE,
1,
10000,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
Expand Down Expand Up @@ -172,6 +173,7 @@ public ShardsBatchGatewayAllocator(
this.batchStartedAction = batchStartedAction;
this.batchStoreAction = batchStoreAction;
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
clusterSettings.addSettingsUpdateConsumer(GATEWAY_ALLOCATOR_BATCH_SIZE, this::setMaxBatchSize);
this.primaryShardsBatchGatewayAllocatorTimeout = PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setPrimaryBatchAllocatorTimeout);
this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -906,6 +908,10 @@ public int getNumberOfStoreShardBatches() {
return batchIdToStoreShardBatch.size();
}

/**
 * Replaces the cached maximum batch size with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code GATEWAY_ALLOCATOR_BATCH_SIZE}.
 *
 * @param maxBatchSize the new maximum batch size
 */
protected void setMaxBatchSize(long maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}

/**
 * Replaces the cached primary-shards batch allocator timeout with a new value.
 * <p>
 * Registered in the constructor as the update consumer for
 * {@code PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING}.
 *
 * @param primaryShardsBatchGatewayAllocatorTimeout the new timeout value
 */
protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) {
this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public void testFollowerCheckTimeoutValueUpdate() {

public void testFollowerCheckTimeoutMaxValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build();
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "151s").build();

assertThrows(
"failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]",
"failed to parse value [151s] for setting [" + setting1.getKey() + "], must be <= [150000ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Settings.Builder;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -165,8 +166,10 @@ public void testRetriesOnCorrectSchedule() {
final long duration = ELECTION_DURATION_SETTING.get(settings).millis();

final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(settings, random());
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ElectionSchedulerFactory electionSchedulerFactory = new ElectionSchedulerFactory(
settings,
clusterSettings,
random(),
deterministicTaskQueue.getThreadPool()
);
Expand Down Expand Up @@ -234,8 +237,8 @@ public void testSettingsValidation() {
assertThat(ELECTION_INITIAL_TIMEOUT_SETTING.get(settings), is(TimeValue.timeValueMillis(initialTimeoutMillis)));
assertThat(ELECTION_BACK_OFF_TIME_SETTING.get(settings), is(TimeValue.timeValueMillis(backOffMillis)));
assertThat(ELECTION_MAX_TIMEOUT_SETTING.get(settings), is(TimeValue.timeValueMillis(maxTimeoutMillis)));

assertThat(new ElectionSchedulerFactory(settings, random(), null), not(nullValue())); // doesn't throw an IAE
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
assertThat(new ElectionSchedulerFactory(settings, clusterSettings, random(), null), not(nullValue())); // doesn't throw an IAE
}

{
Expand All @@ -246,10 +249,10 @@ public void testSettingsValidation() {
.put(ELECTION_INITIAL_TIMEOUT_SETTING.getKey(), initialTimeoutMillis + "ms")
.put(ELECTION_MAX_TIMEOUT_SETTING.getKey(), maxTimeoutMillis + "ms")
.build();

final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new ElectionSchedulerFactory(settings, random(), null)
() -> new ElectionSchedulerFactory(settings, clusterSettings, random(), null)
);
assertThat(
e.getMessage(),
Expand Down
14 changes: 10 additions & 4 deletions server/src/test/java/org/opensearch/discovery/PeerFinderTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.node.DiscoveryNodes.Builder;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -169,8 +170,13 @@ class TestPeerFinder extends PeerFinder {
DiscoveryNode discoveredClusterManagerNode;
OptionalLong discoveredClusterManagerTerm = OptionalLong.empty();

TestPeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector) {
super(settings, transportService, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts);
/**
 * Creates a test peer finder that forwards its arguments to the {@code PeerFinder}
 * constructor and uses {@code PeerFinderTests.this::resolveConfiguredHosts} as the
 * configured-hosts resolver.
 *
 * @param settings                  node settings passed through to the superclass
 * @param clusterSettings           cluster settings passed through to the superclass
 * @param transportService          transport service passed through to the superclass
 * @param transportAddressConnector address connector passed through to the superclass
 */
TestPeerFinder(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
TransportAddressConnector transportAddressConnector
) {
super(settings, clusterSettings, transportService, transportAddressConnector, PeerFinderTests.this::resolveConfiguredHosts);
}

@Override
Expand Down Expand Up @@ -251,8 +257,8 @@ public void setup() {
transportService.acceptIncomingRequests();

lastAcceptedNodes = DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build();

peerFinder = new TestPeerFinder(settings, transportService, transportAddressConnector);
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
peerFinder = new TestPeerFinder(settings, clusterSettings, transportService, transportAddressConnector);
foundPeersFromNotification = emptyList();
}

Expand Down

0 comments on commit 3ef5783

Please sign in to comment.