Skip to content

Commit

Permalink
Add indexes on dag_id column in referencing tables to speed up deleti…
Browse files Browse the repository at this point in the history
…on of dag records (apache#39638)

* Add indexes on dag_id column in refencing tables to speed up deletion of dag records

* Gracefully handle deletion of indexes on foreign key columns during downgrade

* Correct constraint key name for dag_owner_attributes table fk

* Handle ForeignKey for dag_owner_attributes table behavior based on db

* Temporarily disable downgrade for dag_owner_attributes table to check the behavior in CI

* Skip index for dag_owner_attributes table

* Address @ephraimbuddy's comment
  • Loading branch information
pankajkoti authored and romsharon98 committed Jul 26, 2024
1 parent 89223c9 commit b60abc4
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add indexes on dag_id column in referencing tables.
Revision ID: 0fd0c178cbe8
Revises: 686269002441
Create Date: 2024-05-15 16:52:39.077349
"""

from __future__ import annotations

from alembic import op

# revision identifiers, used by Alembic.
revision = "0fd0c178cbe8"
down_revision = "686269002441"
branch_labels = None
depends_on = None
airflow_version = "2.9.2"


def upgrade():
"""Apply Add indexes on dag_id column in referencing tables."""
with op.batch_alter_table("dag_schedule_dataset_reference") as batch_op:
batch_op.create_index("idx_dag_schedule_dataset_reference_dag_id", ["dag_id"], unique=False)

with op.batch_alter_table("dag_tag") as batch_op:
batch_op.create_index("idx_dag_tag_dag_id", ["dag_id"], unique=False)

with op.batch_alter_table("dag_warning") as batch_op:
batch_op.create_index("idx_dag_warning_dag_id", ["dag_id"], unique=False)

with op.batch_alter_table("dataset_dag_run_queue") as batch_op:
batch_op.create_index("idx_dataset_dag_run_queue_target_dag_id", ["target_dag_id"], unique=False)

with op.batch_alter_table("task_outlet_dataset_reference") as batch_op:
batch_op.create_index("idx_task_outlet_dataset_reference_dag_id", ["dag_id"], unique=False)


def _handle_foreign_key_constraint_index_deletion(
batch_op, constraint_name, index_name, local_fk_column_name
):
conn = op.get_bind()
if conn.dialect.name == "mysql":
batch_op.drop_constraint(constraint_name, type_="foreignkey")
batch_op.drop_index(index_name)
batch_op.create_foreign_key(
constraint_name, "dag", [local_fk_column_name], ["dag_id"], ondelete="CASCADE"
)
else:
batch_op.drop_index(index_name)


def downgrade():
"""Unapply Add indexes on dag_id column in referencing tables."""
with op.batch_alter_table("dag_schedule_dataset_reference") as batch_op:
_handle_foreign_key_constraint_index_deletion(
batch_op,
"dsdr_dag_id_fkey",
"idx_dag_schedule_dataset_reference_dag_id",
"dag_id",
)

with op.batch_alter_table("dag_tag") as batch_op:
_handle_foreign_key_constraint_index_deletion(
batch_op, "dag_tag_dag_id_fkey", "idx_dag_tag_dag_id", "dag_id"
)

with op.batch_alter_table("dag_warning") as batch_op:
_handle_foreign_key_constraint_index_deletion(
batch_op, "dcw_dag_id_fkey", "idx_dag_warning_dag_id", "dag_id"
)

with op.batch_alter_table("dataset_dag_run_queue") as batch_op:
_handle_foreign_key_constraint_index_deletion(
batch_op,
"ddrq_dag_fkey",
"idx_dataset_dag_run_queue_target_dag_id",
"target_dag_id",
)

with op.batch_alter_table("task_outlet_dataset_reference") as batch_op:
_handle_foreign_key_constraint_index_deletion(
batch_op,
"todr_dag_id_fkey",
"idx_task_outlet_dataset_reference_dag_id",
"dag_id",
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""add new executor field to db.
Revision ID: 677fdbb7fc54
Revises: 686269002441
Revises: 0fd0c178cbe8
Create Date: 2024-04-01 15:26:59.186579
"""
Expand All @@ -31,7 +31,7 @@

# revision identifiers, used by Alembic.
revision = "677fdbb7fc54"
down_revision = "686269002441"
down_revision = "0fd0c178cbe8"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3555,6 +3555,8 @@ class DagTag(Base):
primary_key=True,
)

__table_args__ = (Index("idx_dag_tag_dag_id", dag_id),)

def __repr__(self):
return self.name

Expand Down
3 changes: 2 additions & 1 deletion airflow/models/dagwarning.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from enum import Enum
from typing import TYPE_CHECKING

from sqlalchemy import Column, ForeignKeyConstraint, String, Text, delete, false, select
from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, delete, false, select

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.models.base import Base, StringID
Expand Down Expand Up @@ -55,6 +55,7 @@ class DagWarning(Base):
name="dcw_dag_id_fkey",
ondelete="CASCADE",
),
Index("idx_dag_warning_dag_id", dag_id),
)

def __init__(self, dag_id: str, error_type: str, message: str, **kwargs):
Expand Down
3 changes: 3 additions & 0 deletions airflow/models/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class DagScheduleDatasetReference(Base):
name="dsdr_dag_id_fkey",
ondelete="CASCADE",
),
Index("idx_dag_schedule_dataset_reference_dag_id", dag_id),
)

def __eq__(self, other):
Expand Down Expand Up @@ -182,6 +183,7 @@ class TaskOutletDatasetReference(Base):
name="todr_dag_id_fkey",
ondelete="CASCADE",
),
Index("idx_task_outlet_dataset_reference_dag_id", dag_id),
)

def __eq__(self, other):
Expand Down Expand Up @@ -226,6 +228,7 @@ class DatasetDagRunQueue(Base):
name="ddrq_dag_fkey",
ondelete="CASCADE",
),
Index("idx_dataset_dag_run_queue_target_dag_id", target_dag_id),
)

def __eq__(self, other):
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class MappedClassProtocol(Protocol):
"2.8.0": "10b52ebd31f7",
"2.8.1": "88344c1d9134",
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.9.2": "0fd0c178cbe8",
"2.10.0": "c4602ba06b4b",
}

Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
62c46e83c8c2a7051fa3d7388e06535bcd01fae4cb1d01b1096cc069a82a89f8
a78b6a58b3e7f006d1f460d34cd901261e56397b027c81da500151400ecac41f
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+=================================+===================+===================+==============================================================+
| ``c4602ba06b4b`` (head) | ``677fdbb7fc54`` | ``2.10.0`` | Added DagPriorityParsingRequest table. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``677fdbb7fc54`` | ``686269002441`` | ``2.10.0`` | add new executor field to db. |
| ``677fdbb7fc54`` | ``0fd0c178cbe8`` | ``2.10.0`` | add new executor field to db. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``0fd0c178cbe8`` | ``686269002441`` | ``2.9.2`` | Add indexes on dag_id column in referencing tables. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``686269002441`` | ``bff083ad727d`` | ``2.9.2`` | Fix inconsistency between ORM and migration files. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
Expand Down

0 comments on commit b60abc4

Please sign in to comment.