From 6c46b65bbb1289ad8ef8fdbaf48671a58f9e75d6 Mon Sep 17 00:00:00 2001 From: renardeinside Date: Thu, 27 Jul 2023 17:08:28 +0200 Subject: [PATCH 1/6] add workspace listing --- .../managers/inventory/inventorizer.py | 66 ++++++++++++++++++- .../managers/inventory/listing.py | 64 +++++++++++++++++- .../managers/inventory/permissions.py | 2 + .../managers/inventory/types.py | 3 + 4 files changed, 133 insertions(+), 2 deletions(-) diff --git a/src/uc_migration_toolkit/managers/inventory/inventorizer.py b/src/uc_migration_toolkit/managers/inventory/inventorizer.py index c54ce981e3..9b465d72a1 100644 --- a/src/uc_migration_toolkit/managers/inventory/inventorizer.py +++ b/src/uc_migration_toolkit/managers/inventory/inventorizer.py @@ -6,8 +6,14 @@ from databricks.sdk.core import DatabricksError from databricks.sdk.service.iam import AccessControlResponse, ObjectPermissions -from databricks.sdk.service.workspace import AclItem, SecretScope +from databricks.sdk.service.workspace import ( + AclItem, + ObjectInfo, + ObjectType, + SecretScope, +) +from uc_migration_toolkit.managers.inventory.listing import WorkspaceListing from uc_migration_toolkit.managers.inventory.types import ( AclItemsContainer, LogicalObjectType, @@ -170,3 +176,61 @@ def inventorize(self) -> list[PermissionsInventoryItem]: def preload(self): pass + + +class WorkspaceInventorizer(BaseInventorizer[InventoryObject]): + def __init__(self): + self.listing = WorkspaceListing( + provider.ws, + num_threads=config_provider.config.num_threads, + with_directories=False, + rate_limit=config_provider.config.rate_limit, + ) + + def preload(self): + pass + + @staticmethod + def __convert_object_type_to_request_type(_object: ObjectInfo) -> RequestObjectType | None: + match _object.object_type: + case ObjectType.NOTEBOOK: + return RequestObjectType.NOTEBOOKS + case ObjectType.DIRECTORY: + return RequestObjectType.DIRECTORIES + case ObjectType.LIBRARY: + return None + case ObjectType.REPO: + return RequestObjectType.REPOS + + @staticmethod + def __convert_request_object_type_to_logical_type(request_object_type: RequestObjectType) -> LogicalObjectType: + match request_object_type: + case RequestObjectType.NOTEBOOKS: + return LogicalObjectType.NOTEBOOK + case RequestObjectType.DIRECTORIES: + return LogicalObjectType.DIRECTORY + case RequestObjectType.REPOS: + return LogicalObjectType.REPO + + def _convert_result_to_permission_item(self, _object: ObjectInfo) -> PermissionsInventoryItem | None: + request_object_type = self.__convert_object_type_to_request_type(_object) + if not request_object_type: + return + else: + permissions = provider.ws.permissions.get(_object.object_type, _object.object_id) + + inventory_item = PermissionsInventoryItem( + object_id=str(_object.object_id), + logical_object_type=self.__convert_request_object_type_to_logical_type(request_object_type), + request_object_type=_object.object_type, + raw_object_permissions=json.dumps(permissions.as_dict()), + ) + return inventory_item + + def inventorize(self) -> list[PermissionsInventoryItem]: + self.listing.walk("/") + executables = [partial(self._convert_result_to_permission_item, _object) for _object in self.listing.results] + results = ThreadedExecution[PermissionsInventoryItem | None](executables).run() + results = [result for result in results if result] + logger.info(f"Permissions fetched for {len(results)} workspace objects") + return results diff --git a/src/uc_migration_toolkit/managers/inventory/listing.py b/src/uc_migration_toolkit/managers/inventory/listing.py index 
6ca99ee848..c3be47890c 100644 --- a/src/uc_migration_toolkit/managers/inventory/listing.py +++ b/src/uc_migration_toolkit/managers/inventory/listing.py @@ -1,8 +1,15 @@ +import datetime as dt from collections.abc import Iterator +from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait from databricks.sdk.service.ml import ModelDatabricks +from databricks.sdk.service.workspace import ObjectType +from ratelimit import limits, sleep_and_retry -from uc_migration_toolkit.providers.client import provider +from uc_migration_toolkit.config import RateLimitConfig +from uc_migration_toolkit.providers.client import ImprovedWorkspaceClient, provider +from uc_migration_toolkit.providers.config import provider as config_provider +from uc_migration_toolkit.providers.logger import logger class CustomListing: @@ -15,3 +22,58 @@ def list_models() -> Iterator[ModelDatabricks]: for model in provider.ws.model_registry.list_models(): model_with_id = provider.ws.model_registry.get_model(model.name).registered_model_databricks yield model_with_id + + +class WorkspaceListing: + def __init__( + self, + ws: ImprovedWorkspaceClient, + num_threads: int, + *, + with_directories: bool = True, + rate_limit: RateLimitConfig | None = None, + ): + self.start_time = None + self._ws = ws + self.results = [] + self._num_threads = num_threads + self._with_directories = with_directories + self._counter = 0 + self._rate_limit = rate_limit if rate_limit else config_provider.config.rate_limit + + @sleep_and_retry + @limits(calls=self._rate_limit.max_requests_per_period, period=self._rate_limit.period_in_seconds) + def _rate_limited_listing(path: str) -> Iterator[ObjectType]: + return self._ws.workspace.list(path=path, recursive=False) + + self._rate_limited_listing = _rate_limited_listing + + def _progress_report(self, _): + self._counter += 1 + measuring_time = dt.datetime.now() + delta_from_start = measuring_time - self.start_time + rps = self._counter / delta_from_start.total_seconds() + if self._counter % 10 == 0: + logger.info( + f"Made {self._counter} workspace listing calls, " + f"collected {len(self.results)} objects, rps: {rps:.3f}/sec" + ) + + def _walk(self, _path: str): + futures = [] + with ThreadPoolExecutor(self._num_threads) as executor: + for _obj in self._rate_limited_listing(_path): + if _obj.object_type == ObjectType.DIRECTORY: + if self._with_directories: + self.results.append(_obj) + future = executor.submit(self._walk, _obj.path) + future.add_done_callback(self._progress_report) + futures.append(future) + else: + self.results.append(_obj) + wait(futures, return_when=ALL_COMPLETED) + + def walk(self, path: str): + self.start_time = dt.datetime.now() + self._walk(path) + self._progress_report(None) # report the final progress diff --git a/src/uc_migration_toolkit/managers/inventory/permissions.py b/src/uc_migration_toolkit/managers/inventory/permissions.py index 3fec229564..1096a864f1 100644 --- a/src/uc_migration_toolkit/managers/inventory/permissions.py +++ b/src/uc_migration_toolkit/managers/inventory/permissions.py @@ -11,6 +11,7 @@ SecretScopeInventorizer, StandardInventorizer, TokensAndPasswordsInventorizer, + WorkspaceInventorizer, ) from uc_migration_toolkit.managers.inventory.listing import CustomListing from uc_migration_toolkit.managers.inventory.table import InventoryTableManager @@ -98,6 +99,7 @@ def get_inventorizers(): id_attribute="id", ), SecretScopeInventorizer(), + WorkspaceInventorizer(), ] def inventorize_permissions(self): diff --git 
a/src/uc_migration_toolkit/managers/inventory/types.py b/src/uc_migration_toolkit/managers/inventory/types.py index 48171cd958..72a13b1794 100644 --- a/src/uc_migration_toolkit/managers/inventory/types.py +++ b/src/uc_migration_toolkit/managers/inventory/types.py @@ -41,6 +41,9 @@ def __repr__(self): class LogicalObjectType(StrEnum): + REPO = "REPO" + DIRECTORY = "DIRECTORY" + NOTEBOOK = "NOTEBOOK" SECRET_SCOPE = "SECRET_SCOPE" PASSWORD = "PASSWORD" TOKEN = "TOKEN" From 4f6a265e1b894323bf265109a69478c77717c23a Mon Sep 17 00:00:00 2001 From: renardeinside Date: Fri, 28 Jul 2023 15:12:54 +0200 Subject: [PATCH 2/6] flaky tests? --- .DS_Store | Bin 0 -> 6148 bytes README.md | 11 +-- pyproject.toml | 3 +- src/.DS_Store | Bin 0 -> 6148 bytes .../managers/inventory/inventorizer.py | 13 ++- .../managers/inventory/permissions.py | 71 +++++++++++----- .../managers/inventory/types.py | 1 + src/uc_migration_toolkit/utils.py | 9 +++ tests/integration/conftest.py | 76 +++++++++++++++++- tests/integration/test_e2e.py | 50 ++++++++---- tests/integration/utils.py | 10 +++ 11 files changed, 194 insertions(+), 50 deletions(-) create mode 100644 .DS_Store create mode 100644 src/.DS_Store diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..81229e8d26382bd3469cb3d9bb2de3f1954844ac GIT binary patch literal 6148 zcmeH~Jqp4=5QS&dB4Cr!avKle4VIuM@B*S@B?yZB9^E%TjnP_yyn&f-XEsBUS7b9H zqQmpN5$Q#wgBxXSVPuMYE)TiO>2iLYjtb*@FJ*K=2U&T%hcRwa*e@u>x3=Er<$CqZN!+^)bZi z-VT<$t|nVB+C_8t(7dzS6a&*}7cEF&S{)2jfC`Khm`C2*`M-mIoBu~GOsN1B_%j7` zvE6S6yi}g8AFpTiLso6w;GkcQ@b(jc#E#+>+ztE17GO=bASy8a2)GOkRN$uyya2`( B5q1Co literal 0 HcmV?d00001 diff --git a/README.md b/README.md index 4915df99e3..2f8404bb90 100644 --- a/README.md +++ b/README.md @@ -83,14 +83,9 @@ Security: Workspace: -- [ ] Notebooks in the Workspace FS -- [ ] Directories in the Workspace FS -- [ ] Files in the Workspace FS - -Repos: - -- [ ] User-level Repos -- [ ] Org-level Repos +- [ ] Notebooks +- [ ] Directories +- [ ] Repos Data access: diff --git a/pyproject.toml b/pyproject.toml index 7d13e5604d..5d2a96ee3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,8 @@ dependencies = [ "PyYAML>=6.0.0,<7.0.0", "ratelimit>=2.2.1,<3.0.0", "pandas>=2.0.3,<3.0.0", - "python-dotenv>=1.0.0,<=2.0.0" + "python-dotenv>=1.0.0,<=2.0.0", + "tenacity>=8.2.2,<9.0.0", ] [project.optional-dependencies] diff --git a/src/.DS_Store b/src/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..eee7abe7d7f15d24b8d0ad7ba9c5c808e9f98079 GIT binary patch literal 6148 zcmeHKyH3ME5S)b+k!Vs-P~I=_2UZlmfDho22!se&0;hMyckyY=J{rqHLXl{qS!-|Z z_I7-_DZE|)wm$5xffaxy-4S0t%+2@RXLeQ*BhvYf7d+zw4;VQ6n>qIq2W)YNw+X+; zJ7WWSK6|`!Z+O~$ZXf%8?M%FdW8t^b*A80b*A;CNe^^q!N>A)nZuE8E=)>6^@BXhgI`ob+T25VsSd( z-y$8>C2EucQedjUc`jF8|8MBO^#4}Hu^o?bH3?r qoCk$Nlw)F)V=lZLUqw>pHJ@|8D;yJp&Uny?`WbLtWK!V275D)itr^t- literal 0 HcmV?d00001 diff --git a/src/uc_migration_toolkit/managers/inventory/inventorizer.py b/src/uc_migration_toolkit/managers/inventory/inventorizer.py index 9b465d72a1..37f822be15 100644 --- a/src/uc_migration_toolkit/managers/inventory/inventorizer.py +++ b/src/uc_migration_toolkit/managers/inventory/inventorizer.py @@ -201,6 +201,11 @@ def __convert_object_type_to_request_type(_object: ObjectInfo) -> RequestObjectT return None case ObjectType.REPO: return RequestObjectType.REPOS + case ObjectType.FILE: + return RequestObjectType.FILES + # silent handler for experiments - they'll be inventorized by the experiments manager + case None: + 
return None @staticmethod def __convert_request_object_type_to_logical_type(request_object_type: RequestObjectType) -> LogicalObjectType: @@ -211,18 +216,22 @@ def __convert_request_object_type_to_logical_type(request_object_type: RequestOb return LogicalObjectType.DIRECTORY case RequestObjectType.REPOS: return LogicalObjectType.REPO + case RequestObjectType.FILES: + return LogicalObjectType.FILE def _convert_result_to_permission_item(self, _object: ObjectInfo) -> PermissionsInventoryItem | None: request_object_type = self.__convert_object_type_to_request_type(_object) if not request_object_type: return else: - permissions = provider.ws.permissions.get(_object.object_type, _object.object_id) + permissions = provider.ws.permissions.get( + request_object_type=request_object_type, request_object_id=_object.object_id + ) inventory_item = PermissionsInventoryItem( object_id=str(_object.object_id), logical_object_type=self.__convert_request_object_type_to_logical_type(request_object_type), - request_object_type=_object.object_type, + request_object_type=request_object_type, raw_object_permissions=json.dumps(permissions.as_dict()), ) return inventory_item diff --git a/src/uc_migration_toolkit/managers/inventory/permissions.py b/src/uc_migration_toolkit/managers/inventory/permissions.py index 1096a864f1..cd9a1dcaf9 100644 --- a/src/uc_migration_toolkit/managers/inventory/permissions.py +++ b/src/uc_migration_toolkit/managers/inventory/permissions.py @@ -1,3 +1,4 @@ +import itertools from copy import deepcopy from dataclasses import dataclass from functools import partial @@ -5,6 +6,7 @@ from databricks.sdk.service.iam import AccessControlRequest, Group, ObjectPermissions from databricks.sdk.service.workspace import AclItem as SdkAclItem +from tenacity import retry, stop_after_attempt, wait_fixed, wait_random from uc_migration_toolkit.managers.group import MigrationGroupsProvider from uc_migration_toolkit.managers.inventory.inventorizer import ( @@ -24,7 +26,7 @@ from uc_migration_toolkit.providers.client import provider from uc_migration_toolkit.providers.config import provider as config_provider from uc_migration_toolkit.providers.logger import logger -from uc_migration_toolkit.utils import ThreadedExecution +from uc_migration_toolkit.utils import ThreadedExecution, safe_get_acls @dataclass @@ -199,27 +201,38 @@ def _prepare_new_permission_request( ) @staticmethod - def _permission_applicator(request_payload: PermissionRequestPayload | SecretsPermissionRequestPayload): - if isinstance(request_payload, PermissionRequestPayload): - provider.ws.permissions.update( - request_object_type=request_payload.request_object_type, - request_object_id=request_payload.object_id, - access_control_list=request_payload.access_control_list, + @retry(wait=wait_fixed(1) + wait_random(0, 2), stop=stop_after_attempt(5)) + def _scope_permissions_applicator(request_payload: SecretsPermissionRequestPayload): + # TODO: rewrite and generalize this + for _acl_item in request_payload.access_control_list: + # this request will create OR update the ACL for the given principal + # it means that the access_control_list should only keep records required for update + provider.ws.secrets.put_acl( + scope=request_payload.object_id, principal=_acl_item.principal, permission=_acl_item.permission ) - elif isinstance(request_payload, SecretsPermissionRequestPayload): - for _acl_item in request_payload.access_control_list: - # this request will create OR update the ACL for the given principal - # it means that the access_control_list 
should only keep records required for update - provider.ws.secrets.put_acl( - scope=request_payload.object_id, principal=_acl_item.principal, permission=_acl_item.permission - ) - else: - logger.warning(f"Unsupported logical object type {request_payload}") + logger.info(f"Applied new permissions for scope {request_payload.object_id}: {_acl_item}") + # in-flight check for the applied permissions + applied_acls = safe_get_acls( + provider.ws, scope_name=request_payload.object_id, group_name=_acl_item.principal + ) + assert applied_acls, f"Failed to apply permissions for {_acl_item.principal}" + assert applied_acls.permission == _acl_item.permission, ( + f"Failed to apply permissions for {_acl_item.principal}. " + f"Expected: {_acl_item.permission}. Actual: {applied_acls.permission}" + ) + + @staticmethod + def _standard_permissions_applicator(request_payload: PermissionRequestPayload): + provider.ws.permissions.update( + request_object_type=request_payload.request_object_type, + request_object_id=request_payload.object_id, + access_control_list=request_payload.access_control_list, + ) - def _apply_permissions_in_parallel( + def _apply_standard_permissions_in_parallel( self, requests: list[PermissionRequestPayload | SecretsPermissionRequestPayload] ): - executables = [partial(self._permission_applicator, payload) for payload in requests] + executables = [partial(self._standard_permissions_applicator, payload) for payload in requests] execution = ThreadedExecution[None](executables) execution.run() @@ -232,12 +245,28 @@ def apply_group_permissions( permissions_on_source = self.inventory_table_manager.load_for_groups( groups=[g.workspace.display_name for g in migration_groups_provider.groups] ) - applicable_permissions = [ + permission_payloads: list[PermissionRequestPayload | SecretsPermissionRequestPayload] = [ self._prepare_new_permission_request(item, migration_groups_provider, destination=destination) for item in permissions_on_source ] + logger.info(f"Applying {len(permission_payloads)} permissions") - logger.info(f"Applying {len(applicable_permissions)} permissions") + generic_requests = [p for p in permission_payloads if not isinstance(p, SecretsPermissionRequestPayload)] - self._apply_permissions_in_parallel(requests=applicable_permissions) + scope_requests = [p for p in permission_payloads if isinstance(p, SecretsPermissionRequestPayload)] + + self._apply_standard_permissions_in_parallel(requests=generic_requests) + self._apply_scope_permissions(scope_requests=scope_requests) logger.info("All permissions were applied") + + def _apply_scope_permissions(self, scope_requests: list[SecretsPermissionRequestPayload]): + """ + Secret scope requests work really poor with parallel updates, therefore here we just apply them in sequence + :param scope_requests: + :return: + """ + chunked_by_scope = itertools.groupby(scope_requests, lambda x: x.object_id) + for scope, requests in chunked_by_scope: + logger.info(f"Applying permissions for scope {scope}") + for req in requests: + self._scope_permissions_applicator(req) diff --git a/src/uc_migration_toolkit/managers/inventory/types.py b/src/uc_migration_toolkit/managers/inventory/types.py index 72a13b1794..b111ad196e 100644 --- a/src/uc_migration_toolkit/managers/inventory/types.py +++ b/src/uc_migration_toolkit/managers/inventory/types.py @@ -41,6 +41,7 @@ def __repr__(self): class LogicalObjectType(StrEnum): + FILE = "FILE" REPO = "REPO" DIRECTORY = "DIRECTORY" NOTEBOOK = "NOTEBOOK" diff --git a/src/uc_migration_toolkit/utils.py 
b/src/uc_migration_toolkit/utils.py index ed78aaa740..ff0106dff8 100644 --- a/src/uc_migration_toolkit/utils.py +++ b/src/uc_migration_toolkit/utils.py @@ -6,6 +6,8 @@ from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor from typing import Generic, TypeVar +from databricks.sdk.core import DatabricksError +from databricks.sdk.service.workspace import AclItem from ratelimit import limits, sleep_and_retry from uc_migration_toolkit.config import RateLimitConfig @@ -113,3 +115,10 @@ def default(self, obj): if isinstance(obj, enum.Enum): return obj.name return json.JSONEncoder.default(self, obj) + + +def safe_get_acls(ws, scope_name: str, group_name: str) -> AclItem | None: + try: + return ws.secrets.get_acl(scope=scope_name, principal=group_name) + except DatabricksError: + return None diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 60752c307d..7302414366 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,3 +1,4 @@ +import io import json import os import random @@ -26,10 +27,16 @@ CreateWarehouseRequestWarehouseType, GetWarehouseResponse, ) -from databricks.sdk.service.workspace import AclPermission, SecretScope +from databricks.sdk.service.workspace import ( + AclPermission, + ObjectInfo, + ObjectType, + SecretScope, +) from utils import ( EnvironmentInfo, InstanceProfile, + WorkspaceObjects, _cleanup_groups, _create_groups, _get_basic_job_cluster, @@ -62,7 +69,7 @@ NUM_TEST_MODELS = int(os.environ.get("NUM_TEST_MODELS", 3)) NUM_TEST_WAREHOUSES = int(os.environ.get("NUM_TEST_WAREHOUSES", 3)) NUM_TEST_TOKENS = int(os.environ.get("NUM_TEST_TOKENS", 3)) -NUM_TEST_SECRET_SCOPES = int(os.environ.get("NUM_TEST_SECRET_SCOPES", 3)) +NUM_TEST_SECRET_SCOPES = int(os.environ.get("NUM_TEST_SECRET_SCOPES", 10)) NUM_THREADS = int(os.environ.get("NUM_TEST_THREADS", 20)) DB_CONNECT_CLUSTER_NAME = os.environ.get("DB_CONNECT_CLUSTER_NAME", "ucx-integration-testing") @@ -488,11 +495,74 @@ def secret_scopes(ws: ImprovedWorkspaceClient, env: EnvironmentInfo) -> list[Sec Threader(executables).run() +@pytest.fixture(scope="session", autouse=True) +def workspace_objects(ws: ImprovedWorkspaceClient, env: EnvironmentInfo) -> WorkspaceObjects: + logger.info(f"Creating test workspace objects under /{env.test_uid}") + ws.workspace.mkdirs(f"/{env.test_uid}") + + base_dirs = [] + + for ws_group, _ in env.groups: + _path = f"/{env.test_uid}/{ws_group.display_name}" + ws.workspace.mkdirs(_path) + object_info = ws.workspace.get_status(_path) + base_dirs.append(object_info) + + ws.permissions.set( + request_object_type=RequestObjectType.DIRECTORIES, + request_object_id=object_info.object_id, + access_control_list=[ + AccessControlRequest(group_name=ws_group.display_name, permission_level=PermissionLevel.CAN_MANAGE) + ], + ) + + notebooks = [] + + for nb_idx in range(3): + random_group = random.choice([g[0] for g in env.groups]) + _nb_path = f"/{env.test_uid}/{random_group.display_name}/nb-{nb_idx}.py" + ws.workspace.upload(path=_nb_path, content=io.BytesIO(b"print(1)")) + _nb_obj = ws.workspace.get_status(_nb_path) + notebooks.append(_nb_obj) + ws.permissions.set( + request_object_type=RequestObjectType.NOTEBOOKS, + request_object_id=_nb_obj.object_id, + access_control_list=[ + AccessControlRequest(group_name=random_group.display_name, permission_level=PermissionLevel.CAN_EDIT) + ], + ) + + yield WorkspaceObjects( + root_dir=ObjectInfo( + path=f"/{env.test_uid}", + object_type=ObjectType.DIRECTORY, + 
object_id=ws.workspace.get_status(f"/{env.test_uid}").object_id, + ), + directories=base_dirs, + notebooks=notebooks, + ) + + logger.debug("Deleting test workspace objects") + ws.workspace.delete(f"/{env.test_uid}", recursive=True) + logger.debug("Test workspace objects deleted") + + @pytest.fixture(scope="session", autouse=True) def verifiable_objects( - clusters, instance_pools, cluster_policies, pipelines, jobs, experiments, models, warehouses, tokens, secret_scopes + clusters, + instance_pools, + cluster_policies, + pipelines, + jobs, + experiments, + models, + warehouses, + tokens, + secret_scopes, + workspace_objects, ) -> list[tuple[list, str, RequestObjectType | None]]: _verifiable_objects = [ + (workspace_objects, "workspace_objects", None), (secret_scopes, "secret_scopes", None), (tokens, "tokens", RequestObjectType.AUTHORIZATION), (clusters, "cluster_id", RequestObjectType.CLUSTERS), diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index c7821a03cd..cac9e50ad5 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -1,17 +1,15 @@ from typing import Literal import pytest -from databricks.sdk.core import DatabricksError from databricks.sdk.service.iam import ( AccessControlRequest, AccessControlResponse, - Group, ObjectPermissions, Permission, ) -from databricks.sdk.service.workspace import AclItem, SecretScope +from databricks.sdk.service.workspace import SecretScope from pyspark.errors import AnalysisException -from utils import EnvironmentInfo +from utils import EnvironmentInfo, WorkspaceObjects from uc_migration_toolkit.config import ( GroupsConfig, @@ -24,19 +22,44 @@ from uc_migration_toolkit.providers.client import ImprovedWorkspaceClient from uc_migration_toolkit.providers.logger import logger from uc_migration_toolkit.toolkits.group_migration import GroupMigrationToolkit +from uc_migration_toolkit.utils import safe_get_acls def _verify_group_permissions( - objects: list | None, + objects: list | WorkspaceObjects | None, id_attribute: str, request_object_type: RequestObjectType | None, ws: ImprovedWorkspaceClient, toolkit: GroupMigrationToolkit, target: Literal["backup", "account"], ): - logger.debug(f"Verifying that the permissions of object {request_object_type} were applied to {target} groups") + logger.debug(f"Verifying that the permissions of object " + f"{request_object_type or id_attribute} were applied to {target} groups") - if id_attribute == "secret_scopes": + if id_attribute == "workspace_objects": + _workspace_objects: WorkspaceObjects = objects + + # list of groups that source the permissions + comparison_base = [ + getattr(mi, "workspace" if target == "backup" else "backup") + for mi in toolkit.group_manager.migration_groups_provider.groups + ] + # list of groups that are the target of the permissions + comparison_target = [getattr(mi, target) for mi in toolkit.group_manager.migration_groups_provider.groups] + + root_permissions = ws.permissions.get( + request_object_type=RequestObjectType.DIRECTORIES, request_object_id=_workspace_objects.root_dir.object_id + ) + base_group_names = [g.display_name for g in comparison_base] + target_group_names = [g.display_name for g in comparison_target] + + base_acls = [a for a in root_permissions.access_control_list if a.group_name in base_group_names] + + target_acls = [a for a in root_permissions.access_control_list if a.group_name in target_group_names] + + assert len(base_acls) == len(target_acls) + + elif id_attribute == "secret_scopes": _scopes: list[SecretScope] = 
objects comparison_base = [ getattr(mi, "workspace" if target == "backup" else "backup") @@ -45,17 +68,14 @@ def _verify_group_permissions( comparison_target = [getattr(mi, target) for mi in toolkit.group_manager.migration_groups_provider.groups] - def _safe_get_acls(_scope: SecretScope, _group: Group) -> AclItem | None: - try: - return ws.secrets.get_acl(scope=_scope.name, principal=_group.display_name) - except DatabricksError: - return None - for scope in _scopes: for base_group, target_group in zip(comparison_base, comparison_target, strict=True): - base_acl = _safe_get_acls(scope, base_group) - target_acl = _safe_get_acls(scope, target_group) + base_acl = safe_get_acls(ws, scope.name, base_group.display_name) + target_acl = safe_get_acls(ws, scope.name, target_group.display_name) + # TODO: for some reason, permissions were not correctly + # set for account-level group from the backup group + # check the permissions_applicator method for debugging. if not base_acl: assert not target_acl else: diff --git a/tests/integration/utils.py b/tests/integration/utils.py index c85b3112ad..04b0b94a91 100644 --- a/tests/integration/utils.py +++ b/tests/integration/utils.py @@ -14,6 +14,7 @@ User, ) from databricks.sdk.service.jobs import JobCluster, PythonWheelTask, Task +from databricks.sdk.service.workspace import ObjectInfo from dotenv import load_dotenv from uc_migration_toolkit.managers.inventory.types import RequestObjectType @@ -150,3 +151,12 @@ def _get_basic_task() -> Task: python_wheel_task=PythonWheelTask(entry_point="main", package_name="some-pkg"), job_cluster_key="default", ) + + +@dataclass +class WorkspaceObjects: + root_dir: ObjectInfo + notebooks: list[ObjectInfo] + directories: list[ObjectInfo] + # files: list[ObjectInfo] + # repos: list[ObjectInfo] From b20fec4441bfb8b3fbb72f8a60b0d9da1cd45a06 Mon Sep 17 00:00:00 2001 From: renardeinside Date: Fri, 28 Jul 2023 15:13:42 +0200 Subject: [PATCH 3/6] delete ds store --- .DS_Store | Bin 6148 -> 0 bytes .gitignore | 5 +++++ src/.DS_Store | Bin 6148 -> 0 bytes 3 files changed, 5 insertions(+) delete mode 100644 .DS_Store delete mode 100644 src/.DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 81229e8d26382bd3469cb3d9bb2de3f1954844ac..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeH~Jqp4=5QS&dB4Cr!avKle4VIuM@B*S@B?yZB9^E%TjnP_yyn&f-XEsBUS7b9H zqQmpN5$Q#wgBxXSVPuMYE)TiO>2iLYjtb*@FJ*K=2U&T%hcRwa*e@u>x3=Er<$CqZN!+^)bZi z-VT<$t|nVB+C_8t(7dzS6a&*}7cEF&S{)2jfC`Khm`C2*`M-mIoBu~GOsN1B_%j7` zvE6S6yi}g8AFpTiLso6w;GkcQ@b(jc#E#+>+ztE17GO=bASy8a2)GOkRN$uyya2`( B5q1Co diff --git a/.gitignore b/.gitignore index 5494ed79a8..08bf4a2798 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ +# macos + +.DS_Store +*.DS_Store + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/src/.DS_Store b/src/.DS_Store deleted file mode 100644 index eee7abe7d7f15d24b8d0ad7ba9c5c808e9f98079..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKyH3ME5S)b+k!Vs-P~I=_2UZlmfDho22!se&0;hMyckyY=J{rqHLXl{qS!-|Z z_I7-_DZE|)wm$5xffaxy-4S0t%+2@RXLeQ*BhvYf7d+zw4;VQ6n>qIq2W)YNw+X+; zJ7WWSK6|`!Z+O~$ZXf%8?M%FdW8t^b*A80b*A;CNe^^q!N>A)nZuE8E=)>6^@BXhgI`ob+T25VsSd( z-y$8>C2EucQedjUc`jF8|8MBO^#4}Hu^o?bH3?r qoCk$Nlw)F)V=lZLUqw>pHJ@|8D;yJp&Uny?`WbLtWK!V275D)itr^t- From dfab8d2865b895d8c0600b94382d2782b6180c23 Mon Sep 17 00:00:00 2001 From: renardeinside Date: Fri, 28 Jul 2023 16:05:16 +0200 Subject: [PATCH 4/6] add stabilization requests for 
secret scope api --- .../managers/inventory/permissions.py | 25 +++++++++++-------- src/uc_migration_toolkit/utils.py | 21 +++++----------- tests/integration/test_e2e.py | 21 +++++++++------- 3 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/uc_migration_toolkit/managers/inventory/permissions.py b/src/uc_migration_toolkit/managers/inventory/permissions.py index cd9a1dcaf9..4fbfd31ad3 100644 --- a/src/uc_migration_toolkit/managers/inventory/permissions.py +++ b/src/uc_migration_toolkit/managers/inventory/permissions.py @@ -1,4 +1,6 @@ import itertools +import random +import time from copy import deepcopy from dataclasses import dataclass from functools import partial @@ -210,16 +212,19 @@ def _scope_permissions_applicator(request_payload: SecretsPermissionRequestPaylo provider.ws.secrets.put_acl( scope=request_payload.object_id, principal=_acl_item.principal, permission=_acl_item.permission ) - logger.info(f"Applied new permissions for scope {request_payload.object_id}: {_acl_item}") + logger.debug(f"Applied new permissions for scope {request_payload.object_id}: {_acl_item}") # in-flight check for the applied permissions - applied_acls = safe_get_acls( - provider.ws, scope_name=request_payload.object_id, group_name=_acl_item.principal - ) - assert applied_acls, f"Failed to apply permissions for {_acl_item.principal}" - assert applied_acls.permission == _acl_item.permission, ( - f"Failed to apply permissions for {_acl_item.principal}. " - f"Expected: {_acl_item.permission}. Actual: {applied_acls.permission}" - ) + # the api might be inconsistent, therefore we need to check that the permissions were applied + for _ in range(3): + time.sleep(random.random() * 2) + applied_acls = safe_get_acls( + provider.ws, scope_name=request_payload.object_id, group_name=_acl_item.principal + ) + assert applied_acls, f"Failed to apply permissions for {_acl_item.principal}" + assert applied_acls.permission == _acl_item.permission, ( + f"Failed to apply permissions for {_acl_item.principal}. " + f"Expected: {_acl_item.permission}. 
Actual: {applied_acls.permission}" + ) @staticmethod def _standard_permissions_applicator(request_payload: PermissionRequestPayload): @@ -267,6 +272,6 @@ def _apply_scope_permissions(self, scope_requests: list[SecretsPermissionRequest """ chunked_by_scope = itertools.groupby(scope_requests, lambda x: x.object_id) for scope, requests in chunked_by_scope: - logger.info(f"Applying permissions for scope {scope}") + logger.debug(f"Applying permissions for scope {scope}") for req in requests: self._scope_permissions_applicator(req) diff --git a/src/uc_migration_toolkit/utils.py b/src/uc_migration_toolkit/utils.py index ff0106dff8..d7de6d6ff2 100644 --- a/src/uc_migration_toolkit/utils.py +++ b/src/uc_migration_toolkit/utils.py @@ -1,16 +1,15 @@ import concurrent import datetime as dt import enum -import json from collections.abc import Callable from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor from typing import Generic, TypeVar -from databricks.sdk.core import DatabricksError from databricks.sdk.service.workspace import AclItem from ratelimit import limits, sleep_and_retry from uc_migration_toolkit.config import RateLimitConfig +from uc_migration_toolkit.providers.client import ImprovedWorkspaceClient from uc_migration_toolkit.providers.config import provider as config_provider from uc_migration_toolkit.providers.logger import logger @@ -109,16 +108,8 @@ class WorkspaceLevelEntitlement(StrEnum): ALLOW_INSTANCE_POOL_CREATE = "allow-instance-pool-create" -# TODO: using this because SDK doesn't know how to properly write enums, highlight this to the SDK team -class EnumEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, enum.Enum): - return obj.name - return json.JSONEncoder.default(self, obj) - - -def safe_get_acls(ws, scope_name: str, group_name: str) -> AclItem | None: - try: - return ws.secrets.get_acl(scope=scope_name, principal=group_name) - except DatabricksError: - return None +def safe_get_acls(ws: ImprovedWorkspaceClient, scope_name: str, group_name: str) -> AclItem | None: + all_acls = ws.secrets.list_acls(scope=scope_name) + for acl in all_acls: + if acl.principal == group_name: + return acl diff --git a/tests/integration/test_e2e.py b/tests/integration/test_e2e.py index cac9e50ad5..dfa7133524 100644 --- a/tests/integration/test_e2e.py +++ b/tests/integration/test_e2e.py @@ -33,8 +33,10 @@ def _verify_group_permissions( toolkit: GroupMigrationToolkit, target: Literal["backup", "account"], ): - logger.debug(f"Verifying that the permissions of object " - f"{request_object_type or id_attribute} were applied to {target} groups") + logger.debug( + f"Verifying that the permissions of object " + f"{request_object_type or id_attribute} were applied to {target} groups" + ) if id_attribute == "workspace_objects": _workspace_objects: WorkspaceObjects = objects @@ -73,13 +75,14 @@ def _verify_group_permissions( base_acl = safe_get_acls(ws, scope.name, base_group.display_name) target_acl = safe_get_acls(ws, scope.name, target_group.display_name) - # TODO: for some reason, permissions were not correctly - # set for account-level group from the backup group - # check the permissions_applicator method for debugging. 
- if not base_acl: - assert not target_acl - else: - assert base_acl.permission == target_acl.permission + if base_acl: + if not target_acl: + msg = "Target ACL is empty, while base ACL is not" + raise AssertionError(msg) + + assert ( + base_acl.permission == target_acl.permission + ), f"Target permissions were not applied correctly for scope {scope.name}" elif id_attribute in ("tokens", "passwords"): _typed_objects: list[AccessControlRequest] = objects From 401e85f5b1f82a3b457ea952139776698ccc588d Mon Sep 17 00:00:00 2001 From: renardeinside Date: Fri, 28 Jul 2023 16:15:46 +0200 Subject: [PATCH 5/6] make secrets api parallel --- .../managers/inventory/permissions.py | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/src/uc_migration_toolkit/managers/inventory/permissions.py b/src/uc_migration_toolkit/managers/inventory/permissions.py index 4fbfd31ad3..33b33c6762 100644 --- a/src/uc_migration_toolkit/managers/inventory/permissions.py +++ b/src/uc_migration_toolkit/managers/inventory/permissions.py @@ -1,4 +1,3 @@ -import itertools import random import time from copy import deepcopy @@ -234,10 +233,18 @@ def _standard_permissions_applicator(request_payload: PermissionRequestPayload): access_control_list=request_payload.access_control_list, ) - def _apply_standard_permissions_in_parallel( + def applicator(self, request_payload: PermissionRequestPayload | SecretsPermissionRequestPayload): + if isinstance(request_payload, PermissionRequestPayload): + self._standard_permissions_applicator(request_payload) + elif isinstance(request_payload, SecretsPermissionRequestPayload): + self._scope_permissions_applicator(request_payload) + else: + logger.warning(f"Unsupported payload type {type(request_payload)}") + + def _apply_permissions_in_parallel( self, requests: list[PermissionRequestPayload | SecretsPermissionRequestPayload] ): - executables = [partial(self._standard_permissions_applicator, payload) for payload in requests] + executables = [partial(self.applicator, payload) for payload in requests] execution = ThreadedExecution[None](executables) execution.run() @@ -256,22 +263,5 @@ def apply_group_permissions( ] logger.info(f"Applying {len(permission_payloads)} permissions") - generic_requests = [p for p in permission_payloads if not isinstance(p, SecretsPermissionRequestPayload)] - - scope_requests = [p for p in permission_payloads if isinstance(p, SecretsPermissionRequestPayload)] - - self._apply_standard_permissions_in_parallel(requests=generic_requests) - self._apply_scope_permissions(scope_requests=scope_requests) + self._apply_permissions_in_parallel(requests=permission_payloads) logger.info("All permissions were applied") - - def _apply_scope_permissions(self, scope_requests: list[SecretsPermissionRequestPayload]): - """ - Secret scope requests work really poor with parallel updates, therefore here we just apply them in sequence - :param scope_requests: - :return: - """ - chunked_by_scope = itertools.groupby(scope_requests, lambda x: x.object_id) - for scope, requests in chunked_by_scope: - logger.debug(f"Applying permissions for scope {scope}") - for req in requests: - self._scope_permissions_applicator(req) From 907e6712ee5076b23475210de0165d156dd80ed4 Mon Sep 17 00:00:00 2001 From: renardeinside Date: Fri, 28 Jul 2023 16:17:23 +0200 Subject: [PATCH 6/6] fix readme --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2f8404bb90..a8c915536d 100644 --- a/README.md +++ b/README.md @@ -83,9 
+83,10 @@ Security: Workspace: -- [ ] Notebooks -- [ ] Directories -- [ ] Repos +- [x] Notebooks +- [x] Directories +- [x] Repos +- [x] Files Data access:
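
The workspace crawler added in the first patch combines two ideas: every `workspace.list` call goes through a rate limiter (`ratelimit`'s `sleep_and_retry` plus `limits`), and every directory returned by the listing is handed to a thread pool that recurses into it, while leaf objects are collected into a shared results list. A minimal standalone sketch of that pattern is below; the `list_children` callable and the `(path, is_dir)` tuples are stand-ins for the SDK's `workspace.list`/`ObjectInfo` and are not part of the toolkit itself.

```python
import datetime as dt
from collections.abc import Callable, Iterable
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait

from ratelimit import limits, sleep_and_retry


class RateLimitedWalker:
    """Sketch of the WorkspaceListing pattern: rate-limited listing calls,
    directories fanned out to a thread pool, leaf objects collected."""

    def __init__(
        self,
        list_children: Callable[[str], Iterable[tuple[str, bool]]],
        num_threads: int = 4,
        max_requests_per_period: int = 30,
        period_in_seconds: int = 1,
    ):
        self.results: list[str] = []
        self.start_time: dt.datetime | None = None
        self._num_threads = num_threads

        # build the decorated closure per instance so the limits can come
        # from runtime configuration, as in the patch
        @sleep_and_retry
        @limits(calls=max_requests_per_period, period=period_in_seconds)
        def _rate_limited_listing(path: str) -> Iterable[tuple[str, bool]]:
            return list_children(path)

        self._rate_limited_listing = _rate_limited_listing

    def _walk(self, path: str) -> None:
        futures = []
        # each recursion level opens its own pool; leaving the `with` block
        # already waits for the submitted child walks to finish
        with ThreadPoolExecutor(self._num_threads) as executor:
            for child_path, is_dir in self._rate_limited_listing(path):
                if is_dir:
                    futures.append(executor.submit(self._walk, child_path))
                else:
                    # list.append is atomic under the GIL, so workers can share it
                    self.results.append(child_path)
        wait(futures, return_when=ALL_COMPLETED)

    def walk(self, path: str) -> list[str]:
        self.start_time = dt.datetime.now()
        self._walk(path)
        return self.results
```

One consequence of this shape, visible in the patch as well, is that each directory level opens a fresh executor, so the effective number of threads grows with the depth of the tree rather than staying capped at `num_threads`.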
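
Patches 4 and 5 deal with the eventual consistency of the secret-scope ACL API: a `put_acl` call may not be visible to an immediate read-back, so the applicator wraps the write in a `tenacity` retry and re-checks the result a few times before trusting it. A small sketch of that write-then-verify loop follows, with `put_acl`/`get_acl` as injected stand-ins for the real secrets API rather than actual SDK calls.

```python
import random
import time
from collections.abc import Callable

from tenacity import retry, stop_after_attempt, wait_fixed, wait_random


@retry(wait=wait_fixed(1) + wait_random(0, 2), stop=stop_after_attempt(5))
def apply_and_verify_acl(
    put_acl: Callable[[str, str, str], None],
    get_acl: Callable[[str, str], str | None],
    scope: str,
    principal: str,
    permission: str,
) -> None:
    """Write an ACL and re-read it until the backend reflects the write.

    Any failed assertion aborts the current attempt; tenacity then re-runs
    the whole function (write plus verification) with jittered waits.
    """
    put_acl(scope, principal, permission)
    # verify the read-back a few times, since the API may lag behind the write
    for _ in range(3):
        time.sleep(random.random() * 2)
        applied = get_acl(scope, principal)
        assert applied is not None, f"no ACL visible for {principal} on scope {scope}"
        assert applied == permission, f"expected {permission}, got {applied}"
```

Because a stale read raises, the decorator effectively re-issues the write with backoff until the API reports the expected permission or the five-attempt budget is exhausted.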