Skip to content

Commit

Permalink
Add fields to dagrun endpoint (#23440)
Browse files Browse the repository at this point in the history
* Add below fields to dagrun endpoint :

* data_interval_start
* data_interval_end
* last_scheduling_decision
* run_type

* Refactor hardcoded dates with constants.
  • Loading branch information
tirkarthi authored May 3, 2022
1 parent 22b49d3 commit 6178491
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 14 deletions.
22 changes: 22 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,28 @@ components:
format: date-time
readOnly: true
nullable: true
data_interval_start:
type: string
format: date-time
readOnly: true
nullable: true
data_interval_end:
type: string
format: date-time
readOnly: true
nullable: true
last_scheduling_decision:
type: string
format: date-time
readOnly: true
nullable: true
run_type:
type: string
readOnly: true
enum:
- backfill
- manual
- scheduled
state:
$ref: '#/components/schemas/DagState'
readOnly: true
Expand Down
4 changes: 4 additions & 0 deletions airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class Meta:
state = DagStateField(dump_only=True)
external_trigger = auto_field(dump_default=True, dump_only=True)
conf = ConfObject()
data_interval_start = auto_field(dump_only=True)
data_interval_end = auto_field(dump_only=True)
last_scheduling_decision = auto_field(dump_only=True)
run_type = auto_field(dump_only=True)

@pre_load
def autogenerate(self, data, **kwargs):
Expand Down
86 changes: 72 additions & 14 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ def test_should_respond_200(self, session):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
}
assert response.json == expected_response

Expand Down Expand Up @@ -286,6 +290,10 @@ def test_should_respond_200(self, session):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
Expand All @@ -297,6 +305,10 @@ def test_should_respond_200(self, session):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
],
"total_entries": 2,
Expand Down Expand Up @@ -348,6 +360,10 @@ def test_return_correct_results_with_order_by(self, session):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
Expand All @@ -359,6 +375,10 @@ def test_return_correct_results_with_order_by(self, session):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
],
"total_entries": 2,
Expand Down Expand Up @@ -603,6 +623,10 @@ def test_should_respond_200(self):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
Expand All @@ -614,6 +638,10 @@ def test_should_respond_200(self):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
],
"total_entries": 2,
Expand Down Expand Up @@ -652,6 +680,10 @@ def test_order_by_descending_works(self):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
Expand All @@ -663,6 +695,10 @@ def test_order_by_descending_works(self):
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
],
"total_entries": 2,
Expand Down Expand Up @@ -699,6 +735,10 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
{
'dag_id': 'TEST_DAG_ID',
Expand All @@ -710,6 +750,10 @@ def test_should_return_accessible_with_tilde_as_dag_id_and_dag_level_permissions
'external_trigger': True,
'start_date': self.default_time,
'conf': {},
'data_interval_end': None,
'data_interval_start': None,
'last_scheduling_decision': None,
'run_type': 'manual',
},
],
"total_entries": 2,
Expand Down Expand Up @@ -1005,6 +1049,10 @@ def test_should_respond_200(self, logical_date_field_name, dag_run_id, logical_d
"external_trigger": True,
"start_date": None,
"state": "queued",
"data_interval_end": expected_logical_date,
"data_interval_start": expected_logical_date,
"last_scheduling_decision": None,
"run_type": "manual",
} == response.json

def test_should_respond_400_if_a_dag_has_import_errors(self, session):
Expand All @@ -1026,43 +1074,48 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, session):
} == response.json

def test_should_response_200_for_matching_execution_date_logical_date(self):
execution_date = "2020-11-10T08:25:56.939143+00:00"
logical_date = "2020-11-10T08:25:56.939143+00:00"
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"logical_date": "2020-11-10T08:25:56.939143+00:00",
"execution_date": execution_date,
"logical_date": logical_date,
},
environ_overrides={"REMOTE_USER": "test"},
)
dag_run_id = f"manual__{logical_date}"

