Skip to content

Commit

Permalink
Add missing tests for the DAG endpoint (#35158)
Browse files Browse the repository at this point in the history
This commit adds missing tests for the DAG endpoint to improve the coverage.

Related: #35127
  • Loading branch information
ephraimbuddy authored Oct 24, 2023
1 parent 3721c9a commit 9645786
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 1 deletion.
1 change: 0 additions & 1 deletion scripts/cov/restapi_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
restapi_files = ["tests/api_experimental", "tests/api_connexion", "tests/api_internal"]

files_not_fully_covered = [
"airflow/api_connexion/endpoints/dag_endpoint.py",
"airflow/api_connexion/endpoints/dag_run_endpoint.py",
"airflow/api_connexion/endpoints/forward_to_fab_endpoint.py",
"airflow/api_connexion/endpoints/pool_endpoint.py",
Expand Down
217 changes: 217 additions & 0 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
Expand Down Expand Up @@ -1057,6 +1058,39 @@ def test_should_respond_400_on_invalid_request(self):
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_validation_error_raises_400(self):
patch_body = {
"ispaused": True,
}
dag_model = self._create_dag_model()
response = self.client.patch(
f"/api/v1/dags/{dag_model.dag_id}",
json=patch_body,
environ_overrides={"REMOTE_USER": "test_granular_permissions"},
)
assert response.status_code == 400
assert response.json == {
"detail": "{'ispaused': ['Unknown field.']}",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_non_existing_dag_raises_not_found(self):
patch_body = {
"is_paused": True,
}
response = self.client.patch(
"/api/v1/dags/non_existing_dag", json=patch_body, environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 404
assert response.json == {
"detail": None,
"status": 404,
"title": "Dag with id: 'non_existing_dag' not found",
"type": EXCEPTIONS_LINK_MAP[404],
}

def test_should_respond_404(self):
response = self.client.get("/api/v1/dags/INVALID_DAG", environ_overrides={"REMOTE_USER": "test"})
assert response.status_code == 404
Expand Down Expand Up @@ -1257,6 +1291,138 @@ def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serialize
"total_entries": 2,
} == response.json

def test_should_respond_200_on_patch_is_paused_using_update_mask(self, session, url_safe_serializer):
file_token = url_safe_serializer.dumps("/tmp/dag_1.py")
file_token2 = url_safe_serializer.dumps("/tmp/dag_2.py")
self._create_dag_models(2)
self._create_deactivated_dag()

dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.patch(
"/api/v1/dags?dag_id_pattern=~&update_mask=is_paused",
json={
"is_paused": False,
},
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 200
assert {
"dags": [
{
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
"next_dagrun": None,
"has_task_concurrency_limits": True,
"next_dagrun_data_interval_start": None,
"next_dagrun_data_interval_end": None,
"max_active_runs": 16,
"next_dagrun_create_after": None,
"last_expired": None,
"max_active_tasks": 16,
"last_pickled": None,
"default_view": None,
"last_parsed_time": None,
"scheduler_lock": None,
"timetable_description": None,
"has_import_errors": False,
"pickle_id": None,
},
{
"dag_id": "TEST_DAG_2",
"description": None,
"fileloc": "/tmp/dag_2.py",
"file_token": file_token2,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
"next_dagrun": None,
"has_task_concurrency_limits": True,
"next_dagrun_data_interval_start": None,
"next_dagrun_data_interval_end": None,
"max_active_runs": 16,
"next_dagrun_create_after": None,
"last_expired": None,
"max_active_tasks": 16,
"last_pickled": None,
"default_view": None,
"last_parsed_time": None,
"scheduler_lock": None,
"timetable_description": None,
"has_import_errors": False,
"pickle_id": None,
},
],
"total_entries": 2,
} == response.json

def test_wrong_value_as_update_mask_rasise(self, session):
self._create_dag_models(2)
self._create_deactivated_dag()

dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.patch(
"/api/v1/dags?dag_id_pattern=~&update_mask=ispaused",
json={
"is_paused": False,
},
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 400
assert response.json == {
"detail": "Only `is_paused` field can be updated through the REST API",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_invalid_request_body_raises_badrequest(self, session):
self._create_dag_models(2)
self._create_deactivated_dag()

dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.patch(
"/api/v1/dags?dag_id_pattern=~&update_mask=is_paused",
json={
"ispaused": False,
},
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 400
assert response.json == {
"detail": "{'ispaused': ['Unknown field.']}",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_only_active_true_returns_active_dags(self, url_safe_serializer):
file_token = url_safe_serializer.dumps("/tmp/dag_1.py")
self._create_dag_models(1)
Expand Down Expand Up @@ -1811,3 +1977,54 @@ def test_should_respons_400_dag_id_pattern_missing(self):
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400


class TestDeleteDagEndpoint(TestDagEndpoint):
def test_that_dag_can_be_deleted(self):
self._create_dag_models(1)

response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 204

def test_raise_when_dag_is_not_found(self):
response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404
assert response.json == {
"detail": None,
"status": 404,
"title": "Dag with id: 'TEST_DAG_1' not found",
"type": EXCEPTIONS_LINK_MAP[404],
}

def test_raises_when_task_instances_of_dag_is_still_running(self, dag_maker, session):
with dag_maker("TEST_DAG_1"):
EmptyOperator(task_id="dummy")
dr = dag_maker.create_dagrun()
ti = dr.get_task_instances()[0]
ti.set_state(TaskInstanceState.RUNNING)
session.flush()
response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 409
assert response.json == {
"detail": "Task instances of dag with id: 'TEST_DAG_1' are still running",
"status": 409,
"title": "Conflict",
"type": EXCEPTIONS_LINK_MAP[409],
}

def test_users_without_delete_permission_cannot_delete_dag(self):
self._create_dag_models(1)
response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test_no_permissions"},
)
assert response.status_code == 403

0 comments on commit 9645786

Please sign in to comment.