-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-22001] Forward exceptions thrown during initialization to the user #15577
[FLINK-22001] Forward exceptions thrown during initialization to the user #15577
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 100c6c5 (Fri May 28 09:10:53 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
For the review of the PR it would be great to also capture somewhere the description and analysis of the problem. I know we talked about it, but I already forgot the details ;-) |
The problem seems to be the following: Since we create the If now the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for creating this PR @rmetzger. I fear that this PR is not going in the right direction, tbh.
I think the underlying problem is that you try to mirror the state of the JobManagerRunner
in the DispatcherJob
which overly couples these components and possibly not really solves the underlying problem.
If you think about why we introduced the DispatcherJob
, then it was to handle the asynchronous creation of the JobManagerRunner
. Now the JobManagerRunner
can be created synchronously and the asynchronous generation of the JobMasterService
has been moved to the JobManagerRunner
implementation. Hence, the DispatcherJob
is not really doing anything anymore because the JobManagerRunner
is always created.
However, what is not created is the JobMasterService
. Hence, I would suggest to move the DispatcherJob
logic which manages the job accesses when the job is not yet running into the JobManagerRunnerImpl
. By doing these accesses under the same lock as the leadership operations, we can ensure that we have a consistent view of the state of the job.
What we probably have to change in the JobManagerRunnerImpl
, however, is that we don't create the JobMasterService
under the same lock. Otherwise we risk that the Dispatcher's
main thread might be blocked while we create the JobMasterService
.
// This DispatcherJob is pending a termination. Forward | ||
// cancellation result. | ||
FutureUtils.forward( | ||
cancelFuture.thenCompose((ign) -> null), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not working and produces a NPE. Did you want to use cancelFuture.thenAccept()
?
|
||
@Nullable private final JobStatus jobStatus; | ||
|
||
Status(JobStatus jobStatus) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Nullable
is missing.
leadershipOperation.thenRunAsync( | ||
ThrowingRunnable.unchecked( | ||
() -> { | ||
synchronized (lock) { | ||
verifyJobSchedulingStatusAndStartJobManager( | ||
leaderSessionID); | ||
} | ||
})); | ||
}), | ||
executor); // run in separate thread to not block main thread on | ||
// JobManager initialization. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does it happen that the main thread executor runs this part of the code?
|
||
assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), is(JobStatus.INITIALIZING)); | ||
|
||
testContext.setRunning(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this necessary?
} | ||
|
||
@Test | ||
public void testLeadershipLoss() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to add what should happen under leadership loss.
DispatcherJob.createFor(jobId, jobGraph.getName(), initializationTimestamp); | ||
runningJobs.put(jobId, dispatcherJob); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels odd that we first create the DispatcherJob
and then tell him about the JobManagerRunner
through some callback. Why not first creating the JobManagerRunner
and then giving it to the DispatcherJob
?
// if the termination future is set, we are signaling that this DispatcherJob is closing / has | ||
// been closed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the embodiment of an expressive and clear contract.
return jobStatus.isJobManagerCreatedOrFailed(); | ||
} | ||
} | ||
|
||
/** Returns a future completing to the ExecutionGraphInfo of the job. */ | ||
public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this PR effectively suffers from the same problems as the existing code. What happens if the client polling happens exactly after onJobManagerStarted
has been called. In this case, we will call getJobMasterGateway().thenCompose(gateway -> gateway.requestJob(timeout))
. If the leader loses leadership before the call can be executed (note that the leadership operation of the JobManagerRunner
runs in a separate thread), then it can happen that this call will never be answered or that you get a fresh leaderGatewayFuture
which in the future might be completed exceptionally if, for example, the JobMasterService
cannot be instantiated.
I think the underlying problem is that you try to maintain a consistent view of the JobManagerRunner
within the DispatcherJob
but the former runs independent of the latter.
public class DispatcherJobStatus { | ||
private Status status; | ||
|
||
private CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = new CompletableFuture<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be a future? If we transition into the Status.JOB_MANAGER_CREATED_OR_INIT_FAILED
, then the JobManagerRunner
should be known.
// Execute listener notification asynchronously in the main thread executor to make sure | ||
// this "grant leadership" operation is properly completed before the next operation is | ||
// started. With the current implementation DispatcherJob might call closeAsync() | ||
// leading to concurrent access under the "lock". | ||
FutureUtils.assertNoException( | ||
CompletableFuture.runAsync( | ||
() -> jobManagerStatusListener.onJobManagerStarted(this), | ||
mainThreadExecutor)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is design problem here. Somehow the JobManagerRunner
needs to know that some calls have to be executed by the mainThreadExecutor
. This looks like implementation details which are leaked from the DispatcherJob
into the JobManagerRunnerImpl
.
Thanks a lot for your review. Next time, I'll write the issue analysis into the Jira (as I usually do). I agree with your observation that we should remove |
0ace107
to
c5b5770
Compare
As discussed offline, this is a rough first draft of the revised JobManagerRunnerImpl, pretty much untested. |
9725f3a
to
100c6c5
Compare
I've now pushed a version of the change that should have all tests passing, and have a pretty solid test coverage. Looking forward to your feedback @tillrohrmann ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @rmetzger. I think it goes in a very good direction. Moving the responsibility of the DispatcherJob
into the JobManagerRunnerImpl
is a good decision imo because it allows us to have a consistent view of the actual state.
I had a couple of comments concerning the current implementation I think that some cases and the semantics of some fields are not fully defined yet. I also think that it could help to make the different states of the JobManagerRunnerImpl
more explicit and to define how the component should behave in which situation. I haven't managed to create a draft for such a state machine yet but that's what I will continue tomorrow morning with.
CompletableFuture<Acknowledge> cancel(Time timeout); | ||
|
||
CompletableFuture<JobStatus> requestJobStatus(Time timeout); | ||
|
||
CompletableFuture<JobDetails> requestJobDetails(Time timeout); | ||
|
||
CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout); | ||
|
||
boolean isInitialized(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JavaDocs would be nice.
checkState(cancelFuture != null); | ||
cancelFuture.completeExceptionally(shutdownException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we have to close the leaderElectionService
in case of a cancellation? Same for the resultFuture
. What is the semantics for the resultFuture
when the JobManagerRunner
is cancelled?
jobMasterService | ||
.closeAsync() | ||
.whenComplete( | ||
(Void ignored, Throwable throwable) -> | ||
onJobManagerTermination(throwable))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For chaining of shutdown operations there are some utilities like FutureUtils.runAfterwards
or so. This makes the handling of exception occurring in the individual steps easier because they are automatically added as suppressed exceptions.
t, | ||
ExceptionUtils.stripCompletionException(throwable)); | ||
} | ||
return terminationFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think it would be better if the completion of the terminationFuture
happens in this method. E.g. via using FutureUtils.forward(shutdownFuture, terminationFuture)
. That way one does not have to look around where the termination future is actually completed.
// no ongoing JobMaster initialization (waiting for leadership) --> close | ||
onJobManagerTermination(null); | ||
} | ||
// ongoing initialization, we will finish closing once it is done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just looking at this method, it is not very clear to me how the terminationFuture
is completed in case of an ongoing initialization.
Preconditions.checkState( | ||
!blocker.isTriggered(), | ||
"This only works before the initialization has been unblocked"); | ||
this.initializationException = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you set things from another thread, then we either need at least a volatile
or some other form of synchronization. An alternative to setting these kind of fields while the instance is being used is to configure it when creating an instance of this class.
@@ -101,6 +105,15 @@ | |||
|
|||
private volatile CompletableFuture<JobMasterGateway> leaderGatewayFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to properly annotate all fields in this class now with their guard.
@@ -348,17 +462,76 @@ private void startJobMaster(UUID leaderSessionId) throws FlinkException { | |||
e); | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this also qualify as an initialization failure if we cannot set the correct state in the runningJobsRegistry
?
new FlinkException( | ||
"Cancellation failed because JobMaster initialization failed")); | ||
} | ||
jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the JOBMASTER_INITIALIZED
if its creation failed?
getDescription()); | ||
} | ||
} | ||
|
||
@Override | ||
public void revokeLeadership() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My gut feeling is that we need to integrate the leadership calls (grant + revoke) into the internal state management of the JobManagerRunnerImpl
. For example, after cancelling the JobManagerRunner
before a leadership operation has started, it should probably not accept a grantLeadership
call.
Thanks a lot for your review! Your comments are very helpful! I thought about representing the states in the Runner more nicely as well, as there are quite a few fields that are only set in certain conditions (cancellation during initialization), and many methods distinguish between two cases (initialized vs initializing). I briefly tried setting up State classes, but it felt that I had to carry a lot of stuff around. Not sure if the pattern with the context used in Adaptive Scheduler isn't a bit of overkill here. |
Closing in favor of #15715. |
What is the purpose of the change
Open TODOs: (due to the amount of changes, and the time spend on the issue already, I'd like to get some initial feedback on the change)
JobMasterITCase.testJobManagerInitializationExceptionsAreForwardedToTheUser()
is not nice: It relies on an implementation detail.Brief change log
JobManagerStatusListener
to JobManagerRunnerDispatcherJob
to track the initialization based on the signals from theJobManagerStatusListener
.DispatcherJobStatus
class with tracks the status along with the JobManagerRunner future (they are updated together, thus we track them in one class)Verifying this change
Added some tests, adjusted some.