Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds helper functions for migrations #31303

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions superset/migrations/shared/constraints.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
from dataclasses import dataclass

from alembic import op
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
from sqlalchemy.engine.reflection import Inspector

from superset.migrations.shared.utils import has_table
from superset.utils.core import generic_find_fk_constraint_name


Expand Down Expand Up @@ -73,23 +71,3 @@ def redefine(
ondelete=on_delete,
onupdate=on_update,
)


def drop_fks_for_table(table_name: str) -> None:
"""
Drop all foreign key constraints for a table if it exist and the database
is not sqlite.

:param table_name: The table name to drop foreign key constraints for
"""
connection = op.get_bind()
inspector = Inspector.from_engine(connection)

if isinstance(connection.dialect, SQLiteDialect):
return # sqlite doesn't like constraints

if has_table(table_name):
foreign_keys = inspector.get_foreign_keys(table_name)

for fk in foreign_keys:
op.drop_constraint(fk["name"], table_name, type_="foreignkey")
225 changes: 213 additions & 12 deletions superset/migrations/shared/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,25 @@
from typing import Any, Callable, Optional, Union
from uuid import uuid4

import sqlalchemy as sa
from alembic import op
from sqlalchemy import inspect
from sqlalchemy import Column, inspect
from sqlalchemy.dialects.mysql.base import MySQLDialect
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.dialects.sqlite.base import SQLiteDialect # noqa: E402
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.schema import SchemaItem

from superset.utils import json

logger = logging.getLogger(__name__)
GREEN = "\033[32m"
RESET = "\033[0m"
YELLOW = "\033[33m"
RED = "\033[31m"
LRED = "\033[91m"

logger = logging.getLogger("alembic")

DEFAULT_BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 1000))

Expand Down Expand Up @@ -185,15 +193,208 @@
return table_exists


def add_column_if_not_exists(table_name: str, column: sa.Column) -> None:
def drop_fks_for_table(table_name: str) -> None:
"""
Drop all foreign key constraints for a table if it exist and the database
is not sqlite.

:param table_name: The table name to drop foreign key constraints for
"""
connection = op.get_bind()
inspector = Inspector.from_engine(connection)

Check warning on line 204 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L203-L204

Added lines #L203 - L204 were not covered by tests

if isinstance(connection.dialect, SQLiteDialect):
return # sqlite doesn't like constraints

Check warning on line 207 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L206-L207

Added lines #L206 - L207 were not covered by tests

if has_table(table_name):
foreign_keys = inspector.get_foreign_keys(table_name)
for fk in foreign_keys:
luizotavio32 marked this conversation as resolved.
Show resolved Hide resolved
logger.info(

Check warning on line 212 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L209-L212

Added lines #L209 - L212 were not covered by tests
f"Dropping foreign key {GREEN}{fk['name']}{RESET} from table {GREEN}{table_name}{RESET}..."
)
op.drop_constraint(fk["name"], table_name, type_="foreignkey")

Check warning on line 215 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L215

Added line #L215 was not covered by tests


def create_table(table_name: str, *columns: SchemaItem) -> None:
"""
Creates a database table with the specified name and columns.

This function checks if a table with the given name already exists in the database.
If the table already exists, it logs an informational.
michael-s-molina marked this conversation as resolved.
Show resolved Hide resolved
Otherwise, it proceeds to create a new table using the provided name and schema columns.

:param table_name: The name of the table to be created.
:param columns: A variable number of arguments representing the schema just like when calling alembic's method create_table()
"""

if has_table(table_name=table_name):
logger.info(f"Table {LRED}{table_name}{RESET} already exists. Skipping...")
return

Check warning on line 232 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L230-L232

Added lines #L230 - L232 were not covered by tests

logger.info(f"Creating table {GREEN}{table_name}{RESET}...")
op.create_table(table_name, *columns)
logger.info(f"Table {GREEN}{table_name}{RESET} created.")

Check warning on line 236 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L234-L236

Added lines #L234 - L236 were not covered by tests


def drop_table(table_name: str) -> None:
"""
Adds a column to a table if it does not already exist.
Drops a database table with the specified name.

:param table_name: Name of the table.
:param column: SQLAlchemy Column object.
This function checks if a table with the given name exists in the database.
If the table does not exist, it logs an informational message and skips the dropping process.
If the table exists, it first attempts to drop all foreign key constraints associated with the table
(handled by `drop_fks_for_table`) and then proceeds to drop the table.

:param table_name: The name of the table to be dropped.
"""
if not table_has_column(table_name, column.name):
print(f"Adding column '{column.name}' to table '{table_name}'.\n")
op.add_column(table_name, column)
else:
print(f"Column '{column.name}' already exists in table '{table_name}'.\n")

if not has_table(table_name=table_name):
logger.info(f"Table {GREEN}{table_name}{RESET} doesn't exist. Skipping...")
return

