Skip to content

Commit

Permalink
Make Reconfigurator and PreVoteCollector pluggable (#95248)
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez authored Apr 14, 2023
1 parent 707c547 commit 83fadff
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.coordination.LeaderHeartbeatService;
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -182,6 +184,9 @@ public DiscoveryModule(
);
}

var reconfigurator = getReconfigurator(settings, clusterSettings, clusterCoordinationPlugins);
var preVoteCollectorFactory = getPreVoteCollectorFactory(clusterCoordinationPlugins);

if (MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
|| LEGACY_MULTI_NODE_DISCOVERY_TYPE.equals(discoveryType)
|| SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
Expand All @@ -203,9 +208,9 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
circuitBreakerService,
new Reconfigurator(settings, clusterSettings),
reconfigurator,
LeaderHeartbeatService.NO_OP,
StatefulPreVoteCollector::new
preVoteCollectorFactory
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand All @@ -214,6 +219,46 @@ public DiscoveryModule(
logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
}

// visible for testing
static Reconfigurator getReconfigurator(
Settings settings,
ClusterSettings clusterSettings,
List<ClusterCoordinationPlugin> clusterCoordinationPlugins
) {
final var reconfiguratorFactories = clusterCoordinationPlugins.stream()
.map(ClusterCoordinationPlugin::getReconfiguratorFactory)
.flatMap(Optional::stream)
.toList();

if (reconfiguratorFactories.size() > 1) {
throw new IllegalStateException("multiple reconfigurator factories found: " + reconfiguratorFactories);
}

if (reconfiguratorFactories.size() == 1) {
return reconfiguratorFactories.get(0).newReconfigurator(settings, clusterSettings);
}

return new Reconfigurator(settings, clusterSettings);
}

// visible for testing
static PreVoteCollector.Factory getPreVoteCollectorFactory(List<ClusterCoordinationPlugin> clusterCoordinationPlugins) {
final var preVoteCollectorFactories = clusterCoordinationPlugins.stream()
.map(ClusterCoordinationPlugin::getPreVoteCollectorFactory)
.flatMap(Optional::stream)
.toList();

if (preVoteCollectorFactories.size() > 1) {
throw new IllegalStateException("multiple pre-vote collector factories found: " + preVoteCollectorFactories);
}

if (preVoteCollectorFactories.size() == 1) {
return preVoteCollectorFactories.get(0);
}

return StatefulPreVoteCollector::new;
}

public static boolean isSingleNodeDiscovery(Settings settings) {
return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.ElectionStrategy;
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -52,6 +54,14 @@ default Optional<PersistedClusterStateServiceFactory> getPersistedClusterStateSe
return Optional.empty();
}

default Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
return Optional.empty();
}

default Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
return Optional.empty();
}

interface PersistedStateFactory {
CoordinationState.PersistedState createPersistedState(
Settings settings,
Expand All @@ -68,4 +78,8 @@ PersistedClusterStateService newPersistedClusterStateService(
ThreadPool threadPool
);
}

interface ReconfiguratorFactory {
Reconfigurator newReconfigurator(Settings settings, ClusterSettings clusterSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.service.ClusterApplier;
Expand All @@ -24,6 +27,7 @@
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.plugins.ClusterCoordinationPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -36,10 +40,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;

public class DiscoveryModuleTests extends ESTestCase {
Expand Down Expand Up @@ -219,4 +226,77 @@ public void testLegacyDiscoveryType() {
+ "[multi-node] instead."
);
}

public void testRejectsMultipleReconfigurators() {
assertThat(
expectThrows(
IllegalStateException.class,
() -> DiscoveryModule.getReconfigurator(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
List.of(
new BaseTestClusterCoordinationPlugin(),
new TestClusterCoordinationPlugin1(),
new TestClusterCoordinationPlugin2()
)
)
).getMessage(),
containsString("multiple reconfigurator factories found")
);

assertThat(
DiscoveryModule.getReconfigurator(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
List.of(new BaseTestClusterCoordinationPlugin())
),
is(BaseTestClusterCoordinationPlugin.reconfiguratorInstance)
);
}

public void testRejectsMultiplePreVoteCollectorFactories() {
assertThat(
expectThrows(
IllegalStateException.class,
() -> DiscoveryModule.getPreVoteCollectorFactory(
List.of(new BaseTestClusterCoordinationPlugin(), new TestClusterCoordinationPlugin1() {
@Override
public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
return Optional.empty();
}
}, new TestClusterCoordinationPlugin2() {
@Override
public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
return Optional.empty();
}
})
)
).getMessage(),
containsString("multiple pre-vote collector factories found")
);

assertThat(
DiscoveryModule.getPreVoteCollectorFactory(List.of(new BaseTestClusterCoordinationPlugin())),
is(BaseTestClusterCoordinationPlugin.preVoteCollectorFactory)
);
}

private static class BaseTestClusterCoordinationPlugin extends Plugin implements ClusterCoordinationPlugin {
static Reconfigurator reconfiguratorInstance;
static PreVoteCollector.Factory preVoteCollectorFactory = StatefulPreVoteCollector::new;

@Override
public Optional<ReconfiguratorFactory> getReconfiguratorFactory() {
return Optional.of((settings, clusterSettings) -> reconfiguratorInstance = new Reconfigurator(settings, clusterSettings));
}

@Override
public Optional<PreVoteCollector.Factory> getPreVoteCollectorFactory() {
return Optional.of(preVoteCollectorFactory);
}
}

public static class TestClusterCoordinationPlugin1 extends BaseTestClusterCoordinationPlugin {}

public static class TestClusterCoordinationPlugin2 extends BaseTestClusterCoordinationPlugin {}
}

0 comments on commit 83fadff

Please sign in to comment.