From e3cfa40bf67def24d4b528055bb479298241764d Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 22 Apr 2021 02:06:20 +0200 Subject: [PATCH] [FLINK-22345][coordination] Catch pre-mature state restore for Operator Coordinators --- .../OperatorCoordinatorHolder.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 8625fe295b125..10696b8fa12e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -390,20 +390,24 @@ private void setupSubtaskGateway(int subtask) { // We need to do this synchronously here, otherwise we violate the contract that // 'subtaskFailed()' will never overtake 'subtaskReady()'. - // An alternative, if we ever figure out that this cannot work synchronously here, - // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()') - // back into the main thread executor, rather than directly calling the OperatorCoordinator + // --- + // It is also possible that by the time this method here is called, the task execution is in + // a no-longer running state. That happens when the scheduler deals with overlapping global + // failures and the restore method is in fact not yet restoring to the new execution + // attempts, but still targeting the previous execution attempts (and is later subsumed + // by another restore to the new execution attempt). This is tricky behavior that we need + // to work around. So if the task is no longer running, we don't call the 'subtaskReady()' + // method. FutureUtils.assertNoException( sta.hasSwitchedToRunning() .thenAccept( (ignored) -> { mainThreadExecutor.assertRunningInMainThread(); - // this is a guard in case someone accidentally makes the - // notification asynchronous - assert sta.isStillRunning(); - - notifySubtaskReady(subtask, gateway); + // see bigger comment above + if (sta.isStillRunning()) { + notifySubtaskReady(subtask, gateway); + } })); }