
Commit

Revert skipping test to using conditional code depending on Airflow version
hardeybisey committed Nov 30, 2024
1 parent 4af8172 commit 3c2ee36
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions providers/tests/standard/operators/test_latest_only_operator.py
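The change replaces a blanket pytest.mark.skipif with version-conditional attribute access, so the test runs on both Airflow 2 (DagRun.execution_date) and Airflow 3 (where the column is renamed to DagRun.logical_date). As a rough illustration of the pattern, here is a minimal sketch; the repository provides its own AIRFLOW_V_3_0_PLUS compatibility flag, so the derivation shown below is an assumption rather than the project's actual helper:

from packaging.version import Version

from airflow import __version__ as AIRFLOW_VERSION
from airflow.models.dagrun import DagRun

# Assumed derivation of the flag: True when the installed Airflow is 3.0 or newer.
AIRFLOW_V_3_0_PLUS = Version(Version(AIRFLOW_VERSION).base_version) >= Version("3.0.0")

# Pick whichever DagRun column exists on the installed version; only the chosen
# branch of the conditional expression is evaluated, so the lookup never touches
# an attribute that is missing on the other version.
logical_date = DagRun.logical_date if AIRFLOW_V_3_0_PLUS else DagRun.execution_date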
@@ -48,11 +48,12 @@
 
 def get_task_instances(task_id):
     session = settings.Session()
+    logical_date = DagRun.logical_date if AIRFLOW_V_3_0_PLUS else DagRun.execution_date
     return (
         session.query(TaskInstance)
         .join(TaskInstance.dag_run)
         .filter(TaskInstance.task_id == task_id)
-        .order_by(DagRun.logical_date)
+        .order_by(logical_date)
         .all()
     )
 
@@ -82,9 +83,6 @@ def test_run(self, dag_maker):
         dag_maker.create_dagrun()
         task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(
-        not AIRFLOW_V_3_0_PLUS, reason="execution_date is renamed to logical_date in Airflow 3.0"
-    )
     def test_skipping_non_latest(self, dag_maker):
         with dag_maker(
             default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, serialized=True
@@ -133,40 +131,49 @@ def test_skipping_non_latest(self, dag_maker):
         downstream_task3.run(start_date=DEFAULT_DATE, end_date=END_DATE)
 
         latest_instances = get_task_instances("latest")
-        exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        else:
+            exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances}
         assert exec_date_to_latest_state == {
             timezone.datetime(2016, 1, 1): "success",
             timezone.datetime(2016, 1, 1, 12): "success",
             timezone.datetime(2016, 1, 2): "success",
         }
 
         downstream_instances = get_task_instances("downstream")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
         assert exec_date_to_downstream_state == {
             timezone.datetime(2016, 1, 1): "skipped",
             timezone.datetime(2016, 1, 1, 12): "skipped",
             timezone.datetime(2016, 1, 2): "success",
         }
 
         downstream_instances = get_task_instances("downstream_2")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
         assert exec_date_to_downstream_state == {
             timezone.datetime(2016, 1, 1): None,
             timezone.datetime(2016, 1, 1, 12): None,
             timezone.datetime(2016, 1, 2): "success",
         }
 
         downstream_instances = get_task_instances("downstream_3")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
         assert exec_date_to_downstream_state == {
             timezone.datetime(2016, 1, 1): "success",
             timezone.datetime(2016, 1, 1, 12): "success",
             timezone.datetime(2016, 1, 2): "success",
         }
 
-    @pytest.mark.skipif(
-        not AIRFLOW_V_3_0_PLUS, reason="execution_date is renamed to logical_date in Airflow 3.0"
-    )
     def test_not_skipping_external(self, dag_maker):
         with dag_maker(
             default_args={"owner": "airflow", "start_date": DEFAULT_DATE}, schedule=INTERVAL, serialized=True
@@ -216,23 +223,32 @@ def test_not_skipping_external(self, dag_maker):
         downstream_task2.run(start_date=DEFAULT_DATE, end_date=END_DATE)
 
         latest_instances = get_task_instances("latest")
-        exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_latest_state = {ti.logical_date: ti.state for ti in latest_instances}
+        else:
+            exec_date_to_latest_state = {ti.execution_date: ti.state for ti in latest_instances}
         assert exec_date_to_latest_state == {
             timezone.datetime(2016, 1, 1): "success",
             timezone.datetime(2016, 1, 1, 12): "success",
             timezone.datetime(2016, 1, 2): "success",
         }
 
         downstream_instances = get_task_instances("downstream")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
         assert exec_date_to_downstream_state == {
             timezone.datetime(2016, 1, 1): "success",
             timezone.datetime(2016, 1, 1, 12): "success",
             timezone.datetime(2016, 1, 2): "success",
         }
 
         downstream_instances = get_task_instances("downstream_2")
-        exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        if AIRFLOW_V_3_0_PLUS:
+            exec_date_to_downstream_state = {ti.logical_date: ti.state for ti in downstream_instances}
+        else:
+            exec_date_to_downstream_state = {ti.execution_date: ti.state for ti in downstream_instances}
         assert exec_date_to_downstream_state == {
             timezone.datetime(2016, 1, 1): "success",
             timezone.datetime(2016, 1, 1, 12): "success",

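Purely as an illustration of an alternative design (not part of this commit), the repeated if/else around each assertion could be folded into one helper that resolves the per-version attribute name once. A hedged sketch, assuming the same AIRFLOW_V_3_0_PLUS flag and a hypothetical helper name ti_date_to_state:

def ti_date_to_state(task_instances):
    # Hypothetical helper: choose the attribute name once, then build the
    # {date: state} mapping the assertions compare against.
    attr = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
    return {getattr(ti, attr): ti.state for ti in task_instances}

# Usage mirroring the test bodies:
# exec_date_to_latest_state = ti_date_to_state(get_task_instances("latest"))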