Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferrable sensors can implement sensor timeout #33718

Merged
merged 10 commits into from
Dec 3, 2024

Conversation

dstandish
Copy link
Contributor

@dstandish dstandish commented Aug 25, 2023

The goal here is to allow behavioral parity w.r.t. sensor timeouts between deferrable and non-deferrable sensor operators.

With non-deferrable sensors, if there's a sensor timeout, the task fails without retry. But currently, with deferrable sensors, that does not happen.

Since there's already a "timeout" capability on triggers, we can use this for sensor timeout. Essentially all that was missing was the ability to distinguish between trigger timeouts and other trigger errors. With this capability, base sensor can distinguish between the two, and reraise deferral timeouts as sensor timeouts.

So, here we add a new exception type, TaskDeferralTimeout, which base sensor reraises as AirflowSensorTimeout. Then, to take advantage of this feature, a sensor need only ensure that its timeout is passed when deferring. For convenience, we update the task deferred exception signature to take int and float in addition to timedelta, since that's how timeout attr is defined on base sensor. But we do not change the exception attribute type.

In order to keep this PR focused, this PR only updates one sensor to use the timeout functionality, namely, time delta sensor. Other sensors will have to be done as followups.


Old description below ⬇️

Alternative to #32990

resolves #32638

This is a less invasive approach. Essentially, what we do here is, update BaseOperator.resume_execution so that when trigger times out then it raises special exception AirflowDeferralTimeout.

Then, BaseSensorOperator.resume_execution, we reraise AirflowDeferralTimeout as a AirflowSensorTimeout.

So, if a sensor resumes from a timed-out deferral, then it's interpreted as a sensor timeout.

All that is required is for a sensor to add a timeout to the deferral.

Example logs:

