Skip to content

Commit

Permalink
Added --dry-run option for ACL migrate (#3017)
Browse files Browse the repository at this point in the history
related to #2770
  • Loading branch information
FastLee authored Oct 25, 2024
1 parent 69ba7e7 commit 1e53812
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 15 deletions.
6 changes: 6 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,17 @@ commands:
- name: migrate-acls
description: |
Migrate access control lists from legacy metastore to UC metastore.
Use the --dry-run flag to populate the infered_grants table and skip the migration.
Use the hms-fed flag to migrate HMS-FED ACLs. If not provided, HMS ACLs will be migrated for migrated tables.
flags:
- name: target-catalog
description: (Optional) Target catalog to migrate ACLs to. Used for HMS-FED ACLs migration.
- name: hms-fed
description: (Optional) Migrate HMS-FED ACLs. If not provided, HMS ACLs will be migrated for migrated tables.
- name: dry-run
description: (Optional) Dry run the migration. If set to true, acl table will be populated and acl migration will be skipped.
If not provided, the migration will be executed.
- name: run-as-collection
description: (Optional) Run the command for the collection of workspaces with ucx installed. Default is False.

Expand Down
22 changes: 20 additions & 2 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from databricks.labs.ucx.install import AccountInstaller
from databricks.labs.ucx.source_code.linters.files import LocalCodeLinter


ucx = App(__file__)
logger = get_logger(__file__)

Expand Down Expand Up @@ -705,7 +704,26 @@ def migrate_acls(
workspace_contexts = [ctx]
else:
workspace_contexts = _get_workspace_contexts(w, a, run_as_collection, **named_parameters)
target_catalog, hms_fed = named_parameters.get("target_catalog"), named_parameters.get("hms_fed", False)
target_catalog = named_parameters.get("target_catalog")
hms_fed = named_parameters.get("hms_fed", False)
dry_run = named_parameters.get("dry_run", False)
if dry_run:
total_grants = 0
for workspace_context in workspace_contexts:
grants = workspace_context.acl_migrator.snapshot()
total_grants += len(grants)
logger.info(
f"Dry run completed. Found {total_grants} grants. The crawled grants can be found in the 'inferred_grants' table. "
"No changes were made."
)
urls: str = ""
for workspace_context in workspace_contexts:
urls += (
f"{workspace_context.connect_config.host}/explore/data/hive_metastore/"
f"{workspace_context.config.inventory_database}/inferred_grants\n"
)
logger.info(f"URLs to the inferred grants tables: \n{urls}")
return
for workspace_context in workspace_contexts:
workspace_context.acl_migrator.migrate_acls(target_catalog=target_catalog, hms_fed=hms_fed)

Expand Down
2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ def acl_migrator(self):
self.workspace_info,
self.migration_status_refresher,
self.migrate_grants,
self.sql_backend,
self.config.inventory_database,
)

@cached_property
Expand Down
48 changes: 46 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,20 @@ def apply(self, src: SecurableObject, dst: SecurableObject) -> bool:
)
return True

def retrieve(self, src: SecurableObject, dst: SecurableObject) -> list[Grant]:
grants = []
for grant in self._match_grants(src):
acl_migrate_sql = grant.uc_grant_sql(dst.kind, dst.full_name)
if acl_migrate_sql is None:
logger.warning(
f"failed-to-migrate: Hive metastore grant '{grant.action_type}' cannot be mapped to UC grant for "
f"{dst.kind} '{dst.full_name}'. Skipping."
)
continue
logger.debug(f"Retrieving acls on {dst.full_name} using SQL query: {acl_migrate_sql}")
grants.append(grant)
return grants

@cached_property
def _workspace_to_account_group_names(self) -> dict[str, str]:
return {g.name_in_workspace: g.name_in_account for g in self._group_manager.snapshot()}
Expand Down Expand Up @@ -828,18 +842,21 @@ def _replace_account_group(self, grant: Grant) -> Grant:
return replace(grant, principal=target_principal)


class ACLMigrator:
class ACLMigrator(CrawlerBase[Grant]):
def __init__(
self,
tables_crawler: TablesCrawler,
workspace_info: WorkspaceInfo,
migration_status_refresher: TableMigrationStatusRefresher,
migrate_grants: MigrateGrants,
backend: SqlBackend,
schema: str,
):
self._tables_crawler = tables_crawler
self._workspace_info = workspace_info
self._migration_status_refresher = migration_status_refresher
self._migrate_grants = migrate_grants
super().__init__(backend, "hive_metastore", schema, "inferred_grants", Grant)

def migrate_acls(self, *, target_catalog: str | None = None, hms_fed: bool = False) -> None:
workspace_name = self._workspace_info.current()
Expand All @@ -853,6 +870,20 @@ def migrate_acls(self, *, target_catalog: str | None = None, hms_fed: bool = Fal
tables_to_migrate = self._get_migrated_tables(tables)
self._migrate_acls(tables_to_migrate)

def _retrieve_table_acls(self, *, target_catalog: str | None = None, hms_fed: bool = False) -> Iterable[Grant]:
tables = list(self._tables_crawler.snapshot())
grants: list[Grant] = []
if not tables:
logger.info("No tables found to acl")
return grants
if hms_fed:
tables_to_migrate = self._get_hms_fed_tables(
tables, target_catalog if target_catalog else self._workspace_info.current()
)
else:
tables_to_migrate = self._get_migrated_tables(tables)
return self._retrieve_acls(tables_to_migrate)

def _get_migrated_tables(self, tables: list[Table]) -> list[TableToMigrate]:
# gets all the migrated table to apply ACLs to
tables_to_migrate = []
Expand Down Expand Up @@ -899,9 +930,22 @@ def _get_hms_fed_tables(self, tables: list[Table], target_catalog) -> list[Table
def _migrate_acls(self, tables_in_scope: list[TableToMigrate]) -> None:
tasks = []
for table in tables_in_scope:
tasks.append(partial(self._migrate_grants.apply, table.src, table.rule.as_uc_table_key))
tasks.append(partial(self._migrate_grants.apply, table.src, table.rule.as_uc_table))
Threads.strict("migrate grants", tasks)

def _retrieve_acls(self, tables_in_scope: list[TableToMigrate]) -> Iterable[Grant]:
grants = []
for table in tables_in_scope:
grants += self._migrate_grants.retrieve(table.src, table.rule.as_uc_table)
return grants

def _is_migrated(self, schema: str, table: str) -> bool:
index = self._migration_status_refresher.index()
return index.is_migrated(schema, table)

def _crawl(self) -> Iterable[Grant]:
return self._retrieve_table_acls()

def _try_fetch(self):
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
yield Grant(*row)
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
functools.partial(table, "directfs_in_queries", DirectFsAccess),
functools.partial(table, "used_tables_in_paths", UsedTable),
functools.partial(table, "used_tables_in_queries", UsedTable),
functools.partial(table, "inferred_grants", Grant),
],
)
deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")
Expand Down
39 changes: 37 additions & 2 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@
from datetime import timedelta

import pytest
from databricks.sdk import AccountClient
from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.compute import DataSecurityMode, AwsAttributes
from databricks.sdk.service.catalog import Privilege, SecurableType, TableInfo, TableType
from databricks.sdk.service.iam import PermissionLevel
from databricks.labs.ucx.__about__ import __version__

from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.mapping import Rule, TableMapping
from databricks.labs.ucx.hive_metastore.tables import Table, What

from ..conftest import prepare_hiveserde_tables, get_azure_spark_conf


logger = logging.getLogger(__name__)
_SPARK_CONF = get_azure_spark_conf()

Expand Down Expand Up @@ -646,12 +651,19 @@ def test_mapping_reverts_table(ws, sql_backend, runtime_ctx, make_catalog):
assert "upgraded_to" not in results


@retried(on=[NotFound], timeout=timedelta(minutes=3))
def test_migrate_managed_tables_with_acl(ws, sql_backend, runtime_ctx, make_catalog, make_user):
# @retried(on=[NotFound], timeout=timedelta(minutes=3))
def test_migrate_managed_tables_with_acl(ws, sql_backend, runtime_ctx, make_catalog, make_user, env_or_skip):
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
src_managed_table = runtime_ctx.make_table(catalog_name=src_schema.catalog_name, schema_name=src_schema.name)
user = make_user()

acc_client = AccountClient(
host=ws.config.environment.deployment_url("accounts"),
account_id=env_or_skip("DATABRICKS_ACCOUNT_ID"),
product='ucx',
product_version=__version__,
)
runtime_ctx.with_workspace_info([acc_client.workspaces.get(ws.get_workspace_id())])
runtime_ctx.make_grant(
principal=user.user_name,
action_type="SELECT",
Expand Down Expand Up @@ -685,6 +697,29 @@ def test_migrate_managed_tables_with_acl(ws, sql_backend, runtime_ctx, make_cata
assert len(target_principals) == 1, f"Missing grant for user {user.user_name}"
assert target_principals[0].privileges == [Privilege.MODIFY, Privilege.SELECT]

acl_migrator = runtime_ctx.acl_migrator
acls = acl_migrator.snapshot()
assert (
Grant(
principal=user.user_name,
action_type='MODIFY',
catalog='hive_metastore',
database=src_managed_table.schema_name,
table=src_managed_table.name,
)
in acls
)
assert (
Grant(
principal=user.user_name,
action_type="SELECT",
catalog='hive_metastore',
database=src_managed_table.schema_name,
table=src_managed_table.name,
)
in acls
)


@retried(on=[NotFound], timeout=timedelta(minutes=3))
def test_migrate_external_tables_with_principal_acl_azure(
Expand Down
76 changes: 67 additions & 9 deletions tests/unit/hive_metastore/test_migrate_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,31 @@ def ws_info():
return info


def test_migrate_acls_should_produce_proper_queries(ws, ws_info, caplog):
def test_migrate_acls_should_produce_proper_queries(ws, ws_info, mock_backend, caplog):
table_crawler = create_autospec(TablesCrawler)
src = Table('hive_metastore', 'db1_src', 'view_src', 'VIEW', 'UNKNOWN')
dst = Table('ucx_default', 'db1_dst', 'view_dst', 'VIEW', 'UNKNOWN')
table_crawler.snapshot.return_value = [src]

workspace_info = ws_info
migration_status_refresher = create_autospec(TableMigrationStatusRefresher)

migrate_grants = create_autospec(MigrateGrants)
acl_migrate = ACLMigrator(
table_crawler,
workspace_info,
migration_status_refresher,
migrate_grants,
table_crawler, workspace_info, migration_status_refresher, migrate_grants, mock_backend, "ucx"
)
migration_status_refresher.get_seen_tables.return_value = {
"ucx_default.db1_dst.view_dst": "hive_metastore.db1_src.view_src",
}
acl_migrate.migrate_acls()
migrate_grants.apply.assert_called_with(src, 'ucx_default.db1_dst.view_dst')
migrate_grants.apply.assert_called_with(src, dst)


def test_migrate_acls_hms_fed_proper_queries(ws, ws_info, caplog):
def test_migrate_acls_hms_fed_proper_queries(ws, ws_info, mock_backend, caplog):
table_crawler = create_autospec(TablesCrawler)
src = Table('hive_metastore', 'db1_src', 'managed_dbfs', 'TABLE', 'DELTA', "/foo/bar/test")
dst = Table('hms_fed', 'db1_src', 'managed_dbfs', 'TABLE', 'DELTA', "/foo/bar/test")

table_crawler.snapshot.return_value = [src]
workspace_info = ws_info
migrate_grants = create_autospec(MigrateGrants)
Expand All @@ -67,10 +67,68 @@ def test_migrate_acls_hms_fed_proper_queries(ws, ws_info, caplog):
workspace_info,
migration_status_refresher,
migrate_grants,
mock_backend,
"ucx",
)
acl_migrate.migrate_acls(hms_fed=True)
acl_migrate.migrate_acls(hms_fed=True, target_catalog='hms_fed')

migrate_grants.apply.assert_called_with(src, dst)


def test_tacl_crawler(ws, ws_info, caplog):
table_crawler = create_autospec(TablesCrawler)
sql_backend = create_autospec(SqlBackend)
src = [
Table('hive_metastore', 'db1_src', 'table1', 'TABLE', 'DELTA', "/foo/bar/table1"),
Table('hive_metastore', 'db1_src', 'table2', 'TABLE', 'DELTA', "/foo/bar/table2"),
]
table_crawler.snapshot.return_value = src
workspace_info = ws_info

user_grants = [
Grant('user1', 'SELECT', database='db1_src', table='table1'),
Grant('user2', 'MODIFY', database='db1_src', table='table1'),
Grant('user1', 'SELECT', database='db1_src', table='table2'),
Grant('user2', 'MODIFY', database='db1_src', table='table2'),
Grant('user1', 'SELECT', database='db1_src', table='table2'),
]

group_grants = [
Grant('group1', 'SELECT', database='db1_src', table='table1'),
]

def grant_loader():
return user_grants + group_grants

migrate_grants.apply.assert_called_with(src, 'hms_fed.db1_src.managed_dbfs')
group_manager = create_autospec(GroupManager)
group_manager.snapshot.return_value = [
MigratedGroup(
name_in_workspace='group1',
name_in_account='acc_group1',
id_in_workspace='123',
temporary_name='temp_group1',
),
]
migrate_grants = MigrateGrants(sql_backend, group_manager, [grant_loader])

migration_index = create_autospec(TableMigrationIndex)
migration_index.is_migrated.return_value = True

migration_status_refresher = create_autospec(TableMigrationStatusRefresher)
migration_status_refresher.get_seen_tables.return_value = {
"ucx_default.db1_dst.table1": "hive_metastore.db1_src.table1",
"ucx_default.db1_dst.table2": "hive_metastore.db1_src.table2",
}
migration_status_refresher.index.return_value = migration_index

acl_migrate = ACLMigrator(
table_crawler, workspace_info, migration_status_refresher, migrate_grants, sql_backend, "ucx"
)
tacls = acl_migrate.snapshot()
sql_backend.fetch.assert_called_with('SELECT * FROM `hive_metastore`.`ucx`.`inferred_grants`')
for grant in user_grants:
assert grant in tacls
assert Grant('acc_group1', 'SELECT', database='db1_src', table='table1') in tacls


def test_migrate_matched_grants_applies() -> None:
Expand Down

0 comments on commit 1e53812

Please sign in to comment.