From 434f1bbf4ce0b1662aaa08fe5cc74e85cc820180 Mon Sep 17 00:00:00 2001
From: Yi Tang
Date: Mon, 12 Apr 2021 12:59:40 +0800
Subject: [PATCH] [FLINK-20695][ha] Clean ha data for job if globally
 terminated

At the moment Flink only cleans up the HA data (e.g. Kubernetes
ConfigMaps or ZooKeeper nodes) while shutting down the cluster. This is
not enough for a long-running session cluster to which multiple jobs
are submitted. With this commit, the HA data of a particular job is
cleaned up as soon as the job reaches a globally terminal state.

This closes #15561.
---
 .../KubernetesHaServices.java                 |  5 ++
 .../KubernetesHaServicesTest.java             | 34 +++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java  | 20 +++++--
 .../highavailability/AbstractHaServices.java  | 16 +++++
 .../HighAvailabilityServices.java             |  8 +++
 .../nonha/AbstractNonHaServices.java          |  4 ++
 .../zookeeper/ZooKeeperHaServices.java        | 33 +++++++++-
 .../DispatcherResourceCleanupTest.java        | 28 +++++++++
 .../AbstractHaServicesTest.java               | 42 ++++++++++++-
 .../TestingHighAvailabilityServices.java      | 12 ++++
 ...TestingManualHighAvailabilityServices.java |  5 ++
 .../zookeeper/ZooKeeperHaServicesTest.java    | 60 ++++++++++++++++++-
 12 files changed, 257 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
index f5ff366710251d..b7d35f89b67639 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
@@ -142,6 +142,11 @@ public void internalCleanup() throws Exception {
                 .get();
     }
 
+    @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
+    }
+
     @Override
     protected String getLeaderNameForResourceManager() {
         return getLeaderName(RESOURCE_MANAGER_NAME);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
index d6b69c32c04eb2..cd784915510583 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.kubernetes.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 
 import org.junit.Test;
@@ -76,4 +79,35 @@ public void testInternalCleanupShouldCleanupConfigMaps() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final KubernetesHaServices kubernetesHaServices =
+                                    new KubernetesHaServices(
+                                            flinkKubeClient,
+                                            executorService,
+                                            configuration,
+                                            new VoidBlobStore());
+                            JobID jobID = new JobID();
+                            String configMapName =
+                                    kubernetesHaServices.getLeaderNameForJobManager(jobID);
+                            final KubernetesConfigMap configMap =
+                                    new TestingFlinkKubeClient.MockKubernetesConfigMap(
+                                            configMapName);
+                            flinkKubeClient.createConfigMap(configMap);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(true));
+                            kubernetesHaServices.internalCleanupJobData(jobID);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(false));
+                        });
+            }
+        };
+    }
 }
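For illustration only (not part of the patch): the Dispatcher change below builds on a single new entry point, HighAvailabilityServices#cleanupJobData(JobID). A minimal sketch of that contract, assuming haServices and jobId are supplied by the caller:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.highavailability.HighAvailabilityServices;

    final class JobHaCleanupSketch {
        // Invoked once a job reaches a globally terminal state
        // (FINISHED, FAILED, or CANCELED).
        static void cleanUp(HighAvailabilityServices haServices, JobID jobId) throws Exception {
            // Deletes only this job's entries (its leader ConfigMap on
            // Kubernetes, its znodes on ZooKeeper); cluster-wide HA data and
            // the session cluster itself are left untouched.
            haServices.cleanupJobData(jobId);
        }
    }
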
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 7e2ff220f94926..70bf6aa55cfbb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -748,13 +748,14 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
     private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
         jobManagerMetricGroup.removeJob(jobId);
 
-        boolean cleanupHABlobs = false;
+        boolean jobGraphRemoved = false;
         if (cleanupHA) {
             try {
                 jobGraphWriter.removeJobGraph(jobId);
 
-                // only clean up the HA blobs if we could remove the job from HA storage
-                cleanupHABlobs = true;
+                // only clean up the HA blobs and HA service data for the particular job
+                // if we could remove the job from HA storage
+                jobGraphRemoved = true;
             } catch (Exception e) {
                 log.warn(
                         "Could not properly remove job {} from submitted job graph store.",
@@ -770,6 +771,17 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
                         jobId,
                         e);
             }
+
+            if (jobGraphRemoved) {
+                try {
+                    highAvailabilityServices.cleanupJobData(jobId);
+                } catch (Exception e) {
+                    log.warn(
+                            "Could not properly clean data for job {} stored by HA services.",
+                            jobId,
+                            e);
+                }
+            }
         } else {
             try {
                 jobGraphWriter.releaseJobGraph(jobId);
@@ -781,7 +793,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
             }
         }
 
-        blobServer.cleanupJob(jobId, cleanupHABlobs);
+        blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
     /** Terminate all currently running {@link JobManagerRunner}s. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 9a322a8d924875..4c4dfe610a26ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -205,6 +205,13 @@ public void closeAndCleanupAllData() throws Exception {
         logger.info("Finished cleaning up the high availability data.");
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        logger.info("Cleaning up the high availability data for job {}.", jobID);
+        internalCleanupJobData(jobID);
+        logger.info("Finished cleaning up the high availability data for job {}.", jobID);
+    }
+
     /**
      * Create leader election service with specified leaderName.
      *
@@ -260,6 +267,15 @@ public void closeAndCleanupAllData() throws Exception {
      */
    protected abstract void internalCleanup() throws Exception;
 
+    /**
+     * Clean up the metadata in the distributed system (e.g. ZooKeeper or Kubernetes ConfigMaps)
+     * for the specified job.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception if the cleanup operation on the external storage fails.
+     */
+    protected abstract void internalCleanupJobData(JobID jobID) throws Exception;
+
     /**
      * Get the leader name for ResourceManager.
      *
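A hedged sketch of what AbstractHaServices now asks of each backend: one hook that removes exactly the given job's entries. The class below is hypothetical and standalone (it does not extend the real AbstractHaServices) and uses an in-memory map where KubernetesHaServices and ZooKeeperHaServices talk to an external store:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.flink.api.common.JobID;

    final class InMemoryHaBackendSketch {
        private final Map<JobID, String> perJobLeaderData = new ConcurrentHashMap<>();

        // Mirrors the new internalCleanupJobData(JobID) template hook:
        // delete the entries owned by the given job, and nothing else.
        void internalCleanupJobData(JobID jobID) {
            perJobLeaderData.remove(jobID);
        }
    }
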
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index fdb3fa7dcb5359..fdf031e34e19f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -237,4 +237,12 @@ default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
      * up data stored by them.
      */
     void closeAndCleanupAllData() throws Exception;
+
+    /**
+     * Deletes all data for the specified job stored by these services in external stores.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception Thrown, if an exception occurred while cleaning data stored by them.
+     */
+    void cleanupJobData(JobID jobID) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 188bdbbfd1c42e..dfc53dca61ba01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.highavailability.nonha;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -111,6 +112,9 @@ public void closeAndCleanupAllData() throws Exception {
         close();
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {}
+
     // ----------------------------------------------------------------------
     //  Helper methods
     // ----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index f6ab9e859d630a..802162b1428462 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,10 +34,14 @@
 import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -138,6 +142,22 @@ public void internalCleanup() throws Exception {
         cleanupZooKeeperPaths();
     }
 
+    @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(parent -> parent + "/" + jobID)
+                        .collect(Collectors.toList());
+        for (String path : paths) {
+            deleteZNode(path);
+        }
+    }
+
     @Override
     protected String getLeaderNameForResourceManager() {
         return RESOURCE_MANAGER_LEADER_PATH;
@@ -168,6 +188,10 @@ private void cleanupZooKeeperPaths() throws Exception {
     }
 
     private void deleteOwnedZNode() throws Exception {
+        deleteZNode("/");
+    }
+
+    private void deleteZNode(String path) throws Exception {
         // delete the HA_CLUSTER_ID znode which is owned by this cluster
 
         // Since we are using Curator version 2.12 there is a bug in deleting the children
@@ -176,13 +200,18 @@ private void deleteOwnedZNode() throws Exception {
         // The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
         boolean zNodeDeleted = false;
         while (!zNodeDeleted) {
+            Stat stat = client.checkExists().forPath(path);
+            if (stat == null) {
+                logger.debug("The znode {} has already been deleted.", path);
+                return;
+            }
             try {
-                client.delete().deletingChildrenIfNeeded().forPath("/");
+                client.delete().deletingChildrenIfNeeded().forPath(path);
                 zNodeDeleted = true;
             } catch (KeeperException.NoNodeException ignored) {
                 // concurrent delete operation. Try again.
                 logger.debug(
-                        "Retrying to delete owned znode because of other concurrent delete operation.");
+                        "Retrying to delete znode because of other concurrent delete operation.");
             }
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index eb04d9d747f4bc..dd80410cb867ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -131,6 +131,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private CompletableFuture<BlobKey> storedHABlobFuture;
     private CompletableFuture<JobID> deleteAllHABlobsFuture;
     private CompletableFuture<JobID> cleanupJobFuture;
+    private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
     @BeforeClass
@@ -151,6 +152,8 @@ public void setup() throws Exception {
         clearedJobLatch = new OneShotLatch();
         runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
         highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+        cleanupJobHADataFuture = new CompletableFuture<>();
+        highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture);
 
         storedHABlobFuture = new CompletableFuture<>();
         deleteAllHABlobsFuture = new CompletableFuture<>();
@@ -456,6 +459,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio
         assertThatHABlobsHaveBeenRemoved();
     }
 
+    @Test
+    public void testHaDataCleanupWhenJobFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        finishJob(jobManagerRunner);
+        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+        assertThat(jobID, is(this.jobId));
+    }
+
+    @Test
+    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+        try {
+            cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
+            fail("We should not delete the HA data for the job.");
+        } catch (TimeoutException ignored) {
+            // expected
+        }
+        assertThat(cleanupJobHADataFuture.isDone(), is(false));
+    }
+
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
         takeCreatedJobManagerRunner.completeResultFuture(
                 new ExecutionGraphInfo(
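The negative test above deliberately relies on a short timeout to show that cleanup was not triggered when the job is merely not finished. A simplified, standalone sketch of that probe pattern (names are illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class CleanupProbeSketch {
        // The cleanup hook completes the probe; a TimeoutException is the
        // expected signal that no cleanup happened within the window.
        static boolean cleanupHappened(CompletableFuture<?> probe) throws Exception {
            try {
                probe.get(10L, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException expectedWhenNoCleanup) {
                return false;
            }
        }
    }
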
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index a90dc61c40a1bd..96e6daf27efb23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -37,9 +37,12 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -63,7 +66,8 @@ public void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws
                         Executors.directExecutor(),
                         testingBlobStoreService,
                         closeOperations,
-                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP));
+                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP),
+                        ignored -> {});
 
         haServices.closeAndCleanupAllData();
 
@@ -94,7 +98,8 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
                         closeOperations,
                         () -> {
                             throw new FlinkException("test exception");
-                        });
+                        },
+                        ignored -> {});
 
         try {
             haServices.closeAndCleanupAllData();
@@ -106,6 +111,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
         assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
     }
 
+    @Test
+    public void testCleanupJobData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        JobID jobID = new JobID();
+        CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {},
+                        jobCleanupFuture::complete);
+
+        haServices.cleanupJobData(jobID);
+        JobID jobIDCleaned = jobCleanupFuture.get();
+        assertThat(jobIDCleaned, is(jobID));
+    }
+
     private enum CloseOperations {
         HA_CLEANUP,
         HA_CLOSE,
@@ -156,16 +184,19 @@ private static final class TestingHaServices extends AbstractHaServices {
 
         private final Queue<? super CloseOperations> closeOperations;
         private final RunnableWithException internalCleanupRunnable;
+        private final Consumer<JobID> internalJobCleanupConsumer;
 
         private TestingHaServices(
                 Configuration config,
                 Executor ioExecutor,
                 BlobStoreService blobStoreService,
                 Queue<? super CloseOperations> closeOperations,
-                RunnableWithException internalCleanupRunnable) {
+                RunnableWithException internalCleanupRunnable,
+                Consumer<JobID> internalJobCleanupConsumer) {
             super(config, ioExecutor, blobStoreService);
             this.closeOperations = closeOperations;
             this.internalCleanupRunnable = internalCleanupRunnable;
+            this.internalJobCleanupConsumer = internalJobCleanupConsumer;
         }
 
         @Override
@@ -203,6 +234,11 @@ protected void internalCleanup() throws Exception {
             internalCleanupRunnable.run();
         }
 
+        @Override
+        protected void internalCleanupJobData(JobID jobID) throws Exception {
+            internalJobCleanupConsumer.accept(jobID);
+        }
+
         @Override
         protected String getLeaderNameForResourceManager() {
             throw new UnsupportedOperationException("Not supported by this test implementation.");
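The test above observes the hook through an injected Consumer<JobID> (jobCleanupFuture::complete). A minimal standalone sketch of that seam; the class is hypothetical, not the real TestingHaServices:

    import java.util.function.Consumer;

    import org.apache.flink.api.common.JobID;

    final class JobCleanupSeamSketch {
        private final Consumer<JobID> internalJobCleanupConsumer;

        JobCleanupSeamSketch(Consumer<JobID> internalJobCleanupConsumer) {
            this.internalJobCleanupConsumer = internalJobCleanupConsumer;
        }

        // Production implementations delete external state here; a test
        // injects a Consumer that records which JobID was cleaned up.
        void internalCleanupJobData(JobID jobID) {
            internalJobCleanupConsumer.accept(jobID);
        }
    }
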
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 0afc9812d86e90..4434eefc852bea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -72,6 +73,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
     private CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
 
+    private volatile CompletableFuture<JobID> jobCleanupFuture;
+
     // ------------------------------------------------------------------------
     //  Setters for mock / testing implementations
     // ------------------------------------------------------------------------
@@ -145,6 +148,10 @@ public void setCloseAndCleanupAllDataFuture(
         this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
     }
 
+    public void setCleanupJobDataFuture(CompletableFuture<JobID> jobCleanupFuture) {
+        this.jobCleanupFuture = jobCleanupFuture;
+    }
+
     // ------------------------------------------------------------------------
     //  HA Services Methods
     // ------------------------------------------------------------------------
@@ -277,4 +284,9 @@ public void close() throws Exception {
     public void closeAndCleanupAllData() throws Exception {
         closeAndCleanupAllDataFuture.complete(null);
     }
+
+    @Override
+    public void cleanupJobData(JobID jobID) {
+        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 6cbb92c8cc3fc7..912b48f9538e96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -136,6 +136,11 @@ public void closeAndCleanupAllData() throws Exception {
         // nothing to do
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        // nothing to do
+    }
+
     public void grantLeadership(JobID jobId, int index, UUID leaderId) {
         ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
index 81dc6b450db150..32601b9532d47a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -49,9 +49,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -148,6 +151,41 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
         assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
     }
 
+    /** Tests that the ZooKeeperHaServices cleans up the paths for the job manager. */
+    @Test
+    public void testCleanupJobData() throws Exception {
+        String rootPath = "/foo/bar/flink";
+        final Configuration configuration = createConfiguration(rootPath);
+        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(path -> rootPath + namespace + path)
+                        .collect(Collectors.toList());
+
+        final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+        JobID jobID = new JobID();
+        runCleanupTestWithJob(
+                configuration,
+                blobStoreService,
+                jobID,
+                haServices -> {
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, hasItem(jobID.toString()));
+                    }
+                    haServices.cleanupJobData(jobID);
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, not(hasItem(jobID.toString())));
+                    }
+                });
+    }
+
     private static CuratorFramework startCuratorFramework() {
         return CuratorFrameworkFactory.builder()
                 .connectString(ZOO_KEEPER_RESOURCE.getConnectString())
@@ -170,6 +208,16 @@ private void runCleanupTest(
             TestingBlobStoreService blobStoreService,
             ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
             throws Exception {
+        runCleanupTestWithJob(
+                configuration, blobStoreService, new JobID(), zooKeeperHaServicesConsumer);
+    }
+
+    private void runCleanupTestWithJob(
+            Configuration configuration,
+            TestingBlobStoreService blobStoreService,
+            JobID jobId,
+            ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
+            throws Exception {
         try (ZooKeeperHaServices zooKeeperHaServices =
                 new ZooKeeperHaServices(
                         ZooKeeperUtils.startCuratorFramework(configuration),
@@ -190,13 +238,23 @@ private void runCleanupTest(
             resourceManagerLeaderRetriever.start(listener);
             resourceManagerLeaderElectionService.start(
                     new TestingContender("foobar", resourceManagerLeaderElectionService));
-            final JobID jobId = new JobID();
+            LeaderElectionService jobManagerLeaderElectionService =
+                    zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
+            jobManagerLeaderElectionService.start(
+                    new TestingContender("", jobManagerLeaderElectionService));
+            LeaderRetrievalService jobManagerLeaderRetriever =
+                    zooKeeperHaServices.getJobManagerLeaderRetriever(jobId);
+            jobManagerLeaderRetriever.start(
+                    new LeaderRetrievalUtils.LeaderConnectionInfoListener());
+
             runningJobsRegistry.setJobRunning(jobId);
             listener.getLeaderConnectionInfoFuture().join();
 
             resourceManagerLeaderRetriever.stop();
             resourceManagerLeaderElectionService.stop();
+            jobManagerLeaderRetriever.stop();
+            jobManagerLeaderElectionService.stop();
             runningJobsRegistry.clearJob(jobId);
 
             zooKeeperHaServicesConsumer.accept(zooKeeperHaServices);