diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index d19c11aeba94e..9e99db032df56 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2392,6 +2392,28 @@ components: format: date-time readOnly: true nullable: true + data_interval_start: + type: string + format: date-time + readOnly: true + nullable: true + data_interval_end: + type: string + format: date-time + readOnly: true + nullable: true + last_scheduling_decision: + type: string + format: date-time + readOnly: true + nullable: true + run_type: + type: string + readOnly: true + enum: + - backfill + - manual + - scheduled state: $ref: '#/components/schemas/DagState' readOnly: true diff --git a/airflow/api_connexion/schemas/dag_run_schema.py b/airflow/api_connexion/schemas/dag_run_schema.py index 4bd91dbe372ba..44f6eda496df2 100644 --- a/airflow/api_connexion/schemas/dag_run_schema.py +++ b/airflow/api_connexion/schemas/dag_run_schema.py @@ -67,6 +67,10 @@ class Meta: state = DagStateField(dump_only=True) external_trigger = auto_field(dump_default=True, dump_only=True) conf = ConfObject() + data_interval_start = auto_field(dump_only=True) + data_interval_end = auto_field(dump_only=True) + last_scheduling_decision = auto_field(dump_only=True) + run_type = auto_field(dump_only=True) @pre_load def autogenerate(self, data, **kwargs): diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py index 3ffb231bda1c2..d2547587dd4f6 100644 --- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py @@ -232,6 +232,10 @@ def test_should_respond_200(self, session): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', } assert response.json == expected_response @@ -286,6 +290,10 @@ def test_should_respond_200(self, session): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, { 'dag_id': 'TEST_DAG_ID', @@ -297,6 +305,10 @@ def test_should_respond_200(self, session): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, ], "total_entries": 2, @@ -348,6 +360,10 @@ def test_return_correct_results_with_order_by(self, session): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, { 'dag_id': 'TEST_DAG_ID', @@ -359,6 +375,10 @@ def test_return_correct_results_with_order_by(self, session): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, ], "total_entries": 2, @@ -603,6 +623,10 @@ def test_should_respond_200(self): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, { 'dag_id': 'TEST_DAG_ID', @@ -614,6 +638,10 @@ def test_should_respond_200(self): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, ], "total_entries": 2, @@ -652,6 +680,10 @@ def test_order_by_descending_works(self): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, { 'dag_id': 'TEST_DAG_ID', @@ -663,6 +695,10 @@ def test_order_by_descending_works(self): 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, ], "total_entries": 2, @@ -699,6 +735,10 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, { 'dag_id': 'TEST_DAG_ID', @@ -710,6 +750,10 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions 'external_trigger': True, 'start_date': self.default_time, 'conf': {}, + 'data_interval_end': None, + 'data_interval_start': None, + 'last_scheduling_decision': None, + 'run_type': 'manual', }, ], "total_entries": 2, @@ -1005,6 +1049,10 @@ def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_d "external_trigger": True, "start_date": None, "state": "queued", + "data_interval_end": expected_logical_date, + "data_interval_start": expected_logical_date, + "last_scheduling_decision": None, + "run_type": "manual", } == response.json def test_should_respond_400_if_a_dag_has_import_errors(self, session): @@ -1026,43 +1074,48 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, session): } == response.json def test_should_response_200_for_matching_execution_date_logical_date(self): + execution_date = "2020-11-10T08:25:56.939143+00:00" + logical_date = "2020-11-10T08:25:56.939143+00:00" self._create_dag("TEST_DAG_ID") response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns", json={ - "execution_date": "2020-11-10T08:25:56.939143+00:00", - "logical_date": "2020-11-10T08:25:56.939143+00:00", + "execution_date": execution_date, + "logical_date": logical_date, }, environ_overrides={"REMOTE_USER": "test"}, ) + dag_run_id = f"manual__{logical_date}" + assert response.status_code == 200 assert { "conf": {}, "dag_id": "TEST_DAG_ID", - "dag_run_id": "manual__2020-11-10T08:25:56.939143+00:00", + "dag_run_id": dag_run_id, "end_date": None, - "execution_date": "2020-11-10T08:25:56.939143+00:00", - "logical_date": "2020-11-10T08:25:56.939143+00:00", + "execution_date": execution_date, + "logical_date": logical_date, "external_trigger": True, "start_date": None, "state": "queued", + "data_interval_end": logical_date, + "data_interval_start": logical_date, + "last_scheduling_decision": None, + "run_type": "manual", } == response.json def test_should_response_400_for_conflicting_execution_date_logical_date(self): + execution_date = "2020-11-10T08:25:56.939143+00:00" + logical_date = "2020-11-11T08:25:56.939143+00:00" self._create_dag("TEST_DAG_ID") response = self.client.post( "api/v1/dags/TEST_DAG_ID/dagRuns", - json={ - "execution_date": "2020-11-10T08:25:56.939143+00:00", - "logical_date": "2020-11-11T08:25:56.939143+00:00", - }, + json={"execution_date": execution_date, "logical_date": logical_date}, environ_overrides={"REMOTE_USER": "test"}, ) assert response.status_code == 400 assert response.json["title"] == "logical_date conflicts with execution_date" - assert response.json["detail"] == ( - "'2020-11-11T08:25:56.939143+00:00' != '2020-11-10T08:25:56.939143+00:00'" - ) + assert response.json["detail"] == (f"'{logical_date}' != '{execution_date}'") @parameterized.expand( [ @@ -1219,13 +1272,14 @@ def test_should_raises_403_unauthorized(self, username): class TestPatchDagRunState(TestDagRunEndpoint): @pytest.mark.parametrize("state", ["failed", "success"]) - def test_should_respond_200(self, state, dag_maker, session): + @pytest.mark.parametrize("run_type", [state.value for state in DagRunType]) + def test_should_respond_200(self, state, run_type, dag_maker, session): dag_id = "TEST_DAG_ID" dag_run_id = 'TEST_DAG_RUN_ID' with dag_maker(dag_id) as dag: task = EmptyOperator(task_id='task_id', dag=dag) self.app.dag_bag.bag_dag(dag, root_dag=dag) - dr = dag_maker.create_dagrun(run_id=dag_run_id) + dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type) ti = dr.get_task_instance(task_id='task_id') ti.task = task ti.state = State.RUNNING @@ -1254,6 +1308,10 @@ def test_should_respond_200(self, state, dag_maker, session): 'logical_date': dr.execution_date.isoformat(), 'start_date': dr.start_date.isoformat(), 'state': state, + 'data_interval_start': dr.data_interval_start.isoformat(), + 'data_interval_end': dr.data_interval_end.isoformat(), + 'last_scheduling_decision': None, + 'run_type': run_type, } @pytest.mark.parametrize('invalid_state', ["running", "queued"]) diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py index 6f42ec0aa3540..d97e8c3372433 100644 --- a/tests/api_connexion/schemas/test_dag_run_schema.py +++ b/tests/api_connexion/schemas/test_dag_run_schema.py @@ -74,6 +74,10 @@ def test_serialize(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {"start": "stop"}, + "data_interval_end": None, + "data_interval_start": None, + "last_scheduling_decision": None, + "run_type": "manual", } @parameterized.expand( @@ -162,6 +166,10 @@ def test_serialize(self, session): "state": "running", "start_date": self.default_time, "conf": {"start": "stop"}, + "data_interval_end": None, + "data_interval_start": None, + "last_scheduling_decision": None, + "run_type": "manual", }, { "dag_id": "my-dag-run", @@ -173,6 +181,10 @@ def test_serialize(self, session): "external_trigger": True, "start_date": self.default_time, "conf": {}, + "data_interval_end": None, + "data_interval_start": None, + "last_scheduling_decision": None, + "run_type": "manual", }, ], "total_entries": 2,