[FLINK-20695][ha] Clean ha data for job if globally terminated
At the moment Flink only cleans up the HA data (e.g. K8s ConfigMaps or
ZooKeeper nodes) while shutting down the cluster. This is not enough for
a long-running session cluster to which you submit multiple jobs. In
this commit, we clean up the data for a particular job once it reaches a
globally terminal state.

This closes apache#15561.
yittg authored and tillrohrmann committed May 12, 2021
1 parent ad86adf commit 434f1bb
Showing 12 changed files with 257 additions and 10 deletions.
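
Before the per-file diffs, a condensed, hedged sketch of the flow this commit wires up: the dispatcher only asks the HA services to drop a job's data after the job graph was removed from HA storage, i.e. the job reached a globally terminal state. Everything below is a simplified, hypothetical model (plain UUIDs instead of Flink's JobID, stubbed job-graph removal), not the actual Flink classes in the diff.

import java.util.UUID;

// Hypothetical, simplified model of the commit's flow; not the actual Flink classes.
public class JobHaCleanupSketch {

    /** Stand-in for the new HighAvailabilityServices#cleanupJobData(JobID) hook. */
    interface HaServices {
        void cleanupJobData(UUID jobId) throws Exception;
    }

    /**
     * Dispatcher-side guard: per-job HA data is only dropped once the job graph was
     * successfully removed from HA storage, i.e. the job reached a globally terminal state.
     */
    static void cleanUpJobData(UUID jobId, boolean cleanupHA, HaServices haServices) {
        boolean jobGraphRemoved = false;
        if (cleanupHA) {
            // In Flink this is jobGraphWriter.removeJobGraph(jobId); stubbed as always succeeding.
            jobGraphRemoved = true;
        }
        if (jobGraphRemoved) {
            try {
                haServices.cleanupJobData(jobId);
            } catch (Exception e) {
                System.err.println("Could not clean up HA data for job " + jobId + ": " + e);
            }
        }
    }

    public static void main(String[] args) {
        UUID jobId = UUID.randomUUID();
        cleanUpJobData(jobId, true, id -> System.out.println("Cleaned HA data for job " + id));
    }
}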
@@ -142,6 +142,11 @@ public void internalCleanup() throws Exception {
.get();
}

@Override
public void internalCleanupJobData(JobID jobID) throws Exception {
kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
}

@Override
protected String getLeaderNameForResourceManager() {
return getLeaderName(RESOURCE_MANAGER_NAME);
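
For context on what the new internalCleanupJobData above boils down to, here is a rough standalone sketch that deletes a job-scoped leader ConfigMap directly with a Fabric8 client (the library FlinkKubeClient builds on). The namespace, cluster id and ConfigMap name format are assumptions for illustration; the real name comes from getLeaderNameForJobManager(jobID).

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

// Hypothetical sketch, not Flink code: drop a job-scoped leader ConfigMap with a
// Fabric8 5.x-style client. Namespace, cluster id and name format are assumptions.
public class DeleteJobConfigMapSketch {

    public static void main(String[] args) {
        String clusterId = "my-session-cluster";           // assumed kubernetes.cluster-id
        String jobId = "4b1f4ee8a87de9dc74a1b2e4b37e1e6a"; // assumed JobID hex string
        // Assumed name format; the real name comes from getLeaderNameForJobManager(jobID).
        String configMapName = clusterId + "-" + jobId + "-jobmanager-leader";

        try (KubernetesClient client = new DefaultKubernetesClient()) {
            client.configMaps().inNamespace("default").withName(configMapName).delete();
        }
    }
}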
@@ -18,6 +18,9 @@

package org.apache.flink.kubernetes.highavailability;

import org.apache.flink.api.common.JobID;
import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.runtime.blob.VoidBlobStore;

import org.junit.Test;
@@ -76,4 +79,35 @@ public void testInternalCleanupShouldCleanupConfigMaps() throws Exception {
}
};
}

@Test
public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
new Context() {
{
runTest(
() -> {
final KubernetesHaServices kubernetesHaServices =
new KubernetesHaServices(
flinkKubeClient,
executorService,
configuration,
new VoidBlobStore());
JobID jobID = new JobID();
String configMapName =
kubernetesHaServices.getLeaderNameForJobManager(jobID);
final KubernetesConfigMap configMap =
new TestingFlinkKubeClient.MockKubernetesConfigMap(
configMapName);
flinkKubeClient.createConfigMap(configMap);
assertThat(
flinkKubeClient.getConfigMap(configMapName).isPresent(),
is(true));
kubernetesHaServices.internalCleanupJobData(jobID);
assertThat(
flinkKubeClient.getConfigMap(configMapName).isPresent(),
is(false));
});
}
};
}
}
@@ -748,13 +748,14 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
jobManagerMetricGroup.removeJob(jobId);

boolean cleanupHABlobs = false;
boolean jobGraphRemoved = false;
if (cleanupHA) {
try {
jobGraphWriter.removeJobGraph(jobId);

// only clean up the HA blobs if we could remove the job from HA storage
cleanupHABlobs = true;
// only clean up the HA blobs and HA service data for the particular job
// if we could remove the job from HA storage
jobGraphRemoved = true;
} catch (Exception e) {
log.warn(
"Could not properly remove job {} from submitted job graph store.",
@@ -770,6 +771,17 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
jobId,
e);
}

if (jobGraphRemoved) {
try {
highAvailabilityServices.cleanupJobData(jobId);
} catch (Exception e) {
log.warn(
"Could not properly clean data for job {} stored by ha services",
jobId,
e);
}
}
} else {
try {
jobGraphWriter.releaseJobGraph(jobId);
@@ -781,7 +793,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
}
}

blobServer.cleanupJob(jobId, cleanupHABlobs);
blobServer.cleanupJob(jobId, jobGraphRemoved);
}

/** Terminate all currently running {@link JobManagerRunner}s. */
@@ -205,6 +205,13 @@ public void closeAndCleanupAllData() throws Exception {
logger.info("Finished cleaning up the high availability data.");
}

@Override
public void cleanupJobData(JobID jobID) throws Exception {
logger.info("Clean up the high availability data for job {}.", jobID);
internalCleanupJobData(jobID);
logger.info("Finished cleaning up the high availability data for job {}.", jobID);
}

/**
* Create leader election service with specified leaderName.
*
@@ -260,6 +267,15 @@ public void closeAndCleanupAllData() throws Exception {
*/
protected abstract void internalCleanup() throws Exception;

/**
* Clean up the meta data in the distributed system (e.g. ZooKeeper, Kubernetes ConfigMaps) for
* the specified job.
*
* @param jobID The identifier of the job to clean up.
* @throws Exception if the cleanup operation on external storage failed.
*/
protected abstract void internalCleanupJobData(JobID jobID) throws Exception;

/**
* Get the leader name for ResourceManager.
*
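
To make the template-method split above concrete (cleanupJobData logs and delegates, internalCleanupJobData does the backend-specific work), here is a toy, purely illustrative in-memory stand-in in which job data is namespaced by job id, roughly the way the real backends namespace ConfigMaps and znodes. None of this is Flink code.

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Toy, purely illustrative stand-in for a concrete AbstractHaServices backend; not Flink code.
public class InMemoryHaServicesSketch {

    private final Map<String, String> store = new ConcurrentHashMap<>();

    /** Analogue of internalCleanup(): wipe everything owned by this cluster. */
    void internalCleanup() {
        store.clear();
    }

    /** Analogue of internalCleanupJobData(JobID): remove only the keys namespaced by this job. */
    void internalCleanupJobData(UUID jobId) {
        store.keySet().removeIf(key -> key.startsWith(jobId + "/"));
    }

    public static void main(String[] args) {
        InMemoryHaServicesSketch services = new InMemoryHaServicesSketch();
        UUID jobA = UUID.randomUUID();
        UUID jobB = UUID.randomUUID();
        services.store.put(jobA + "/checkpoints", "meta");
        services.store.put(jobB + "/checkpoints", "meta");

        services.internalCleanupJobData(jobA);
        System.out.println(services.store.keySet()); // only jobB's entry remains
    }
}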
@@ -237,4 +237,12 @@ default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
* up data stored by them.
*/
void closeAndCleanupAllData() throws Exception;

/**
* Deletes all data for the specified job stored by these services in external stores.
*
* @param jobID The identifier of the job to clean up.
* @throws Exception Thrown, if an exception occurred while cleaning up data stored by them.
*/
void cleanupJobData(JobID jobID) throws Exception;
}
@@ -18,6 +18,7 @@

package org.apache.flink.runtime.highavailability.nonha;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -111,6 +112,9 @@ public void closeAndCleanupAllData() throws Exception {
close();
}

@Override
public void cleanupJobData(JobID jobID) throws Exception {}

// ----------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------
@@ -34,10 +34,14 @@
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;

import javax.annotation.Nonnull;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -138,6 +142,22 @@ public void internalCleanup() throws Exception {
cleanupZooKeeperPaths();
}

@Override
public void internalCleanupJobData(JobID jobID) throws Exception {
final List<String> paths =
Stream.of(
HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH,
HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
.map(configuration::getString)
.map(parent -> parent + "/" + jobID)
.collect(Collectors.toList());
for (String path : paths) {
deleteZNode(path);
}
}

@Override
protected String getLeaderNameForResourceManager() {
return RESOURCE_MANAGER_LEADER_PATH;
@@ -168,6 +188,10 @@ private void cleanupZooKeeperPaths() throws Exception {
}

private void deleteOwnedZNode() throws Exception {
deleteZNode("/");
}

private void deleteZNode(String path) throws Exception {
// delete the HA_CLUSTER_ID znode which is owned by this cluster

// Since we are using Curator version 2.12 there is a bug in deleting the children
@@ -176,13 +200,18 @@ private void deleteOwnedZNode() throws Exception {
// The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
boolean zNodeDeleted = false;
while (!zNodeDeleted) {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
logger.debug("znode {} has been deleted", path);
return;
}
try {
client.delete().deletingChildrenIfNeeded().forPath("/");
client.delete().deletingChildrenIfNeeded().forPath(path);
zNodeDeleted = true;
} catch (KeeperException.NoNodeException ignored) {
// concurrent delete operation. Try again.
logger.debug(
"Retrying to delete owned znode because of other concurrent delete operation.");
"Retrying to delete znode because of other concurrent delete operation.");
}
}
}
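
As a rough, standalone illustration of the ZooKeeper side: the per-job znodes live under the configured checkpoint, checkpoint-counter, latch and leader sub-paths with the job id appended, and each one is deleted with the same check-then-delete retry used above to work around the Curator 2.12 NoNodeException quirk. The sub-path defaults, connection string and job id below are assumptions for illustration.

import java.util.Arrays;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;

// Hypothetical sketch, not Flink code: delete one job's HA znodes with plain Curator.
public class DeleteJobZNodesSketch {

    public static void main(String[] args) throws Exception {
        String jobId = "4b1f4ee8a87de9dc74a1b2e4b37e1e6a"; // assumed JobID hex string
        // Assumed defaults of the four HighAvailabilityOptions sub-paths used in the diff.
        List<String> parents =
                Arrays.asList("/checkpoints", "/checkpoint-counter", "/leaderlatch", "/leader");

        CuratorFramework client =
                CuratorFrameworkFactory.newClient(
                        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            for (String parent : parents) {
                deleteZNode(client, parent + "/" + jobId);
            }
        } finally {
            client.close();
        }
    }

    private static void deleteZNode(CuratorFramework client, String path) throws Exception {
        // Check-then-delete retry mirroring the diff: with Curator 2.12 a concurrent delete of
        // a child can surface as NoNodeException, so re-check and try again until the node is gone.
        while (true) {
            if (client.checkExists().forPath(path) == null) {
                return; // already deleted
            }
            try {
                client.delete().deletingChildrenIfNeeded().forPath(path);
                return;
            } catch (KeeperException.NoNodeException ignored) {
                // concurrent delete operation; loop and re-check
            }
        }
    }
}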
@@ -131,6 +131,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
private CompletableFuture<BlobKey> storedHABlobFuture;
private CompletableFuture<JobID> deleteAllHABlobsFuture;
private CompletableFuture<JobID> cleanupJobFuture;
private CompletableFuture<JobID> cleanupJobHADataFuture;
private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;

@BeforeClass
@@ -151,6 +152,8 @@ public void setup() throws Exception {
clearedJobLatch = new OneShotLatch();
runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
cleanupJobHADataFuture = new CompletableFuture<>();
highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture);

storedHABlobFuture = new CompletableFuture<>();
deleteAllHABlobsFuture = new CompletableFuture<>();
@@ -456,6 +459,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio
assertThatHABlobsHaveBeenRemoved();
}

@Test
public void testHaDataCleanupWhenJobFinished() throws Exception {
TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
TestingJobManagerRunner jobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
finishJob(jobManagerRunner);
JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
assertThat(jobID, is(this.jobId));
}

@Test
public void testHaDataCleanupWhenJobNotFinished() throws Exception {
TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
TestingJobManagerRunner jobManagerRunner =
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
try {
cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
fail("We should not delete the HA data for job.");
} catch (TimeoutException ignored) {
// expected
}
assertThat(cleanupJobHADataFuture.isDone(), is(false));
}

private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
takeCreatedJobManagerRunner.completeResultFuture(
new ExecutionGraphInfo(
@@ -37,9 +37,12 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

@@ -63,7 +66,8 @@ public void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws
Executors.directExecutor(),
testingBlobStoreService,
closeOperations,
() -> closeOperations.offer(CloseOperations.HA_CLEANUP));
() -> closeOperations.offer(CloseOperations.HA_CLEANUP),
ignored -> {});

haServices.closeAndCleanupAllData();

@@ -94,7 +98,8 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
closeOperations,
() -> {
throw new FlinkException("test exception");
});
},
ignored -> {});

try {
haServices.closeAndCleanupAllData();
@@ -106,6 +111,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
}

@Test
public void testCleanupJobData() throws Exception {
final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
final TestingBlobStoreService testingBlobStoreService =
new TestingBlobStoreService(closeOperations);

JobID jobID = new JobID();
CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();

final TestingHaServices haServices =
new TestingHaServices(
new Configuration(),
Executors.directExecutor(),
testingBlobStoreService,
closeOperations,
() -> {},
jobCleanupFuture::complete);

haServices.cleanupJobData(jobID);
JobID jobIDCleaned = jobCleanupFuture.get();
assertThat(jobIDCleaned, is(jobID));
}

private enum CloseOperations {
HA_CLEANUP,
HA_CLOSE,
@@ -156,16 +184,19 @@ private static final class TestingHaServices extends AbstractHaServices {

private final Queue<? super CloseOperations> closeOperations;
private final RunnableWithException internalCleanupRunnable;
private final Consumer<JobID> internalJobCleanupConsumer;

private TestingHaServices(
Configuration config,
Executor ioExecutor,
BlobStoreService blobStoreService,
Queue<? super CloseOperations> closeOperations,
RunnableWithException internalCleanupRunnable) {
RunnableWithException internalCleanupRunnable,
Consumer<JobID> internalJobCleanupConsumer) {
super(config, ioExecutor, blobStoreService);
this.closeOperations = closeOperations;
this.internalCleanupRunnable = internalCleanupRunnable;
this.internalJobCleanupConsumer = internalJobCleanupConsumer;
}

@Override
@@ -203,6 +234,11 @@ protected void internalCleanup() throws Exception {
internalCleanupRunnable.run();
}

@Override
protected void internalCleanupJobData(JobID jobID) throws Exception {
internalJobCleanupConsumer.accept(jobID);
}

@Override
protected String getLeaderNameForResourceManager() {
throw new UnsupportedOperationException("Not supported by this test implementation.");