From dec4d6b063b0b6cb21419a34c2befb0479de0850 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 11 May 2021 13:45:41 +0200 Subject: [PATCH] [FLINK-22636][zk] Group job specific zNodes under /jobs zNode In order to better clean up job specific HA services, this commit changes the layout of the zNode structure so that the JobMaster leader, checkpoints and checkpoint counter is now grouped below the jobs/ zNode. Moreover, this commit groups the leaders of the cluster components (Dispatcher, ResourceManager, RestServer) under /leader/process/latch and /leader/process/connection-info. --- .../expert_high_availability_zk_section.html | 24 -- .../high_availability_configuration.html | 24 -- .../flink/configuration/ConfigConstants.java | 18 +- .../HighAvailabilityOptions.java | 33 --- .../KubernetesHaServices.java | 14 +- .../ZooKeeperCheckpointIDCounter.java | 14 +- .../ZooKeeperCheckpointRecoveryFactory.java | 10 +- .../highavailability/AbstractHaServices.java | 34 +-- .../HighAvailabilityServices.java | 2 +- .../HighAvailabilityServicesUtils.java | 2 +- .../zookeeper/ZooKeeperClientHAServices.java | 10 +- .../zookeeper/ZooKeeperHaServices.java | 79 ++++--- .../ZooKeeperLeaderElectionDriver.java | 37 ++-- .../ZooKeeperLeaderElectionDriverFactory.java | 17 +- .../ZooKeeperLeaderRetrievalDriver.java | 23 +- .../flink/runtime/util/ZooKeeperUtils.java | 206 ++++++++++-------- ...ZKCheckpointIDCounterMultiServersTest.java | 2 +- .../ZooKeeperCheckpointIDCounterITCase.java | 18 +- .../ZooKeeperDefaultDispatcherRunnerTest.java | 4 +- .../AbstractHaServicesTest.java | 8 +- .../leaderelection/LeaderElectionTest.java | 2 +- ...rLeaderElectionConnectionHandlingTest.java | 32 +-- .../ZooKeeperLeaderElectionTest.java | 42 ++-- 23 files changed, 313 insertions(+), 342 deletions(-) diff --git a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html index 1886ef38017834..084f64f9c57c3a 100644 --- a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html +++ b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html @@ -38,36 +38,12 @@ Integer Defines the session timeout for the ZooKeeper session in ms. - -
high-availability.zookeeper.path.checkpoint-counter
- "/checkpoint-counter" - String - ZooKeeper root path (ZNode) for checkpoint counters. - - -
high-availability.zookeeper.path.checkpoints
- "/checkpoints" - String - ZooKeeper root path (ZNode) for completed checkpoints. -
high-availability.zookeeper.path.jobgraphs
"/jobgraphs" String ZooKeeper root path (ZNode) for job graphs - -
high-availability.zookeeper.path.latch
- "/leaderlatch" - String - Defines the znode of the leader latch which is used to elect the leader. - - -
high-availability.zookeeper.path.leader
- "/leader" - String - Defines the znode of the leader which contains the URL to the leader and the current leader session ID. -
high-availability.zookeeper.path.mesos-workers
"/mesos-workers" diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html index 95d4746f9851fc..93f0335c9336e0 100644 --- a/docs/layouts/shortcodes/generated/high_availability_configuration.html +++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html @@ -62,36 +62,12 @@ Integer Defines the session timeout for the ZooKeeper session in ms. - -
high-availability.zookeeper.path.checkpoint-counter
- "/checkpoint-counter" - String - ZooKeeper root path (ZNode) for checkpoint counters. - - -
high-availability.zookeeper.path.checkpoints
- "/checkpoints" - String - ZooKeeper root path (ZNode) for completed checkpoints. -
high-availability.zookeeper.path.jobgraphs
"/jobgraphs" String ZooKeeper root path (ZNode) for job graphs - -
high-availability.zookeeper.path.latch
- "/leaderlatch" - String - Defines the znode of the leader latch which is used to elect the leader. - - -
high-availability.zookeeper.path.leader
- "/leader" - String - Defines the znode of the leader which contains the URL to the leader and the current leader session ID. -
high-availability.zookeeper.path.mesos-workers
"/mesos-workers" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a694e50f557787..1f64963b12d083 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1013,7 +1013,7 @@ public final class ConfigConstants { public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */ + /** @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch"; @@ -1026,14 +1026,14 @@ public final class ConfigConstants { public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */ + /** @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader"; /** * ZooKeeper root path (ZNode) for completed checkpoints. * - * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. + * @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = @@ -1042,7 +1042,7 @@ public final class ConfigConstants { /** * ZooKeeper root path (ZNode) for checkpoint counters. * - * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}. + * @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = @@ -1691,21 +1691,19 @@ public final class ConfigConstants { /** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader"; /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints"; - /** - * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH} - */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index e630d941cd5379..d6cfbc32fc1103 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -115,14 +115,6 @@ public class HighAvailabilityOptions { .withDescription( "The root path under which Flink stores its entries in ZooKeeper."); - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_LATCH_PATH = - key("high-availability.zookeeper.path.latch") - .defaultValue("/leaderlatch") - .withDeprecatedKeys("recovery.zookeeper.path.latch") - .withDescription( - "Defines the znode of the leader latch which is used to elect the leader."); - /** ZooKeeper root path (ZNode) for job graphs. */ @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) public static final ConfigOption HA_ZOOKEEPER_JOBGRAPHS_PATH = @@ -131,31 +123,6 @@ public class HighAvailabilityOptions { .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs") .withDescription("ZooKeeper root path (ZNode) for job graphs"); - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_LEADER_PATH = - key("high-availability.zookeeper.path.leader") - .defaultValue("/leader") - .withDeprecatedKeys("recovery.zookeeper.path.leader") - .withDescription( - "Defines the znode of the leader which contains the URL to the leader and the current" - + " leader session ID."); - - /** ZooKeeper root path (ZNode) for completed checkpoints. */ - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_CHECKPOINTS_PATH = - key("high-availability.zookeeper.path.checkpoints") - .defaultValue("/checkpoints") - .withDeprecatedKeys("recovery.zookeeper.path.checkpoints") - .withDescription("ZooKeeper root path (ZNode) for completed checkpoints."); - - /** ZooKeeper root path (ZNode) for checkpoint counters. */ - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = - key("high-availability.zookeeper.path.checkpoint-counter") - .defaultValue("/checkpoint-counter") - .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter") - .withDescription("ZooKeeper root path (ZNode) for checkpoint counters."); - /** ZooKeeper root path (ZNode) for Mesos workers. */ @PublicEvolving @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java index f5ff366710251d..055ff1819bf50e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java @@ -112,20 +112,20 @@ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() { kubeClient, configuration, ioExecutor, - this::getLeaderNameForJobManager, + this::getLeaderPathForJobManager, lockIdentity); } @Override public JobGraphStore createJobGraphStore() throws Exception { return KubernetesUtils.createJobGraphStore( - configuration, kubeClient, getLeaderNameForDispatcher(), lockIdentity); + configuration, kubeClient, getLeaderPathForDispatcher(), lockIdentity); } @Override public RunningJobsRegistry createRunningJobsRegistry() { return new KubernetesRunningJobsRegistry( - kubeClient, getLeaderNameForDispatcher(), lockIdentity); + kubeClient, getLeaderPathForDispatcher(), lockIdentity); } @Override @@ -143,21 +143,21 @@ public void internalCleanup() throws Exception { } @Override - protected String getLeaderNameForResourceManager() { + protected String getLeaderPathForResourceManager() { return getLeaderName(RESOURCE_MANAGER_NAME); } @Override - protected String getLeaderNameForDispatcher() { + protected String getLeaderPathForDispatcher() { return getLeaderName(DISPATCHER_NAME); } - public String getLeaderNameForJobManager(final JobID jobID) { + public String getLeaderPathForJobManager(final JobID jobID) { return getLeaderName(jobID.toString() + NAME_SEPARATOR + JOB_MANAGER_NAME); } @Override - protected String getLeaderNameForRestServer() { + protected String getLeaderPathForRestServer() { return getLeaderName(REST_SERVER_NAME); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java index 9fb480f51704e2..1e4180a4e804f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.shared.SharedCount; @@ -76,14 +78,11 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { * Creates a {@link ZooKeeperCheckpointIDCounter} instance. * * @param client Curator ZooKeeper client - * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job. */ public ZooKeeperCheckpointIDCounter( - CuratorFramework client, - String counterPath, - LastStateConnectionStateListener connectionStateListener) { + CuratorFramework client, LastStateConnectionStateListener connectionStateListener) { this.client = checkNotNull(client, "Curator client"); - this.counterPath = checkNotNull(counterPath, "Counter path"); + this.counterPath = ZooKeeperUtils.getCheckpointIdCounterPath(); this.sharedCount = new SharedCount(client, counterPath, 1); this.connectionStateListener = connectionStateListener; } @@ -176,4 +175,9 @@ private void checkConnectionState() { } }); } + + @VisibleForTesting + String getPath() { + return counterPath; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 432865d27a6833..379d6a3bff96d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -51,11 +51,17 @@ public CompletedCheckpointStore createCheckpointStore( throws Exception { return ZooKeeperUtils.createCompletedCheckpoints( - client, config, jobId, maxNumberOfCheckpointsToRetain, executor); + ZooKeeperUtils.useNamespaceAndEnsurePath( + client, ZooKeeperUtils.getPathForJob(jobId)), + config, + maxNumberOfCheckpointsToRetain, + executor); } @Override public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception { - return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID); + return ZooKeeperUtils.createCheckpointIDCounter( + ZooKeeperUtils.useNamespaceAndEnsurePath( + client, ZooKeeperUtils.getPathForJob(jobID))); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index 9a322a8d924875..e2c980668d7fb9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java @@ -38,9 +38,9 @@ /** * Abstract high availability services based on distributed system(e.g. Zookeeper, Kubernetes). It * will help with creating all the leader election/retrieval services and the cleanup. Please return - * a proper leader name int the implementation of {@link #getLeaderNameForResourceManager}, {@link - * #getLeaderNameForDispatcher}, {@link #getLeaderNameForJobManager}, {@link - * #getLeaderNameForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and + * a proper leader name int the implementation of {@link #getLeaderPathForResourceManager}, {@link + * #getLeaderPathForDispatcher}, {@link #getLeaderPathForJobManager}, {@link + * #getLeaderPathForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and * child path in Zookeeper. * *

{@link #close()} and {@link #closeAndCleanupAllData()} should be implemented to destroy the @@ -75,17 +75,17 @@ public AbstractHaServices( @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() { - return createLeaderRetrievalService(getLeaderNameForResourceManager()); + return createLeaderRetrievalService(getLeaderPathForResourceManager()); } @Override public LeaderRetrievalService getDispatcherLeaderRetriever() { - return createLeaderRetrievalService(getLeaderNameForDispatcher()); + return createLeaderRetrievalService(getLeaderPathForDispatcher()); } @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { - return createLeaderRetrievalService(getLeaderNameForJobManager(jobID)); + return createLeaderRetrievalService(getLeaderPathForJobManager(jobID)); } @Override @@ -96,31 +96,31 @@ public LeaderRetrievalService getJobManagerLeaderRetriever( @Override public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() { - return createLeaderRetrievalService(getLeaderNameForRestServer()); + return createLeaderRetrievalService(getLeaderPathForRestServer()); } @Override public LeaderElectionService getResourceManagerLeaderElectionService() { - return createLeaderElectionService(getLeaderNameForResourceManager()); + return createLeaderElectionService(getLeaderPathForResourceManager()); } @Override public LeaderElectionService getDispatcherLeaderElectionService() { - return createLeaderElectionService(getLeaderNameForDispatcher()); + return createLeaderElectionService(getLeaderPathForDispatcher()); } @Override public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { - return createLeaderElectionService(getLeaderNameForJobManager(jobID)); + return createLeaderElectionService(getLeaderPathForJobManager(jobID)); } @Override public LeaderElectionService getClusterRestEndpointLeaderElectionService() { - return createLeaderElectionService(getLeaderNameForRestServer()); + return createLeaderElectionService(getLeaderPathForRestServer()); } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { return createCheckpointRecoveryFactory(); } @@ -226,7 +226,7 @@ public void closeAndCleanupAllData() throws Exception { * * @return Checkpoint recovery factory */ - protected abstract CheckpointRecoveryFactory createCheckpointRecoveryFactory(); + protected abstract CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception; /** * Create the submitted job graph store for the job manager. @@ -266,7 +266,7 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the ResourceManager leader name. It is ConfigMap name in Kubernetes or child * node path in Zookeeper. */ - protected abstract String getLeaderNameForResourceManager(); + protected abstract String getLeaderPathForResourceManager(); /** * Get the leader name for Dispatcher. @@ -274,7 +274,7 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the Dispatcher leader name. It is ConfigMap name in Kubernetes or child node * path in Zookeeper. */ - protected abstract String getLeaderNameForDispatcher(); + protected abstract String getLeaderPathForDispatcher(); /** * Get the leader name for specific JobManager. @@ -283,7 +283,7 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the JobManager leader name for specified job id. It is ConfigMap name in * Kubernetes or child node path in Zookeeper. */ - protected abstract String getLeaderNameForJobManager(final JobID jobID); + protected abstract String getLeaderPathForJobManager(final JobID jobID); /** * Get the leader name for RestServer. @@ -291,5 +291,5 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the RestServer leader name. It is ConfigMap name in Kubernetes or child node * path in Zookeeper. */ - protected abstract String getLeaderNameForRestServer(); + protected abstract String getLeaderPathForRestServer(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index fdb3fa7dcb5359..650ab971a66ab9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -158,7 +158,7 @@ default LeaderElectionService getWebMonitorLeaderElectionService() { * * @return Checkpoint recovery factory */ - CheckpointRecoveryFactory getCheckpointRecoveryFactory(); + CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception; /** * Gets the submitted job graph store for the job manager. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 0f6ebb05eb0e6a..3f134317879367 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -140,7 +140,7 @@ public static ClientHighAvailabilityServices createClientHAService(Configuration return new StandaloneClientHAServices(webMonitorAddress); case ZOOKEEPER: final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); - return new ZooKeeperClientHAServices(client, configuration); + return new ZooKeeperClientHAServices(client); case FACTORY_CLASS: return createCustomClientHAServices(configuration); default: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java index e82d2cb85a0c12..76002c8d667cd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.highavailability.zookeeper; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; @@ -30,21 +29,16 @@ /** ZooKeeper based implementation for {@link ClientHighAvailabilityServices}. */ public class ZooKeeperClientHAServices implements ClientHighAvailabilityServices { - private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock"; - private final CuratorFramework client; - private final Configuration configuration; - public ZooKeeperClientHAServices( - @Nonnull CuratorFramework client, @Nonnull Configuration configuration) { + public ZooKeeperClientHAServices(@Nonnull CuratorFramework client) { this.client = client; - this.configuration = configuration; } @Override public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() { return ZooKeeperUtils.createLeaderRetrievalService( - client, configuration, REST_SERVER_LEADER_PATH); + client, ZooKeeperUtils.getLeaderPathForRestServer()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index f6ab9e859d630a..f9d43afafe9e38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -47,21 +47,37 @@ * *

  * /flink
- *      +/cluster_id_1/resource_manager_lock
+ *      +/cluster_id_1/leader/resource_manager/latch
+ *      |            |                        /connection_info
+ *      |            |       /dispatcher/latch
+ *      |            |                  /connection_info
+ *      |            |       /rest_server/latch
+ *      |            |                   /connection_info
  *      |            |
- *      |            +/job-id-1/job_manager_lock
- *      |            |         /checkpoints/latest
- *      |            |                     /latest-1
- *      |            |                     /latest-2
  *      |            |
- *      |            +/job-id-2/job_manager_lock
+ *      |            +jobgraphs/job-id-1
+ *      |            |         /job-id-2
+ *      |            +jobs/job-id-1/leader/latch
+ *      |                 |               /connection_info
+ *      |                 |        /checkpoints/latest
+ *      |                 |                    /latest-1
+ *      |                 |                    /latest-2
+ *      |                 |       /checkpoint_id_counter
  *      |
- *      +/cluster_id_2/resource_manager_lock
- *                   |
- *                   +/job-id-1/job_manager_lock
- *                            |/checkpoints/latest
- *                            |            /latest-1
- *                            |/persisted_job_graph
+ *      +/cluster_id_2/leader/resource_manager/latch
+ *      |            |                        /connection_info
+ *      |            |       /dispatcher/latch
+ *      |            |                  /connection_info
+ *      |            |       /rest_server/latch
+ *      |            |                   /connection_info
+ *      |            |
+ *      |            +jobgraphs/job-id-2
+ *      |            +jobs/job-id-2/leader/latch
+ *      |                 |               /connection_info
+ *      |                 |        /checkpoints/latest
+ *      |                 |                    /latest-1
+ *      |                 |                    /latest-2
+ *      |                 |       /checkpoint_id_counter
  * 
* *

The root path "/flink" is configurable via the option {@link @@ -81,14 +97,6 @@ */ public class ZooKeeperHaServices extends AbstractHaServices { - private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock"; - - private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock"; - - private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock"; - - private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock"; - // ------------------------------------------------------------------------ /** The ZooKeeper client to use. */ @@ -104,8 +112,11 @@ public ZooKeeperHaServices( } @Override - public CheckpointRecoveryFactory createCheckpointRecoveryFactory() { - return new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor); + public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception { + return new ZooKeeperCheckpointRecoveryFactory( + ZooKeeperUtils.useNamespaceAndEnsurePath(client, ZooKeeperUtils.getJobsPath()), + configuration, + ioExecutor); } @Override @@ -119,13 +130,13 @@ public RunningJobsRegistry createRunningJobsRegistry() { } @Override - protected LeaderElectionService createLeaderElectionService(String leaderName) { - return ZooKeeperUtils.createLeaderElectionService(client, configuration, leaderName); + protected LeaderElectionService createLeaderElectionService(String leaderPath) { + return ZooKeeperUtils.createLeaderElectionService(client, leaderPath); } @Override - protected LeaderRetrievalService createLeaderRetrievalService(String leaderName) { - return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, leaderName); + protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) { + return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath); } @Override @@ -139,22 +150,22 @@ public void internalCleanup() throws Exception { } @Override - protected String getLeaderNameForResourceManager() { - return RESOURCE_MANAGER_LEADER_PATH; + protected String getLeaderPathForResourceManager() { + return ZooKeeperUtils.getLeaderPathForResourceManager(); } @Override - protected String getLeaderNameForDispatcher() { - return DISPATCHER_LEADER_PATH; + protected String getLeaderPathForDispatcher() { + return ZooKeeperUtils.getLeaderPathForDispatcher(); } - public String getLeaderNameForJobManager(final JobID jobID) { - return "/" + jobID + JOB_MANAGER_LEADER_PATH; + public String getLeaderPathForJobManager(final JobID jobID) { + return ZooKeeperUtils.getLeaderPathForJobManager(jobID); } @Override - protected String getLeaderNameForRestServer() { - return REST_SERVER_LEADER_PATH; + protected String getLeaderPathForRestServer() { + return ZooKeeperUtils.getLeaderPathForRestServer(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java index 855ebca247a38c..522f36eb93b856 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; @@ -68,7 +70,7 @@ public class ZooKeeperLeaderElectionDriver private final NodeCache cache; /** ZooKeeper path of the node which stores the current leader information. */ - private final String leaderPath; + private final String connectionInformationPath; private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); @@ -85,29 +87,27 @@ public class ZooKeeperLeaderElectionDriver * Creates a ZooKeeperLeaderElectionDriver object. * * @param client Client which is connected to the ZooKeeper quorum - * @param latchPath ZooKeeper node path for the leader election latch - * @param leaderPath ZooKeeper node path for the node which stores the current leader - * information + * @param path ZooKeeper node path for the leader election * @param leaderElectionEventHandler Event handler for processing leader change events * @param fatalErrorHandler Fatal error handler * @param leaderContenderDescription Leader contender description */ public ZooKeeperLeaderElectionDriver( CuratorFramework client, - String latchPath, - String leaderPath, + String path, LeaderElectionEventHandler leaderElectionEventHandler, FatalErrorHandler fatalErrorHandler, String leaderContenderDescription) throws Exception { + checkNotNull(path); this.client = checkNotNull(client); - this.leaderPath = checkNotNull(leaderPath); + this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path); this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.leaderContenderDescription = checkNotNull(leaderContenderDescription); - leaderLatch = new LeaderLatch(client, checkNotNull(latchPath)); - cache = new NodeCache(client, leaderPath); + leaderLatch = new LeaderLatch(client, ZooKeeperUtils.generateLeaderLatchPath(path)); + cache = new NodeCache(client, connectionInformationPath); client.getUnhandledErrorListenable().addListener(this); @@ -220,7 +220,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { boolean dataWritten = false; while (!dataWritten && leaderLatch.hasLeadership()) { - Stat stat = client.checkExists().forPath(leaderPath); + Stat stat = client.checkExists().forPath(connectionInformationPath); if (stat != null) { long owner = stat.getEphemeralOwner(); @@ -228,7 +228,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { if (owner == sessionID) { try { - client.setData().forPath(leaderPath, baos.toByteArray()); + client.setData().forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NoNodeException noNode) { @@ -236,7 +236,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { } } else { try { - client.delete().forPath(leaderPath); + client.delete().forPath(connectionInformationPath); } catch (KeeperException.NoNodeException noNode) { // node was deleted in the meantime --> try again } @@ -246,7 +246,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) - .forPath(leaderPath, baos.toByteArray()); + .forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NodeExistsException nodeExists) { @@ -301,6 +301,15 @@ public void unhandledError(String message, Throwable e) { @Override public String toString() { - return "ZooKeeperLeaderElectionDriver{" + "leaderPath='" + leaderPath + '\'' + '}'; + return "ZooKeeperLeaderElectionDriver{" + + "leaderPath='" + + connectionInformationPath + + '\'' + + '}'; + } + + @VisibleForTesting + String getConnectionInformationPath() { + return connectionInformationPath; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java index a408882ef43e05..eef7d0cb3547e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java @@ -27,15 +27,11 @@ public class ZooKeeperLeaderElectionDriverFactory implements LeaderElectionDrive private final CuratorFramework client; - private final String latchPath; + private final String path; - private final String leaderPath; - - public ZooKeeperLeaderElectionDriverFactory( - CuratorFramework client, String latchPath, String leaderPath) { + public ZooKeeperLeaderElectionDriverFactory(CuratorFramework client, String path) { this.client = client; - this.latchPath = latchPath; - this.leaderPath = leaderPath; + this.path = path; } @Override @@ -45,11 +41,6 @@ public ZooKeeperLeaderElectionDriver createLeaderElectionDriver( String leaderContenderDescription) throws Exception { return new ZooKeeperLeaderElectionDriver( - client, - latchPath, - leaderPath, - leaderEventHandler, - fatalErrorHandler, - leaderContenderDescription); + client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java index f447a17e1ce4e7..c1dc1ff21405a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.leaderretrieval; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; @@ -57,7 +59,7 @@ public class ZooKeeperLeaderRetrievalDriver /** Curator recipe to watch changes of a specific ZooKeeper node. */ private final NodeCache cache; - private final String retrievalPath; + private final String connectionInformationPath; private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState); @@ -72,19 +74,19 @@ public class ZooKeeperLeaderRetrievalDriver * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information. * * @param client Client which constitutes the connection to the ZooKeeper quorum - * @param retrievalPath Path of the ZooKeeper node which contains the leader information + * @param path Path of the ZooKeeper node which contains the leader information * @param leaderRetrievalEventHandler Handler to notify the leader changes. * @param fatalErrorHandler Fatal error handler */ public ZooKeeperLeaderRetrievalDriver( CuratorFramework client, - String retrievalPath, + String path, LeaderRetrievalEventHandler leaderRetrievalEventHandler, FatalErrorHandler fatalErrorHandler) throws Exception { this.client = checkNotNull(client, "CuratorFramework client"); - this.cache = new NodeCache(client, retrievalPath); - this.retrievalPath = checkNotNull(retrievalPath); + this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path); + this.cache = new NodeCache(client, connectionInformationPath); this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); @@ -188,6 +190,15 @@ public void unhandledError(String s, Throwable throwable) { @Override public String toString() { - return "ZookeeperLeaderRetrievalDriver{" + "retrievalPath='" + retrievalPath + '\'' + '}'; + return "ZookeeperLeaderRetrievalDriver{" + + "retrievalPath='" + + connectionInformationPath + + '\'' + + '}'; + } + + @VisibleForTesting + public String getConnectionInformationPath() { + return connectionInformationPath; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 0af2248340ffed..4164942b93e580 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory; @@ -62,12 +61,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.concurrent.Executor; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** Class containing helper functions to interact with ZooKeeper. */ public class ZooKeeperUtils { @@ -80,6 +84,56 @@ public class ZooKeeperUtils { /** The prefix of the completed checkpoint file. */ public static final String HA_STORAGE_COMPLETED_CHECKPOINT = "completedCheckpoint"; + private static final String RESOURCE_MANAGER_LEADER = "/resource_manager"; + + private static final String DISPATCHER_LEADER = "/dispatcher"; + + private static final String LEADER_NODE = "/leader"; + + private static final String REST_SERVER_LEADER = "/rest_server"; + + public static String getLeaderPathForResourceManager() { + return getLeaderPath(RESOURCE_MANAGER_LEADER); + } + + public static String getLeaderPathForDispatcher() { + return getLeaderPath(DISPATCHER_LEADER); + } + + public static String getLeaderPathForRestServer() { + return getLeaderPath(REST_SERVER_LEADER); + } + + public static String getLeaderPathForJobManager(JobID jobId) { + return generateZookeeperPath( + generateZookeeperPath(getJobsPath(), getPathForJob(jobId)), LEADER_NODE); + } + + public static String getJobsPath() { + return "/jobs"; + } + + private static String getCheckpointsPath() { + return "/checkpoints"; + } + + public static String getCheckpointIdCounterPath() { + return "/checkpoint_id_counter"; + } + + private static String getLeaderPath(String suffix) { + return generateZookeeperPath(LEADER_NODE, suffix); + } + + @Nonnull + public static String generateConnectionInformationPath(String path) { + return generateZookeeperPath(path, "connection_info"); + } + + public static String generateLeaderLatchPath(String path) { + return generateZookeeperPath(path, "latch"); + } + /** * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum. * @@ -87,7 +141,7 @@ public class ZooKeeperUtils { * @return {@link CuratorFramework} instance */ public static CuratorFramework startCuratorFramework(Configuration configuration) { - Preconditions.checkNotNull(configuration, "configuration"); + checkNotNull(configuration, "configuration"); String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM); if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) { @@ -123,7 +177,7 @@ public static CuratorFramework startCuratorFramework(Configuration configuration if (disableSaslClient && aclMode == ZkClientACLMode.CREATOR) { String errorMessage = "Cannot set ACL role to " - + aclMode + + ZkClientACLMode.CREATOR + " since SASL authentication is " + "disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() @@ -193,12 +247,11 @@ public static String getZooKeeperEnsemble(Configuration flinkConf) * ZooKeeperLeaderRetrievalDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link DefaultLeaderRetrievalService} instance. */ public static DefaultLeaderRetrievalService createLeaderRetrievalService( - final CuratorFramework client, final Configuration configuration) { - return createLeaderRetrievalService(client, configuration, ""); + final CuratorFramework client) { + return createLeaderRetrievalService(client, ""); } /** @@ -206,46 +259,35 @@ public static DefaultLeaderRetrievalService createLeaderRetrievalService( * ZooKeeperLeaderRetrievalDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path for the leader retrieval * @return {@link DefaultLeaderRetrievalService} instance. */ public static DefaultLeaderRetrievalService createLeaderRetrievalService( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - return new DefaultLeaderRetrievalService( - createLeaderRetrievalDriverFactory(client, configuration, pathSuffix)); + final CuratorFramework client, final String path) { + return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path)); } /** * Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link LeaderRetrievalDriverFactory} instance. */ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory( - final CuratorFramework client, final Configuration configuration) { - return createLeaderRetrievalDriverFactory(client, configuration, ""); + final CuratorFramework client) { + return createLeaderRetrievalDriverFactory(client, ""); } /** * Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path for the leader zNode * @return {@link LeaderRetrievalDriverFactory} instance. */ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - final String leaderPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) - + pathSuffix; - return new ZooKeeperLeaderRetrievalDriverFactory(client, leaderPath); + final CuratorFramework client, final String path) { + return new ZooKeeperLeaderRetrievalDriverFactory(client, path); } /** @@ -253,13 +295,12 @@ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverF * ZooKeeperLeaderElectionDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link DefaultLeaderElectionService} instance. */ public static DefaultLeaderElectionService createLeaderElectionService( - CuratorFramework client, Configuration configuration) { + CuratorFramework client) { - return createLeaderElectionService(client, configuration, ""); + return createLeaderElectionService(client, ""); } /** @@ -267,50 +308,35 @@ public static DefaultLeaderElectionService createLeaderElectionService( * ZooKeeperLeaderElectionDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path for the leader election * @return {@link DefaultLeaderElectionService} instance. */ public static DefaultLeaderElectionService createLeaderElectionService( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - return new DefaultLeaderElectionService( - createLeaderElectionDriverFactory(client, configuration, pathSuffix)); + final CuratorFramework client, final String path) { + return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path)); } /** * Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link LeaderElectionDriverFactory} instance. */ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory( - final CuratorFramework client, final Configuration configuration) { - return createLeaderElectionDriverFactory(client, configuration, ""); + final CuratorFramework client) { + return createLeaderElectionDriverFactory(client, ""); } /** * Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path suffix which we want to append * @return {@link LeaderElectionDriverFactory} instance. */ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - final String latchPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) - + pathSuffix; - final String leaderPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) - + pathSuffix; - - return new ZooKeeperLeaderElectionDriverFactory(client, latchPath, leaderPath); + final CuratorFramework client, final String path) { + return new ZooKeeperLeaderElectionDriverFactory(client, path); } /** @@ -359,7 +385,6 @@ public static JobGraphStore createJobGraphs( * * @param client The {@link CuratorFramework} ZooKeeper client to use * @param configuration {@link Configuration} object - * @param jobId ID of job to create the instance for * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain * @param executor to run ZooKeeper callbacks * @return {@link DefaultCompletedCheckpointStore} instance @@ -368,23 +393,17 @@ public static JobGraphStore createJobGraphs( public static CompletedCheckpointStore createCompletedCheckpoints( CuratorFramework client, Configuration configuration, - JobID jobId, int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception { checkNotNull(configuration, "Configuration"); - String checkpointsPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH); - RetrievableStateStorageHelper stateStorage = createFileSystemStateStorage(configuration, HA_STORAGE_COMPLETED_CHECKPOINT); - checkpointsPath += getPathForJob(jobId); - final ZooKeeperStateHandleStore completedCheckpointStateHandleStore = - createZooKeeperStateHandleStore(client, checkpointsPath, stateStorage); + createZooKeeperStateHandleStore(client, getCheckpointsPath(), stateStorage); final CompletedCheckpointStore zooKeeperCompletedCheckpointStore = new DefaultCompletedCheckpointStore<>( maxNumberOfCheckpointsToRetain, @@ -396,7 +415,7 @@ public static CompletedCheckpointStore createCompletedCheckpoints( "Initialized {} in '{}' with {}.", DefaultCompletedCheckpointStore.class.getSimpleName(), completedCheckpointStateHandleStore, - checkpointsPath); + getCheckpointsPath()); return zooKeeperCompletedCheckpointStore; } @@ -431,21 +450,11 @@ ZooKeeperStateHandleStore createZooKeeperStateHandleStore( * Creates a {@link ZooKeeperCheckpointIDCounter} instance. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object - * @param jobId ID of job to create the instance for * @return {@link ZooKeeperCheckpointIDCounter} instance */ - public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter( - CuratorFramework client, Configuration configuration, JobID jobId) { - - String checkpointIdCounterPath = - configuration.getString( - HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); - - checkpointIdCounterPath += getPathForJob(jobId); - + public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(CuratorFramework client) { return new ZooKeeperCheckpointIDCounter( - client, checkpointIdCounterPath, new DefaultLastStateConnectionStateListener()); + client, new DefaultLastStateConnectionStateListener()); } /** @@ -466,20 +475,40 @@ FileSystemStateStorageHelper createFileSystemStateStorage( prefix); } + /** Creates a ZooKeeper path of the form "/root/namespace". */ public static String generateZookeeperPath(String root, String namespace) { - if (!namespace.startsWith("/")) { - namespace = '/' + namespace; - } + final String result = + Stream.of(root, namespace) + .map(ZooKeeperUtils::trimSlashes) + .filter(s -> !s.isEmpty()) + .collect(Collectors.joining("/")); + + checkState( + !result.isEmpty(), + "Concatenating the root path %s and namespace %s resulted in an empty path. This is invalid.", + root, + namespace); + + return '/' + result; + } + + private static String trimSlashes(String input) { + int left = 0; + int right = input.length() - 1; - if (namespace.endsWith("/")) { - namespace = namespace.substring(0, namespace.length() - 1); + while (left < right && input.charAt(left) == '/') { + left++; } - if (root.endsWith("/")) { - root = root.substring(0, root.length() - 1); + while (right > left && input.charAt(right) == '/') { + right--; } - return root + namespace; + if (left < right) { + return input.substring(left, right); + } else { + return ""; + } } /** @@ -493,14 +522,17 @@ public static String generateZookeeperPath(String root, String namespace) { */ public static CuratorFramework useNamespaceAndEnsurePath( final CuratorFramework client, final String path) throws Exception { - Preconditions.checkNotNull(client, "client must not be null"); - Preconditions.checkNotNull(path, "path must not be null"); + checkNotNull(client, "client must not be null"); + checkNotNull(path, "path must not be null"); // Ensure that the checkpoints path exists client.newNamespaceAwareEnsurePath(path).ensure(client.getZookeeperClient()); // All operations will have the path as root - return client.usingNamespace(generateZookeeperPath(client.getNamespace(), path)); + return client.usingNamespace( + // Curator prepends a '/' manually and throws an Exception if the + // namespace starts with a '/'. + generateZookeeperPath(client.getNamespace(), path).substring(1)); } /** Secure {@link ACLProvider} implementation. */ @@ -530,10 +562,10 @@ public enum ZkClientACLMode { */ public static ZkClientACLMode fromConfig(Configuration config) { String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL); - if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) { - return ZkClientACLMode.OPEN; - } else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) { - return ZkClientACLMode.CREATOR; + if (aclMode == null || aclMode.equalsIgnoreCase(OPEN.name())) { + return OPEN; + } else if (aclMode.equalsIgnoreCase(CREATOR.name())) { + return CREATOR; } else { String message = "Unsupported ACL option: [" + aclMode + "] provided"; LOG.error(message); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java index d6a6eb5d42e18a..f0eb180cda4bfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java @@ -64,7 +64,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception { connectionLossLatch, reconnectedLatch); ZooKeeperCheckpointIDCounter idCounter = - new ZooKeeperCheckpointIDCounter(client, "/checkpoint-id-counter", listener); + new ZooKeeperCheckpointIDCounter(client, listener); idCounter.start(); AtomicLong localCounter = new AtomicLong(1L); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java index 29d7d93e8bf73e..55ec0920fddd4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java @@ -51,34 +51,32 @@ public void cleanUp() throws Exception { /** Tests that counter node is removed from ZooKeeper after shutdown. */ @Test public void testShutdownRemovesState() throws Exception { - CheckpointIDCounter counter = createCheckpointIdCounter(); + ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter(); counter.start(); CuratorFramework client = ZooKeeper.getClient(); - assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNotNull(client.checkExists().forPath(counter.getPath())); counter.shutdown(JobStatus.FINISHED); - assertNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNull(client.checkExists().forPath(counter.getPath())); } /** Tests that counter node is NOT removed from ZooKeeper after suspend. */ @Test public void testSuspendKeepsState() throws Exception { - CheckpointIDCounter counter = createCheckpointIdCounter(); + ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter(); counter.start(); CuratorFramework client = ZooKeeper.getClient(); - assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNotNull(client.checkExists().forPath(counter.getPath())); counter.shutdown(JobStatus.SUSPENDED); - assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNotNull(client.checkExists().forPath(counter.getPath())); } @Override - protected CheckpointIDCounter createCheckpointIdCounter() throws Exception { + protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception { return new ZooKeeperCheckpointIDCounter( - ZooKeeper.getClient(), - "/checkpoint-id-counter", - new DefaultLastStateConnectionStateListener()); + ZooKeeper.getClient(), new DefaultLastStateConnectionStateListener()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index b9f5bb7331484f..e4582a2754e455 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -149,9 +149,7 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception { new ZooKeeperRunningJobsRegistry(client, configuration)) .setDispatcherLeaderElectionService(dispatcherLeaderElectionService) .setJobMasterLeaderRetrieverFunction( - jobId -> - ZooKeeperUtils.createLeaderRetrievalService( - client, configuration)) + jobId -> ZooKeeperUtils.createLeaderRetrievalService(client)) .build()) { final PartialDispatcherServices partialDispatcherServices = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java index a90dc61c40a1bd..2810d0b4058a84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java @@ -204,22 +204,22 @@ protected void internalCleanup() throws Exception { } @Override - protected String getLeaderNameForResourceManager() { + protected String getLeaderPathForResourceManager() { throw new UnsupportedOperationException("Not supported by this test implementation."); } @Override - protected String getLeaderNameForDispatcher() { + protected String getLeaderPathForDispatcher() { throw new UnsupportedOperationException("Not supported by this test implementation."); } @Override - protected String getLeaderNameForJobManager(JobID jobID) { + protected String getLeaderPathForJobManager(JobID jobID) { throw new UnsupportedOperationException("Not supported by this test implementation."); } @Override - protected String getLeaderNameForRestServer() { + protected String getLeaderPathForRestServer() { throw new UnsupportedOperationException("Not supported by this test implementation."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index cc1bc68ce21b58..db2117f5dabab7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -201,7 +201,7 @@ public void teardown() throws Exception { @Override public LeaderElectionService createLeaderElectionService() throws Exception { - return ZooKeeperUtils.createLeaderElectionService(client, configuration); + return ZooKeeperUtils.createLeaderElectionService(client); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java index 10f66986e558d5..8f1e01a369f547 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java @@ -95,7 +95,7 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti LeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient, config) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient) .createLeaderRetrievalDriver( queueLeaderElectionListener, fatalErrorHandler); @@ -126,12 +126,12 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti @Test public void testConnectionSuspendedHandling() throws Exception { - final String retrievalPath = "/testConnectionSuspendedHandling/leaderAddress"; + final String retrievalPath = "/testConnectionSuspendedHandling"; final String leaderAddress = "localhost"; final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1); - LeaderRetrievalDriver leaderRetrievalDriver = null; + ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = new ZooKeeperLeaderRetrievalDriver( @@ -140,7 +140,10 @@ public void testConnectionSuspendedHandling() throws Exception { queueLeaderElectionListener, fatalErrorHandler); - writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, UUID.randomUUID()); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), + leaderAddress, + UUID.randomUUID()); // do the testing CompletableFuture firstAddress = queueLeaderElectionListener.next(); @@ -166,11 +169,10 @@ public void testConnectionSuspendedHandling() throws Exception { @Test public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception { - final String retrievalPath = - "/testSameLeaderAfterReconnectTriggersListenerNotification/leaderAddress"; + final String retrievalPath = "/testSameLeaderAfterReconnectTriggersListenerNotification"; final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1); - LeaderRetrievalDriver leaderRetrievalDriver = null; + ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = new ZooKeeperLeaderRetrievalDriver( @@ -181,7 +183,8 @@ public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Ex final String leaderAddress = "foobar"; final UUID sessionId = UUID.randomUUID(); - writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), leaderAddress, sessionId); // pop new leader queueLeaderElectionListener.next(); @@ -233,12 +236,11 @@ private byte[] createLeaderInformation(String leaderAddress, UUID sessionId) @Test public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception { - final String retrievalPath = - "/testNewLeaderAfterReconnectTriggersListenerNotification/leaderAddress"; + final String retrievalPath = "/testNewLeaderAfterReconnectTriggersListenerNotification"; final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1); - LeaderRetrievalDriver leaderRetrievalDriver = null; + ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = new ZooKeeperLeaderRetrievalDriver( @@ -249,7 +251,8 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc final String leaderAddress = "foobar"; final UUID sessionId = UUID.randomUUID(); - writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), leaderAddress, sessionId); // pop new leader queueLeaderElectionListener.next(); @@ -266,7 +269,10 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc final String newLeaderAddress = "barfoo"; final UUID newSessionId = UUID.randomUUID(); - writeLeaderInformationToZooKeeper(retrievalPath, newLeaderAddress, newSessionId); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), + newLeaderAddress, + newSessionId); // check that we find the new leader information eventually CommonTestUtils.waitUntilCondition( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index d8acb0ee5b25a6..b928ad58a95761 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -140,7 +140,7 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception { leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -183,16 +183,14 @@ public void testZooKeeperReelection() throws Exception { TestingListener listener = new TestingListener(); try { - leaderRetrievalService = - ZooKeeperUtils.createLeaderRetrievalService(client, configuration); + leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client); LOG.debug("Start leader retrieval service for the TestingListener."); leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { - leaderElectionService[i] = - ZooKeeperUtils.createLeaderElectionService(client, configuration); + leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client); contenders[i] = new TestingContender(createAddress(i), leaderElectionService[i]); LOG.debug("Start leader election service for contender #{}.", i); @@ -273,14 +271,12 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { TestingListener listener = new TestingListener(); try { - leaderRetrievalService = - ZooKeeperUtils.createLeaderRetrievalService(client, configuration); + leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client); leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { - leaderElectionService[i] = - ZooKeeperUtils.createLeaderElectionService(client, configuration); + leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client); contenders[i] = new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionService[i]); @@ -308,7 +304,7 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { leaderElectionService[index].stop(); // create new leader election service which takes part in the leader election leaderElectionService[index] = - ZooKeeperUtils.createLeaderElectionService(client, configuration); + ZooKeeperUtils.createLeaderElectionService(client); contenders[index] = new TestingContender( TEST_URL + "_" + index + "_" + (lastTry + 1), @@ -341,16 +337,13 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { @Test public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { final String faultyContenderUrl = "faultyContender"; - final String leaderPath = "/leader"; - - configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath); final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER); final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler(); - LeaderElectionDriver leaderElectionDriver = null; + ZooKeeperLeaderElectionDriver leaderElectionDriver = null; LeaderRetrievalDriver leaderRetrievalDriver = null; CuratorFramework anotherClient = null; @@ -375,11 +368,14 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { // overwrite the current leader address, the leader should notice that boolean dataWritten = false; + final String connectionInformationPath = + leaderElectionDriver.getConnectionInformationPath(); + while (!dataWritten) { - anotherClient.delete().forPath(leaderPath); + anotherClient.delete().forPath(connectionInformationPath); try { - anotherClient.create().forPath(leaderPath, baos.toByteArray()); + anotherClient.create().forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NodeExistsException e) { @@ -389,7 +385,7 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { // The faulty leader should be corrected on ZooKeeper leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -470,7 +466,7 @@ public void testExceptionForwarding() throws Exception { */ @Test public void testEphemeralZooKeeperNodes() throws Exception { - LeaderElectionDriver leaderElectionDriver = null; + ZooKeeperLeaderElectionDriver leaderElectionDriver = null; LeaderRetrievalDriver leaderRetrievalDriver = null; final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER); @@ -487,13 +483,11 @@ public void testEphemeralZooKeeperNodes() throws Exception { leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); - final String leaderPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH); - cache = new NodeCache(client2, leaderPath); + cache = new NodeCache(client2, leaderElectionDriver.getConnectionInformationPath()); ExistsCacheListener existsListener = new ExistsCacheListener(cache); DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache); @@ -570,7 +564,7 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception is(LeaderInformation.empty())); // The data on ZooKeeper it not be cleared leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -646,7 +640,7 @@ private ZooKeeperLeaderElectionDriver createAndInitLeaderElectionDriver( throws Exception { final ZooKeeperLeaderElectionDriver leaderElectionDriver = - ZooKeeperUtils.createLeaderElectionDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderElectionDriverFactory(client) .createLeaderElectionDriver( electionEventHandler, electionEventHandler::handleError, TEST_URL); electionEventHandler.init(leaderElectionDriver);