HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
- key("high-availability.zookeeper.path.checkpoint-counter")
- .defaultValue("/checkpoint-counter")
- .withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
- .withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");
-
/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
index b7d35f89b67639..aba11848174597 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
@@ -112,20 +112,20 @@ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
kubeClient,
configuration,
ioExecutor,
- this::getLeaderNameForJobManager,
+ this::getLeaderPathForJobManager,
lockIdentity);
}
@Override
public JobGraphStore createJobGraphStore() throws Exception {
return KubernetesUtils.createJobGraphStore(
- configuration, kubeClient, getLeaderNameForDispatcher(), lockIdentity);
+ configuration, kubeClient, getLeaderPathForDispatcher(), lockIdentity);
}
@Override
public RunningJobsRegistry createRunningJobsRegistry() {
return new KubernetesRunningJobsRegistry(
- kubeClient, getLeaderNameForDispatcher(), lockIdentity);
+ kubeClient, getLeaderPathForDispatcher(), lockIdentity);
}
@Override
@@ -144,25 +144,25 @@ public void internalCleanup() throws Exception {
@Override
public void internalCleanupJobData(JobID jobID) throws Exception {
- kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
+ kubeClient.deleteConfigMap(getLeaderPathForJobManager(jobID)).get();
}
@Override
- protected String getLeaderNameForResourceManager() {
+ protected String getLeaderPathForResourceManager() {
return getLeaderName(RESOURCE_MANAGER_NAME);
}
@Override
- protected String getLeaderNameForDispatcher() {
+ protected String getLeaderPathForDispatcher() {
return getLeaderName(DISPATCHER_NAME);
}
- public String getLeaderNameForJobManager(final JobID jobID) {
+ public String getLeaderPathForJobManager(final JobID jobID) {
return getLeaderName(jobID.toString() + NAME_SEPARATOR + JOB_MANAGER_NAME);
}
@Override
- protected String getLeaderNameForRestServer() {
+ protected String getLeaderPathForRestServer() {
return getLeaderName(REST_SERVER_NAME);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 9fb480f51704e2..1e4180a4e804f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -18,8 +18,10 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.shared.SharedCount;
@@ -76,14 +78,11 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
*
* @param client Curator ZooKeeper client
- * @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
*/
public ZooKeeperCheckpointIDCounter(
- CuratorFramework client,
- String counterPath,
- LastStateConnectionStateListener connectionStateListener) {
+ CuratorFramework client, LastStateConnectionStateListener connectionStateListener) {
this.client = checkNotNull(client, "Curator client");
- this.counterPath = checkNotNull(counterPath, "Counter path");
+ this.counterPath = ZooKeeperUtils.getCheckpointIdCounterPath();
this.sharedCount = new SharedCount(client, counterPath, 1);
this.connectionStateListener = connectionStateListener;
}
@@ -176,4 +175,9 @@ private void checkConnectionState() {
}
});
}
+
+ @VisibleForTesting
+ String getPath() {
+ return counterPath;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 432865d27a6833..379d6a3bff96d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -51,11 +51,17 @@ public CompletedCheckpointStore createCheckpointStore(
throws Exception {
return ZooKeeperUtils.createCompletedCheckpoints(
- client, config, jobId, maxNumberOfCheckpointsToRetain, executor);
+ ZooKeeperUtils.useNamespaceAndEnsurePath(
+ client, ZooKeeperUtils.getPathForJob(jobId)),
+ config,
+ maxNumberOfCheckpointsToRetain,
+ executor);
}
@Override
public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
- return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID);
+ return ZooKeeperUtils.createCheckpointIDCounter(
+ ZooKeeperUtils.useNamespaceAndEnsurePath(
+ client, ZooKeeperUtils.getPathForJob(jobID)));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 4c4dfe610a26ce..195b623950cb22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -38,9 +38,9 @@
/**
* Abstract high availability services based on distributed system(e.g. Zookeeper, Kubernetes). It
* will help with creating all the leader election/retrieval services and the cleanup. Please return
- * a proper leader name int the implementation of {@link #getLeaderNameForResourceManager}, {@link
- * #getLeaderNameForDispatcher}, {@link #getLeaderNameForJobManager}, {@link
- * #getLeaderNameForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and
+ * a proper leader name int the implementation of {@link #getLeaderPathForResourceManager}, {@link
+ * #getLeaderPathForDispatcher}, {@link #getLeaderPathForJobManager}, {@link
+ * #getLeaderPathForRestServer}. The returned leader name is the ConfigMap name in Kubernetes and
* child path in Zookeeper.
*
* {@link #close()} and {@link #closeAndCleanupAllData()} should be implemented to destroy the
@@ -75,17 +75,17 @@ public AbstractHaServices(
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
- return createLeaderRetrievalService(getLeaderNameForResourceManager());
+ return createLeaderRetrievalService(getLeaderPathForResourceManager());
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
- return createLeaderRetrievalService(getLeaderNameForDispatcher());
+ return createLeaderRetrievalService(getLeaderPathForDispatcher());
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
- return createLeaderRetrievalService(getLeaderNameForJobManager(jobID));
+ return createLeaderRetrievalService(getLeaderPathForJobManager(jobID));
}
@Override
@@ -96,31 +96,31 @@ public LeaderRetrievalService getJobManagerLeaderRetriever(
@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
- return createLeaderRetrievalService(getLeaderNameForRestServer());
+ return createLeaderRetrievalService(getLeaderPathForRestServer());
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
- return createLeaderElectionService(getLeaderNameForResourceManager());
+ return createLeaderElectionService(getLeaderPathForResourceManager());
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
- return createLeaderElectionService(getLeaderNameForDispatcher());
+ return createLeaderElectionService(getLeaderPathForDispatcher());
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
- return createLeaderElectionService(getLeaderNameForJobManager(jobID));
+ return createLeaderElectionService(getLeaderPathForJobManager(jobID));
}
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
- return createLeaderElectionService(getLeaderNameForRestServer());
+ return createLeaderElectionService(getLeaderPathForRestServer());
}
@Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
return createCheckpointRecoveryFactory();
}
@@ -233,7 +233,7 @@ public void cleanupJobData(JobID jobID) throws Exception {
*
* @return Checkpoint recovery factory
*/
- protected abstract CheckpointRecoveryFactory createCheckpointRecoveryFactory();
+ protected abstract CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception;
/**
* Create the submitted job graph store for the job manager.
@@ -277,35 +277,35 @@ public void cleanupJobData(JobID jobID) throws Exception {
protected abstract void internalCleanupJobData(JobID jobID) throws Exception;
/**
- * Get the leader name for ResourceManager.
+ * Get the leader path for ResourceManager.
*
* @return Return the ResourceManager leader name. It is ConfigMap name in Kubernetes or child
* node path in Zookeeper.
*/
- protected abstract String getLeaderNameForResourceManager();
+ protected abstract String getLeaderPathForResourceManager();
/**
- * Get the leader name for Dispatcher.
+ * Get the leader path for Dispatcher.
*
* @return Return the Dispatcher leader name. It is ConfigMap name in Kubernetes or child node
* path in Zookeeper.
*/
- protected abstract String getLeaderNameForDispatcher();
+ protected abstract String getLeaderPathForDispatcher();
/**
- * Get the leader name for specific JobManager.
+ * Get the leader path for specific JobManager.
*
* @param jobID job id
* @return Return the JobManager leader name for specified job id. It is ConfigMap name in
* Kubernetes or child node path in Zookeeper.
*/
- protected abstract String getLeaderNameForJobManager(final JobID jobID);
+ protected abstract String getLeaderPathForJobManager(final JobID jobID);
/**
- * Get the leader name for RestServer.
+ * Get the leader path for RestServer.
*
* @return Return the RestServer leader name. It is ConfigMap name in Kubernetes or child node
* path in Zookeeper.
*/
- protected abstract String getLeaderNameForRestServer();
+ protected abstract String getLeaderPathForRestServer();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index fdf031e34e19f1..1771b4bf0ae5c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -158,7 +158,7 @@ default LeaderElectionService getWebMonitorLeaderElectionService() {
*
* @return Checkpoint recovery factory
*/
- CheckpointRecoveryFactory getCheckpointRecoveryFactory();
+ CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;
/**
* Gets the submitted job graph store for the job manager.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 0f6ebb05eb0e6a..3f134317879367 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -140,7 +140,7 @@ public static ClientHighAvailabilityServices createClientHAService(Configuration
return new StandaloneClientHAServices(webMonitorAddress);
case ZOOKEEPER:
final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
- return new ZooKeeperClientHAServices(client, configuration);
+ return new ZooKeeperClientHAServices(client);
case FACTORY_CLASS:
return createCustomClientHAServices(configuration);
default:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java
index e82d2cb85a0c12..76002c8d667cd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperClientHAServices.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.highavailability.zookeeper;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -30,21 +29,16 @@
/** ZooKeeper based implementation for {@link ClientHighAvailabilityServices}. */
public class ZooKeeperClientHAServices implements ClientHighAvailabilityServices {
- private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
-
private final CuratorFramework client;
- private final Configuration configuration;
- public ZooKeeperClientHAServices(
- @Nonnull CuratorFramework client, @Nonnull Configuration configuration) {
+ public ZooKeeperClientHAServices(@Nonnull CuratorFramework client) {
this.client = client;
- this.configuration = configuration;
}
@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(
- client, configuration, REST_SERVER_LEADER_PATH);
+ client, ZooKeeperUtils.getLeaderPathForRestServer());
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 802162b1428462..d6bdd18ac05d4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -38,10 +38,7 @@
import javax.annotation.Nonnull;
-import java.util.List;
import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,21 +48,37 @@
*
*
* /flink
- * +/cluster_id_1/resource_manager_lock
+ * +/cluster_id_1/leader/resource_manager/latch
+ * | | /connection_info
+ * | | /dispatcher/latch
+ * | | /connection_info
+ * | | /rest_server/latch
+ * | | /connection_info
* | |
- * | +/job-id-1/job_manager_lock
- * | | /checkpoints/latest
- * | | /latest-1
- * | | /latest-2
* | |
- * | +/job-id-2/job_manager_lock
+ * | +jobgraphs/job-id-1
+ * | | /job-id-2
+ * | +jobs/job-id-1/leader/latch
+ * | | /connection_info
+ * | | /checkpoints/latest
+ * | | /latest-1
+ * | | /latest-2
+ * | | /checkpoint_id_counter
* |
- * +/cluster_id_2/resource_manager_lock
- * |
- * +/job-id-1/job_manager_lock
- * |/checkpoints/latest
- * | /latest-1
- * |/persisted_job_graph
+ * +/cluster_id_2/leader/resource_manager/latch
+ * | | /connection_info
+ * | | /dispatcher/latch
+ * | | /connection_info
+ * | | /rest_server/latch
+ * | | /connection_info
+ * | |
+ * | +jobgraphs/job-id-2
+ * | +jobs/job-id-2/leader/latch
+ * | | /connection_info
+ * | | /checkpoints/latest
+ * | | /latest-1
+ * | | /latest-2
+ * | | /checkpoint_id_counter
*
*
* The root path "/flink" is configurable via the option {@link
@@ -85,14 +98,6 @@
*/
public class ZooKeeperHaServices extends AbstractHaServices {
- private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
-
- private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
-
- private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
-
- private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
-
// ------------------------------------------------------------------------
/** The ZooKeeper client to use. */
@@ -108,8 +113,11 @@ public ZooKeeperHaServices(
}
@Override
- public CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
- return new ZooKeeperCheckpointRecoveryFactory(client, configuration, ioExecutor);
+ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() throws Exception {
+ return new ZooKeeperCheckpointRecoveryFactory(
+ ZooKeeperUtils.useNamespaceAndEnsurePath(client, ZooKeeperUtils.getJobsPath()),
+ configuration,
+ ioExecutor);
}
@Override
@@ -123,13 +131,13 @@ public RunningJobsRegistry createRunningJobsRegistry() {
}
@Override
- protected LeaderElectionService createLeaderElectionService(String leaderName) {
- return ZooKeeperUtils.createLeaderElectionService(client, configuration, leaderName);
+ protected LeaderElectionService createLeaderElectionService(String leaderPath) {
+ return ZooKeeperUtils.createLeaderElectionService(client, leaderPath);
}
@Override
- protected LeaderRetrievalService createLeaderRetrievalService(String leaderName) {
- return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, leaderName);
+ protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) {
+ return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath);
}
@Override
@@ -144,37 +152,27 @@ public void internalCleanup() throws Exception {
@Override
public void internalCleanupJobData(JobID jobID) throws Exception {
- final List paths =
- Stream.of(
- HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH,
- HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
- HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
- HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
- .map(configuration::getString)
- .map(parent -> parent + "/" + jobID)
- .collect(Collectors.toList());
- for (String path : paths) {
- deleteZNode(path);
- }
+ deleteZNode(ZooKeeperUtils.getLeaderPathForJob(jobID));
}
@Override
- protected String getLeaderNameForResourceManager() {
- return RESOURCE_MANAGER_LEADER_PATH;
+ protected String getLeaderPathForResourceManager() {
+ return ZooKeeperUtils.getLeaderPathForResourceManager();
}
@Override
- protected String getLeaderNameForDispatcher() {
- return DISPATCHER_LEADER_PATH;
+ protected String getLeaderPathForDispatcher() {
+ return ZooKeeperUtils.getLeaderPathForDispatcher();
}
- public String getLeaderNameForJobManager(final JobID jobID) {
- return "/" + jobID + JOB_MANAGER_LEADER_PATH;
+ @Override
+ public String getLeaderPathForJobManager(final JobID jobID) {
+ return ZooKeeperUtils.getLeaderPathForJobManager(jobID);
}
@Override
- protected String getLeaderNameForRestServer() {
- return REST_SERVER_LEADER_PATH;
+ protected String getLeaderPathForRestServer() {
+ return ZooKeeperUtils.getLeaderPathForRestServer();
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
index 855ebca247a38c..522f36eb93b856 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
@@ -18,7 +18,9 @@
package org.apache.flink.runtime.leaderelection;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
@@ -68,7 +70,7 @@ public class ZooKeeperLeaderElectionDriver
private final NodeCache cache;
/** ZooKeeper path of the node which stores the current leader information. */
- private final String leaderPath;
+ private final String connectionInformationPath;
private final ConnectionStateListener listener =
(client, newState) -> handleStateChange(newState);
@@ -85,29 +87,27 @@ public class ZooKeeperLeaderElectionDriver
* Creates a ZooKeeperLeaderElectionDriver object.
*
* @param client Client which is connected to the ZooKeeper quorum
- * @param latchPath ZooKeeper node path for the leader election latch
- * @param leaderPath ZooKeeper node path for the node which stores the current leader
- * information
+ * @param path ZooKeeper node path for the leader election
* @param leaderElectionEventHandler Event handler for processing leader change events
* @param fatalErrorHandler Fatal error handler
* @param leaderContenderDescription Leader contender description
*/
public ZooKeeperLeaderElectionDriver(
CuratorFramework client,
- String latchPath,
- String leaderPath,
+ String path,
LeaderElectionEventHandler leaderElectionEventHandler,
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription)
throws Exception {
+ checkNotNull(path);
this.client = checkNotNull(client);
- this.leaderPath = checkNotNull(leaderPath);
+ this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
- leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
- cache = new NodeCache(client, leaderPath);
+ leaderLatch = new LeaderLatch(client, ZooKeeperUtils.generateLeaderLatchPath(path));
+ cache = new NodeCache(client, connectionInformationPath);
client.getUnhandledErrorListenable().addListener(this);
@@ -220,7 +220,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) {
boolean dataWritten = false;
while (!dataWritten && leaderLatch.hasLeadership()) {
- Stat stat = client.checkExists().forPath(leaderPath);
+ Stat stat = client.checkExists().forPath(connectionInformationPath);
if (stat != null) {
long owner = stat.getEphemeralOwner();
@@ -228,7 +228,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) {
if (owner == sessionID) {
try {
- client.setData().forPath(leaderPath, baos.toByteArray());
+ client.setData().forPath(connectionInformationPath, baos.toByteArray());
dataWritten = true;
} catch (KeeperException.NoNodeException noNode) {
@@ -236,7 +236,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) {
}
} else {
try {
- client.delete().forPath(leaderPath);
+ client.delete().forPath(connectionInformationPath);
} catch (KeeperException.NoNodeException noNode) {
// node was deleted in the meantime --> try again
}
@@ -246,7 +246,7 @@ public void writeLeaderInformation(LeaderInformation leaderInformation) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
- .forPath(leaderPath, baos.toByteArray());
+ .forPath(connectionInformationPath, baos.toByteArray());
dataWritten = true;
} catch (KeeperException.NodeExistsException nodeExists) {
@@ -301,6 +301,15 @@ public void unhandledError(String message, Throwable e) {
@Override
public String toString() {
- return "ZooKeeperLeaderElectionDriver{" + "leaderPath='" + leaderPath + '\'' + '}';
+ return "ZooKeeperLeaderElectionDriver{"
+ + "leaderPath='"
+ + connectionInformationPath
+ + '\''
+ + '}';
+ }
+
+ @VisibleForTesting
+ String getConnectionInformationPath() {
+ return connectionInformationPath;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java
index a408882ef43e05..eef7d0cb3547e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriverFactory.java
@@ -27,15 +27,11 @@ public class ZooKeeperLeaderElectionDriverFactory implements LeaderElectionDrive
private final CuratorFramework client;
- private final String latchPath;
+ private final String path;
- private final String leaderPath;
-
- public ZooKeeperLeaderElectionDriverFactory(
- CuratorFramework client, String latchPath, String leaderPath) {
+ public ZooKeeperLeaderElectionDriverFactory(CuratorFramework client, String path) {
this.client = client;
- this.latchPath = latchPath;
- this.leaderPath = leaderPath;
+ this.path = path;
}
@Override
@@ -45,11 +41,6 @@ public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
String leaderContenderDescription)
throws Exception {
return new ZooKeeperLeaderElectionDriver(
- client,
- latchPath,
- leaderPath,
- leaderEventHandler,
- fatalErrorHandler,
- leaderContenderDescription);
+ client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
index f447a17e1ce4e7..680dee2ab28aa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.leaderretrieval;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
@@ -57,7 +59,7 @@ public class ZooKeeperLeaderRetrievalDriver
/** Curator recipe to watch changes of a specific ZooKeeper node. */
private final NodeCache cache;
- private final String retrievalPath;
+ private final String connectionInformationPath;
private final ConnectionStateListener connectionStateListener =
(client, newState) -> handleStateChange(newState);
@@ -72,19 +74,19 @@ public class ZooKeeperLeaderRetrievalDriver
* Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
*
* @param client Client which constitutes the connection to the ZooKeeper quorum
- * @param retrievalPath Path of the ZooKeeper node which contains the leader information
+ * @param path Path of the ZooKeeper node which contains the leader information
* @param leaderRetrievalEventHandler Handler to notify the leader changes.
* @param fatalErrorHandler Fatal error handler
*/
public ZooKeeperLeaderRetrievalDriver(
CuratorFramework client,
- String retrievalPath,
+ String path,
LeaderRetrievalEventHandler leaderRetrievalEventHandler,
FatalErrorHandler fatalErrorHandler)
throws Exception {
this.client = checkNotNull(client, "CuratorFramework client");
- this.cache = new NodeCache(client, retrievalPath);
- this.retrievalPath = checkNotNull(retrievalPath);
+ this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
+ this.cache = new NodeCache(client, connectionInformationPath);
this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
@@ -188,6 +190,15 @@ public void unhandledError(String s, Throwable throwable) {
@Override
public String toString() {
- return "ZookeeperLeaderRetrievalDriver{" + "retrievalPath='" + retrievalPath + '\'' + '}';
+ return "ZookeeperLeaderRetrievalDriver{"
+ + "connectionInformationPath='"
+ + connectionInformationPath
+ + '\''
+ + '}';
+ }
+
+ @VisibleForTesting
+ public String getConnectionInformationPath() {
+ return connectionInformationPath;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 0af2248340ffed..9e656211425ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -47,7 +47,6 @@
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory;
@@ -62,10 +61,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -80,6 +83,60 @@ public class ZooKeeperUtils {
/** The prefix of the completed checkpoint file. */
public static final String HA_STORAGE_COMPLETED_CHECKPOINT = "completedCheckpoint";
+ private static final String RESOURCE_MANAGER_LEADER = "/resource_manager";
+
+ private static final String DISPATCHER_LEADER = "/dispatcher";
+
+ private static final String LEADER_NODE = "/leader";
+
+ private static final String REST_SERVER_LEADER = "/rest_server";
+
+ public static String getLeaderPathForResourceManager() {
+ return getLeaderPath(RESOURCE_MANAGER_LEADER);
+ }
+
+ public static String getLeaderPathForDispatcher() {
+ return getLeaderPath(DISPATCHER_LEADER);
+ }
+
+ public static String getLeaderPathForRestServer() {
+ return getLeaderPath(REST_SERVER_LEADER);
+ }
+
+ public static String getLeaderPathForJobManager(JobID jobId) {
+ return generateZookeeperPath(getLeaderPathForJob(jobId), LEADER_NODE);
+ }
+
+ @Nonnull
+ public static String getLeaderPathForJob(JobID jobId) {
+ return generateZookeeperPath(getJobsPath(), getPathForJob(jobId));
+ }
+
+ public static String getJobsPath() {
+ return "/jobs";
+ }
+
+ private static String getCheckpointsPath() {
+ return "/checkpoints";
+ }
+
+ public static String getCheckpointIdCounterPath() {
+ return "/checkpoint_id_counter";
+ }
+
+ private static String getLeaderPath(String suffix) {
+ return generateZookeeperPath(LEADER_NODE, suffix);
+ }
+
+ @Nonnull
+ public static String generateConnectionInformationPath(String path) {
+ return generateZookeeperPath(path, "connection_info");
+ }
+
+ public static String generateLeaderLatchPath(String path) {
+ return generateZookeeperPath(path, "latch");
+ }
+
/**
* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper quorum.
*
@@ -87,7 +144,7 @@ public class ZooKeeperUtils {
* @return {@link CuratorFramework} instance
*/
public static CuratorFramework startCuratorFramework(Configuration configuration) {
- Preconditions.checkNotNull(configuration, "configuration");
+ checkNotNull(configuration, "configuration");
String zkQuorum = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM);
if (zkQuorum == null || StringUtils.isBlank(zkQuorum)) {
@@ -123,7 +180,7 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
if (disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
String errorMessage =
"Cannot set ACL role to "
- + aclMode
+ + ZkClientACLMode.CREATOR
+ " since SASL authentication is "
+ "disabled through the "
+ SecurityOptions.ZOOKEEPER_SASL_DISABLE.key()
@@ -152,10 +209,7 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
.retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
// Curator prepends a '/' manually and throws an Exception if the
// namespace starts with a '/'.
- .namespace(
- rootWithNamespace.startsWith("/")
- ? rootWithNamespace.substring(1)
- : rootWithNamespace)
+ .namespace(trimStartingSlash(rootWithNamespace))
.aclProvider(aclProvider)
.build();
@@ -193,12 +247,11 @@ public static String getZooKeeperEnsemble(Configuration flinkConf)
* ZooKeeperLeaderRetrievalDriver}.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
* @return {@link DefaultLeaderRetrievalService} instance.
*/
public static DefaultLeaderRetrievalService createLeaderRetrievalService(
- final CuratorFramework client, final Configuration configuration) {
- return createLeaderRetrievalService(client, configuration, "");
+ final CuratorFramework client) {
+ return createLeaderRetrievalService(client, "");
}
/**
@@ -206,46 +259,35 @@ public static DefaultLeaderRetrievalService createLeaderRetrievalService(
* ZooKeeperLeaderRetrievalDriver}.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
- * @param pathSuffix The path suffix which we want to append
+ * @param path The path for the leader retrieval
* @return {@link DefaultLeaderRetrievalService} instance.
*/
public static DefaultLeaderRetrievalService createLeaderRetrievalService(
- final CuratorFramework client,
- final Configuration configuration,
- final String pathSuffix) {
- return new DefaultLeaderRetrievalService(
- createLeaderRetrievalDriverFactory(client, configuration, pathSuffix));
+ final CuratorFramework client, final String path) {
+ return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path));
}
/**
* Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
* @return {@link LeaderRetrievalDriverFactory} instance.
*/
public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory(
- final CuratorFramework client, final Configuration configuration) {
- return createLeaderRetrievalDriverFactory(client, configuration, "");
+ final CuratorFramework client) {
+ return createLeaderRetrievalDriverFactory(client, "");
}
/**
* Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
- * @param pathSuffix The path suffix which we want to append
+ * @param path The path for the leader zNode
* @return {@link LeaderRetrievalDriverFactory} instance.
*/
public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory(
- final CuratorFramework client,
- final Configuration configuration,
- final String pathSuffix) {
- final String leaderPath =
- configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
- + pathSuffix;
- return new ZooKeeperLeaderRetrievalDriverFactory(client, leaderPath);
+ final CuratorFramework client, final String path) {
+ return new ZooKeeperLeaderRetrievalDriverFactory(client, path);
}
/**
@@ -253,13 +295,12 @@ public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverF
* ZooKeeperLeaderElectionDriver}.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
* @return {@link DefaultLeaderElectionService} instance.
*/
public static DefaultLeaderElectionService createLeaderElectionService(
- CuratorFramework client, Configuration configuration) {
+ CuratorFramework client) {
- return createLeaderElectionService(client, configuration, "");
+ return createLeaderElectionService(client, "");
}
/**
@@ -267,50 +308,35 @@ public static DefaultLeaderElectionService createLeaderElectionService(
* ZooKeeperLeaderElectionDriver}.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
- * @param pathSuffix The path suffix which we want to append
+ * @param path The path for the leader election
* @return {@link DefaultLeaderElectionService} instance.
*/
public static DefaultLeaderElectionService createLeaderElectionService(
- final CuratorFramework client,
- final Configuration configuration,
- final String pathSuffix) {
- return new DefaultLeaderElectionService(
- createLeaderElectionDriverFactory(client, configuration, pathSuffix));
+ final CuratorFramework client, final String path) {
+ return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
}
/**
* Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
* @return {@link LeaderElectionDriverFactory} instance.
*/
public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory(
- final CuratorFramework client, final Configuration configuration) {
- return createLeaderElectionDriverFactory(client, configuration, "");
+ final CuratorFramework client) {
+ return createLeaderElectionDriverFactory(client, "");
}
/**
* Creates a {@link LeaderElectionDriverFactory} implemented by ZooKeeper.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object containing the configuration values
- * @param pathSuffix The path suffix which we want to append
+ * @param path The path suffix which we want to append
* @return {@link LeaderElectionDriverFactory} instance.
*/
public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFactory(
- final CuratorFramework client,
- final Configuration configuration,
- final String pathSuffix) {
- final String latchPath =
- configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH)
- + pathSuffix;
- final String leaderPath =
- configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
- + pathSuffix;
-
- return new ZooKeeperLeaderElectionDriverFactory(client, latchPath, leaderPath);
+ final CuratorFramework client, final String path) {
+ return new ZooKeeperLeaderElectionDriverFactory(client, path);
}
/**
@@ -359,7 +385,6 @@ public static JobGraphStore createJobGraphs(
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object
- * @param jobId ID of job to create the instance for
* @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain
* @param executor to run ZooKeeper callbacks
* @return {@link DefaultCompletedCheckpointStore} instance
@@ -368,23 +393,17 @@ public static JobGraphStore createJobGraphs(
public static CompletedCheckpointStore createCompletedCheckpoints(
CuratorFramework client,
Configuration configuration,
- JobID jobId,
int maxNumberOfCheckpointsToRetain,
Executor executor)
throws Exception {
checkNotNull(configuration, "Configuration");
- String checkpointsPath =
- configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH);
-
RetrievableStateStorageHelper stateStorage =
createFileSystemStateStorage(configuration, HA_STORAGE_COMPLETED_CHECKPOINT);
- checkpointsPath += getPathForJob(jobId);
-
final ZooKeeperStateHandleStore completedCheckpointStateHandleStore =
- createZooKeeperStateHandleStore(client, checkpointsPath, stateStorage);
+ createZooKeeperStateHandleStore(client, getCheckpointsPath(), stateStorage);
final CompletedCheckpointStore zooKeeperCompletedCheckpointStore =
new DefaultCompletedCheckpointStore<>(
maxNumberOfCheckpointsToRetain,
@@ -396,7 +415,7 @@ public static CompletedCheckpointStore createCompletedCheckpoints(
"Initialized {} in '{}' with {}.",
DefaultCompletedCheckpointStore.class.getSimpleName(),
completedCheckpointStateHandleStore,
- checkpointsPath);
+ getCheckpointsPath());
return zooKeeperCompletedCheckpointStore;
}
@@ -431,21 +450,11 @@ ZooKeeperStateHandleStore createZooKeeperStateHandleStore(
* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
- * @param configuration {@link Configuration} object
- * @param jobId ID of job to create the instance for
* @return {@link ZooKeeperCheckpointIDCounter} instance
*/
- public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(
- CuratorFramework client, Configuration configuration, JobID jobId) {
-
- String checkpointIdCounterPath =
- configuration.getString(
- HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
-
- checkpointIdCounterPath += getPathForJob(jobId);
-
+ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter(CuratorFramework client) {
return new ZooKeeperCheckpointIDCounter(
- client, checkpointIdCounterPath, new DefaultLastStateConnectionStateListener());
+ client, new DefaultLastStateConnectionStateListener());
}
/**
@@ -466,20 +475,38 @@ FileSystemStateStorageHelper createFileSystemStateStorage(
prefix);
}
- public static String generateZookeeperPath(String root, String namespace) {
- if (!namespace.startsWith("/")) {
- namespace = '/' + namespace;
- }
+ /** Creates a ZooKeeper path of the form "/root/child". */
+ public static String generateZookeeperPath(String root, String child) {
+ final String result =
+ Stream.of(root, child)
+ .map(ZooKeeperUtils::trimSlashes)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.joining("/", "/", ""));
+
+ return result;
+ }
+
+ private static String trimStartingSlash(String path) {
+ return path.startsWith("/") ? path.substring(1) : path;
+ }
- if (namespace.endsWith("/")) {
- namespace = namespace.substring(0, namespace.length() - 1);
+ private static String trimSlashes(String input) {
+ int left = 0;
+ int right = input.length() - 1;
+
+ while (left <= right && input.charAt(left) == '/') {
+ left++;
}
- if (root.endsWith("/")) {
- root = root.substring(0, root.length() - 1);
+ while (right >= left && input.charAt(right) == '/') {
+ right--;
}
- return root + namespace;
+ if (left <= right) {
+ return input.substring(left, right + 1);
+ } else {
+ return "";
+ }
}
/**
@@ -493,14 +520,18 @@ public static String generateZookeeperPath(String root, String namespace) {
*/
public static CuratorFramework useNamespaceAndEnsurePath(
final CuratorFramework client, final String path) throws Exception {
- Preconditions.checkNotNull(client, "client must not be null");
- Preconditions.checkNotNull(path, "path must not be null");
+ checkNotNull(client, "client must not be null");
+ checkNotNull(path, "path must not be null");
// Ensure that the checkpoints path exists
client.newNamespaceAwareEnsurePath(path).ensure(client.getZookeeperClient());
// All operations will have the path as root
- return client.usingNamespace(generateZookeeperPath(client.getNamespace(), path));
+ final String newNamespace = generateZookeeperPath(client.getNamespace(), path);
+ return client.usingNamespace(
+ // Curator prepends a '/' manually and throws an Exception if the
+ // namespace starts with a '/'.
+ trimStartingSlash(newNamespace));
}
/** Secure {@link ACLProvider} implementation. */
@@ -530,10 +561,10 @@ public enum ZkClientACLMode {
*/
public static ZkClientACLMode fromConfig(Configuration config) {
String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL);
- if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) {
- return ZkClientACLMode.OPEN;
- } else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) {
- return ZkClientACLMode.CREATOR;
+ if (aclMode == null || aclMode.equalsIgnoreCase(OPEN.name())) {
+ return OPEN;
+ } else if (aclMode.equalsIgnoreCase(CREATOR.name())) {
+ return CREATOR;
} else {
String message = "Unsupported ACL option: [" + aclMode + "] provided";
LOG.error(message);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
index d6a6eb5d42e18a..f0eb180cda4bfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZKCheckpointIDCounterMultiServersTest.java
@@ -64,7 +64,7 @@ public void testRecoveredAfterConnectionLoss() throws Exception {
connectionLossLatch, reconnectedLatch);
ZooKeeperCheckpointIDCounter idCounter =
- new ZooKeeperCheckpointIDCounter(client, "/checkpoint-id-counter", listener);
+ new ZooKeeperCheckpointIDCounter(client, listener);
idCounter.start();
AtomicLong localCounter = new AtomicLong(1L);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
index 29d7d93e8bf73e..55ec0920fddd4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
@@ -51,34 +51,32 @@ public void cleanUp() throws Exception {
/** Tests that counter node is removed from ZooKeeper after shutdown. */
@Test
public void testShutdownRemovesState() throws Exception {
- CheckpointIDCounter counter = createCheckpointIdCounter();
+ ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
counter.start();
CuratorFramework client = ZooKeeper.getClient();
- assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
+ assertNotNull(client.checkExists().forPath(counter.getPath()));
counter.shutdown(JobStatus.FINISHED);
- assertNull(client.checkExists().forPath("/checkpoint-id-counter"));
+ assertNull(client.checkExists().forPath(counter.getPath()));
}
/** Tests that counter node is NOT removed from ZooKeeper after suspend. */
@Test
public void testSuspendKeepsState() throws Exception {
- CheckpointIDCounter counter = createCheckpointIdCounter();
+ ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
counter.start();
CuratorFramework client = ZooKeeper.getClient();
- assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
+ assertNotNull(client.checkExists().forPath(counter.getPath()));
counter.shutdown(JobStatus.SUSPENDED);
- assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
+ assertNotNull(client.checkExists().forPath(counter.getPath()));
}
@Override
- protected CheckpointIDCounter createCheckpointIdCounter() throws Exception {
+ protected ZooKeeperCheckpointIDCounter createCheckpointIdCounter() throws Exception {
return new ZooKeeperCheckpointIDCounter(
- ZooKeeper.getClient(),
- "/checkpoint-id-counter",
- new DefaultLastStateConnectionStateListener());
+ ZooKeeper.getClient(), new DefaultLastStateConnectionStateListener());
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
index b9f5bb7331484f..e4582a2754e455 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java
@@ -149,9 +149,7 @@ public void testResourceCleanupUnderLeadershipChange() throws Exception {
new ZooKeeperRunningJobsRegistry(client, configuration))
.setDispatcherLeaderElectionService(dispatcherLeaderElectionService)
.setJobMasterLeaderRetrieverFunction(
- jobId ->
- ZooKeeperUtils.createLeaderRetrievalService(
- client, configuration))
+ jobId -> ZooKeeperUtils.createLeaderRetrievalService(client))
.build()) {
final PartialDispatcherServices partialDispatcherServices =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index 96e6daf27efb23..4b970116a40402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -240,22 +240,22 @@ protected void internalCleanupJobData(JobID jobID) throws Exception {
}
@Override
- protected String getLeaderNameForResourceManager() {
+ protected String getLeaderPathForResourceManager() {
throw new UnsupportedOperationException("Not supported by this test implementation.");
}
@Override
- protected String getLeaderNameForDispatcher() {
+ protected String getLeaderPathForDispatcher() {
throw new UnsupportedOperationException("Not supported by this test implementation.");
}
@Override
- protected String getLeaderNameForJobManager(JobID jobID) {
+ protected String getLeaderPathForJobManager(JobID jobID) {
throw new UnsupportedOperationException("Not supported by this test implementation.");
}
@Override
- protected String getLeaderNameForRestServer() {
+ protected String getLeaderPathForRestServer() {
throw new UnsupportedOperationException("Not supported by this test implementation.");
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
index 32601b9532d47a..b8a715585dc233 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -49,8 +49,6 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@@ -156,33 +154,26 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
public void testCleanupJobData() throws Exception {
String rootPath = "/foo/bar/flink";
final Configuration configuration = createConfiguration(rootPath);
- String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+ final String namespace = configuration.get(HighAvailabilityOptions.HA_CLUSTER_ID);
- final List paths =
- Stream.of(
- HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
- HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
- .map(configuration::getString)
- .map(path -> rootPath + namespace + path)
- .collect(Collectors.toList());
+ JobID jobID = new JobID();
+ final String path = rootPath + namespace + ZooKeeperUtils.getJobsPath();
final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
- JobID jobID = new JobID();
runCleanupTestWithJob(
configuration,
blobStoreService,
jobID,
haServices -> {
- for (String path : paths) {
- final List children = client.getChildren().forPath(path);
- assertThat(children, hasItem(jobID.toString()));
- }
+ final List childrenBefore = client.getChildren().forPath(path);
+
haServices.cleanupJobData(jobID);
- for (String path : paths) {
- final List children = client.getChildren().forPath(path);
- assertThat(children, not(hasItem(jobID.toString())));
- }
+
+ final List childrenAfter = client.getChildren().forPath(path);
+
+ assertThat(childrenBefore, hasItem(jobID.toString()));
+ assertThat(childrenAfter, not(hasItem(jobID.toString())));
});
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index cc1bc68ce21b58..db2117f5dabab7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -201,7 +201,7 @@ public void teardown() throws Exception {
@Override
public LeaderElectionService createLeaderElectionService() throws Exception {
- return ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ return ZooKeeperUtils.createLeaderElectionService(client);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
index 10f66986e558d5..8f1e01a369f547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -95,7 +95,7 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti
LeaderRetrievalDriver leaderRetrievalDriver = null;
try {
leaderRetrievalDriver =
- ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient, config)
+ ZooKeeperUtils.createLeaderRetrievalDriverFactory(zooKeeperClient)
.createLeaderRetrievalDriver(
queueLeaderElectionListener, fatalErrorHandler);
@@ -126,12 +126,12 @@ public void testConnectionSuspendedHandlingDuringInitialization() throws Excepti
@Test
public void testConnectionSuspendedHandling() throws Exception {
- final String retrievalPath = "/testConnectionSuspendedHandling/leaderAddress";
+ final String retrievalPath = "/testConnectionSuspendedHandling";
final String leaderAddress = "localhost";
final QueueLeaderElectionListener queueLeaderElectionListener =
new QueueLeaderElectionListener(1);
- LeaderRetrievalDriver leaderRetrievalDriver = null;
+ ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
try {
leaderRetrievalDriver =
new ZooKeeperLeaderRetrievalDriver(
@@ -140,7 +140,10 @@ public void testConnectionSuspendedHandling() throws Exception {
queueLeaderElectionListener,
fatalErrorHandler);
- writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, UUID.randomUUID());
+ writeLeaderInformationToZooKeeper(
+ leaderRetrievalDriver.getConnectionInformationPath(),
+ leaderAddress,
+ UUID.randomUUID());
// do the testing
CompletableFuture firstAddress = queueLeaderElectionListener.next();
@@ -166,11 +169,10 @@ public void testConnectionSuspendedHandling() throws Exception {
@Test
public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception {
- final String retrievalPath =
- "/testSameLeaderAfterReconnectTriggersListenerNotification/leaderAddress";
+ final String retrievalPath = "/testSameLeaderAfterReconnectTriggersListenerNotification";
final QueueLeaderElectionListener queueLeaderElectionListener =
new QueueLeaderElectionListener(1);
- LeaderRetrievalDriver leaderRetrievalDriver = null;
+ ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
try {
leaderRetrievalDriver =
new ZooKeeperLeaderRetrievalDriver(
@@ -181,7 +183,8 @@ public void testSameLeaderAfterReconnectTriggersListenerNotification() throws Ex
final String leaderAddress = "foobar";
final UUID sessionId = UUID.randomUUID();
- writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId);
+ writeLeaderInformationToZooKeeper(
+ leaderRetrievalDriver.getConnectionInformationPath(), leaderAddress, sessionId);
// pop new leader
queueLeaderElectionListener.next();
@@ -233,12 +236,11 @@ private byte[] createLeaderInformation(String leaderAddress, UUID sessionId)
@Test
public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception {
- final String retrievalPath =
- "/testNewLeaderAfterReconnectTriggersListenerNotification/leaderAddress";
+ final String retrievalPath = "/testNewLeaderAfterReconnectTriggersListenerNotification";
final QueueLeaderElectionListener queueLeaderElectionListener =
new QueueLeaderElectionListener(1);
- LeaderRetrievalDriver leaderRetrievalDriver = null;
+ ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
try {
leaderRetrievalDriver =
new ZooKeeperLeaderRetrievalDriver(
@@ -249,7 +251,8 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc
final String leaderAddress = "foobar";
final UUID sessionId = UUID.randomUUID();
- writeLeaderInformationToZooKeeper(retrievalPath, leaderAddress, sessionId);
+ writeLeaderInformationToZooKeeper(
+ leaderRetrievalDriver.getConnectionInformationPath(), leaderAddress, sessionId);
// pop new leader
queueLeaderElectionListener.next();
@@ -266,7 +269,10 @@ public void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exc
final String newLeaderAddress = "barfoo";
final UUID newSessionId = UUID.randomUUID();
- writeLeaderInformationToZooKeeper(retrievalPath, newLeaderAddress, newSessionId);
+ writeLeaderInformationToZooKeeper(
+ leaderRetrievalDriver.getConnectionInformationPath(),
+ newLeaderAddress,
+ newSessionId);
// check that we find the new leader information eventually
CommonTestUtils.waitUntilCondition(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index d8acb0ee5b25a6..b928ad58a95761 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -140,7 +140,7 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception {
leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler);
leaderRetrievalDriver =
- ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration)
+ ZooKeeperUtils.createLeaderRetrievalDriverFactory(client)
.createLeaderRetrievalDriver(
retrievalEventHandler, retrievalEventHandler::handleError);
@@ -183,16 +183,14 @@ public void testZooKeeperReelection() throws Exception {
TestingListener listener = new TestingListener();
try {
- leaderRetrievalService =
- ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client);
LOG.debug("Start leader retrieval service for the TestingListener.");
leaderRetrievalService.start(listener);
for (int i = 0; i < num; i++) {
- leaderElectionService[i] =
- ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client);
contenders[i] = new TestingContender(createAddress(i), leaderElectionService[i]);
LOG.debug("Start leader election service for contender #{}.", i);
@@ -273,14 +271,12 @@ public void testZooKeeperReelectionWithReplacement() throws Exception {
TestingListener listener = new TestingListener();
try {
- leaderRetrievalService =
- ZooKeeperUtils.createLeaderRetrievalService(client, configuration);
+ leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(client);
leaderRetrievalService.start(listener);
for (int i = 0; i < num; i++) {
- leaderElectionService[i] =
- ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(client);
contenders[i] =
new TestingContender(TEST_URL + "_" + i + "_0", leaderElectionService[i]);
@@ -308,7 +304,7 @@ public void testZooKeeperReelectionWithReplacement() throws Exception {
leaderElectionService[index].stop();
// create new leader election service which takes part in the leader election
leaderElectionService[index] =
- ZooKeeperUtils.createLeaderElectionService(client, configuration);
+ ZooKeeperUtils.createLeaderElectionService(client);
contenders[index] =
new TestingContender(
TEST_URL + "_" + index + "_" + (lastTry + 1),
@@ -341,16 +337,13 @@ public void testZooKeeperReelectionWithReplacement() throws Exception {
@Test
public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
final String faultyContenderUrl = "faultyContender";
- final String leaderPath = "/leader";
-
- configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
final TestingLeaderElectionEventHandler electionEventHandler =
new TestingLeaderElectionEventHandler(TEST_LEADER);
final TestingLeaderRetrievalEventHandler retrievalEventHandler =
new TestingLeaderRetrievalEventHandler();
- LeaderElectionDriver leaderElectionDriver = null;
+ ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
LeaderRetrievalDriver leaderRetrievalDriver = null;
CuratorFramework anotherClient = null;
@@ -375,11 +368,14 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
// overwrite the current leader address, the leader should notice that
boolean dataWritten = false;
+ final String connectionInformationPath =
+ leaderElectionDriver.getConnectionInformationPath();
+
while (!dataWritten) {
- anotherClient.delete().forPath(leaderPath);
+ anotherClient.delete().forPath(connectionInformationPath);
try {
- anotherClient.create().forPath(leaderPath, baos.toByteArray());
+ anotherClient.create().forPath(connectionInformationPath, baos.toByteArray());
dataWritten = true;
} catch (KeeperException.NodeExistsException e) {
@@ -389,7 +385,7 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
// The faulty leader should be corrected on ZooKeeper
leaderRetrievalDriver =
- ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration)
+ ZooKeeperUtils.createLeaderRetrievalDriverFactory(client)
.createLeaderRetrievalDriver(
retrievalEventHandler, retrievalEventHandler::handleError);
@@ -470,7 +466,7 @@ public void testExceptionForwarding() throws Exception {
*/
@Test
public void testEphemeralZooKeeperNodes() throws Exception {
- LeaderElectionDriver leaderElectionDriver = null;
+ ZooKeeperLeaderElectionDriver leaderElectionDriver = null;
LeaderRetrievalDriver leaderRetrievalDriver = null;
final TestingLeaderElectionEventHandler electionEventHandler =
new TestingLeaderElectionEventHandler(TEST_LEADER);
@@ -487,13 +483,11 @@ public void testEphemeralZooKeeperNodes() throws Exception {
leaderElectionDriver = createAndInitLeaderElectionDriver(client, electionEventHandler);
leaderRetrievalDriver =
- ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2, configuration)
+ ZooKeeperUtils.createLeaderRetrievalDriverFactory(client2)
.createLeaderRetrievalDriver(
retrievalEventHandler, retrievalEventHandler::handleError);
- final String leaderPath =
- configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
- cache = new NodeCache(client2, leaderPath);
+ cache = new NodeCache(client2, leaderElectionDriver.getConnectionInformationPath());
ExistsCacheListener existsListener = new ExistsCacheListener(cache);
DeletedCacheListener deletedCacheListener = new DeletedCacheListener(cache);
@@ -570,7 +564,7 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception
is(LeaderInformation.empty()));
// The data on ZooKeeper it not be cleared
leaderRetrievalDriver =
- ZooKeeperUtils.createLeaderRetrievalDriverFactory(client, configuration)
+ ZooKeeperUtils.createLeaderRetrievalDriverFactory(client)
.createLeaderRetrievalDriver(
retrievalEventHandler, retrievalEventHandler::handleError);
@@ -646,7 +640,7 @@ private ZooKeeperLeaderElectionDriver createAndInitLeaderElectionDriver(
throws Exception {
final ZooKeeperLeaderElectionDriver leaderElectionDriver =
- ZooKeeperUtils.createLeaderElectionDriverFactory(client, configuration)
+ ZooKeeperUtils.createLeaderElectionDriverFactory(client)
.createLeaderElectionDriver(
electionEventHandler, electionEventHandler::handleError, TEST_URL);
electionEventHandler.init(leaderElectionDriver);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
new file mode 100644
index 00000000000000..92bb24195d3162
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class ZooKeeperUtilsTest extends TestLogger {
+
+ @Test
+ public void testZookeeperPathGeneration() {
+ runZookeeperPathGenerationTest("root", "namespace", "/root/namespace");
+ runZookeeperPathGenerationTest("/root/", "/namespace/", "/root/namespace");
+ runZookeeperPathGenerationTest("//root//", "//namespace//", "/root/namespace");
+ runZookeeperPathGenerationTest("////", "namespace", "/namespace");
+ runZookeeperPathGenerationTest("//a//", "/b/", "/a/b");
+ runZookeeperPathGenerationTest("", "", "/");
+ runZookeeperPathGenerationTest("root", "////", "/root");
+ }
+
+ private void runZookeeperPathGenerationTest(
+ String root, String namespace, String expectedValue) {
+ final String result = ZooKeeperUtils.generateZookeeperPath(root, namespace);
+
+ assertThat(result, is(expectedValue));
+ }
+}