Respect "soft_fail" for core async sensors #33403
Let's wait for #33424 before deciding whether we need to merge this one.
Hi, I just checked #33424. We might still need this one, because the exceptions are raised in the execute method before we defer. (That said, the solution in #33424 can still save us from handling this kind of thing in execute_complete, which is great!)
I agree that #33424 is not complete yet, but I don't think this PR fixes the issue. The concept of soft fail is that if the sensor fails, we raise a specific skip exception, which the job runner processes to change the state to skipped instead of failed.
Could you add a test for the operator? (the added test is for the trigger and not the task)
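To make the soft-fail concept above concrete, here is a minimal sketch. It assumes a simplified model rather than Airflow's actual implementation: the two exception classes are local stand-ins for the ones in airflow.exceptions, and run_sensor and final_state are hypothetical helpers invented for illustration.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


class AirflowSkipException(AirflowException):
    """Local stand-in for airflow.exceptions.AirflowSkipException (assumption)."""


def run_sensor(poke, soft_fail):
    """Hypothetical helper: turn a sensor failure into a skip when soft_fail is set."""
    try:
        return poke()
    except AirflowSkipException:
        # Already a skip, nothing to convert.
        raise
    except AirflowException as err:
        if soft_fail:
            raise AirflowSkipException(f"Skipping due to soft_fail: {err}") from err
        raise


def final_state(poke, soft_fail):
    """Hypothetical stand-in for the runner that maps exceptions to task states."""
    try:
        run_sensor(poke, soft_fail)
    except AirflowSkipException:
        return "skipped"
    except Exception:
        return "failed"
    return "success"


def failing_poke():
    raise AirflowException("sensor failed")


def bad_argument_poke():
    # Not an AirflowException, so this simplified model does not convert it.
    raise ValueError("You cannot pass naive datetimes")
```

In this model, final_state(failing_poke, soft_fail=True) ends as skipped, while bad_argument_poke still ends as failed even with soft_fail=True. That second case is the gap this PR is concerned with: exceptions that are not Airflow exceptions bypass the conversion.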
airflow/triggers/temporal.py
-            raise ValueError("You cannot pass naive datetimes")
+            message = "You cannot pass naive datetimes"
+            if soft_fail:
+                raise AirflowSkipException(f"{message}. {skipping_message_postfix}")
How could raising an AirflowSkipException in the trigger move the task state to skipped? The trigger exception is not propagated when the task resumes, if I am not mistaken. WDYT?
Take DateTimeSensorAsync as an example. The second line below raises the AirflowSkipException while the trigger is being constructed, before self.defer is executed:
    self.defer(
        trigger=DateTimeTrigger(moment=target_dttm, soft_fail=self.soft_fail),
        method_name="execute_complete",
    )
What about:
    try:
        self.defer(
            trigger=DateTimeTrigger(moment=target_dttm, soft_fail=self.soft_fail),
            method_name="execute_complete",
        )
    except Exception as e:
        raise AirflowException(str(e))
or just changing raise TypeError(message) and raise ValueError to AirflowException.
In both cases the exception will be caught by BaseSensor, and an AirflowSkipException will be raised when soft_fail is True. I'm trying to avoid complicating the trigger code and duplicating the soft fail logic.
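The wrapping idea can be sketched in isolation as follows. This is an illustration under stated assumptions, not the code from this PR: AirflowException is a local stand-in, make_trigger is a hypothetical substitute for the argument checks in DateTimeTrigger, and the TypeError message is invented for the example. Only the "You cannot pass naive datetimes" message comes from the diff under review.

```python
import datetime


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


def make_trigger(moment):
    """Hypothetical stand-in for the validation done when building the trigger."""
    if not isinstance(moment, datetime.datetime):
        # Illustrative message, not taken from Airflow.
        raise TypeError(f"Expected datetime.datetime for moment. Got {type(moment)}")
    if moment.tzinfo is None:
        raise ValueError("You cannot pass naive datetimes")
    return {"moment": moment}


def execute(moment):
    """Re-raise validation errors as AirflowException so that a base class
    which only converts AirflowException on soft_fail can handle them."""
    try:
        return make_trigger(moment)
    except (TypeError, ValueError) as err:
        raise AirflowException(str(err)) from err
```

The sketch narrows the except clause to TypeError and ValueError instead of catching Exception, so unrelated errors keep their original type. Whether that narrowing is appropriate for the real sensors is a judgment call for the PR.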
Or what do you think about my first commit, b2f2662? I was trying not to duplicate the check logic, so I moved it into the trigger.
For me, it's much better.
Since it's a problem in the initialization of the trigger and not a runtime issue, what about raising the exception at parsing time, to give the user fast feedback about the wrong parameters? For example:
diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py
index 1425028870..a036d09e01 100644
--- a/airflow/sensors/date_time.py
+++ b/airflow/sensors/date_time.py
@@ -69,6 +69,7 @@ class DateTimeSensor(BaseSensorOperator):
             raise TypeError(
                 f"Expected str or datetime.datetime type for target_time. Got {type(target_time)}"
             )
+        self.trigger = DateTimeTrigger(moment=timezone.parse(self.target_time))
     def poke(self, context: Context) -> bool:
         self.log.info("Checking if the time (%s) has come", self.target_time)
@@ -87,7 +88,7 @@ class DateTimeSensorAsync(DateTimeSensor):
     def execute(self, context: Context):
         self.defer(
-            trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)),
+            trigger=self.trigger,
             method_name="execute_complete",
         )
This way the DAG parsing will fail, and the user will fix the issue. WDYT?
This might not be doable for https://github.com/apache/airflow/blob/a6d803303bf71a84e9e59e94d9c088e3120bedb5/airflow/sensors/time_delta.py#L65-L66
Hi @hussein-awala, could you please check whether it works for you when you have time? Thanks!
In apache#33403, we moved trigger initialization to __init__, which causes a failure for sensors that use template variables.
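A rough sketch of why building the trigger in __init__ can break templated arguments is shown below. It assumes a simplified model: EagerSensor and LazySensor are hypothetical classes, render is a toy substitute for template rendering (which happens after the operator is constructed), and datetime.fromisoformat stands in for the timezone parsing used by the real sensors.

```python
import datetime


class EagerSensor:
    """Hypothetical sensor that parses target_time in __init__."""

    def __init__(self, target_time):
        self.target_time = target_time
        # At this point target_time may still be an unrendered template string.
        self.moment = datetime.datetime.fromisoformat(target_time)


class LazySensor:
    """Hypothetical sensor that defers parsing until execute."""

    def __init__(self, target_time):
        self.target_time = target_time

    def render(self, context):
        """Toy stand-in for template rendering of the target_time field."""
        for key, value in context.items():
            self.target_time = self.target_time.replace("{{ " + key + " }}", value)

    def execute(self):
        # By now the template has been replaced with a concrete value.
        return datetime.datetime.fromisoformat(self.target_time)


def build_eagerly(target_time):
    """Return the error raised by eager construction, or None on success."""
    try:
        EagerSensor(target_time)
    except ValueError as err:
        return err
    return None
```

With a literal ISO timestamp both classes behave the same. With a value such as "{{ ts }}", build_eagerly returns a ValueError because the string has not been rendered yet, whereas LazySensor parses successfully once render has run.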
* fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail
* refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger
* test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail
* fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException
* Revert "fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException" This reverts commit a6d803303bf71a84e9e59e94d9c088e3120bedb5.
* Revert "test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail" This reverts commit 50e39e08a415685ace788ae728397a199c21e82b.
* Revert "refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger" This reverts commit 985981a269cea68da719d6fd1c60bedd9a7e5225.
* Revert "fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail" This reverts commit b2f2662ae1a11ea928aad57acd2892c763c2db25.
* fix(sensors): move core async sensor trigger initialization to __init__ if possible
(cherry picked from commit 9ce76e3)
…#33926) in apache/airflow#33403, we move trigger initialization to __init__ which causes a failure for one uses template variable (cherry picked from commit eaa6126e45ab9869c11c59363e52490aa6dff91f) GitOrigin-RevId: d0886260d7de08f5605ee1e0a6846082620a4753
…#33926) in apache/airflow#33403, we move trigger initialization to __init__ which causes a failure for one uses template variable GitOrigin-RevId: eaa6126e45ab9869c11c59363e52490aa6dff91f
This is a follow-up PR to #33196 and intends to solve the issue that the soft_fail argument might not work on all sensors.
This pull request addresses the issue of the core async sensors potentially raising exceptions other than AirflowSkipException even when soft_fail is set to True. It achieves this by modifying the DateTimeTrigger to include the soft_fail argument.
Not related to this PR: I noticed that we currently use separate sensors for synchronous and asynchronous operations in our core sensors. Do you think it would be beneficial to merge them into a single sensor, as the providers do?