diff --git a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html index 1886ef38017834..084f64f9c57c3a 100644 --- a/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html +++ b/docs/layouts/shortcodes/generated/expert_high_availability_zk_section.html @@ -38,36 +38,12 @@ Integer Defines the session timeout for the ZooKeeper session in ms. - -
high-availability.zookeeper.path.checkpoint-counter
- "/checkpoint-counter" - String - ZooKeeper root path (ZNode) for checkpoint counters. - - -
high-availability.zookeeper.path.checkpoints
- "/checkpoints" - String - ZooKeeper root path (ZNode) for completed checkpoints. -
high-availability.zookeeper.path.jobgraphs
"/jobgraphs" String ZooKeeper root path (ZNode) for job graphs - -
high-availability.zookeeper.path.latch
- "/leaderlatch" - String - Defines the znode of the leader latch which is used to elect the leader. - - -
high-availability.zookeeper.path.leader
- "/leader" - String - Defines the znode of the leader which contains the URL to the leader and the current leader session ID. -
high-availability.zookeeper.path.mesos-workers
"/mesos-workers" diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html index 95d4746f9851fc..93f0335c9336e0 100644 --- a/docs/layouts/shortcodes/generated/high_availability_configuration.html +++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html @@ -62,36 +62,12 @@ Integer Defines the session timeout for the ZooKeeper session in ms. - -
high-availability.zookeeper.path.checkpoint-counter
- "/checkpoint-counter" - String - ZooKeeper root path (ZNode) for checkpoint counters. - - -
high-availability.zookeeper.path.checkpoints
- "/checkpoints" - String - ZooKeeper root path (ZNode) for completed checkpoints. -
high-availability.zookeeper.path.jobgraphs
"/jobgraphs" String ZooKeeper root path (ZNode) for job graphs - -
high-availability.zookeeper.path.latch
- "/leaderlatch" - String - Defines the znode of the leader latch which is used to elect the leader. - - -
high-availability.zookeeper.path.leader
- "/leader" - String - Defines the znode of the leader which contains the URL to the leader and the current leader session ID. -
high-availability.zookeeper.path.mesos-workers
"/mesos-workers" diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a694e50f557787..1f64963b12d083 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1013,7 +1013,7 @@ public final class ConfigConstants { public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */ + /** @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch"; @@ -1026,14 +1026,14 @@ public final class ConfigConstants { public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */ + /** @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader"; /** * ZooKeeper root path (ZNode) for completed checkpoints. * - * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. + * @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = @@ -1042,7 +1042,7 @@ public final class ConfigConstants { /** * ZooKeeper root path (ZNode) for checkpoint counters. * - * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}. + * @deprecated no longer used. */ @PublicEvolving @Deprecated public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = @@ -1691,21 +1691,19 @@ public final class ConfigConstants { /** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader"; /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs"; - /** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints"; - /** - * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH} - */ + /** @deprecated no longer used. */ @Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index e630d941cd5379..d6cfbc32fc1103 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -115,14 +115,6 @@ public class HighAvailabilityOptions { .withDescription( "The root path under which Flink stores its entries in ZooKeeper."); - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_LATCH_PATH = - key("high-availability.zookeeper.path.latch") - .defaultValue("/leaderlatch") - .withDeprecatedKeys("recovery.zookeeper.path.latch") - .withDescription( - "Defines the znode of the leader latch which is used to elect the leader."); - /** ZooKeeper root path (ZNode) for job graphs. */ @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) public static final ConfigOption HA_ZOOKEEPER_JOBGRAPHS_PATH = @@ -131,31 +123,6 @@ public class HighAvailabilityOptions { .withDeprecatedKeys("recovery.zookeeper.path.jobgraphs") .withDescription("ZooKeeper root path (ZNode) for job graphs"); - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_LEADER_PATH = - key("high-availability.zookeeper.path.leader") - .defaultValue("/leader") - .withDeprecatedKeys("recovery.zookeeper.path.leader") - .withDescription( - "Defines the znode of the leader which contains the URL to the leader and the current" - + " leader session ID."); - - /** ZooKeeper root path (ZNode) for completed checkpoints. */ - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_CHECKPOINTS_PATH = - key("high-availability.zookeeper.path.checkpoints") - .defaultValue("/checkpoints") - .withDeprecatedKeys("recovery.zookeeper.path.checkpoints") - .withDescription("ZooKeeper root path (ZNode) for completed checkpoints."); - - /** ZooKeeper root path (ZNode) for checkpoint counters. */ - @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) - public static final ConfigOption HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = - key("high-availability.zookeeper.path.checkpoint-counter") - .defaultValue("/checkpoint-counter") - .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter") - .withDescription("ZooKeeper root path (ZNode) for checkpoint counters."); - /** ZooKeeper root path (ZNode) for Mesos workers. */ @PublicEvolving @Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java index f5ff366710251d..055ff1819bf50e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java @@ -112,20 +112,20 @@ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() { kubeClient, configuration, ioExecutor, - this::getLeaderNameForJobManager, + this::getLeaderPathForJobManager, lockIdentity); } @Override public JobGraphStore createJobGraphStore() throws Exception { return KubernetesUtils.createJobGraphStore( - configuration, kubeClient, getLeaderNameForDispatcher(), lockIdentity); + configuration, kubeClient, getLeaderPathForDispatcher(), lockIdentity); } @Override public RunningJobsRegistry createRunningJobsRegistry() { return new KubernetesRunningJobsRegistry( - kubeClient, getLeaderNameForDispatcher(), lockIdentity); + kubeClient, getLeaderPathForDispatcher(), lockIdentity); } @Override @@ -143,21 +143,21 @@ public void internalCleanup() throws Exception { } @Override - protected String getLeaderNameForResourceManager() { + protected String getLeaderPathForResourceManager() { return getLeaderName(RESOURCE_MANAGER_NAME); } @Override - protected String getLeaderNameForDispatcher() { + protected String getLeaderPathForDispatcher() { return getLeaderName(DISPATCHER_NAME); } - public String getLeaderNameForJobManager(final JobID jobID) { + public String getLeaderPathForJobManager(final JobID jobID) { return getLeaderName(jobID.toString() + NAME_SEPARATOR + JOB_MANAGER_NAME); } @Override - protected String getLeaderNameForRestServer() { + protected String getLeaderPathForRestServer() { return getLeaderName(REST_SERVER_NAME); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java index 9fb480f51704e2..1e4180a4e804f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.shared.SharedCount; @@ -76,14 +78,11 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { * Creates a {@link ZooKeeperCheckpointIDCounter} instance. * * @param client Curator ZooKeeper client - * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job. */ public ZooKeeperCheckpointIDCounter( - CuratorFramework client, - String counterPath, - LastStateConnectionStateListener connectionStateListener) { + CuratorFramework client, LastStateConnectionStateListener connectionStateListener) { this.client = checkNotNull(client, "Curator client"); - this.counterPath = checkNotNull(counterPath, "Counter path"); + this.counterPath = ZooKeeperUtils.getCheckpointIdCounterPath(); this.sharedCount = new SharedCount(client, counterPath, 1); this.connectionStateListener = connectionStateListener; } @@ -176,4 +175,9 @@ private void checkConnectionState() { } }); } + + @VisibleForTesting + String getPath() { + return counterPath; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 432865d27a6833..379d6a3bff96d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -51,11 +51,17 @@ public CompletedCheckpointStore createCheckpointStore( throws Exception { return ZooKeeperUtils.createCompletedCheckpoints( - client, config, jobId, maxNumberOfCheckpointsToRetain, executor); + ZooKeeperUtils.useNamespaceAndEnsurePath( + client, ZooKeeperUtils.getPathForJob(jobId)), + config, + maxNumberOfCheckpointsToRetain, + executor); } @Override public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception { - return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID); + return ZooKeeperUtils.createCheckpointIDCounter( + ZooKeeperUtils.useNamespaceAndEnsurePath( + client, ZooKeeperUtils.getPathForJob(jobID))); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index 9a322a8d924875..e2c980668d7fb9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java @@ -38,9 +38,9 @@ /** * Abstract high availability services based on distributed system(e.g. Zookeeper, Kubernetes). It * will help with creating all the leader election/retrieval services and the cleanup. Please return - * a proper leader name int the implementation of {@link #getLeaderNameForResourceManager}, {@link - * #getLeaderNameForDispatcher}, {@link #getLeaderNameForJobManager}, {@link - * #getLeaderNameForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and + * a proper leader name int the implementation of {@link #getLeaderPathForResourceManager}, {@link + * #getLeaderPathForDispatcher}, {@link #getLeaderPathForJobManager}, {@link + * #getLeaderPathForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and * child path in Zookeeper. * *

{@link #close()} and {@link #closeAndCleanupAllData()} should be implemented to destroy the @@ -75,17 +75,17 @@ public AbstractHaServices( @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() { - return createLeaderRetrievalService(getLeaderNameForResourceManager()); + return createLeaderRetrievalService(getLeaderPathForResourceManager()); } @Override public LeaderRetrievalService getDispatcherLeaderRetriever() { - return createLeaderRetrievalService(getLeaderNameForDispatcher()); + return createLeaderRetrievalService(getLeaderPathForDispatcher()); } @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { - return createLeaderRetrievalService(getLeaderNameForJobManager(jobID)); + return createLeaderRetrievalService(getLeaderPathForJobManager(jobID)); } @Override @@ -96,31 +96,31 @@ public LeaderRetrievalService getJobManagerLeaderRetriever( @Override public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() { - return createLeaderRetrievalService(getLeaderNameForRestServer()); + return createLeaderRetrievalService(getLeaderPathForRestServer()); } @Override public LeaderElectionService getResourceManagerLeaderElectionService() { - return createLeaderElectionService(getLeaderNameForResourceManager()); + return createLeaderElectionService(getLeaderPathForResourceManager()); } @Override public LeaderElectionService getDispatcherLeaderElectionService() { - return createLeaderElectionService(getLeaderNameForDispatcher()); + return createLeaderElectionService(getLeaderPathForDispatcher()); } @Override public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { - return createLeaderElectionService(getLeaderNameForJobManager(jobID)); + return createLeaderElectionService(getLeaderPathForJobManager(jobID)); } @Override public LeaderElectionService getClusterRestEndpointLeaderElectionService() { - return createLeaderElectionService(getLeaderNameForRestServer()); + return createLeaderElectionService(getLeaderPathForRestServer()); } @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { return createCheckpointRecoveryFactory(); } @@ -226,7 +226,7 @@ public void closeAndCleanupAllData() throws Exception { * * @return Checkpoint recovery factory */ - protected abstract CheckpointRecoveryFactory createCheckpointRecoveryFactory(); + protected abstract CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception; /** * Create the submitted job graph store for the job manager. @@ -266,7 +266,7 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the ResourceManager leader name. It is ConfigMap name in Kubernetes or child * node path in Zookeeper. */ - protected abstract String getLeaderNameForResourceManager(); + protected abstract String getLeaderPathForResourceManager(); /** * Get the leader name for Dispatcher. @@ -274,7 +274,7 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the Dispatcher leader name. It is ConfigMap name in Kubernetes or child node * path in Zookeeper. */ - protected abstract String getLeaderNameForDispatcher(); + protected abstract String getLeaderPathForDispatcher(); /** * Get the leader name for specific JobManager. @@ -283,7 +283,7 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the JobManager leader name for specified job id. It is ConfigMap name in * Kubernetes or child node path in Zookeeper. */ - protected abstract String getLeaderNameForJobManager(final JobID jobID); + protected abstract String getLeaderPathForJobManager(final JobID jobID); /** * Get the leader name for RestServer. @@ -291,5 +291,5 @@ public void closeAndCleanupAllData() throws Exception { * @return Return the RestServer leader name. It is ConfigMap name in Kubernetes or child node * path in Zookeeper. */ - protected abstract String getLeaderNameForRestServer(); + protected abstract String getLeaderPathForRestServer(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index fdb3fa7dcb5359..650ab971a66ab9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -158,7 +158,7 @@ default LeaderElectionService getWebMonitorLeaderElectionService() { * * @return Checkpoint recovery factory */ - CheckpointRecoveryFactory getCheckpointRecoveryFactory(); + CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception; /** * Gets the submitted job graph store for the job manager. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 0f6ebb05eb0e6a..3f134317879367 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -140,7 +140,7 @@ public static ClientHighAvailabilityServices createClientHAService(Configuration return new StandaloneClientHAServices(webMonitorAddress); case ZOOKEEPER: final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); - return new ZooKeeperClientHAServices(client, configuration); + return new ZooKeeperClientHAServices(client); case FACTORY_CLASS: return createCustomClientHAServices(configuration); default: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java index e82d2cb85a0c12..76002c8d667cd4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.highavailability.zookeeper; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.ZooKeeperUtils; @@ -30,21 +29,16 @@ /** ZooKeeper based implementation for {@link ClientHighAvailabilityServices}. */ public class ZooKeeperClientHAServices implements ClientHighAvailabilityServices { - private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock"; - private final CuratorFramework client; - private final Configuration configuration; - public ZooKeeperClientHAServices( - @Nonnull CuratorFramework client, @Nonnull Configuration configuration) { + public ZooKeeperClientHAServices(@Nonnull CuratorFramework client) { this.client = client; - this.configuration = configuration; } @Override public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() { return ZooKeeperUtils.createLeaderRetrievalService( - client, configuration, REST_SERVER_LEADER_PATH); + client, ZooKeeperUtils.getLeaderPathForRestServer()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java index f6ab9e859d630a..f9d43afafe9e38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java @@ -47,21 +47,37 @@ * *

  * /flink
- *      +/cluster_id_1/resource_manager_lock
+ *      +/cluster_id_1/leader/resource_manager/latch
+ *      |            |                        /connection_info
+ *      |            |       /dispatcher/latch
+ *      |            |                  /connection_info
+ *      |            |       /rest_server/latch
+ *      |            |                   /connection_info
  *      |            |
- *      |            +/job-id-1/job_manager_lock
- *      |            |         /checkpoints/latest
- *      |            |                     /latest-1
- *      |            |                     /latest-2
  *      |            |
- *      |            +/job-id-2/job_manager_lock
+ *      |            +jobgraphs/job-id-1
+ *      |            |         /job-id-2
+ *      |            +jobs/job-id-1/leader/latch
+ *      |                 |               /connection_info
+ *      |                 |        /checkpoints/latest
+ *      |                 |                    /latest-1
+ *      |                 |                    /latest-2
+ *      |                 |       /checkpoint_id_counter
  *      |
- *      +/cluster_id_2/resource_manager_lock
- *                   |
- *                   +/job-id-1/job_manager_lock
- *                            |/checkpoints/latest
- *                            |            /latest-1
- *                            |/persisted_job_graph
+ *      +/cluster_id_2/leader/resource_manager/latch
+ *      |            |                        /connection_info
+ *      |            |       /dispatcher/latch
+ *      |            |                  /connection_info
+ *      |            |       /rest_server/latch
+ *      |            |                   /connection_info
+ *      |            |
+ *      |            +jobgraphs/job-id-2
+ *      |            +jobs/job-id-2/leader/latch
+ *      |                 |               /connection_info
+ *      |                 |        /checkpoints/latest
+ *      |                 |                    /latest-1
+ *      |                 |                    /latest-2
+ *      |                 |       /checkpoint_id_counter
  * 
* *

The root path "/flink" is configurable via the option {@link @@ -81,14 +97,6 @@ */ public class ZooKeeperHaServices extends AbstractHaServices { - private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock"; - - private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock"; - - private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock"; - - private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock"; - // ------------------------------------------------------------------------ /** The ZooKeeper client to use. */ @@ -104,8 +112,11 @@ public ZooKeeperHaServices( } @Override - public CheckpointRecoveryFactory createCheckpointRecoveryFactory() { - return new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor); + public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception { + return new ZooKeeperCheckpointRecoveryFactory( + ZooKeeperUtils.useNamespaceAndEnsurePath(client, ZooKeeperUtils.getJobsPath()), + configuration, + ioExecutor); } @Override @@ -119,13 +130,13 @@ public RunningJobsRegistry createRunningJobsRegistry() { } @Override - protected LeaderElectionService createLeaderElectionService(String leaderName) { - return ZooKeeperUtils.createLeaderElectionService(client, configuration, leaderName); + protected LeaderElectionService createLeaderElectionService(String leaderPath) { + return ZooKeeperUtils.createLeaderElectionService(client, leaderPath); } @Override - protected LeaderRetrievalService createLeaderRetrievalService(String leaderName) { - return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, leaderName); + protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) { + return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath); } @Override @@ -139,22 +150,22 @@ public void internalCleanup() throws Exception { } @Override - protected String getLeaderNameForResourceManager() { - return RESOURCE_MANAGER_LEADER_PATH; + protected String getLeaderPathForResourceManager() { + return ZooKeeperUtils.getLeaderPathForResourceManager(); } @Override - protected String getLeaderNameForDispatcher() { - return DISPATCHER_LEADER_PATH; + protected String getLeaderPathForDispatcher() { + return ZooKeeperUtils.getLeaderPathForDispatcher(); } - public String getLeaderNameForJobManager(final JobID jobID) { - return "/" + jobID + JOB_MANAGER_LEADER_PATH; + public String getLeaderPathForJobManager(final JobID jobID) { + return ZooKeeperUtils.getLeaderPathForJobManager(jobID); } @Override - protected String getLeaderNameForRestServer() { - return REST_SERVER_LEADER_PATH; + protected String getLeaderPathForRestServer() { + return ZooKeeperUtils.getLeaderPathForRestServer(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java index 855ebca247a38c..522f36eb93b856 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; @@ -68,7 +70,7 @@ public class ZooKeeperLeaderElectionDriver private final NodeCache cache; /** ZooKeeper path of the node which stores the current leader information. */ - private final String leaderPath; + private final String connectionInformationPath; private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); @@ -85,29 +87,27 @@ public class ZooKeeperLeaderElectionDriver * Creates a ZooKeeperLeaderElectionDriver object. * * @param client Client which is connected to the ZooKeeper quorum - * @param latchPath ZooKeeper node path for the leader election latch - * @param leaderPath ZooKeeper node path for the node which stores the current leader - * information + * @param path ZooKeeper node path for the leader election * @param leaderElectionEventHandler Event handler for processing leader change events * @param fatalErrorHandler Fatal error handler * @param leaderContenderDescription Leader contender description */ public ZooKeeperLeaderElectionDriver( CuratorFramework client, - String latchPath, - String leaderPath, + String path, LeaderElectionEventHandler leaderElectionEventHandler, FatalErrorHandler fatalErrorHandler, String leaderContenderDescription) throws Exception { + checkNotNull(path); this.client = checkNotNull(client); - this.leaderPath = checkNotNull(leaderPath); + this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path); this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.leaderContenderDescription = checkNotNull(leaderContenderDescription); - leaderLatch = new LeaderLatch(client, checkNotNull(latchPath)); - cache = new NodeCache(client, leaderPath); + leaderLatch = new LeaderLatch(client, ZooKeeperUtils.generateLeaderLatchPath(path)); + cache = new NodeCache(client, connectionInformationPath); client.getUnhandledErrorListenable().addListener(this); @@ -220,7 +220,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { boolean dataWritten = false; while (!dataWritten && leaderLatch.hasLeadership()) { - Stat stat = client.checkExists().forPath(leaderPath); + Stat stat = client.checkExists().forPath(connectionInformationPath); if (stat != null) { long owner = stat.getEphemeralOwner(); @@ -228,7 +228,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { if (owner == sessionID) { try { - client.setData().forPath(leaderPath, baos.toByteArray()); + client.setData().forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NoNodeException noNode) { @@ -236,7 +236,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { } } else { try { - client.delete().forPath(leaderPath); + client.delete().forPath(connectionInformationPath); } catch (KeeperException.NoNodeException noNode) { // node was deleted in the meantime --> try again } @@ -246,7 +246,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) - .forPath(leaderPath, baos.toByteArray()); + .forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NodeExistsException nodeExists) { @@ -301,6 +301,15 @@ public void unhandledError(String message, Throwable e) { @Override public String toString() { - return "ZooKeeperLeaderElectionDriver{" + "leaderPath='" + leaderPath + '\'' + '}'; + return "ZooKeeperLeaderElectionDriver{" + + "leaderPath='" + + connectionInformationPath + + '\'' + + '}'; + } + + @VisibleForTesting + String getConnectionInformationPath() { + return connectionInformationPath; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java index a408882ef43e05..eef7d0cb3547e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java @@ -27,15 +27,11 @@ public class ZooKeeperLeaderElectionDriverFactory implements LeaderElectionDrive private final CuratorFramework client; - private final String latchPath; + private final String path; - private final String leaderPath; - - public ZooKeeperLeaderElectionDriverFactory( - CuratorFramework client, String latchPath, String leaderPath) { + public ZooKeeperLeaderElectionDriverFactory(CuratorFramework client, String path) { this.client = client; - this.latchPath = latchPath; - this.leaderPath = leaderPath; + this.path = path; } @Override @@ -45,11 +41,6 @@ public ZooKeeperLeaderElectionDriver createLeaderElectionDriver( String leaderContenderDescription) throws Exception { return new ZooKeeperLeaderElectionDriver( - client, - latchPath, - leaderPath, - leaderEventHandler, - fatalErrorHandler, - leaderContenderDescription); + client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java index f447a17e1ce4e7..c1dc1ff21405a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java @@ -18,9 +18,11 @@ package org.apache.flink.runtime.leaderretrieval; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; @@ -57,7 +59,7 @@ public class ZooKeeperLeaderRetrievalDriver /** Curator recipe to watch changes of a specific ZooKeeper node. */ private final NodeCache cache; - private final String retrievalPath; + private final String connectionInformationPath; private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState); @@ -72,19 +74,19 @@ public class ZooKeeperLeaderRetrievalDriver * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information. * * @param client Client which constitutes the connection to the ZooKeeper quorum - * @param retrievalPath Path of the ZooKeeper node which contains the leader information + * @param path Path of the ZooKeeper node which contains the leader information * @param leaderRetrievalEventHandler Handler to notify the leader changes. * @param fatalErrorHandler Fatal error handler */ public ZooKeeperLeaderRetrievalDriver( CuratorFramework client, - String retrievalPath, + String path, LeaderRetrievalEventHandler leaderRetrievalEventHandler, FatalErrorHandler fatalErrorHandler) throws Exception { this.client = checkNotNull(client, "CuratorFramework client"); - this.cache = new NodeCache(client, retrievalPath); - this.retrievalPath = checkNotNull(retrievalPath); + this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path); + this.cache = new NodeCache(client, connectionInformationPath); this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); @@ -188,6 +190,15 @@ public void unhandledError(String s, Throwable throwable) { @Override public String toString() { - return "ZookeeperLeaderRetrievalDriver{" + "retrievalPath='" + retrievalPath + '\'' + '}'; + return "ZookeeperLeaderRetrievalDriver{" + + "retrievalPath='" + + connectionInformationPath + + '\'' + + '}'; + } + + @VisibleForTesting + public String getConnectionInformationPath() { + return connectionInformationPath; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 0af2248340ffed..4164942b93e580 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -47,7 +47,6 @@ import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper; import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory; @@ -62,12 +61,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.concurrent.Executor; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** Class containing helper functions to interact with ZooKeeper. */ public class ZooKeeperUtils { @@ -80,6 +84,56 @@ public class ZooKeeperUtils { /** The prefix of the completed checkpoint file. */ public static final String HA_STORAGE_COMPLETED_CHECKPOINT = "completedCheckpoint"; + private static final String RESOURCE_MANAGER_LEADER = "/resource_manager"; + + private static final String DISPATCHER_LEADER = "/dispatcher"; + + private static final String LEADER_NODE = "/leader"; + + private static final String REST_SERVER_LEADER = "/rest_server"; + + public static String getLeaderPathForResourceManager() { + return getLeaderPath(RESOURCE_MANAGER_LEADER); + } + + public static String getLeaderPathForDispatcher() { + return getLeaderPath(DISPATCHER_LEADER); + } + + public static String getLeaderPathForRestServer() { + return getLeaderPath(REST_SERVER_LEADER); + } + + public static String getLeaderPathForJobManager(JobID jobId) { + return generateZookeeperPath( + generateZookeeperPath(getJobsPath(), getPathForJob(jobId)), LEADER_NODE); + } + + public static String getJobsPath() { + return "/jobs"; + } + + private static String getCheckpointsPath() { + return "/checkpoints"; + } + + public static String getCheckpointIdCounterPath() { + return "/checkpoint_id_counter"; + } + + private static String getLeaderPath(String suffix) { + return generateZookeeperPath(LEADER_NODE, suffix); + } + + @Nonnull + public static String generateConnectionInformationPath(String path) { + return generateZookeeperPath(path, "connection_info"); + } + + public static String generateLeaderLatchPath(String path) { + return generateZookeeperPath(path, "latch"); + } + /** * Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum. * @@ -87,7 +141,7 @@ public class ZooKeeperUtils { * @return {@link CuratorFramework} instance */ public static CuratorFramework startCuratorFramework(Configuration configuration) { - Preconditions.checkNotNull(configuration, "configuration"); + checkNotNull(configuration, "configuration"); String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM); if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) { @@ -123,7 +177,7 @@ public static CuratorFramework startCuratorFramework(Configuration configuration if (disableSaslClient && aclMode == ZkClientACLMode.CREATOR) { String errorMessage = "Cannot set ACL role to " - + aclMode + + ZkClientACLMode.CREATOR + " since SASL authentication is " + "disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() @@ -193,12 +247,11 @@ public static String getZooKeeperEnsemble(Configuration flinkConf) * ZooKeeperLeaderRetrievalDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link DefaultLeaderRetrievalService} instance. */ public static DefaultLeaderRetrievalService createLeaderRetrievalService( - final CuratorFramework client, final Configuration configuration) { - return createLeaderRetrievalService(client, configuration, ""); + final CuratorFramework client) { + return createLeaderRetrievalService(client, ""); } /** @@ -206,46 +259,35 @@ public static DefaultLeaderRetrievalService createLeaderRetrievalService( * ZooKeeperLeaderRetrievalDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path for the leader retrieval * @return {@link DefaultLeaderRetrievalService} instance. */ public static DefaultLeaderRetrievalService createLeaderRetrievalService( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - return new DefaultLeaderRetrievalService( - createLeaderRetrievalDriverFactory(client, configuration, pathSuffix)); + final CuratorFramework client, final String path) { + return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path)); } /** * Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link LeaderRetrievalDriverFactory} instance. */ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory( - final CuratorFramework client, final Configuration configuration) { - return createLeaderRetrievalDriverFactory(client, configuration, ""); + final CuratorFramework client) { + return createLeaderRetrievalDriverFactory(client, ""); } /** * Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path for the leader zNode * @return {@link LeaderRetrievalDriverFactory} instance. */ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - final String leaderPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) - + pathSuffix; - return new ZooKeeperLeaderRetrievalDriverFactory(client, leaderPath); + final CuratorFramework client, final String path) { + return new ZooKeeperLeaderRetrievalDriverFactory(client, path); } /** @@ -253,13 +295,12 @@ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverF * ZooKeeperLeaderElectionDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link DefaultLeaderElectionService} instance. */ public static DefaultLeaderElectionService createLeaderElectionService( - CuratorFramework client, Configuration configuration) { + CuratorFramework client) { - return createLeaderElectionService(client, configuration, ""); + return createLeaderElectionService(client, ""); } /** @@ -267,50 +308,35 @@ public static DefaultLeaderElectionService createLeaderElectionService( * ZooKeeperLeaderElectionDriver}. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path for the leader election * @return {@link DefaultLeaderElectionService} instance. */ public static DefaultLeaderElectionService createLeaderElectionService( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - return new DefaultLeaderElectionService( - createLeaderElectionDriverFactory(client, configuration, pathSuffix)); + final CuratorFramework client, final String path) { + return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path)); } /** * Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values * @return {@link LeaderElectionDriverFactory} instance. */ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory( - final CuratorFramework client, final Configuration configuration) { - return createLeaderElectionDriverFactory(client, configuration, ""); + final CuratorFramework client) { + return createLeaderElectionDriverFactory(client, ""); } /** * Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object containing the configuration values - * @param pathSuffix The path suffix which we want to append + * @param path The path suffix which we want to append * @return {@link LeaderElectionDriverFactory} instance. */ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory( - final CuratorFramework client, - final Configuration configuration, - final String pathSuffix) { - final String latchPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) - + pathSuffix; - final String leaderPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) - + pathSuffix; - - return new ZooKeeperLeaderElectionDriverFactory(client, latchPath, leaderPath); + final CuratorFramework client, final String path) { + return new ZooKeeperLeaderElectionDriverFactory(client, path); } /** @@ -359,7 +385,6 @@ public static JobGraphStore createJobGraphs( * * @param client The {@link CuratorFramework} ZooKeeper client to use * @param configuration {@link Configuration} object - * @param jobId ID of job to create the instance for * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain * @param executor to run ZooKeeper callbacks * @return {@link DefaultCompletedCheckpointStore} instance @@ -368,23 +393,17 @@ public static JobGraphStore createJobGraphs( public static CompletedCheckpointStore createCompletedCheckpoints( CuratorFramework client, Configuration configuration, - JobID jobId, int maxNumberOfCheckpointsToRetain, Executor executor) throws Exception { checkNotNull(configuration, "Configuration"); - String checkpointsPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH); - RetrievableStateStorageHelper stateStorage = createFileSystemStateStorage(configuration, HA_STORAGE_COMPLETED_CHECKPOINT); - checkpointsPath += getPathForJob(jobId); - final ZooKeeperStateHandleStore completedCheckpointStateHandleStore = - createZooKeeperStateHandleStore(client, checkpointsPath, stateStorage); + createZooKeeperStateHandleStore(client, getCheckpointsPath(), stateStorage); final CompletedCheckpointStore zooKeeperCompletedCheckpointStore = new DefaultCompletedCheckpointStore<>( maxNumberOfCheckpointsToRetain, @@ -396,7 +415,7 @@ public static CompletedCheckpointStore createCompletedCheckpoints( "Initialized {} in '{}' with {}.", DefaultCompletedCheckpointStore.class.getSimpleName(), completedCheckpointStateHandleStore, - checkpointsPath); + getCheckpointsPath()); return zooKeeperCompletedCheckpointStore; } @@ -431,21 +450,11 @@ ZooKeeperStateHandleStore createZooKeeperStateHandleStore( * Creates a {@link ZooKeeperCheckpointIDCounter} instance. * * @param client The {@link CuratorFramework} ZooKeeper client to use - * @param configuration {@link Configuration} object - * @param jobId ID of job to create the instance for * @return {@link ZooKeeperCheckpointIDCounter} instance */ - public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter( - CuratorFramework client, Configuration configuration, JobID jobId) { - - String checkpointIdCounterPath = - configuration.getString( - HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH); - - checkpointIdCounterPath += getPathForJob(jobId); - + public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(CuratorFramework client) { return new ZooKeeperCheckpointIDCounter( - client, checkpointIdCounterPath, new DefaultLastStateConnectionStateListener()); + client, new DefaultLastStateConnectionStateListener()); } /** @@ -466,20 +475,40 @@ FileSystemStateStorageHelper createFileSystemStateStorage( prefix); } + /** Creates a ZooKeeper path of the form "/root/namespace". */ public static String generateZookeeperPath(String root, String namespace) { - if (!namespace.startsWith("/")) { - namespace = '/' + namespace; - } + final String result = + Stream.of(root, namespace) + .map(ZooKeeperUtils::trimSlashes) + .filter(s -> !s.isEmpty()) + .collect(Collectors.joining("/")); + + checkState( + !result.isEmpty(), + "Concatenating the root path %s and namespace %s resulted in an empty path. This is invalid.", + root, + namespace); + + return '/' + result; + } + + private static String trimSlashes(String input) { + int left = 0; + int right = input.length() - 1; - if (namespace.endsWith("/")) { - namespace = namespace.substring(0, namespace.length() - 1); + while (left < right && input.charAt(left) == '/') { + left++; } - if (root.endsWith("/")) { - root = root.substring(0, root.length() - 1); + while (right > left && input.charAt(right) == '/') { + right--; } - return root + namespace; + if (left < right) { + return input.substring(left, right); + } else { + return ""; + } } /** @@ -493,14 +522,17 @@ public static String generateZookeeperPath(String root, String namespace) { */ public static CuratorFramework useNamespaceAndEnsurePath( final CuratorFramework client, final String path) throws Exception { - Preconditions.checkNotNull(client, "client must not be null"); - Preconditions.checkNotNull(path, "path must not be null"); + checkNotNull(client, "client must not be null"); + checkNotNull(path, "path must not be null"); // Ensure that the checkpoints path exists client.newNamespaceAwareEnsurePath(path).ensure(client.getZookeeperClient()); // All operations will have the path as root - return client.usingNamespace(generateZookeeperPath(client.getNamespace(), path)); + return client.usingNamespace( + // Curator prepends a '/' manually and throws an Exception if the + // namespace starts with a '/'. + generateZookeeperPath(client.getNamespace(), path).substring(1)); } /** Secure {@link ACLProvider} implementation. */ @@ -530,10 +562,10 @@ public enum ZkClientACLMode { */ public static ZkClientACLMode fromConfig(Configuration config) { String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL); - if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) { - return ZkClientACLMode.OPEN; - } else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) { - return ZkClientACLMode.CREATOR; + if (aclMode == null || aclMode.equalsIgnoreCase(OPEN.name())) { + return OPEN; + } else if (aclMode.equalsIgnoreCase(CREATOR.name())) { + return CREATOR; } else { String message = "Unsupported ACL option: [" + aclMode + "] provided"; LOG.error(message); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java index d6a6eb5d42e18a..f0eb180cda4bfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java @@ -64,7 +64,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception { connectionLossLatch, reconnectedLatch); ZooKeeperCheckpointIDCounter idCounter = - new ZooKeeperCheckpointIDCounter(client, "/checkpoint-id-counter", listener); + new ZooKeeperCheckpointIDCounter(client, listener); idCounter.start(); AtomicLong localCounter = new AtomicLong(1L); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java index 29d7d93e8bf73e..55ec0920fddd4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java @@ -51,34 +51,32 @@ public void cleanUp() throws Exception { /** Tests that counter node is removed from ZooKeeper after shutdown. */ @Test public void testShutdownRemovesState() throws Exception { - CheckpointIDCounter counter = createCheckpointIdCounter(); + ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter(); counter.start(); CuratorFramework client = ZooKeeper.getClient(); - assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNotNull(client.checkExists().forPath(counter.getPath())); counter.shutdown(JobStatus.FINISHED); - assertNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNull(client.checkExists().forPath(counter.getPath())); } /** Tests that counter node is NOT removed from ZooKeeper after suspend. */ @Test public void testSuspendKeepsState() throws Exception { - CheckpointIDCounter counter = createCheckpointIdCounter(); + ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter(); counter.start(); CuratorFramework client = ZooKeeper.getClient(); - assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNotNull(client.checkExists().forPath(counter.getPath())); counter.shutdown(JobStatus.SUSPENDED); - assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); + assertNotNull(client.checkExists().forPath(counter.getPath())); } @Override - protected CheckpointIDCounter createCheckpointIdCounter() throws Exception { + protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception { return new ZooKeeperCheckpointIDCounter( - ZooKeeper.getClient(), - "/checkpoint-id-counter", - new DefaultLastStateConnectionStateListener()); + ZooKeeper.getClient(), new DefaultLastStateConnectionStateListener()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index b9f5bb7331484f..e4582a2754e455 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -149,9 +149,7 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception { new ZooKeeperRunningJobsRegistry(client, configuration)) .setDispatcherLeaderElectionService(dispatcherLeaderElectionService) .setJobMasterLeaderRetrieverFunction( - jobId -> - ZooKeeperUtils.createLeaderRetrievalService( - client, configuration)) + jobId -> ZooKeeperUtils.createLeaderRetrievalService(client)) .build()) { final PartialDispatcherServices partialDispatcherServices = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java index a90dc61c40a1bd..2810d0b4058a84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java @@ -204,22 +204,22 @@ protected void internalCleanup() throws Exception { } @Override - protected String getLeaderNameForResourceManager() { + protected String getLeaderPathForResourceManager() { throw new UnsupportedOperationException("Not supported by this test implementation."); } @Override - protected String getLeaderNameForDispatcher() { + protected String getLeaderPathForDispatcher() { throw new UnsupportedOperationException("Not supported by this test implementation."); } @Override - protected String getLeaderNameForJobManager(JobID jobID) { + protected String getLeaderPathForJobManager(JobID jobID) { throw new UnsupportedOperationException("Not supported by this test implementation."); } @Override - protected String getLeaderNameForRestServer() { + protected String getLeaderPathForRestServer() { throw new UnsupportedOperationException("Not supported by this test implementation."); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index cc1bc68ce21b58..db2117f5dabab7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -201,7 +201,7 @@ public void teardown() throws Exception { @Override public LeaderElectionService createLeaderElectionService() throws Exception { - return ZooKeeperUtils.createLeaderElectionService(client, configuration); + return ZooKeeperUtils.createLeaderElectionService(client); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java index 10f66986e558d5..8f1e01a369f547 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java @@ -95,7 +95,7 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti LeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient, config) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient) .createLeaderRetrievalDriver( queueLeaderElectionListener, fatalErrorHandler); @@ -126,12 +126,12 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti @Test public void testConnectionSuspendedHandling() throws Exception { - final String retrievalPath = "/testConnectionSuspendedHandling/leaderAddress"; + final String retrievalPath = "/testConnectionSuspendedHandling"; final String leaderAddress = "localhost"; final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1); - LeaderRetrievalDriver leaderRetrievalDriver = null; + ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = new ZooKeeperLeaderRetrievalDriver( @@ -140,7 +140,10 @@ public void testConnectionSuspendedHandling() throws Exception { queueLeaderElectionListener, fatalErrorHandler); - writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, UUID.randomUUID()); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), + leaderAddress, + UUID.randomUUID()); // do the testing CompletableFuture firstAddress = queueLeaderElectionListener.next(); @@ -166,11 +169,10 @@ public void testConnectionSuspendedHandling() throws Exception { @Test public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception { - final String retrievalPath = - "/testSameLeaderAfterReconnectTriggersListenerNotification/leaderAddress"; + final String retrievalPath = "/testSameLeaderAfterReconnectTriggersListenerNotification"; final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1); - LeaderRetrievalDriver leaderRetrievalDriver = null; + ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = new ZooKeeperLeaderRetrievalDriver( @@ -181,7 +183,8 @@ public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Ex final String leaderAddress = "foobar"; final UUID sessionId = UUID.randomUUID(); - writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), leaderAddress, sessionId); // pop new leader queueLeaderElectionListener.next(); @@ -233,12 +236,11 @@ private byte[] createLeaderInformation(String leaderAddress, UUID sessionId) @Test public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception { - final String retrievalPath = - "/testNewLeaderAfterReconnectTriggersListenerNotification/leaderAddress"; + final String retrievalPath = "/testNewLeaderAfterReconnectTriggersListenerNotification"; final QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1); - LeaderRetrievalDriver leaderRetrievalDriver = null; + ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null; try { leaderRetrievalDriver = new ZooKeeperLeaderRetrievalDriver( @@ -249,7 +251,8 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc final String leaderAddress = "foobar"; final UUID sessionId = UUID.randomUUID(); - writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), leaderAddress, sessionId); // pop new leader queueLeaderElectionListener.next(); @@ -266,7 +269,10 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc final String newLeaderAddress = "barfoo"; final UUID newSessionId = UUID.randomUUID(); - writeLeaderInformationToZooKeeper(retrievalPath, newLeaderAddress, newSessionId); + writeLeaderInformationToZooKeeper( + leaderRetrievalDriver.getConnectionInformationPath(), + newLeaderAddress, + newSessionId); // check that we find the new leader information eventually CommonTestUtils.waitUntilCondition( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index d8acb0ee5b25a6..b928ad58a95761 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -140,7 +140,7 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception { leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -183,16 +183,14 @@ public void testZooKeeperReelection() throws Exception { TestingListener listener = new TestingListener(); try { - leaderRetrievalService = - ZooKeeperUtils.createLeaderRetrievalService(client, configuration); + leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client); LOG.debug("Start leader retrieval service for the TestingListener."); leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { - leaderElectionService[i] = - ZooKeeperUtils.createLeaderElectionService(client, configuration); + leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client); contenders[i] = new TestingContender(createAddress(i), leaderElectionService[i]); LOG.debug("Start leader election service for contender #{}.", i); @@ -273,14 +271,12 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { TestingListener listener = new TestingListener(); try { - leaderRetrievalService = - ZooKeeperUtils.createLeaderRetrievalService(client, configuration); + leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client); leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { - leaderElectionService[i] = - ZooKeeperUtils.createLeaderElectionService(client, configuration); + leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client); contenders[i] = new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionService[i]); @@ -308,7 +304,7 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { leaderElectionService[index].stop(); // create new leader election service which takes part in the leader election leaderElectionService[index] = - ZooKeeperUtils.createLeaderElectionService(client, configuration); + ZooKeeperUtils.createLeaderElectionService(client); contenders[index] = new TestingContender( TEST_URL + "_" + index + "_" + (lastTry + 1), @@ -341,16 +337,13 @@ public void testZooKeeperReelectionWithReplacement() throws Exception { @Test public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { final String faultyContenderUrl = "faultyContender"; - final String leaderPath = "/leader"; - - configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath); final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER); final TestingLeaderRetrievalEventHandler retrievalEventHandler = new TestingLeaderRetrievalEventHandler(); - LeaderElectionDriver leaderElectionDriver = null; + ZooKeeperLeaderElectionDriver leaderElectionDriver = null; LeaderRetrievalDriver leaderRetrievalDriver = null; CuratorFramework anotherClient = null; @@ -375,11 +368,14 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { // overwrite the current leader address, the leader should notice that boolean dataWritten = false; + final String connectionInformationPath = + leaderElectionDriver.getConnectionInformationPath(); + while (!dataWritten) { - anotherClient.delete().forPath(leaderPath); + anotherClient.delete().forPath(connectionInformationPath); try { - anotherClient.create().forPath(leaderPath, baos.toByteArray()); + anotherClient.create().forPath(connectionInformationPath, baos.toByteArray()); dataWritten = true; } catch (KeeperException.NodeExistsException e) { @@ -389,7 +385,7 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception { // The faulty leader should be corrected on ZooKeeper leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -470,7 +466,7 @@ public void testExceptionForwarding() throws Exception { */ @Test public void testEphemeralZooKeeperNodes() throws Exception { - LeaderElectionDriver leaderElectionDriver = null; + ZooKeeperLeaderElectionDriver leaderElectionDriver = null; LeaderRetrievalDriver leaderRetrievalDriver = null; final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(TEST_LEADER); @@ -487,13 +483,11 @@ public void testEphemeralZooKeeperNodes() throws Exception { leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler); leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); - final String leaderPath = - configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH); - cache = new NodeCache(client2, leaderPath); + cache = new NodeCache(client2, leaderElectionDriver.getConnectionInformationPath()); ExistsCacheListener existsListener = new ExistsCacheListener(cache); DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache); @@ -570,7 +564,7 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception is(LeaderInformation.empty())); // The data on ZooKeeper it not be cleared leaderRetrievalDriver = - ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderRetrievalDriverFactory(client) .createLeaderRetrievalDriver( retrievalEventHandler, retrievalEventHandler::handleError); @@ -646,7 +640,7 @@ private ZooKeeperLeaderElectionDriver createAndInitLeaderElectionDriver( throws Exception { final ZooKeeperLeaderElectionDriver leaderElectionDriver = - ZooKeeperUtils.createLeaderElectionDriverFactory(client, configuration) + ZooKeeperUtils.createLeaderElectionDriverFactory(client) .createLeaderElectionDriver( electionEventHandler, electionEventHandler::handleError, TEST_URL); electionEventHandler.init(leaderElectionDriver);