From 543afb2b1800d7053560ca30a32d09f58f3d7dcc Mon Sep 17 00:00:00 2001 From: Antony Date: Wed, 16 Oct 2024 14:50:49 +1300 Subject: [PATCH 1/7] Double-check TaskInstance state if it differs from Executor. --- airflow/jobs/backfill_job_runner.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 961c4b7e020b3..ea9ee610ab712 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -309,6 +309,13 @@ def _manage_executor_state( self.log.debug("Executor state: %s task %s", state, ti) + if ( + state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS) + and ti.state in self.STATES_COUNT_AS_RUNNING + ): + self.log.debug(f"In-memory TaskInstance state {ti} does not agree with executor state {state}. Attempting to resolve by refreshing in-memory task instance from DB.") + ti.refresh_from_db(session=session) + if ( state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING From 814f0bcd619bbce969ca29dbcddf43e3256aae82 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 9 Dec 2024 17:39:09 +0530 Subject: [PATCH 2/7] Update airflow/jobs/backfill_job_runner.py --- airflow/jobs/backfill_job_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index ea9ee610ab712..13f0281d80b2e 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -313,7 +313,9 @@ def _manage_executor_state( state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING ): - self.log.debug(f"In-memory TaskInstance state {ti} does not agree with executor state {state}. Attempting to resolve by refreshing in-memory task instance from DB.") + self.log.debug( + f"In-memory TaskInstance state {ti} does not agree with executor state {state}. Attempting to resolve by refreshing in-memory task instance from DB." + ) ti.refresh_from_db(session=session) if ( From 88a241f14facbbff106c4ed5712432c0c6aec10e Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 9 Dec 2024 17:52:40 +0530 Subject: [PATCH 3/7] Update airflow/jobs/backfill_job_runner.py --- airflow/jobs/backfill_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 13f0281d80b2e..fceaa22f4cf5e 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -314,7 +314,7 @@ def _manage_executor_state( and ti.state in self.STATES_COUNT_AS_RUNNING ): self.log.debug( - f"In-memory TaskInstance state {ti} does not agree with executor state {state}. Attempting to resolve by refreshing in-memory task instance from DB." + "In-memory TaskInstance state %s does not agree with executor state {state}. Attempting to resolve by refreshing in-memory task instance from DB.", ti ) ti.refresh_from_db(session=session) From 3b8b3971fe66b979b53ff4a8fef3c64009aeb962 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 9 Dec 2024 18:10:54 +0530 Subject: [PATCH 4/7] Update airflow/jobs/backfill_job_runner.py --- airflow/jobs/backfill_job_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index fceaa22f4cf5e..81f76cac3aa87 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -314,7 +314,8 @@ def _manage_executor_state( and ti.state in self.STATES_COUNT_AS_RUNNING ): self.log.debug( - "In-memory TaskInstance state %s does not agree with executor state {state}. Attempting to resolve by refreshing in-memory task instance from DB.", ti + "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", + ti, state ) ti.refresh_from_db(session=session) From 95753618d40aac1d323a0563d2dd081e1ae673ec Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 9 Dec 2024 19:17:30 +0530 Subject: [PATCH 5/7] Update airflow/jobs/backfill_job_runner.py --- airflow/jobs/backfill_job_runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 81f76cac3aa87..94d737df9ccf2 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -314,8 +314,9 @@ def _manage_executor_state( and ti.state in self.STATES_COUNT_AS_RUNNING ): self.log.debug( - "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", - ti, state + "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", + ti, + state, ) ti.refresh_from_db(session=session) From d4435a73fa688a56d036be3352cc194465a1ba3b Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 9 Dec 2024 19:34:25 +0530 Subject: [PATCH 6/7] Update airflow/jobs/backfill_job_runner.py --- airflow/jobs/backfill_job_runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 94d737df9ccf2..9d40ca6ce43bc 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -314,10 +314,10 @@ def _manage_executor_state( and ti.state in self.STATES_COUNT_AS_RUNNING ): self.log.debug( - "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", - ti, - state, - ) + "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", + ti, + state, + ) ti.refresh_from_db(session=session) if ( From 6f492895abb2a3e49dea3f061989ee983ff74f73 Mon Sep 17 00:00:00 2001 From: Utkarsh Sharma Date: Mon, 9 Dec 2024 19:47:16 +0530 Subject: [PATCH 7/7] Update airflow/jobs/backfill_job_runner.py --- airflow/jobs/backfill_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py index 9d40ca6ce43bc..305eaff84be7d 100644 --- a/airflow/jobs/backfill_job_runner.py +++ b/airflow/jobs/backfill_job_runner.py @@ -317,7 +317,7 @@ def _manage_executor_state( "In-memory TaskInstance state %s does not agree with executor state %s. Attempting to resolve by refreshing in-memory task instance from DB.", ti, state, - ) + ) ti.refresh_from_db(session=session) if (