assert response.status_code == 200
assert {
"conf": {},
"dag_id": "TEST_DAG_ID",
"dag_run_id": "manual__2020-11-10T08:25:56.939143+00:00",
"dag_run_id": dag_run_id,
"end_date": None,
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"logical_date": "2020-11-10T08:25:56.939143+00:00",
"execution_date": execution_date,
"logical_date": logical_date,
"external_trigger": True,
"start_date": None,
"state": "queued",
"data_interval_end": logical_date,
"data_interval_start": logical_date,
"last_scheduling_decision": None,
"run_type": "manual",
} == response.json

def test_should_response_400_for_conflicting_execution_date_logical_date(self):
execution_date = "2020-11-10T08:25:56.939143+00:00"
logical_date = "2020-11-11T08:25:56.939143+00:00"
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
json={
"execution_date": "2020-11-10T08:25:56.939143+00:00",
"logical_date": "2020-11-11T08:25:56.939143+00:00",
},
json={"execution_date": execution_date, "logical_date": logical_date},
environ_overrides={"REMOTE_USER": "test"},
)
assert response.status_code == 400
assert response.json["title"] == "logical_date conflicts with execution_date"
assert response.json["detail"] == (
"'2020-11-11T08:25:56.939143+00:00' != '2020-11-10T08:25:56.939143+00:00'"
)
assert response.json["detail"] == (f"'{logical_date}' != '{execution_date}'")

@parameterized.expand(
[
Expand Down Expand Up @@ -1219,13 +1272,14 @@ def test_should_raises_403_unauthorized(self, username):

class TestPatchDagRunState(TestDagRunEndpoint):
@pytest.mark.parametrize("state", ["failed", "success"])
def test_should_respond_200(self, state, dag_maker, session):
@pytest.mark.parametrize("run_type", [state.value for state in DagRunType])
def test_should_respond_200(self, state, run_type, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = 'TEST_DAG_RUN_ID'
with dag_maker(dag_id) as dag:
task = EmptyOperator(task_id='task_id', dag=dag)
self.app.dag_bag.bag_dag(dag, root_dag=dag)
dr = dag_maker.create_dagrun(run_id=dag_run_id)
dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type)
ti = dr.get_task_instance(task_id='task_id')
ti.task = task
ti.state = State.RUNNING
Expand Down Expand Up @@ -1254,6 +1308,10 @@ def test_should_respond_200(self, state, dag_maker, session):
'logical_date': dr.execution_date.isoformat(),
'start_date': dr.start_date.isoformat(),
'state': state,
'data_interval_start': dr.data_interval_start.isoformat(),
'data_interval_end': dr.data_interval_end.isoformat(),
'last_scheduling_decision': None,
'run_type': run_type,
}

@pytest.mark.parametrize('invalid_state', ["running", "queued"])
Expand Down
12 changes: 12 additions & 0 deletions tests/api_connexion/schemas/test_dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def test_serialize(self, session):
"external_trigger": True,
"start_date": self.default_time,
"conf": {"start": "stop"},
"data_interval_end": None,
"data_interval_start": None,
"last_scheduling_decision": None,
"run_type": "manual",
}

@parameterized.expand(
Expand Down Expand Up @@ -162,6 +166,10 @@ def test_serialize(self, session):
"state": "running",
"start_date": self.default_time,
"conf": {"start": "stop"},
"data_interval_end": None,
"data_interval_start": None,
"last_scheduling_decision": None,
"run_type": "manual",
},
{
"dag_id": "my-dag-run",
Expand All @@ -173,6 +181,10 @@ def test_serialize(self, session):
"external_trigger": True,
"start_date": self.default_time,
"conf": {},
"data_interval_end": None,
"data_interval_start": None,
"last_scheduling_decision": None,
"run_type": "manual",
},
],
"total_entries": 2,
Expand Down

0 comments on commit 6178491

Please sign in to comment.