ExternalTaskSensor doesn't timeout if external DAG doesn't exist - dag.test() #34497

Closed
1 of 2 tasks
cbuffett opened this issue Sep 20, 2023 · 12 comments

@cbuffett

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

My DAG has a number of tasks, the first of which is an ExternalTaskSensor. This sensor functions correctly when the external DAG exists (normal operation/deployment). However, when using dag.test() to debug the DAG, the ExternalTaskSensor never terminates, rescheduling itself indefinitely. I believe this happens because in this situation, the external DAG doesn't exist.

Using check_existence isn't an option, as it immediately throws an exception and terminates the debugger. Using soft_fail and/or silent_fail results in the exception being logged instead of thrown, but the ExternalTaskSensor continues to reschedule itself.

After some debugging, I noticed that start_date keeps being reset to the current time, because task_reschedules is always empty:

    def execute(self, context: Context) -> Any:
        started_at: datetime.datetime | float

        if self.reschedule:

            # If reschedule, use the start date of the first try (first try can be either the very
            # first execution of the task, or the first execution after the task was cleared.)
            first_try_number = context["ti"].max_tries - self.retries + 1
            task_reschedules = TaskReschedule.find_for_task_instance(
                context["ti"], try_number=first_try_number
            )
            if not task_reschedules:  # This is always empty
                start_date = timezone.utcnow()
            else:
                start_date = task_reschedules[0].start_date
            started_at = start_date

            def run_duration() -> float:
                # If we are in reschedule mode, then we have to compute diff
                # based on the time in a DB, so can't use time.monotonic
                return (timezone.utcnow() - start_date).total_seconds()
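
To make the effect of that branch concrete, here is a minimal standalone sketch. It does not import Airflow; `pick_start_date` is a hypothetical stand-in for the logic quoted above, and the dates are illustrative, not taken from a real run.

```python
from datetime import datetime, timedelta, timezone


def pick_start_date(reschedule_start_dates, now):
    """Hypothetical mirror of the branch above: fall back to `now` when no history exists."""
    return reschedule_start_dates[0] if reschedule_start_dates else now


first_try = datetime(2023, 1, 1, tzinfo=timezone.utc)
now = first_try + timedelta(minutes=7)  # seven minutes after the first execution

# Reschedule history found: the clock starts at the first execution.
with_history = (now - pick_start_date([first_try], now)).total_seconds()

# History empty (the case observed under dag.test()): the clock restarts at `now`.
without_history = (now - pick_start_date([], now)).total_seconds()

print(with_history, without_history)  # 420.0 0.0
```

With an empty history the computed run duration is always zero, so a comparison against a 300-second timeout can never succeed.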

What you think should happen instead

A way to ignore/skip ExternalTaskSensors when using dag.test(). At the very least, the ExternalTaskSensor should respect the timeout value provided.

How to reproduce

Run a DAG containing the following ExternalTaskSensor using dag.test():

    external_task_sensor = ExternalTaskSensor(
        task_id='external_dag_sensor',
        poke_interval=60,
        timeout=300,
        soft_fail=True,
        retries=0,
        external_dag_id=NON_EXISTENT_DAG,
        execution_date_fn=return_date,  # Since the external DAG doesn't exist, this function just returns the dt passed in
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        mode="reschedule"
    )

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

apache-airflow==2.6.1
apache-airflow-providers-amazon==8.3.1
apache-airflow-providers-apache-hive==6.0.0
apache-airflow-providers-cncf-kubernetes==6.1.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-jdbc==3.3.0
apache-airflow-providers-microsoft-mssql==3.3.2
apache-airflow-providers-mysql==5.0.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-slack==7.3.1
apache-airflow-providers-snowflake==4.0.5
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.6.0

Deployment

Other

Deployment details

No response

Anything else

Log entries showing the sensor continuing to reschedule itself well past the timeout period:

[2023-09-19T23:04:30.555-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:04:30.555-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:04:32,082] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:04:32.082-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:04:32,104] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:04:32.104-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:04:32.104-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:04:32.105-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:05:32.095-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:05:32.096-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:05:32,995] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:05:32.995-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:05:33,010] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:05:33.010-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:05:33.011-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:05:33.011-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:06:33.013-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:06:33.014-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:06:33,921] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:06:33.921-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:06:33,936] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:06:33.936-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:06:33.936-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:06:33.936-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:07:33.987-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:07:33.987-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:07:34,871] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:07:34.871-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:07:34,886] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:07:34.886-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:07:34.886-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:07:34.886-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:08:34.888-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:08:34.889-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:08:35,784] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:08:35.784-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:08:35,800] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:08:35.800-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:08:35.800-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:08:35.800-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:09:35.799-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:09:35.799-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:09:36,706] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:09:36.706-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:09:36,722] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:09:36.722-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:09:36.723-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:09:36.723-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:10:36.720-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:10:36.720-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:10:37,606] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:10:37.606-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:10:37,621] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:10:37.621-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:10:37.621-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:10:37.621-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-19T23:11:37.619-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-19T23:11:37.619-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-19 23:11:38,516] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19T23:11:38.516-0700] {external_task.py:247} INFO - Poking for DAG 'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
[2023-09-19 23:11:38,532] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:11:38.532-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:11:38.533-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-19T23:11:38.534-0700] {dag.py:3694} INFO - *****************************************************

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

cbuffett added the area:core, kind:bug, and needs-triage labels Sep 20, 2023
@cbuffett
Author

Some further testing shows this doesn't seem to affect the poke mode; it times out correctly. This may be a workaround for my specific use case for now.
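
One way this workaround could be wired up is to pick the sensor mode from an environment variable, so that poke mode is used only while debugging. This is a sketch under assumptions: the variable name `AIRFLOW_DEBUG_DAG_TEST` and the helper `sensor_mode` are hypothetical and not part of Airflow.

```python
import os


def sensor_mode(env_var: str = "AIRFLOW_DEBUG_DAG_TEST") -> str:
    """Return "poke" when the (hypothetical) debug flag is set, else "reschedule"."""
    flag = os.environ.get(env_var, "").strip().lower()
    return "poke" if flag in {"1", "true", "yes"} else "reschedule"


# Intended use, assuming the sensor definition from the report above:
#   ExternalTaskSensor(task_id='external_dag_sensor', mode=sensor_mode(), ...)
print(sensor_mode())
```

In poke mode the task keeps running between pokes instead of being rescheduled, which is acceptable for a local dag.test() session but may not be what you want in a deployment.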

utkarsharma2 self-assigned this Sep 21, 2023
@utkarsharma2
Contributor

utkarsharma2 commented Sep 21, 2023

@cbuffett Can you please share your testing script?

Is it something like the script below?

dag = DAG(dag_id='DAG-1',
        default_args=default_args,
        schedule_interval='@once', 
        catchup=False
    )
with dag:
    external_task_sensor = ExternalTaskSensor(
        task_id='external_dag_sensor',
        poke_interval=60,
        timeout=300,
        soft_fail=True,
        retries=0,
        external_dag_id=NON_EXISTENT_DAG,
        execution_date_fn=return_date,  # Since the external DAG doesn't exist, this function just returns the dt passed in
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        mode="reschedule"
    )
dag.test()

@cbuffett
Author

@utkarsharma2 Yes, that script should easily reproduce the infinite rescheduling.

@hussein-awala
Member

@utkarsharma2 Did you reproduce the issue? If so, could you remove the needs-triage label?

@utkarsharma2
Contributor

utkarsharma2 commented Sep 25, 2023

@cbuffett I'm not able to reproduce this locally with the latest Airflow code. I ran the script below:

import datetime
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models.taskinstance import State

default_args = {
    'owner': 'test',
    'start_date': datetime.datetime(2023, 1, 1)
}
dag = DAG(dag_id='DAG-1',
        default_args=default_args,
        schedule_interval='@once',
        catchup=False
    )
with dag:
    external_task_sensor = ExternalTaskSensor(
        task_id='external_dag_sensor',
        poke_interval=60,
        timeout=300,
        soft_fail=True,
        retries=0,
        external_dag_id="some_dag",
        execution_date_fn=datetime.datetime(2023, 1, 1),
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        mode="reschedule"
    )
if __name__ == "__main__" :
    dag.test()

Can you please specify which Airflow version you are on?

utkarsharma2 added the pending-response label and removed the needs-triage label Sep 25, 2023
@cbuffett
Author

I'm on Airflow 2.6.1.

I ran your test script, but it raised errors because the value passed to execution_date_fn was not a function. After fixing that (see below), I was able to reproduce the issue:

import datetime
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models.taskinstance import State

def return_date(dt):
    return dt

default_args = {
    'owner': 'test',
    'start_date': datetime.datetime(2023, 1, 1)
}
dag = DAG(dag_id='DAG-1',
        default_args=default_args,
        schedule='@once',
        catchup=False
    )
with dag:
    external_task_sensor = ExternalTaskSensor(
        task_id='external_dag_sensor',
        poke_interval=60,
        timeout=300,
        soft_fail=True,
        retries=0,
        external_dag_id="some_dag",
        execution_date_fn=return_date,
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        mode="reschedule"
    )
if __name__ == "__main__" :
    dag.test()

Log output:

[2023-09-26T11:01:48.848-0700] {dag.py:3716} INFO - dagrun id: DAG-1
[2023-09-26T11:01:48.859-0700] {dag.py:3733} INFO - created dagrun <DagRun DAG-1 @ 2023-09-26T18:01:48.838825+00:00: manual__2023-09-26T18:01:48.838825+00:00, state:running, queued_at: None. externally triggered: False>
C:\Users\cbuffett\AppData\Roaming\Python\Python38\site-packages\airflow\models\dag.py:3725 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
[2023-09-26T11:01:48.866-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:01:48.866-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:01:48,921] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:01:48.921-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:01:48,926] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:01:48.926-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:01:48,962] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:01:48.962-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:01:48.963-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:01:48.963-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:02:48.937-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:02:48.938-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:02:48,989] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:02:48.989-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:02:48,995] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:02:48.995-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:02:49,022] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:02:49.022-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:02:49.023-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:02:49.023-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:03:49.042-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:03:49.042-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:03:49,106] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:03:49.106-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:03:49,112] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:03:49.112-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:03:49,145] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:03:49.145-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:03:49.145-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:03:49.145-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:04:49.123-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:04:49.124-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:04:49,182] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-2' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:04:49.182-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-2' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:04:49,188] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:04:49.188-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:04:49,217] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:04:49.217-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:04:49.218-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:04:49.218-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:05:49.200-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:05:49.200-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:05:49,269] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-3' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:05:49.269-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-3' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:05:49,274] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:05:49.274-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:05:49,307] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:05:49.307-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:05:49.308-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:05:49.309-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:06:49.287-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:06:49.288-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:06:49,346] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-4' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:06:49.346-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-4' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:06:49,353] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:06:49.353-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:06:49,383] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:06:49.383-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:06:49.383-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:06:49.384-0700] {dag.py:3694} INFO - *****************************************************

!!! SHOULD HAVE TIMED OUT HERE !!!

[2023-09-26T11:07:49.362-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:07:49.362-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:07:49,412] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-5' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:07:49.412-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-5' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:07:49,417] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:07:49.417-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:07:49,451] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:07:49.451-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:07:49.452-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:07:49.452-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:08:49.428-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:08:49.429-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:08:49,481] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-6' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:08:49.481-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-6' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:08:49,487] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:08:49.487-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:08:49,517] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:08:49.517-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:08:49.518-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:08:49.518-0700] {dag.py:3694} INFO - *****************************************************
[2023-09-26T11:09:49.504-0700] {dag.py:3683} INFO - *****************************************************
[2023-09-26T11:09:49.505-0700] {dag.py:3687} INFO - Running task external_dag_sensor
[2023-09-26 11:09:49,566] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-7' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26T11:09:49.566-0700] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='test' AIRFLOW_CTX_DAG_ID='DAG-1' AIRFLOW_CTX_TASK_ID='external_dag_sensor' AIRFLOW_CTX_EXECUTION_DATE='2023-09-26T18:01:48.838825+00:00' AIRFLOW_CTX_TRY_NUMBER='-7' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-09-26T18:01:48.838825+00:00'
[2023-09-26 11:09:49,572] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26T11:09:49.572-0700] {external_task.py:247} INFO - Poking for DAG 'some_dag' on 2023-09-26T18:01:48.838825+00:00 ... 
[2023-09-26 11:09:49,599] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:09:49.599-0700] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-09-26T11:09:49.600-0700] {dag.py:3691} INFO - external_dag_sensor ran successfully!
[2023-09-26T11:09:49.600-0700] {dag.py:3694} INFO - *****************************************************
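
As a rough check on where the timeout should have fired, the poke timestamps in the log above can be compared against the 300-second timeout. This is only an approximation: it takes the first poke's log time as the start, whereas the sensor would use the start date stored with the first reschedule record.

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S.%f%z"
TIMEOUT = 300  # seconds, from the sensor definition

# Timestamps copied from the "Poking for DAG 'some_dag'" lines above.
first_poke = datetime.strptime("2023-09-26T11:01:48.926-0700", FMT)
later_pokes = {
    "11:05": datetime.strptime("2023-09-26T11:05:49.274-0700", FMT),
    "11:06": datetime.strptime("2023-09-26T11:06:49.353-0700", FMT),
}

for label, ts in later_pokes.items():
    elapsed = (ts - first_poke).total_seconds()
    print(label, round(elapsed, 3), elapsed > TIMEOUT)
# 11:05 240.348 False
# 11:06 300.427 True
```

By this estimate the 11:06 execution is the first one whose elapsed time exceeds the timeout.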

@utkarsharma2
Contributor

@cbuffett Thanks for the response. I'm now able to reproduce this on the latest Airflow main branch and on 2.6.1 as well. I'll look into it and get back to you.

@utkarsharma2
Contributor

@cbuffett The issue is fixed in Airflow 2.7.1 by PR #33401. There was another issue in my script that led me to think the issue still persisted. The script below should work without halting the debugging flow on Airflow 2.7.1.

import time
import datetime
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models.taskinstance import State
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is the deprecated path


def return_date(dt):
    return dt


def print_context(**kwargs):
    """Print the Airflow context and ds variable from the context."""
    count = 1
    while True:
        if count > 5:
            break
        print(kwargs)
        time.sleep(20)
        import ipdb
        ipdb.set_trace()
        count = count + 1

default_args = {
    'owner': 'test',
    'start_date': datetime.datetime(2023, 1, 1)
}
dag = DAG(dag_id='DAG-1',
        default_args=default_args,
        schedule='@once',
        catchup=False
    )
with dag:
    external_task_sensor = ExternalTaskSensor(
        task_id='external_dag_sensor',
        poke_interval=60,
        timeout=300,
        soft_fail=True,
        retries=0,
        external_dag_id="some_dag",
        execution_date_fn=return_date,
        allowed_states=[State.SUCCESS],
        failed_states=[State.FAILED],
        check_existence=True,
        mode="reschedule"
    )

    t1 = PythonOperator(
        task_id='print',
        python_callable=print_context,
        op_kwargs={"x": "Apache Airflow"},
        dag=dag,
    )

if __name__ == "__main__":
    dag.test()

@cbuffett
Author

cbuffett commented Dec 13, 2023

Unfortunately I'm still seeing the infinite rescheduling issue after upgrading to Airflow 2.7.2. I believe the issue still revolves around the fact that the following code in base.py returns an empty list, even on subsequent rescheduled executions:

            task_reschedules = TaskReschedule.find_for_task_instance(
                context["ti"], try_number=first_try_number
            )  # <-- This call is returning an empty list
            if not task_reschedules:
                start_date = timezone.utcnow()
            else:
                start_date = task_reschedules[0].start_date

Because this list is empty, start_date is always utcnow(), so it never fails the timeout check further down:

            if run_duration() > self.timeout:
                # If sensor is in soft fail mode but times out raise AirflowSkipException.
                message = (
                    f"Sensor has timed out; run duration of {run_duration()} seconds exceeds "
                    f"the specified timeout of {self.timeout}."
                )

                if self.soft_fail:
                    raise AirflowSkipException(message)
                else:
                    raise AirflowSensorTimeout(message)
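
The interaction between the two snippets above can be simulated without Airflow. The sketch below is a simplified model, not the real sensor: `first_timeout_execution` is a hypothetical helper, the clock is simulated with exact 60-second gaps, and a plain list stands in for the TaskReschedule rows.

```python
def first_timeout_execution(persist_reschedules, timeout=300, poke_interval=60, max_executions=20):
    """Return the 1-based execution on which the timeout fires, or None if it never does."""
    reschedule_start_dates = []  # stand-in for the TaskReschedule rows
    for execution in range(1, max_executions + 1):
        now = (execution - 1) * poke_interval  # simulated clock, in seconds
        # Same fallback as above: no history means the clock starts at `now`.
        start_date = reschedule_start_dates[0] if reschedule_start_dates else now
        if now - start_date > timeout:
            return execution
        if persist_reschedules:
            reschedule_start_dates.append(now)  # record this attempt before rescheduling
    return None


print(first_timeout_execution(persist_reschedules=True))   # 7
print(first_timeout_execution(persist_reschedules=False))  # None
```

With exact 60-second gaps and a strict `>` comparison, the simulated timeout lands on the seventh execution when the history is kept. When the history is never found, the function returns None no matter how large max_executions is, which matches the infinite rescheduling described here.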

@cbuffett
Author

Did some more testing, and the infinite rescheduling doesn't happen if I set check_existence=True. I still believe the infinite rescheduling is a problem in this particular case, though, for the reason mentioned above.


This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.


This issue has been closed because it has not received a response from the issue author.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) Jan 14, 2025