From ab0caffeccbc76e0490d4b8a89d8784ee6f8a548 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Sep 2024 09:43:48 +0200 Subject: [PATCH 1/4] Reuse `force_refresh` for an up-to-date table migration index --- .../labs/ucx/hive_metastore/table_migrate.py | 17 +++++++---------- .../hive_metastore/table_migration_status.py | 4 ++-- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 9bd0d432fa..2b894841c6 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -52,7 +52,7 @@ def __init__( self._migrate_grants = migrate_grants def get_remaining_tables(self) -> list[Table]: - self.index_full_refresh() + self.index(force_refresh=True) table_rows = [] for crawled_table in self._tc.snapshot(): if not self._is_migrated(crawled_table.database, crawled_table.name): @@ -60,13 +60,8 @@ def get_remaining_tables(self) -> list[Table]: logger.warning(f"remained-hive-metastore-table: {crawled_table.key}") return table_rows - def index(self): - return self._migration_status_refresher.index() - - def index_full_refresh(self): - # when we want the latest up-to-date status, e.g. to determine whether views dependencies have been migrated - self._migration_status_refresher.reset() - return self._migration_status_refresher.index() + def index(self, *, force_refresh: bool = False): + return self._migration_status_refresher.index(force_refresh=force_refresh) def migrate_tables( self, @@ -101,7 +96,9 @@ def _migrate_tables(self, what: What, mounts: list[Mount], hiveserde_in_place_mi def _migrate_views(self): tables_to_migrate = self._tm.get_tables_to_migrate(self._tc) all_tasks = [] - sequencer = ViewsMigrationSequencer(tables_to_migrate, migration_index=self.index_full_refresh()) + # Every batch of views to migrate needs an up-to-date table migration index + # to determine if the dependencies have been migrated + sequencer = ViewsMigrationSequencer(tables_to_migrate, migration_index=self.index(force_refresh=True)) batches = sequencer.sequence_batches() for batch in batches: tasks = [] @@ -109,7 +106,7 @@ def _migrate_views(self): tasks.append(partial(self._migrate_view, view)) Threads.strict("migrate views", tasks) all_tasks.extend(tasks) - self.index_full_refresh() + self.index(force_refresh=True) return all_tasks def _migrate_table(self, src_table: TableToMigrate, mounts: list[Mount], hiveserde_in_place_migrate: bool = False): diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 256ac2c9ea..283be4f717 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -80,8 +80,8 @@ def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema, table_crawler: self._ws = ws self._table_crawler = table_crawler - def index(self) -> TableMigrationIndex: - return TableMigrationIndex(list(self.snapshot())) + def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: + return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh))) def get_seen_tables(self) -> dict[str, str]: seen_tables: dict[str, str] = {} From 61877cbc3217c16d99369dc8a36d63b535e69a53 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 13 Sep 2024 10:02:08 +0200 Subject: [PATCH 2/4] Extend MockBackend to check if empty rows are written --- tests/unit/hive_metastore/test_workflows.py | 26 +++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 45dffb23fb..62d44e7033 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -1,4 +1,6 @@ import pytest +from databricks.labs.lsql.backends import DataclassInstance, MockBackend + from databricks.labs.ucx.hive_metastore.workflows import ( TableMigration, MigrateExternalTablesCTAS, @@ -53,6 +55,26 @@ def test_migrate_ctas_views(run_workflow): ctx.workspace_client.catalogs.list.assert_called() +class MockBackendFriend(MockBackend): + """A wrapper class to change the return type on :meth:`rows_written_for`.""" + + def rows_written_for(self, full_name: str, mode: str) -> list[DataclassInstance] | None: + """Retrieve the rows written for a table name given a mode. + + Additionally to the logic of the parent class, differentiate between no rows (empty list) and no match (None). + """ + rows: list[DataclassInstance] = [] + found_write_match = False + for stub_full_name, stub_rows, stub_mode in self._save_table: + if not (stub_full_name == full_name and stub_mode == mode): + continue + found_write_match = True + rows += stub_rows + if len(rows) == 0 and not found_write_match: + return None + return rows + + @pytest.mark.parametrize( "workflow", [ @@ -65,6 +87,6 @@ def test_migrate_ctas_views(run_workflow): ) def test_update_migration_status(run_workflow, workflow): """Migration status is refreshed by deleting and showing new tables""" - ctx = run_workflow(getattr(workflow, "update_migration_status")) - assert "TRUNCATE TABLE `hive_metastore`.`ucx`.`migration_status`" in ctx.sql_backend.queries + ctx = run_workflow(getattr(workflow, "update_migration_status"), replace={"sql_backend": MockBackendFriend()}) + assert ctx.sql_backend.rows_written_for("hive_metastore.ucx.migration_status", "overwrite") is not None assert "SHOW DATABASES" in ctx.sql_backend.queries From 4fc98a03a494c2eeae350e75618c562a940de737 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 19 Sep 2024 15:51:31 +0200 Subject: [PATCH 3/4] Use has_rows_written_for from MockBackend --- tests/unit/hive_metastore/test_workflows.py | 26 +++------------------ 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 62d44e7033..a76b5332a7 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -55,26 +55,6 @@ def test_migrate_ctas_views(run_workflow): ctx.workspace_client.catalogs.list.assert_called() -class MockBackendFriend(MockBackend): - """A wrapper class to change the return type on :meth:`rows_written_for`.""" - - def rows_written_for(self, full_name: str, mode: str) -> list[DataclassInstance] | None: - """Retrieve the rows written for a table name given a mode. - - Additionally to the logic of the parent class, differentiate between no rows (empty list) and no match (None). - """ - rows: list[DataclassInstance] = [] - found_write_match = False - for stub_full_name, stub_rows, stub_mode in self._save_table: - if not (stub_full_name == full_name and stub_mode == mode): - continue - found_write_match = True - rows += stub_rows - if len(rows) == 0 and not found_write_match: - return None - return rows - - @pytest.mark.parametrize( "workflow", [ @@ -85,8 +65,8 @@ def rows_written_for(self, full_name: str, mode: str) -> list[DataclassInstance] MigrateTablesInMounts, ], ) -def test_update_migration_status(run_workflow, workflow): +def test_update_migration_status(run_workflow, workflow) -> None: """Migration status is refreshed by deleting and showing new tables""" - ctx = run_workflow(getattr(workflow, "update_migration_status"), replace={"sql_backend": MockBackendFriend()}) - assert ctx.sql_backend.rows_written_for("hive_metastore.ucx.migration_status", "overwrite") is not None + ctx = run_workflow(getattr(workflow, "update_migration_status")) + assert ctx.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status") assert "SHOW DATABASES" in ctx.sql_backend.queries From 07a0f6f8bab445d37765d31f5b62f634659916f5 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 19 Sep 2024 16:27:28 +0200 Subject: [PATCH 4/4] Remove unused class --- tests/unit/hive_metastore/test_workflows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index a76b5332a7..7d2a05ec8f 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -1,5 +1,4 @@ import pytest -from databricks.labs.lsql.backends import DataclassInstance, MockBackend from databricks.labs.ucx.hive_metastore.workflows import ( TableMigration,