diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java b/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java index 6bd2011e0c648..5726899b7cd7b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobStatus.java @@ -24,8 +24,8 @@ @PublicEvolving public enum JobStatus { /** - * The job has been received by the Dispatcher, and is waiting for the job manager to be - * created. + * The job has been received by the Dispatcher, and is waiting for the job manager to receive + * leadership and to be created. */ INITIALIZING(TerminalState.NON_TERMINAL), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 401c4d127edd3..ea6786e88963f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; -import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobNotFinishedException; @@ -116,7 +116,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint runningJobs; + private final Map runningJobs; private final Collection recoveredJobs; @@ -134,7 +134,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint> dispatcherJobTerminationFutures; + private final Map> jobManagerRunnerTerminationFutures; protected final CompletableFuture shutDownFuture; @@ -182,7 +182,7 @@ public Dispatcher( this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory(); - this.dispatcherJobTerminationFutures = new HashMap<>(2); + this.jobManagerRunnerTerminationFutures = new HashMap<>(2); this.shutDownFuture = new CompletableFuture<>(); @@ -393,36 +393,33 @@ private void persistAndRunJob(JobGraph jobGraph) throws Exception { runJob(jobGraph, ExecutionType.SUBMISSION); } - private void runJob(JobGraph jobGraph, ExecutionType executionType) { + private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception { Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID())); long initializationTimestamp = System.currentTimeMillis(); - CompletableFuture jobManagerRunnerFuture = + JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph, initializationTimestamp); - DispatcherJob dispatcherJob = - DispatcherJob.createFor( - jobManagerRunnerFuture, - jobGraph.getJobID(), - jobGraph.getName(), - initializationTimestamp); - runningJobs.put(jobGraph.getJobID(), dispatcherJob); + runningJobs.put(jobGraph.getJobID(), jobManagerRunner); final JobID jobId = jobGraph.getJobID(); final CompletableFuture cleanupJobStateFuture = - dispatcherJob + jobManagerRunner .getResultFuture() .handleAsync( - (dispatcherJobResult, throwable) -> { + (jobManagerRunnerResult, throwable) -> { Preconditions.checkState( - runningJobs.get(jobId) == dispatcherJob, - "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob."); - - if (dispatcherJobResult != null) { - return handleDispatcherJobResult( - jobId, dispatcherJobResult, executionType); + runningJobs.get(jobId) == jobManagerRunner, + "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner."); + + if (jobManagerRunnerResult != null) { + return handleJobManagerRunnerResult( + jobGraph, + initializationTimestamp, + jobManagerRunnerResult, + executionType); } else { - return dispatcherJobFailed(jobId, throwable); + return jobManagerRunnerFailed(jobId, throwable); } }, getMainThreadExecutor()); @@ -433,16 +430,31 @@ private void runJob(JobGraph jobGraph, ExecutionType executionType) { .thenCompose(Function.identity()); FutureUtils.assertNoException(jobTerminationFuture); - registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture); - } - - private CleanupJobState handleDispatcherJobResult( - JobID jobId, DispatcherJobResult dispatcherJobResult, ExecutionType executionType) { - if (dispatcherJobResult.isInitializationFailure() - && executionType == ExecutionType.RECOVERY) { - return dispatcherJobFailed(jobId, dispatcherJobResult.getInitializationFailure()); + registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture); + } + + private CleanupJobState handleJobManagerRunnerResult( + JobGraph jobGraph, + long initializationTimestamp, + JobManagerRunnerResult jobManagerRunnerResult, + ExecutionType executionType) { + if (jobManagerRunnerResult.isInitializationFailure()) { + if (executionType == ExecutionType.RECOVERY) { + return jobManagerRunnerFailed( + jobGraph.getJobID(), jobManagerRunnerResult.getInitializationFailure()); + } else { + final ArchivedExecutionGraph archivedExecutionGraph = + ArchivedExecutionGraph.createFromInitializingJob( + jobGraph.getJobID(), + jobGraph.getName(), + JobStatus.FAILED, + jobManagerRunnerResult.getInitializationFailure(), + initializationTimestamp); + return jobReachedGloballyTerminalState( + new ExecutionGraphInfo(archivedExecutionGraph)); + } } else { - return jobReachedGloballyTerminalState(dispatcherJobResult.getExecutionGraphInfo()); + return jobReachedGloballyTerminalState(jobManagerRunnerResult.getExecutionGraphInfo()); } } @@ -457,7 +469,7 @@ enum CleanupJobState { } } - private CleanupJobState dispatcherJobFailed(JobID jobId, Throwable throwable) { + private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) { if (throwable instanceof JobNotFinishedException) { jobNotFinished(jobId); } else { @@ -467,36 +479,23 @@ private CleanupJobState dispatcherJobFailed(JobID jobId, Throwable throwable) { return CleanupJobState.LOCAL; } - CompletableFuture createJobManagerRunner( - JobGraph jobGraph, long initializationTimestamp) { + JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) + throws Exception { final RpcService rpcService = getRpcService(); - return CompletableFuture.supplyAsync( - () -> { - try { - JobManagerRunner runner = - jobManagerRunnerFactory.createJobManagerRunner( - jobGraph, - configuration, - rpcService, - highAvailabilityServices, - heartbeatServices, - jobManagerSharedServices, - new DefaultJobManagerJobMetricGroupFactory( - jobManagerMetricGroup), - fatalErrorHandler, - initializationTimestamp); - runner.start(); - return runner; - } catch (Exception e) { - throw new CompletionException( - new JobInitializationException( - jobGraph.getJobID(), - "Could not instantiate JobManager.", - e)); - } - }, - ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on - // JobManager creation + + JobManagerRunner runner = + jobManagerRunnerFactory.createJobManagerRunner( + jobGraph, + configuration, + rpcService, + highAvailabilityServices, + heartbeatServices, + jobManagerSharedServices, + new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), + fatalErrorHandler, + initializationTimestamp); + runner.start(); + return runner; } @Override @@ -531,7 +530,7 @@ public CompletableFuture disposeSavepoint(String savepointPath, Tim @Override public CompletableFuture cancelJob(JobID jobId, Time timeout) { - Optional maybeJob = getDispatcherJob(jobId); + Optional maybeJob = getJobManagerRunner(jobId); return maybeJob.map(job -> job.cancel(timeout)) .orElseGet( () -> { @@ -550,7 +549,7 @@ public CompletableFuture requestClusterOverview(Time timeout) { final List>> optionalJobInformation = queryJobMastersForInformation( - dispatcherJob -> dispatcherJob.requestJobStatus(timeout)); + jobManagerRunner -> jobManagerRunner.requestJobStatus(timeout)); CompletableFuture>> allOptionalJobsFuture = FutureUtils.combineAll(optionalJobInformation); @@ -572,7 +571,8 @@ public CompletableFuture requestClusterOverview(Time timeout) { @Override public CompletableFuture requestMultipleJobDetails(Time timeout) { List>> individualOptionalJobDetails = - queryJobMastersForInformation(dj -> dj.requestJobDetails(timeout)); + queryJobMastersForInformation( + jobManagerRunner -> jobManagerRunner.requestJobDetails(timeout)); CompletableFuture>> optionalCombinedJobDetails = FutureUtils.combineAll(individualOptionalJobDetails); @@ -597,7 +597,7 @@ public CompletableFuture requestMultipleJobDetails(Time tim @Override public CompletableFuture requestJobStatus(JobID jobId, Time timeout) { - Optional maybeJob = getDispatcherJob(jobId); + Optional maybeJob = getJobManagerRunner(jobId); return maybeJob.map(job -> job.requestJobStatus(timeout)) .orElseGet( () -> { @@ -628,7 +628,7 @@ public CompletableFuture requestExecutionGraphInfo( return executionGraphInfo; } }; - Optional maybeJob = getDispatcherJob(jobId); + Optional maybeJob = getJobManagerRunner(jobId); return maybeJob.map(job -> job.requestJob(timeout)) .orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId))) .exceptionally(checkExecutionGraphStoreOnException); @@ -636,7 +636,7 @@ public CompletableFuture requestExecutionGraphInfo( @Override public CompletableFuture requestJobResult(JobID jobId, Time timeout) { - DispatcherJob job = runningJobs.get(jobId); + JobManagerRunner job = runningJobs.get(jobId); if (job == null) { final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); @@ -650,9 +650,9 @@ public CompletableFuture requestJobResult(JobID jobId, Time timeout) } else { return job.getResultFuture() .thenApply( - dispatcherJobResult -> + jobManagerRunnerResult -> JobResult.createFrom( - dispatcherJobResult + jobManagerRunnerResult .getExecutionGraphInfo() .getArchivedExecutionGraph())); } @@ -728,28 +728,28 @@ public CompletableFuture deliverCoordinationRequestToCoord operatorId, serializedRequest, timeout)); } - private void registerDispatcherJobTerminationFuture( - JobID jobId, CompletableFuture dispatcherJobTerminationFuture) { - Preconditions.checkState(!dispatcherJobTerminationFutures.containsKey(jobId)); - dispatcherJobTerminationFutures.put(jobId, dispatcherJobTerminationFuture); + private void registerJobManagerRunnerTerminationFuture( + JobID jobId, CompletableFuture jobManagerRunnerTerminationFuture) { + Preconditions.checkState(!jobManagerRunnerTerminationFutures.containsKey(jobId)); + jobManagerRunnerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture); // clean up the pending termination future - dispatcherJobTerminationFuture.thenRunAsync( + jobManagerRunnerTerminationFuture.thenRunAsync( () -> { final CompletableFuture terminationFuture = - dispatcherJobTerminationFutures.remove(jobId); + jobManagerRunnerTerminationFutures.remove(jobId); //noinspection ObjectEquality if (terminationFuture != null - && terminationFuture != dispatcherJobTerminationFuture) { - dispatcherJobTerminationFutures.put(jobId, terminationFuture); + && terminationFuture != jobManagerRunnerTerminationFuture) { + jobManagerRunnerTerminationFutures.put(jobId, terminationFuture); } }, getMainThreadExecutor()); } private CompletableFuture removeJob(JobID jobId, CleanupJobState cleanupJobState) { - final DispatcherJob job = checkNotNull(runningJobs.remove(jobId)); + final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId)); final CompletableFuture jobTerminationFuture = job.closeAsync(); @@ -796,7 +796,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) { blobServer.cleanupJob(jobId, cleanupHABlobs); } - /** Terminate all currently running {@link DispatcherJob}s. */ + /** Terminate all currently running {@link JobManagerRunner}s. */ private void terminateRunningJobs() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); @@ -808,16 +808,17 @@ private void terminateRunningJobs() { } private void terminateJob(JobID jobId) { - final DispatcherJob dispatcherJob = runningJobs.get(jobId); + final JobManagerRunner jobManagerRunner = runningJobs.get(jobId); - if (dispatcherJob != null) { - dispatcherJob.closeAsync(); + if (jobManagerRunner != null) { + jobManagerRunner.closeAsync(); } } private CompletableFuture terminateRunningJobsAndGetTerminationFuture() { terminateRunningJobs(); - final Collection> values = dispatcherJobTerminationFutures.values(); + final Collection> values = + jobManagerRunnerTerminationFutures.values(); return FutureUtils.completeAll(values); } @@ -884,10 +885,11 @@ private void jobMasterFailed(JobID jobId, Throwable cause) { /** Ensures that the JobMasterGateway is available. */ private CompletableFuture getJobMasterGateway(JobID jobId) { - DispatcherJob job = runningJobs.get(jobId); + JobManagerRunner job = runningJobs.get(jobId); if (job == null) { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } + if (!job.isInitialized()) { return FutureUtils.completedExceptionally( new UnavailableDispatcherOperationException( @@ -906,7 +908,7 @@ private CompletableFuture getResourceManagerGateway() { return resourceManagerGatewayRetriever.getFuture(); } - private Optional getDispatcherJob(JobID jobId) { + private Optional getJobManagerRunner(JobID jobId) { return Optional.ofNullable(runningJobs.get(jobId)); } @@ -926,12 +928,12 @@ private List flattenOptionalCollection(Collection> optionalCo @Nonnull private List>> queryJobMastersForInformation( - Function> queryFunction) { + Function> queryFunction) { List>> optionalJobInformation = new ArrayList<>(runningJobs.size()); - for (DispatcherJob job : runningJobs.values()) { + for (JobManagerRunner job : runningJobs.values()) { final CompletableFuture> queryResult = queryFunction .apply(job) @@ -958,7 +960,7 @@ private CompletableFuture waitForTerminatingJob( return jobManagerTerminationFuture.thenAcceptAsync( FunctionUtils.uncheckedConsumer( (ignored) -> { - dispatcherJobTerminationFutures.remove(jobId); + jobManagerRunnerTerminationFutures.remove(jobId); action.accept(jobGraph); }), getMainThreadExecutor()); @@ -970,7 +972,7 @@ CompletableFuture getJobTerminationFuture(JobID jobId) { new DispatcherException( String.format("Job with job id %s is still running.", jobId))); } else { - return dispatcherJobTerminationFutures.getOrDefault( + return jobManagerRunnerTerminationFutures.getOrDefault( jobId, CompletableFuture.completedFuture(null)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java deleted file mode 100644 index 7243303a0a851..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; -import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.jobmaster.JobNotFinishedException; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.util.AutoCloseableAsync; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; - -import java.util.concurrent.CompletableFuture; - -/** Abstraction used by the {@link Dispatcher} to manage jobs. */ -public final class DispatcherJob implements AutoCloseableAsync { - - private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); - - private final CompletableFuture jobManagerRunnerFuture; - private final CompletableFuture jobResultFuture; - private final CompletableFuture terminationFuture = new CompletableFuture<>(); - - private final long initializationTimestamp; - private final JobID jobId; - private final String jobName; - - private final Object lock = new Object(); - - // internal field to track job status during initialization. Is not updated anymore after - // job is initialized, cancelled or failed. - @GuardedBy("lock") - private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING; - - private enum DispatcherJobStatus { - // We are waiting for the JobManagerRunner to be initialized - INITIALIZING(JobStatus.INITIALIZING), - // JobManagerRunner is initialized - JOB_MANAGER_RUNNER_INITIALIZED(null), - // waiting for cancellation. We stay in this status until the job result future completed, - // then we consider the JobManager to be initialized. - CANCELLING(JobStatus.CANCELLING); - - @Nullable private final JobStatus jobStatus; - - DispatcherJobStatus(JobStatus jobStatus) { - this.jobStatus = jobStatus; - } - - public JobStatus asJobStatus() { - if (jobStatus == null) { - throw new IllegalStateException("This state is not defined as a 'JobStatus'"); - } - return jobStatus; - } - } - - static DispatcherJob createFor( - CompletableFuture jobManagerRunnerFuture, - JobID jobId, - String jobName, - long initializationTimestamp) { - return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp); - } - - private DispatcherJob( - CompletableFuture jobManagerRunnerFuture, - JobID jobId, - String jobName, - long initializationTimestamp) { - this.jobManagerRunnerFuture = jobManagerRunnerFuture; - this.jobId = jobId; - this.jobName = jobName; - this.initializationTimestamp = initializationTimestamp; - this.jobResultFuture = new CompletableFuture<>(); - - FutureUtils.assertNoException( - this.jobManagerRunnerFuture.handle( - (jobManagerRunner, throwable) -> { - // JM has been initialized, or the initialization failed - synchronized (lock) { - jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; - if (throwable == null) { // initialization succeeded - // Forward result future - jobManagerRunner - .getResultFuture() - .whenComplete( - (jobManagerRunnerResult, resultThrowable) -> { - if (jobManagerRunnerResult != null) { - handleJobManagerRunnerResult( - jobManagerRunnerResult); - } else { - jobResultFuture.completeExceptionally( - ExceptionUtils - .stripCompletionException( - resultThrowable)); - } - }); - } else { // failure during initialization - handleInitializationFailure( - ExceptionUtils.stripCompletionException(throwable)); - } - } - return null; - })); - } - - private void handleJobManagerRunnerResult(JobManagerRunnerResult jobManagerRunnerResult) { - if (jobManagerRunnerResult.isSuccess()) { - jobResultFuture.complete( - DispatcherJobResult.forSuccess(jobManagerRunnerResult.getExecutionGraphInfo())); - } else if (jobManagerRunnerResult.isJobNotFinished()) { - jobResultFuture.completeExceptionally(new JobNotFinishedException(jobId)); - } else if (jobManagerRunnerResult.isInitializationFailure()) { - handleInitializationFailure(jobManagerRunnerResult.getInitializationFailure()); - } - } - - private void handleInitializationFailure(Throwable initializationFailure) { - ArchivedExecutionGraph archivedExecutionGraph = - ArchivedExecutionGraph.createFromInitializingJob( - jobId, - jobName, - JobStatus.FAILED, - initializationFailure, - initializationTimestamp); - jobResultFuture.complete( - DispatcherJobResult.forInitializationFailure( - new ExecutionGraphInfo(archivedExecutionGraph), initializationFailure)); - } - - public CompletableFuture getResultFuture() { - return jobResultFuture; - } - - public CompletableFuture requestJobDetails(Time timeout) { - return requestJob(timeout) - .thenApply( - executionGraphInfo -> { - synchronized (lock) { - return JobDetails.createDetailsForJob( - executionGraphInfo.getArchivedExecutionGraph()); - } - }); - } - - /** - * Cancel job. A cancellation will be scheduled if the initialization is not completed. The - * returned future will complete exceptionally if the JobManagerRunner initialization failed. - */ - public CompletableFuture cancel(Time timeout) { - synchronized (lock) { - if (isInitialized()) { - return getJobMasterGateway() - .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); - } else { - log.info( - "Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", - jobId); - - // cancel job - CompletableFuture cancelFuture = - jobManagerRunnerFuture - .thenCompose(JobManagerRunner::getJobMasterGateway) - .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); - cancelFuture.whenComplete( - (ignored, cancelThrowable) -> { - if (cancelThrowable != null) { - log.warn("Cancellation of job {} failed", jobId, cancelThrowable); - } - }); - jobStatus = DispatcherJobStatus.CANCELLING; - return cancelFuture; - } - } - } - - public CompletableFuture requestJobStatus(Time timeout) { - return requestJob(timeout) - .thenApply( - executionGraphInfo -> - executionGraphInfo.getArchivedExecutionGraph().getState()); - } - - /** Returns a future completing to the ExecutionGraphInfo of the job. */ - public CompletableFuture requestJob(Time timeout) { - synchronized (lock) { - if (isInitialized()) { - if (jobResultFuture.isDone()) { // job is not running anymore - return jobResultFuture.thenApply(DispatcherJobResult::getExecutionGraphInfo); - } - // job is still running - return getJobMasterGateway() - .thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout)); - } else { - Preconditions.checkState( - this.jobStatus == DispatcherJobStatus.INITIALIZING - || jobStatus == DispatcherJobStatus.CANCELLING); - return CompletableFuture.completedFuture( - new ExecutionGraphInfo( - ArchivedExecutionGraph.createFromInitializingJob( - jobId, - jobName, - jobStatus.asJobStatus(), - null, - initializationTimestamp))); - } - } - } - - /** - * The job is initialized once the JobManager runner has been initialized. It is also - * initialized if the runner initialization failed, or of it has been canceled (and the - * cancellation is complete). - */ - public boolean isInitialized() { - synchronized (lock) { - return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; - } - } - - /** - * Returns the {@link JobMasterGateway} from the JobManagerRunner. - * - * @return the {@link JobMasterGateway}. The future will complete exceptionally if the - * JobManagerRunner initialization failed. - * @throws IllegalStateException is thrown if the job is not initialized - */ - public CompletableFuture getJobMasterGateway() { - Preconditions.checkState( - isInitialized(), "JobMaster Gateway is not available during initialization"); - return jobManagerRunnerFuture.thenCompose(JobManagerRunner::getJobMasterGateway); - } - - @Override - public CompletableFuture closeAsync() { - FutureUtils.assertNoException( - jobManagerRunnerFuture.handle( - (runner, throwable) -> { - if (throwable == null) { - // init was successful: close jobManager runner. - CompletableFuture jobManagerRunnerClose = - jobManagerRunnerFuture.thenCompose( - AutoCloseableAsync::closeAsync); - FutureUtils.forward(jobManagerRunnerClose, terminationFuture); - } else { - // initialization has failed: complete termination. - terminationFuture.complete(null); - } - return null; - })); - return terminationFuture; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java deleted file mode 100644 index a5d93da3a087c..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobResult.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -/** - * Container for returning the {@link ExecutionGraphInfo} and a flag whether the initialization has - * failed. For initialization failures, the throwable is also attached, to avoid deserializing it - * from the {@link ArchivedExecutionGraph}. - */ -final class DispatcherJobResult { - - private final ExecutionGraphInfo executionGraphInfo; - - // if the throwable field is set, the job failed during initialization. - @Nullable private final Throwable initializationFailure; - - private DispatcherJobResult( - ExecutionGraphInfo executionGraphInfo, @Nullable Throwable throwable) { - this.executionGraphInfo = executionGraphInfo; - this.initializationFailure = throwable; - } - - public boolean isInitializationFailure() { - return initializationFailure != null; - } - - public ExecutionGraphInfo getExecutionGraphInfo() { - return executionGraphInfo; - } - - /** @throws IllegalStateException if this DispatcherJobResult is a successful initialization. */ - public Throwable getInitializationFailure() { - Preconditions.checkState( - isInitializationFailure(), - "This DispatcherJobResult does not represent a failed initialization."); - return initializationFailure; - } - - public static DispatcherJobResult forInitializationFailure( - ExecutionGraphInfo executionGraphInfo, Throwable throwable) { - return new DispatcherJobResult(executionGraphInfo, throwable); - } - - public static DispatcherJobResult forSuccess(ExecutionGraphInfo executionGraphInfo) { - return new DispatcherJobResult(executionGraphInfo, null); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index fbd40a384191f..5a7a97c41f67c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -19,6 +19,11 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.AutoCloseableAsync; import java.util.concurrent.CompletableFuture; @@ -55,4 +60,43 @@ public interface JobManagerRunner extends AutoCloseableAsync { * @return job id of the executed job */ JobID getJobID(); + + /** + * Cancels the currently executed job. + * + * @param timeout of this operation + * @return Future acknowledge of the operation + */ + CompletableFuture cancel(Time timeout); + + /** + * Requests the current job status. + * + * @param timeout for the rpc call + * @return Future containing the current job status + */ + CompletableFuture requestJobStatus(Time timeout); + + /** + * Request the details of the executed job. + * + * @param timeout for the rpc call + * @return Future details of the executed job + */ + CompletableFuture requestJobDetails(Time timeout); + + /** + * Requests the {@link ExecutionGraphInfo} of the executed job. + * + * @param timeout for the rpc call + * @return Future which is completed with the {@link ExecutionGraphInfo} of the executed job + */ + CompletableFuture requestJob(Time timeout); + + /** + * Flag indicating if the JobManagerRunner has been initialized. + * + * @return true if the JobManagerRunner has been initialized. + */ + boolean isInitialized(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java index 042945d9249cf..f17db5f874b63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java @@ -19,9 +19,12 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -29,11 +32,12 @@ import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +105,15 @@ public class JobManagerRunnerImpl private volatile CompletableFuture leaderGatewayFuture; + @GuardedBy("lock") + private JobManagerRunnerJobStatus jobStatus = JobManagerRunnerJobStatus.INITIALIZING; + + @Nullable private UUID currentLeaderSession = null; + + // set in state INITIALIZING_CANCELLING + @Nullable private CompletableFuture cancelFuture; + @Nullable private Time cancelTimeout; + // ------------------------------------------------------------------------ /** @@ -194,44 +207,132 @@ public CompletableFuture closeAsync() { shutdown = true; setNewLeaderGatewayFuture(); - leaderGatewayFuture.completeExceptionally( - new FlinkException("JobMaster has been shut down.")); - - final CompletableFuture jobManagerTerminationFuture; + final FlinkException shutdownException = + new FlinkException("JobMaster has been shut down."); + leaderGatewayFuture.completeExceptionally(shutdownException); + + if (jobStatus == JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) { + checkState( + jobMasterService != null, + "JobMaster service must be set when Job master is initialized"); + FutureUtils.assertNoException( + jobMasterService + .closeAsync() + .whenComplete( + (Void ignored, Throwable throwable) -> + onJobManagerTermination(throwable))); + } else if (jobStatus.isInitializing()) { + if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) { + checkState(cancelFuture != null); + cancelFuture.completeExceptionally(shutdownException); + } - if (jobMasterService == null) { - jobManagerTerminationFuture = FutureUtils.completedVoidFuture(); - } else { - jobManagerTerminationFuture = jobMasterService.closeAsync(); + if (currentLeaderSession == null) { + // no ongoing JobMaster initialization (waiting for leadership) --> close + onJobManagerTermination(null); + } + // ongoing initialization, we will finish closing once it is done. } + } - jobManagerTerminationFuture.whenComplete( - (Void ignored, Throwable throwable) -> { - try { - leaderElectionService.stop(); - } catch (Throwable t) { - throwable = - ExceptionUtils.firstOrSuppressed( - t, - ExceptionUtils.stripCompletionException(throwable)); - } + return terminationFuture; + } + } - classLoaderLease.release(); + private void onJobManagerTermination(@Nullable Throwable throwable) { + try { + leaderElectionService.stop(); + } catch (Throwable t) { + throwable = + ExceptionUtils.firstOrSuppressed( + t, ExceptionUtils.stripCompletionException(throwable)); + } - resultFuture.complete(JobManagerRunnerResult.forJobNotFinished()); + classLoaderLease.release(); - if (throwable != null) { - terminationFuture.completeExceptionally( - new FlinkException( - "Could not properly shut down the JobManagerRunner", - throwable)); - } else { - terminationFuture.complete(null); - } - }); + resultFuture.completeExceptionally(new JobNotFinishedException(jobGraph.getJobID())); + + if (throwable != null) { + terminationFuture.completeExceptionally( + new FlinkException( + "Could not properly shut down the JobManagerRunner", throwable)); + } else { + terminationFuture.complete(null); + } + } + + // ---------------------------------------------------------------------------------------------- + // Job operations + // ---------------------------------------------------------------------------------------------- + + @Override + public CompletableFuture cancel(Time timeout) { + synchronized (lock) { + if (jobStatus == JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) { + return leaderGatewayFuture.thenCompose( + jobMasterGateway -> jobMasterGateway.cancel(timeout)); + } else { + if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) { + checkState(cancelFuture != null); + return cancelFuture; + } + log.info( + "Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", + jobGraph.getJobID()); + + cancelFuture = new CompletableFuture<>(); + cancelTimeout = timeout; + jobStatus = JobManagerRunnerJobStatus.INITIALIZING_CANCELLING; + return cancelFuture; } + } + } - return terminationFuture; + @Override + public CompletableFuture requestJobStatus(Time timeout) { + return requestJob(timeout) + .thenApply( + executionGraphInfo -> + executionGraphInfo.getArchivedExecutionGraph().getState()); + } + + @Override + public CompletableFuture requestJobDetails(Time timeout) { + return requestJob(timeout) + .thenApply( + executionGraphInfo -> + JobDetails.createDetailsForJob( + executionGraphInfo.getArchivedExecutionGraph())); + } + + @Override + public CompletableFuture requestJob(Time timeout) { + synchronized (lock) { + if (jobStatus == JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) { + if (resultFuture.isDone()) { // job is not running anymore + return resultFuture.thenApply(JobManagerRunnerResult::getExecutionGraphInfo); + } + // job is running + return leaderGatewayFuture.thenCompose( + jobMasterGateway -> jobMasterGateway.requestJob(timeout)); + } else { + checkState(jobStatus.isInitializing()); + return CompletableFuture.completedFuture( + new ExecutionGraphInfo( + ArchivedExecutionGraph.createFromInitializingJob( + jobGraph.getJobID(), + jobGraph.getName(), + jobStatus.asJobStatus(), + null, + initializationTimestamp))); + } + } + } + + @Override + public boolean isInitialized() { + synchronized (lock) { + return jobStatus == JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED; } } @@ -250,7 +351,7 @@ public void jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInf /** Job completion notification triggered by self. */ @Override public void jobFinishedByOther() { - resultFuture.complete(JobManagerRunnerResult.forJobNotFinished()); + resultFuture.completeExceptionally(new JobNotFinishedException(jobGraph.getJobID())); } @Override @@ -297,27 +398,38 @@ public void grantLeadership(final UUID leaderSessionID) { "JobManagerRunner cannot be granted leadership because it is already shut down."); return; } + this.currentLeaderSession = leaderSessionID; + // enqueue a leadership operation leadershipOperation = - leadershipOperation.thenRun( - ThrowingRunnable.unchecked( - () -> { - synchronized (lock) { - verifyJobSchedulingStatusAndStartJobManager( - leaderSessionID); - } - })); + leadershipOperation.thenCompose( + (ign) -> { + synchronized (lock) { + if (currentLeaderSession == null + || !currentLeaderSession.equals(leaderSessionID)) { + // lost leadership in the meantime, complete this operation + return CompletableFuture.completedFuture(null); + } + try { + return verifyJobSchedulingStatusAndStartJobManager( + leaderSessionID); + } catch (FlinkException e) { + ExceptionUtils.rethrow(e); + } + } + return null; + }); handleException(leadershipOperation, "Could not start the job manager."); } } @GuardedBy("lock") - private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) - throws FlinkException { + private CompletableFuture verifyJobSchedulingStatusAndStartJobManager( + UUID leaderSessionId) throws FlinkException { if (shutdown) { log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down."); - return; + return CompletableFuture.completedFuture(null); } final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus = @@ -325,13 +437,15 @@ private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) { jobAlreadyDone(); + return CompletableFuture.completedFuture(null); } else { - startJobMaster(leaderSessionId); + return startJobMaster(leaderSessionId); } } @GuardedBy("lock") - private void startJobMaster(UUID leaderSessionId) throws FlinkException { + private CompletableFuture startJobMaster(UUID leaderSessionId) throws FlinkException { + checkState(jobMasterService == null, "JobMasterService must be null before being started."); log.info( "JobManager runner for job {} ({}) was granted leadership with session id {}.", jobGraph.getName(), @@ -348,17 +462,76 @@ private void startJobMaster(UUID leaderSessionId) throws FlinkException { e); } - startJobMasterServiceSafely(leaderSessionId); + // run blocking JobMaster initialization outside of the lock + CompletableFuture jobMasterStartFuture = + CompletableFuture.supplyAsync( + () -> startJobMasterServiceSafely(leaderSessionId), executor); + + return jobMasterStartFuture.thenCompose( + (newJobMasterService) -> { + synchronized (lock) { + if (newJobMasterService == null) { + // initialization failed + if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) { + checkState(cancelFuture != null); + cancelFuture.completeExceptionally( + new FlinkException( + "Cancellation failed because JobMaster initialization failed")); + } + jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED; + return CompletableFuture.completedFuture(null); + } else { + return onJobMasterInitializationCompletion( + newJobMasterService, leaderSessionId); + } + } + }); + } - if (jobMasterService != null) { - confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId); + // JobMaster initialization is completed. Ensure proper state and leadership + @GuardedBy("lock") + private CompletableFuture onJobMasterInitializationCompletion( + JobMasterService newJobMasterService, UUID leaderSessionId) { + checkNotNull(newJobMasterService); + if (shutdown) { + return newJobMasterService + .closeAsync() + .whenComplete( + (Void ignored, Throwable throwable) -> + onJobManagerTermination(throwable)); } - } - private void startJobMasterServiceSafely(UUID leaderSessionId) { - checkState(jobMasterService == null, "JobMasterService must be null before being started."); + checkState( + jobStatus.isInitializing(), + "Can only complete initialization in initializing state"); + + if (leaderElectionService.hasLeadership(leaderSessionId)) { + jobMasterService = newJobMasterService; + leaderGatewayFuture.complete(jobMasterService.getGateway()); + leaderElectionService.confirmLeadership(leaderSessionId, jobMasterService.getAddress()); + + if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) { + checkState(cancelFuture != null); + FutureUtils.forward( + newJobMasterService.getGateway().cancel(cancelTimeout), cancelFuture); + } + + jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED; + } else { + log.info( + "Ignoring confirmation of leader session id because {} is no longer the leader. Shutting down JobMaster", + getDescription()); + return newJobMasterService.closeAsync(); + } + + return CompletableFuture.completedFuture(null); + } + @Nullable + private JobMasterService startJobMasterServiceSafely(UUID leaderSessionId) { try { + // We assume this operation to be potentially long-running (thus it can not block the + // JobManager main thread on it) final JobMasterService newJobMasterService = jobMasterServiceFactory.createJobMasterService( jobGraph, @@ -367,9 +540,7 @@ private void startJobMasterServiceSafely(UUID leaderSessionId) { userCodeClassLoader, initializationTimestamp); - jobMasterService = newJobMasterService; - - jobMasterService + newJobMasterService .getTerminationFuture() .whenComplete( (unused, throwable) -> { @@ -383,11 +554,15 @@ private void startJobMasterServiceSafely(UUID leaderSessionId) { } } }); - } catch (Exception e) { + return newJobMasterService; + } catch (Exception initializationError) { resultFuture.complete( JobManagerRunnerResult.forInitializationFailure( new JobInitializationException( - jobGraph.getJobID(), "Could not start the JobMaster.", e))); + jobGraph.getJobID(), + "Could not start the JobMaster.", + initializationError))); + return null; } } @@ -408,19 +583,6 @@ private RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus() throws } } - private void confirmLeaderSessionIdIfStillLeader( - JobMasterService jobMasterService, UUID leaderSessionId) { - - if (leaderElectionService.hasLeadership(leaderSessionId)) { - leaderGatewayFuture.complete(jobMasterService.getGateway()); - leaderElectionService.confirmLeadership(leaderSessionId, jobMasterService.getAddress()); - } else { - log.debug( - "Ignoring confirmation of leader session id because {} is no longer the leader.", - getDescription()); - } - } - @Override public void revokeLeadership() { synchronized (lock) { @@ -429,6 +591,8 @@ public void revokeLeadership() { "Ignoring revoking leadership because JobManagerRunner is already shut down."); return; } + // unset current leader session to fail-fast queued grant leadership operations + currentLeaderSession = null; leadershipOperation = leadershipOperation.thenCompose( @@ -456,12 +620,12 @@ private CompletableFuture revokeJobMasterLeadership() { jobGraph.getName(), jobGraph.getJobID(), jobMasterService.getAddress()); - setNewLeaderGatewayFuture(); final CompletableFuture jobMasterServiceTerminationFuture = jobMasterService.closeAsync(); jobMasterService = null; + jobStatus = JobManagerRunnerJobStatus.INITIALIZING; return jobMasterServiceTerminationFuture; } else { @@ -501,4 +665,34 @@ public void handleError(Exception exception) { log.error("Leader Election Service encountered a fatal error.", exception); handleJobManagerRunnerError(exception); } + + // ------------------------------------------------------------------------ + + private enum JobManagerRunnerJobStatus { + // We are waiting for the JobMaster to be initialized + INITIALIZING(JobStatus.INITIALIZING, true), + // JobMaster is initialized (this includes initialization failures) + JOBMASTER_INITIALIZED(null, false), + // waiting for cancellation during initialization + INITIALIZING_CANCELLING(JobStatus.CANCELLING, true); + + @Nullable private final JobStatus jobStatus; + private final boolean initializing; + + JobManagerRunnerJobStatus(@Nullable JobStatus jobStatus, boolean initializing) { + this.jobStatus = jobStatus; + this.initializing = initializing; + } + + public JobStatus asJobStatus() { + if (jobStatus == null) { + throw new IllegalStateException("This state is not defined as a 'JobStatus'"); + } + return jobStatus; + } + + public boolean isInitializing() { + return initializing; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java index 53542ef4d3a0d..d13cb5dd7ff52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResult.java @@ -42,10 +42,6 @@ public boolean isSuccess() { return executionGraphInfo != null && failure == null; } - public boolean isJobNotFinished() { - return executionGraphInfo == null && failure == null; - } - public boolean isInitializationFailure() { return executionGraphInfo == null && failure != null; } @@ -90,10 +86,6 @@ public int hashCode() { return Objects.hash(executionGraphInfo, failure); } - public static JobManagerRunnerResult forJobNotFinished() { - return new JobManagerRunnerResult(null, null); - } - public static JobManagerRunnerResult forSuccess(ExecutionGraphInfo executionGraphInfo) { return new JobManagerRunnerResult(executionGraphInfo, null); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java deleted file mode 100644 index 32ff226258f9c..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java +++ /dev/null @@ -1,410 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.dispatcher; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; -import org.apache.flink.runtime.jobmaster.JobManagerRunner; -import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; -import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; -import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; -import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static org.hamcrest.core.Is.is; -import static org.hamcrest.core.StringContains.containsString; -import static org.junit.Assert.assertThat; - -/** Test for the {@link DispatcherJob} class. */ -public class DispatcherJobTest extends TestLogger { - - private static final Time TIMEOUT = Time.seconds(10L); - - @Test - public void testStatusWhenInitializing() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - assertThat(dispatcherJob.isInitialized(), is(false)); - assertThat(dispatcherJob.getResultFuture().isDone(), is(false)); - assertJobStatus(dispatcherJob, JobStatus.INITIALIZING); - } - - @Test - public void testStatusWhenRunning() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - // finish initialization - testContext.setRunning(); - - assertJobStatus(dispatcherJob, JobStatus.RUNNING); - - // result future not done - assertThat(dispatcherJob.getResultFuture().isDone(), is(false)); - - assertThat(dispatcherJob.isInitialized(), is(true)); - } - - @Test - public void testStatusWhenJobFinished() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - // finish job - testContext.setRunning(); - testContext.finishJob(); - - assertJobStatus(dispatcherJob, JobStatus.FINISHED); - - // assert result future done - DispatcherJobResult result = dispatcherJob.getResultFuture().get(); - - assertThat( - result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(), - is(JobStatus.FINISHED)); - } - - @Test - public void testStatusWhenCancellingWhileInitializing() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - assertJobStatus(dispatcherJob, JobStatus.INITIALIZING); - - CompletableFuture cancelFuture = dispatcherJob.cancel(TIMEOUT); - - assertThat(cancelFuture.isDone(), is(false)); - assertThat(dispatcherJob.isInitialized(), is(false)); - - assertJobStatus(dispatcherJob, JobStatus.CANCELLING); - - testContext.setRunning(); - testContext.finishCancellation(); - - // assert that cancel future completes - cancelFuture.get(); - - assertJobStatus(dispatcherJob, JobStatus.CANCELED); - assertThat(dispatcherJob.isInitialized(), is(true)); - // assert that the result future completes - assertThat( - dispatcherJob - .getResultFuture() - .get() - .getExecutionGraphInfo() - .getArchivedExecutionGraph() - .getState(), - is(JobStatus.CANCELED)); - } - - @Test - public void testStatusWhenCancellingWhileRunning() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - testContext.setRunning(); - CompletableFuture cancelFuture = dispatcherJob.cancel(TIMEOUT); - - assertJobStatus(dispatcherJob, JobStatus.CANCELLING); - testContext.finishCancellation(); - - cancelFuture.get(); - assertJobStatus(dispatcherJob, JobStatus.CANCELED); - assertThat( - dispatcherJob - .getResultFuture() - .get() - .getExecutionGraphInfo() - .getArchivedExecutionGraph() - .getState(), - is(JobStatus.CANCELED)); - } - - @Test - public void testStatusWhenCancellingWhileFailed() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - RuntimeException exception = - new RuntimeException("Artificial failure in runner initialization"); - testContext.failInitialization(exception); - - assertJobStatus(dispatcherJob, JobStatus.FAILED); - - CommonTestUtils.assertThrows( - "Artificial failure", - ExecutionException.class, - () -> dispatcherJob.cancel(TIMEOUT).get()); - - assertJobStatus(dispatcherJob, JobStatus.FAILED); - } - - @Test - public void testErrorWhileInitializing() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - // now fail - RuntimeException exception = - new RuntimeException("Artificial failure in runner initialization"); - testContext.failInitialization(exception); - - assertThat(dispatcherJob.isInitialized(), is(true)); - assertJobStatus(dispatcherJob, JobStatus.FAILED); - - ArchivedExecutionGraph aeg = - dispatcherJob - .getResultFuture() - .get() - .getExecutionGraphInfo() - .getArchivedExecutionGraph(); - assertThat( - aeg.getFailureInfo() - .getException() - .deserializeError(ClassLoader.getSystemClassLoader()), - is(exception)); - } - - @Test - public void testDispatcherJobResult() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - testContext.failInitialization( - new RuntimeException("Artificial failure in runner initialization")); - - DispatcherJobResult result = dispatcherJob.getResultFuture().get(); - assertThat(result.isInitializationFailure(), is(true)); - assertThat( - result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(), - is(JobStatus.FAILED)); - assertThat( - result.getInitializationFailure().getMessage(), - containsString("Artificial failure")); - } - - @Test - public void testCloseWhileInitializingSuccessfully() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - CompletableFuture closeFuture = dispatcherJob.closeAsync(); - assertThat(closeFuture.isDone(), is(false)); - - // set job running, so that we can cancel it - testContext.setRunning(); - - // assert future completes now - closeFuture.get(); - - // ensure the result future is complete (how it completes is up to the JobManager) - CompletableFuture resultFuture = dispatcherJob.getResultFuture(); - CommonTestUtils.assertThrows( - "has not been finished", ExecutionException.class, resultFuture::get); - } - - @Test - public void testCloseWhileInitializingErroneously() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - CompletableFuture closeFuture = dispatcherJob.closeAsync(); - assertThat(closeFuture.isDone(), is(false)); - - testContext.failInitialization(new RuntimeException("fail")); - - // assert future completes now - closeFuture.get(); - - // ensure the result future is complete - dispatcherJob.getResultFuture().get(); - } - - @Test - public void testCloseWhileRunning() throws Exception { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - - // complete JobManager runner future to indicate to the DispatcherJob that the Runner has - // been initialized - testContext.setRunning(); - - CompletableFuture closeFuture = dispatcherJob.closeAsync(); - - closeFuture.get(); - - // result future should complete exceptionally. - CompletableFuture resultFuture = dispatcherJob.getResultFuture(); - CommonTestUtils.assertThrows( - "has not been finished", ExecutionException.class, resultFuture::get); - } - - @Test(expected = IllegalStateException.class) - public void testUnavailableJobMasterGateway() { - TestContext testContext = createTestContext(); - DispatcherJob dispatcherJob = testContext.getDispatcherJob(); - dispatcherJob.getJobMasterGateway(); - } - - private TestContext createTestContext() { - JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); - CompletableFuture jobManagerRunnerCompletableFuture = - new CompletableFuture<>(); - DispatcherJob dispatcherJob = - DispatcherJob.createFor( - jobManagerRunnerCompletableFuture, - jobGraph.getJobID(), - jobGraph.getName(), - System.currentTimeMillis()); - - return new TestContext(jobManagerRunnerCompletableFuture, dispatcherJob, jobGraph); - } - - private static class TestContext { - private final CompletableFuture jobManagerRunnerCompletableFuture; - private final DispatcherJob dispatcherJob; - private final JobGraph jobGraph; - private final TestingJobMasterGateway mockRunningJobMasterGateway; - private final CompletableFuture cancellationFuture; - - private JobStatus internalJobStatus = JobStatus.INITIALIZING; - private CompletableFuture resultFuture = new CompletableFuture<>(); - - public TestContext( - CompletableFuture jobManagerRunnerCompletableFuture, - DispatcherJob dispatcherJob, - JobGraph jobGraph) { - this.jobManagerRunnerCompletableFuture = jobManagerRunnerCompletableFuture; - this.dispatcherJob = dispatcherJob; - this.jobGraph = jobGraph; - - this.cancellationFuture = new CompletableFuture<>(); - this.mockRunningJobMasterGateway = - new TestingJobMasterGatewayBuilder() - .setRequestJobSupplier( - () -> - CompletableFuture.completedFuture( - new ExecutionGraphInfo( - ArchivedExecutionGraph - .createFromInitializingJob( - getJobID(), - "test", - internalJobStatus, - null, - 1337)))) - .setRequestJobDetailsSupplier( - () -> { - JobDetails jobDetails = - new JobDetails( - getJobID(), - "", - 0, - 0, - 0, - internalJobStatus, - 0, - new int[] {0, 0, 0, 0, 0, 0, 0, 0, 0}, - 0); - return CompletableFuture.completedFuture(jobDetails); - }) - // once JobManagerRunner is initialized, complete result future with - // CANCELLED AEG and ack cancellation. - .setCancelFunction( - () -> { - internalJobStatus = JobStatus.CANCELLING; - return cancellationFuture; - }) - .build(); - } - - public JobID getJobID() { - return jobGraph.getJobID(); - } - - public void failInitialization(Throwable ex) { - jobManagerRunnerCompletableFuture.completeExceptionally(ex); - } - - public DispatcherJob getDispatcherJob() { - return dispatcherJob; - } - - public void setRunning() { - internalJobStatus = JobStatus.RUNNING; - JobManagerRunner jobManagerRunner = - new TestingJobManagerRunner.Builder() - .setJobId(getJobID()) - .setBlockingTermination(false) - .setJobMasterGatewayFuture( - CompletableFuture.completedFuture(mockRunningJobMasterGateway)) - .setResultFuture(resultFuture) - .build(); - jobManagerRunnerCompletableFuture.complete(jobManagerRunner); - } - - public void finishJob() { - internalJobStatus = JobStatus.FINISHED; - resultFuture.complete( - JobManagerRunnerResult.forSuccess( - new ExecutionGraphInfo( - ArchivedExecutionGraph.createFromInitializingJob( - getJobID(), "test", JobStatus.FINISHED, null, 1337)))); - } - - public void finishCancellation() { - jobManagerRunnerCompletableFuture.thenAccept( - runner -> { - internalJobStatus = JobStatus.CANCELED; - runner.getResultFuture() - .complete( - JobManagerRunnerResult.forSuccess( - new ExecutionGraphInfo( - ArchivedExecutionGraph - .createFromInitializingJob( - getJobID(), - "test", - JobStatus.CANCELED, - null, - 1337)))); - cancellationFuture.complete(Acknowledge.get()); - }); - } - } - - private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus expectedStatus) - throws Exception { - assertThat(dispatcherJob.requestJobDetails(TIMEOUT).get().getStatus(), is(expectedStatus)); - assertThat( - dispatcherJob.requestJob(TIMEOUT).get().getArchivedExecutionGraph().getState(), - is(expectedStatus)); - assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(expectedStatus)); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 09d9c3c2c372b..d55860fb88d72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.blob.TestingBlobStore; import org.apache.flink.runtime.blob.TestingBlobStoreBuilder; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; -import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -58,7 +58,6 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -294,17 +293,18 @@ public void testBlobServerCleanupWhenJobNotFinished() throws Exception { @Test public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception { startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception"))); - dispatcherGateway.submitJob(jobGraph, timeout).get(); - - Optional maybeError = - dispatcherGateway.requestJobResult(jobId, timeout).get().getSerializedThrowable(); + final CompletableFuture submissionFuture = + dispatcherGateway.submitJob(jobGraph, timeout); - assertThat(maybeError.isPresent(), is(true)); - Throwable exception = maybeError.get().deserializeError(this.getClass().getClassLoader()); + try { + submissionFuture.get(); + fail("Job submission was expected to fail."); + } catch (ExecutionException ee) { + assertThat( + ExceptionUtils.findThrowable(ee, JobSubmissionException.class).isPresent(), + is(true)); + } - assertThat( - ExceptionUtils.findThrowable(exception, JobExecutionException.class).isPresent(), - is(true)); assertThatHABlobsHaveBeenRemoved(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index d19f5b451e35e..82814bd3d05f0 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -45,11 +45,15 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl; +import org.apache.flink.runtime.jobmaster.JobManagerRunnerImplTest; import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobNotFinishedException; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; +import org.apache.flink.runtime.jobmaster.TestingJobMasterService; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; @@ -78,6 +82,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.apache.flink.util.TimeUtils; import org.apache.flink.util.function.ThrowingRunnable; @@ -116,8 +121,10 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.Matchers.empty; +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; @@ -215,109 +222,6 @@ private TestingDispatcher createAndStartDispatcher( return dispatcher; } - private static final class InitializationTimestampCapturingJobManagerRunnerFactory - implements JobManagerRunnerFactory { - private final BlockingQueue initializationTimestampQueue; - - private InitializationTimestampCapturingJobManagerRunnerFactory( - BlockingQueue initializationTimestampQueue) { - this.initializationTimestampQueue = initializationTimestampQueue; - } - - @Override - public JobManagerRunner createJobManagerRunner( - JobGraph jobGraph, - Configuration configuration, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - JobManagerSharedServices jobManagerServices, - JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, - FatalErrorHandler fatalErrorHandler, - long initializationTimestamp) { - initializationTimestampQueue.offer(initializationTimestamp); - return new TestingJobManagerRunner.Builder().setJobId(jobGraph.getJobID()).build(); - } - } - - /** Builder for the TestingDispatcher. */ - public class TestingDispatcherBuilder { - - private Collection initialJobGraphs = Collections.emptyList(); - - private DispatcherBootstrapFactory dispatcherBootstrapFactory = - (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(); - - private HeartbeatServices heartbeatServices = DispatcherTest.this.heartbeatServices; - - private HighAvailabilityServices haServices = DispatcherTest.this.haServices; - - private JobManagerRunnerFactory jobManagerRunnerFactory = - DefaultJobManagerRunnerFactory.INSTANCE; - - private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; - - TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) { - this.heartbeatServices = heartbeatServices; - return this; - } - - TestingDispatcherBuilder setHaServices(HighAvailabilityServices haServices) { - this.haServices = haServices; - return this; - } - - TestingDispatcherBuilder setInitialJobGraphs(Collection initialJobGraphs) { - this.initialJobGraphs = initialJobGraphs; - return this; - } - - TestingDispatcherBuilder setDispatcherBootstrapFactory( - DispatcherBootstrapFactory dispatcherBootstrapFactory) { - this.dispatcherBootstrapFactory = dispatcherBootstrapFactory; - return this; - } - - TestingDispatcherBuilder setJobManagerRunnerFactory( - JobManagerRunnerFactory jobManagerRunnerFactory) { - this.jobManagerRunnerFactory = jobManagerRunnerFactory; - return this; - } - - TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) { - this.jobGraphWriter = jobGraphWriter; - return this; - } - - TestingDispatcher build() throws Exception { - TestingResourceManagerGateway resourceManagerGateway = - new TestingResourceManagerGateway(); - - final MemoryExecutionGraphInfoStore executionGraphInfoStore = - new MemoryExecutionGraphInfoStore(); - - return new TestingDispatcher( - rpcService, - DispatcherId.generate(), - initialJobGraphs, - dispatcherBootstrapFactory, - new DispatcherServices( - configuration, - haServices, - () -> CompletableFuture.completedFuture(resourceManagerGateway), - blobServer, - heartbeatServices, - executionGraphInfoStore, - testingFatalErrorHandlerResource.getFatalErrorHandler(), - VoidHistoryServerArchivist.INSTANCE, - null, - UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), - jobGraphWriter, - jobManagerRunnerFactory, - ForkJoinPool.commonPool())); - } - } - @After public void tearDown() throws Exception { if (dispatcher != null) { @@ -394,55 +298,44 @@ public void testJobSubmissionWithPartialResourceConfigured() throws Exception { @Test public void testNonBlockingJobSubmission() throws Exception { - final OneShotLatch latch = new OneShotLatch(); - dispatcher = - createAndStartDispatcher( - heartbeatServices, - haServices, - new BlockingJobManagerRunnerFactory(latch::await)); + JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster = + new JobManagerRunnerWithBlockingJobMasterFactory(); + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - final JobGraph emptyJobGraph = JobGraphTestUtils.emptyJobGraph(); - JobID jobID = emptyJobGraph.getJobID(); - - dispatcherGateway.submitJob(emptyJobGraph, TIMEOUT).get(); + blockingJobMaster.waitForBlockingInit(); // ensure INITIALIZING status assertThat( - dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), + dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), is(JobStatus.INITIALIZING)); // ensure correct JobDetails MultipleJobsDetails multiDetails = dispatcherGateway.requestMultipleJobDetails(TIMEOUT).get(); assertEquals(1, multiDetails.getJobs().size()); - assertEquals(jobID, multiDetails.getJobs().iterator().next().getJobId()); + assertEquals(jobId, multiDetails.getJobs().iterator().next().getJobId()); - // submission has succeeded, let the initialization finish. - latch.trigger(); + // let the initialization finish. + blockingJobMaster.unblockJobMasterInitialization(); // ensure job is running CommonTestUtils.waitUntilCondition( - () -> dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get() == JobStatus.RUNNING, + () -> dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get() == JobStatus.RUNNING, Deadline.fromNow(TimeUtils.toDuration(TIMEOUT)), 5L); } @Test public void testInvalidCallDuringInitialization() throws Exception { - final OneShotLatch latch = new OneShotLatch(); - dispatcher = - createAndStartDispatcher( - heartbeatServices, - haServices, - new BlockingJobManagerRunnerFactory(latch::await)); - + JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster = + new JobManagerRunnerWithBlockingJobMasterFactory(); + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster); DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - final JobGraph emptyJobGraph = - JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build(); - - dispatcherGateway.submitJob(emptyJobGraph, TIMEOUT).get(); + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); assertThat( dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get(), @@ -457,17 +350,6 @@ public void testInvalidCallDuringInitialization() throws Exception { } catch (ExecutionException t) { assertTrue(t.getCause() instanceof UnavailableDispatcherOperationException); } - - // submission has succeeded, let the initialization finish. - latch.trigger(); - - // ensure job is running - CommonTestUtils.waitUntilCondition( - () -> - dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get() - == JobStatus.RUNNING, - Deadline.fromNow(TimeUtils.toDuration(TIMEOUT)), - 5L); } @Test @@ -652,64 +534,6 @@ private URI createTestingSavepoint() throws IOException, URISyntaxException { return new URI(completedCheckpointStorageLocation.getExternalPointer()); } - /** - * Tests that we wait until the JobMaster has gained leader ship before sending requests to it. - * See FLINK-8887. - */ - @Test - public void testWaitingForJobMasterLeadership() throws Exception { - final TestingJobManagerRunnerFactory testingJobManagerRunnerFactory = - new TestingJobManagerRunnerFactory(); - dispatcher = - createAndStartDispatcher( - heartbeatServices, haServices, testingJobManagerRunnerFactory); - - final DispatcherGateway dispatcherGateway = - dispatcher.getSelfGateway(DispatcherGateway.class); - - dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - log.info("Job submission completed"); - - // try getting a blocking, non-initializing job status future in a retry-loop. - // In some CI environments, we can not guarantee that the job immediately leaves the - // INITIALIZING status - // after the jobMasterLeaderElectionService has been started. - CompletableFuture jobStatusFuture = null; - for (int i = 0; i < 5; i++) { - jobStatusFuture = dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT); - try { - JobStatus status = jobStatusFuture.get(10, TimeUnit.MILLISECONDS); - if (status == JobStatus.INITIALIZING) { - jobStatusFuture = null; - Thread.sleep(100); - } - } catch (TimeoutException ignored) { - break; // great, we have a blocking future - } - } - if (jobStatusFuture == null) { - fail("Unable to get a job status future blocked on leader election."); - } - - final TestingJobManagerRunner testingJobManagerRunner = - testingJobManagerRunnerFactory.takeCreatedJobManagerRunner(); - - // completing the JobMasterGatewayFuture means that the JobManagerRunner has confirmed the - // leadership - testingJobManagerRunner.completeJobMasterGatewayFuture( - new TestingJobMasterGatewayBuilder() - .setRequestJobSupplier( - () -> - CompletableFuture.completedFuture( - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.RUNNING) - .build()))) - .build()); - - assertThat(jobStatusFuture.get(), is(JobStatus.RUNNING)); - } - /** * Tests that the {@link Dispatcher} fails fatally if the recovered jobs cannot be started. See * FLINK-9097. @@ -753,92 +577,42 @@ public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception { fatalErrorHandler.clearError(); } - /** - * Tests that a blocking {@link JobManagerRunner} creation, e.g. due to blocking FileSystem - * access, does not block the {@link Dispatcher}. - * - *

See FLINK-10314 - */ - @Test - public void testBlockingJobManagerRunner() throws Exception { - final OneShotLatch jobManagerRunnerCreationLatch = new OneShotLatch(); - dispatcher = - createAndStartDispatcher( - heartbeatServices, - haServices, - new BlockingJobManagerRunnerFactory(jobManagerRunnerCreationLatch::await)); - - final DispatcherGateway dispatcherGateway = - dispatcher.getSelfGateway(DispatcherGateway.class); - - dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - - assertThat( - dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), - is(JobStatus.INITIALIZING)); - - final CompletableFuture> metricQueryServiceAddressesFuture = - dispatcherGateway.requestMetricQueryServiceAddresses(Time.seconds(5L)); - - assertThat(metricQueryServiceAddressesFuture.get(), is(empty())); - - assertThat( - dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), - is(JobStatus.INITIALIZING)); - - jobManagerRunnerCreationLatch.trigger(); - } - /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */ @Test public void testFailingJobManagerRunnerCleanup() throws Exception { final FlinkException testException = new FlinkException("Test exception."); final ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(2); + BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory = + new BlockingJobManagerRunnerFactory( + () -> { + final Optional take = queue.take(); + final Exception exception = take.orElse(null); + + if (exception != null) { + throw exception; + } + }); dispatcher = createAndStartDispatcher( - heartbeatServices, - haServices, - new BlockingJobManagerRunnerFactory( - () -> { - final Optional take = queue.take(); - final Exception exception = take.orElse(null); - - if (exception != null) { - throw exception; - } - })); + heartbeatServices, haServices, blockingJobManagerRunnerFactory); final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - - assertThat( - dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), - is(JobStatus.INITIALIZING)); - queue.offer(Optional.of(testException)); - - // wait till job is failed - dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(); - - ArchivedExecutionGraph execGraph = - dispatcherGateway.requestJob(jobGraph.getJobID(), TIMEOUT).get(); - Assert.assertNotNull(execGraph.getFailureInfo()); - assertThat( - ExceptionUtils.findSerializedThrowable( - execGraph.getFailureInfo().getException(), - FlinkException.class, - this.getClass().getClassLoader()) - .isPresent(), - is(true)); - - // submit job again - dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + try { + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + } catch (Throwable expectedException) { + assertThat(expectedException, containsCause(FlinkException.class)); + assertThat(expectedException, containsMessage(testException.getMessage())); + } // don't fail this time queue.offer(Optional.empty()); + // submit job again + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING); // Ensure job is running CommonTestUtils.waitUntilCondition( @@ -969,10 +743,76 @@ public void testInitializationTimestampForwardedToJobManagerRunner() throws Exce assertThat(initializationTimestamp, greaterThan(0L)); } + private static class JobManagerRunnerWithBlockingJobMasterFactory + implements JobManagerRunnerFactory { + + private final JobMasterGateway jobMasterGateway; + private final AtomicReference currentJobStatus; + private JobManagerRunnerImplTest.BlockingJobMasterServiceFactory + blockingJobMasterServiceFactory; + + private JobManagerRunnerWithBlockingJobMasterFactory() { + currentJobStatus = new AtomicReference<>(JobStatus.INITIALIZING); + this.jobMasterGateway = + new TestingJobMasterGatewayBuilder() + .setRequestJobSupplier( + () -> + CompletableFuture.completedFuture( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setState( + currentJobStatus.get()) + .build()))) + .build(); + } + + @Override + public JobManagerRunner createJobManagerRunner( + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + JobManagerSharedServices jobManagerServices, + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler, + long initializationTimestamp) + throws Exception { + + this.blockingJobMasterServiceFactory = + new JobManagerRunnerImplTest.BlockingJobMasterServiceFactory(jobMasterGateway); + + return new JobManagerRunnerImpl( + jobGraph, + blockingJobMasterServiceFactory, + highAvailabilityServices, + jobManagerServices + .getLibraryCacheManager() + .registerClassLoaderLease(jobGraph.getJobID()), + jobManagerServices.getScheduledExecutorService(), + fatalErrorHandler, + initializationTimestamp); + } + + public TestingJobMasterService waitForBlockingInit() + throws ExecutionException, InterruptedException { + return blockingJobMasterServiceFactory.waitForBlockingOnInit(); + } + + public void unblockJobMasterInitialization() { + Preconditions.checkNotNull( + blockingJobMasterServiceFactory, + "This action is only available after the JobManagerRunner has been created"); + blockingJobMasterServiceFactory.unblock(); + currentJobStatus.set(JobStatus.RUNNING); + } + } + private static final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory { @Nonnull private final ThrowingRunnable jobManagerRunnerCreationLatch; + private TestingJobManagerRunner testingRunner; BlockingJobManagerRunnerFactory( @Nonnull ThrowingRunnable jobManagerRunnerCreationLatch) { @@ -993,7 +833,7 @@ public TestingJobManagerRunner createJobManagerRunner( throws Exception { jobManagerRunnerCreationLatch.run(); - TestingJobManagerRunner testingRunner = + this.testingRunner = super.createJobManagerRunner( jobGraph, configuration, @@ -1022,6 +862,110 @@ public TestingJobManagerRunner createJobManagerRunner( testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway); return testingRunner; } + + public void setJobStatus(JobStatus newStatus) { + Preconditions.checkState( + testingRunner != null, + "JobManagerRunner must be created before this method is available"); + this.testingRunner.setJobStatus(newStatus); + } + } + + private static final class InitializationTimestampCapturingJobManagerRunnerFactory + implements JobManagerRunnerFactory { + private final BlockingQueue initializationTimestampQueue; + + private InitializationTimestampCapturingJobManagerRunnerFactory( + BlockingQueue initializationTimestampQueue) { + this.initializationTimestampQueue = initializationTimestampQueue; + } + + @Override + public JobManagerRunner createJobManagerRunner( + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + JobManagerSharedServices jobManagerServices, + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler, + long initializationTimestamp) { + initializationTimestampQueue.offer(initializationTimestamp); + return new TestingJobManagerRunner.Builder().setJobId(jobGraph.getJobID()).build(); + } + } + + /** Builder for the TestingDispatcher. */ + public class TestingDispatcherBuilder { + + private Collection initialJobGraphs = Collections.emptyList(); + + private final DispatcherBootstrapFactory dispatcherBootstrapFactory = + (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(); + + private HeartbeatServices heartbeatServices = DispatcherTest.this.heartbeatServices; + + private HighAvailabilityServices haServices = DispatcherTest.this.haServices; + + private JobManagerRunnerFactory jobManagerRunnerFactory = + DefaultJobManagerRunnerFactory.INSTANCE; + + private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; + + TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) { + this.heartbeatServices = heartbeatServices; + return this; + } + + TestingDispatcherBuilder setHaServices(HighAvailabilityServices haServices) { + this.haServices = haServices; + return this; + } + + TestingDispatcherBuilder setInitialJobGraphs(Collection initialJobGraphs) { + this.initialJobGraphs = initialJobGraphs; + return this; + } + + TestingDispatcherBuilder setJobManagerRunnerFactory( + JobManagerRunnerFactory jobManagerRunnerFactory) { + this.jobManagerRunnerFactory = jobManagerRunnerFactory; + return this; + } + + TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) { + this.jobGraphWriter = jobGraphWriter; + return this; + } + + TestingDispatcher build() throws Exception { + TestingResourceManagerGateway resourceManagerGateway = + new TestingResourceManagerGateway(); + + final MemoryExecutionGraphInfoStore executionGraphInfoStore = + new MemoryExecutionGraphInfoStore(); + + return new TestingDispatcher( + rpcService, + DispatcherId.generate(), + initialJobGraphs, + dispatcherBootstrapFactory, + new DispatcherServices( + configuration, + haServices, + () -> CompletableFuture.completedFuture(resourceManagerGateway), + blobServer, + heartbeatServices, + executionGraphInfoStore, + testingFatalErrorHandlerResource.getFatalErrorHandler(), + VoidHistoryServerArchivist.INSTANCE, + null, + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), + jobGraphWriter, + jobManagerRunnerFactory, + ForkJoinPool.commonPool())); + } } private Tuple2 getBlockingJobGraphAndVertex() { @@ -1038,23 +982,6 @@ private Tuple2 getBlockingJobGraphAndVertex() { blockingJobVertex); } - private static class FailingJobVertex extends JobVertex { - - private static final long serialVersionUID = 3218428829168840760L; - - private final Exception failure; - - private FailingJobVertex(String name, Exception failure) { - super(name); - this.failure = failure; - } - - @Override - public void initializeOnMaster(ClassLoader loader) throws Exception { - throw failure; - } - } - private static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory { @@ -1100,7 +1027,7 @@ public JobManagerRunner createJobManagerRunner( private static class BlockingJobVertex extends JobVertex { private final OneShotLatch oneShotLatch = new OneShotLatch(); - public BlockingJobVertex(String name) { + private BlockingJobVertex(String name) { super(name); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java index 7ec2bd1f5fa70..b2876bc2de2e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.BlobServerOptions; @@ -29,6 +28,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; @@ -41,7 +41,6 @@ import org.apache.flink.runtime.testutils.MiniClusterResource; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -55,19 +54,17 @@ import java.io.File; import java.io.FilenameFilter; import java.net.InetSocketAddress; -import java.nio.file.NoSuchFileException; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -181,23 +178,21 @@ private void testBlobServerCleanup(final TestCase testCase) throws Exception { jobGraph.addUserJarBlobKey(new PermanentBlobKey()); } - final JobSubmissionResult jobSubmissionResult = miniCluster.submitJob(jobGraph).get(); + final CompletableFuture submissionFuture = + miniCluster.submitJob(jobGraph); if (testCase == TestCase.JOB_SUBMISSION_FAILS) { - // Wait for submission to fail & check if exception is forwarded - Optional exception = - miniCluster.requestJobResult(jid).get().getSerializedThrowable(); - assertTrue(exception.isPresent()); - assertTrue( - ExceptionUtils.findThrowableSerializedAware( - exception.get(), - NoSuchFileException.class, - getClass().getClassLoader()) - .isPresent()); - - // check job status - assertThat(miniCluster.getJobStatus(jid).get(), is(JobStatus.FAILED)); + try { + submissionFuture.get(); + fail("Expected job submission failure."); + } catch (ExecutionException e) { + assertThat( + ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent(), + is(true)); + } } else { + final JobSubmissionResult jobSubmissionResult = submissionFuture.get(); + assertThat(jobSubmissionResult.getJobID(), is(jid)); final CompletableFuture resultFuture = miniCluster.requestJobResult(jid); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java index 4f5c42e216b53..e9266156e5572 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; @@ -29,10 +30,13 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -40,6 +44,7 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.runtime.util.TestingUserCodeClassLoader; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -50,13 +55,22 @@ import org.junit.rules.TemporaryFolder; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.time.Duration; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -65,6 +79,8 @@ /** Tests for the {@link JobManagerRunnerImpl}. */ public class JobManagerRunnerImplTest extends TestLogger { + private static final Time TESTING_TIMEOUT = Time.seconds(10); + @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); private static JobGraph jobGraph; @@ -148,9 +164,7 @@ public void testJobFinishedByOther() throws Exception { jobManagerRunner.jobFinishedByOther(); - final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get(); - - assertTrue(jobManagerRunnerResult.isJobNotFinished()); + assertJobNotFinished(resultFuture); } finally { jobManagerRunner.close(); } @@ -170,9 +184,7 @@ public void testShutDown() throws Exception { jobManagerRunner.closeAsync(); - final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.join(); - - assertTrue(jobManagerRunnerResult.isJobNotFinished()); + assertJobNotFinished(resultFuture); } finally { jobManagerRunner.close(); } @@ -219,7 +231,7 @@ public void testConcurrentLeadershipOperationsBlockingClose() throws Exception { TestingJobMasterServiceFactory jobMasterServiceFactory = new TestingJobMasterServiceFactory( - () -> new TestingJobMasterService("localhost", terminationFuture)); + () -> new TestingJobMasterService("localhost", terminationFuture, null)); JobManagerRunner jobManagerRunner = createJobManagerRunner(jobMasterServiceFactory); jobManagerRunner.start(); @@ -252,7 +264,7 @@ public void testJobMasterServiceTerminatesUnexpectedlyTriggersFailure() throws E TestingJobMasterServiceFactory jobMasterServiceFactory = new TestingJobMasterServiceFactory( - () -> new TestingJobMasterService("localhost", terminationFuture)); + () -> new TestingJobMasterService("localhost", terminationFuture, null)); JobManagerRunner jobManagerRunner = createJobManagerRunner(jobMasterServiceFactory); jobManagerRunner.start(); @@ -290,9 +302,286 @@ public void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializat assertTrue( jobManagerRunnerResult.getInitializationFailure() instanceof JobInitializationException); + assertThat(jobManagerRunnerResult.getInitializationFailure(), containsCause(testException)); + } + + @Test + public void testJobMasterShutDownOnRunnerShutdownDuringJobMasterInitialization() + throws Exception { + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + leaderElectionService.isLeader(UUID.randomUUID()); + + TestingJobMasterService testingJobMasterService = + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + CompletableFuture closeFuture = jobManagerRunner.closeAsync(); + + blockingJobMasterServiceFactory.unblock(); + + closeFuture.get(); + + assertJobNotFinished(jobManagerRunner.getResultFuture()); + + assertThat(testingJobMasterService.isClosed(), is(true)); + } + + @Test + public void testJobMasterShutdownOnLeadershipLossDuringInitialization() throws Exception { + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + leaderElectionService.isLeader(UUID.randomUUID()); + + TestingJobMasterService testingJobMasterService = + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + leaderElectionService.notLeader(); + + blockingJobMasterServiceFactory.unblock(); + + // assert termination of testingJobMaster + testingJobMasterService.getTerminationFuture().get(); + assertThat(testingJobMasterService.isClosed(), is(true)); + } + + @Test + public void testJobCancellationOnCancellationDuringInitialization() throws Exception { + AtomicBoolean cancelCalled = new AtomicBoolean(false); + JobMasterGateway jobMasterGateway = + new TestingJobMasterGatewayBuilder() + .setCancelFunction( + () -> { + cancelCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + + TestingJobMasterService testingJobMasterService = + new TestingJobMasterService(jobMasterGateway); + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(() -> testingJobMasterService); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + leaderElectionService.isLeader(UUID.randomUUID()); + + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + // cancel during init + CompletableFuture cancellationFuture = + jobManagerRunner.cancel(TESTING_TIMEOUT); + + assertThat(cancellationFuture.isDone(), is(false)); + + blockingJobMasterServiceFactory.unblock(); + + // assert that cancellation future completes when cancellation completes. + cancellationFuture.get(); + assertThat(cancelCalled.get(), is(true)); + } + + @Test + public void testJobInformationOperationsDuringInitialization() throws Exception { + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + // assert initializing while waiting for leadership + assertInitializingStates(jobManagerRunner); + + // assign leadership + leaderElectionService.isLeader(UUID.randomUUID()); + + // assert initializing while JobMaster is blocked + assertInitializingStates(jobManagerRunner); + blockingJobMasterServiceFactory.unblock(); + } + + private static void assertInitializingStates(JobManagerRunner jobManagerRunner) + throws ExecutionException, InterruptedException { + assertThat( + jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), + is(JobStatus.INITIALIZING)); + assertThat(jobManagerRunner.getResultFuture().isDone(), is(false)); assertThat( - jobManagerRunnerResult.getInitializationFailure(), - FlinkMatchers.containsCause(testException)); + jobManagerRunner + .requestJob(TESTING_TIMEOUT) + .get() + .getArchivedExecutionGraph() + .getState(), + is(JobStatus.INITIALIZING)); + + assertThat( + jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus(), + is(JobStatus.INITIALIZING)); + } + + @Test + public void testShutdownInInitializedState() throws Exception { + final JobManagerRunnerImpl jobManagerRunner = createJobManagerRunner(); + jobManagerRunner.start(); + // grant leadership to finish initialization + leaderElectionService.isLeader(UUID.randomUUID()).get(); + + assertThat(jobManagerRunner.isInitialized(), is(true)); + + jobManagerRunner.close(); + + assertJobNotFinished(jobManagerRunner.getResultFuture()); + } + + @Test + public void testShutdownWhileWaitingForCancellationDuringInitialization() throws Exception { + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + leaderElectionService.isLeader(UUID.randomUUID()); + + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + // cancel while initializing + assertThat( + jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), + is(JobStatus.INITIALIZING)); + + CompletableFuture cancelFuture = jobManagerRunner.cancel(TESTING_TIMEOUT); + assertThat(cancelFuture.isDone(), is(false)); + + CompletableFuture closeFuture = jobManagerRunner.closeAsync(); + assertThat(closeFuture.isDone(), is(false)); + + // the close operation finishes only once the initialization finishes + blockingJobMasterServiceFactory.unblock(); + + assertThat(cancelFuture.isCompletedExceptionally(), is(true)); + assertJobNotFinished(jobManagerRunner.getResultFuture()); + } + + @Test + public void testCancellationAfterInitialization() throws Exception { + AtomicBoolean cancelCalled = new AtomicBoolean(false); + JobMasterGateway testingGateway = + new TestingJobMasterGatewayBuilder() + .setCancelFunction( + () -> { + cancelCalled.set(true); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + TestingJobMasterServiceFactory jobMasterServiceFactory = + new TestingJobMasterServiceFactory( + () -> new TestingJobMasterService(testingGateway)); + final JobManagerRunnerImpl jobManagerRunner = + createJobManagerRunner(jobMasterServiceFactory); + jobManagerRunner.start(); + // grant leadership to finish initialization + leaderElectionService.isLeader(UUID.randomUUID()).get(); + + assertThat(jobManagerRunner.isInitialized(), is(true)); + + jobManagerRunner.cancel(TESTING_TIMEOUT).get(); + assertThat(cancelCalled.get(), is(true)); + } + + // It can happen that a series of leadership operations happens while the JobMaster + // initialization is blocked. This test is to ensure that we are not starting-stopping + // JobMasters for all pending leadership grants, but only for the latest. + @Test + public void testSkippingOfEnqueuedLeadershipOperations() throws Exception { + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + // first leadership assignment to get into blocking initialization + leaderElectionService.isLeader(UUID.randomUUID()); + + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + // we are now blocked on the initialization, enqueue some operations: + for (int i = 0; i < 10; i++) { + leaderElectionService.notLeader(); + leaderElectionService.isLeader(UUID.randomUUID()); + } + + blockingJobMasterServiceFactory.unblock(); + + // wait until the second JobMaster has been created + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + assertThat( + blockingJobMasterServiceFactory.getNumberOfJobMasterInstancesCreated(), equalTo(2)); + } + + @Test + public void testCancellationFailsWhenInitializationFails() throws Exception { + final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory = + new BlockingJobMasterServiceFactory(); + + final JobManagerRunner jobManagerRunner = + createJobManagerRunner(blockingJobMasterServiceFactory); + + jobManagerRunner.start(); + + leaderElectionService.isLeader(UUID.randomUUID()); + + blockingJobMasterServiceFactory.waitForBlockingOnInit(); + + // cancel while initializing + assertThat( + jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), + is(JobStatus.INITIALIZING)); + + CompletableFuture cancelFuture = jobManagerRunner.cancel(TESTING_TIMEOUT); + assertThat(cancelFuture.isDone(), is(false)); + + blockingJobMasterServiceFactory.failBlockingInitialization(); + + try { + cancelFuture.get(); + fail(); + } catch (Throwable t) { + assertThat( + t, + containsMessage("Cancellation failed because JobMaster initialization failed")); + } + assertThat(jobManagerRunner.getResultFuture().get().isInitializationFailure(), is(true)); + } + + private void assertJobNotFinished(CompletableFuture resultFuture) { + try { + resultFuture.get(); + fail(); + } catch (Throwable t) { + assertThat(t, containsCause(JobNotFinishedException.class)); + } } @Nonnull @@ -308,8 +597,8 @@ private JobManagerRunnerImpl createJobManagerRunner() throws Exception { } @Nonnull - private JobManagerRunner createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory) - throws Exception { + private JobManagerRunnerImpl createJobManagerRunner( + JobMasterServiceFactory jobMasterServiceFactory) throws Exception { return createJobManagerRunner( jobMasterServiceFactory, TestingClassLoaderLease.newBuilder().build()); } @@ -328,4 +617,68 @@ private JobManagerRunnerImpl createJobManagerRunner( fatalErrorHandler, System.currentTimeMillis()); } + + public static class BlockingJobMasterServiceFactory implements JobMasterServiceFactory { + + private final OneShotLatch blocker = new OneShotLatch(); + private final BlockingQueue jobMasterServicesQueue = + new ArrayBlockingQueue(1); + private final Supplier testingJobMasterServiceSupplier; + private int numberOfJobMasterInstancesCreated = 0; + private FlinkException initializationException = null; + + public BlockingJobMasterServiceFactory() { + this((JobMasterGateway) null); + } + + public BlockingJobMasterServiceFactory(@Nullable JobMasterGateway jobMasterGateway) { + this(() -> new TestingJobMasterService(null, null, jobMasterGateway)); + } + + public BlockingJobMasterServiceFactory( + Supplier testingJobMasterServiceSupplier) { + this.testingJobMasterServiceSupplier = testingJobMasterServiceSupplier; + } + + @Override + public JobMasterService createJobMasterService( + JobGraph jobGraph, + JobMasterId jobMasterId, + OnCompletionActions jobCompletionActions, + ClassLoader userCodeClassloader, + long initializationTimestamp) + throws Exception { + TestingJobMasterService service = testingJobMasterServiceSupplier.get(); + jobMasterServicesQueue.offer(service); + + blocker.await(); + if (initializationException != null) { + throw initializationException; + } + numberOfJobMasterInstancesCreated++; + return service; + } + + public void unblock() { + blocker.trigger(); + } + + public TestingJobMasterService waitForBlockingOnInit() + throws ExecutionException, InterruptedException { + return jobMasterServicesQueue.take(); + } + + public int getNumberOfJobMasterInstancesCreated() { + return numberOfJobMasterInstancesCreated; + } + + public void failBlockingInitialization() { + Preconditions.checkState( + !blocker.isTriggered(), + "This only works before the initialization has been unblocked"); + this.initializationException = + new FlinkException("Test exception during initialization"); + unblock(); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java index d90d753b97bb7..21125ce2e62ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerResultTest.java @@ -43,17 +43,6 @@ public void testSuccessfulJobManagerResult() { JobManagerRunnerResult.forSuccess(executionGraphInfo); assertTrue(jobManagerRunnerResult.isSuccess()); - assertFalse(jobManagerRunnerResult.isJobNotFinished()); - assertFalse(jobManagerRunnerResult.isInitializationFailure()); - } - - @Test - public void testJobNotFinishedJobManagerResult() { - final JobManagerRunnerResult jobManagerRunnerResult = - JobManagerRunnerResult.forJobNotFinished(); - - assertTrue(jobManagerRunnerResult.isJobNotFinished()); - assertFalse(jobManagerRunnerResult.isSuccess()); assertFalse(jobManagerRunnerResult.isInitializationFailure()); } @@ -64,7 +53,6 @@ public void testInitializationFailureJobManagerResult() { assertTrue(jobManagerRunnerResult.isInitializationFailure()); assertFalse(jobManagerRunnerResult.isSuccess()); - assertFalse(jobManagerRunnerResult.isJobNotFinished()); } @Test @@ -75,14 +63,6 @@ public void testGetArchivedExecutionGraphFromSuccessfulJobManagerResult() { assertThat(jobManagerRunnerResult.getExecutionGraphInfo(), is(executionGraphInfo)); } - @Test(expected = IllegalStateException.class) - public void testGetArchivedExecutionGraphFromJobNotFinishedFails() { - final JobManagerRunnerResult jobManagerRunnerResult = - JobManagerRunnerResult.forJobNotFinished(); - - jobManagerRunnerResult.getExecutionGraphInfo(); - } - @Test(expected = IllegalStateException.class) public void testGetArchivedExecutionGraphFromInitializationFailureFails() { final JobManagerRunnerResult jobManagerRunnerResult = @@ -99,14 +79,6 @@ public void testGetInitializationFailureFromFailedJobManagerResult() { assertThat(jobManagerRunnerResult.getInitializationFailure(), is(testException)); } - @Test(expected = IllegalStateException.class) - public void testGetInitializationFailureFromJobNotFinished() { - final JobManagerRunnerResult jobManagerRunnerResult = - JobManagerRunnerResult.forJobNotFinished(); - - jobManagerRunnerResult.getInitializationFailure(); - } - @Test(expected = IllegalStateException.class) public void testGetInitializationFailureFromSuccessfulJobManagerResult() { final JobManagerRunnerResult jobManagerRunnerResult = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java index 38e98613fb289..e7440c0b0163f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java @@ -19,7 +19,11 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.Preconditions; @@ -40,6 +44,8 @@ public class TestingJobManagerRunner implements JobManagerRunner { private final OneShotLatch closeAsyncCalledLatch = new OneShotLatch(); + private JobStatus jobStatus = JobStatus.INITIALIZING; + private TestingJobManagerRunner( JobID jobId, boolean blockingTermination, @@ -74,6 +80,31 @@ public JobID getJobID() { return jobId; } + @Override + public CompletableFuture cancel(Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture requestJobStatus(Time timeout) { + return CompletableFuture.completedFuture(jobStatus); + } + + @Override + public CompletableFuture requestJobDetails(Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture requestJob(Time timeout) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isInitialized() { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture closeAsync() { if (!blockingTermination) { @@ -84,6 +115,10 @@ public CompletableFuture closeAsync() { return terminationFuture; } + public void setJobStatus(JobStatus newStatus) { + this.jobStatus = newStatus; + } + public OneShotLatch getCloseAsyncCalledLatch() { return closeAsyncCalledLatch; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java index 0cc2b6b9e08fe..27887f99013a5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterService.java @@ -38,10 +38,16 @@ public class TestingJobMasterService implements JobMasterService { private final boolean completeTerminationFutureOnCloseAsync; public TestingJobMasterService( - @Nonnull String address, @Nullable CompletableFuture terminationFuture) { + @Nonnull String address, + @Nullable CompletableFuture terminationFuture, + @Nullable JobMasterGateway gateway) { this.address = address; - jobMasterGateway = new TestingJobMasterGatewayBuilder().build(); + if (gateway == null) { + jobMasterGateway = new TestingJobMasterGatewayBuilder().build(); + } else { + jobMasterGateway = gateway; + } if (terminationFuture == null) { this.terminationFuture = new CompletableFuture<>(); @@ -52,8 +58,12 @@ public TestingJobMasterService( } } + public TestingJobMasterService(JobMasterGateway gateway) { + this("localhost", null, gateway); + } + public TestingJobMasterService() { - this("localhost", null); + this("localhost", null, null); } @Override @@ -78,7 +88,10 @@ public CompletableFuture closeAsync() { if (completeTerminationFutureOnCloseAsync) { terminationFuture.complete(null); } - return terminationFuture; } + + public boolean isClosed() { + return terminationFuture.isDone(); + } } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java new file mode 100644 index 0000000000000..610d2ef4d2476 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; +import static org.junit.Assert.assertThat; + +/** Integration tests for the {@link JobMaster}. */ +public class JobMasterITCase extends TestLogger { + private static final String FAILURE_MESSAGE = "Intentional Test failure"; + + /** + * This test is to guard against the issue reported in FLINK-22001, where any exception from the + * JobManager initialization was not forwarded to the user. + */ + @Test + public void testJobManagerInitializationExceptionsAreForwardedToTheUser() { + // we must use the LocalStreamEnvironment to reproduce this issue. + // It passes with the TestStreamEnvironment (which is initialized by the + // MiniClusterResource). The LocalStreamEnvironment is polling the JobManager for the job + // status, while TestStreamEnvironment is waiting on the resultFuture. + StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironment(); + + Source mySource = new FailOnInitializationSource(); + DataStream stream = + see.fromSource(mySource, WatermarkStrategy.noWatermarks(), "MySourceName"); + stream.addSink(new DiscardingSink<>()); + + try { + see.execute(); + } catch (Exception e) { + log.info("caught", e); + assertThat(e, containsMessage("Context was not yet initialized")); + } + } + + private static class FailOnInitializationSource implements Source { + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) throws Exception { + // here, we fail in the JobMaster + throw new RuntimeException(FAILURE_MESSAGE); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, Void checkpoint) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + throw new RuntimeException(FAILURE_MESSAGE); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + throw new UnsupportedOperationException(); + } + } + + private static class MockSplit implements SourceSplit { + @Override + public String splitId() { + throw new UnsupportedOperationException(); + } + } +}