diff --git a/airflow/migrations/versions/0143_2_9_2_add_indexes_on_dag_id_column_in_referencing_tables.py b/airflow/migrations/versions/0143_2_9_2_add_indexes_on_dag_id_column_in_referencing_tables.py new file mode 100644 index 0000000000000..6533dc5a7e085 --- /dev/null +++ b/airflow/migrations/versions/0143_2_9_2_add_indexes_on_dag_id_column_in_referencing_tables.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add indexes on dag_id column in referencing tables. + +Revision ID: 0fd0c178cbe8 +Revises: 686269002441 +Create Date: 2024-05-15 16:52:39.077349 + +""" + +from __future__ import annotations + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "0fd0c178cbe8"  # this migration's own revision id
+down_revision = "686269002441"  # parent revision ("Revises" in the module docstring)
+branch_labels = None
+depends_on = None
+airflow_version = "2.9.2"
+
+
+def upgrade():  # creates one non-unique index per table listed below
+    """Apply Add indexes on dag_id column in referencing tables."""
+    with op.batch_alter_table("dag_schedule_dataset_reference") as batch_op:
+        batch_op.create_index("idx_dag_schedule_dataset_reference_dag_id", ["dag_id"], unique=False)
+
+    with op.batch_alter_table("dag_tag") as batch_op:
+        batch_op.create_index("idx_dag_tag_dag_id", ["dag_id"], unique=False)
+
+    with op.batch_alter_table("dag_warning") as batch_op:
+        batch_op.create_index("idx_dag_warning_dag_id", ["dag_id"], unique=False)
+
+    with op.batch_alter_table("dataset_dag_run_queue") as batch_op:  # indexed column here is target_dag_id
+        batch_op.create_index("idx_dataset_dag_run_queue_target_dag_id", ["target_dag_id"], unique=False)
+
+    with op.batch_alter_table("task_outlet_dataset_reference") as batch_op:
+        batch_op.create_index("idx_task_outlet_dataset_reference_dag_id", ["dag_id"], unique=False)
+
+
+def _handle_foreign_key_constraint_index_deletion(  # downgrade helper: drops index_name on batch_op's table
+    batch_op, constraint_name, index_name, local_fk_column_name  # FK name and FK column are used on MySQL only
+):
+    conn = op.get_bind()  # only used to read the dialect name
+    if conn.dialect.name == "mysql":  # NOTE(review): presumably MySQL won't drop an index its FK needs - confirm
+        batch_op.drop_constraint(constraint_name, type_="foreignkey")  # FK goes first, then the index
+        batch_op.drop_index(index_name)
+        batch_op.create_foreign_key(  # recreate the FK: local_fk_column_name -> dag.dag_id, ON DELETE CASCADE
+            constraint_name, "dag", [local_fk_column_name], ["dag_id"], ondelete="CASCADE"
+        )
+    else:  # every other dialect: the index is dropped directly
+        batch_op.drop_index(index_name)
+
+
+def downgrade():  # drops the five indexes created by upgrade(), via the helper above
+    """Unapply Add indexes on dag_id column in referencing tables."""
+    with op.batch_alter_table("dag_schedule_dataset_reference") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op,
+            "dsdr_dag_id_fkey",
+            "idx_dag_schedule_dataset_reference_dag_id",
+            "dag_id",
+        )
+
+    with op.batch_alter_table("dag_tag") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op, "dag_tag_dag_id_fkey", "idx_dag_tag_dag_id", "dag_id"
+        )
+
+    with op.batch_alter_table("dag_warning") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op, "dcw_dag_id_fkey", "idx_dag_warning_dag_id", "dag_id"
+        )
+
+    with op.batch_alter_table("dataset_dag_run_queue") as batch_op:  # FK column here is target_dag_id
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op,
+            "ddrq_dag_fkey",
+            "idx_dataset_dag_run_queue_target_dag_id",
+            "target_dag_id",
+        )
+
+    with op.batch_alter_table("task_outlet_dataset_reference") as batch_op:
+        _handle_foreign_key_constraint_index_deletion(
+            batch_op,
+            "todr_dag_id_fkey",
+            "idx_task_outlet_dataset_reference_dag_id",
+            "dag_id",
+        )
diff --git a/airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py b/airflow/migrations/versions/0144_2_10_0_add_new_executor_field_to_db.py
similarity index 96%
rename from airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
rename to airflow/migrations/versions/0144_2_10_0_add_new_executor_field_to_db.py
index 5adf5dbb330ec..36d02350fc80d 100644
--- a/airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
+++ b/airflow/migrations/versions/0144_2_10_0_add_new_executor_field_to_db.py
@@ -19,7 +19,7 @@
 """add new executor field to db.
 
 Revision ID: 677fdbb7fc54
