From 2805b1a8c64abc7a2dd7c54a0fa9352710667697 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 15 Aug 2023 16:39:41 +0800 Subject: [PATCH 1/9] fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail --- airflow/sensors/date_time.py | 10 +++++++++- airflow/sensors/time_delta.py | 10 +++++++++- airflow/sensors/time_sensor.py | 10 +++++++++- 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 1425028870664..632ef3b885df0 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -20,6 +20,7 @@ import datetime from typing import Sequence +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -86,8 +87,15 @@ class DateTimeSensorAsync(DateTimeSensor): """ def execute(self, context: Context): + try: + datetime_trigger = DateTimeTrigger(moment=timezone.parse(self.target_time)) + except Exception as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + self.defer( - trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)), + trigger=datetime_trigger, method_name="execute_complete", ) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 1571334757afa..35fd3ecd50b40 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -64,7 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute(self, context: Context): target_dttm = context["data_interval_end"] target_dttm += self.delta - self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete") + try: + datetime_trigger = DateTimeTrigger(moment=target_dttm) + except Exception as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + + self.defer(trigger=datetime_trigger, method_name="execute_complete") def execute_complete(self, context, event=None): """Execute for when the trigger fires - return immediately.""" diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index 7f6809851a71b..d3a87151cd092 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -19,6 +19,7 @@ import datetime +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -70,8 +71,15 @@ def __init__(self, *, target_time, **kwargs): self.target_datetime = timezone.convert_to_utc(aware_time) def execute(self, context: Context): + try: + datetime_trigger = DateTimeTrigger(moment=self.target_datetime) + except Exception as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime), + trigger=datetime_trigger, method_name="execute_complete", ) From 85a8c44683ec3da50bfe77e9ad69870cdea67ff8 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 15 Aug 2023 16:44:23 +0800 Subject: [PATCH 2/9] refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger --- airflow/sensors/date_time.py | 10 +--------- airflow/sensors/time_delta.py | 13 ++++--------- airflow/sensors/time_sensor.py | 10 +--------- airflow/triggers/temporal.py | 20 ++++++++++++++++---- 4 files changed, 22 insertions(+), 31 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 632ef3b885df0..c5ca0c5bbedde 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -20,7 +20,6 @@ import datetime from typing import Sequence -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -87,15 +86,8 @@ class DateTimeSensorAsync(DateTimeSensor): """ def execute(self, context: Context): - try: - datetime_trigger = DateTimeTrigger(moment=timezone.parse(self.target_time)) - except Exception as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise e - self.defer( - trigger=datetime_trigger, + trigger=DateTimeTrigger(moment=timezone.parse(self.target_time), soft_fail=self.soft_fail), method_name="execute_complete", ) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 35fd3ecd50b40..825950a7dba33 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -65,14 +64,10 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute(self, context: Context): target_dttm = context["data_interval_end"] target_dttm += self.delta - try: - datetime_trigger = DateTimeTrigger(moment=target_dttm) - except Exception as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise e - - self.defer(trigger=datetime_trigger, method_name="execute_complete") + self.defer( + trigger=DateTimeTrigger(moment=target_dttm, soft_fail=self.soft_fail), + method_name="execute_complete", + ) def execute_complete(self, context, event=None): """Execute for when the trigger fires - return immediately.""" diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index d3a87151cd092..b2b1ffb347efa 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -19,7 +19,6 @@ import datetime -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -71,15 +70,8 @@ def __init__(self, *, target_time, **kwargs): self.target_datetime = timezone.convert_to_utc(aware_time) def execute(self, context: Context): - try: - datetime_trigger = DateTimeTrigger(moment=self.target_datetime) - except Exception as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise e - self.defer( - trigger=datetime_trigger, + trigger=DateTimeTrigger(moment=self.target_datetime, soft_fail=self.soft_fail), method_name="execute_complete", ) diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py index e23e19f389e7b..746d600d4f0a6 100644 --- a/airflow/triggers/temporal.py +++ b/airflow/triggers/temporal.py @@ -20,6 +20,7 @@ import datetime from typing import Any +from airflow.exceptions import AirflowSkipException from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.utils import timezone @@ -34,18 +35,29 @@ class DateTimeTrigger(BaseTrigger): The provided datetime MUST be in UTC. """ - def __init__(self, moment: datetime.datetime): + def __init__(self, moment: datetime.datetime, soft_fail: bool = False): super().__init__() if not isinstance(moment, datetime.datetime): - raise TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}") + exc = TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}") + if soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc + raise exc # Make sure it's in UTC elif moment.tzinfo is None: - raise ValueError("You cannot pass naive datetimes") + exc = ValueError("You cannot pass naive datetimes") + if soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc + raise exc else: self.moment = timezone.convert_to_utc(moment) + self.soft_fail = soft_fail + def serialize(self) -> tuple[str, dict[str, Any]]: - return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment}) + return ( + "airflow.triggers.temporal.DateTimeTrigger", + {"moment": self.moment, "soft_fail": self.soft_fail}, + ) async def run(self): """ From 6d429fbd48e0b4dbbd4804af3afc65be7d4a1a0e Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Tue, 15 Aug 2023 17:11:41 +0800 Subject: [PATCH 3/9] test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail --- tests/triggers/test_temporal.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 655910394fb8c..3157634c61655 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -22,6 +22,7 @@ import pendulum import pytest +from airflow.exceptions import AirflowSkipException from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone @@ -35,6 +36,14 @@ def test_input_validation(): DateTimeTrigger("2012-01-01T03:03:03+00:00") +def test_input_validation_with_soft_fail(): + """ + Tests that DateTimeTrigger raises AirflowSkipException when soft_fail is set to True + """ + with pytest.raises(AirflowSkipException, match="Skipping due to soft_fail is set to True"): + DateTimeTrigger("2012-01-01T03:03:03+00:00", soft_fail=True) + + def test_datetime_trigger_serialization(): """ Tests that the DateTimeTrigger correctly serializes its arguments @@ -44,7 +53,7 @@ def test_datetime_trigger_serialization(): trigger = DateTimeTrigger(moment) classpath, kwargs = trigger.serialize() assert classpath == "airflow.triggers.temporal.DateTimeTrigger" - assert kwargs == {"moment": moment} + assert kwargs == {"moment": moment, "soft_fail": False} def test_timedelta_trigger_serialization(): From abb7b07f876bad5d2998544ecd7257177b7ca095 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 16 Aug 2023 11:03:19 +0800 Subject: [PATCH 4/9] fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException --- airflow/triggers/temporal.py | 15 ++++++++------- tests/triggers/test_temporal.py | 8 +++++++- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py index 746d600d4f0a6..e8f5508334c1c 100644 --- a/airflow/triggers/temporal.py +++ b/airflow/triggers/temporal.py @@ -35,19 +35,20 @@ class DateTimeTrigger(BaseTrigger): The provided datetime MUST be in UTC. """ - def __init__(self, moment: datetime.datetime, soft_fail: bool = False): + def __init__(self, moment: datetime.datetime, *, soft_fail: bool = False) -> None: super().__init__() + skipping_message_postfix = "Skipping due to soft_fail is set to True." if not isinstance(moment, datetime.datetime): - exc = TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}") + message = f"Expected datetime.datetime type for moment. Got {type(moment)}" if soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc - raise exc + raise AirflowSkipException(f"{message}. {skipping_message_postfix}") + raise TypeError(message) # Make sure it's in UTC elif moment.tzinfo is None: - exc = ValueError("You cannot pass naive datetimes") + message = "You cannot pass naive datetimes" if soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc - raise exc + raise AirflowSkipException(f"{message}. {skipping_message_postfix}") + raise ValueError(message) else: self.moment = timezone.convert_to_utc(moment) diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 3157634c61655..674328cb76579 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -40,7 +40,13 @@ def test_input_validation_with_soft_fail(): """ Tests that DateTimeTrigger raises AirflowSkipException when soft_fail is set to True """ - with pytest.raises(AirflowSkipException, match="Skipping due to soft_fail is set to True"): + with pytest.raises( + AirflowSkipException, + match=( + "Expected datetime.datetime type for moment. Got . " + "Skipping due to soft_fail is set to True" + ), + ): DateTimeTrigger("2012-01-01T03:03:03+00:00", soft_fail=True) From a19924a875cafe7ce32eb80ed1ac23ded83e2bcc Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 17 Aug 2023 10:43:55 +0800 Subject: [PATCH 5/9] Revert "fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException" This reverts commit a6d803303bf71a84e9e59e94d9c088e3120bedb5. --- airflow/triggers/temporal.py | 15 +++++++-------- tests/triggers/test_temporal.py | 8 +------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py index e8f5508334c1c..746d600d4f0a6 100644 --- a/airflow/triggers/temporal.py +++ b/airflow/triggers/temporal.py @@ -35,20 +35,19 @@ class DateTimeTrigger(BaseTrigger): The provided datetime MUST be in UTC. """ - def __init__(self, moment: datetime.datetime, *, soft_fail: bool = False) -> None: + def __init__(self, moment: datetime.datetime, soft_fail: bool = False): super().__init__() - skipping_message_postfix = "Skipping due to soft_fail is set to True." if not isinstance(moment, datetime.datetime): - message = f"Expected datetime.datetime type for moment. Got {type(moment)}" + exc = TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}") if soft_fail: - raise AirflowSkipException(f"{message}. {skipping_message_postfix}") - raise TypeError(message) + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc + raise exc # Make sure it's in UTC elif moment.tzinfo is None: - message = "You cannot pass naive datetimes" + exc = ValueError("You cannot pass naive datetimes") if soft_fail: - raise AirflowSkipException(f"{message}. {skipping_message_postfix}") - raise ValueError(message) + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc + raise exc else: self.moment = timezone.convert_to_utc(moment) diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 674328cb76579..3157634c61655 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -40,13 +40,7 @@ def test_input_validation_with_soft_fail(): """ Tests that DateTimeTrigger raises AirflowSkipException when soft_fail is set to True """ - with pytest.raises( - AirflowSkipException, - match=( - "Expected datetime.datetime type for moment. Got . " - "Skipping due to soft_fail is set to True" - ), - ): + with pytest.raises(AirflowSkipException, match="Skipping due to soft_fail is set to True"): DateTimeTrigger("2012-01-01T03:03:03+00:00", soft_fail=True) From 7305dfdc7607d6301fd885570f750f33a33b2763 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 17 Aug 2023 10:44:06 +0800 Subject: [PATCH 6/9] Revert "test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail" This reverts commit 50e39e08a415685ace788ae728397a199c21e82b. --- tests/triggers/test_temporal.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py index 3157634c61655..655910394fb8c 100644 --- a/tests/triggers/test_temporal.py +++ b/tests/triggers/test_temporal.py @@ -22,7 +22,6 @@ import pendulum import pytest -from airflow.exceptions import AirflowSkipException from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.utils import timezone @@ -36,14 +35,6 @@ def test_input_validation(): DateTimeTrigger("2012-01-01T03:03:03+00:00") -def test_input_validation_with_soft_fail(): - """ - Tests that DateTimeTrigger raises AirflowSkipException when soft_fail is set to True - """ - with pytest.raises(AirflowSkipException, match="Skipping due to soft_fail is set to True"): - DateTimeTrigger("2012-01-01T03:03:03+00:00", soft_fail=True) - - def test_datetime_trigger_serialization(): """ Tests that the DateTimeTrigger correctly serializes its arguments @@ -53,7 +44,7 @@ def test_datetime_trigger_serialization(): trigger = DateTimeTrigger(moment) classpath, kwargs = trigger.serialize() assert classpath == "airflow.triggers.temporal.DateTimeTrigger" - assert kwargs == {"moment": moment, "soft_fail": False} + assert kwargs == {"moment": moment} def test_timedelta_trigger_serialization(): From 00ce9ea8d52a964d5eaffe1063f968285eee03e2 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 17 Aug 2023 10:44:14 +0800 Subject: [PATCH 7/9] Revert "refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger" This reverts commit 985981a269cea68da719d6fd1c60bedd9a7e5225. --- airflow/sensors/date_time.py | 10 +++++++++- airflow/sensors/time_delta.py | 13 +++++++++---- airflow/sensors/time_sensor.py | 10 +++++++++- airflow/triggers/temporal.py | 20 ++++---------------- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index c5ca0c5bbedde..632ef3b885df0 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -20,6 +20,7 @@ import datetime from typing import Sequence +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -86,8 +87,15 @@ class DateTimeSensorAsync(DateTimeSensor): """ def execute(self, context: Context): + try: + datetime_trigger = DateTimeTrigger(moment=timezone.parse(self.target_time)) + except Exception as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + self.defer( - trigger=DateTimeTrigger(moment=timezone.parse(self.target_time), soft_fail=self.soft_fail), + trigger=datetime_trigger, method_name="execute_complete", ) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 825950a7dba33..35fd3ecd50b40 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -64,10 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute(self, context: Context): target_dttm = context["data_interval_end"] target_dttm += self.delta - self.defer( - trigger=DateTimeTrigger(moment=target_dttm, soft_fail=self.soft_fail), - method_name="execute_complete", - ) + try: + datetime_trigger = DateTimeTrigger(moment=target_dttm) + except Exception as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + + self.defer(trigger=datetime_trigger, method_name="execute_complete") def execute_complete(self, context, event=None): """Execute for when the trigger fires - return immediately.""" diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index b2b1ffb347efa..d3a87151cd092 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -19,6 +19,7 @@ import datetime +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -70,8 +71,15 @@ def __init__(self, *, target_time, **kwargs): self.target_datetime = timezone.convert_to_utc(aware_time) def execute(self, context: Context): + try: + datetime_trigger = DateTimeTrigger(moment=self.target_datetime) + except Exception as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise e + self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime, soft_fail=self.soft_fail), + trigger=datetime_trigger, method_name="execute_complete", ) diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py index 746d600d4f0a6..e23e19f389e7b 100644 --- a/airflow/triggers/temporal.py +++ b/airflow/triggers/temporal.py @@ -20,7 +20,6 @@ import datetime from typing import Any -from airflow.exceptions import AirflowSkipException from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.utils import timezone @@ -35,29 +34,18 @@ class DateTimeTrigger(BaseTrigger): The provided datetime MUST be in UTC. """ - def __init__(self, moment: datetime.datetime, soft_fail: bool = False): + def __init__(self, moment: datetime.datetime): super().__init__() if not isinstance(moment, datetime.datetime): - exc = TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}") - if soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc - raise exc + raise TypeError(f"Expected datetime.datetime type for moment. Got {type(moment)}") # Make sure it's in UTC elif moment.tzinfo is None: - exc = ValueError("You cannot pass naive datetimes") - if soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from exc - raise exc + raise ValueError("You cannot pass naive datetimes") else: self.moment = timezone.convert_to_utc(moment) - self.soft_fail = soft_fail - def serialize(self) -> tuple[str, dict[str, Any]]: - return ( - "airflow.triggers.temporal.DateTimeTrigger", - {"moment": self.moment, "soft_fail": self.soft_fail}, - ) + return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment}) async def run(self): """ From 78fc4902c949cef33c81568f16c7bc6f9682e6f4 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Thu, 17 Aug 2023 10:49:51 +0800 Subject: [PATCH 8/9] Revert "fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail" This reverts commit b2f2662ae1a11ea928aad57acd2892c763c2db25. --- airflow/sensors/date_time.py | 10 +--------- airflow/sensors/time_delta.py | 10 +--------- airflow/sensors/time_sensor.py | 10 +--------- 3 files changed, 3 insertions(+), 27 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 632ef3b885df0..1425028870664 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -20,7 +20,6 @@ import datetime from typing import Sequence -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -87,15 +86,8 @@ class DateTimeSensorAsync(DateTimeSensor): """ def execute(self, context: Context): - try: - datetime_trigger = DateTimeTrigger(moment=timezone.parse(self.target_time)) - except Exception as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise e - self.defer( - trigger=datetime_trigger, + trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)), method_name="execute_complete", ) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 35fd3ecd50b40..1571334757afa 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -65,14 +64,7 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute(self, context: Context): target_dttm = context["data_interval_end"] target_dttm += self.delta - try: - datetime_trigger = DateTimeTrigger(moment=target_dttm) - except Exception as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise e - - self.defer(trigger=datetime_trigger, method_name="execute_complete") + self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete") def execute_complete(self, context, event=None): """Execute for when the trigger fires - return immediately.""" diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index d3a87151cd092..7f6809851a71b 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -19,7 +19,6 @@ import datetime -from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -71,15 +70,8 @@ def __init__(self, *, target_time, **kwargs): self.target_datetime = timezone.convert_to_utc(aware_time) def execute(self, context: Context): - try: - datetime_trigger = DateTimeTrigger(moment=self.target_datetime) - except Exception as e: - if self.soft_fail: - raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e - raise e - self.defer( - trigger=datetime_trigger, + trigger=DateTimeTrigger(moment=self.target_datetime), method_name="execute_complete", ) From eac45ee8f826872729bf8fefc37a935f13d6a133 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Wed, 23 Aug 2023 18:45:08 +0800 Subject: [PATCH 9/9] fix(sensors): move core async sensor trigger initialization to __init__ if possible --- airflow/sensors/date_time.py | 6 +++++- airflow/sensors/time_delta.py | 10 +++++++++- airflow/sensors/time_sensor.py | 3 ++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 1425028870664..2ac17ca1b6242 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -85,9 +85,13 @@ class DateTimeSensorAsync(DateTimeSensor): :param target_time: datetime after which the job succeeds. (templated) """ + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.trigger = DateTimeTrigger(moment=timezone.parse(self.target_time)) + def execute(self, context: Context): self.defer( - trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)), + trigger=self.trigger, method_name="execute_complete", ) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 1571334757afa..dfedcd706f805 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -64,7 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute(self, context: Context): target_dttm = context["data_interval_end"] target_dttm += self.delta - self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete") + try: + trigger = DateTimeTrigger(moment=target_dttm) + except (TypeError, ValueError) as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise + + self.defer(trigger=trigger, method_name="execute_complete") def execute_complete(self, context, event=None): """Execute for when the trigger fires - return immediately.""" diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index 7f6809851a71b..c4590030902ea 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -68,10 +68,11 @@ def __init__(self, *, target_time, **kwargs): ) self.target_datetime = timezone.convert_to_utc(aware_time) + self.trigger = DateTimeTrigger(moment=self.target_datetime) def execute(self, context: Context): self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime), + trigger=self.trigger, method_name="execute_complete", )