Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-22636] Group job specific zNodes under /jobs zNode #15893

Merged
merged 1 commit into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoint-counter</h5></td>
<td style="word-wrap: break-word;">"/checkpoint-counter"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for checkpoint counters.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoints</h5></td>
<td style="word-wrap: break-word;">"/checkpoints"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for completed checkpoints.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
<td style="word-wrap: break-word;">"/jobgraphs"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for job graphs</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.latch</h5></td>
<td style="word-wrap: break-word;">"/leaderlatch"</td>
<td>String</td>
<td>Defines the znode of the leader latch which is used to elect the leader.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.leader</h5></td>
<td style="word-wrap: break-word;">"/leader"</td>
<td>String</td>
<td>Defines the znode of the leader which contains the URL to the leader and the current leader session ID.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
<td style="word-wrap: break-word;">"/mesos-workers"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,36 +62,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoint-counter</h5></td>
<td style="word-wrap: break-word;">"/checkpoint-counter"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for checkpoint counters.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.checkpoints</h5></td>
<td style="word-wrap: break-word;">"/checkpoints"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for completed checkpoints.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
<td style="word-wrap: break-word;">"/jobgraphs"</td>
<td>String</td>
<td>ZooKeeper root path (ZNode) for job graphs</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.latch</h5></td>
<td style="word-wrap: break-word;">"/leaderlatch"</td>
<td>String</td>
<td>Defines the znode of the leader latch which is used to elect the leader.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.leader</h5></td>
<td style="word-wrap: break-word;">"/leader"</td>
<td>String</td>
<td>Defines the znode of the leader which contains the URL to the leader and the current leader session ID.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.mesos-workers</h5></td>
<td style="word-wrap: break-word;">"/mesos-workers"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ public final class ConfigConstants {
public static final String HA_ZOOKEEPER_NAMESPACE_KEY =
"high-availability.zookeeper.path.namespace";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
/** @deprecated no longer used. */
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";

Expand All @@ -1026,14 +1026,14 @@ public final class ConfigConstants {
public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH =
"high-availability.zookeeper.path.jobgraphs";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
/** @deprecated no longer used. */
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";

/**
* ZooKeeper root path (ZNode) for completed checkpoints.
*
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
* @deprecated no longer used.
*/
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH =
Expand All @@ -1042,7 +1042,7 @@ public final class ConfigConstants {
/**
* ZooKeeper root path (ZNode) for checkpoint counters.
*
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
* @deprecated no longer used.
*/
@PublicEvolving @Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
Expand Down Expand Up @@ -1691,21 +1691,19 @@ public final class ConfigConstants {
/** @deprecated in favor of {@link HighAvailabilityOptions#HA_CLUSTER_ID}. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
/** @deprecated no longer used. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
/** @deprecated no longer used. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
/** @deprecated no longer used. */
@Deprecated public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";

/**
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}
*/
/** @deprecated no longer used. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,6 @@ public class HighAvailabilityOptions {
.withDescription(
"The root path under which Flink stores its entries in ZooKeeper.");

@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")
.withDeprecatedKeys("recovery.zookeeper.path.latch")
.withDescription(
"Defines the znode of the leader latch which is used to elect the leader.");

/** ZooKeeper root path (ZNode) for job graphs. */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
Expand All @@ -131,31 +123,6 @@ public class HighAvailabilityOptions {
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
.withDescription("ZooKeeper root path (ZNode) for job graphs");

@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
key("high-availability.zookeeper.path.leader")
.defaultValue("/leader")
.withDeprecatedKeys("recovery.zookeeper.path.leader")
.withDescription(
"Defines the znode of the leader which contains the URL to the leader and the current"
+ " leader session ID.");

/** ZooKeeper root path (ZNode) for completed checkpoints. */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
key("high-availability.zookeeper.path.checkpoints")
.defaultValue("/checkpoints")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

/** ZooKeeper root path (ZNode) for checkpoint counters. */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
key("high-availability.zookeeper.path.checkpoint-counter")
.defaultValue("/checkpoint-counter")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ public CheckpointRecoveryFactory createCheckpointRecoveryFactory() {
kubeClient,
configuration,
ioExecutor,
this::getLeaderNameForJobManager,
this::getLeaderPathForJobManager,
lockIdentity);
}

@Override
public JobGraphStore createJobGraphStore() throws Exception {
return KubernetesUtils.createJobGraphStore(
configuration, kubeClient, getLeaderNameForDispatcher(), lockIdentity);
configuration, kubeClient, getLeaderPathForDispatcher(), lockIdentity);
}

@Override
public RunningJobsRegistry createRunningJobsRegistry() {
return new KubernetesRunningJobsRegistry(
kubeClient, getLeaderNameForDispatcher(), lockIdentity);
kubeClient, getLeaderPathForDispatcher(), lockIdentity);
}

@Override
Expand All @@ -144,25 +144,25 @@ public void internalCleanup() throws Exception {

@Override
public void internalCleanupJobData(JobID jobID) throws Exception {
kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
kubeClient.deleteConfigMap(getLeaderPathForJobManager(jobID)).get();
}

@Override
protected String getLeaderNameForResourceManager() {
protected String getLeaderPathForResourceManager() {
return getLeaderName(RESOURCE_MANAGER_NAME);
}

@Override
protected String getLeaderNameForDispatcher() {
protected String getLeaderPathForDispatcher() {
return getLeaderName(DISPATCHER_NAME);
}

public String getLeaderNameForJobManager(final JobID jobID) {
public String getLeaderPathForJobManager(final JobID jobID) {
return getLeaderName(jobID.toString() + NAME_SEPARATOR + JOB_MANAGER_NAME);
}

@Override
protected String getLeaderNameForRestServer() {
protected String getLeaderPathForRestServer() {
return getLeaderName(REST_SERVER_NAME);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
new VoidBlobStore());
JobID jobID = new JobID();
String configMapName =
kubernetesHaServices.getLeaderNameForJobManager(jobID);
kubernetesHaServices.getLeaderPathForJobManager(jobID);
final KubernetesConfigMap configMap =
new TestingFlinkKubeClient.MockKubernetesConfigMap(
configMapName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.ZooKeeperUtils;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.shared.SharedCount;
Expand Down Expand Up @@ -76,14 +78,11 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
*
* @param client Curator ZooKeeper client
* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
*/
public ZooKeeperCheckpointIDCounter(
CuratorFramework client,
String counterPath,
LastStateConnectionStateListener connectionStateListener) {
CuratorFramework client, LastStateConnectionStateListener connectionStateListener) {
this.client = checkNotNull(client, "Curator client");
this.counterPath = checkNotNull(counterPath, "Counter path");
this.counterPath = ZooKeeperUtils.getCheckpointIdCounterPath();
this.sharedCount = new SharedCount(client, counterPath, 1);
this.connectionStateListener = connectionStateListener;
}
Expand Down Expand Up @@ -176,4 +175,9 @@ private void checkConnectionState() {
}
});
}

@VisibleForTesting
String getPath() {
return counterPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ public CompletedCheckpointStore createCheckpointStore(
throws Exception {

return ZooKeeperUtils.createCompletedCheckpoints(
client, config, jobId, maxNumberOfCheckpointsToRetain, executor);
ZooKeeperUtils.useNamespaceAndEnsurePath(
client, ZooKeeperUtils.getPathForJob(jobId)),
config,
maxNumberOfCheckpointsToRetain,
executor);
}

@Override
public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
return ZooKeeperUtils.createCheckpointIDCounter(client, config, jobID);
return ZooKeeperUtils.createCheckpointIDCounter(
ZooKeeperUtils.useNamespaceAndEnsurePath(
client, ZooKeeperUtils.getPathForJob(jobID)));
}
}
Loading