Skip to content

Commit

Permalink
Fix Dataset and DatasetAlias scheduling in Airflow 2.10 (#1240)
Browse files Browse the repository at this point in the history
When tasks inside task groups were emitting events, they were different
during task initialization and task execution, resulting in different
DatasetAlias representations and scheduling not working as expected.

During task execution, the task ID is modified also to contain the
task_group.

We could not catch this before due to inconsistent behaviour between
running Airflow using `dags.test()`, `airflow standalone` and
`astro-cli`.
  • Loading branch information
tatiana authored Oct 4, 2024
1 parent afa635d commit d58c2f5
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cosmos/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_i
dag_id = task_group.dag_id
if task_group.group_id is not None:
task_group_id = task_group.group_id
task_group_id = task_group_id.replace(".", "__")
task_group_id = task_group_id.split(".")[-1]
elif dag:
dag_id = dag.dag_id

Expand All @@ -28,6 +28,6 @@ def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_i
if task_group_id:
identifiers_list.append(task_group_id)

identifiers_list.append(task_id)
identifiers_list.append(task_id.split(".")[-1])

return "__".join(identifiers_list)
65 changes: 65 additions & 0 deletions docs/configuration/scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,71 @@ Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_
In this scenario, ``project_one`` runs once a day and ``project_two`` runs immediately after ``project_one``. You can view these dependencies in Airflow's UI.


Examples
.................

This example DAG:

..
The following renders in Sphinx but not Github:
.. literalinclude:: ./../dev/dags/basic_cosmos_dag.py

Check warning on line 76 in docs/configuration/scheduling.rst

View workflow job for this annotation

GitHub Actions / pages

Include file '/home/runner/work/astronomer-cosmos/astronomer-cosmos/docs/dev/dags/basic_cosmos_dag.py' not found or reading it failed [docutils]
:language: python
:start-after: [START local_example]
:end-before: [END local_example]


Will trigger the following DAG to be run (when using Cosmos 1.1 when using Airflow 2.4 or newer):

.. code-block:: python
from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator
with DAG(
"dataset_triggered_dag",
description="A DAG that should be triggered via Dataset",
start_date=datetime(2024, 9, 1),
schedule=[Dataset(uri="postgres://0.0.0.0:5434/postgres.public.orders")],
) as dag:
t1 = EmptyOperator(
task_id="task_1",
)
t2 = EmptyOperator(
task_id="task_2",
)
t1 >> t2
From Cosmos 1.7 and Airflow 2.10, it is also possible to trigger DAGs be to be run by using ``DatasetAliases``:

.. code-block:: python
from datetime import datetime
from airflow import DAG
from airflow.datasets import DatasetAlias
from airflow.operators.empty import EmptyOperator
with DAG(
"datasetalias_triggered_dag",
description="A DAG that should be triggered via Dataset alias",
start_date=datetime(2024, 9, 1),
schedule=[DatasetAlias(name="basic_cosmos_dag__orders__run")],
) as dag:
t3 = EmptyOperator(
task_id="task_3",
)
t3
Known Limitations
.................

Expand Down
25 changes: 18 additions & 7 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@


@pytest.mark.parametrize(
"dag, task_group, result_identifier",
"dag, task_group,task_id,result_identifier",
[
(example_dag, None, "dag__task_id"),
(None, TaskGroup(dag=example_dag, group_id="inner_tg"), "dag__inner_tg__task_id"),
(example_dag, None, "task_id", "dag__task_id"),
(None, TaskGroup(dag=example_dag, group_id="inner_tg"), "task_id", "dag__inner_tg__task_id"),
(
None,
TaskGroup(
dag=example_dag, group_id="child_tg", parent_group=TaskGroup(dag=example_dag, group_id="parent_tg")
),
"dag__parent_tg__child_tg__task_id",
"task_id",
"dag__child_tg__task_id",
),
(
None,
Expand All @@ -31,9 +32,19 @@
dag=example_dag, group_id="mum_tg", parent_group=TaskGroup(dag=example_dag, group_id="nana_tg")
),
),
"dag__nana_tg__mum_tg__child_tg__task_id",
"task_id",
"dag__child_tg__task_id",
),
(
None,
TaskGroup(
dag=example_dag,
group_id="another_tg",
),
"another_tg.task_id", # Airflow injects this during task execution time when outside of standalone
"dag__another_tg__task_id",
),
],
)
def test_get_dataset_alias_name(dag, task_group, result_identifier):
assert get_dataset_alias_name(dag, task_group, "task_id") == result_identifier
def test_get_dataset_alias_name(dag, task_group, task_id, result_identifier):
assert get_dataset_alias_name(dag, task_group, task_id) == result_identifier
22 changes: 21 additions & 1 deletion tests/test_example_dags.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import warnings
from pathlib import Path

try:
Expand All @@ -10,6 +11,7 @@

import airflow
import pytest
import sqlalchemy
from airflow.models.dagbag import DagBag
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -106,6 +108,24 @@ def test_example_dag(session, dag_id: str):
# This feature is available since Airflow 2.5 and we've backported it in Cosmos:
# https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02
if AIRFLOW_VERSION >= Version("2.5"):
dag.test()
if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")):
dag.test()
else:
# This is a work around until we fix the issue in Airflow:
# https://github.com/apache/airflow/issues/42495
"""
FAILED tests/test_example_dags.py::test_example_dag[example_model_version] - sqlalchemy.exc.PendingRollbackError:
This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback().
Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a)
FAILED tests/test_example_dags.py::test_example_dag[basic_cosmos_dag]
FAILED tests/test_example_dags.py::test_example_dag[cosmos_profile_mapping]
FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile]
"""
try:
dag.test()
except sqlalchemy.exc.PendingRollbackError:
warnings.warn(
"Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets"
)
else:
test_utils.run_dag(dag)

0 comments on commit d58c2f5

Please sign in to comment.