diff --git a/src/databricks/labs/ucx/hive_metastore/catalog_schema.py b/src/databricks/labs/ucx/hive_metastore/catalog_schema.py index 280a345460..60d1a921d6 100644 --- a/src/databricks/labs/ucx/hive_metastore/catalog_schema.py +++ b/src/databricks/labs/ucx/hive_metastore/catalog_schema.py @@ -1,7 +1,6 @@ import collections import logging from dataclasses import replace -import fnmatch from pathlib import PurePath from databricks.labs.blueprint.tui import Prompts @@ -30,7 +29,7 @@ def __init__( ): self._ws = ws self._table_mapping = table_mapping - self._external_locations = self._ws.external_locations.list() + self._external_locations = list(self._ws.external_locations.list()) self._principal_grants = principal_grants self._backend = sql_backend self._hive_grants_crawler = grants_crawler @@ -45,32 +44,19 @@ def create_ucx_catalog(self, prompts: Prompts, *, properties: dict[str, str] | N properties : (dict[str, str] | None), default None The properties to pass to the catalog. If None, no properties are passed. """ - try: - self._create_catalog_validate(self._ucx_catalog, prompts, properties=properties) - except BadRequest as e: - if "already exists" in str(e): - logger.warning(f"Catalog '{self._ucx_catalog}' already exists. Skipping.") - return - raise + self._create_catalog_validate(self._ucx_catalog, prompts, properties=properties) def create_all_catalogs_schemas(self, prompts: Prompts) -> None: candidate_catalogs, candidate_schemas = self._get_missing_catalogs_schemas() for candidate_catalog in candidate_catalogs: - try: - self._create_catalog_validate(candidate_catalog, prompts, properties=None) - except BadRequest as e: - if "already exists" in str(e): - logger.warning(f"Catalog '{candidate_catalog}' already exists. Skipping.") - continue + self._create_catalog_validate(candidate_catalog, prompts, properties=None) for candidate_catalog, schemas in candidate_schemas.items(): for candidate_schema in schemas: try: self._create_schema(candidate_catalog, candidate_schema) except BadRequest as e: if "already exists" in str(e): - logger.warning( - f"Schema '{candidate_schema}' in catalog '{candidate_catalog}' already exists. Skipping." - ) + logger.warning(f"Skipping already existing schema: {candidate_catalog}.{candidate_schema}") continue self._apply_from_legacy_table_acls() self._update_principal_acl() @@ -141,19 +127,28 @@ def _get_database_source_target_mapping(self) -> dict[str, list[SchemaInfo]]: src_trg_schema_mapping[table_mapping.src_schema].append(schema) return src_trg_schema_mapping - def _create_catalog_validate(self, catalog: str, prompts: Prompts, *, properties: dict[str, str] | None) -> None: - logger.info(f"Validating UC catalog: {catalog}") + def _create_catalog_validate( + self, catalog_name: str, prompts: Prompts, *, properties: dict[str, str] | None + ) -> None: + try: + catalog = self._ws.catalogs.get(catalog_name) + except NotFound: + catalog = None + if catalog: + logger.warning(f"Skipping already existing catalog: {catalog_name}") + return + logger.info(f"Validating UC catalog: {catalog_name}") attempts = 3 while True: catalog_storage = prompts.question( - f"Please provide storage location url for catalog: {catalog}", default="metastore" + f"Please provide storage location url for catalog: {catalog_name}", default="metastore" ) if self._validate_location(catalog_storage): break attempts -= 1 if attempts == 0: - raise NotFound(f"Failed to validate location for {catalog} catalog") - self._create_catalog(catalog, catalog_storage, properties=properties) + raise NotFound(f"Failed to validate location for catalog: {catalog_name}") + self._create_catalog(catalog_name, catalog_storage, properties=properties) def _list_existing(self) -> tuple[set[str], dict[str, set[str]]]: """generate a list of existing UC catalogs and schema.""" @@ -203,19 +198,18 @@ def _get_missing_catalogs_schemas(self) -> tuple[set[str], dict[str, set[str]]]: target_schemas[catalog] = target_schemas[catalog] - schemas return target_catalogs, target_schemas - def _validate_location(self, location: str): + def _validate_location(self, location: str) -> bool: if location == "metastore": return True try: PurePath(location) except ValueError: - logger.error(f"Invalid location path {location}") + logger.error(f"Invalid location path: {location}") return False for external_location in self._external_locations: - if location == external_location.url: - return True - if external_location.url is not None and fnmatch.fnmatch(location, external_location.url + '*'): + if external_location.url is not None and location.startswith(external_location.url): return True + logger.warning(f"No matching external location found for: {location}") return False def _create_catalog(self, catalog: str, catalog_storage: str, *, properties: dict[str, str] | None) -> None: diff --git a/tests/unit/hive_metastore/test_catalog_schema.py b/tests/unit/hive_metastore/test_catalog_schema.py index 100c130519..f8596600ab 100644 --- a/tests/unit/hive_metastore/test_catalog_schema.py +++ b/tests/unit/hive_metastore/test_catalog_schema.py @@ -17,13 +17,23 @@ def prepare_test(ws, backend: MockBackend | None = None) -> CatalogSchema: ws.catalogs.list.return_value = [CatalogInfo(name="catalog1")] + def get_catalog(catalog_name: str) -> CatalogInfo: + if catalog_name == "catalog1": + return CatalogInfo(name="catalog1") + raise NotFound(f"Catalog: {catalog_name}") + + ws.catalogs.get.side_effect = get_catalog + def raise_catalog_exists(catalog: str, *_, **__) -> None: if catalog == "catalog1": raise BadRequest("Catalog 'catalog1' already exists") ws.catalogs.create.side_effect = raise_catalog_exists ws.schemas.list.return_value = [SchemaInfo(name="schema1")] - ws.external_locations.list.return_value = [ExternalLocationInfo(url="s3://foo/bar")] + ws.external_locations.list.return_value = [ + ExternalLocationInfo(url="s3://foo/bar"), + ExternalLocationInfo(url="abfss://container@storageaccount.dfs.core.windows.net"), + ] if backend is None: backend = MockBackend() installation = MockInstallation( @@ -133,8 +143,8 @@ def test_create_ucx_catalog_creates_ucx_catalog() -> None: def test_create_ucx_catalog_skips_when_ucx_catalogs_exists(caplog) -> None: ws = create_autospec(WorkspaceClient) - mock_prompts = MockPrompts({"Please provide storage location url for catalog: ucx": "metastore"}) catalog_schema = prepare_test(ws) + ws.catalogs.get.side_effect = lambda catalog_name: CatalogInfo(name=catalog_name) def raise_catalog_exists(catalog: str, *_, **__) -> None: if catalog == "ucx": @@ -143,12 +153,20 @@ def raise_catalog_exists(catalog: str, *_, **__) -> None: ws.catalogs.create.side_effect = raise_catalog_exists with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.catalog_schema"): - catalog_schema.create_ucx_catalog(mock_prompts) - assert "Catalog 'ucx' already exists. Skipping." in caplog.text + catalog_schema.create_ucx_catalog(MockPrompts({})) + assert "Skipping already existing catalog: ucx" in caplog.text -@pytest.mark.parametrize("location", ["s3://foo/bar", "s3://foo/bar/test", "s3://foo/bar/test/baz"]) -def test_create_all_catalogs_schemas_creates_catalogs(location: str): +@pytest.mark.parametrize( + "location", + [ + "s3://foo/bar", + "s3://foo/bar/test", + "s3://foo/bar/test/baz", + "abfss://container@storageaccount.dfs.core.windows.net", + ], +) +def test_create_all_catalogs_schemas_creates_catalogs(location: str) -> None: """Catalog 2-4 should be created; catalog 1 already exists.""" ws = create_autospec(WorkspaceClient) mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": location}) @@ -164,6 +182,28 @@ def test_create_all_catalogs_schemas_creates_catalogs(location: str): ws.catalogs.create.assert_has_calls(calls, any_order=True) +def test_create_all_catalogs_schemas_creates_catalogs_with_different_locations() -> None: + """Catalog 2-4 should be created; catalog 1 already exists.""" + ws = create_autospec(WorkspaceClient) + mock_prompts = MockPrompts( + { + "Please provide storage location url for catalog: catalog2": "s3://foo/bar", + "Please provide storage location url for catalog: catalog3": "s3://foo/bar/test", + "Please provide storage location url for catalog: catalog4": "s3://foo/bar/test/baz", + } + ) + + catalog_schema = prepare_test(ws) + catalog_schema.create_all_catalogs_schemas(mock_prompts) + + calls = [ + call("catalog2", storage_root="s3://foo/bar", comment="Created by UCX", properties=None), + call("catalog3", storage_root="s3://foo/bar/test", comment="Created by UCX", properties=None), + call("catalog4", storage_root="s3://foo/bar/test/baz", comment="Created by UCX", properties=None), + ] + ws.catalogs.create.assert_has_calls(calls, any_order=True) + + @pytest.mark.parametrize( "catalog,schema", [("catalog1", "schema2"), ("catalog1", "schema3"), ("catalog2", "schema2"), ("catalog3", "schema3")], diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 571b3a4c33..ffa4cf78e8 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -804,8 +804,8 @@ def test_create_catalogs_schemas_handles_existing(ws, caplog) -> None: create_catalogs_schemas(ws, prompts, ctx=WorkspaceContext(ws)) ws.catalogs.list.assert_called_once() - assert "Catalog 'test' already exists. Skipping." in caplog.messages - assert "Schema 'test' in catalog 'test' already exists. Skipping." in caplog.messages + assert "Skipping already existing catalog: test" in caplog.messages + assert "Skipping already existing schema: test.test" in caplog.messages def test_cluster_remap(ws, caplog): @@ -887,12 +887,12 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie acc_client.metastore_assignments.create.assert_called_once() -def test_create_ucx_catalog_calls_create_catalog(ws) -> None: +def test_create_ucx_catalog_calls_get_catalog(ws) -> None: prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"}) create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws)) - ws.catalogs.create.assert_called_once() + ws.catalogs.get.assert_called_once() def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None: