diff --git a/superset/migrations/shared/constraints.py b/superset/migrations/shared/constraints.py index 5ae63f57ce611..d3bc140fd0bab 100644 --- a/superset/migrations/shared/constraints.py +++ b/superset/migrations/shared/constraints.py @@ -19,10 +19,8 @@ from dataclasses import dataclass from alembic import op -from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402 from sqlalchemy.engine.reflection import Inspector -from superset.migrations.shared.utils import has_table from superset.utils.core import generic_find_fk_constraint_name @@ -73,23 +71,3 @@ def redefine( ondelete=on_delete, onupdate=on_update, ) - - -def drop_fks_for_table(table_name: str) -> None: - """ - Drop all foreign key constraints for a table if it exist and the database - is not sqlite. - - :param table_name: The table name to drop foreign key constraints for - """ - connection = op.get_bind() - inspector = Inspector.from_engine(connection) - - if isinstance(connection.dialect, SQLiteDialect): - return # sqlite doesn't like constraints - - if has_table(table_name): - foreign_keys = inspector.get_foreign_keys(table_name) - - for fk in foreign_keys: - op.drop_constraint(fk["name"], table_name, type_="foreignkey") diff --git a/superset/migrations/shared/utils.py b/superset/migrations/shared/utils.py index f62f8a858a2b4..813162bcd5527 100644 --- a/superset/migrations/shared/utils.py +++ b/superset/migrations/shared/utils.py @@ -21,17 +21,25 @@ from typing import Any, Callable, Optional, Union from uuid import uuid4 -import sqlalchemy as sa from alembic import op -from sqlalchemy import inspect +from sqlalchemy import Column, inspect from sqlalchemy.dialects.mysql.base import MySQLDialect from sqlalchemy.dialects.postgresql.base import PGDialect +from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402 +from sqlalchemy.engine.reflection import Inspector from sqlalchemy.exc import NoSuchTableError from sqlalchemy.orm import Query, Session +from sqlalchemy.sql.schema import SchemaItem from superset.utils import json -logger = logging.getLogger(__name__) +GREEN = "\033[32m" +RESET = "\033[0m" +YELLOW = "\033[33m" +RED = "\033[31m" +LRED = "\033[91m" + +logger = logging.getLogger("alembic") DEFAULT_BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 1000)) @@ -185,15 +193,208 @@ def has_table(table_name: str) -> bool: return table_exists -def add_column_if_not_exists(table_name: str, column: sa.Column) -> None: +def drop_fks_for_table(table_name: str) -> None: + """ + Drop all foreign key constraints for a table if it exist and the database + is not sqlite. + + :param table_name: The table name to drop foreign key constraints for + """ + connection = op.get_bind() + inspector = Inspector.from_engine(connection) + + if isinstance(connection.dialect, SQLiteDialect): + return # sqlite doesn't like constraints + + if has_table(table_name): + foreign_keys = inspector.get_foreign_keys(table_name) + for fk in foreign_keys: + logger.info( + f"Dropping foreign key {GREEN}{fk['name']}{RESET} from table {GREEN}{table_name}{RESET}..." + ) + op.drop_constraint(fk["name"], table_name, type_="foreignkey") + + +def create_table(table_name: str, *columns: SchemaItem) -> None: + """ + Creates a database table with the specified name and columns. + + This function checks if a table with the given name already exists in the database. + If the table already exists, it logs an informational. + Otherwise, it proceeds to create a new table using the provided name and schema columns. + + :param table_name: The name of the table to be created. + :param columns: A variable number of arguments representing the schema just like when calling alembic's method create_table() + """ + + if has_table(table_name=table_name): + logger.info(f"Table {LRED}{table_name}{RESET} already exists. Skipping...") + return + + logger.info(f"Creating table {GREEN}{table_name}{RESET}...") + op.create_table(table_name, *columns) + logger.info(f"Table {GREEN}{table_name}{RESET} created.") + + +def drop_table(table_name: str) -> None: """ - Adds a column to a table if it does not already exist. + Drops a database table with the specified name. - :param table_name: Name of the table. - :param column: SQLAlchemy Column object. + This function checks if a table with the given name exists in the database. + If the table does not exist, it logs an informational message and skips the dropping process. + If the table exists, it first attempts to drop all foreign key constraints associated with the table + (handled by `drop_fks_for_table`) and then proceeds to drop the table. + + :param table_name: The name of the table to be dropped. """ - if not table_has_column(table_name, column.name): - print(f"Adding column '{column.name}' to table '{table_name}'.\n") - op.add_column(table_name, column) - else: - print(f"Column '{column.name}' already exists in table '{table_name}'.\n") + + if not has_table(table_name=table_name): + logger.info(f"Table {GREEN}{table_name}{RESET} doesn't exist. Skipping...") + return + + logger.info(f"Dropping table {GREEN}{table_name}{RESET}...") + drop_fks_for_table(table_name) + op.drop_table(table_name=table_name) + logger.info(f"Table {GREEN}{table_name}{RESET} dropped.") + + +def batch_operation( + callable: Callable[[int, int], None], count: int, batch_size: int +) -> None: + """ + Executes an operation by dividing a task into smaller batches and tracking progress. + + This function is designed to process a large number of items in smaller batches. It takes a callable + that performs the operation on each batch. The function logs the progress of the operation as it processes + through the batches. + + If count is set to 0 or lower, it logs an informational message and skips the batch process. + + :param callable: A callable function that takes two integer arguments: + the start index and the end index of the current batch. + :param count: The total number of items to process. + :param batch_size: The number of items to process in each batch. + """ + if count <= 0: + logger.info( + f"No records to process in batch {LRED}(count <= 0){RESET} for callable {LRED}other_callable_example{RESET}. Skipping..." + ) + return + for offset in range(0, count, batch_size): + percentage = (offset / count) * 100 if count else 0 + logger.info(f"Progress: {offset:,}/{count:,} ({percentage:.2f}%)") + callable(offset, min(offset + batch_size, count)) + + logger.info(f"Progress: {count:,}/{count:,} (100%)") + logger.info( + f"End: {GREEN}{callable.__name__}{RESET} batch operation {GREEN}succesfully{RESET} executed." + ) + + +def add_columns(table_name: str, *columns: Column) -> None: + """ + Adds new columns to an existing database table. + + If a column already exists, it logs an informational message and skips the adding process. + Otherwise, it proceeds to add the new column to the table. + + The operation is performed using Alembic's batch_alter_table. + + :param table_name: The name of the table to which the columns will be added. + :param columns: A list of SQLAlchemy Column objects that define the name, type, and other attributes of the columns to be added. + """ + + cols_to_add = [] + for col in columns: + if table_has_column(table_name=table_name, column_name=col.name): + logger.info( + f"Column {LRED}{col.name}{RESET} already present on table {LRED}{table_name}{RESET}. Skipping..." + ) + else: + cols_to_add.append(col) + + with op.batch_alter_table(table_name) as batch_op: + for col in cols_to_add: + logger.info( + f"Adding column {GREEN}{col.name}{RESET} to table {GREEN}{table_name}{RESET}..." + ) + batch_op.add_column(col) + + +def drop_columns(table_name: str, *columns: str) -> None: + """ + Drops specified columns from an existing database table. + + If a column does not exist, it logs an informational message and skips the dropping process. + Otherwise, it proceeds to remove the column from the table. + + The operation is performed using Alembic's batch_alter_table. + + :param table_name: The name of the table from which the columns will be removed. + :param columns: A list of column names to be dropped. + """ + + cols_to_drop = [] + for col in columns: + if not table_has_column(table_name=table_name, column_name=col): + logger.info( + f"Column {LRED}{col}{RESET} is not present on table {LRED}{table_name}{RESET}. Skipping..." + ) + else: + cols_to_drop.append(col) + + with op.batch_alter_table(table_name) as batch_op: + for col in cols_to_drop: + logger.info( + f"Dropping column {GREEN}{col}{RESET} from table {GREEN}{table_name}{RESET}..." + ) + batch_op.drop_column(col) + + +def create_index(table_name: str, index_name: str, *columns: str) -> None: + """ + Creates an index on specified columns of an existing database table. + + If the index already exists, it logs an informational message and skips the creation process. + Otherwise, it proceeds to create a new index with the specified name on the given columns of the table. + + :param table_name: The name of the table on which the index will be created. + :param index_name: The name of the index to be created. + :param columns: A list column names where the index will be created + """ + + if table_has_index(table=table_name, index=index_name): + logger.info( + f"Table {LRED}{table_name}{RESET} already has index {LRED}{index_name}{RESET}. Skipping..." + ) + return + + logger.info( + f"Creating index {GREEN}{index_name}{RESET} on table {GREEN}{table_name}{RESET}" + ) + + op.create_index(table_name=table_name, index_name=index_name, columns=columns) + + +def drop_index(table_name: str, index_name: str) -> None: + """ + Drops an index from an existing database table. + + If the index does not exists, it logs an informational message and skips the dropping process. + Otherwise, it proceeds with the removal operation. + + :param table_name: The name of the table from which the index will be dropped. + :param index_name: The name of the index to be dropped. + """ + + if not table_has_index(table=table_name, index=index_name): + logger.info( + f"Table {LRED}{table_name}{RESET} doesn't have index {LRED}{index_name}{RESET}. Skipping..." + ) + return + + logger.info( + f"Dropping index {GREEN}{index_name}{RESET} from table {GREEN}{table_name}{RESET}..." + ) + + op.drop_index(table_name=table_name, index_name=index_name) diff --git a/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py b/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py index 0e92884e58c99..59ccd7a0852db 100644 --- a/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py +++ b/superset/migrations/versions/2024-03-20_16-02_678eefb4ab44_add_access_token_table.py @@ -30,7 +30,7 @@ from alembic import op # noqa: E402 from sqlalchemy_utils import EncryptedType # noqa: E402 -from superset.migrations.shared.constraints import drop_fks_for_table # noqa: E402 +from superset.migrations.shared.utils import drop_fks_for_table # noqa: E402 def upgrade(): diff --git a/superset/migrations/versions/2024-04-01_22-44_c22cb5c2e546_user_attr_avatar_url.py b/superset/migrations/versions/2024-04-01_22-44_c22cb5c2e546_user_attr_avatar_url.py index 126280f338227..c3022b3d5ab72 100644 --- a/superset/migrations/versions/2024-04-01_22-44_c22cb5c2e546_user_attr_avatar_url.py +++ b/superset/migrations/versions/2024-04-01_22-44_c22cb5c2e546_user_attr_avatar_url.py @@ -22,9 +22,8 @@ """ import sqlalchemy as sa -from alembic import op -from superset.migrations.shared.utils import add_column_if_not_exists +from superset.migrations.shared.utils import add_columns, drop_columns # revision identifiers, used by Alembic. revision = "c22cb5c2e546" @@ -32,11 +31,11 @@ def upgrade(): - add_column_if_not_exists( + add_columns( "user_attribute", sa.Column("avatar_url", sa.String(length=100), nullable=True), ) def downgrade(): - op.drop_column("user_attribute", "avatar_url") + drop_columns("user_attribute", "avatar_url") diff --git a/superset/migrations/versions/2024-04-11_15-41_5f57af97bc3f_add_catalog_column.py b/superset/migrations/versions/2024-04-11_15-41_5f57af97bc3f_add_catalog_column.py index b535867d64a4b..f7b35c15afdb1 100644 --- a/superset/migrations/versions/2024-04-11_15-41_5f57af97bc3f_add_catalog_column.py +++ b/superset/migrations/versions/2024-04-11_15-41_5f57af97bc3f_add_catalog_column.py @@ -23,9 +23,8 @@ """ import sqlalchemy as sa -from alembic import op -from superset.migrations.shared.utils import add_column_if_not_exists +from superset.migrations.shared.utils import add_columns, drop_columns # revision identifiers, used by Alembic. revision = "5f57af97bc3f" @@ -36,12 +35,9 @@ def upgrade(): for table in tables: - add_column_if_not_exists( - table, - sa.Column("catalog", sa.String(length=256), nullable=True), - ) + add_columns(table, sa.Column("catalog", sa.String(length=256), nullable=True)) def downgrade(): for table in reversed(tables): - op.drop_column(table, "catalog") + drop_columns(table, "catalog") diff --git a/superset/migrations/versions/2024-05-01_10-52_58d051681a3b_add_catalog_perm_to_tables.py b/superset/migrations/versions/2024-05-01_10-52_58d051681a3b_add_catalog_perm_to_tables.py index 6dfc2845bcb7d..1d87113d16f95 100644 --- a/superset/migrations/versions/2024-05-01_10-52_58d051681a3b_add_catalog_perm_to_tables.py +++ b/superset/migrations/versions/2024-05-01_10-52_58d051681a3b_add_catalog_perm_to_tables.py @@ -23,13 +23,12 @@ """ import sqlalchemy as sa -from alembic import op from superset.migrations.shared.catalogs import ( downgrade_catalog_perms, upgrade_catalog_perms, ) -from superset.migrations.shared.utils import add_column_if_not_exists +from superset.migrations.shared.utils import add_columns, drop_columns # revision identifiers, used by Alembic. revision = "58d051681a3b" @@ -37,18 +36,16 @@ def upgrade(): - add_column_if_not_exists( - "tables", - sa.Column("catalog_perm", sa.String(length=1000), nullable=True), + add_columns( + "tables", sa.Column("catalog_perm", sa.String(length=1000), nullable=True) ) - add_column_if_not_exists( - "slices", - sa.Column("catalog_perm", sa.String(length=1000), nullable=True), + add_columns( + "slices", sa.Column("catalog_perm", sa.String(length=1000), nullable=True) ) upgrade_catalog_perms(engines={"postgresql"}) def downgrade(): downgrade_catalog_perms(engines={"postgresql"}) - op.drop_column("slices", "catalog_perm") - op.drop_column("tables", "catalog_perm") + drop_columns("slices", "catalog_perm") + drop_columns("tables", "catalog_perm") diff --git a/superset/migrations/versions/2024-05-24_11-31_02f4f7811799_remove_sl_dataset_columns_table.py b/superset/migrations/versions/2024-05-24_11-31_02f4f7811799_remove_sl_dataset_columns_table.py index 03158fabd97b1..70913d2ecb9de 100644 --- a/superset/migrations/versions/2024-05-24_11-31_02f4f7811799_remove_sl_dataset_columns_table.py +++ b/superset/migrations/versions/2024-05-24_11-31_02f4f7811799_remove_sl_dataset_columns_table.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "02f4f7811799" diff --git a/superset/migrations/versions/2024-08-13_15-17_39549add7bfc_remove_sl_table_columns_table.py b/superset/migrations/versions/2024-08-13_15-17_39549add7bfc_remove_sl_table_columns_table.py index 1ec41f5e1ca87..bb7a2d14dbd52 100644 --- a/superset/migrations/versions/2024-08-13_15-17_39549add7bfc_remove_sl_table_columns_table.py +++ b/superset/migrations/versions/2024-08-13_15-17_39549add7bfc_remove_sl_table_columns_table.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "39549add7bfc" diff --git a/superset/migrations/versions/2024-08-13_15-23_38f4144e8558_remove_sl_dataset_tables.py b/superset/migrations/versions/2024-08-13_15-23_38f4144e8558_remove_sl_dataset_tables.py index 4931456a33830..d8565f89b6fb7 100644 --- a/superset/migrations/versions/2024-08-13_15-23_38f4144e8558_remove_sl_dataset_tables.py +++ b/superset/migrations/versions/2024-08-13_15-23_38f4144e8558_remove_sl_dataset_tables.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "38f4144e8558" diff --git a/superset/migrations/versions/2024-08-13_15-27_e53fd48cc078_remove_sl_dataset_users.py b/superset/migrations/versions/2024-08-13_15-27_e53fd48cc078_remove_sl_dataset_users.py index 359b05dcd4948..e9122dc47afb6 100644 --- a/superset/migrations/versions/2024-08-13_15-27_e53fd48cc078_remove_sl_dataset_users.py +++ b/superset/migrations/versions/2024-08-13_15-27_e53fd48cc078_remove_sl_dataset_users.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "e53fd48cc078" diff --git a/superset/migrations/versions/2024-08-13_15-29_a6b32d2d07b1_remove_sl_columns.py b/superset/migrations/versions/2024-08-13_15-29_a6b32d2d07b1_remove_sl_columns.py index 1562ed962a0c4..d10862fb8c76f 100644 --- a/superset/migrations/versions/2024-08-13_15-29_a6b32d2d07b1_remove_sl_columns.py +++ b/superset/migrations/versions/2024-08-13_15-29_a6b32d2d07b1_remove_sl_columns.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "a6b32d2d07b1" diff --git a/superset/migrations/versions/2024-08-13_15-31_007a1abffe7e_remove_sl_tables.py b/superset/migrations/versions/2024-08-13_15-31_007a1abffe7e_remove_sl_tables.py index 106cd7a1704a0..8b0f2861018ea 100644 --- a/superset/migrations/versions/2024-08-13_15-31_007a1abffe7e_remove_sl_tables.py +++ b/superset/migrations/versions/2024-08-13_15-31_007a1abffe7e_remove_sl_tables.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "007a1abffe7e" diff --git a/superset/migrations/versions/2024-08-13_15-33_48cbb571fa3a_remove_sl_datasets.py b/superset/migrations/versions/2024-08-13_15-33_48cbb571fa3a_remove_sl_datasets.py index c35d3d7572e56..7b35ebec26141 100644 --- a/superset/migrations/versions/2024-08-13_15-33_48cbb571fa3a_remove_sl_datasets.py +++ b/superset/migrations/versions/2024-08-13_15-33_48cbb571fa3a_remove_sl_datasets.py @@ -25,8 +25,7 @@ import sqlalchemy as sa from alembic import op -from superset.migrations.shared.constraints import drop_fks_for_table -from superset.migrations.shared.utils import has_table +from superset.migrations.shared.utils import drop_fks_for_table, has_table # revision identifiers, used by Alembic. revision = "48cbb571fa3a"