From 54abfaccc1e3b43851ae8626e60b1d9838608109 Mon Sep 17 00:00:00 2001 From: Josh Fell Date: Sun, 26 May 2024 14:03:33 -0400 Subject: [PATCH] Improve typing for allowed/failed_states in TriggerDagRunOperator This surfaces another explicit option when specifying `allowed_states` and/or `failed_states` -- using the DagRunState enum -- when DAG authors look at the Python API docs for this operator. Also added some clarifying details in the docstring for these parameters too. --- airflow/operators/trigger_dagrun.py | 35 ++++++++++++++++------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index b6c24f2180fbf..9bde29fe4d0e7 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -78,25 +78,30 @@ def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: class TriggerDagRunOperator(BaseOperator): """ - Triggers a DAG run for a specified ``dag_id``. + Triggers a DAG run for a specified DAG ID. - :param trigger_dag_id: The dag_id to trigger (templated). + :param trigger_dag_id: The ``dag_id`` of the DAG to trigger (templated). :param trigger_run_id: The run ID to use for the triggered DAG run (templated). If not provided, a run ID will be automatically generated. :param conf: Configuration for the DAG run (templated). - :param logical_date: Logical date for the dag (templated). - :param reset_dag_run: Whether clear existing dag run if already exists. - This is useful when backfill or rerun an existing dag run. - This only resets (not recreates) the dag run. - Dag run conf is immutable and will not be reset on rerun of an existing dag run. + :param logical_date: Logical date for the triggered DAG (templated). + :param reset_dag_run: Whether clear existing DAG run if already exists. + This is useful when backfill or rerun an existing DAG run. + This only resets (not recreates) the DAG run. + DAG run conf is immutable and will not be reset on rerun of an existing DAG run. When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. - When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. - :param wait_for_completion: Whether or not wait for dag run completion. (default: False) - :param poke_interval: Poke interval to check dag run status when wait_for_completion=True. + When reset_dag_run=True and dag run exists, existing DAG run will be cleared to rerun. + :param wait_for_completion: Whether or not wait for DAG run completion. (default: False) + :param poke_interval: Poke interval to check DAG run status when wait_for_completion=True. (default: 60) - :param allowed_states: List of allowed states, default is ``['success']``. - :param failed_states: List of failed or dis-allowed states, default is ``None``. - :param skip_when_already_exists: Set to true to mark the task as SKIPPED if a dag_run already exists + :param allowed_states: Optional list of allowed DAG run states of the triggered DAG. This is useful when + setting ``wait_for_completion`` to True. Must be a valid DagRunState. + Default is ``[DagRunState.SUCCESS]``. + :param failed_states: Optional list of failed or disallowed DAG run states of the triggered DAG. This is + useful when setting ``wait_for_completion`` to True. Must be a valid DagRunState. + Default is ``[DagRunState.FAILED]``. + :param skip_when_already_exists: Set to true to mark the task as SKIPPED if a DAG run of the triggered + DAG for the same logical date already exists. :param deferrable: If waiting for completion, whether or not to defer the task until done, default is ``False``. :param execution_date: Deprecated parameter; same as ``logical_date``. @@ -124,8 +129,8 @@ def __init__( reset_dag_run: bool = False, wait_for_completion: bool = False, poke_interval: int = 60, - allowed_states: list[str] | None = None, - failed_states: list[str] | None = None, + allowed_states: list[str | DagRunState] | None = None, + failed_states: list[str | DagRunState] | None = None, skip_when_already_exists: bool = False, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), execution_date: str | datetime.datetime | None = None,