From d1204adc9fca1917536f5f8916cebd4568cae832 Mon Sep 17 00:00:00 2001 From: Fred Thomsen Date: Wed, 19 Jun 2024 16:33:09 -0500 Subject: [PATCH 1/2] Fix TriggeredDagRunOperator triggered link Link to the specific dag run id that was triggered, as opposed to the logical date, such that the instance that was triggered on the DAG grid UI. --- airflow/operators/trigger_dagrun.py | 11 +++++++---- tests/operators/test_trigger_dagrun.py | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index be0e99db1f58a..cf3c65995fb50 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -69,10 +69,13 @@ class TriggerDagRunLink(BaseOperatorLink): name = "Triggered DAG" def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str: - # Fetch the correct execution date for the triggerED dag which is + # Fetch the correct dag_run_id for the triggerED dag which is # stored in xcom during execution of the triggerING task. - when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO) - query = {"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, "base_date": when} + triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID) + query = { + "dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, + "dag_run_id": triggered_dag_run_id, + } return build_airflow_url_with_query(query) @@ -218,7 +221,7 @@ def execute(self, context: Context): raise e if dag_run is None: raise RuntimeError("The dag_run should be set here!") - # Store the execution date from the dag run (either created or found above) to + # Store the run id from the dag run (either created or found above) to # be used when creating the extra link on the webserver. ti = context["task_instance"] ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, value=dag_run.logical_date.isoformat()) diff --git a/tests/operators/test_trigger_dagrun.py b/tests/operators/test_trigger_dagrun.py index 7bee42243b50d..f369b705826cc 100644 --- a/tests/operators/test_trigger_dagrun.py +++ b/tests/operators/test_trigger_dagrun.py @@ -92,7 +92,7 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session): """ Asserts whether the correct extra links url will be created. - Specifically it tests whether the correct dag id and date are passed to + Specifically it tests whether the correct dag id and run id are passed to the method which constructs the final url. Note: We can't run that method to generate the url itself because the Flask app context isn't available within the test logic, so it is mocked here. @@ -111,7 +111,7 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session): args, _ = mock_build_url.call_args expected_args = { "dag_id": triggered_dag_run.dag_id, - "base_date": triggered_dag_run.logical_date.isoformat(), + "dag_run_id": triggered_dag_run.run_id, } assert expected_args in args From 9f949b0dbcb5f53edb60efc5da3295ae5649657a Mon Sep 17 00:00:00 2001 From: Fred Thomsen Date: Thu, 20 Jun 2024 22:08:59 -0500 Subject: [PATCH 2/2] Add comment regarding triggered dag xcom --- airflow/operators/trigger_dagrun.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py index cf3c65995fb50..19cc0562cc32a 100644 --- a/airflow/operators/trigger_dagrun.py +++ b/airflow/operators/trigger_dagrun.py @@ -223,6 +223,7 @@ def execute(self, context: Context): raise RuntimeError("The dag_run should be set here!") # Store the run id from the dag run (either created or found above) to # be used when creating the extra link on the webserver. + # TODO: Logical date as xcom stored only for backwards compatibility. Remove in Airflow 3.0 ti = context["task_instance"] ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, value=dag_run.logical_date.isoformat()) ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)