[v2-10-test] Re-queue tasks when they are stuck in queued (#43520) #44158

Merged
merged 2 commits on Nov 19, 2024
Changes from 1 commit
26 changes: 24 additions & 2 deletions airflow/executors/base_executor.py
@@ -26,6 +26,7 @@
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum
from deprecated import deprecated

from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
@@ -545,7 +546,12 @@ def terminate(self):
"""Get called when the daemon receives a SIGTERM."""
raise NotImplementedError()

def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
@deprecated(
reason="Replaced by function `revoke_task`.",
category=RemovedInAirflow3Warning,
action="ignore",
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Handle remnants of tasks that were failed because they were stuck in queued.

@@ -556,7 +562,23 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # p
:param tis: List of Task Instances to clean up
:return: List of readable task instances for a warning message
"""
raise NotImplementedError()
raise NotImplementedError

def revoke_task(self, *, ti: TaskInstance):
"""
Attempt to remove task from executor.

It should attempt to ensure that the task is no longer running on the worker,
and ensure that it is cleared out from internal data structures.

It should *not* change the state of the task in airflow, or add any events
to the event buffer.

It should not raise any error.

:param ti: Task instance to remove
"""
raise NotImplementedError

def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
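For illustration, a minimal sketch of how a concrete executor might implement the new `revoke_task` contract above: clear the task out of internal bookkeeping, make a best-effort attempt to stop it on the worker, and never change Airflow task state, write to the event buffer, or raise. The `HypotheticalExecutor` class and its `_stop_on_worker` hook are assumptions made for this example, not part of the PR.

from airflow.executors.base_executor import BaseExecutor
from airflow.models import TaskInstance


class HypotheticalExecutor(BaseExecutor):
    """Illustrative executor; only revoke_task is sketched here."""

    def _stop_on_worker(self, key):
        # Hypothetical hook: a real executor would signal its backend/worker
        # to stop running the task identified by `key`.
        pass

    def revoke_task(self, *, ti: TaskInstance):
        # Best-effort: cleanup errors must not bubble up into the scheduler loop.
        try:
            # Remove the task from the executor's internal data structures.
            self.queued_tasks.pop(ti.key, None)
            self.running.discard(ti.key)
            # Ask the backend to stop the task if it is already running.
            self._stop_on_worker(ti.key)
        except Exception:
            self.log.exception("Could not revoke task %s", ti.key)
        # Deliberately no ti.set_state(...) and no self.event_buffer update here;
        # the scheduler decides whether to requeue or fail the task.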
161 changes: 129 additions & 32 deletions airflow/jobs/scheduler_job_runner.py
@@ -25,12 +25,14 @@
import time
import warnings
from collections import Counter, defaultdict, deque
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from deprecated import deprecated
from sqlalchemy import and_, delete, func, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
@@ -97,6 +99,9 @@
DR = DagRun
DM = DagModel

TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
""":meta private:"""


@dataclass
class ConcurrencyMap:
@@ -228,6 +233,13 @@ def __init__(
stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout
)

# this param is intentionally undocumented
self._num_stuck_queued_retries = conf.getint(
section="scheduler",
key="num_stuck_in_queued_retries",
fallback=2,
)

self.do_pickle = do_pickle

if log:
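A side note on configuration: the retry budget above is read via conf.getint("scheduler", "num_stuck_in_queued_retries", fallback=2) and is intentionally undocumented, so the override below is an assumption derived from that call rather than from the docs. Presumably it can be set in airflow.cfg (or via the equivalent AIRFLOW__SCHEDULER__NUM_STUCK_IN_QUEUED_RETRIES environment variable):

[scheduler]
# Number of times a task stuck in queued may be requeued before it is failed (default: 2).
num_stuck_in_queued_retries = 3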
@@ -1093,7 +1105,7 @@ def _run_scheduler_loop(self) -> None:

timers.call_regular_interval(
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
self._fail_tasks_stuck_in_queued,
self._handle_tasks_stuck_in_queued,
)

timers.call_regular_interval(
@@ -1141,6 +1153,7 @@ def _run_scheduler_loop(self) -> None:
for executor in self.job.executors:
try:
# this is backcompat check if executor does not inherit from BaseExecutor
# todo: remove in airflow 3.0
if not hasattr(executor, "_task_event_logs"):
continue
with create_session() as session:
@@ -1772,48 +1785,132 @@ def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
self.job.executor.send_callback(request)

@provide_session
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
"""
Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
Handle the scenario where a task is queued for longer than `task_queued_timeout`.

Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
track of a task, a cluster can't further scale up its workers, etc.), but tasks
should not be stuck in queued for a long time. This will mark tasks stuck in
queued for longer than `self._task_queued_timeout` as failed. If the task has
available retries, it will be retried.
should not be stuck in queued for a long time.

We will attempt to requeue the task (by revoking it from executor and setting to
scheduled) up to 2 times before failing the task.
"""
self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
for ti in stuck_tis:
executor.revoke_task(ti=ti)
self._maybe_requeue_stuck_ti(
ti=ti,
session=session,
)
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
# just continue immediately.
self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
continue

tasks_stuck_in_queued = session.scalars(
def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
"""Query db for TIs that are stuck in queued."""
return session.scalars(
select(TI).where(
TI.state == TaskInstanceState.QUEUED,
TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
TI.queued_by_job_id == self.job.id,
)
).all()
)

for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
try:
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
for ti in stuck_tis:
if repr(ti) in cleaned_up_task_instances:
self.log.warning(
"Marking task instance %s stuck in queued as failed. "
"If the task instance has available retries, it will be retried.",
ti,
)
session.add(
Log(
event="stuck in queued",
task_instance=ti.key,
extra=(
"Task will be marked as failed. If the task instance has "
"available retries, it will be retried."
),
)
)
except NotImplementedError:
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
def _maybe_requeue_stuck_ti(self, *, ti, session):
"""
Requeue task if it has not been attempted too many times.

Otherwise, fail it.
"""
num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
if num_times_stuck < self._num_stuck_queued_retries:
self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id)
session.add(
Log(
event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
task_instance=ti.key,
extra=(
f"Task was in queued state for longer than {self._task_queued_timeout} "
"seconds; task state will be set back to scheduled."
),
)
)
self._reschedule_stuck_task(ti)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
)
)
ti.set_state(TaskInstanceState.FAILED, session=session)

@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
category=RemovedInAirflow3Warning,
action="ignore",
)
def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
"""
Try to invoke stuck in queued cleanup for older executor interface.

TODO: remove in airflow 3.0

Here we handle case where the executor pre-dates the interface change that
introduced `revoke_task` and deprecated `cleanup_stuck_queued_tasks`.

"""
with suppress(NotImplementedError):
for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
self.log.warning(
"Task instance %s stuck in queued. Will be set to failed.",
ti_repr,
)

@provide_session
def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
session.execute(
update(TI)
.where(TI.filter_for_tis([ti]))
.values(
state=TaskInstanceState.SCHEDULED,
queued_dttm=None,
)
.execution_options(synchronize_session=False)
)

@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
"""
Check the Log table to see how many times a taskinstance has been stuck in queued.

We can then use this information to determine whether to reschedule a task or fail it.
"""
return (
session.query(Log)
.where(
Log.task_id == ti.task_id,
Log.dag_id == ti.dag_id,
Log.run_id == ti.run_id,
Log.map_index == ti.map_index,
Log.try_number == ti.try_number,
Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
)
.count()
)

@provide_session
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
@@ -2102,7 +2199,7 @@ def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
updated_count = sum(self._set_orphaned(dataset) for dataset in orphaned_dataset_query)
Stats.gauge("dataset.orphaned", updated_count)

def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
"""Organize TIs into lists per their respective executor."""
_executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
for ti in tis:
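Pulling the scheduler-side changes together: for each task found stuck in queued, the scheduler revokes it from its executor, counts how many "stuck in queued reschedule" Log rows already exist for that exact try, requeues it (state back to scheduled, queued_dttm cleared) while that count is below _num_stuck_queued_retries, and fails it otherwise. With the defaults, that means roughly: stuck once, requeued; stuck twice, requeued; stuck a third time, failed. Below is a condensed, self-contained sketch of that per-task control flow; it mirrors the methods above but inlines them and simplifies the session handling, so treat it as an illustration rather than the exact implementation.

from airflow.models import Log, TaskInstance
from airflow.utils.state import TaskInstanceState

TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"


def handle_stuck_ti(ti: TaskInstance, executor, session, num_retries: int = 2) -> None:
    """Condensed sketch of the per-task logic in _handle_tasks_stuck_in_queued."""
    try:
        executor.revoke_task(ti=ti)  # best-effort removal from the executor
    except NotImplementedError:
        # Older executor interface: the scheduler falls back to the deprecated
        # cleanup_stuck_queued_tasks() bulk API and moves on.
        return

    # How many times has this exact try already been rescheduled for this reason?
    times_stuck = (
        session.query(Log)
        .where(
            Log.task_id == ti.task_id,
            Log.dag_id == ti.dag_id,
            Log.run_id == ti.run_id,
            Log.map_index == ti.map_index,
            Log.try_number == ti.try_number,
            Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
        )
        .count()
    )

    if times_stuck < num_retries:
        # Requeue: record the event and put the task back to scheduled.
        session.add(Log(event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT, task_instance=ti.key))
        ti.state = TaskInstanceState.SCHEDULED  # the real code issues a bulk UPDATE instead
        ti.queued_dttm = None
    else:
        # Retry budget exhausted: record it and fail the task instance.
        session.add(Log(event="stuck in queued tries exceeded", task_instance=ti.key))
        ti.set_state(TaskInstanceState.FAILED, session=session)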
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
@@ -1359,6 +1359,7 @@ repos
repr
req
reqs
requeued
Reserialize
reserialize
reserialized