diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 961c4b7e020b3..305eaff84be7d 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -309,6 +309,17 @@ def _manage_executor_state( self.log.debug("Executor state: %s task %s", state, ti) + if ( + state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS) + and ti.state in self.STATES_COUNT_AS_RUNNING + ): + self.log.debug( + "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", + ti, + state, + ) + ti.refresh_from_db(session=session) + if ( state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING