Skip to content

Commit

Permalink
Removed redundant pyspark, databricks-connect, delta-spark, and…
Browse files Browse the repository at this point in the history
… `pandas` dependencies (#193)

This PR removes redundant pyspark, databricks-connect, delta-spark, and
pandas dependencies and their usages.

After it we can use consistent crawlers across HMS Crawling and
Workspace Permissions.

This PR supersedes and closes #105
  • Loading branch information
nfx authored Sep 13, 2023
1 parent 0898bd6 commit f4e5989
Show file tree
Hide file tree
Showing 12 changed files with 95 additions and 204 deletions.
14 changes: 1 addition & 13 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ dependencies = [

# TODO: remove later
"typer[all]>=0.9.0,<0.10.0",
"pandas>=2.0.3,<3.0.0",
"ratelimit>=2.2.1,<3.0.0",
"tenacity>=8.2.2,<9.0.0",
]

[project.optional-dependencies]
dbconnect = [
"databricks-connect>=13.2.0,<=14.0.0"
]
test = [
"coverage[toml]>=6.5",
"pytest",
Expand All @@ -62,9 +58,7 @@ path = "src/databricks/labs/ucx/__about__.py"

[tool.hatch.envs.unit]
dependencies = [
"databricks-labs-ucx[test]",
"pyspark>=3.4.0,<=3.5.0",
"delta-spark>=2.4.0,<3.0.0"
"databricks-labs-ucx[test]"
]

[tool.hatch.envs.unit.scripts]
Expand All @@ -74,8 +68,6 @@ test-cov-report = "pytest --cov src tests/unit --cov-report=html"
[tool.hatch.envs.integration]
dependencies = [
"databricks-labs-ucx[test]",
"databricks-labs-ucx[dbconnect]",
"delta-spark>=2.4.0,<3.0.0"
]

[tool.hatch.envs.integration.scripts]
Expand Down Expand Up @@ -108,10 +100,6 @@ profile = "black"

[tool.pytest.ini_options]
addopts = "-s -p no:warnings -vv --cache-clear"
filterwarnings = [
"ignore:::.*pyspark.broadcast*",
"ignore:::.*pyspark.sql.pandas.utils*"
]

[tool.black]
target-version = ["py310"]
Expand Down
7 changes: 2 additions & 5 deletions src/databricks/labs/ucx/inventory/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from databricks.labs.ucx.inventory.permissions_inventory import (
PermissionsInventoryTable,
)
from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
from databricks.labs.ucx.support.impl import SupportsProvider
from databricks.labs.ucx.utils import ThreadedExecution
Expand All @@ -28,8 +27,7 @@ def inventorize_permissions(self):
crawler_tasks = list(self._supports_provider.get_crawler_tasks())
logger.info(f"Total crawler tasks: {len(crawler_tasks)}")
logger.info("Starting the permissions inventorization")
execution = ThreadedExecution[PermissionsInventoryItem | None](crawler_tasks)
results = execution.run()
results = ThreadedExecution.gather("crawl permissions", crawler_tasks)
items = [item for item in results if item is not None]
logger.info(f"Total inventorized items: {len(items)}")
self._permissions_inventory.save(items)
Expand Down Expand Up @@ -62,6 +60,5 @@ def apply_group_permissions(self, migration_state: GroupMigrationState, destinat

logger.info(f"Total applier tasks: {len(applier_tasks)}")
logger.info("Starting the permissions application")
execution = ThreadedExecution(applier_tasks)
execution.run()
ThreadedExecution.gather("apply permissions", applier_tasks)
logger.info("Permissions were applied")
49 changes: 15 additions & 34 deletions src/databricks/labs/ucx/inventory/permissions_inventory.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,31 @@
import logging

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
from databricks.labs.ucx.providers.spark import SparkMixin
from databricks.labs.ucx.tacl._internal import CrawlerBase, SqlBackend

logger = logging.getLogger(__name__)


class PermissionsInventoryTable(SparkMixin):
def __init__(self, inventory_database: str, ws: WorkspaceClient):
super().__init__(ws)
self._table = f"hive_metastore.{inventory_database}.permissions"

@property
def _table_schema(self):
from pyspark.sql.types import StringType, StructField, StructType

return StructType(
[
StructField("object_id", StringType(), True),
StructField("support", StringType(), True),
StructField("raw_object_permissions", StringType(), True),
]
)

@property
def _df(self):
return self.spark.table(self._table)
class PermissionsInventoryTable(CrawlerBase):
def __init__(self, backend: SqlBackend, inventory_database: str):
super().__init__(backend, "hive_metastore", inventory_database, "permissions")

def cleanup(self):
logger.info(f"Cleaning up inventory table {self._table}")
self.spark.sql(f"DROP TABLE IF EXISTS {self._table}")
logger.info(f"Cleaning up inventory table {self._full_name}")
self._exec(f"DROP TABLE IF EXISTS {self._full_name}")
logger.info("Inventory table cleanup complete")

def save(self, items: list[PermissionsInventoryItem]):
# TODO: update instead of append
logger.info(f"Saving {len(items)} items to inventory table {self._table}")
serialized_items = [item.as_dict() for item in items]
df = self.spark.createDataFrame(serialized_items, schema=self._table_schema)
df.write.mode("append").format("delta").saveAsTable(self._table)
logger.info(f"Saving {len(items)} items to inventory table {self._full_name}")
self._append_records(PermissionsInventoryItem, items)
logger.info("Successfully saved the items to inventory table")

def load_all(self) -> list[PermissionsInventoryItem]:
logger.info(f"Loading inventory table {self._table}")
df = self._df.toPandas()

logger.info("Successfully loaded the inventory table")
return PermissionsInventoryItem.from_pandas(df)
logger.info(f"Loading inventory table {self._full_name}")
return [
PermissionsInventoryItem(object_id, support, raw_object_permissions)
for object_id, support, raw_object_permissions in self._fetch(
f"SELECT object_id, support, raw_object_permissions FROM {self._full_name}"
)
]
20 changes: 1 addition & 19 deletions src/databricks/labs/ucx/inventory/types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from dataclasses import asdict, dataclass
from dataclasses import dataclass
from typing import Literal

import pandas as pd

from databricks.labs.ucx.generic import StrEnum

Destination = Literal["backup", "account"]
Expand Down Expand Up @@ -32,19 +30,3 @@ class PermissionsInventoryItem:
object_id: str
support: str # shall be taken from CRAWLERS dict
raw_object_permissions: str

@staticmethod
def from_pandas(source: pd.DataFrame) -> list["PermissionsInventoryItem"]:
items = source.to_dict(orient="records")
return [PermissionsInventoryItem.from_dict(item) for item in items]

def as_dict(self) -> dict:
return asdict(self)

@classmethod
def from_dict(cls, raw: dict) -> "PermissionsInventoryItem":
return cls(
object_id=raw["object_id"],
raw_object_permissions=raw["raw_object_permissions"],
support=raw["support"],
)
39 changes: 0 additions & 39 deletions src/databricks/labs/ucx/providers/spark.py

This file was deleted.

16 changes: 14 additions & 2 deletions src/databricks/labs/ucx/toolkits/group_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@
from databricks.labs.ucx.inventory.verification import VerificationManager
from databricks.labs.ucx.managers.group import GroupManager
from databricks.labs.ucx.support.impl import SupportsProvider
from databricks.labs.ucx.tacl._internal import (
RuntimeBackend,
SqlBackend,
StatementExecutionBackend,
)


class GroupMigrationToolkit:
def __init__(self, config: MigrationConfig):
def __init__(self, config: MigrationConfig, *, warehouse_id=None):
self._num_threads = config.num_threads
self._workspace_start_path = config.workspace_start_path

Expand All @@ -27,13 +32,20 @@ def __init__(self, config: MigrationConfig):
self._verify_ws_client(self._ws)

self._group_manager = GroupManager(self._ws, config.groups)
self._permissions_inventory = PermissionsInventoryTable(config.inventory_database, self._ws)
sql_backend = self._backend(self._ws, warehouse_id)
self._permissions_inventory = PermissionsInventoryTable(sql_backend, config.inventory_database)
self._supports_provider = SupportsProvider(self._ws, self._num_threads, self._workspace_start_path)
self._permissions_manager = PermissionManager(
self._ws, self._permissions_inventory, supports_provider=self._supports_provider
)
self._verification_manager = VerificationManager(self._ws, self._supports_provider.supports["secrets"])

@staticmethod
def _backend(ws: WorkspaceClient, warehouse_id: str | None = None) -> SqlBackend:
if warehouse_id is None:
return RuntimeBackend()
return StatementExecutionBackend(ws, warehouse_id)

@staticmethod
def _verify_ws_client(w: WorkspaceClient):
_me = w.current_user.me()
Expand Down
4 changes: 1 addition & 3 deletions src/databricks/labs/ucx/toolkits/table_acls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def __init__(
self._tc = TablesCrawler(self._backend(ws, warehouse_id), inventory_catalog, inventory_schema)
self._gc = GrantsCrawler(self._tc)

self._databases = (
databases if databases else [database["databaseName"] for database in self._tc._all_databases()]
)
self._databases = databases if databases else [database for (database,) in self._tc._all_databases()]

def database_snapshot(self):
tables = []
Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def test_e2e(
tacl=TaclConfig(auto=True),
log_level="DEBUG",
)
toolkit = GroupMigrationToolkit(config)

warehouse_id = os.environ["TEST_DEFAULT_WAREHOUSE_ID"]
toolkit = GroupMigrationToolkit(config, warehouse_id=warehouse_id)
toolkit.prepare_environment()

group_migration_state = toolkit._group_manager.migration_groups_provider
Expand Down
23 changes: 23 additions & 0 deletions tests/integration/test_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os

from databricks.labs.ucx.inventory.permissions_inventory import (
PermissionsInventoryTable,
)
from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
from databricks.labs.ucx.tacl._internal import StatementExecutionBackend


def test_permissions_save_and_load(ws, make_schema):
schema = make_schema().split(".")[-1]
backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
pi = PermissionsInventoryTable(backend, schema)

saved = [
PermissionsInventoryItem(object_id="abc", support="bcd", raw_object_permissions="def"),
PermissionsInventoryItem(object_id="efg", support="fgh", raw_object_permissions="ghi"),
]

pi.save(saved)
loaded = pi.load_all()

assert saved == loaded
37 changes: 0 additions & 37 deletions tests/unit/conftest.py

This file was deleted.

Loading

0 comments on commit f4e5989

Please sign in to comment.