diff --git a/airflow/models/dag.py b/airflow/models/dag.py index e3bde16bcb5d51..8177025c2d8ed7 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -974,6 +974,7 @@ def get_task_instances( state=state or (), include_dependent_dags=False, exclude_task_ids=(), + exclude_run_ids=None, session=session, ) return session.scalars(cast(Select, query).order_by(DagRun.logical_date)).all() @@ -989,6 +990,7 @@ def _get_task_instances( state: TaskInstanceState | Sequence[TaskInstanceState], include_dependent_dags: bool, exclude_task_ids: Collection[str | tuple[str, int]] | None, + exclude_run_ids: frozenset[str] | None, session: Session, dag_bag: DagBag | None = ..., ) -> Iterable[TaskInstance]: ... # pragma: no cover @@ -1005,6 +1007,7 @@ def _get_task_instances( state: TaskInstanceState | Sequence[TaskInstanceState], include_dependent_dags: bool, exclude_task_ids: Collection[str | tuple[str, int]] | None, + exclude_run_ids: frozenset[str] | None, session: Session, dag_bag: DagBag | None = ..., recursion_depth: int = ..., @@ -1023,6 +1026,7 @@ def _get_task_instances( state: TaskInstanceState | Sequence[TaskInstanceState], include_dependent_dags: bool, exclude_task_ids: Collection[str | tuple[str, int]] | None, + exclude_run_ids: frozenset[str] | None, session: Session, dag_bag: DagBag | None = None, recursion_depth: int = 0, @@ -1080,6 +1084,9 @@ def _get_task_instances( else: tis = tis.where(TaskInstance.state.in_(state)) + if exclude_run_ids: + tis = tis.where(not_(TaskInstance.run_id.in_(exclude_run_ids))) + if include_dependent_dags: # Recursively find external tasks indicated by ExternalTaskMarker from airflow.sensors.external_task import ExternalTaskMarker @@ -1152,6 +1159,7 @@ def _get_task_instances( include_dependent_dags=include_dependent_dags, as_pk_tuple=True, exclude_task_ids=exclude_task_ids, + exclude_run_ids=exclude_run_ids, dag_bag=dag_bag, session=session, recursion_depth=recursion_depth + 1, @@ -1270,9 +1278,9 @@ def set_task_instance_state( # 
Exclude the task itself from being cleared. "exclude_task_ids": frozenset((task_id,)), } - if not future and not past: # Simple case 1: we're only dealing with exactly one run. clear_kwargs["run_id"] = run_id + subdag.clear(**clear_kwargs) elif future and past: # Simple case 2: we're clearing ALL runs. subdag.clear(**clear_kwargs) else: # Complex cases: we may have more than one run, based on a date range. @@ -1287,7 +1295,6 @@ def set_task_instance_state( clear_kwargs["end_date"] = logical_date exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id) subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs) - return altered @provide_session @@ -1395,6 +1402,7 @@ def clear( session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), + exclude_run_ids: frozenset[str] | None = frozenset(), ) -> list[TaskInstance]: ... # pragma: no cover @overload @@ -1412,6 +1420,7 @@ def clear( session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), + exclude_run_ids: frozenset[str] | None = frozenset(), ) -> int: ... # pragma: no cover @provide_session @@ -1448,6 +1457,7 @@ def clear( :param dag_bag: The DagBag used to find the dags (Optional) :param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``) tuples that should not be cleared + :param exclude_run_ids: A set of ``run_id`` values whose task instances should not be cleared """ state: list[TaskInstanceState] = [] if only_failed: @@ -1466,6 +1476,7 @@ def clear( session=session, dag_bag=dag_bag, exclude_task_ids=exclude_task_ids, + exclude_run_ids=exclude_run_ids, ) if dry_run: