Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove execution_date and logical_date from arguments in api_connexion #43678

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ def _fetch_dag_runs(
query = query.where(DagRun.updated_at <= updated_at_lte)

total_entries = get_query_count(query, session=session)
to_replace = {"dag_run_id": "run_id", "execution_date": "logical_date"}
to_replace = {"dag_run_id": "run_id", "logical_date": "logical_date"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to replace it?

allowed_sort_attrs = [
"id",
"state",
"dag_id",
"execution_date",
"logical_date",
"dag_run_id",
"start_date",
"end_date",
Expand Down Expand Up @@ -318,13 +318,13 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
except ValidationError as err:
raise BadRequest(detail=str(err))

logical_date = pendulum.instance(post_body["execution_date"])
logical_date = pendulum.instance(post_body["logical_date"])
run_id = post_body["run_id"]
dagrun_instance = session.scalar(
select(DagRun)
.where(
DagRun.dag_id == dag_id,
or_(DagRun.run_id == run_id, DagRun.execution_date == logical_date),
or_(DagRun.run_id == run_id, DagRun.logical_date == logical_date),
)
.limit(1)
)
Expand All @@ -345,7 +345,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
execution_date=logical_date,
logical_date=logical_date,
data_interval=data_interval,
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
Expand All @@ -362,7 +362,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
except (ValueError, ParamValidationError) as ve:
raise BadRequest(detail=str(ve))

if dagrun_instance.execution_date == logical_date:
if dagrun_instance.logical_date == logical_date:
raise AlreadyExists(
detail=(
f"DAGRun with DAG ID: '{dag_id}' and "
Expand Down
23 changes: 3 additions & 20 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def get_mapped_task_instances(
# Other search criteria
base_query = _apply_range_filter(
base_query,
key=DR.execution_date,
key=DR.logical_date,
value_range=(execution_date_gte, execution_date_lte),
)
base_query = _apply_range_filter(
Expand Down Expand Up @@ -334,7 +334,7 @@ def get_task_instances(
base_query = base_query.where(TI.run_id == dag_run_id)
base_query = _apply_range_filter(
base_query,
key=DR.execution_date,
key=DR.logical_date,
value_range=(execution_date_gte, execution_date_lte),
)
base_query = _apply_range_filter(
Expand Down Expand Up @@ -396,7 +396,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
base_query = _apply_array_filter(base_query, key=TI.task_id, values=data["task_ids"])
base_query = _apply_range_filter(
base_query,
key=DR.execution_date,
key=DR.logical_date,
value_range=(data["execution_date_gte"], data["execution_date_lte"]),
)
base_query = _apply_range_filter(
Expand Down Expand Up @@ -519,23 +519,7 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
if not task:
error_message = f"Task ID {task_id} not found"
raise NotFound(error_message)

execution_date = data.get("execution_date")
run_id = data.get("dag_run_id")
if (
execution_date
and (
session.scalars(
select(TI).where(
TI.task_id == task_id, TI.dag_id == dag_id, TI.execution_date == execution_date
)
).one_or_none()
)
is None
):
raise NotFound(
detail=f"Task instance not found for task {task_id!r} on execution_date {execution_date}"
)

select_stmt = select(TI).where(
TI.dag_id == dag_id, TI.task_id == task_id, TI.run_id == run_id, TI.map_index == -1
Expand All @@ -548,7 +532,6 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
tis = dag.set_task_instance_state(
task_id=task_id,
run_id=run_id,
execution_date=execution_date,
state=data["new_state"],
upstream=data["include_upstream"],
downstream=data["include_downstream"],
Expand Down
7 changes: 1 addition & 6 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4751,14 +4751,9 @@ components:
description: The task ID.
type: string

execution_date:
description: The execution date. Either set this or dag_run_id but not both.
type: string
format: datetime

dag_run_id:
description: |
The task instance's DAG run ID. Either set this or execution_date but not both.
The task instance's DAG run ID.

*New in version 2.3.0*
type: string
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/asset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class Meta:

run_id = auto_field(data_key="dag_run_id")
dag_id = auto_field(dump_only=True)
execution_date = auto_field(data_key="logical_date", dump_only=True)
logical_date = auto_field(data_key="logical_date", dump_only=True)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = auto_field(dump_only=True)
Expand Down
22 changes: 7 additions & 15 deletions airflow/api_connexion/schemas/dag_run_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Meta:

run_id = auto_field(data_key="dag_run_id")
dag_id = auto_field(dump_only=True)
execution_date = auto_field(data_key="logical_date", validate=validate_istimezone)
logical_date = auto_field(data_key="logical_date", validate=validate_istimezone)
start_date = auto_field(dump_only=True)
end_date = auto_field(dump_only=True)
state = DagStateField(dump_only=True)
Expand All @@ -81,22 +81,14 @@ def autogenerate(self, data, **kwargs):
"""
Auto generate run_id and logical_date if they are not provided.

For compatibility, if `execution_date` is submitted, it is converted
For compatibility, if `logical_date` is submitted, it is converted
to `logical_date`.
"""
logical_date = data.get("logical_date", _MISSING)
execution_date = data.pop("execution_date", _MISSING)
if logical_date is execution_date is _MISSING: # Both missing.

# Auto-generate logical_date if missing
if logical_date is _MISSING:
data["logical_date"] = str(timezone.utcnow())
elif logical_date is _MISSING: # Only logical_date missing.
data["logical_date"] = execution_date
elif execution_date is _MISSING: # Only execution_date missing.
pass
elif logical_date != execution_date: # Both provided but don't match.
raise BadRequest(
"logical_date conflicts with execution_date",
detail=f"{logical_date!r} != {execution_date!r}",
)

if "dag_run_id" not in data:
try:
Expand All @@ -109,9 +101,9 @@ def autogenerate(self, data, **kwargs):

@post_dump
def autofill(self, data, **kwargs):
"""Populate execution_date from logical_date for compatibility."""
"""Populate logical_date from logical_date for compatibility."""
ret_data = {}
data["execution_date"] = data["logical_date"]
data["logical_date"] = data["logical_date"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
data["logical_date"] = data["logical_date"]

I think we probably don't need this line?

if self.context.get("fields"):
ret_fields = self.context.get("fields")
for ret_field in ret_fields:
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Meta:
dag_id = auto_field()
run_id = auto_field(data_key="dag_run_id")
map_index = auto_field()
execution_date = auto_field()
logical_date = auto_field()
start_date = auto_field()
end_date = auto_field()
duration = auto_field()
Expand Down Expand Up @@ -196,7 +196,7 @@ class SetTaskInstanceStateFormSchema(Schema):

dry_run = fields.Boolean(load_default=True)
task_id = fields.Str(required=True)
execution_date = fields.DateTime(validate=validate_istimezone)
logical_date = fields.DateTime(validate=validate_istimezone)
dag_run_id = fields.Str()
include_upstream = fields.Boolean(required=True)
include_downstream = fields.Boolean(required=True)
Expand All @@ -212,8 +212,8 @@ class SetTaskInstanceStateFormSchema(Schema):
@validates_schema
def validate_form(self, data, **kwargs):
"""Validate set task instance state form."""
if not exactly_one(data.get("execution_date"), data.get("dag_run_id")):
raise ValidationError("Exactly one of execution_date or dag_run_id must be provided")
if not exactly_one(data.get("logical_date"), data.get("dag_run_id")):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed after the removal in airflow/api_connexion/openapi/v1.yaml

raise ValidationError("Exactly one of logical_date or dag_run_id must be provided")


class SetSingleTaskInstanceStateFormSchema(Schema):
Expand All @@ -234,7 +234,7 @@ class TaskInstanceReferenceSchema(Schema):
task_id = fields.Str()
run_id = fields.Str(data_key="dag_run_id")
dag_id = fields.Str()
execution_date = fields.DateTime()
logical_date = fields.DateTime()


class TaskInstanceReferenceCollection(NamedTuple):
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Meta:

key = auto_field()
timestamp = auto_field()
execution_date = auto_field()
logical_date = auto_field()
map_index = auto_field()
task_id = auto_field()
dag_id = auto_field()
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def stats_tags(self) -> dict[str, str]:
return prune_dict({"dag_id": self.dag_id, "run_type": self.run_type})

@property
def logical_date(self) -> datetime:
def logical_date(self):
return self.execution_date
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going to be removed in another PR?


def get_state(self):
Expand Down
7 changes: 1 addition & 6 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2111,12 +2111,7 @@ export interface components {
/** @description The task ID. */
task_id?: string;
/**
* Format: datetime
* @description The execution date. Either set this or dag_run_id but not both.
*/
execution_date?: string;
/**
* @description The task instance's DAG run ID. Either set this or execution_date but not both.
* @description The task instance's DAG run ID.
*
* *New in version 2.3.0*
*/
Expand Down
Loading