CronTriggerTimetable lost one task occasionally #27399
Comments
Mistakenly converted to discussion. Seems legit. This is likely only happening with a 1-minute interval, so not really critical, but it would be nice to find out when it happens - thanks @andi for the detailed report. Could you please check @andi if it appears in the latest version of Airflow and maybe pinpoint some other circumstances when it happens? How often? Do you see usage spikes (memory/CPU etc.) around that time? Do you see any unusual task restarts etc.? We need to try to get a really reproducible case in order to be able to track that one - otherwise it is pure guesswork. Can you try to gather much more information @andi? |
Hmm, this and #27582 seem to be the same issue? |
I think the other issue is "first" schedule - and this one is "missing one in the middle" - it might be the same but it might be different. |
Hey all, we had an issue in our configuration where our DAGs were reloading on the order of milliseconds (looking at last_parsed_time in the database), and we observed this issue happening a lot. The only DAGs that failed to run in this circumstance were the ones that used CronTriggerTimetable. (We were using a test that should run every minute.) I believe that if you set the DAG reload time very, very low, you will see this problem occur much more frequently. When we fixed the reload issue in the configuration, we noticed this problem much less frequently. |
Steps to reproduce this (with the other bug that reloads DAGs on the order of milliseconds):
|
Can confirm when trying to schedule a DAG exactly once with
In the minute before 18:03, the DAG was processed 22 times. The first processing at 18:03, the
If I could slow down the processing interval, that might help? |
@johnwarburton Have you had any luck with this? Seeing the same issue in 2.4.2 after switching from a plain cron expression (i.e. Some more context:
|
Hi @RNHTTR - no, I am waiting to see if there is a fix in the milestones
|
Apologies -- I thought that using a cron expression such as |
I was chatting with Jarek earlier today on Slack about a similar issue I saw over the weekend and he pointed me to this open one.
In my scenario I had deployed the DAG late on Friday and manually triggered it since the next run wasn't scheduled until Saturday. It then successfully scheduled on Saturday but not on Sunday or today (Monday). I am going through logs and will see if it runs on Tuesday. |
One commonality is all timezones are non-utc. Not sure if that's indicative of something, but I think it's worth noting. |
We have observed the same issue with a UTC timezone, so I don't think it is the issue, e.g.
Are there any tips on how to deal with this reliably, or should we just expect |
Also observed in #28293 |
@potiuk do you have any advice on how to proceed with managing this bug? |
No. Not until it gets investigated and fixed. |
For what it's worth, prior to 2.4 I wrote my own timetable that acts almost exactly like In my case the problem was that there was a race condition between the DagProcessor and the Scheduler when calling Also worth noting that my bug only happened with |
cc: @uranusjr? WDYT? This looks plausible I guess ^^ |
My gut feeling is also that the root cause is somewhere in the DagProcessor and/or Scheduler. Anyway, this problem doesn't happen in the Maybe we need to implement similar functionality in |
Hard to say. |
Could it be somehow related to the fact that To test this hypothesis I set up small (10-minute) date intervals for my daily jobs. I will report how it goes when I have some results. |
I don't think the interval is relevant; the scheduler never looks at it, but only passes it directly into a run's context. |
This is the logic to calculate the next run for

start_time_candidates = [self._align_to_next(DateTime.utcnow())]
if last_automated_data_interval is not None:
    start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
    start_time_candidates.append(self._align_to_next(restriction.earliest))
next_start_time = max(start_time_candidates)

The two

Also worth noting that |
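To make the skipping concrete, here is a small self-contained sketch of that catchup=False logic. The align_to_next and get_next helpers below are simplified stand-ins for the timetable's private _align_to_next/_get_next (assuming an every-minute cron, as in the original report), not the real implementations:

from datetime import datetime, timedelta

def align_to_next(dt: datetime) -> datetime:
    # Next cron tick at or after dt, for an every-minute schedule.
    aligned = dt.replace(second=0, microsecond=0)
    return aligned if aligned >= dt else aligned + timedelta(minutes=1)

def get_next(dt: datetime) -> datetime:
    # Cron tick strictly after dt, for an every-minute schedule.
    return dt.replace(second=0, microsecond=0) + timedelta(minutes=1)

# The last run was the 23:09 tick; the scheduler evaluates slightly late, at 23:10:30.
last_run_end = datetime(2022, 10, 24, 23, 9)
now = datetime(2022, 10, 24, 23, 10, 30)

candidates = [align_to_next(now), get_next(last_run_end)]  # [23:11, 23:10]
print(max(candidates))  # 2022-10-24 23:11:00 - the 23:10 tick is silently dropped

If DAG processing delays the evaluation past a tick, the "align now to the next tick" candidate wins the max() and the missed tick is never scheduled.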
FYI: even with the non-zero date interval I still faced the issue. So, this is not the root cause of this problem. Overall it looks like the only real workaround for now is to not use But there is a bigger problem with this bug in my opinion. Since we are not 100% sure what the root cause is, all other timetables may be affected by this bug too. And since the main purpose of Airflow is to reliably schedule jobs, I think this is a major problem: with this bug we can't say that Airflow guarantees that users' jobs will be scheduled as requested. They can be randomly skipped. And even if only |
Contributions are always welcomed. |
I spent some time taking a closer look at the implementation. The problem with
is that CronDataIntervalTimetable actually does not do that! The reason it seems to be more resilient is that catchup in that timetable relies on the data interval start, not the trigger time. So say
For CronDataIntervalTimetable, since the last run covered 2–3am, the next run should cover 3–4am, which can be achieved without catchup (the next one after that covers 4–5am and is not due yet). Everything is fine. But for CronTriggerTimetable, the last run covered 3am, but the 4am run gets skipped since that's already in the past. I think a reasonable logic would be to change the I can prepare a PR if that sounds reasonable (or anyone can, it's just one line in |
Forgot to mention the one-line change. This line: airflow/airflow/timetables/trigger.py, line 92 in 4a44343
should be changed to use |
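If I read the suggestion right (the exact replacement is not quoted above, so treat this as an assumption), the idea is to align the "now" candidate to the previous cron tick instead of the next one, roughly along these lines:

start_time_candidates = [self._align_to_prev(DateTime.utcnow())]  # hypothetical: previous tick instead of next

With the hourly example above, the candidates at 04:30 then become max(align_to_prev(04:30) = 04:00, get_next(03:00) = 04:00) = 04:00, so the missed 04:00 run still fires (late) instead of being skipped.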
Sounds good. It's natural to those who are used to Cron and Anacron. I think we need to document the new behavior. The new behavior might be unnatural to those who are not familiar with Anacron. Say
|
Yeah. Or maybe we can add a flag for this? Not sure what it should be called though. |
I've been testing the following change to fix this issue and it seems to be working. It's similar to what has been suggested:

def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: Optional[DataInterval],
    restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
    if restriction.catchup:
        if last_automated_data_interval is None:
            if restriction.earliest is None:
                return None
            next_start_time = self._align_to_next(restriction.earliest)
        else:
            next_start_time = self._get_next(last_automated_data_interval.end)
    else:
        current_time = DateTime.utcnow()
        if restriction.earliest is not None and current_time < restriction.earliest:
            next_start_time = self._align_to_next(restriction.earliest)
        else:
            if last_automated_data_interval is None:
                # The DAG has never run before.
                next_start_time = self._align_to_next(current_time)
            elif last_automated_data_interval.end == self._align_to_prev(current_time):
                # The last run is the same as the previously scheduled run. Everything is working fine.
                next_start_time = self._align_to_next(current_time)
            else:
                # The last run is NOT the same as the previously scheduled run, we must have skipped it.
                next_start_time = self._align_to_prev(current_time)
                LOGGER.warning(
                    'DAG skipped previous schedule, rescheduling. last_automated_data_interval.end:"%s" next_start_time:"%s"',
                    last_automated_data_interval.end,
                    next_start_time,
                )
    if restriction.latest is not None and restriction.latest < next_start_time:
        return None
    return DagRunInfo.exact(next_start_time)

Specifically, this addition:

if last_automated_data_interval is None:
    # The DAG has never run before.
    next_start_time = self._align_to_next(current_time)
elif last_automated_data_interval.end == self._align_to_prev(current_time):
    # The last run is the same as the previously scheduled run. Everything is working fine.
    next_start_time = self._align_to_next(current_time)
else:
    # The last run is NOT the same as the previously scheduled run, we must have skipped it.
    next_start_time = self._align_to_prev(current_time)
    LOGGER.warning('DAG skipped previous schedule, rescheduling. last_automated_data_interval.end:"%s" next_start_time:"%s"', last_automated_data_interval.end, next_start_time)

I added the logging so I could monitor it and alert if it happens, just in case. Not sure if this could be done better though; I'm open to suggestions. |
elif last_automated_data_interval.end == self._align_to_prev(current_time):
# The last run is the same as the previously scheduled run. Everything is working fine.
next_start_time = self._align_to_next(current_time) This part has a bug when The |
Thanks @uranusjr for the feedback! I'll look into it, good points. |
The bug occurs because seconds and microseconds are not reset here: The bug fix is only one line: |
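The exact line is not quoted above, so the snippet below is only a generic illustration of the failure mode being described: a raw utcnow() that still carries seconds and microseconds never compares equal to (or aligns cleanly with) whole-minute cron ticks unless it is truncated first. The replace() call is shown for illustration and is not necessarily the actual one-line fix:

from datetime import datetime

now = datetime(2022, 10, 24, 23, 10, 37, 123456)     # raw utcnow() with seconds/microseconds
tick = datetime(2022, 10, 24, 23, 10)                # a whole-minute cron tick

print(now == tick)                                   # False: 37.123456 s of drift
print(now.replace(second=0, microsecond=0) == tick)  # True once truncated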
@BohdanSemonov I tried to apply your fix and ran the following test.
The DAG starts to run; maybe I misunderstand something, but I thought the entire point of CronTriggerTimetable is to prevent this? |
@snippins |
Hey @BohdanSemonov , if the fix works, could you please submit a PR? I think this issue is quite critical, and I also observe such behavior from CronTriggerTimetable |
I have catchup=False already. |
Sorry, maybe we didn't understand each other, but I observe such behavior when |
Done. |
I'm closing this as it appears to have been fixed in 2.7.1 by #33404 but there was a typo in the description that allowed this Issue to stay open. |
Discussed in #27398
Originally posted by AndrewTsao October 25, 2022
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
My Airflow version is 2.4.0. I use CronTriggerTimetable in my DAG. I find that it loses one task occasionally. I inspected the scheduler log,
It should have run at 2022-10-24T23:10:00.000+0800, BUT it did not run.
What you think should happen instead
I think it loses schedule information when the DAG file is reloaded and the DAG is rescheduled.
How to reproduce
dag_dir_list_interval = 10
In the scheduler log, we can see that at that point the scheduler is reloading the DAG file.
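For reference, this setting normally lives in the [scheduler] section of airflow.cfg (or can be set through the corresponding AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL environment variable); a value of 10 makes the DAG folder be rescanned every 10 seconds:

[scheduler]
dag_dir_list_interval = 10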
Operating System
Centos 7.9
Versions of Apache Airflow Providers
Deployment
Virtualenv installation
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct