diff --git a/.gitignore b/.gitignore index 7639e5b67d..e84fbb6882 100644 --- a/.gitignore +++ b/.gitignore @@ -145,7 +145,7 @@ cython_debug/ # dev files and scratches dev/cleanup.py -Support - .databricks -.vscode \ No newline at end of file +.vscode + +.python-version \ No newline at end of file diff --git a/Makefile b/Makefile index bff45a446c..06590b6199 100644 --- a/Makefile +++ b/Makefile @@ -5,4 +5,7 @@ fmt: hatch run lint:fmt test: - hatch run unit:test \ No newline at end of file + hatch run unit:test + +test-cov: + hatch run unit:test-cov-report \ No newline at end of file diff --git a/docs/logic.md b/docs/logic.md new file mode 100644 index 0000000000..22fc52060c --- /dev/null +++ b/docs/logic.md @@ -0,0 +1,184 @@ +# Permissions migration logic and data structures + +On a very high-level, the permissions inventorization process is split into two steps: + +1. collect all existing permissions into a persistent storage. +2. apply the collected permissions to the target resources. + +The first step is performed by the `Crawler` and the second by the `Applier`. + +Crawler and applier are intrinsically connected to each other due to SerDe (serialization/deserialization) logic. + +We implement separate crawlers and applier for each supported resource type. + +Please note that `table ACLs` logic is currently handled separately from the logic described in this document. + +## Logical objects and relevant APIs + + +### Group level properties (uses SCIM API) + +- [x] Entitlements (One of `workspace-access`, `databricks-sql-access`, `allow-cluster-create`, `allow-instance-pool-create`) +- [x] Roles (AWS only) + +These are workspace-level properties that are not associated with any specific resource. 
+ +Additional info: + +- object ID: `group_id` +- listing method: `ws.groups.list` +- get method: `ws.groups.get(group_id)` +- put method: `ws.groups.patch(group_id)` + +### Compute infrastructure (uses Permissions API) + +- [x] Clusters +- [x] Cluster policies +- [x] Instance pools +- [x] SQL warehouses + +These are compute infrastructure resources that are associated with a specific workspace. + +Additional info: + +- object ID: `cluster_id`, `policy_id`, `instance_pool_id`, `id` (SQL warehouses) +- listing method: `ws.clusters.list`, `ws.cluster_policies.list`, `ws.instance_pools.list`, `ws.warehouses.list` +- get method: `ws.permissions.get(object_id, object_type)` +- put method: `ws.permissions.update(object_id, object_type)` +- get response object type: `databricks.sdk.service.iam.ObjectPermissions` + + +### Workflows (uses Permissions API) + +- [x] Jobs +- [x] Delta Live Tables + +These are workflow resources that are associated with a specific workspace. + +Additional info: + +- object ID: `job_id`, `pipeline_id` +- listing method: `ws.jobs.list`, `ws.pipelines.list` +- get method: `ws.permissions.get(object_id, object_type)` +- put method: `ws.permissions.update(object_id, object_type)` +- get response object type: `databricks.sdk.service.iam.ObjectPermissions` + +### ML (uses Permissions API) + +- [x] MLflow experiments +- [x] MLflow models + +These are ML resources that are associated with a specific workspace. + +Additional info: + +- object ID: `experiment_id`, `id` (models) +- listing method: custom listing +- get method: `ws.permissions.get(object_id, object_type)` +- put method: `ws.permissions.update(object_id, object_type)` +- get response object type: `databricks.sdk.service.iam.ObjectPermissions` + + +### SQL (uses SQL Permissions API) + +- [x] Alerts +- [x] Dashboards +- [x] Queries + +These are SQL resources that are associated with a specific workspace. 
+ +Additional info: + +- object ID: `id` +- listing method: `ws.alerts.list`, `ws.dashboards.list`, `ws.queries.list` +- get method: `ws.dbsql_permissions.get` +- put method: `ws.dbsql_permissions.set` +- get response object type: `databricks.sdk.service.sql.GetResponse` +- Note that API has no support for UPDATE operation, only PUT (overwrite) is supported. + +### Security (uses Permissions API) + +- [x] Tokens +- [x] Passwords + +These are security resources that are associated with a specific workspace. + +Additional info: + +- object ID: `tokens` (static value), `passwords` (static value) +- listing method: N/A +- get method: `ws.permissions.get(object_id, object_type)` +- put method: `ws.permissions.update(object_id, object_type)` +- get response object type: `databricks.sdk.service.iam.ObjectPermissions` + +### Workspace (uses Permissions API) + +- [x] Notebooks +- [x] Directories +- [x] Repos +- [x] Files + +These are workspace resources that are associated with a specific workspace. + +Additional info: + +- object ID: `object_id` +- listing method: custom listing +- get method: `ws.permissions.get(object_id, object_type)` +- put method: `ws.permissions.update(object_id, object_type)` +- get response object type: `databricks.sdk.service.iam.ObjectPermissions` + +### Secrets (uses Secrets API) + +- [x] Secrets + +These are secrets resources that are associated with a specific workspace. + +Additional info: + +- object ID: `scope_name` +- listing method: `ws.secrets.list_scopes()` +- get method: `ws.secrets.list_acls(scope_name)` +- put method: `ws.secrets.put_acl` + + +## Crawler and serialization logic + +Crawlers are expected to return a list of callable functions that will be later used to get the permissions. +Each of these functions shall return a `PermissionInventoryItem` that should be serializable into a Delta Table. +The permission payload differs between different crawlers, therefore each crawler should implement a serialization +method. 
+ +## Applier and deserialization logic + +Appliers are expected to accept a list of `PermissionInventoryItem` and generate a list of callables that will apply the +given permissions. +Each applier should implement a deserialization method that will convert the raw payload into a typed one. +Each permission item should have a crawler type associated with it, so that the applier can use the correct +deserialization method. + +## Relevance identification + +Since we save all objects into the permission table, we need to filter out the objects that are not relevant to the +current migration. +We do this inside the `applier`, by returning a `noop` callable if the object is not relevant to the current migration. + +## Crawling the permissions + +To crawl the permissions, we use the following logic: +1. Go through the list of all crawlers. +2. Get the list of all objects of the given type. +3. For each object, generate a callable that will return a `PermissionInventoryItem`. +4. Execute the callables in parallel +5. Collect the results into a list of `PermissionInventoryItem`. +6. Save the list of `PermissionInventoryItem` into a Delta Table. + +## Applying the permissions + +To apply the permissions, we use the following logic: + +1. Read the Delta Table with raw permissions. +2. Map the items to the relevant `support` object. If no relevant `support` object is found, an exception is raised. +3. Deserialize the items using the relevant applier. +4. Generate a list of callables that will apply the permissions. +5. Execute the callables in parallel. 
\ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f7db9a0dff..8a9b8e8173 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -173,8 +173,12 @@ known-first-party = ["databricks.labs.ucx"] ban-relative-imports = "all" [tool.ruff.per-file-ignores] -# Tests can use magic values, assertions, and relative imports -"tests/**/*" = ["PLR2004", "S101", "TID252"] + +"tests/**/*" = [ + "PLR2004", "S101", "TID252", # tests can use magic values, assertions, and relative imports + "ARG001" # tests may not use the provided fixtures +] + "src/databricks/labs/ucx/providers/mixins/redash.py" = ["A002", "A003", "N815"] [tool.coverage.run] diff --git a/src/databricks/labs/ucx/inventory/inventorizer.py b/src/databricks/labs/ucx/inventory/inventorizer.py deleted file mode 100644 index 6c7f640713..0000000000 --- a/src/databricks/labs/ucx/inventory/inventorizer.py +++ /dev/null @@ -1,438 +0,0 @@ -import json -import logging -from abc import ABC, abstractmethod -from collections.abc import Callable, Iterator -from functools import partial -from typing import Generic, TypeVar - -from databricks.sdk import WorkspaceClient -from databricks.sdk.core import DatabricksError -from databricks.sdk.service.iam import AccessControlResponse, Group, ObjectPermissions -from databricks.sdk.service.ml import ModelDatabricks -from databricks.sdk.service.workspace import ( - AclItem, - ObjectInfo, - ObjectType, - SecretScope, -) -from ratelimit import limits, sleep_and_retry - -from databricks.labs.ucx.inventory.listing import WorkspaceListing -from databricks.labs.ucx.inventory.types import ( - AclItemsContainer, - LogicalObjectType, - PermissionsInventoryItem, - RequestObjectType, -) -from databricks.labs.ucx.providers.groups_info import GroupMigrationState -from databricks.labs.ucx.utils import ProgressReporter, ThreadedExecution - -InventoryObject = TypeVar("InventoryObject") -logger = logging.getLogger(__name__) - - -class BaseInventorizer(ABC, Generic[InventoryObject]): - 
@property - @abstractmethod - def logical_object_types(self) -> list[LogicalObjectType]: - """Logical object types that this inventorizer can handle""" - - @abstractmethod - def preload(self): - """Any preloading activities should happen here""" - - @abstractmethod - def inventorize(self) -> list[PermissionsInventoryItem]: - """Any inventorization activities should happen here""" - - -class StandardInventorizer(BaseInventorizer[InventoryObject]): - """ - Standard means that it can collect using the default listing/permissions function without any additional logic. - """ - - @property - def logical_object_types(self) -> list[LogicalObjectType]: - return [self._logical_object_type] - - def __init__( - self, - ws: WorkspaceClient, - logical_object_type: LogicalObjectType, - request_object_type: RequestObjectType, - listing_function: Callable[..., Iterator[InventoryObject]], - id_attribute: str, - permissions_function: Callable[..., ObjectPermissions] | None = None, - ): - self._ws = ws - self._logical_object_type = logical_object_type - self._request_object_type = request_object_type - self._listing_function = listing_function - self._id_attribute = id_attribute - self._permissions_function = permissions_function if permissions_function else self._safe_get_permissions - self._objects: list[InventoryObject] = [] - - @sleep_and_retry - @limits(calls=100, period=1) - def _get_permissions(self, request_object_type: RequestObjectType, request_object_id: str): - return self._ws.permissions.get(request_object_type=request_object_type, request_object_id=request_object_id) - - def _safe_get_permissions(self, request_object_type: RequestObjectType, object_id: str) -> ObjectPermissions | None: - try: - permissions = self._get_permissions(request_object_type, object_id) - return permissions - except DatabricksError as e: - if e.error_code in ["RESOURCE_DOES_NOT_EXIST", "RESOURCE_NOT_FOUND", "PERMISSION_DENIED"]: - logger.warning(f"Could not get permissions for 
{request_object_type} {object_id} due to {e.error_code}") - return None - else: - raise e - - @property - def logical_object_type(self) -> LogicalObjectType: - return self._logical_object_type - - def preload(self): - logger.info(f"Listing objects with type {self._request_object_type}...") - self._objects = list(self._listing_function()) - logger.info(f"Object metadata prepared for {len(self._objects)} objects.") - - def _process_single_object(self, _object: InventoryObject) -> PermissionsInventoryItem | None: - object_id = str(getattr(_object, self._id_attribute)) - permissions = self._permissions_function(self._request_object_type, object_id) - if permissions: - inventory_item = PermissionsInventoryItem( - object_id=object_id, - logical_object_type=self._logical_object_type, - request_object_type=self._request_object_type, - raw_object_permissions=json.dumps(permissions.as_dict()), - ) - return inventory_item - - def inventorize(self): - logger.info(f"Fetching permissions for {len(self._objects)} objects...") - - executables = [partial(self._process_single_object, _object) for _object in self._objects] - threaded_execution = ThreadedExecution[PermissionsInventoryItem](executables) - collected = [item for item in threaded_execution.run() if item is not None] - logger.info(f"Permissions fetched for {len(collected)} objects of type {self._request_object_type}") - return collected - - -class TokensAndPasswordsInventorizer(BaseInventorizer[InventoryObject]): - @property - def logical_object_types(self) -> list[LogicalObjectType]: - return [LogicalObjectType.TOKEN, LogicalObjectType.PASSWORD] - - def __init__(self, ws: WorkspaceClient): - self._ws = ws - self._tokens_acl = [] - self._passwords_acl = [] - - def _preload_tokens(self): - try: - # TODO: rewrite with self._ws.token_management.get_token_permissions().access_control_list - return self._ws.api_client.do("GET", "/api/2.0/preview/permissions/authorization/tokens").get( - "access_control_list", [] - ) - except 
DatabricksError as e: - logger.warning("Cannot load token permissions due to error:") - logger.warning(e) - return [] - - def _preload_passwords(self): - try: - # TODO: rewrite with return self._ws.users.get_password_permissions().access_control_list - return self._ws.api_client.do("GET", "/api/2.0/preview/permissions/authorization/passwords").get( - "access_control_list", [] - ) - except DatabricksError as e: - logger.error("Cannot load password permissions due to error:") - logger.error(e) - return [] - - def preload(self): - self._tokens_acl = [AccessControlResponse.from_dict(acl) for acl in self._preload_tokens()] - self._passwords_acl = [AccessControlResponse.from_dict(acl) for acl in self._preload_passwords()] - - def inventorize(self) -> list[PermissionsInventoryItem]: - results = [] - - if self._passwords_acl: - results.append( - PermissionsInventoryItem( - object_id="passwords", - logical_object_type=LogicalObjectType.PASSWORD, - request_object_type=RequestObjectType.AUTHORIZATION, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="passwords", object_type="authorization", access_control_list=self._passwords_acl - ).as_dict() - ), - ) - ) - - if self._tokens_acl: - results.append( - PermissionsInventoryItem( - object_id="tokens", - logical_object_type=LogicalObjectType.TOKEN, - request_object_type=RequestObjectType.AUTHORIZATION, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="tokens", object_type="authorization", access_control_list=self._tokens_acl - ).as_dict() - ), - ) - ) - return results - - -class SecretScopeInventorizer(BaseInventorizer[InventoryObject]): - @property - def logical_object_types(self) -> list[LogicalObjectType]: - return [LogicalObjectType.SECRET_SCOPE] - - def __init__(self, ws: WorkspaceClient): - self._ws = ws - self._scopes = ws.secrets.list_scopes() - - def _get_acls_for_scope(self, scope: SecretScope) -> Iterator[AclItem]: - return self._ws.secrets.list_acls(scope.name) - - def 
_prepare_permissions_inventory_item(self, scope: SecretScope) -> PermissionsInventoryItem: - acls = self._get_acls_for_scope(scope) - acls_container = AclItemsContainer.from_sdk(list(acls)) - - return PermissionsInventoryItem( - object_id=scope.name, - logical_object_type=LogicalObjectType.SECRET_SCOPE, - request_object_type=None, - raw_object_permissions=json.dumps(acls_container.as_dict()), - ) - - def inventorize(self) -> list[PermissionsInventoryItem]: - executables = [partial(self._prepare_permissions_inventory_item, scope) for scope in self._scopes] - results = ThreadedExecution[PermissionsInventoryItem](executables).run() - logger.info(f"Permissions fetched for {len(results)} objects of type {LogicalObjectType.SECRET_SCOPE}") - return results - - def preload(self): - pass - - -class WorkspaceInventorizer(BaseInventorizer[InventoryObject]): - @property - def logical_object_types(self) -> list[LogicalObjectType]: - return [LogicalObjectType.NOTEBOOK, LogicalObjectType.DIRECTORY, LogicalObjectType.REPO, LogicalObjectType.FILE] - - def __init__(self, ws: WorkspaceClient, num_threads=20, start_path: str | None = "/"): - self._ws = ws - self.listing = WorkspaceListing( - ws, - num_threads=num_threads, - with_directories=False, - ) - self._start_path = start_path - - def preload(self): - pass - - @staticmethod - def __convert_object_type_to_request_type(_object: ObjectInfo) -> RequestObjectType | None: - match _object.object_type: - case ObjectType.NOTEBOOK: - return RequestObjectType.NOTEBOOKS - case ObjectType.DIRECTORY: - return RequestObjectType.DIRECTORIES - case ObjectType.LIBRARY: - return None - case ObjectType.REPO: - return RequestObjectType.REPOS - case ObjectType.FILE: - return RequestObjectType.FILES - # silent handler for experiments - they'll be inventorized by the experiments manager - case None: - return None - - @staticmethod - def __convert_request_object_type_to_logical_type(request_object_type: RequestObjectType) -> LogicalObjectType: - match 
request_object_type: - case RequestObjectType.NOTEBOOKS: - return LogicalObjectType.NOTEBOOK - case RequestObjectType.DIRECTORIES: - return LogicalObjectType.DIRECTORY - case RequestObjectType.REPOS: - return LogicalObjectType.REPO - case RequestObjectType.FILES: - return LogicalObjectType.FILE - - @sleep_and_retry - @limits(calls=100, period=1) - def _get_permissions(self, request_object_type: RequestObjectType, request_object_id: str): - return self._ws.permissions.get(request_object_type=request_object_type, request_object_id=request_object_id) - - def _convert_result_to_permission_item(self, _object: ObjectInfo) -> PermissionsInventoryItem | None: - request_object_type = self.__convert_object_type_to_request_type(_object) - if not request_object_type: - return - else: - try: - permissions = self._get_permissions( - request_object_type=request_object_type, request_object_id=_object.object_id - ) - except DatabricksError as e: - if e.error_code in ["PERMISSION_DENIED", "RESOURCE_NOT_FOUND"]: - logger.warning(f"Cannot load permissions for {_object.path} due to error {e.error_code}") - return - else: - raise e - - if permissions: - inventory_item = PermissionsInventoryItem( - object_id=str(_object.object_id), - logical_object_type=self.__convert_request_object_type_to_logical_type(request_object_type), - request_object_type=request_object_type, - raw_object_permissions=json.dumps(permissions.as_dict()), - ) - return inventory_item - - def inventorize(self) -> list[PermissionsInventoryItem]: - self.listing.walk(self._start_path) - executables = [partial(self._convert_result_to_permission_item, _object) for _object in self.listing.results] - results = ThreadedExecution[PermissionsInventoryItem | None]( - executables, - progress_reporter=ProgressReporter( - len(executables), "Fetching permissions for workspace objects - processed: " - ), - ).run() - results = [result for result in results if result] # empty filter - logger.info(f"Permissions fetched for {len(results)} 
workspace objects") - return results - - -class RolesAndEntitlementsInventorizer(BaseInventorizer[InventoryObject]): - @property - def logical_object_types(self) -> list[LogicalObjectType]: - return [LogicalObjectType.ROLES, LogicalObjectType.ENTITLEMENTS] - - def __init__(self, ws: WorkspaceClient, migration_state: GroupMigrationState): - self._ws = ws - self._migration_state = migration_state - self._group_info: list[Group] = [] - - def preload(self): - logger.info("Please note that group roles and entitlements will be ONLY inventorized for migration groups") - self._group_info: list[Group] = [ - # TODO: why do we load group twice from platform? this really looks unnecessary - self._ws.groups.get(id=g.workspace.id) - for g in self._migration_state.groups - ] - logger.info("Group roles and entitlements preload completed") - - def inventorize(self) -> list[PermissionsInventoryItem]: - _items = [] - - for group in self._group_info: - # add safe-getters - group_roles = group.roles if group.roles else [] - group_entitlements = group.entitlements if group.entitlements else [] - - roles = [r.as_dict() for r in group_roles] - entitlements = [e.as_dict() for e in group_entitlements] - - inventory_item = PermissionsInventoryItem( - object_id=group.display_name, - logical_object_type=LogicalObjectType.ROLES, - request_object_type=None, - raw_object_permissions=json.dumps({"roles": roles, "entitlements": entitlements}), - ) - _items.append(inventory_item) - - return _items - - -def models_listing(ws: WorkspaceClient): - def inner() -> Iterator[ModelDatabricks]: - for model in ws.model_registry.list_models(): - model_with_id = ws.model_registry.get_model(model.name).registered_model_databricks - yield model_with_id - - return inner - - -def experiments_listing(ws: WorkspaceClient): - def inner() -> Iterator[ModelDatabricks]: - for experiment in ws.experiments.list_experiments(): - nb_tag = [t for t in experiment.tags if t.key == "mlflow.experimentType" and t.value == 
"NOTEBOOK"] - if not nb_tag: - yield experiment - - return inner - - -class Inventorizers: - @staticmethod - def provide(ws: WorkspaceClient, migration_state: GroupMigrationState, num_threads: int, start_path: str): - return [ - RolesAndEntitlementsInventorizer(ws, migration_state), - TokensAndPasswordsInventorizer(ws), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - listing_function=ws.clusters.list, - id_attribute="cluster_id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.INSTANCE_POOL, - request_object_type=RequestObjectType.INSTANCE_POOLS, - listing_function=ws.instance_pools.list, - id_attribute="instance_pool_id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.CLUSTER_POLICY, - request_object_type=RequestObjectType.CLUSTER_POLICIES, - listing_function=ws.cluster_policies.list, - id_attribute="policy_id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.PIPELINE, - request_object_type=RequestObjectType.PIPELINES, - listing_function=ws.pipelines.list_pipelines, - id_attribute="pipeline_id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.JOB, - request_object_type=RequestObjectType.JOBS, - listing_function=ws.jobs.list, - id_attribute="job_id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.EXPERIMENT, - request_object_type=RequestObjectType.EXPERIMENTS, - listing_function=experiments_listing(ws), - id_attribute="experiment_id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.MODEL, - request_object_type=RequestObjectType.REGISTERED_MODELS, - listing_function=models_listing(ws), - id_attribute="id", - ), - StandardInventorizer( - ws, - logical_object_type=LogicalObjectType.WAREHOUSE, - request_object_type=RequestObjectType.SQL_WAREHOUSES, - listing_function=ws.warehouses.list, - id_attribute="id", - ), - 
SecretScopeInventorizer(ws), - WorkspaceInventorizer(ws, num_threads=num_threads, start_path=start_path), - ] diff --git a/src/databricks/labs/ucx/inventory/permissions.py b/src/databricks/labs/ucx/inventory/permissions.py index 4393a913e0..ba6e5a826d 100644 --- a/src/databricks/labs/ucx/inventory/permissions.py +++ b/src/databricks/labs/ucx/inventory/permissions.py @@ -1,366 +1,67 @@ -import json import logging -import random -import time -from copy import deepcopy -from dataclasses import dataclass -from functools import partial +from itertools import groupby from typing import Literal from databricks.sdk import WorkspaceClient -from databricks.sdk.service import workspace -from databricks.sdk.service.iam import AccessControlRequest, Group, ObjectPermissions -from databricks.sdk.service.workspace import AclItem as SdkAclItem -from ratelimit import limits, sleep_and_retry -from tenacity import retry, stop_after_attempt, wait_fixed, wait_random -from databricks.labs.ucx.inventory.inventorizer import BaseInventorizer from databricks.labs.ucx.inventory.permissions_inventory import ( PermissionsInventoryTable, ) -from databricks.labs.ucx.inventory.types import ( - AclItemsContainer, - LogicalObjectType, - PermissionsInventoryItem, - RequestObjectType, - RolesAndEntitlements, -) +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem from databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.support.impl import SupportsProvider from databricks.labs.ucx.utils import ThreadedExecution logger = logging.getLogger(__name__) -@dataclass -class PermissionRequestPayload: - logical_object_type: LogicalObjectType - request_object_type: RequestObjectType | None - object_id: str - access_control_list: list[AccessControlRequest] - - -@dataclass -class SecretsPermissionRequestPayload: - object_id: str - access_control_list: list[SdkAclItem] - - -@dataclass -class RolesAndEntitlementsRequestPayload: - payload: 
RolesAndEntitlements - group_id: str - - -AnyRequestPayload = PermissionRequestPayload | SecretsPermissionRequestPayload | RolesAndEntitlementsRequestPayload - - -# TODO: this class has too many @staticmethod and they must not be such. write a unit test for this logic. class PermissionManager: - def __init__(self, ws: WorkspaceClient, permissions_inventory: PermissionsInventoryTable): + def __init__( + self, ws: WorkspaceClient, permissions_inventory: PermissionsInventoryTable, supports_provider: SupportsProvider + ): self._ws = ws self._permissions_inventory = permissions_inventory - self._inventorizers = [] - - @property - def inventorizers(self) -> list[BaseInventorizer]: - return self._inventorizers - - def set_inventorizers(self, value: list[BaseInventorizer]): - self._inventorizers = value + self._supports_provider = supports_provider def inventorize_permissions(self): - for inventorizer in self.inventorizers: - logger.info(f"Inventorizing the permissions for objects of type(s) {inventorizer.logical_object_types}") - inventorizer.preload() - collected = inventorizer.inventorize() - if collected: - self._permissions_inventory.save(collected) - else: - logger.warning(f"No objects of type {inventorizer.logical_object_types} were found") - + logger.info("Inventorizing the permissions") + crawler_tasks = list(self._supports_provider.get_crawler_tasks()) + logger.info(f"Total crawler tasks: {len(crawler_tasks)}") + logger.info("Starting the permissions inventorization") + execution = ThreadedExecution[PermissionsInventoryItem | None](crawler_tasks) + results = execution.run() + items = [item for item in results if item is not None] + logger.info(f"Total inventorized items: {len(items)}") + self._permissions_inventory.save(items) logger.info("Permissions were inventorized and saved") - @staticmethod - def _prepare_request_for_permissions_api( - item: PermissionsInventoryItem, - migration_state: GroupMigrationState, - destination: Literal["backup", "account"], - ) -> 
PermissionRequestPayload: - _existing_permissions: ObjectPermissions = item.typed_object_permissions - _acl = _existing_permissions.access_control_list - acl_requests = [] - - for _item in _acl: - # TODO: we have a double iteration over migration_state.groups - # (also by migration_state.get_by_workspace_group_name). - # Has to be be fixed by iterating just on .groups - if _item.group_name in [g.workspace.display_name for g in migration_state.groups]: - migration_info = migration_state.get_by_workspace_group_name(_item.group_name) - assert migration_info is not None, f"Group {_item.group_name} is not in the migration groups provider" - destination_group: Group = getattr(migration_info, destination) - _item.group_name = destination_group.display_name - _reqs = [ - AccessControlRequest( - group_name=_item.group_name, - service_principal_name=_item.service_principal_name, - user_name=_item.user_name, - permission_level=p.permission_level, - ) - for p in _item.all_permissions - if not p.inherited - ] - acl_requests.extend(_reqs) - - return PermissionRequestPayload( - logical_object_type=item.logical_object_type, - request_object_type=item.request_object_type, - object_id=item.object_id, - access_control_list=acl_requests, - ) - - @staticmethod - def _prepare_permission_request_for_secrets_api( - item: PermissionsInventoryItem, - migration_state: GroupMigrationState, - destination: Literal["backup", "account"], - ) -> SecretsPermissionRequestPayload: - _existing_acl_container: AclItemsContainer = item.typed_object_permissions - _final_acls = [] - - logger.debug("Preparing the permissions for the secrets API") - - for _existing_acl in _existing_acl_container.acls: - _new_acl = deepcopy(_existing_acl) - - if _existing_acl.principal in [g.workspace.display_name for g in migration_state.groups]: - migration_info = migration_state.get_by_workspace_group_name(_existing_acl.principal) - assert ( - migration_info is not None - ), f"Group {_existing_acl.principal} is not in the 
migration groups provider" - destination_group: Group = getattr(migration_info, destination) - _new_acl.principal = destination_group.display_name - _final_acls.append(_new_acl) - - _typed_acl_container = AclItemsContainer(acls=_final_acls) - - return SecretsPermissionRequestPayload( - object_id=item.object_id, - access_control_list=_typed_acl_container.to_sdk(), - ) - - @staticmethod - def _prepare_request_for_roles_and_entitlements( - item: PermissionsInventoryItem, migration_state: GroupMigrationState, destination - ) -> RolesAndEntitlementsRequestPayload: - # TODO: potential BUG - why does item.object_id hold a group name and not ID? - migration_info = migration_state.get_by_workspace_group_name(item.object_id) - assert migration_info is not None, f"Group {item.object_id} is not in the migration groups provider" - destination_group: Group = getattr(migration_info, destination) - return RolesAndEntitlementsRequestPayload(payload=item.typed_object_permissions, group_id=destination_group.id) - - def _prepare_new_permission_request( - self, - item: PermissionsInventoryItem, - migration_state: GroupMigrationState, - destination: Literal["backup", "account"], - ) -> AnyRequestPayload: - if isinstance(item.request_object_type, RequestObjectType) and isinstance( - item.typed_object_permissions, ObjectPermissions - ): - return self._prepare_request_for_permissions_api(item, migration_state, destination) - elif item.logical_object_type == LogicalObjectType.SECRET_SCOPE: - return self._prepare_permission_request_for_secrets_api(item, migration_state, destination) - elif item.logical_object_type in [LogicalObjectType.ROLES, LogicalObjectType.ENTITLEMENTS]: - return self._prepare_request_for_roles_and_entitlements(item, migration_state, destination) - else: - logger.warning( - f"Unsupported permissions payload for object {item.object_id} " - f"with logical type {item.logical_object_type}" - ) - - @retry(wait=wait_fixed(1) + wait_random(0, 2), stop=stop_after_attempt(5)) - 
def _scope_permissions_applicator(self, request_payload: SecretsPermissionRequestPayload): - for _acl_item in request_payload.access_control_list: - # this request will create OR update the ACL for the given principal - # it means that the access_control_list should only keep records required for update - self._ws.secrets.put_acl( - scope=request_payload.object_id, principal=_acl_item.principal, permission=_acl_item.permission - ) - logger.debug(f"Applied new permissions for scope {request_payload.object_id}: {_acl_item}") - # TODO: add mixin to SDK - # in-flight check for the applied permissions - # the api might be inconsistent, therefore we need to check that the permissions were applied - for _ in range(3): - time.sleep(random.random() * 2) - applied_permission = self._secret_scope_permission( - scope_name=request_payload.object_id, group_name=_acl_item.principal - ) - assert applied_permission, f"Failed to apply permissions for {_acl_item.principal}" - assert applied_permission == _acl_item.permission, ( - f"Failed to apply permissions for {_acl_item.principal}. " - f"Expected: {_acl_item.permission}. 
Actual: {applied_permission}" - ) - - @sleep_and_retry - @limits(calls=30, period=1) - def _update_permissions( - self, - request_object_type: RequestObjectType, - request_object_id: str, - access_control_list: list[AccessControlRequest], - ): - return self._ws.permissions.update( - request_object_type=request_object_type, - request_object_id=request_object_id, - access_control_list=access_control_list, - ) - - def _standard_permissions_applicator(self, request_payload: PermissionRequestPayload): - self._update_permissions( - request_object_type=request_payload.request_object_type, - request_object_id=request_payload.object_id, - access_control_list=request_payload.access_control_list, - ) - - def applicator(self, request_payload: AnyRequestPayload): - if isinstance(request_payload, RolesAndEntitlementsRequestPayload): - self._apply_roles_and_entitlements( - group_id=request_payload.group_id, - roles=request_payload.payload.roles, - entitlements=request_payload.payload.entitlements, - ) - elif isinstance(request_payload, PermissionRequestPayload): - self._standard_permissions_applicator(request_payload) - elif isinstance(request_payload, SecretsPermissionRequestPayload): - self._scope_permissions_applicator(request_payload) - else: - logger.warning(f"Unsupported payload type {type(request_payload)}") - - @sleep_and_retry - @limits(calls=10, period=1) # assumption - def _apply_roles_and_entitlements(self, group_id: str, roles: list, entitlements: list): - # TODO: move to other places, this won't be in SDK - op_schema = "urn:ietf:params:scim:api:messages:2.0:PatchOp" - schemas = [] - operations = [] - - if entitlements: - schemas.append(op_schema) - entitlements_payload = { - "op": "add", - "path": "entitlements", - "value": entitlements, - } - operations.append(entitlements_payload) - - if roles: - schemas.append(op_schema) - roles_payload = { - "op": "add", - "path": "roles", - "value": roles, - } - operations.append(roles_payload) - - if operations: - request = { 
- "schemas": schemas, - "Operations": operations, - } - self._patch_workspace_group(group_id, request) - - def _patch_workspace_group(self, group_id: str, payload: dict): - # TODO: replace usages - # self.groups.patch(group_id, - # schemas=[PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP], - # operations=[ - # Patch(op=PatchOp.ADD, path='..', value='...') - # ]) - path = f"/api/2.0/preview/scim/v2/Groups/{group_id}" - self._ws.api_client.do("PATCH", path, data=json.dumps(payload)) - - def _apply_permissions_in_parallel( - self, - requests: list[AnyRequestPayload], - ): - executables = [partial(self.applicator, payload) for payload in requests] - execution = ThreadedExecution[None](executables) - execution.run() - def apply_group_permissions(self, migration_state: GroupMigrationState, destination: Literal["backup", "account"]): logger.info(f"Applying the permissions to {destination} groups") logger.info(f"Total groups to apply permissions: {len(migration_state.groups)}") - - permissions_on_source = self._permissions_inventory.load_for_groups( - groups=[g.workspace.display_name for g in migration_state.groups] - ) - permission_payloads: list[AnyRequestPayload] = [ - self._prepare_new_permission_request(item, migration_state, destination=destination) - for item in permissions_on_source - ] - logger.info(f"Applying {len(permission_payloads)} permissions") - - self._apply_permissions_in_parallel(requests=permission_payloads) - logger.info(f"All permissions were applied for {destination} groups") - - def verify( - self, migration_state: GroupMigrationState, target: Literal["backup", "account"], tuples: list[tuple[str, str]] - ): - for object_type, object_id in tuples: - if object_type == LogicalObjectType.SECRET_SCOPE: - self.verify_applied_scope_acls(object_id, migration_state, target) - else: - self.verify_applied_permissions(object_type, object_id, migration_state, target) - self.verify_roles_and_entitlements(migration_state, target) - - def 
verify_applied_permissions( - self, - object_type: str, - object_id: str, - migration_state: GroupMigrationState, - target: Literal["backup", "account"], - ): - op = self._ws.permissions.get(object_type, object_id) - for info in migration_state.groups: - src_permissions = sorted( - [_ for _ in op.access_control_list if _.group_name == info.workspace.display_name], - key=lambda p: p.group_name, - ) - dst_permissions = sorted( - [_ for _ in op.access_control_list if _.group_name == getattr(info, target).display_name], - key=lambda p: p.group_name, - ) - assert len(dst_permissions) == len( - src_permissions - ), f"Target permissions were not applied correctly for {object_type}/{object_id}" - assert [t.all_permissions for t in dst_permissions] == [ - s.all_permissions for s in src_permissions - ], f"Target permissions were not applied correctly for {object_type}/{object_id}" - - def verify_applied_scope_acls( - self, scope_name: str, migration_state: GroupMigrationState, target: Literal["backup", "account"] - ): - base_attr = "workspace" if target == "backup" else "backup" - for mi in migration_state.groups: - src_name = getattr(mi, base_attr).display_name - dst_name = getattr(mi, target).display_name - src_permission = self._secret_scope_permission(scope_name, src_name) - dst_permission = self._secret_scope_permission(scope_name, dst_name) - assert src_permission == dst_permission, "Scope ACLs were not applied correctly" - - def verify_roles_and_entitlements(self, migration_state: GroupMigrationState, target: Literal["backup", "account"]): - for el in migration_state.groups: - comparison_base = getattr(el, "workspace" if target == "backup" else "backup") - comparison_target = getattr(el, target) - - base_group_info = self._ws.groups.get(comparison_base.id) - target_group_info = self._ws.groups.get(comparison_target.id) - - assert base_group_info.roles == target_group_info.roles - assert base_group_info.entitlements == target_group_info.entitlements - - def 
_secret_scope_permission(self, scope_name: str, group_name: str) -> workspace.AclPermission | None: - for acl in self._ws.secrets.list_acls(scope=scope_name): - if acl.principal == group_name: - return acl.permission - return None + # list shall be sorted prior to using group by + items = sorted(self._permissions_inventory.load_all(), key=lambda i: i.support) + logger.info(f"Total inventorized items: {len(items)}") + applier_tasks = [] + supports_to_items = { + support: list(items_subset) for support, items_subset in groupby(items, key=lambda i: i.support) + } + + # we first check that all supports are valid. + for support in supports_to_items: + if support not in self._supports_provider.supports: + msg = f"Could not find support for {support}. Please check the inventory table." + raise ValueError(msg) + + for support, items_subset in supports_to_items.items(): + relevant_support = self._supports_provider.supports[support] + tasks_for_support = [ + relevant_support.get_apply_task(item, migration_state, destination) for item in items_subset + ] + logger.info(f"Total tasks for {support}: {len(tasks_for_support)}") + applier_tasks.extend(tasks_for_support) + + logger.info(f"Total applier tasks: {len(applier_tasks)}") + logger.info("Starting the permissions application") + execution = ThreadedExecution(applier_tasks) + execution.run() + logger.info("Permissions were applied") diff --git a/src/databricks/labs/ucx/inventory/permissions_inventory.py b/src/databricks/labs/ucx/inventory/permissions_inventory.py index 7cae56c4ef..4c6575514e 100644 --- a/src/databricks/labs/ucx/inventory/permissions_inventory.py +++ b/src/databricks/labs/ucx/inventory/permissions_inventory.py @@ -2,16 +2,10 @@ import pandas as pd from databricks.sdk import WorkspaceClient -from databricks.sdk.service.iam import ObjectPermissions from pyspark.sql import DataFrame from pyspark.sql.types import StringType, StructField, StructType -from databricks.labs.ucx.inventory.types import ( - 
AclItemsContainer, - LogicalObjectType, - PermissionsInventoryItem, - RequestObjectType, -) +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem from databricks.labs.ucx.providers.spark import SparkMixin logger = logging.getLogger(__name__) @@ -24,11 +18,11 @@ def __init__(self, inventory_database: str, ws: WorkspaceClient): @property def _table_schema(self) -> StructType: + # TODO: generate the table schema automatically from the PermissionsInventoryItem class return StructType( [ StructField("object_id", StringType(), True), - StructField("logical_object_type", StringType(), True), - StructField("request_object_type", StringType(), True), + StructField("support", StringType(), True), StructField("raw_object_permissions", StringType(), True), ] ) @@ -56,29 +50,3 @@ def load_all(self) -> list[PermissionsInventoryItem]: logger.info("Successfully loaded the inventory table") return PermissionsInventoryItem.from_pandas(df) - - @staticmethod - def _is_item_relevant_to_groups(item: PermissionsInventoryItem, groups: list[str]) -> bool: - if item.logical_object_type == LogicalObjectType.SECRET_SCOPE: - _acl_container: AclItemsContainer = item.typed_object_permissions - return any(acl_item.principal in groups for acl_item in _acl_container.acls) - - elif isinstance(item.request_object_type, RequestObjectType): - _ops: ObjectPermissions = item.typed_object_permissions - mentioned_groups = [acl.group_name for acl in _ops.access_control_list] - return any(g in mentioned_groups for g in groups) - - elif item.logical_object_type in [LogicalObjectType.ENTITLEMENTS, LogicalObjectType.ROLES]: - return any(g in item.object_id for g in groups) - - else: - msg = f"Logical object type {item.logical_object_type} is not supported" - raise NotImplementedError(msg) - - def load_for_groups(self, groups: list[str]) -> list[PermissionsInventoryItem]: - logger.info(f"Loading inventory table {self._table} and filtering it to relevant groups") - df = self._df.toPandas() - 
all_items = PermissionsInventoryItem.from_pandas(df) - filtered_items = [item for item in all_items if self._is_item_relevant_to_groups(item, groups)] - logger.info(f"Found {len(filtered_items)} items relevant to the groups among {len(all_items)} items") - return filtered_items diff --git a/src/databricks/labs/ucx/inventory/types.py b/src/databricks/labs/ucx/inventory/types.py index 3d7b520053..cea7bfad72 100644 --- a/src/databricks/labs/ucx/inventory/types.py +++ b/src/databricks/labs/ucx/inventory/types.py @@ -1,13 +1,12 @@ -import json from dataclasses import asdict, dataclass +from typing import Literal import pandas as pd -from databricks.sdk.service.iam import ObjectPermissions -from databricks.sdk.service.workspace import AclItem as SdkAclItem -from databricks.sdk.service.workspace import AclPermission as SdkAclPermission from databricks.labs.ucx.generic import StrEnum +Destination = Literal["backup", "account"] + class RequestObjectType(StrEnum): AUTHORIZATION = "authorization" # tokens and passwords are here too! 
@@ -22,102 +21,18 @@ class RequestObjectType(StrEnum): PIPELINES = "pipelines" REGISTERED_MODELS = "registered-models" REPOS = "repos" - SERVING_ENDPOINTS = "serving-endpoints" SQL_WAREHOUSES = "sql/warehouses" # / is not a typo, it's the real object type def __repr__(self): return self.value -class LogicalObjectType(StrEnum): - ENTITLEMENTS = "ENTITLEMENTS" - ROLES = "ROLES" - FILE = "FILE" - REPO = "REPO" - DIRECTORY = "DIRECTORY" - NOTEBOOK = "NOTEBOOK" - SECRET_SCOPE = "SECRET_SCOPE" - PASSWORD = "PASSWORD" - TOKEN = "TOKEN" - WAREHOUSE = "WAREHOUSE" - MODEL = "MODEL" - EXPERIMENT = "EXPERIMENT" - JOB = "JOB" - PIPELINE = "PIPELINE" - CLUSTER = "CLUSTER" - INSTANCE_POOL = "INSTANCE_POOL" - CLUSTER_POLICY = "CLUSTER_POLICY" - - def __repr__(self): - return self.value - - -class AclPermission(StrEnum): - READ = "READ" - WRITE = "WRITE" - MANAGE = "MANAGE" - - -@dataclass -class AclItem: - principal: str - permission: AclPermission - - @classmethod - def from_dict(cls, raw: dict): - return cls(principal=raw.get("principal", None), permission=AclPermission(raw.get("permission"))) - - -@dataclass -class AclItemsContainer: - acls: list[AclItem] - - @staticmethod - def from_sdk(source: list[SdkAclItem]) -> "AclItemsContainer": - _typed_acls = [ - AclItem(principal=acl.principal, permission=AclPermission(acl.permission.value)) for acl in source - ] - return AclItemsContainer(acls=_typed_acls) - - def to_sdk(self) -> list[SdkAclItem]: - return [ - SdkAclItem(principal=acl.principal, permission=SdkAclPermission(acl.permission.value)) for acl in self.acls - ] - - @classmethod - def from_dict(cls, raw: dict) -> "AclItemsContainer": - return cls(acls=[AclItem.from_dict(a) for a in raw.get("acls", [])]) - - def as_dict(self) -> dict: - return asdict(self) - - -@dataclass -class RolesAndEntitlements: - roles: list - entitlements: list - - @dataclass class PermissionsInventoryItem: object_id: str - logical_object_type: LogicalObjectType - request_object_type: RequestObjectType 
| None + support: str # shall be taken from CRAWLERS dict raw_object_permissions: str - @property - def object_permissions(self) -> dict: - return json.loads(self.raw_object_permissions) - - @property - def typed_object_permissions(self) -> ObjectPermissions | AclItemsContainer | RolesAndEntitlements: - if self.logical_object_type == LogicalObjectType.SECRET_SCOPE: - return AclItemsContainer.from_dict(self.object_permissions) - elif self.logical_object_type in [LogicalObjectType.ROLES, LogicalObjectType.ENTITLEMENTS]: - return RolesAndEntitlements(**self.object_permissions) - else: - return ObjectPermissions.from_dict(self.object_permissions) - @staticmethod def from_pandas(source: pd.DataFrame) -> list["PermissionsInventoryItem"]: items = source.to_dict(orient="records") @@ -130,9 +45,6 @@ def as_dict(self) -> dict: def from_dict(cls, raw: dict) -> "PermissionsInventoryItem": return cls( object_id=raw["object_id"], - logical_object_type=LogicalObjectType(raw["logical_object_type"]), - request_object_type=RequestObjectType(raw["request_object_type"]) - if raw.get("request_object_type", None) is not None - else None, - raw_object_permissions=raw.get("raw_object_permissions", None), + raw_object_permissions=raw["raw_object_permissions"], + support=raw["support"], ) diff --git a/src/databricks/labs/ucx/inventory/verification.py b/src/databricks/labs/ucx/inventory/verification.py new file mode 100644 index 0000000000..8ab348ce97 --- /dev/null +++ b/src/databricks/labs/ucx/inventory/verification.py @@ -0,0 +1,73 @@ +from typing import Literal + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service import workspace + +from databricks.labs.ucx.providers.groups_info import GroupMigrationState + + +class VerificationManager: + def __init__(self, ws: WorkspaceClient): + self._ws = ws + + def verify( + self, migration_state: GroupMigrationState, target: Literal["backup", "account"], tuples: list[tuple[str, str]] + ): + for object_type, object_id in tuples: 
+ if object_type == "secrets": + self.verify_applied_scope_acls(object_id, migration_state, target) + else: + self.verify_applied_permissions(object_type, object_id, migration_state, target) + self.verify_roles_and_entitlements(migration_state, target) + + def verify_applied_permissions( + self, + object_type: str, + object_id: str, + migration_state: GroupMigrationState, + target: Literal["backup", "account"], + ): + op = self._ws.permissions.get(object_type, object_id) + for info in migration_state.groups: + src_permissions = sorted( + [_ for _ in op.access_control_list if _.group_name == info.workspace.display_name], + key=lambda p: p.group_name, + ) + dst_permissions = sorted( + [_ for _ in op.access_control_list if _.group_name == getattr(info, target).display_name], + key=lambda p: p.group_name, + ) + assert len(dst_permissions) == len( + src_permissions + ), f"Target permissions were not applied correctly for {object_type}/{object_id}" + assert [t.all_permissions for t in dst_permissions] == [ + s.all_permissions for s in src_permissions + ], f"Target permissions were not applied correctly for {object_type}/{object_id}" + + def verify_applied_scope_acls( + self, scope_name: str, migration_state: GroupMigrationState, target: Literal["backup", "account"] + ): + base_attr = "workspace" if target == "backup" else "backup" + for mi in migration_state.groups: + src_name = getattr(mi, base_attr).display_name + dst_name = getattr(mi, target).display_name + src_permission = self._secret_scope_permission(scope_name, src_name) + dst_permission = self._secret_scope_permission(scope_name, dst_name) + assert src_permission == dst_permission, "Scope ACLs were not applied correctly" + + def verify_roles_and_entitlements(self, migration_state: GroupMigrationState, target: Literal["backup", "account"]): + for el in migration_state.groups: + comparison_base = getattr(el, "workspace" if target == "backup" else "backup") + comparison_target = getattr(el, target) + + 
base_group_info = self._ws.groups.get(comparison_base.id) + target_group_info = self._ws.groups.get(comparison_target.id) + + assert base_group_info.roles == target_group_info.roles + assert base_group_info.entitlements == target_group_info.entitlements + + def _secret_scope_permission(self, scope_name: str, group_name: str) -> workspace.AclPermission | None: + for acl in self._ws.secrets.list_acls(scope=scope_name): + if acl.principal == group_name: + return acl.permission + return None diff --git a/src/databricks/labs/ucx/support/__init__.py b/src/databricks/labs/ucx/support/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/ucx/support/base.py b/src/databricks/labs/ucx/support/base.py new file mode 100644 index 0000000000..ab9fd75561 --- /dev/null +++ b/src/databricks/labs/ucx/support/base.py @@ -0,0 +1,57 @@ +from abc import ABC, abstractmethod +from collections.abc import Callable, Iterator +from functools import partial +from logging import Logger + +from databricks.sdk import WorkspaceClient + +from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem +from databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.utils import noop + +logger = Logger(__name__) + + +class Crawler: + @abstractmethod + def get_crawler_tasks(self) -> Iterator[Callable[..., PermissionsInventoryItem | None]]: + """ + This method should return a list of crawler tasks (e.g. partials or just any callables) + :return: + """ + + +class Applier: + @abstractmethod + def is_item_relevant(self, item: PermissionsInventoryItem, migration_state: GroupMigrationState) -> bool: + """ + This method verifies that the given item is relevant for the given migration state. 
+ """ + + @abstractmethod + def _get_apply_task( + self, item: PermissionsInventoryItem, migration_state: GroupMigrationState, destination: Destination + ) -> partial: + """ + This method should return an instance of ApplierTask. + """ + + def get_apply_task( + self, item: PermissionsInventoryItem, migration_state: GroupMigrationState, destination: Destination + ) -> partial: + # we explicitly put the relevance check here to avoid "forgotten implementation" in child classes + if self.is_item_relevant(item, migration_state): + return self._get_apply_task(item, migration_state, destination) + else: + return partial(noop) + + +class BaseSupport(ABC, Crawler, Applier): + """ + Base class for all support classes. + Child classes must implement all abstract methods. + """ + + def __init__(self, ws: WorkspaceClient): + # workspace client is required in all implementations + self._ws = ws diff --git a/src/databricks/labs/ucx/support/group_level.py b/src/databricks/labs/ucx/support/group_level.py new file mode 100644 index 0000000000..f06e0c3570 --- /dev/null +++ b/src/databricks/labs/ucx/support/group_level.py @@ -0,0 +1,49 @@ +import json +from functools import partial + +from databricks.sdk.service import iam +from ratelimit import limits, sleep_and_retry + +from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem +from databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.support.base import BaseSupport + + +class ScimSupport(BaseSupport): + def _crawler_task(self, group: iam.Group, property_name: str): + return PermissionsInventoryItem( + object_id=group.id, + support=property_name, + raw_object_permissions=json.dumps([e.as_dict() for e in getattr(group, property_name)]), + ) + + @sleep_and_retry + @limits(calls=10, period=1) + def _applier_task(self, group_id: str, value: list[iam.ComplexValue], property_name: str): + operations = [iam.Patch(op=iam.PatchOp.ADD, path=property_name, value=value)] + 
schemas = [iam.PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP] + self._ws.groups.patch(id=group_id, operations=operations, schemas=schemas) + + def is_item_relevant(self, item: PermissionsInventoryItem, migration_state: GroupMigrationState) -> bool: + return any(g.workspace.id == item.object_id for g in migration_state.groups) + + def get_crawler_tasks(self): + groups = self._ws.groups.list(attributes="id,displayName,roles,entitlements") + with_roles = [g for g in groups if g.roles and len(g.roles) > 0] + with_entitlements = [g for g in groups if g.entitlements and len(g.entitlements) > 0] + for g in with_roles: + yield partial(self._crawler_task, g, "roles") + for g in with_entitlements: + yield partial(self._crawler_task, g, "entitlements") + + def _get_apply_task( + self, item: PermissionsInventoryItem, migration_state: GroupMigrationState, destination: Destination + ): + value = [iam.ComplexValue.from_dict(e) for e in json.loads(item.raw_object_permissions)] + target_info = [g for g in migration_state.groups if g.workspace.id == item.object_id] + if len(target_info) == 0: + msg = f"Could not find group with ID {item.object_id}" + raise ValueError(msg) + else: + target_group_id = getattr(target_info[0], destination).id + return partial(self._applier_task, group_id=target_group_id, value=value, property_name=item.support) diff --git a/src/databricks/labs/ucx/support/impl.py b/src/databricks/labs/ucx/support/impl.py new file mode 100644 index 0000000000..ca29ffe9f1 --- /dev/null +++ b/src/databricks/labs/ucx/support/impl.py @@ -0,0 +1,87 @@ +from collections.abc import Callable, Iterator + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service import sql + +from databricks.labs.ucx.inventory.types import ( + PermissionsInventoryItem, + RequestObjectType, +) +from databricks.labs.ucx.support.base import BaseSupport +from databricks.labs.ucx.support.group_level import ScimSupport +from databricks.labs.ucx.support.listing import ( + 
authorization_listing, + experiments_listing, + models_listing, + workspace_listing, +) +from databricks.labs.ucx.support.permissions import ( + GenericPermissionsSupport, + listing_wrapper, +) +from databricks.labs.ucx.support.secrets import SecretScopesSupport +from databricks.labs.ucx.support.sql import SqlPermissionsSupport +from databricks.labs.ucx.support.sql import listing_wrapper as sql_listing_wrapper + + +class SupportsProvider: + def __init__(self, ws: WorkspaceClient, num_threads: int, workspace_start_path: str): + self._generic_support = GenericPermissionsSupport( + ws=ws, + listings=[ + listing_wrapper(ws.clusters.list, "cluster_id", RequestObjectType.CLUSTERS), + listing_wrapper(ws.cluster_policies.list, "cluster_policy_id", RequestObjectType.CLUSTER_POLICIES), + listing_wrapper(ws.instance_pools.list, "instance_pool_id", RequestObjectType.INSTANCE_POOLS), + listing_wrapper(ws.warehouses.list, "id", RequestObjectType.SQL_WAREHOUSES), + listing_wrapper(ws.jobs.list, "job_id", RequestObjectType.JOBS), + listing_wrapper(ws.pipelines.list, "pipeline_id", RequestObjectType.PIPELINES), + listing_wrapper(experiments_listing(ws), "experiment_id", RequestObjectType.EXPERIMENTS), + listing_wrapper(models_listing(ws), "id", RequestObjectType.REGISTERED_MODELS), + workspace_listing(ws, num_threads=num_threads, start_path=workspace_start_path), + authorization_listing(), + ], + ) + self._secrets_support = SecretScopesSupport(ws=ws) + self._scim_support = ScimSupport(ws) + self._sql_support = SqlPermissionsSupport( + ws, + listings=[ + sql_listing_wrapper(ws.alerts.list, sql.ObjectTypePlural.ALERTS), + sql_listing_wrapper(ws.dashboards.list, sql.ObjectTypePlural.DASHBOARDS), + sql_listing_wrapper(ws.queries.list, sql.ObjectTypePlural.QUERIES), + ], + ) + + def get_crawler_tasks(self) -> Iterator[Callable[..., PermissionsInventoryItem | None]]: + for support in [self._generic_support, self._secrets_support, self._scim_support, self._sql_support]: + yield from 
support.get_crawler_tasks() + + @property + def supports(self) -> dict[str, BaseSupport]: + return { + # SCIM-based API + "entitlements": self._scim_support, + "roles": self._scim_support, + # generic API + "clusters": self._generic_support, + "cluster_policies": self._generic_support, + "instance_pools": self._generic_support, + "sql_warehouses": self._generic_support, + "jobs": self._generic_support, + "pipelines": self._generic_support, + "experiments": self._generic_support, + "registered_models": self._generic_support, + "tokens": self._generic_support, + "passwords": self._generic_support, + # workspace objects + "notebooks": self._generic_support, + "files": self._generic_support, + "directories": self._generic_support, + "repos": self._generic_support, + # SQL API + "alerts": self._sql_support, + "queries": self._sql_support, + "dashboards": self._sql_support, + # secrets API + "secrets": self._secrets_support, + } diff --git a/src/databricks/labs/ucx/inventory/listing.py b/src/databricks/labs/ucx/support/listing.py similarity index 57% rename from src/databricks/labs/ucx/inventory/listing.py rename to src/databricks/labs/ucx/support/listing.py index b8a4411426..107ce0ba97 100644 --- a/src/databricks/labs/ucx/inventory/listing.py +++ b/src/databricks/labs/ucx/support/listing.py @@ -5,9 +5,13 @@ from itertools import groupby from databricks.sdk import WorkspaceClient +from databricks.sdk.service import ml, workspace from databricks.sdk.service.workspace import ObjectInfo, ObjectType from ratelimit import limits, sleep_and_retry +from databricks.labs.ucx.inventory.types import RequestObjectType +from databricks.labs.ucx.support.permissions import GenericPermissionsInfo + logger = logging.getLogger(__name__) @@ -21,7 +25,7 @@ def __init__( ): self.start_time = None self._ws = ws - self.results = [] + self.results: list[ObjectInfo] = [] self._num_threads = num_threads self._with_directories = with_directories self._counter = 0 @@ -89,3 +93,77 @@ def walk(self, 
start_path="/"): logger.info(f"Recursive WorkspaceFS listing finished at {dt.datetime.now()}") logger.info(f"Total time taken for workspace listing: {dt.datetime.now() - self.start_time}") self._progress_report(None) + return self.results + + +def models_listing(ws: WorkspaceClient): + def inner() -> Iterator[ml.ModelDatabricks]: + for model in ws.model_registry.list_models(): + model_with_id = ws.model_registry.get_model(model.name).registered_model_databricks + yield model_with_id + + return inner + + +def experiments_listing(ws: WorkspaceClient): + def inner() -> Iterator[ml.Experiment]: + for experiment in ws.experiments.list_experiments(): + """ + We filter-out notebook-based experiments, because they are covered by notebooks listing + """ + # workspace-based notebook experiment + if experiment.tags: + nb_tag = [t for t in experiment.tags if t.key == "mlflow.experimentType" and t.value == "NOTEBOOK"] + # repo-based notebook experiment + repo_nb_tag = [ + t for t in experiment.tags if t.key == "mlflow.experiment.sourceType" and t.value == "REPO_NOTEBOOK" + ] + if nb_tag or repo_nb_tag: + continue + + yield experiment + + return inner + + +def authorization_listing(): + def inner(): + for _value in ["passwords", "tokens"]: + yield GenericPermissionsInfo( + object_id=_value, + request_type=RequestObjectType.AUTHORIZATION, + ) + + return inner + + +def _convert_object_type_to_request_type(_object: workspace.ObjectInfo) -> RequestObjectType | None: + match _object.object_type: + case workspace.ObjectType.NOTEBOOK: + return RequestObjectType.NOTEBOOKS + case workspace.ObjectType.DIRECTORY: + return RequestObjectType.DIRECTORIES + case workspace.ObjectType.LIBRARY: + return None + case workspace.ObjectType.REPO: + return RequestObjectType.REPOS + case workspace.ObjectType.FILE: + return RequestObjectType.FILES + # silent handler for experiments - they'll be inventorized by the experiments manager + case None: + return None + + +def workspace_listing(ws: 
WorkspaceClient, num_threads=20, start_path: str | None = "/"): + def inner(): + ws_listing = WorkspaceListing( + ws, + num_threads=num_threads, + with_directories=False, + ) + for _object in ws_listing.walk(start_path): + request_type = _convert_object_type_to_request_type(_object) + if request_type: + yield GenericPermissionsInfo(object_id=str(_object.object_id), request_type=request_type) + + return inner diff --git a/src/databricks/labs/ucx/support/permissions.py b/src/databricks/labs/ucx/support/permissions.py new file mode 100644 index 0000000000..d902dffbdb --- /dev/null +++ b/src/databricks/labs/ucx/support/permissions.py @@ -0,0 +1,146 @@ +import json +from collections.abc import Callable, Iterator +from dataclasses import dataclass +from functools import partial + +from databricks.sdk import WorkspaceClient +from databricks.sdk.core import DatabricksError +from databricks.sdk.service import iam +from ratelimit import limits, sleep_and_retry + +from databricks.labs.ucx.inventory.types import ( + Destination, + PermissionsInventoryItem, + RequestObjectType, +) +from databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.support.base import BaseSupport, logger + + +@dataclass +class GenericPermissionsInfo: + object_id: str + request_type: RequestObjectType + + +class GenericPermissionsSupport(BaseSupport): + def __init__( + self, + listings: list[Callable[..., Iterator[GenericPermissionsInfo]]], + ws: WorkspaceClient, + ): + super().__init__(ws) + self._listings: list[Callable[..., Iterator[GenericPermissionsInfo]]] = listings + + def _safe_get_permissions( + self, ws: WorkspaceClient, request_object_type: RequestObjectType, object_id: str + ) -> iam.ObjectPermissions | None: + try: + permissions = ws.permissions.get(request_object_type, object_id) + return permissions + except DatabricksError as e: + if e.error_code in ["RESOURCE_DOES_NOT_EXIST", "RESOURCE_NOT_FOUND", "PERMISSION_DENIED"]: + logger.warning(f"Could not 
get permissions for {request_object_type} {object_id} due to {e.error_code}")
+ return None
+ else:
+ raise e
+
+ def _prepare_new_acl(
+ self, permissions: iam.ObjectPermissions, migration_state: GroupMigrationState, destination: Destination
+ ) -> list[iam.AccessControlRequest]:
+ _acl = permissions.access_control_list
+ acl_requests = []
+
+ for _item in _acl:
+ # TODO: we have a double iteration over migration_state.groups
+ # (also by migration_state.get_by_workspace_group_name).
+ # Has to be fixed by iterating just on .groups
+ if _item.group_name in [g.workspace.display_name for g in migration_state.groups]:
+ migration_info = migration_state.get_by_workspace_group_name(_item.group_name)
+ assert migration_info is not None, f"Group {_item.group_name} is not in the migration groups provider"
+ destination_group: iam.Group = getattr(migration_info, destination)
+ _item.group_name = destination_group.display_name
+ _reqs = [
+ iam.AccessControlRequest(
+ group_name=_item.group_name,
+ service_principal_name=_item.service_principal_name,
+ user_name=_item.user_name,
+ permission_level=p.permission_level,
+ )
+ for p in _item.all_permissions
+ if not p.inherited
+ ]
+ acl_requests.extend(_reqs)
+
+ return acl_requests
+
+ @sleep_and_retry
+ @limits(calls=30, period=1)
+ def _applier_task(
+ self, ws: WorkspaceClient, object_id: str, acl: list[iam.AccessControlRequest], request_type: RequestObjectType
+ ):
+ ws.permissions.update(request_type, object_id, acl)
+
+ @sleep_and_retry
+ @limits(calls=100, period=1)
+ def _crawler_task(
+ self,
+ ws: WorkspaceClient,
+ object_id: str,
+ request_type: RequestObjectType,
+ ) -> PermissionsInventoryItem | None:
+ permissions = self._safe_get_permissions(ws, request_type, object_id)
+ if permissions:
+ return PermissionsInventoryItem(
+ object_id=object_id,
+ support=request_type.value,
+ raw_object_permissions=json.dumps(permissions.as_dict()),
+ )
+
+ def _get_apply_task(
+ self, item: PermissionsInventoryItem, 
migration_state: GroupMigrationState, destination: Destination + ) -> partial: + new_acl = self._prepare_new_acl( + iam.ObjectPermissions.from_dict(json.loads(item.raw_object_permissions)), migration_state, destination + ) + return partial( + self._applier_task, + ws=self._ws, + request_type=RequestObjectType(item.support), + acl=new_acl, + object_id=item.object_id, + ) + + def is_item_relevant(self, item: PermissionsInventoryItem, migration_state: GroupMigrationState) -> bool: + # passwords and tokens are represented on the workspace-level + if item.object_id in ("tokens", "passwords"): + return True + else: + mentioned_groups = [ + acl.group_name + for acl in iam.ObjectPermissions.from_dict(json.loads(item.raw_object_permissions)).access_control_list + ] + return any(g in mentioned_groups for g in [info.workspace.display_name for info in migration_state.groups]) + + def get_crawler_tasks(self): + for listing in self._listings: + for info in listing(): + yield partial( + self._crawler_task, + ws=self._ws, + object_id=info.object_id, + request_type=info.request_type, + ) + + +def listing_wrapper( + func: Callable[..., list], id_attribute: str, object_type: RequestObjectType +) -> Callable[..., Iterator[GenericPermissionsInfo]]: + def wrapper() -> Iterator[GenericPermissionsInfo]: + for item in func(): + yield GenericPermissionsInfo( + object_id=getattr(item, id_attribute), + request_type=object_type, + ) + + return wrapper diff --git a/src/databricks/labs/ucx/support/secrets.py b/src/databricks/labs/ucx/support/secrets.py new file mode 100644 index 0000000000..36915299b9 --- /dev/null +++ b/src/databricks/labs/ucx/support/secrets.py @@ -0,0 +1,59 @@ +import json +from functools import partial + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service import iam, workspace +from ratelimit import limits, sleep_and_retry + +from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem +from 
databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.support.base import BaseSupport + + +class SecretScopesSupport(BaseSupport): + def __init__(self, ws: WorkspaceClient): + super().__init__(ws=ws) + + def get_crawler_tasks(self): + scopes = self._ws.secrets.list_scopes() + + def _crawler_task(scope: workspace.SecretScope): + acl_items = self._ws.secrets.list_acls(scope.name) + return PermissionsInventoryItem( + object_id=scope.name, + support="secrets", + raw_object_permissions=json.dumps([item.as_dict() for item in acl_items]), + ) + + for scope in scopes: + yield partial(_crawler_task, scope) + + def is_item_relevant(self, item: PermissionsInventoryItem, migration_state: GroupMigrationState) -> bool: + acls = [workspace.AclItem.from_dict(acl) for acl in json.loads(item.raw_object_permissions)] + mentioned_groups = [acl.principal for acl in acls] + return any(g in mentioned_groups for g in [info.workspace.display_name for info in migration_state.groups]) + + @sleep_and_retry + @limits(calls=30, period=1) + def _rate_limited_put_acl(self, object_id: str, principal: str, permission: workspace.AclPermission): + self._ws.secrets.put_acl(object_id, principal, permission) + + def _get_apply_task( + self, item: PermissionsInventoryItem, migration_state: GroupMigrationState, destination: Destination + ) -> partial: + acls = [workspace.AclItem.from_dict(acl) for acl in json.loads(item.raw_object_permissions)] + new_acls = [] + + for acl in acls: + if acl.principal in [i.workspace.display_name for i in migration_state.groups]: + source_info = migration_state.get_by_workspace_group_name(acl.principal) + target: iam.Group = getattr(source_info, destination) + new_acls.append(workspace.AclItem(principal=target.display_name, permission=acl.permission)) + else: + new_acls.append(acl) + + def apply_acls(): + for acl in new_acls: + self._rate_limited_put_acl(item.object_id, acl.principal, acl.permission) + + return partial(apply_acls) 
diff --git a/src/databricks/labs/ucx/support/sql.py b/src/databricks/labs/ucx/support/sql.py new file mode 100644 index 0000000000..b98c46748a --- /dev/null +++ b/src/databricks/labs/ucx/support/sql.py @@ -0,0 +1,120 @@ +import dataclasses +import json +from collections.abc import Callable +from dataclasses import dataclass +from functools import partial + +from databricks.sdk import WorkspaceClient +from databricks.sdk.core import DatabricksError +from databricks.sdk.service import iam, sql +from ratelimit import limits, sleep_and_retry + +from databricks.labs.ucx.inventory.types import Destination, PermissionsInventoryItem +from databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.support.base import BaseSupport, logger + + +@dataclass +class SqlPermissionsInfo: + object_id: str + request_type: sql.ObjectTypePlural + + +class SqlPermissionsSupport(BaseSupport): + def is_item_relevant(self, item: PermissionsInventoryItem, migration_state: GroupMigrationState) -> bool: + mentioned_groups = [ + acl.group_name + for acl in sql.GetResponse.from_dict(json.loads(item.raw_object_permissions)).access_control_list + ] + return any(g in mentioned_groups for g in [info.workspace.display_name for info in migration_state.groups]) + + def __init__( + self, + ws: WorkspaceClient, + listings: list[Callable[..., list[SqlPermissionsInfo]]], + ): + super().__init__(ws) + self._listings = listings + + def _safe_get_dbsql_permissions(self, object_type: sql.ObjectTypePlural, object_id: str) -> sql.GetResponse | None: + try: + return self._ws.dbsql_permissions.get(object_type, object_id) + except DatabricksError as e: + if e.error_code in ["RESOURCE_DOES_NOT_EXIST", "RESOURCE_NOT_FOUND", "PERMISSION_DENIED"]: + logger.warning(f"Could not get permissions for {object_type} {object_id} due to {e.error_code}") + return None + else: + raise e + + @sleep_and_retry + @limits(calls=100, period=1) + def _crawler_task(self, object_id: str, object_type: 
sql.ObjectTypePlural) -> PermissionsInventoryItem | None: + permissions = self._safe_get_dbsql_permissions(object_type=object_type, object_id=object_id) + if permissions: + return PermissionsInventoryItem( + object_id=object_id, + support=object_type.value, + raw_object_permissions=json.dumps(permissions.as_dict()), + ) + + @sleep_and_retry + @limits(calls=30, period=1) + def _applier_task(self, object_type: sql.ObjectTypePlural, object_id: str, acl: list[sql.AccessControl]): + """ + Please note that we only have SET option (DBSQL Permissions API doesn't support UPDATE operation). + This affects the way how we prepare the new ACL request. + """ + self._ws.dbsql_permissions.set(object_type=object_type, object_id=object_id, acl=acl) + + def get_crawler_tasks(self): + for listing in self._listings: + for item in listing(): + yield partial(self._crawler_task, item.object_id, item.request_type) + + def _prepare_new_acl( + self, acl: list[sql.AccessControl], migration_state: GroupMigrationState, destination: Destination + ) -> list[sql.AccessControl]: + """ + Please note the comment above on how we apply these permissions. 
+ """ + acl_requests: list[sql.AccessControl] = [] + + for acl_request in acl: + if acl_request.group_name in [g.workspace.display_name for g in migration_state.groups]: + migration_info = migration_state.get_by_workspace_group_name(acl_request.group_name) + assert ( + migration_info is not None + ), f"Group {acl_request.group_name} is not in the migration groups provider" + destination_group: iam.Group = getattr(migration_info, destination) + new_acl_request = dataclasses.replace(acl_request, group_name=destination_group.display_name) + acl_requests.append(new_acl_request) + else: + # no changes shall be applied + acl_requests.append(acl_request) + + return acl_requests + + def _get_apply_task( + self, item: PermissionsInventoryItem, migration_state: GroupMigrationState, destination: Destination + ): + new_acl = self._prepare_new_acl( + sql.GetResponse.from_dict(json.loads(item.raw_object_permissions)).access_control_list, + migration_state, + destination, + ) + return partial( + self._applier_task, object_type=sql.ObjectTypePlural(item.support), object_id=item.object_id, acl=new_acl + ) + + +def listing_wrapper( + func: Callable[..., list], object_type: sql.ObjectTypePlural +) -> Callable[..., list[SqlPermissionsInfo]]: + def wrapper() -> list[SqlPermissionsInfo]: + for item in func(): + yield SqlPermissionsInfo( + object_id=item.id, + request_type=object_type, + ) + + return wrapper diff --git a/src/databricks/labs/ucx/toolkits/group_migration.py b/src/databricks/labs/ucx/toolkits/group_migration.py index d3343e9029..dc2c861ab7 100644 --- a/src/databricks/labs/ucx/toolkits/group_migration.py +++ b/src/databricks/labs/ucx/toolkits/group_migration.py @@ -3,12 +3,13 @@ from databricks.sdk import WorkspaceClient from databricks.labs.ucx.config import MigrationConfig -from databricks.labs.ucx.inventory.inventorizer import Inventorizers from databricks.labs.ucx.inventory.permissions import PermissionManager from databricks.labs.ucx.inventory.permissions_inventory 
import ( PermissionsInventoryTable, ) +from databricks.labs.ucx.inventory.verification import VerificationManager from databricks.labs.ucx.managers.group import GroupManager +from databricks.labs.ucx.support.impl import SupportsProvider class GroupMigrationToolkit: @@ -27,7 +28,12 @@ def __init__(self, config: MigrationConfig): self._group_manager = GroupManager(self._ws, config.groups) self._permissions_inventory = PermissionsInventoryTable(config.inventory_database, self._ws) - self._permissions_manager = PermissionManager(self._ws, self._permissions_inventory) + self._permissions_manager = PermissionManager( + self._ws, + self._permissions_inventory, + supports_provider=SupportsProvider(self._ws, self._num_threads, self._workspace_start_path), + ) + self._verification_manager = VerificationManager(self._ws) @staticmethod def _verify_ws_client(w: WorkspaceClient): @@ -44,10 +50,6 @@ def _configure_logger(level: str): def prepare_environment(self): self._group_manager.prepare_groups_in_environment() - inventorizers = Inventorizers.provide( - self._ws, self._group_manager.migration_groups_provider, self._num_threads, self._workspace_start_path - ) - self._permissions_manager.set_inventorizers(inventorizers) def cleanup_inventory_table(self): self._permissions_inventory.cleanup() @@ -61,7 +63,7 @@ def apply_permissions_to_backup_groups(self): ) def verify_permissions_on_backup_groups(self, to_verify): - self._permissions_manager.verify(self._group_manager.migration_groups_provider, "backup", to_verify) + self._verification_manager.verify(self._group_manager.migration_groups_provider, "backup", to_verify) def replace_workspace_groups_with_account_groups(self): self._group_manager.replace_workspace_groups_with_account_groups() @@ -72,7 +74,7 @@ def apply_permissions_to_account_groups(self): ) def verify_permissions_on_account_groups(self, to_verify): - self._permissions_manager.verify(self._group_manager.migration_groups_provider, "account", to_verify) + 
self._verification_manager.verify(self._group_manager.migration_groups_provider, "account", to_verify) def delete_backup_groups(self): self._group_manager.delete_backup_groups() diff --git a/src/databricks/labs/ucx/utils.py b/src/databricks/labs/ucx/utils.py index 7dda8543bd..030411d353 100644 --- a/src/databricks/labs/ucx/utils.py +++ b/src/databricks/labs/ucx/utils.py @@ -42,7 +42,7 @@ def __init__( self._executables = executables self._futures = [] _reporter = ProgressReporter(len(executables)) if not progress_reporter else progress_reporter - self._done_callback = _reporter.progress_report + self._done_callback: Callable = _reporter.progress_report @classmethod def gather(cls, name: str, tasks: list[ExecutableFunction]) -> list[ExecutableResult]: @@ -55,8 +55,7 @@ def run(self) -> list[ExecutableResult]: with ThreadPoolExecutor(self._num_threads) as executor: for executable in self._executables: future = executor.submit(executable) - if self._done_callback: - future.add_done_callback(self._done_callback) + future.add_done_callback(self._done_callback) # TODO: errors are not handled yet - https://github.com/databricks/UC-Upgrade/issues/89 self._futures.append(future) @@ -80,3 +79,7 @@ class WorkspaceLevelEntitlement(StrEnum): DATABRICKS_SQL_ACCESS = "databricks-sql-access" ALLOW_CLUSTER_CREATE = "allow-cluster-create" ALLOW_INSTANCE_POOL_CREATE = "allow-instance-pool-create" + + +def noop(): + pass diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index d8a7686e46..da2118eacc 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -12,7 +12,7 @@ MigrationConfig, TaclConfig, ) -from databricks.labs.ucx.inventory.types import LogicalObjectType, RequestObjectType +from databricks.labs.ucx.inventory.types import RequestObjectType from databricks.labs.ucx.toolkits.group_migration import GroupMigrationToolkit logger = logging.getLogger(__name__) @@ -141,7 +141,7 @@ def test_e2e( scope = make_secret_scope() 
make_secret_scope_acl(scope=scope, principal=ws_group.display_name, permission=workspace.AclPermission.WRITE) - to_verify.add((LogicalObjectType.SECRET_SCOPE, scope)) + to_verify.add(("secrets", scope)) make_authorization_permissions( object_id="tokens", diff --git a/tests/unit/support/__init__.py b/tests/unit/support/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/support/conftest.py b/tests/unit/support/conftest.py new file mode 100644 index 0000000000..5375cce750 --- /dev/null +++ b/tests/unit/support/conftest.py @@ -0,0 +1,20 @@ +import pytest +from databricks.sdk.service import iam + +from databricks.labs.ucx.providers.groups_info import ( + GroupMigrationState, + MigrationGroupInfo, +) + + +@pytest.fixture(scope="function") +def migration_state() -> GroupMigrationState: + ms = GroupMigrationState() + ms.add( + group=MigrationGroupInfo( + workspace=iam.Group(display_name="test", id="test-ws"), + backup=iam.Group(display_name="db-temp-test", id="test-backup"), + account=iam.Group(display_name="test", id="test-acc"), + ) + ) + return ms diff --git a/tests/unit/support/test_base.py b/tests/unit/support/test_base.py new file mode 100644 index 0000000000..665e565243 --- /dev/null +++ b/tests/unit/support/test_base.py @@ -0,0 +1,42 @@ +from functools import partial + +from databricks.sdk.service import iam + +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem +from databricks.labs.ucx.providers.groups_info import ( + GroupMigrationState, + MigrationGroupInfo, +) +from databricks.labs.ucx.support.base import Applier +from databricks.labs.ucx.utils import noop + + +def test_applier(): + class SampleApplier(Applier): + def is_item_relevant(self, item: PermissionsInventoryItem, migration_state: GroupMigrationState) -> bool: + workspace_groups = [info.workspace.display_name for info in migration_state.groups] + return item.object_id in workspace_groups + + def _get_apply_task(self, _, __, ___): + def 
def test_scim_crawler():
    """Groups with empty roles AND entitlements must not produce crawler items."""
    ws = MagicMock()
    ws.groups.list.return_value = [
        iam.Group(
            id="1",
            display_name="group1",
            roles=[],  # verify that empty roles and entitlements are not returned
        ),
        iam.Group(
            id="2",
            display_name="group2",
            roles=[iam.ComplexValue(value="role1")],
            entitlements=[iam.ComplexValue(value="entitlement1")],
        ),
        iam.Group(
            id="3",
            display_name="group3",
            roles=[iam.ComplexValue(value="role1"), iam.ComplexValue(value="role2")],
            entitlements=[],
        ),
    ]
    sup = ScimSupport(ws=ws)
    tasks = list(sup.get_crawler_tasks())
    # group1 contributes no tasks; group2 yields roles + entitlements, group3 yields roles.
    assert len(tasks) == 3
    ws.groups.list.assert_called_once()
    for task in tasks:
        item = task()
        # The old `if item.object_id == "1": assert item is None` branch was
        # unreachable (no task exists for group1) and would have raised
        # AttributeError on a None item; every produced item must be concrete.
        assert item is not None
        assert item.object_id in ["2", "3"]
        assert item.support in ["roles", "entitlements"]
        assert item.raw_object_permissions is not None
index 0000000000..11b7401903 --- /dev/null +++ b/tests/unit/support/test_impl.py @@ -0,0 +1,29 @@ +from unittest.mock import MagicMock + +from databricks.labs.ucx.support.impl import SupportsProvider + + +def test_supports_provider(): + provider = SupportsProvider(ws=MagicMock(), num_threads=1, workspace_start_path="/") + assert provider.supports.keys() == { + "entitlements", + "roles", + "clusters", + "cluster_policies", + "instance_pools", + "sql_warehouses", + "jobs", + "pipelines", + "experiments", + "registered_models", + "tokens", + "passwords", + "notebooks", + "files", + "directories", + "repos", + "alerts", + "queries", + "dashboards", + "secrets", + } diff --git a/tests/unit/support/test_listing.py b/tests/unit/support/test_listing.py new file mode 100644 index 0000000000..293506cac1 --- /dev/null +++ b/tests/unit/support/test_listing.py @@ -0,0 +1,85 @@ +import datetime as dt +from unittest.mock import MagicMock, patch + +from databricks.sdk.service import ml, workspace + +from databricks.labs.ucx.inventory.types import RequestObjectType +from databricks.labs.ucx.support.listing import ( + WorkspaceListing, + experiments_listing, + logger, + models_listing, + workspace_listing, +) +from databricks.labs.ucx.support.permissions import listing_wrapper + + +def test_logging_calls(): + workspace_listing = WorkspaceListing(ws=MagicMock(), num_threads=1) + workspace_listing.start_time = dt.datetime.now() + workspace_listing._counter = 9 + with patch.object(logger, "info") as mock_info: + workspace_listing._progress_report(None) + mock_info.assert_called_once() + + +def test_models_listing(): + ws = MagicMock() + ws.model_registry.list_models.return_value = [ml.Model(name="test")] + ws.model_registry.get_model.return_value = ml.GetModelResponse( + registered_model_databricks=ml.ModelDatabricks( + id="some-id", + name="test", + ) + ) + + wrapped = listing_wrapper(models_listing(ws), id_attribute="id", object_type=RequestObjectType.REGISTERED_MODELS) + result = 
list(wrapped()) + assert len(result) == 1 + assert result[0].object_id == "some-id" + assert result[0].request_type == RequestObjectType.REGISTERED_MODELS + + +def test_experiment_listing(): + ws = MagicMock() + ws.experiments.list_experiments.return_value = [ + ml.Experiment(experiment_id="test"), + ml.Experiment(experiment_id="test2", tags=[ml.ExperimentTag(key="whatever", value="SOMETHING")]), + ml.Experiment(experiment_id="test3", tags=[ml.ExperimentTag(key="mlflow.experimentType", value="NOTEBOOK")]), + ml.Experiment( + experiment_id="test4", tags=[ml.ExperimentTag(key="mlflow.experiment.sourceType", value="REPO_NOTEBOOK")] + ), + ] + wrapped = listing_wrapper( + experiments_listing(ws), id_attribute="experiment_id", object_type=RequestObjectType.EXPERIMENTS + ) + results = list(wrapped()) + assert len(results) == 2 + for res in results: + assert res.request_type == RequestObjectType.EXPERIMENTS + assert res.object_id in ["test", "test2"] + + +def test_workspace_listing(): + listing = MagicMock(spec=WorkspaceListing) + listing.walk.return_value = [ + workspace.ObjectInfo(object_id=1, object_type=workspace.ObjectType.NOTEBOOK), + workspace.ObjectInfo(object_id=2, object_type=workspace.ObjectType.DIRECTORY), + workspace.ObjectInfo(object_id=3, object_type=workspace.ObjectType.LIBRARY), + workspace.ObjectInfo(object_id=4, object_type=workspace.ObjectType.REPO), + workspace.ObjectInfo(object_id=5, object_type=workspace.ObjectType.FILE), + workspace.ObjectInfo(object_id=6, object_type=None), # MLflow Experiment + ] + + with patch("databricks.labs.ucx.support.listing.WorkspaceListing", return_value=listing): + results = workspace_listing(ws=MagicMock())() + assert len(list(results)) == 4 + listing.walk.assert_called_once() + for res in results: + assert res.request_type in [ + RequestObjectType.NOTEBOOKS, + RequestObjectType.DIRECTORIES, + RequestObjectType.REPOS, + RequestObjectType.FILES, + ] + assert res.object_id in [1, 2, 4, 5] diff --git 
a/tests/unit/support/test_permissions.py b/tests/unit/support/test_permissions.py new file mode 100644 index 0000000000..9b25d6ab38 --- /dev/null +++ b/tests/unit/support/test_permissions.py @@ -0,0 +1,138 @@ +import json +from unittest.mock import MagicMock + +import pytest +from databricks.sdk.core import DatabricksError +from databricks.sdk.service import compute, iam + +from databricks.labs.ucx.inventory.types import ( + PermissionsInventoryItem, + RequestObjectType, +) +from databricks.labs.ucx.support.permissions import ( + GenericPermissionsSupport, + listing_wrapper, +) + + +def test_crawler(): + ws = MagicMock() + ws.clusters.list.return_value = [ + compute.ClusterDetails( + cluster_id="test", + ) + ] + + sample_permission = iam.ObjectPermissions( + object_id="test", + object_type=str(RequestObjectType.CLUSTERS), + access_control_list=[ + iam.AccessControlResponse( + group_name="test", + all_permissions=[iam.Permission(inherited=False, permission_level=iam.PermissionLevel.CAN_USE)], + ) + ], + ) + + ws.permissions.get.return_value = sample_permission + + sup = GenericPermissionsSupport( + ws=ws, + listings=[ + listing_wrapper(ws.clusters.list, "cluster_id", RequestObjectType.CLUSTERS), + ], + ) + + tasks = list(sup.get_crawler_tasks()) + assert len(tasks) == 1 + ws.clusters.list.assert_called_once() + _task = tasks[0] + item = _task() + ws.permissions.get.assert_called_once() + assert item.object_id == "test" + assert item.support == "clusters" + assert json.loads(item.raw_object_permissions) == sample_permission.as_dict() + + +def test_apply(migration_state): + ws = MagicMock() + sup = GenericPermissionsSupport(ws=ws, listings=[]) # no listings since only apply is tested + + item = PermissionsInventoryItem( + object_id="test", + support="clusters", + raw_object_permissions=json.dumps( + iam.ObjectPermissions( + object_id="test", + object_type=str(RequestObjectType.CLUSTERS), + access_control_list=[ + iam.AccessControlResponse( + group_name="test", + 
all_permissions=[iam.Permission(inherited=False, permission_level=iam.PermissionLevel.CAN_USE)], + ), + iam.AccessControlResponse( + group_name="irrelevant", + all_permissions=[ + iam.Permission(inherited=False, permission_level=iam.PermissionLevel.CAN_MANAGE) + ], + ), + ], + ).as_dict() + ), + ) + + _task = sup.get_apply_task(item, migration_state, "backup") + _task() + ws.permissions.update.assert_called_once() + + expected_acl_payload = [ + iam.AccessControlRequest( + group_name="db-temp-test", + permission_level=iam.PermissionLevel.CAN_USE, + ) + ] + + ws.permissions.update.assert_called_with(RequestObjectType.CLUSTERS, "test", expected_acl_payload) + + +def test_relevance(): + sup = GenericPermissionsSupport(ws=MagicMock(), listings=[]) # no listings since only apply is tested + result = sup.is_item_relevant( + item=PermissionsInventoryItem(object_id="passwords", support="passwords", raw_object_permissions="some-stuff"), + migration_state=MagicMock(), + ) + assert result is True + + +def test_safe_get(): + ws = MagicMock() + ws.permissions.get.side_effect = DatabricksError(error_code="RESOURCE_DOES_NOT_EXIST") + sup = GenericPermissionsSupport(ws=ws, listings=[]) + result = sup._safe_get_permissions(ws, RequestObjectType.CLUSTERS, "test") + assert result is None + + ws.permissions.get.side_effect = DatabricksError(error_code="SOMETHING_UNEXPECTED") + with pytest.raises(DatabricksError): + sup._safe_get_permissions(ws, RequestObjectType.CLUSTERS, "test") + + +def test_no_permissions(): + ws = MagicMock() + ws.clusters.list.return_value = [ + compute.ClusterDetails( + cluster_id="test", + ) + ] + ws.permissions.get.side_effect = DatabricksError(error_code="RESOURCE_DOES_NOT_EXIST") + sup = GenericPermissionsSupport( + ws=ws, + listings=[ + listing_wrapper(ws.clusters.list, "cluster_id", RequestObjectType.CLUSTERS), + ], + ) + tasks = list(sup.get_crawler_tasks()) + assert len(tasks) == 1 + ws.clusters.list.assert_called_once() + _task = tasks[0] + item = 
_task() + assert item is None diff --git a/tests/unit/support/test_secrets.py b/tests/unit/support/test_secrets.py new file mode 100644 index 0000000000..f70dfba0d2 --- /dev/null +++ b/tests/unit/support/test_secrets.py @@ -0,0 +1,66 @@ +import json +from unittest.mock import MagicMock, call + +from databricks.sdk.service import workspace + +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem +from databricks.labs.ucx.providers.groups_info import GroupMigrationState +from databricks.labs.ucx.support.secrets import SecretScopesSupport + + +def test_secret_scopes_crawler(): + ws = MagicMock() + ws.secrets.list_acls.return_value = [ + workspace.AclItem( + principal="test", + permission=workspace.AclPermission.MANAGE, + ) + ] + ws.secrets.list_scopes.return_value = [ + workspace.SecretScope( + name="test", + ) + ] + + sup = SecretScopesSupport(ws=ws) + tasks = list(sup.get_crawler_tasks()) + assert len(tasks) == 1 + ws.secrets.list_scopes.assert_called_once() + + _task = tasks[0] + item = _task() + + assert item.object_id == "test" + assert item.support == "secrets" + assert item.raw_object_permissions == '[{"permission": "MANAGE", "principal": "test"}]' + + +def test_secret_scopes_apply(migration_state: GroupMigrationState): + ws = MagicMock() + sup = SecretScopesSupport(ws=ws) + item = PermissionsInventoryItem( + object_id="test", + support="secrets", + raw_object_permissions=json.dumps( + [ + workspace.AclItem( + principal="test", + permission=workspace.AclPermission.MANAGE, + ).as_dict(), + workspace.AclItem( + principal="irrelevant", + permission=workspace.AclPermission.MANAGE, + ).as_dict(), + ] + ), + ) + + task = sup.get_apply_task(item, migration_state, "backup") + task() + assert ws.secrets.put_acl.call_count == 2 + + calls = [ + call("test", "db-temp-test", workspace.AclPermission.MANAGE), + call("test", "irrelevant", workspace.AclPermission.MANAGE), + ] + ws.secrets.put_acl.assert_has_calls(calls, any_order=False) diff --git 
a/tests/unit/support/test_sql.py b/tests/unit/support/test_sql.py new file mode 100644 index 0000000000..0c383f88fd --- /dev/null +++ b/tests/unit/support/test_sql.py @@ -0,0 +1,120 @@ +import json +from unittest.mock import MagicMock + +import pytest +from databricks.sdk.core import DatabricksError +from databricks.sdk.service import sql + +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem +from databricks.labs.ucx.support.sql import SqlPermissionsSupport, listing_wrapper + + +def test_crawlers(): + ws = MagicMock() + + ws.alerts.list.return_value = [ + sql.Alert( + id="test", + ) + ] + ws.queries.list.return_value = [ + sql.Query( + id="test", + ) + ] + ws.dashboards.list.return_value = [sql.Dashboard(id="test")] + + sample_acl = [ + sql.AccessControl( + group_name="test", + permission_level=sql.PermissionLevel.CAN_MANAGE, + ) + ] + + ws.dbsql_permissions.get.side_effect = [ + sql.GetResponse(object_type=ot, object_id="test", access_control_list=sample_acl) + for ot in [sql.ObjectType.ALERT, sql.ObjectType.QUERY, sql.ObjectType.DASHBOARD] + ] + + sup = SqlPermissionsSupport( + ws=ws, + listings=[ + listing_wrapper(ws.alerts.list, sql.ObjectTypePlural.ALERTS), + listing_wrapper(ws.dashboards.list, sql.ObjectTypePlural.DASHBOARDS), + listing_wrapper(ws.queries.list, sql.ObjectTypePlural.QUERIES), + ], + ) + + tasks = list(sup.get_crawler_tasks()) + assert len(tasks) == 3 + ws.alerts.list.assert_called_once() + ws.queries.list.assert_called_once() + ws.dashboards.list.assert_called_once() + for task in tasks: + item = task() + assert item.object_id == "test" + assert item.support in ["alerts", "dashboards", "queries"] + assert item.raw_object_permissions is not None + + +def test_apply(migration_state): + ws = MagicMock() + sup = SqlPermissionsSupport(ws=ws, listings=[]) + item = PermissionsInventoryItem( + object_id="test", + support="alerts", + raw_object_permissions=json.dumps( + sql.GetResponse( + object_type=sql.ObjectType.ALERT, + 
object_id="test", + access_control_list=[ + sql.AccessControl( + group_name="test", + permission_level=sql.PermissionLevel.CAN_MANAGE, + ), + sql.AccessControl( + group_name="irrelevant", + permission_level=sql.PermissionLevel.CAN_MANAGE, + ), + ], + ).as_dict() + ), + ) + task = sup.get_apply_task(item, migration_state, "backup") + task() + assert ws.dbsql_permissions.set.call_count == 1 + expected_payload = [ + sql.AccessControl( + group_name="db-temp-test", + permission_level=sql.PermissionLevel.CAN_MANAGE, + ), + sql.AccessControl( + group_name="irrelevant", + permission_level=sql.PermissionLevel.CAN_MANAGE, + ), + ] + ws.dbsql_permissions.set.assert_called_once_with( + object_type=sql.ObjectTypePlural.ALERTS, object_id="test", acl=expected_payload + ) + + +def test_safe_getter_known(): + ws = MagicMock() + ws.dbsql_permissions.get.side_effect = DatabricksError(error_code="RESOURCE_DOES_NOT_EXIST") + sup = SqlPermissionsSupport(ws=ws, listings=[]) + assert sup._safe_get_dbsql_permissions(object_type=sql.ObjectTypePlural.ALERTS, object_id="test") is None + + +def test_safe_getter_unknown(): + ws = MagicMock() + ws.dbsql_permissions.get.side_effect = DatabricksError(error_code="SOMETHING_NON_EXPECTED") + sup = SqlPermissionsSupport(ws=ws, listings=[]) + with pytest.raises(DatabricksError): + sup._safe_get_dbsql_permissions(object_type=sql.ObjectTypePlural.ALERTS, object_id="test") + + +def test_empty_permissions(): + ws = MagicMock() + ws.dbsql_permissions.get.side_effect = DatabricksError(error_code="RESOURCE_DOES_NOT_EXIST") + sup = SqlPermissionsSupport(ws=ws, listings=[]) + assert sup._crawler_task(object_id="test", object_type=sql.ObjectTypePlural.ALERTS) is None diff --git a/tests/unit/test_generic.py b/tests/unit/test_generic.py new file mode 100644 index 0000000000..3f9cb12e9d --- /dev/null +++ b/tests/unit/test_generic.py @@ -0,0 +1,18 @@ +import pytest + +from databricks.labs.ucx.generic import StrEnum + + +def test_error(): + with 
pytest.raises(TypeError): + + class InvalidEnum(StrEnum): + A = 1 + + +def test_generate(): + class Sample(StrEnum): + A = "a" + B = "b" + + assert Sample._generate_next_value_("C", 3) == "C" diff --git a/tests/unit/test_group.py b/tests/unit/test_group.py index d9acd583c7..f6302fb8dc 100644 --- a/tests/unit/test_group.py +++ b/tests/unit/test_group.py @@ -81,7 +81,7 @@ def test_prepare_groups_in_environment_with_one_group_in_conf_should_return_migr de_group = Group(display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup")) backup_de_group = Group(display_name="dbr_backup_de", meta=ResourceMeta(resource_type="WorkspaceGroup")) - def my_side_effect(filter, **kwargs): # noqa: A002,ARG001 + def my_side_effect(filter, **kwargs): # noqa: A002 if filter == "displayName eq 'de'": return [de_group] elif filter == "displayName eq 'dbr_backup_de'": @@ -108,7 +108,7 @@ def test_prepare_groups_in_environment_with_multiple_groups_in_conf_should_retur client = Mock() - def api_client_side_effect(method, path, query, **kwargs): # noqa: ARG001 + def api_client_side_effect(method, path, query, **kwargs): if query == { "filter": "displayName eq 'ds'", "attributes": "id,displayName,meta,entitlements,roles,members", @@ -124,7 +124,7 @@ def api_client_side_effect(method, path, query, **kwargs): # noqa: ARG001 client.api_client.do.side_effect = api_client_side_effect - def list_side_effect(filter, **kwargs): # noqa: A002,ARG001 + def list_side_effect(filter, **kwargs): # noqa: A002 if filter == "displayName eq 'de'": return [de_group] elif filter == "displayName eq 'ds'": @@ -178,7 +178,7 @@ def test_prepare_groups_in_environment_with_backup_group_not_created_should_crea de_group = Group(display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup")) backup_de_group = Group(display_name="dbr_backup_de", meta=ResourceMeta(resource_type="WorkspaceGroup")) - def groups_list_side_effect(filter, **kwargs): # noqa: A002,ARG001 + def groups_list_side_effect(filter, **kwargs): 
# noqa: A002 if filter == "displayName eq 'de'": return [de_group] elif filter == "displayName eq 'dbr_backup_de'": @@ -205,7 +205,7 @@ def test_prepare_groups_in_environment_with_conf_in_auto_mode_should_populate_mi de_group = Group(display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup")) backup_de_group = Group(display_name="dbr_backup_de", meta=ResourceMeta(resource_type="WorkspaceGroup")) - def my_side_effect(filter, **kwargs): # noqa: A002,ARG001 + def my_side_effect(filter, **kwargs): # noqa: A002 if filter == "displayName eq 'de'": return [de_group] elif filter == "displayName eq 'dbr_backup_de'": @@ -230,7 +230,7 @@ def test_prepare_groups_in_environment_with_conf_in_auto_mode_and_backup_group_e de_group = Group(display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup")) backup_de_group = Group(display_name="dbr_backup_de", meta=ResourceMeta(resource_type="WorkspaceGroup")) - def my_side_effect(filter, **kwargs): # noqa: A002,ARG001 + def my_side_effect(filter, **kwargs): # noqa: A002 if filter == "displayName eq 'de'": return [de_group] elif filter == "displayName eq 'dbr_backup_de'": @@ -271,7 +271,7 @@ def test_prepare_groups_in_environment_with_no_groups_in_conf_and_backup_group_e de_group = Group(display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup")) - def my_side_effect(filter, **kwargs): # noqa: A002,ARG001 + def my_side_effect(filter, **kwargs): # noqa: A002 if filter == "displayName eq 'de'": return [de_group] elif filter == "displayName eq 'dbr_backup_de'": diff --git a/tests/unit/test_inventorizer.py b/tests/unit/test_inventorizer.py deleted file mode 100644 index 7f19f418c8..0000000000 --- a/tests/unit/test_inventorizer.py +++ /dev/null @@ -1,418 +0,0 @@ -import json -from unittest.mock import Mock - -import pytest -from databricks.sdk.service.compute import ClusterDetails -from databricks.sdk.service.iam import ComplexValue, Group, ObjectPermissions -from databricks.sdk.service.ml import Experiment, 
ExperimentTag -from databricks.sdk.service.workspace import AclPermission, ObjectInfo, ObjectType - -from databricks.labs.ucx.inventory.inventorizer import ( - AccessControlResponse, - AclItem, - DatabricksError, - Inventorizers, - LogicalObjectType, - ModelDatabricks, - PermissionsInventoryItem, - RequestObjectType, - RolesAndEntitlementsInventorizer, - SecretScope, - SecretScopeInventorizer, - StandardInventorizer, - TokensAndPasswordsInventorizer, - WorkspaceInventorizer, - experiments_listing, - models_listing, -) -from databricks.labs.ucx.providers.groups_info import ( - GroupMigrationState, - MigrationGroupInfo, -) - -CLUSTER_DETAILS = ClusterDetails(cluster_name="cn1", cluster_id="cid1") -OBJECT_PERMISSION = ObjectPermissions(object_id="oid1", object_type="ot1") -INVENTORY_ITEM = PermissionsInventoryItem( - object_id="cid1", - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - raw_object_permissions=json.dumps(OBJECT_PERMISSION.as_dict()), -) - -PERMISSION_RESPONSE = { - "object_id": "tokens", - "object_type": "authorization", - "access_control_list": [ - { - "user_name": "un1", - "group_name": "gn1", - "service_principal_name": "sp1", - "display_name": "dn1", - "all_permissions": [], - } - ], -} -ACCESS_CONTROL_RESPONSE = [ - AccessControlResponse( - all_permissions=None, display_name="dn1", group_name="gn1", service_principal_name="sp1", user_name="un1" - ) -] - - -@pytest.fixture -def workspace_client(): - client = Mock() - client.clusters.list.return_value = [CLUSTER_DETAILS] - client.permissions.get.return_value = OBJECT_PERMISSION - return client - - -@pytest.fixture -def standard_inventorizer(workspace_client): - return StandardInventorizer( - workspace_client, - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - listing_function=workspace_client.clusters.list, - id_attribute="cluster_id", - ) - - -@pytest.fixture -def 
tokens_passwords_inventorizer(workspace_client): - return TokensAndPasswordsInventorizer(workspace_client) - - -@pytest.fixture -def secret_scope_inventorizer(workspace_client): - return SecretScopeInventorizer(workspace_client) - - -@pytest.fixture -def workspace_inventorizer(workspace_client): - return WorkspaceInventorizer(workspace_client) - - -@pytest.fixture -def role_entitlements_inventorizer(workspace_client): - state = GroupMigrationState() - return RolesAndEntitlementsInventorizer(workspace_client, migration_state=state) - - -# Tests for StandardInventorizer - - -def test_standard_inventorizer_properties(standard_inventorizer): - assert standard_inventorizer.logical_object_type == LogicalObjectType.CLUSTER - assert standard_inventorizer.logical_object_types == [LogicalObjectType.CLUSTER] - - -def test_standard_inventorizer_get_permissions(standard_inventorizer): - ret_val = ObjectPermissions(object_id="foo") - standard_inventorizer._ws.permissions.get.return_value = ret_val - assert standard_inventorizer._get_permissions(RequestObjectType.CLUSTERS, "foo") == ret_val - - -def test_standard_inventorizer_safe_get_permissions(standard_inventorizer): - ret_val = ObjectPermissions(object_id="foo") - standard_inventorizer._ws.permissions.get.return_value = ret_val - assert standard_inventorizer._safe_get_permissions(RequestObjectType.CLUSTERS, "foo") == ret_val - - -def test_standard_inventorizer_safe_get_permissions_fail(standard_inventorizer): - ret_val = ObjectPermissions(object_id="foo") - standard_inventorizer._ws.permissions.get.return_value = ret_val - standard_inventorizer._ws.permissions.get.side_effect = DatabricksError - with pytest.raises(DatabricksError): - standard_inventorizer._safe_get_permissions(RequestObjectType.CLUSTERS, "foo") - - -def test_standard_inventorizer_safe_get_permissions_return_none(standard_inventorizer): - ret_val = ObjectPermissions(object_id="foo") - standard_inventorizer._ws.permissions.get.return_value = ret_val - 
standard_inventorizer._ws.permissions.get.side_effect = DatabricksError(error_code="RESOURCE_DOES_NOT_EXIST") - assert standard_inventorizer._safe_get_permissions(RequestObjectType.CLUSTERS, "foo") is None - - -def test_standard_inventorizer_preload(standard_inventorizer): - standard_inventorizer.preload() - assert standard_inventorizer._objects == [CLUSTER_DETAILS] - - -def test_standard_inventorizer_inventorize_permission(standard_inventorizer): - standard_inventorizer._ws.permissions.get.return_value = OBJECT_PERMISSION - standard_inventorizer.preload() - collected = standard_inventorizer.inventorize() - assert len(collected) == 1 - assert collected[0] == INVENTORY_ITEM - - -def test_standard_inventorizer_inventorize_no_permission(standard_inventorizer): - standard_inventorizer._ws.permissions.get.return_value = None - standard_inventorizer.preload() - collected = standard_inventorizer.inventorize() - assert len(collected) == 0 - - -# Tests for TokensAndPasswordsInventorizer - - -def test_tokens_password_inventorizer_properties(tokens_passwords_inventorizer): - assert tokens_passwords_inventorizer.logical_object_types == [LogicalObjectType.TOKEN, LogicalObjectType.PASSWORD] - - -def test_tokens_password_inventorizer_preload(tokens_passwords_inventorizer): - tokens_passwords_inventorizer._ws.api_client.do.return_value = PERMISSION_RESPONSE - tokens_passwords_inventorizer.preload() - assert tokens_passwords_inventorizer._tokens_acl == ACCESS_CONTROL_RESPONSE - assert tokens_passwords_inventorizer._passwords_acl == ACCESS_CONTROL_RESPONSE - - -def test_tokens_password_inventorizer_preload_fail(tokens_passwords_inventorizer): - tokens_passwords_inventorizer._ws.api_client.do.side_effect = DatabricksError - tokens_passwords_inventorizer.preload() - assert tokens_passwords_inventorizer._tokens_acl == [] - assert tokens_passwords_inventorizer._passwords_acl == [] - - -def test_tokens_password_inventorizer_inventorize(tokens_passwords_inventorizer): - 
tokens_passwords_inventorizer._ws.api_client.do.return_value = PERMISSION_RESPONSE - tokens_passwords_inventorizer.preload() - inventory = tokens_passwords_inventorizer.inventorize() - assert len(inventory) == 2 - - -def test_tokens_password_inventorizer_inventorize_no_acls(tokens_passwords_inventorizer): - assert tokens_passwords_inventorizer.inventorize() == [] - - -# Tests for SecretScopeInventorizer - - -def test_secret_scope_inventorizer_properties(secret_scope_inventorizer): - assert secret_scope_inventorizer.logical_object_types == [LogicalObjectType.SECRET_SCOPE] - - -def test_secret_scope_inventorizer_preload(secret_scope_inventorizer): - secret_scope_inventorizer.preload() - - -def test_secret_scope_inventorizer_acls(secret_scope_inventorizer): - scope = SecretScope(name="sc1") - - secret_scope_inventorizer._ws.secrets.list_acls.return_value = [] - acls = secret_scope_inventorizer._get_acls_for_scope(scope) - item = secret_scope_inventorizer._prepare_permissions_inventory_item(scope) - assert sum(1 for _ in acls) == 0 - assert item.raw_object_permissions == '{"acls": []}' - - secret_scope_inventorizer._ws.secrets.list_acls.return_value = [ - AclItem(principal="pr1", permission=AclPermission.MANAGE) - ] - acls = secret_scope_inventorizer._get_acls_for_scope(scope) - item = secret_scope_inventorizer._prepare_permissions_inventory_item(scope) - assert sum(1 for _ in acls) == 1 - assert item.raw_object_permissions == '{"acls": [{"principal": "pr1", "permission": "MANAGE"}]}' - - -def test_secret_scope_inventorizer_inventorize(secret_scope_inventorizer): - scope = SecretScope(name="sc1") - - secret_scope_inventorizer._ws.secrets.list_acls.return_value = [] - secret_scope_inventorizer._scopes = [scope] - inventory = secret_scope_inventorizer.inventorize() - assert len(inventory) == 1 - assert inventory[0].object_id == scope.name - - secret_scope_inventorizer._scopes = [] - inventory = secret_scope_inventorizer.inventorize() - assert len(inventory) == 0 - - -def 
test_models_listing(workspace_client): - workspace_client.model_registry.list_models.return_value = [] - f = models_listing(workspace_client) - models = list(f()) - assert len(models) == 0 - - model = ModelDatabricks(name="mn1", id="mid1") - workspace_client.model_registry.list_models.return_value = [model] - response = Mock() - response.registered_model_databricks = model - workspace_client.model_registry.get_model.return_value = response - f = models_listing(workspace_client) - models = list(f()) - assert len(models) == 1 - assert models[0].name == "mn1" - - -def test_experiments_listing(workspace_client): - # try without experiment present - workspace_client.experiments.list_experiments.return_value = [] - f = experiments_listing(workspace_client) - experiments = list(f()) - assert len(experiments) == 0 - - # try with one experiment present - experiment = Experiment(name="en1", experiment_id="eid1") - filtered_tags = [ExperimentTag(key="mlflow.experimentType", value="NOTEBOOK")] - unfiltered_tags = [ExperimentTag(key="foo", value="bar")] - all_tags = filtered_tags + unfiltered_tags - workspace_client.experiments.list_experiments.return_value = [experiment] - # first test without tags (means `None`) - # FIX: Bug in function, does not handle None tags! 
- # f = experiments_listing(workspace_client) - # experiments = list(f()) - # assert len(experiments) == 0 - - # with the tags filtering the experiment out - experiment.tags = filtered_tags - experiments = list(f()) - assert len(experiments) == 0 - - # with tags returning experiment - experiment.tags = unfiltered_tags - experiments = list(f()) - assert len(experiments) == 1 - assert experiments[0].name == "en1" - - # with all tags combined, dropping the experiment - experiment.tags = all_tags - experiments = list(f()) - assert len(experiments) == 0 - - # try with two experiments present - experiment.tags = unfiltered_tags - experiment2 = Experiment(name="en2", experiment_id="eid2") - experiment2.tags = filtered_tags - workspace_client.experiments.list_experiments.return_value = [experiment, experiment2] - experiments = list(f()) - assert len(experiments) == 1 - assert experiments[0].name == "en1" - - -def test_inventorizers_provide(workspace_client): - state = GroupMigrationState() - inventorizers = Inventorizers.provide(workspace_client, migration_state=state, num_threads=1, start_path="/") - assert len(inventorizers) > 0 - - -# Tests for WorkspaceInventorizer - - -def test_workspace_inventorizer_properties(workspace_inventorizer): - assert workspace_inventorizer.logical_object_types == [ - LogicalObjectType.NOTEBOOK, - LogicalObjectType.DIRECTORY, - LogicalObjectType.REPO, - LogicalObjectType.FILE, - ] - - -def test_workspace_inventorizer_preload(workspace_inventorizer): - workspace_inventorizer.preload() - - -# def test_workspace_inventorizer_static_converters(workspace_inventorizer): -# info = ObjectInfo(object_type=ObjectType.NOTEBOOK, object_id="oid1") -# WorkspaceInventorizer._WorkspaceInventorizer__convert_object_type_to_request_type() - - -def test_workspace_inventorizer_get_permissions(workspace_inventorizer): - ret_val = ObjectPermissions(object_type=str(ObjectType.NOTEBOOK), object_id="foo") - workspace_inventorizer._ws.permissions.get.return_value = 
ret_val - assert workspace_inventorizer._get_permissions(RequestObjectType.NOTEBOOKS, "foo") == ret_val - - -@pytest.mark.parametrize( - ["object_type", "request_type"], - [ - (None, None), - (ObjectType.NOTEBOOK, RequestObjectType.NOTEBOOKS), - (ObjectType.DIRECTORY, RequestObjectType.DIRECTORIES), - (ObjectType.LIBRARY, None), - (ObjectType.REPO, RequestObjectType.REPOS), - (ObjectType.FILE, RequestObjectType.FILES), - ], -) -def test_workspace_inventorizer_convert_object_to_permission(workspace_inventorizer, object_type, request_type): - info = ObjectInfo(object_type=object_type, object_id=1) - item = workspace_inventorizer._convert_result_to_permission_item(info) - assert ( - (object_type == ObjectType.LIBRARY and item is None) - or (object_type and item.request_object_type == request_type) - or item is None - ) - - -def test_workspace_inventorizer_convert_object_to_permission_no_perms(workspace_inventorizer): - info = ObjectInfo(object_type=ObjectType.NOTEBOOK, object_id=1) - workspace_inventorizer._ws.permissions.get.return_value = None - assert workspace_inventorizer._convert_result_to_permission_item(info) is None - - -def test_workspace_inventorizer_convert_object_to_permission_fail(workspace_inventorizer): - info = ObjectInfo(object_type=ObjectType.NOTEBOOK, object_id=1) - # Test case where remote exception is raised again - workspace_inventorizer._ws.permissions.get.side_effect = DatabricksError(error_code="bogus") - with pytest.raises(DatabricksError): - workspace_inventorizer._convert_result_to_permission_item(info) - # Test case where remote exception is converted to None - workspace_inventorizer._ws.permissions.get.side_effect = DatabricksError(error_code="PERMISSION_DENIED") - assert workspace_inventorizer._convert_result_to_permission_item(info) is None - - -def test_workspace_inventorizer_inventorize(workspace_inventorizer): - workspace_inventorizer._ws.workspace.list.return_value = iter([]) - items = workspace_inventorizer.inventorize() - assert 
len(items) == 0 - - objects = iter([ObjectInfo(object_type=ObjectType.NOTEBOOK, object_id=1)]) - workspace_inventorizer._ws.workspace.list.return_value = objects - items = workspace_inventorizer.inventorize() - assert len(items) == 1 - - -# Tests for RolesAndEntitlementsInventorizer - - -def test_role_entitlements_inventorizer_properties(role_entitlements_inventorizer): - assert role_entitlements_inventorizer.logical_object_types == [ - LogicalObjectType.ROLES, - LogicalObjectType.ENTITLEMENTS, - ] - - -def test_role_entitlements_inventorizer_preload(role_entitlements_inventorizer): - # Test empty groups - role_entitlements_inventorizer.preload() - assert len(role_entitlements_inventorizer._group_info) == 0 - - # Test with groups present - group = Group(display_name="grp1") - role_entitlements_inventorizer._migration_state.add( - MigrationGroupInfo(workspace=group, backup=group, account=group) - ) - role_entitlements_inventorizer.preload() - assert len(role_entitlements_inventorizer._group_info) == 1 - - -def test_role_entitlements_inventorizer_inventorize(role_entitlements_inventorizer): - role_entitlements_inventorizer._ws.groups.get.return_value = Group(display_name="grp1") - - # Test empty groups - role_entitlements_inventorizer.preload() - items = role_entitlements_inventorizer.inventorize() - assert len(items) == 0 - - # Test with groups present - roles = [ComplexValue(value="cv1"), ComplexValue(value="cv2")] - entitlements = [ComplexValue(value="cv3"), ComplexValue(value="cv4")] - group = Group(display_name="grp1", roles=roles, entitlements=entitlements) - role_entitlements_inventorizer._migration_state.add( - MigrationGroupInfo(workspace=group, backup=group, account=group) - ) - role_entitlements_inventorizer.preload() - items = role_entitlements_inventorizer.inventorize() - assert len(items) == 1 - assert items[0].object_id == "grp1" diff --git a/tests/unit/test_listing.py b/tests/unit/test_listing.py index 36aded06b3..cade7d979d 100644 --- 
a/tests/unit/test_listing.py +++ b/tests/unit/test_listing.py @@ -2,7 +2,7 @@ from databricks.sdk.service.workspace import ObjectInfo, ObjectType -from databricks.labs.ucx.inventory.listing import WorkspaceListing +from databricks.labs.ucx.support.listing import WorkspaceListing # Helper to compare an unordered list of objects @@ -69,7 +69,7 @@ def test_walk_with_nested_folders_should_return_nested_objects(): nested_folder = ObjectInfo(path="/rootPath/nested_folder", object_type=ObjectType.DIRECTORY) nested_notebook = ObjectInfo(path="/rootPath/nested_folder/notebook", object_type=ObjectType.NOTEBOOK) - def my_side_effect(path, **kwargs): # noqa: ARG001 + def my_side_effect(path, **kwargs): if path == "/rootPath": return [file, nested_folder] elif path == "/rootPath/nested_folder": @@ -98,7 +98,7 @@ def test_walk_with_three_level_nested_folders_returns_three_levels(): path="/rootPath/nested_folder/second_nested_folder/notebook2", object_type=ObjectType.NOTEBOOK ) - def my_side_effect(path, **kwargs): # noqa: ARG001 + def my_side_effect(path, **kwargs): if path == "/rootPath": return [file, nested_folder] elif path == "/rootPath/nested_folder": diff --git a/tests/unit/test_permissions.py b/tests/unit/test_permissions.py deleted file mode 100644 index f329d50800..0000000000 --- a/tests/unit/test_permissions.py +++ /dev/null @@ -1,423 +0,0 @@ -import json -from unittest.mock import Mock - -import pytest -from databricks.sdk.service.iam import AccessControlResponse, Group, ObjectPermissions - -from databricks.labs.ucx.inventory.permissions import ( - PermissionManager, - PermissionRequestPayload, - RolesAndEntitlementsRequestPayload, - SecretsPermissionRequestPayload, -) -from databricks.labs.ucx.inventory.types import ( - AclItem, - LogicalObjectType, - PermissionsInventoryItem, - RequestObjectType, - RolesAndEntitlements, -) -from databricks.labs.ucx.providers.groups_info import ( - GroupMigrationState, - MigrationGroupInfo, -) - - -def test_secrets_api(): - item = 
PermissionsInventoryItem( - object_id="scope-1", - logical_object_type=LogicalObjectType.SECRET_SCOPE, - request_object_type=None, - raw_object_permissions="""{"acls": [ - {"principal": "g1", "permission": "READ"}, - {"principal": "unrelated-group", "permission": "READ"}, - {"principal": "admins", "permission": "MANAGE"} - ]}""", - ) - - migration_state = GroupMigrationState() - migration_state.groups = [ - MigrationGroupInfo( - account=Group(display_name="g1"), - workspace=Group(display_name="g1"), - backup=Group(display_name="some-prefix-g1"), - ) - ] - - apply_backup = PermissionManager._prepare_permission_request_for_secrets_api(item, migration_state, "backup") - - assert len(apply_backup.access_control_list) == 1 - assert apply_backup.access_control_list[0].principal == "some-prefix-g1" - - apply_account = PermissionManager._prepare_permission_request_for_secrets_api(item, migration_state, "account") - - assert len(apply_account.access_control_list) == 1 - assert apply_account.access_control_list[0].principal == "g1" - - -def test_prepare_request_for_roles_and_entitlements(): - item = PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.ROLES, - request_object_type=None, - raw_object_permissions=json.dumps( - { - "roles": [ - {"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}, - {"value": "arn:aws:iam::123456789:instance-profile/test-uc-role2"}, - ], - "entitlements": [{"value": "workspace-access"}], - } - ), - ) - - migration_state = GroupMigrationState() - migration_state.groups = [ - MigrationGroupInfo( - account=Group(display_name="group1", id="group1"), - workspace=Group(display_name="group1", id="group1"), - backup=Group(display_name="some-prefix-group1", id="some-prefix-group1"), - ) - ] - - apply_backup = PermissionManager._prepare_request_for_roles_and_entitlements(item, migration_state, "backup") - - assert len(apply_backup.payload.roles) == 2 - assert len(apply_backup.payload.entitlements) == 1 - 
assert apply_backup.group_id == "some-prefix-group1" - - apply_account = PermissionManager._prepare_request_for_roles_and_entitlements(item, migration_state, "account") - - assert len(apply_account.payload.roles) == 2 - assert len(apply_account.payload.entitlements) == 1 - assert apply_account.group_id == "group1" - - -@pytest.mark.parametrize( - "item,acl_length,object_type, object_id", - [ - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="clusterid1", - object_type="clusters", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "group1"} - ), - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "admin"} - ), - ], - ).as_dict() - ), - ), - 1, - LogicalObjectType.CLUSTER, - "group1", - ), - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.PASSWORD, - request_object_type=RequestObjectType.AUTHORIZATION, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="passwords", - object_type="authorization", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "group1"} - ) - ], - ).as_dict() - ), - ), - 1, - LogicalObjectType.PASSWORD, - "group1", - ), - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.TOKEN, - request_object_type=RequestObjectType.AUTHORIZATION, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="tokens", - object_type="authorization", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "group1"} - ) - ], - ).as_dict() - ), - ), - 1, - LogicalObjectType.TOKEN, - "group1", - 
), - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.NOTEBOOK, - request_object_type=RequestObjectType.NOTEBOOKS, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="notebook1", - object_type="notebooks", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_EDIT"}], "group_name": "group1"} - ), - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "admin"} - ), - ], - ).as_dict() - ), - ), - 1, - LogicalObjectType.NOTEBOOK, - "group1", - ), - ], -) -def test_prepare_request_for_permissions_api(item, acl_length, object_type, object_id): - migration_state = GroupMigrationState() - migration_state.groups = [ - MigrationGroupInfo( - account=Group(display_name="group1", id="group1"), - workspace=Group(display_name="group1", id="group1"), - backup=Group(display_name="some-prefix-group1", id="some-prefix-group1"), - ) - ] - - apply_backup = PermissionManager._prepare_request_for_permissions_api(item, migration_state, "backup") - - assert len(apply_backup.access_control_list) == acl_length - assert apply_backup.logical_object_type == object_type - assert apply_backup.object_id == object_id - - apply_account = PermissionManager._prepare_request_for_permissions_api(item, migration_state, "account") - - assert len(apply_account.access_control_list) == acl_length - assert apply_account.logical_object_type == object_type - assert apply_account.object_id == object_id - - -@pytest.mark.parametrize( - "item,object_type", - [ - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="clusterid1", - object_type="clusters", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], 
"group_name": "group1"} - ), - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "admin"} - ), - ], - ).as_dict() - ), - ), - PermissionRequestPayload, - ), - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.ROLES, - request_object_type=None, - raw_object_permissions=json.dumps( - { - "roles": [ - {"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}, - {"value": "arn:aws:iam::123456789:instance-profile/test-uc-role2"}, - ], - "entitlements": [{"value": "workspace-access"}], - } - ), - ), - RolesAndEntitlementsRequestPayload, - ), - ( - PermissionsInventoryItem( - object_id="scope-1", - logical_object_type=LogicalObjectType.SECRET_SCOPE, - request_object_type=None, - raw_object_permissions="""{"acls": [ - {"principal": "g1", "permission": "READ"}, - {"principal": "unrelated-group", "permission": "READ"}, - {"principal": "admins", "permission": "MANAGE"} - ]}""", - ), - SecretsPermissionRequestPayload, - ), - ], -) -def test_prepare_new_permission_request(item, object_type): - migration_state = GroupMigrationState() - migration_state.groups = [ - MigrationGroupInfo( - account=Group(display_name="group1", id="group1"), - workspace=Group(display_name="group1", id="group1"), - backup=Group(display_name="some-prefix-group1", id="some-prefix-group1"), - ) - ] - perm_obj = PermissionManager(None, None) - apply_backup = perm_obj._prepare_new_permission_request(item, migration_state, "backup") - - assert isinstance(apply_backup, object_type) is True - - -@pytest.fixture -def workspace_client(): - client = Mock() - return client - - -def test_update_permissions(workspace_client): - perm_obj = PermissionManager(workspace_client, None) - workspace_client.permissions.update.return_value = ObjectPermissions(object_id="cluster1") - output = perm_obj._update_permissions(RequestObjectType.CLUSTERS, "clusterid1", None) - assert output == 
ObjectPermissions(object_id="cluster1") - - -def test_standard_permissions_applicator(workspace_client, mocker): - standard_perm = mocker.patch("databricks.labs.ucx.inventory.permissions.PermissionManager._update_permissions") - perm_obj = PermissionManager(workspace_client, None) - perm_obj._standard_permissions_applicator( - PermissionRequestPayload(None, RequestObjectType.CLUSTERS, "clusterid1", None) - ) - standard_perm.assert_called_with( - request_object_type=RequestObjectType.CLUSTERS, request_object_id="clusterid1", access_control_list=None - ) - - -def test_scope_permissions_applicator(workspace_client): - perm_obj = PermissionManager(workspace_client, None) - workspace_client.secrets.list_acls.return_value = [ - AclItem(principal="group1", permission="READ"), - AclItem(principal="group2", permission="MANAGE"), - ] - request_payload = SecretsPermissionRequestPayload( - object_id="scope-1", - access_control_list=[ - AclItem(principal="group1", permission="READ"), - AclItem(principal="group2", permission="MANAGE"), - ], - ) - perm_obj._scope_permissions_applicator(request_payload=request_payload) - - -def test_patch_workspace_group(workspace_client): - payload = { - "schemas": "urn:ietf:params:scim:api:messages:2.0:PatchOp", - "Operations": { - "op": "add", - "path": "entitlements", - "value": [{"value": "workspace-access"}], - }, - } - perm_obj = PermissionManager(workspace_client, None) - perm_obj._patch_workspace_group("group1", payload) - workspace_client.api_client.do.assert_called_with( - "PATCH", "/api/2.0/preview/scim/v2/Groups/group1", data=json.dumps(payload) - ) - - payload = { - "schemas": "urn:ietf:params:scim:api:messages:2.0:PatchOp", - "Operations": { - "op": "add", - "path": "roles", - "value": [{"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}], - }, - } - perm_obj = PermissionManager(workspace_client, None) - perm_obj._patch_workspace_group("group2", payload) - workspace_client.api_client.do.assert_called_with( - "PATCH", 
"/api/2.0/preview/scim/v2/Groups/group2", data=json.dumps(payload) - ) - - -def test_apply_roles_and_entitlements(workspace_client, mocker): - entitlements = [{"value": "workspace-access"}] - roles = [{"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}] - perm_obj = PermissionManager(workspace_client, None) - roles_perm = mocker.patch("databricks.labs.ucx.inventory.permissions.PermissionManager._patch_workspace_group") - perm_obj._apply_roles_and_entitlements("group1", roles, entitlements) - payload = { - "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp", "urn:ietf:params:scim:api:messages:2.0:PatchOp"], - "Operations": [ - { - "op": "add", - "path": "entitlements", - "value": [{"value": "workspace-access"}], - }, - { - "op": "add", - "path": "roles", - "value": [{"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}], - }, - ], - } - roles_perm.assert_called_with("group1", payload) - - -def test_applicator_roles(workspace_client, mocker): - roles_payload = RolesAndEntitlementsRequestPayload( - payload=RolesAndEntitlements( - roles=[{"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}], - entitlements=[{"value": "workspace-access"}], - ), - group_id="group1", - ) - perm_obj = PermissionManager(workspace_client, None) - roles_perm = mocker.patch( - "databricks.labs.ucx.inventory.permissions.PermissionManager._apply_roles_and_entitlements" - ) - perm_obj.applicator(roles_payload) - roles_perm.assert_called_with( - group_id="group1", - roles=[{"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}], - entitlements=[{"value": "workspace-access"}], - ) - - -def test_applicator_scope(workspace_client, mocker): - secret_payload = SecretsPermissionRequestPayload( - object_id="scope-1", - access_control_list=[ - AclItem(principal="group1", permission="READ"), - AclItem(principal="group2", permission="MANAGE"), - ], - ) - perm_obj = PermissionManager(workspace_client, None) - roles_perm = mocker.patch( - 
"databricks.labs.ucx.inventory.permissions.PermissionManager._scope_permissions_applicator" - ) - perm_obj.applicator(secret_payload) - roles_perm.assert_called_with(secret_payload) - - -def test_applicator_standard_permission(workspace_client, mocker): - standard_payload = PermissionRequestPayload(None, RequestObjectType.CLUSTERS, "clusterid1", None) - perm_obj = PermissionManager(workspace_client, None) - roles_perm = mocker.patch( - "databricks.labs.ucx.inventory.permissions.PermissionManager._standard_permissions_applicator" - ) - perm_obj.applicator(standard_payload) - roles_perm.assert_called_with(standard_payload) diff --git a/tests/unit/test_permissions_inventory.py b/tests/unit/test_permissions_inventory.py index 6a5dee8717..e5ee4a1bea 100644 --- a/tests/unit/test_permissions_inventory.py +++ b/tests/unit/test_permissions_inventory.py @@ -1,19 +1,13 @@ -import json from unittest.mock import Mock import pandas as pd import pytest -from databricks.sdk.service.iam import AccessControlResponse, ObjectPermissions from pyspark.sql.types import StringType, StructField, StructType from databricks.labs.ucx.inventory.permissions_inventory import ( PermissionsInventoryTable, ) -from databricks.labs.ucx.inventory.types import ( - LogicalObjectType, - PermissionsInventoryItem, - RequestObjectType, -) +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem @pytest.fixture @@ -22,9 +16,6 @@ def workspace_client(): return client -perm_items = [PermissionsInventoryItem("object1", LogicalObjectType.CLUSTER, RequestObjectType.CLUSTERS, "test acl")] - - @pytest.fixture def permissions_inventory(workspace_client, mocker): mocker.patch("databricks.labs.ucx.providers.spark.SparkMixin._initialize_spark", Mock()) @@ -39,8 +30,7 @@ def test_table_schema(permissions_inventory): schema = StructType( [ StructField("object_id", StringType(), True), - StructField("logical_object_type", StringType(), True), - StructField("request_object_type", StringType(), True), + 
StructField("support", StringType(), True), StructField("raw_object_permissions", StringType(), True), ] ) @@ -57,6 +47,7 @@ def test_cleanup(permissions_inventory): def test_save(permissions_inventory): + perm_items = [PermissionsInventoryItem("object1", "clusters", "test acl")] permissions_inventory.save(perm_items) permissions_inventory.spark.createDataFrame.assert_called_once() @@ -65,138 +56,10 @@ def test_load_all(permissions_inventory): items = pd.DataFrame( { "object_id": ["object1"], - "logical_object_type": ["CLUSTER"], - "request_object_type": ["clusters"], + "support": ["clusters"], "raw_object_permissions": ["test acl"], } ) permissions_inventory._df.toPandas.return_value = items output = permissions_inventory.load_all() - assert output[0] == PermissionsInventoryItem( - "object1", LogicalObjectType.CLUSTER, RequestObjectType.CLUSTERS, "test acl" - ) - - -@pytest.mark.parametrize( - "items,groups,status", - [ - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - raw_object_permissions=json.dumps( - ObjectPermissions( - object_id="clusterid1", - object_type="clusters", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "group1"} - ), - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "admin"} - ), - ], - ).as_dict() - ), - ), - ["group1", "group2"], - True, - ), - ( - PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.ROLES, - request_object_type=None, - raw_object_permissions=json.dumps( - { - "roles": [ - {"value": "arn:aws:iam::123456789:instance-profile/test-uc-role"}, - {"value": "arn:aws:iam::123456789:instance-profile/test-uc-role2"}, - ], - "entitlements": [{"value": "workspace-access"}], - } - ), - ), - ["group1", "group2"], - True, - ), - ( - PermissionsInventoryItem( - 
object_id="scope-1", - logical_object_type=LogicalObjectType.SECRET_SCOPE, - request_object_type=None, - raw_object_permissions="""{"acls": [ - {"principal": "g1", "permission": "READ"}, - {"principal": "unrelated-group", "permission": "READ"}, - {"principal": "admins", "permission": "MANAGE"} - ]}""", - ), - ["group1", "group2"], - False, - ), - ( - PermissionsInventoryItem( - object_id="scope-1", - logical_object_type=LogicalObjectType.SECRET_SCOPE, - request_object_type=None, - raw_object_permissions="""{"acls": [ - {"principal": "g1", "permission": "READ"}, - {"principal": "unrelated-group", "permission": "READ"}, - {"principal": "admins", "permission": "MANAGE"} - ]}""", - ), - ["g1", "group2"], - True, - ), - ], -) -def test_is_item_relevant_to_groups(permissions_inventory, items, groups, status): - assert permissions_inventory._is_item_relevant_to_groups(items, groups) is status - - -def test_is_item_relevant_to_groups_exception(permissions_inventory): - item = PermissionsInventoryItem("object1", "FOO", "BAR", "test acl") - with pytest.raises(NotImplementedError): - permissions_inventory._is_item_relevant_to_groups(item, ["g1"]) - - -def test_load_for_groups(permissions_inventory): - items = pd.DataFrame( - { - "object_id": ["group1"], - "logical_object_type": ["CLUSTER"], - "request_object_type": ["clusters"], - "raw_object_permissions": json.dumps( - ObjectPermissions( - object_id="clusterid1", - object_type="clusters", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "group1"} - ) - ], - ).as_dict() - ), - } - ) - groups = ["group1", "group2"] - permissions_inventory._df.toPandas.return_value = items - output = permissions_inventory.load_for_groups(groups) - assert output[0] == PermissionsInventoryItem( - object_id="group1", - logical_object_type=LogicalObjectType.CLUSTER, - request_object_type=RequestObjectType.CLUSTERS, - raw_object_permissions=json.dumps( - 
ObjectPermissions( - object_id="clusterid1", - object_type="clusters", - access_control_list=[ - AccessControlResponse.from_dict( - {"all_permissions": [{"permission_level": "CAN_MANAGE"}], "group_name": "group1"} - ) - ], - ).as_dict() - ), - ) - assert len(output) == 1 + assert output[0] == PermissionsInventoryItem("object1", support="clusters", raw_object_permissions="test acl") diff --git a/tests/unit/test_permissions_manager.py b/tests/unit/test_permissions_manager.py new file mode 100644 index 0000000000..fb28e7eb42 --- /dev/null +++ b/tests/unit/test_permissions_manager.py @@ -0,0 +1,88 @@ +import json +from unittest import mock +from unittest.mock import MagicMock + +import pytest +from databricks.sdk.service import iam + +from databricks.labs.ucx.inventory.permissions import PermissionManager +from databricks.labs.ucx.inventory.permissions_inventory import ( + PermissionsInventoryTable, +) +from databricks.labs.ucx.inventory.types import PermissionsInventoryItem +from databricks.labs.ucx.support.impl import SupportsProvider + + +@pytest.fixture(scope="function") +def spark_mixin(): + with mock.patch("databricks.labs.ucx.providers.spark.SparkMixin._initialize_spark", MagicMock()): + yield + + +def test_manager_inventorize(spark_mixin): + sup = SupportsProvider(ws=MagicMock(), num_threads=1, workspace_start_path="/") + pm = PermissionManager( + ws=MagicMock(), permissions_inventory=PermissionsInventoryTable("test", MagicMock()), supports_provider=sup + ) + + with mock.patch("databricks.labs.ucx.inventory.permissions.ThreadedExecution.run", MagicMock()) as run_mock: + pm.inventorize_permissions() + run_mock.assert_called_once() + + +def test_manager_apply(spark_mixin): + sup = SupportsProvider(ws=MagicMock(), num_threads=1, workspace_start_path="/") + inventory = MagicMock(spec=PermissionsInventoryTable) + inventory.load_all.return_value = [ + PermissionsInventoryItem( + object_id="test", + support="clusters", + raw_object_permissions=json.dumps( + 
iam.ObjectPermissions( + object_id="test", + object_type="clusters", + access_control_list=[ + iam.AccessControlResponse( + group_name="test", + all_permissions=[ + iam.Permission(inherited=False, permission_level=iam.PermissionLevel.CAN_USE) + ], + ) + ], + ).as_dict() + ), + ), + PermissionsInventoryItem( + object_id="test2", + support="cluster_policies", + raw_object_permissions=json.dumps( + iam.ObjectPermissions( + object_id="test", + object_type="cluster_policies", + access_control_list=[ + iam.AccessControlResponse( + group_name="test", + all_permissions=[ + iam.Permission(inherited=False, permission_level=iam.PermissionLevel.CAN_USE) + ], + ) + ], + ).as_dict() + ), + ), + ] + pm = PermissionManager(ws=MagicMock(), permissions_inventory=inventory, supports_provider=sup) + with mock.patch("databricks.labs.ucx.inventory.permissions.ThreadedExecution.run", MagicMock()) as run_mock: + pm.apply_group_permissions(migration_state=MagicMock(), destination="backup") + run_mock.assert_called_once() + + +def test_unregistered_support(): + sup = SupportsProvider(ws=MagicMock(), num_threads=1, workspace_start_path="/") + inventory = MagicMock(spec=PermissionsInventoryTable) + inventory.load_all.return_value = [ + PermissionsInventoryItem(object_id="test", support="SOME_NON_EXISTENT", raw_object_permissions="") + ] + pm = PermissionManager(ws=MagicMock(), permissions_inventory=inventory, supports_provider=sup) + with pytest.raises(ValueError): + pm.apply_group_permissions(migration_state=MagicMock(), destination="backup") diff --git a/tests/unit/test_serde.py b/tests/unit/test_serde.py deleted file mode 100644 index e039248853..0000000000 --- a/tests/unit/test_serde.py +++ /dev/null @@ -1,24 +0,0 @@ -import dataclasses - -from databricks.sdk.service.workspace import AclItem as SdkAclItem -from databricks.sdk.service.workspace import AclPermission as SdkAclPermission - -from databricks.labs.ucx.inventory.types import AclItemsContainer - - -def 
test_acl_items_container_serde(): - sdk_items = [ - SdkAclItem(principal="blah", permission=SdkAclPermission.READ), - SdkAclItem(principal="blah2", permission=SdkAclPermission.WRITE), - ] - - container = AclItemsContainer.from_sdk(sdk_items) - - after = container.to_sdk() - - assert after == sdk_items - - _dump = dataclasses.asdict(container) - _str = AclItemsContainer.from_dict(_dump) - - assert _str == container diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py new file mode 100644 index 0000000000..e9423cafd7 --- /dev/null +++ b/tests/unit/test_types.py @@ -0,0 +1,7 @@ +from databricks.labs.ucx.inventory.types import RequestObjectType + + +def test_request_object_type(): + typed = RequestObjectType.AUTHORIZATION + assert typed == "authorization" + assert typed.__repr__() == "authorization" diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py new file mode 100644 index 0000000000..a47f264ce8 --- /dev/null +++ b/tests/unit/test_utils.py @@ -0,0 +1,11 @@ +from databricks.labs.ucx.utils import Request, noop + + +def test_req(): + req = Request({"test": "test"}) + assert req.as_dict() == {"test": "test"} + + +def test_noop(): + noop() + assert True