...
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1357} INFO - Resuming after deferral
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1380} INFO - Executing <Task(TimeDeltaSensorAsync): delta_sensor> on 2023-08-25 07:00:32.864128+00:00
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:57} INFO - Started process 50543 to run task
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'simple2', 'delta_sensor', 'manual__2023-08-25T07:00:32.864128+00:00', '--job-id', '129', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/async_timeout.py', '--cfg-path', '/var/folders/9c/tknx7xx10qx92983y1r5djb40000gn/T/tmp72slvjz_']
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:85} INFO - Job 129: Subtask delta_sensor
[2023-08-25, 07:00:54 UTC] {task_command.py:415} INFO - Running <TaskInstance: simple2.delta_sensor manual__2023-08-25T07:00:32.864128+00:00 [running]> on host daniels-mbp.lan
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1933} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 288, in resume_execution
    return super().resume_execution(next_method, next_kwargs, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dstandish/code/airflow/airflow/models/baseoperator.py", line 1605, in resume_execution
    raise AirflowDeferralTimeout(error)
airflow.exceptions.AirflowDeferralTimeout: Trigger timeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/Users/dstandish/code/airflow/airflow/sensors/base.py", line 290, in resume_execution
    raise AirflowSensorTimeout(*e.args) from e
airflow.exceptions.AirflowSensorTimeout: Trigger timeout
[2023-08-25, 07:00:54 UTC] {taskinstance.py:1398} INFO - Immediate failure requested. Marking task as FAILED. dag_id=simple2, task_id=delta_sensor, execution_date=20230825T070032, start_date=20230825T070034, end_date=20230825T070054
[2023-08-25, 07:00:54 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 129 for task delta_sensor (Trigger timeout; 50543)
[2023-08-25, 07:00:54 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2023-08-25, 07:00:54 UTC] {taskinstance.py:2774} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:611} ERROR - Trigger cancelled due to timeout
[2023-08-25, 07:00:55 UTC] {triggerer_job_runner.py:612} ERROR - Trigger cancelled; message=

@boring-cyborg boring-cyborg bot added area:core-operators Operators, Sensors and hooks within Core Airflow area:Scheduler including HA (high availability) scheduler labels Aug 25, 2023
@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Oct 14, 2023
@github-actions github-actions bot closed this Oct 19, 2023
@dstandish dstandish reopened this Nov 6, 2023
@robg-eb
Copy link

robg-eb commented Nov 6, 2023

@dstandish and team - I see this issue was auto-closed and now reopened. My team is struggling with the same issue, so we'd be happy to help/test in any way possible as needed - let us know !

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Nov 7, 2023
@dstandish dstandish force-pushed the test-defer-timeout branch 2 times, most recently from 3a5a800 to 2fe05c8 Compare November 22, 2023 13:58
@dstandish
Copy link
Contributor Author

@dstandish and team - I see this issue was auto-closed and now reopened. My team is struggling with the same issue, so we'd be happy to help/test in any way possible as needed - let us know !

Hi @robg-eb, thanks for your interest. Yes, your feedback / review / testing would be welcome. I just rebased it.

@robg-eb
Copy link

robg-eb commented Nov 22, 2023

@dstandish - To clarify, is this PR ready to test as-is?

@dstandish
Copy link
Contributor Author

@dstandish - To clarify, is this PR ready to test as-is?

Right.

In short what this does is, now when trigger times out we raise TaskDeferralTimeout instead of the generic TaskDeferralError. And in BaseSensorOperator, we reraise this as AirflowSensorTimeout which has special meaning (results in immediately fail and no more retries.
So if you inherit from BaseSensorOperator this should just work. And if you ant the trigger timeout, after multiple retries etc, to be calculated from the very first try, it's your responsibility to calculate that when deferring as shown in example by @hussein-awala here.

At least that's my understanding after dusting this off just now :)

Copy link

github-actions bot commented Jan 7, 2024

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jan 7, 2024
@dstandish dstandish force-pushed the test-defer-timeout branch 3 times, most recently from bcb4f06 to cfaf095 Compare November 25, 2024 17:58
@dstandish dstandish changed the title Deferrable sensors can timeout with no retries Deferrable sensors can implement sensor timeout Nov 25, 2024
@dstandish
Copy link
Contributor Author

dstandish commented Nov 25, 2024

@robg-eb looked at your comment

However, we found another inconsistency between sync and async sensors here. In the case where an interruption does happen and a retry is needed, it seems with this implementation, the timeout interval will restart after the retry.

I don't understand. What do you mean "an interruption does happen"?

@potiuk
Copy link
Member

potiuk commented Nov 30, 2024

Is this one ready to review and we think we can merge it in?

Also I think it relates with our discussions on making deferrable the "default" for Airflow 3 and it is part of the discussion "Do we actually pay enough attention for deferrable timeouts and faiilure scenarios to make it "first-class" replacement for other types of sensors - so maybe we should prioritize this one.

I saw a number of people commenting before, but I am not sure what the status is after so many back/forth and long conversations. .. So maybe we can somewhat restart this one and get more people who commented before and understand more in detail about the problems?

cc: @eladkal

Pinging those who were active here

Also @thesuperzapper -> re: #36090 (comment) which seems very much relevant to this one, as I think we need to agree on strategy of how to treat "exceptional" cases for deferrable operators in a consistent way.

I personally know too little in this area to make a meaningful (and correct) feedback, But I have a feeling this and #36090 would need to be addressed if we want to seriously continue discussion started here https://lists.apache.org/thread/3m7vjwcbvodnhrklo69s3j8s8pp7nm6o

Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

But we must make sure the core part of this PR is released in 2.11 otherwise we will have compatibility problems with the provider part cc @ephraimbuddy @utkarsharma2

@eladkal eladkal added this to the Airflow 2.11.0 milestone Dec 3, 2024
The goal here is to ensure behavioral parity w.r.t. sensor timeouts between deferrable and non-deferrable sensor operators.

With non-deferrable sensors, if there's a sensor timeout, the task fails without retry.  But currently, with deferrable sensors, that does not happen.

Since there's already a "timeout" capability on triggers, we can use this for sensor timeout.  Essentially all that was missing was the ability to distinguish between trigger timeouts and other trigger errors.  With this capability, base sensor can distinguish between the two, and reraise deferral timeouts as sensor timeouts.

So, here we add a new exception type, TaskDeferralTimeout, which base sensor reraises as AirflowSensorTimeout. Then, to take advantage of this feature, a sensor need only ensure that its timeout is passed when deferring. For convenience, we update the task deferred exception signature to take int and float in addition to timedelta, since that's how `timeout` attr is defined on base sensor.  But we do not change the exception attribute type.

In order to keep this PR focused, this PR only updates one sensor to use the timeout functionality, namely, time delta sensor.  Other sensors will have to be done as followups.
@dstandish dstandish merged commit 8ca061d into apache:main Dec 3, 2024
67 checks passed
@dstandish dstandish deleted the test-defer-timeout branch December 3, 2024 22:39
LefterisXefteris pushed a commit to LefterisXefteris/airflow that referenced this pull request Jan 5, 2025
The goal here is to ensure behavioral parity w.r.t. sensor timeouts between deferrable and non-deferrable sensor operators.

With non-deferrable sensors, if there's a sensor timeout, the task fails without retry.  But currently, with deferrable sensors, that does not happen.

Since there's already a "timeout" capability on triggers, we can use this for sensor timeout.  Essentially all that was missing was the ability to distinguish between trigger timeouts and other trigger errors.  With this capability, base sensor can distinguish between the two, and reraise deferral timeouts as sensor timeouts.

So, here we add a new exception type, TaskDeferralTimeout, which base sensor reraises as AirflowSensorTimeout. Then, to take advantage of this feature, a sensor need only ensure that its timeout is passed when deferring. For convenience, we update the task deferred exception signature to take int and float in addition to timedelta, since that's how `timeout` attr is defined on base sensor.  But we do not change the exception attribute type.

In order to keep this PR focused, this PR only updates one sensor to use the timeout functionality, namely, time delta sensor.  Other sensors will have to be done as followups.
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
The goal here is to ensure behavioral parity w.r.t. sensor timeouts between deferrable and non-deferrable sensor operators.

With non-deferrable sensors, if there's a sensor timeout, the task fails without retry.  But currently, with deferrable sensors, that does not happen.

Since there's already a "timeout" capability on triggers, we can use this for sensor timeout.  Essentially all that was missing was the ability to distinguish between trigger timeouts and other trigger errors.  With this capability, base sensor can distinguish between the two, and reraise deferral timeouts as sensor timeouts.

So, here we add a new exception type, TaskDeferralTimeout, which base sensor reraises as AirflowSensorTimeout. Then, to take advantage of this feature, a sensor need only ensure that its timeout is passed when deferring. For convenience, we update the task deferred exception signature to take int and float in addition to timedelta, since that's how `timeout` attr is defined on base sensor.  But we do not change the exception attribute type.

In order to keep this PR focused, this PR only updates one sensor to use the timeout functionality, namely, time delta sensor.  Other sensors will have to be done as followups.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:Scheduler including HA (high availability) scheduler
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Find a solution to handle the execution timeout in Triggers
8 participants