Skip to content

Commit

Permalink
Improve creating UC catalogs (#2898)
Browse files Browse the repository at this point in the history
## Changes
Found a couple of possible improvements while manually testing #2744:
- The catalog storage location was requested even when the catalog
already exists. Solved by checking whether the catalog exists before
requesting the storage location.
- Multiple loops over the storage locations are not supported, as the
iterator is exhausted after the first loop. Solved by materializing the
external locations into a list.
- More consistent:
   - Logging
   - Matching storage locations

### Linked issues

Resolves #2879

### Functionality

- [x] modified existing command: `databricks labs ucx
create-ucx-catalog/create-catalogs-schemas`

### Tests

- [x] manually tested
- [x] added unit tests
  • Loading branch information
JCZuurmond authored Oct 9, 2024
1 parent 3983fbe commit bb3768a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 38 deletions.
50 changes: 22 additions & 28 deletions src/databricks/labs/ucx/hive_metastore/catalog_schema.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import logging
from dataclasses import replace
import fnmatch
from pathlib import PurePath

from databricks.labs.blueprint.tui import Prompts
Expand Down Expand Up @@ -30,7 +29,7 @@ def __init__(
):
self._ws = ws
self._table_mapping = table_mapping
self._external_locations = self._ws.external_locations.list()
self._external_locations = list(self._ws.external_locations.list())
self._principal_grants = principal_grants
self._backend = sql_backend
self._hive_grants_crawler = grants_crawler
Expand All @@ -45,32 +44,19 @@ def create_ucx_catalog(self, prompts: Prompts, *, properties: dict[str, str] | N
properties : (dict[str, str] | None), default None
The properties to pass to the catalog. If None, no properties are passed.
"""
try:
self._create_catalog_validate(self._ucx_catalog, prompts, properties=properties)
except BadRequest as e:
if "already exists" in str(e):
logger.warning(f"Catalog '{self._ucx_catalog}' already exists. Skipping.")
return
raise
self._create_catalog_validate(self._ucx_catalog, prompts, properties=properties)

def create_all_catalogs_schemas(self, prompts: Prompts) -> None:
candidate_catalogs, candidate_schemas = self._get_missing_catalogs_schemas()
for candidate_catalog in candidate_catalogs:
try:
self._create_catalog_validate(candidate_catalog, prompts, properties=None)
except BadRequest as e:
if "already exists" in str(e):
logger.warning(f"Catalog '{candidate_catalog}' already exists. Skipping.")
continue
self._create_catalog_validate(candidate_catalog, prompts, properties=None)
for candidate_catalog, schemas in candidate_schemas.items():
for candidate_schema in schemas:
try:
self._create_schema(candidate_catalog, candidate_schema)
except BadRequest as e:
if "already exists" in str(e):
logger.warning(
f"Schema '{candidate_schema}' in catalog '{candidate_catalog}' already exists. Skipping."
)
logger.warning(f"Skipping already existing schema: {candidate_catalog}.{candidate_schema}")
continue
self._apply_from_legacy_table_acls()
self._update_principal_acl()
Expand Down Expand Up @@ -141,19 +127,28 @@ def _get_database_source_target_mapping(self) -> dict[str, list[SchemaInfo]]:
src_trg_schema_mapping[table_mapping.src_schema].append(schema)
return src_trg_schema_mapping

def _create_catalog_validate(self, catalog: str, prompts: Prompts, *, properties: dict[str, str] | None) -> None:
logger.info(f"Validating UC catalog: {catalog}")
def _create_catalog_validate(
self, catalog_name: str, prompts: Prompts, *, properties: dict[str, str] | None
) -> None:
try:
catalog = self._ws.catalogs.get(catalog_name)
except NotFound:
catalog = None
if catalog:
logger.warning(f"Skipping already existing catalog: {catalog_name}")
return
logger.info(f"Validating UC catalog: {catalog_name}")
attempts = 3
while True:
catalog_storage = prompts.question(
f"Please provide storage location url for catalog: {catalog}", default="metastore"
f"Please provide storage location url for catalog: {catalog_name}", default="metastore"
)
if self._validate_location(catalog_storage):
break
attempts -= 1
if attempts == 0:
raise NotFound(f"Failed to validate location for {catalog} catalog")
self._create_catalog(catalog, catalog_storage, properties=properties)
raise NotFound(f"Failed to validate location for catalog: {catalog_name}")
self._create_catalog(catalog_name, catalog_storage, properties=properties)

def _list_existing(self) -> tuple[set[str], dict[str, set[str]]]:
"""generate a list of existing UC catalogs and schema."""
Expand Down Expand Up @@ -203,19 +198,18 @@ def _get_missing_catalogs_schemas(self) -> tuple[set[str], dict[str, set[str]]]:
target_schemas[catalog] = target_schemas[catalog] - schemas
return target_catalogs, target_schemas

def _validate_location(self, location: str):
def _validate_location(self, location: str) -> bool:
if location == "metastore":
return True
try:
PurePath(location)
except ValueError:
logger.error(f"Invalid location path {location}")
logger.error(f"Invalid location path: {location}")
return False
for external_location in self._external_locations:
if location == external_location.url:
return True
if external_location.url is not None and fnmatch.fnmatch(location, external_location.url + '*'):
if external_location.url is not None and location.startswith(external_location.url):
return True
logger.warning(f"No matching external location found for: {location}")
return False

def _create_catalog(self, catalog: str, catalog_storage: str, *, properties: dict[str, str] | None) -> None:
Expand Down
52 changes: 46 additions & 6 deletions tests/unit/hive_metastore/test_catalog_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@
def prepare_test(ws, backend: MockBackend | None = None) -> CatalogSchema:
ws.catalogs.list.return_value = [CatalogInfo(name="catalog1")]

def get_catalog(catalog_name: str) -> CatalogInfo:
if catalog_name == "catalog1":
return CatalogInfo(name="catalog1")
raise NotFound(f"Catalog: {catalog_name}")

ws.catalogs.get.side_effect = get_catalog

def raise_catalog_exists(catalog: str, *_, **__) -> None:
if catalog == "catalog1":
raise BadRequest("Catalog 'catalog1' already exists")

ws.catalogs.create.side_effect = raise_catalog_exists
ws.schemas.list.return_value = [SchemaInfo(name="schema1")]
ws.external_locations.list.return_value = [ExternalLocationInfo(url="s3://foo/bar")]
ws.external_locations.list.return_value = [
ExternalLocationInfo(url="s3://foo/bar"),
ExternalLocationInfo(url="abfss://container@storageaccount.dfs.core.windows.net"),
]
if backend is None:
backend = MockBackend()
installation = MockInstallation(
Expand Down Expand Up @@ -133,8 +143,8 @@ def test_create_ucx_catalog_creates_ucx_catalog() -> None:

def test_create_ucx_catalog_skips_when_ucx_catalogs_exists(caplog) -> None:
ws = create_autospec(WorkspaceClient)
mock_prompts = MockPrompts({"Please provide storage location url for catalog: ucx": "metastore"})
catalog_schema = prepare_test(ws)
ws.catalogs.get.side_effect = lambda catalog_name: CatalogInfo(name=catalog_name)

def raise_catalog_exists(catalog: str, *_, **__) -> None:
if catalog == "ucx":
Expand All @@ -143,12 +153,20 @@ def raise_catalog_exists(catalog: str, *_, **__) -> None:
ws.catalogs.create.side_effect = raise_catalog_exists

with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore.catalog_schema"):
catalog_schema.create_ucx_catalog(mock_prompts)
assert "Catalog 'ucx' already exists. Skipping." in caplog.text
catalog_schema.create_ucx_catalog(MockPrompts({}))
assert "Skipping already existing catalog: ucx" in caplog.text


@pytest.mark.parametrize("location", ["s3://foo/bar", "s3://foo/bar/test", "s3://foo/bar/test/baz"])
def test_create_all_catalogs_schemas_creates_catalogs(location: str):
@pytest.mark.parametrize(
"location",
[
"s3://foo/bar",
"s3://foo/bar/test",
"s3://foo/bar/test/baz",
"abfss://container@storageaccount.dfs.core.windows.net",
],
)
def test_create_all_catalogs_schemas_creates_catalogs(location: str) -> None:
"""Catalog 2-4 should be created; catalog 1 already exists."""
ws = create_autospec(WorkspaceClient)
mock_prompts = MockPrompts({"Please provide storage location url for catalog: *": location})
Expand All @@ -164,6 +182,28 @@ def test_create_all_catalogs_schemas_creates_catalogs(location: str):
ws.catalogs.create.assert_has_calls(calls, any_order=True)


def test_create_all_catalogs_schemas_creates_catalogs_with_different_locations() -> None:
    """Catalogs 2-4 are each created with their own prompted storage root; catalog 1 already exists."""
    workspace_client = create_autospec(WorkspaceClient)
    # One prompt answer per missing catalog, each pointing at a different storage location.
    answers = {
        "Please provide storage location url for catalog: catalog2": "s3://foo/bar",
        "Please provide storage location url for catalog: catalog3": "s3://foo/bar/test",
        "Please provide storage location url for catalog: catalog4": "s3://foo/bar/test/baz",
    }
    mock_prompts = MockPrompts(answers)

    prepare_test(workspace_client).create_all_catalogs_schemas(mock_prompts)

    # Dicts preserve insertion order, so the answers line up with catalog2..catalog4.
    expected_calls = [
        call(catalog_name, storage_root=storage_root, comment="Created by UCX", properties=None)
        for catalog_name, storage_root in zip(("catalog2", "catalog3", "catalog4"), answers.values())
    ]
    workspace_client.catalogs.create.assert_has_calls(expected_calls, any_order=True)


@pytest.mark.parametrize(
"catalog,schema",
[("catalog1", "schema2"), ("catalog1", "schema3"), ("catalog2", "schema2"), ("catalog3", "schema3")],
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,8 @@ def test_create_catalogs_schemas_handles_existing(ws, caplog) -> None:
create_catalogs_schemas(ws, prompts, ctx=WorkspaceContext(ws))
ws.catalogs.list.assert_called_once()

assert "Catalog 'test' already exists. Skipping." in caplog.messages
assert "Schema 'test' in catalog 'test' already exists. Skipping." in caplog.messages
assert "Skipping already existing catalog: test" in caplog.messages
assert "Skipping already existing schema: test.test" in caplog.messages


def test_cluster_remap(ws, caplog):
Expand Down Expand Up @@ -887,12 +887,12 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie
acc_client.metastore_assignments.create.assert_called_once()


def test_create_ucx_catalog_calls_create_catalog(ws) -> None:
def test_create_ucx_catalog_calls_get_catalog(ws) -> None:
prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"})

create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws))

ws.catalogs.create.assert_called_once()
ws.catalogs.get.assert_called_once()


def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None:
Expand Down

0 comments on commit bb3768a

Please sign in to comment.