Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing tests for the DAG endpoint #35158

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion scripts/cov/restapi_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
restapi_files = ["tests/api_experimental", "tests/api_connexion", "tests/api_internal"]

files_not_fully_covered = [
"airflow/api_connexion/endpoints/dag_endpoint.py",
"airflow/api_connexion/endpoints/dag_run_endpoint.py",
"airflow/api_connexion/endpoints/forward_to_fab_endpoint.py",
"airflow/api_connexion/endpoints/pool_endpoint.py",
Expand Down
217 changes: 217 additions & 0 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState
from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
Expand Down Expand Up @@ -1057,6 +1058,39 @@ def test_should_respond_400_on_invalid_request(self):
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_validation_error_raises_400(self):
patch_body = {
"ispaused": True,
}
dag_model = self._create_dag_model()
response = self.client.patch(
f"/api/v1/dags/{dag_model.dag_id}",
json=patch_body,
environ_overrides={"REMOTE_USER": "test_granular_permissions"},
)
assert response.status_code == 400
assert response.json == {
"detail": "{'ispaused': ['Unknown field.']}",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_non_existing_dag_raises_not_found(self):
patch_body = {
"is_paused": True,
}
response = self.client.patch(
"/api/v1/dags/non_existing_dag", json=patch_body, environ_overrides={"REMOTE_USER": "test"}
)
assert response.status_code == 404
assert response.json == {
"detail": None,
"status": 404,
"title": "Dag with id: 'non_existing_dag' not found",
"type": EXCEPTIONS_LINK_MAP[404],
}

def test_should_respond_404(self):
response = self.client.get("/api/v1/dags/INVALID_DAG", environ_overrides={"REMOTE_USER": "test"})
assert response.status_code == 404
Expand Down Expand Up @@ -1257,6 +1291,138 @@ def test_should_respond_200_on_patch_is_paused(self, session, url_safe_serialize
"total_entries": 2,
} == response.json

def test_should_respond_200_on_patch_is_paused_using_update_mask(self, session, url_safe_serializer):
file_token = url_safe_serializer.dumps("/tmp/dag_1.py")
file_token2 = url_safe_serializer.dumps("/tmp/dag_2.py")
self._create_dag_models(2)
self._create_deactivated_dag()

dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.patch(
"/api/v1/dags?dag_id_pattern=~&update_mask=is_paused",
json={
"is_paused": False,
},
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 200
assert {
"dags": [
{
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
"next_dagrun": None,
"has_task_concurrency_limits": True,
"next_dagrun_data_interval_start": None,
"next_dagrun_data_interval_end": None,
"max_active_runs": 16,
"next_dagrun_create_after": None,
"last_expired": None,
"max_active_tasks": 16,
"last_pickled": None,
"default_view": None,
"last_parsed_time": None,
"scheduler_lock": None,
"timetable_description": None,
"has_import_errors": False,
"pickle_id": None,
},
{
"dag_id": "TEST_DAG_2",
"description": None,
"fileloc": "/tmp/dag_2.py",
"file_token": file_token2,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
"next_dagrun": None,
"has_task_concurrency_limits": True,
"next_dagrun_data_interval_start": None,
"next_dagrun_data_interval_end": None,
"max_active_runs": 16,
"next_dagrun_create_after": None,
"last_expired": None,
"max_active_tasks": 16,
"last_pickled": None,
"default_view": None,
"last_parsed_time": None,
"scheduler_lock": None,
"timetable_description": None,
"has_import_errors": False,
"pickle_id": None,
},
],
"total_entries": 2,
} == response.json

def test_wrong_value_as_update_mask_rasise(self, session):
self._create_dag_models(2)
self._create_deactivated_dag()

dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.patch(
"/api/v1/dags?dag_id_pattern=~&update_mask=ispaused",
json={
"is_paused": False,
},
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 400
assert response.json == {
"detail": "Only `is_paused` field can be updated through the REST API",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_invalid_request_body_raises_badrequest(self, session):
self._create_dag_models(2)
self._create_deactivated_dag()

dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.patch(
"/api/v1/dags?dag_id_pattern=~&update_mask=is_paused",
json={
"ispaused": False,
},
environ_overrides={"REMOTE_USER": "test"},
)

assert response.status_code == 400
assert response.json == {
"detail": "{'ispaused': ['Unknown field.']}",
"status": 400,
"title": "Bad Request",
"type": EXCEPTIONS_LINK_MAP[400],
}

def test_only_active_true_returns_active_dags(self, url_safe_serializer):
file_token = url_safe_serializer.dumps("/tmp/dag_1.py")
self._create_dag_models(1)
Expand Down Expand Up @@ -1811,3 +1977,54 @@ def test_should_respons_400_dag_id_pattern_missing(self):
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400


class TestDeleteDagEndpoint(TestDagEndpoint):
def test_that_dag_can_be_deleted(self):
self._create_dag_models(1)

response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 204

def test_raise_when_dag_is_not_found(self):
response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 404
assert response.json == {
"detail": None,
"status": 404,
"title": "Dag with id: 'TEST_DAG_1' not found",
"type": EXCEPTIONS_LINK_MAP[404],
}

def test_raises_when_task_instances_of_dag_is_still_running(self, dag_maker, session):
with dag_maker("TEST_DAG_1"):
EmptyOperator(task_id="dummy")
dr = dag_maker.create_dagrun()
ti = dr.get_task_instances()[0]
ti.set_state(TaskInstanceState.RUNNING)
session.flush()
response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 409
assert response.json == {
"detail": "Task instances of dag with id: 'TEST_DAG_1' are still running",
"status": 409,
"title": "Conflict",
"type": EXCEPTIONS_LINK_MAP[409],
}

def test_users_without_delete_permission_cannot_delete_dag(self):
self._create_dag_models(1)
response = self.client.delete(
"/api/v1/dags/TEST_DAG_1",
environ_overrides={"REMOTE_USER": "test_no_permissions"},
)
assert response.status_code == 403