Skip to content

Commit

Permalink
Fix TriggeredDagRunOperator triggered link (#40336)
Browse files Browse the repository at this point in the history
* Fix TriggeredDagRunOperator triggered link

Link to the specific dag run id that was triggered, as opposed to the
logical date, such that the instance that was triggered on the DAG grid
UI.

* Add comment regarding triggered dag xcom

(cherry picked from commit 806bb80)
  • Loading branch information
fredthomsen authored and utkarsharma2 committed Jul 2, 2024
1 parent 7614e59 commit f2caeb8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
12 changes: 8 additions & 4 deletions airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ class TriggerDagRunLink(BaseOperatorLink):
name = "Triggered DAG"

def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
# Fetch the correct execution date for the triggerED dag which is
# Fetch the correct dag_run_id for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO)
query = {"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, "base_date": when}
triggered_dag_run_id = XCom.get_value(ti_key=ti_key, key=XCOM_RUN_ID)
query = {
"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id,
"dag_run_id": triggered_dag_run_id,
}
return build_airflow_url_with_query(query)


Expand Down Expand Up @@ -199,8 +202,9 @@ def execute(self, context: Context):
raise e
if dag_run is None:
raise RuntimeError("The dag_run should be set here!")
# Store the execution date from the dag run (either created or found above) to
# Store the run id from the dag run (either created or found above) to
# be used when creating the extra link on the webserver.
# TODO: Logical date as xcom stored only for backwards compatibility. Remove in Airflow 3.0
ti = context["task_instance"]
ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, value=dag_run.logical_date.isoformat())
ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)
Expand Down
4 changes: 2 additions & 2 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session):
"""
Asserts whether the correct extra links url will be created.
Specifically it tests whether the correct dag id and date are passed to
Specifically it tests whether the correct dag id and run id are passed to
the method which constructs the final url.
Note: We can't run that method to generate the url itself because the Flask app context
isn't available within the test logic, so it is mocked here.
Expand All @@ -110,7 +110,7 @@ def assert_extra_link(self, triggered_dag_run, triggering_task, session):
args, _ = mock_build_url.call_args
expected_args = {
"dag_id": triggered_dag_run.dag_id,
"base_date": triggered_dag_run.logical_date.isoformat(),
"dag_run_id": triggered_dag_run.run_id,
}
assert expected_args in args

Expand Down

0 comments on commit f2caeb8

Please sign in to comment.