Skip to content

Commit

Permalink
[FLINK-22345][coordination] Catch pre-mature state restore for Operat…
Browse files Browse the repository at this point in the history
…or Coordinators
  • Loading branch information
StephanEwen committed Apr 22, 2021
1 parent fa0be65 commit e3cfa40
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,20 +390,24 @@ private void setupSubtaskGateway(int subtask) {

// We need to do this synchronously here, otherwise we violate the contract that
// 'subtaskFailed()' will never overtake 'subtaskReady()'.
// An alternative, if we ever figure out that this cannot work synchronously here,
// is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
// back into the main thread executor, rather than directly calling the OperatorCoordinator
// ---
// It is also possible that by the time this method here is called, the task execution is in
// a no-longer running state. That happens when the scheduler deals with overlapping global
// failures and the restore method is in fact not yet restoring to the new execution
// attempts, but still targeting the previous execution attempts (and is later subsumed
// by another restore to the new execution attempt). This is tricky behavior that we need
// to work around. So if the task is no longer running, we don't call the 'subtaskReady()'
// method.
FutureUtils.assertNoException(
sta.hasSwitchedToRunning()
.thenAccept(
(ignored) -> {
mainThreadExecutor.assertRunningInMainThread();

// this is a guard in case someone accidentally makes the
// notification asynchronous
assert sta.isStillRunning();

notifySubtaskReady(subtask, gateway);
// see bigger comment above
if (sta.isStillRunning()) {
notifySubtaskReady(subtask, gateway);
}
}));
}

Expand Down

0 comments on commit e3cfa40

Please sign in to comment.