Skip to content

Commit

Permalink
AIP-84 Improve testing (apache#44381)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun authored Nov 26, 2024
1 parent 6b4aff8 commit b3f57c5
Showing 1 changed file with 16 additions and 47 deletions.
63 changes: 16 additions & 47 deletions tests/api_fastapi/core_api/routes/public/test_task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@

import pendulum
import pytest
import sqlalchemy
from sqlalchemy import select

from airflow.jobs.job import Job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models import DagRun, TaskInstance
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.models.dagbag import DagBag
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.models.taskinstancehistory import TaskInstanceHistory
Expand Down Expand Up @@ -1852,23 +1850,10 @@ def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, t
)
assert response.status_code == 200

# We check args individually instead of direct matching using
# assert_called_once_with(), because the session objects don't match
# and can't be skipped using mock.ANY.
mock_clearti.assert_called_once()
args, kwargs = mock_clearti.call_args
assert len(args) == 4
assert len(kwargs) == 0
# 1st argument
assert args[0] == []
# 2nd argument
assert args[1] is not None
assert isinstance(args[1], sqlalchemy.orm.session.Session)
# 3rd argument
assert args[2].dag_id, dag_id
assert isinstance(args[2], DAG)
# 4th argument
assert args[3] == DagRunState.QUEUED
# dag_id (3rd argument) is a different session object. Manually asserting that the dag_id
# is the same.
mock_clearti.assert_called_once_with([], mock.ANY, mock.ANY, DagRunState.QUEUED)
assert mock_clearti.call_args[0][2].dag_id == dag_id

def test_clear_taskinstance_is_called_with_invalid_task_ids(self, test_client, session):
"""Test that dagrun is running when invalid task_ids are passed to clearTaskInstances API."""
Expand Down Expand Up @@ -2506,34 +2491,18 @@ def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
"triggerer_job": None,
}

mock_set_ti_state.assert_called_once()
# We check args individually instead of direct matching using
# assert_called_once_with(), because the session objects don't match
# and can't be skipped using mock.ANY.
mock_set_ti_state.assert_called_once()
args, kwargs = mock_set_ti_state.call_args
assert len(args) == 0
assert len(kwargs) == 10
# 1st keyword argument
assert kwargs["task_id"] == self.TASK_ID
# 2nd keyword argument
assert kwargs["run_id"] == self.RUN_ID
# 3rd keyword argument
assert kwargs["map_indexes"] == [-1]
# 4th keyword argument
assert kwargs["state"] == self.NEW_STATE
# 5th keyword argument
assert kwargs["upstream"] is False
# 6th keyword argument
assert kwargs["downstream"] is False
# 7th keyword argument
assert kwargs["future"] is False
# 8th keyword argument
assert kwargs["past"] is False
# 9th keyword argument
assert kwargs["commit"] is True
# 10th keyword argument
assert isinstance(kwargs["session"], sqlalchemy.orm.session.Session)
mock_set_ti_state.assert_called_once_with(
commit=True,
downstream=False,
upstream=False,
future=False,
map_indexes=[-1],
past=False,
run_id=self.RUN_ID,
session=mock.ANY,
state=self.NEW_STATE,
task_id=self.TASK_ID,
)

@mock.patch("airflow.models.dag.DAG.set_task_instance_state")
def test_should_not_call_mocked_api_for_dry_run(self, mock_set_task_instance_state, test_client, session):
Expand Down

0 comments on commit b3f57c5

Please sign in to comment.