Check warning on line 253 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L251-L253

Added lines #L251 - L253 were not covered by tests

logger.info(f"Dropping table {GREEN}{table_name}{RESET}...")
drop_fks_for_table(table_name)
op.drop_table(table_name=table_name)
logger.info(f"Table {GREEN}{table_name}{RESET} dropped.")

Check warning on line 258 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L255-L258

Added lines #L255 - L258 were not covered by tests


def batch_operation(
callable: Callable[[int, int], None], count: int, batch_size: int
) -> None:
"""
Executes an operation by dividing a task into smaller batches and tracking progress.

This function is designed to process a large number of items in smaller batches. It takes a callable
that performs the operation on each batch. The function logs the progress of the operation as it processes
through the batches.

If count is set to 0 or lower, it logs an informational message and skips the batch process.

:param callable: A callable function that takes two integer arguments:
the start index and the end index of the current batch.
:param count: The total number of items to process.
:param batch_size: The number of items to process in each batch.
"""
if count <= 0:
logger.info(

Check warning on line 279 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L278-L279

Added lines #L278 - L279 were not covered by tests
f"No records to process in batch {LRED}(count <= 0){RESET} for callable {LRED}other_callable_example{RESET}. Skipping..."
)
return
for offset in range(0, count, batch_size):
percentage = (offset / count) * 100 if count else 0
logger.info(f"Progress: {offset:,}/{count:,} ({percentage:.2f}%)")
callable(offset, min(offset + batch_size, count))

Check warning on line 286 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L282-L286

Added lines #L282 - L286 were not covered by tests

logger.info(f"Progress: {count:,}/{count:,} (100%)")
michael-s-molina marked this conversation as resolved.
Show resolved Hide resolved
logger.info(

Check warning on line 289 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L288-L289

Added lines #L288 - L289 were not covered by tests
f"End: {GREEN}{callable.__name__}{RESET} batch operation {GREEN}succesfully{RESET} executed."
)


def add_columns(table_name: str, *columns: Column) -> None:
"""
Adds new columns to an existing database table.

If a column already exists, it logs an informational message and skips the adding process.
Otherwise, it proceeds to add the new column to the table.

The operation is performed using Alembic's batch_alter_table.

:param table_name: The name of the table to which the columns will be added.
:param columns: A list of SQLAlchemy Column objects that define the name, type, and other attributes of the columns to be added.
"""

cols_to_add = []
for col in columns:
if table_has_column(table_name=table_name, column_name=col.name):
logger.info(

Check warning on line 310 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L307-L310

Added lines #L307 - L310 were not covered by tests
f"Column {LRED}{col.name}{RESET} already present on table {LRED}{table_name}{RESET}. Skipping..."
)
else:
cols_to_add.append(col)

Check warning on line 314 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L314

Added line #L314 was not covered by tests

with op.batch_alter_table(table_name) as batch_op:
for col in cols_to_add:
logger.info(

Check warning on line 318 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L316-L318

Added lines #L316 - L318 were not covered by tests
f"Adding column {GREEN}{col.name}{RESET} to table {GREEN}{table_name}{RESET}..."
)
batch_op.add_column(col)

Check warning on line 321 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L321

Added line #L321 was not covered by tests


def drop_columns(table_name: str, *columns: str) -> None:
"""
Drops specified columns from an existing database table.

If a column does not exist, it logs an informational message and skips the dropping process.
Otherwise, it proceeds to remove the column from the table.

The operation is performed using Alembic's batch_alter_table.

:param table_name: The name of the table from which the columns will be removed.
:param columns: A list of column names to be dropped.
"""

cols_to_drop = []
for col in columns:
if not table_has_column(table_name=table_name, column_name=col):
logger.info(

Check warning on line 340 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L337-L340

Added lines #L337 - L340 were not covered by tests
f"Column {LRED}{col}{RESET} is not present on table {LRED}{table_name}{RESET}. Skipping..."
)
else:
cols_to_drop.append(col)

Check warning on line 344 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L344

Added line #L344 was not covered by tests

with op.batch_alter_table(table_name) as batch_op:
for col in cols_to_drop:
logger.info(

Check warning on line 348 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L346-L348

Added lines #L346 - L348 were not covered by tests
f"Dropping column {GREEN}{col}{RESET} from table {GREEN}{table_name}{RESET}..."
)
batch_op.drop_column(col)

Check warning on line 351 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L351

Added line #L351 was not covered by tests


def create_index(table_name: str, index_name: str, *columns: str) -> None:
"""
Creates an index on specified columns of an existing database table.

If the index already exists, it logs an informational message and skips the creation process.
Otherwise, it proceeds to create a new index with the specified name on the given columns of the table.

:param table_name: The name of the table on which the index will be created.
:param index_name: The name of the index to be created.
:param columns: A list column names where the index will be created
"""