-Revises: 686269002441
+Revises: 0fd0c178cbe8
 Create Date: 2024-04-01 15:26:59.186579
 
 """
@@ -31,7 +31,7 @@
 
 # revision identifiers, used by Alembic.
revision = "677fdbb7fc54" -down_revision = "686269002441" +down_revision = "0fd0c178cbe8" branch_labels = None depends_on = None airflow_version = "2.10.0" diff --git a/airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py b/airflow/migrations/versions/0145_2_10_0_added_dagpriorityparsingrequest_table.py similarity index 100% rename from airflow/migrations/versions/0144_2_10_0_added_dagpriorityparsingrequest_table.py rename to airflow/migrations/versions/0145_2_10_0_added_dagpriorityparsingrequest_table.py diff --git a/airflow/models/dag.py b/airflow/models/dag.py index c9a8424a831f0..91ee3b083c919 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -3555,6 +3555,8 @@ class DagTag(Base): primary_key=True, ) + __table_args__ = (Index("idx_dag_tag_dag_id", dag_id),) + def __repr__(self): return self.name diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py index 789fe0172784b..ffab515f85495 100644 --- a/airflow/models/dagwarning.py +++ b/airflow/models/dagwarning.py @@ -20,7 +20,7 @@ from enum import Enum from typing import TYPE_CHECKING -from sqlalchemy import Column, ForeignKeyConstraint, String, Text, delete, false, select +from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, delete, false, select from airflow.api_internal.internal_api_call import internal_api_call from airflow.models.base import Base, StringID @@ -55,6 +55,7 @@ class DagWarning(Base): name="dcw_dag_id_fkey", ondelete="CASCADE", ), + Index("idx_dag_warning_dag_id", dag_id), ) def __init__(self, dag_id: str, error_type: str, message: str, **kwargs): diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 7b42ff324b64b..3df059c87d31b 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -138,6 +138,7 @@ class DagScheduleDatasetReference(Base): name="dsdr_dag_id_fkey", ondelete="CASCADE", ), + Index("idx_dag_schedule_dataset_reference_dag_id", dag_id), ) def __eq__(self, other): @@ 
-182,6 +183,7 @@ class TaskOutletDatasetReference(Base): name="todr_dag_id_fkey", ondelete="CASCADE", ), + Index("idx_task_outlet_dataset_reference_dag_id", dag_id), ) def __eq__(self, other): @@ -226,6 +228,7 @@ class DatasetDagRunQueue(Base): name="ddrq_dag_fkey", ondelete="CASCADE", ), + Index("idx_dataset_dag_run_queue_target_dag_id", target_dag_id), ) def __eq__(self, other): diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 41fa1ab2e0064..f2c837eef04e3 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -117,7 +117,7 @@ class MappedClassProtocol(Protocol): "2.8.0": "10b52ebd31f7", "2.8.1": "88344c1d9134", "2.9.0": "1949afb29106", - "2.9.2": "686269002441", + "2.9.2": "0fd0c178cbe8", "2.10.0": "c4602ba06b4b", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index f71ce47401f47..8263e6d131e62 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -62c46e83c8c2a7051fa3d7388e06535bcd01fae4cb1d01b1096cc069a82a89f8 \ No newline at end of file +a78b6a58b3e7f006d1f460d34cd901261e56397b027c81da500151400ecac41f \ No newline at end of file diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 6ae768550b9a0..45fce007f7b1f 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -41,7 +41,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +=================================+===================+===================+==============================================================+ | ``c4602ba06b4b`` (head) | ``677fdbb7fc54`` | ``2.10.0`` | Added DagPriorityParsingRequest table. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ -| ``677fdbb7fc54`` | ``686269002441`` | ``2.10.0`` | add new executor field to db. 
| +| ``677fdbb7fc54`` | ``0fd0c178cbe8`` | ``2.10.0`` | add new executor field to db. | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``0fd0c178cbe8`` | ``686269002441`` | ``2.9.2`` | Add indexes on dag_id column in referencing tables. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``686269002441`` | ``bff083ad727d`` | ``2.9.2`` | Fix inconsistency between ORM and migration files. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+