Skip to content

Commit

Permalink
Fix missing clear and add logic for exclude_run_ids
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Nov 20, 2024
1 parent ef7bbce commit 9f3adfc
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,7 @@ def get_task_instances(
state=state or (),
include_dependent_dags=False,
exclude_task_ids=(),
exclude_run_ids=None,
session=session,
)
return session.scalars(cast(Select, query).order_by(DagRun.logical_date)).all()
Expand All @@ -989,6 +990,7 @@ def _get_task_instances(
state: TaskInstanceState | Sequence[TaskInstanceState],
include_dependent_dags: bool,
exclude_task_ids: Collection[str | tuple[str, int]] | None,
exclude_run_ids: frozenset[str] | None,
session: Session,
dag_bag: DagBag | None = ...,
) -> Iterable[TaskInstance]: ... # pragma: no cover
Expand All @@ -1005,6 +1007,7 @@ def _get_task_instances(
state: TaskInstanceState | Sequence[TaskInstanceState],
include_dependent_dags: bool,
exclude_task_ids: Collection[str | tuple[str, int]] | None,
exclude_run_ids: frozenset[str] | None,
session: Session,
dag_bag: DagBag | None = ...,
recursion_depth: int = ...,
Expand All @@ -1023,6 +1026,7 @@ def _get_task_instances(
state: TaskInstanceState | Sequence[TaskInstanceState],
include_dependent_dags: bool,
exclude_task_ids: Collection[str | tuple[str, int]] | None,
exclude_run_ids: frozenset[str] | None,
session: Session,
dag_bag: DagBag | None = None,
recursion_depth: int = 0,
Expand Down Expand Up @@ -1080,6 +1084,9 @@ def _get_task_instances(
else:
tis = tis.where(TaskInstance.state.in_(state))

if exclude_run_ids:
tis = tis.where(not_(TaskInstance.run_id.in_(exclude_run_ids)))

if include_dependent_dags:
# Recursively find external tasks indicated by ExternalTaskMarker
from airflow.sensors.external_task import ExternalTaskMarker
Expand Down Expand Up @@ -1152,6 +1159,7 @@ def _get_task_instances(
include_dependent_dags=include_dependent_dags,
as_pk_tuple=True,
exclude_task_ids=exclude_task_ids,
exclude_run_ids=exclude_run_ids,
dag_bag=dag_bag,
session=session,
recursion_depth=recursion_depth + 1,
Expand Down Expand Up @@ -1270,9 +1278,9 @@ def set_task_instance_state(
# Exclude the task itself from being cleared.
"exclude_task_ids": frozenset((task_id,)),
}

if not future and not past: # Simple case 1: we're only dealing with exactly one run.
clear_kwargs["run_id"] = run_id
subdag.clear(**clear_kwargs)
elif future and past: # Simple case 2: we're clearing ALL runs.
subdag.clear(**clear_kwargs)
else: # Complex cases: we may have more than one run, based on a date range.
Expand All @@ -1287,7 +1295,6 @@ def set_task_instance_state(
clear_kwargs["end_date"] = logical_date
exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id)
subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)

return altered

@provide_session
Expand Down Expand Up @@ -1395,6 +1402,7 @@ def clear(
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
) -> list[TaskInstance]: ... # pragma: no cover

@overload
Expand All @@ -1412,6 +1420,7 @@ def clear(
session: Session = NEW_SESSION,
dag_bag: DagBag | None = None,
exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
exclude_run_ids: frozenset[str] | None = frozenset(),
) -> int: ... # pragma: no cover

@provide_session
Expand Down Expand Up @@ -1448,6 +1457,7 @@ def clear(
:param dag_bag: The DagBag used to find the dags (Optional)
:param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``)
tuples that should not be cleared
:param exclude_run_ids: A set of ``run_id`` or (``run_id``)
"""
state: list[TaskInstanceState] = []
if only_failed:
Expand All @@ -1466,6 +1476,7 @@ def clear(
session=session,
dag_bag=dag_bag,
exclude_task_ids=exclude_task_ids,
exclude_run_ids=exclude_run_ids,
)

if dry_run:
Expand Down

0 comments on commit 9f3adfc

Please sign in to comment.