Skip to content

Commit

Permalink
Respect "soft_fail" for core async sensors (#33403)
Browse files Browse the repository at this point in the history
* fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail

* refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger

* test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail

* fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException

* Revert "fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException"

This reverts commit a6d803303bf71a84e9e59e94d9c088e3120bedb5.

* Revert "test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail"

This reverts commit 50e39e08a415685ace788ae728397a199c21e82b.

* Revert "refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger"

This reverts commit 985981a269cea68da719d6fd1c60bedd9a7e5225.

* Revert "fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail"

This reverts commit b2f2662ae1a11ea928aad57acd2892c763c2db25.

* fix(sensors): move core async sensor trigger initialization to __init__ if possible
  • Loading branch information
Lee-W authored Aug 26, 2023
1 parent 6d182be commit 9ce76e3
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
6 changes: 5 additions & 1 deletion airflow/sensors/date_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ class DateTimeSensorAsync(DateTimeSensor):
:param target_time: datetime after which the job succeeds. (templated)
"""

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.trigger = DateTimeTrigger(moment=timezone.parse(self.target_time))

def execute(self, context: Context):
self.defer(
trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)),
trigger=self.trigger,
method_name="execute_complete",
)

Expand Down
10 changes: 9 additions & 1 deletion airflow/sensors/time_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

from airflow.exceptions import AirflowSkipException
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
Expand Down Expand Up @@ -64,7 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
def execute(self, context: Context):
target_dttm = context["data_interval_end"]
target_dttm += self.delta
self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")
try:
trigger = DateTimeTrigger(moment=target_dttm)
except (TypeError, ValueError) as e:
if self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
raise

self.defer(trigger=trigger, method_name="execute_complete")

def execute_complete(self, context, event=None):
"""Execute for when the trigger fires - return immediately."""
Expand Down
3 changes: 2 additions & 1 deletion airflow/sensors/time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ def __init__(self, *, target_time, **kwargs):
)

self.target_datetime = timezone.convert_to_utc(aware_time)
self.trigger = DateTimeTrigger(moment=self.target_datetime)

def execute(self, context: Context):
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime),
trigger=self.trigger,
method_name="execute_complete",
)

Expand Down

0 comments on commit 9ce76e3

Please sign in to comment.