Skip to content

Commit

Permalink
fix(sensors): move core async sensor trigger initialization to __init…
Browse files Browse the repository at this point in the history
…__ if possible
  • Loading branch information
Lee-W committed Aug 25, 2023
1 parent 78fc490 commit eac45ee
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
6 changes: 5 additions & 1 deletion airflow/sensors/date_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ class DateTimeSensorAsync(DateTimeSensor):
:param target_time: datetime after which the job succeeds. (templated)
"""

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.trigger = DateTimeTrigger(moment=timezone.parse(self.target_time))

def execute(self, context: Context):
self.defer(
trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)),
trigger=self.trigger,
method_name="execute_complete",
)

Expand Down
10 changes: 9 additions & 1 deletion airflow/sensors/time_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations

from airflow.exceptions import AirflowSkipException
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone
Expand Down Expand Up @@ -64,7 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
def execute(self, context: Context):
target_dttm = context["data_interval_end"]
target_dttm += self.delta
self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")
try:
trigger = DateTimeTrigger(moment=target_dttm)
except (TypeError, ValueError) as e:
if self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
raise

self.defer(trigger=trigger, method_name="execute_complete")

def execute_complete(self, context, event=None):
"""Execute for when the trigger fires - return immediately."""
Expand Down
3 changes: 2 additions & 1 deletion airflow/sensors/time_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ def __init__(self, *, target_time, **kwargs):
)

self.target_datetime = timezone.convert_to_utc(aware_time)
self.trigger = DateTimeTrigger(moment=self.target_datetime)

def execute(self, context: Context):
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime),
trigger=self.trigger,
method_name="execute_complete",
)

Expand Down

0 comments on commit eac45ee

Please sign in to comment.