if table_has_index(table=table_name, index=index_name):
logger.info(

Check warning on line 367 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L366-L367

Added lines #L366 - L367 were not covered by tests
f"Table {LRED}{table_name}{RESET} already has index {LRED}{index_name}{RESET}. Skipping..."
)
return

Check warning on line 370 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L370

Added line #L370 was not covered by tests

logger.info(

Check warning on line 372 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L372

Added line #L372 was not covered by tests
f"Creating index {GREEN}{index_name}{RESET} on table {GREEN}{table_name}{RESET}"
)

op.create_index(table_name=table_name, index_name=index_name, columns=columns)

Check warning on line 376 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L376

Added line #L376 was not covered by tests


def drop_index(table_name: str, index_name: str) -> None:
"""
Drops an index from an existing database table.

If the index does not exists, it logs an informational message and skips the dropping process.
Otherwise, it proceeds with the removal operation.

:param table_name: The name of the table from which the index will be dropped.
:param index_name: The name of the index to be dropped.
"""

if not table_has_index(table=table_name, index=index_name):
logger.info(

Check warning on line 391 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L390-L391

Added lines #L390 - L391 were not covered by tests
f"Table {LRED}{table_name}{RESET} doesn't have index {LRED}{index_name}{RESET}. Skipping..."
)
return

Check warning on line 394 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L394

Added line #L394 was not covered by tests

logger.info(

Check warning on line 396 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L396

Added line #L396 was not covered by tests
f"Dropping index {GREEN}{index_name}{RESET} from table {GREEN}{table_name}{RESET}..."
)

op.drop_index(table_name=table_name, index_name=index_name)

Check warning on line 400 in superset/migrations/shared/utils.py

View check run for this annotation

Codecov / codecov/patch

superset/migrations/shared/utils.py#L400

Added line #L400 was not covered by tests
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from alembic import op # noqa: E402
from sqlalchemy_utils import EncryptedType # noqa: E402

from superset.migrations.shared.constraints import drop_fks_for_table # noqa: E402
from superset.migrations.shared.utils import drop_fks_for_table # noqa: E402


def upgrade():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,20 @@
"""

import sqlalchemy as sa
from alembic import op

from superset.migrations.shared.utils import add_column_if_not_exists
from superset.migrations.shared.utils import add_columns, drop_columns

# revision identifiers, used by Alembic.
revision = "c22cb5c2e546"
down_revision = "678eefb4ab44"


def upgrade():
add_column_if_not_exists(
add_columns(
"user_attribute",
sa.Column("avatar_url", sa.String(length=100), nullable=True),
)


def downgrade():
op.drop_column("user_attribute", "avatar_url")
drop_columns("user_attribute", "avatar_url")
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
"""

import sqlalchemy as sa
from alembic import op

from superset.migrations.shared.utils import add_column_if_not_exists
from superset.migrations.shared.utils import add_columns, drop_columns

# revision identifiers, used by Alembic.
revision = "5f57af97bc3f"
Expand All @@ -36,12 +35,9 @@

def upgrade():
for table in tables:
add_column_if_not_exists(
table,
sa.Column("catalog", sa.String(length=256), nullable=True),
)
add_columns(table, sa.Column("catalog", sa.String(length=256), nullable=True))


def downgrade():
for table in reversed(tables):
op.drop_column(table, "catalog")
drop_columns(table, "catalog")
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,29 @@
"""

import sqlalchemy as sa
from alembic import op

from superset.migrations.shared.catalogs import (
downgrade_catalog_perms,
upgrade_catalog_perms,
)
from superset.migrations.shared.utils import add_column_if_not_exists
from superset.migrations.shared.utils import add_columns, drop_columns

# revision identifiers, used by Alembic.
revision = "58d051681a3b"
down_revision = "4a33124c18ad"


def upgrade():
add_column_if_not_exists(
"tables",
sa.Column("catalog_perm", sa.String(length=1000), nullable=True),
add_columns(
"tables", sa.Column("catalog_perm", sa.String(length=1000), nullable=True)
)
add_column_if_not_exists(
"slices",
sa.Column("catalog_perm", sa.String(length=1000), nullable=True),
add_columns(
"slices", sa.Column("catalog_perm", sa.String(length=1000), nullable=True)
)
upgrade_catalog_perms(engines={"postgresql"})


def downgrade():
downgrade_catalog_perms(engines={"postgresql"})
op.drop_column("slices", "catalog_perm")
op.drop_column("tables", "catalog_perm")
drop_columns("slices", "catalog_perm")
drop_columns("tables", "catalog_perm")
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import sqlalchemy as sa
from alembic import op

from superset.migrations.shared.constraints import drop_fks_for_table
from superset.migrations.shared.utils import has_table
from superset.migrations.shared.utils import drop_fks_for_table, has_table

# revision identifiers, used by Alembic.
revision = "02f4f7811799"
Expand Down
Loading
Loading