Merge branch 'main' into acl_crawl_2770
nfx authored Oct 24, 2024
2 parents fbdf0fd + 21cafaa commit 2a000cc
Showing 54 changed files with 967 additions and 330 deletions.
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/assessment/azure.py
@@ -41,8 +41,8 @@ class ServicePrincipalClusterMapping:


 class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin, SecretsMixin):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+        super().__init__(sql_backend, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
         self._ws = ws

     def _try_fetch(self) -> Iterable[AzureServicePrincipalInfo]:
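
Across the assessment crawlers, this commit renames the terse sbe constructor parameter to sql_backend. A minimal wiring sketch of the new call shape, assuming StatementExecutionBackend from databricks-labs-lsql and a placeholder warehouse ID (neither appears in this diff):

# Hedged sketch, not part of the commit: constructing a crawler with the
# renamed sql_backend argument.
from databricks.sdk import WorkspaceClient
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, "<warehouse-id>")  # placeholder ID
crawler = AzureServicePrincipalCrawler(ws, sql_backend, "ucx")
records = crawler.snapshot()  # assumes CrawlerBase's public snapshot() API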
8 changes: 4 additions & 4 deletions src/databricks/labs/ucx/assessment/clusters.py
@@ -147,8 +147,8 @@ def _check_cluster_failures(self, cluster: ClusterDetails, source: str) -> list[


 class ClustersCrawler(CrawlerBase[ClusterInfo], CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str):
-        super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema: str):
+        super().__init__(sql_backend, "hive_metastore", schema, "clusters", ClusterInfo)
         self._ws = ws

     def _crawl(self) -> Iterable[ClusterInfo]:
@@ -210,8 +210,8 @@ class PolicyInfo:


 class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+        super().__init__(sql_backend, "hive_metastore", schema, "policies", PolicyInfo)
         self._ws = ws

     def _crawl(self) -> Iterable[PolicyInfo]:
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/assessment/init_scripts.py
@@ -41,8 +41,8 @@ def check_init_script(self, init_script_data: str | None, source: str) -> list[s


 class GlobalInitScriptCrawler(CrawlerBase[GlobalInitScriptInfo], CheckInitScriptMixin):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+        super().__init__(sql_backend, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo)
         self._ws = ws

     def _crawl(self) -> Iterable[GlobalInitScriptInfo]:
8 changes: 4 additions & 4 deletions src/databricks/labs/ucx/assessment/jobs.py
@@ -82,8 +82,8 @@ def _job_clusters(job: BaseJob) -> Iterable[tuple[BaseJob, ClusterSpec]]:


 class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+        super().__init__(sql_backend, "hive_metastore", schema, "jobs", JobInfo)
         self._ws = ws

     def _crawl(self) -> Iterable[JobInfo]:
@@ -180,8 +180,8 @@ class SubmitRunsCrawler(CrawlerBase[SubmitRunInfo], JobsMixin, CheckClusterMixin
         "fs.adl",
     ]

-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str, num_days_history: int):
-        super().__init__(sbe, "hive_metastore", schema, "submit_runs", SubmitRunInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema: str, num_days_history: int):
+        super().__init__(sql_backend, "hive_metastore", schema, "submit_runs", SubmitRunInfo)
         self._ws = ws
         self._num_days_history = num_days_history
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/assessment/pipelines.py
@@ -29,8 +29,8 @@ class PipelineInfo:


 class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin):
-    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
-        super().__init__(sbe, "hive_metastore", schema, "pipelines", PipelineInfo)
+    def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema):
+        super().__init__(sql_backend, "hive_metastore", schema, "pipelines", PipelineInfo)
         self._ws = ws

     def _crawl(self) -> Iterable[PipelineInfo]:
20 changes: 16 additions & 4 deletions src/databricks/labs/ucx/contexts/application.py
@@ -28,7 +28,7 @@
 from databricks.labs.ucx.assessment.export import AssessmentExporter
 from databricks.labs.ucx.aws.credentials import CredentialManager
 from databricks.labs.ucx.config import WorkspaceConfig
-from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership
+from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership
 from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler
 from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
 from databricks.labs.ucx.hive_metastore.grants import (
@@ -43,13 +43,13 @@
     PrincipalACL,
 )
 from databricks.labs.ucx.hive_metastore.mapping import TableMapping
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership
+from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
+from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership
 from databricks.labs.ucx.hive_metastore.table_migrate import (
     TableMigrationStatusRefresher,
     TablesMigrator,
 )
 from databricks.labs.ucx.hive_metastore.table_move import TableMove
-from databricks.labs.ucx.hive_metastore.tables import TableOwnership
 from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
 from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
 from databricks.labs.ucx.installer.workflows import DeployedWorkflows
@@ -263,17 +263,29 @@ def tables_crawler(self) -> TablesCrawler:

     @cached_property
     def table_ownership(self) -> TableOwnership:
-        return TableOwnership(self.administrator_locator)
+        return TableOwnership(
+            self.administrator_locator,
+            self.grants_crawler,
+            self.used_tables_crawler_for_paths,
+            self.used_tables_crawler_for_queries,
+            self.legacy_query_ownership,
+            self.workspace_path_ownership,
+        )

     @cached_property
     def workspace_path_ownership(self) -> WorkspacePathOwnership:
         return WorkspacePathOwnership(self.administrator_locator, self.workspace_client)

+    @cached_property
+    def legacy_query_ownership(self) -> LegacyQueryOwnership:
+        return LegacyQueryOwnership(self.administrator_locator, self.workspace_client)
+
     @cached_property
     def directfs_access_ownership(self) -> DirectFsAccessOwnership:
         return DirectFsAccessOwnership(
             self.administrator_locator,
             self.workspace_path_ownership,
+            self.legacy_query_ownership,
             self.workspace_client,
         )
62 changes: 34 additions & 28 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -26,7 +26,8 @@
 from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
 from databricks.labs.ucx.hive_metastore.udfs import Udf
 from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
-from databricks.labs.ucx.progress.history import HistoryLog
+from databricks.labs.ucx.progress.history import ProgressEncoder
+from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
 from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder

 # As with GlobalContext, service factories unavoidably have a lot of public methods.
@@ -137,7 +138,7 @@ def task_run_warning_recorder(self) -> TaskRunWarningRecorder:
             self._config_path.parent,
             self.named_parameters["workflow"],
             int(self.named_parameters["job_id"]),
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.sql_backend,
             self.inventory_database,
             int(self.named_parameters.get("attempt", "0")),
@@ -151,7 +152,7 @@ def workflow_run_recorder(self) -> WorkflowRunRecorder:
             workspace_id=self.workspace_id,
             workflow_name=self.named_parameters["workflow"],
             workflow_id=int(self.named_parameters["job_id"]),
-            workflow_run_id=int(self.named_parameters["parent_run_id"]),
+            workflow_run_id=self.parent_run_id,
             workflow_run_attempt=int(self.named_parameters.get("attempt", 0)),
             workflow_start_time=self.named_parameters["start_time"],
         )
@@ -161,89 +162,94 @@ def workspace_id(self) -> int:
         return self.workspace_client.get_workspace_id()

     @cached_property
-    def historical_clusters_log(self) -> HistoryLog[ClusterInfo]:
-        return HistoryLog(
+    def parent_run_id(self) -> int:
+        return int(self.named_parameters["parent_run_id"])
+
+    @cached_property
+    def clusters_progress(self) -> ProgressEncoder[ClusterInfo]:
+        return ProgressEncoder(
             self.sql_backend,
             self.cluster_ownership,
             ClusterInfo,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]:
-        return HistoryLog(
+    def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
+        return ProgressEncoder(
             self.sql_backend,
             self.cluster_policy_ownership,
             PolicyInfo,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_grants_log(self) -> HistoryLog[Grant]:
-        return HistoryLog(
+    def grants_progress(self) -> ProgressEncoder[Grant]:
+        return ProgressEncoder(
             self.sql_backend,
             self.grant_ownership,
             Grant,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_jobs_log(self) -> HistoryLog[JobInfo]:
-        return HistoryLog(
+    def jobs_progress(self) -> ProgressEncoder[JobInfo]:
+        return JobsProgressEncoder(
             self.sql_backend,
             self.job_ownership,
-            JobInfo,
-            int(self.named_parameters["parent_run_id"]),
+            self.inventory_database,
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_pipelines_log(self) -> HistoryLog[PipelineInfo]:
-        return HistoryLog(
+    def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
+        return ProgressEncoder(
             self.sql_backend,
             self.pipeline_ownership,
             PipelineInfo,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_tables_log(self) -> HistoryLog[Table]:
-        return HistoryLog(
+    def tables_progress(self) -> ProgressEncoder[Table]:
+        return ProgressEncoder(
             self.sql_backend,
             self.table_ownership,
             Table,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_table_migration_log(self) -> HistoryLog[TableMigrationStatus]:
-        return HistoryLog(
+    def historical_table_migration_log(self) -> ProgressEncoder[TableMigrationStatus]:
+        # TODO: merge into tables_progress
+        return ProgressEncoder(
             self.sql_backend,
             self.table_migration_ownership,
             TableMigrationStatus,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )

     @cached_property
-    def historical_udfs_log(self) -> HistoryLog[Udf]:
-        return HistoryLog(
+    def udfs_progress(self) -> ProgressEncoder[Udf]:
+        return ProgressEncoder(
             self.sql_backend,
             self.udf_ownership,
             Udf,
-            int(self.named_parameters["parent_run_id"]),
+            self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
         )
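
Taken together, the changes in this file rename the historical_*_log factories to *_progress encoders, replace HistoryLog with ProgressEncoder (plus a jobs-specific JobsProgressEncoder), and centralize the repeated int(self.named_parameters["parent_run_id"]) parse in a single cached parent_run_id property. A hedged sketch of how a workflow task might use one of the renamed factories; the ctx wiring and the append_inventory_snapshot method are assumptions about the surrounding runtime, not shown in this diff:

# Hedged sketch: recording cluster progress from a workflow task. The ctx
# object is a RuntimeContext wired by the workflow runtime, and
# append_inventory_snapshot is assumed from ProgressEncoder's interface.
def record_cluster_progress(ctx) -> None:
    snapshot = ctx.clusters_crawler.snapshot()
    ctx.clusters_progress.append_inventory_snapshot(snapshot)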
12 changes: 6 additions & 6 deletions src/databricks/labs/ucx/framework/crawlers.py
@@ -21,12 +21,12 @@ class DataclassInstance(Protocol):


 class CrawlerBase(ABC, Generic[Result]):
-    def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None:
+    def __init__(self, sql_backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None:
         """
         Initializes a CrawlerBase instance.

         Args:
-            backend (SqlBackend): The backend that executes SQL queries:
+            sql_backend (SqlBackend): The backend that executes SQL queries:
                 Statement Execution API or Databricks Runtime.
             catalog (str): The catalog name for the inventory persistence.
             schema: The schema name for the inventory persistence.
@@ -35,9 +35,9 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k
         self._catalog = self._valid(catalog)
         self._schema = self._valid(schema)
         self._table = self._valid(table)
-        self._backend = backend
-        self._fetch = backend.fetch
-        self._exec = backend.execute
+        self._sql_backend = sql_backend
+        self._fetch = sql_backend.fetch
+        self._exec = sql_backend.execute
         self._klass = klass

     @property
@@ -161,4 +161,4 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)

     def _update_snapshot(self, items: Sequence[Result], *, mode: Literal["append", "overwrite"]) -> None:
         logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
-        self._backend.save_table(self.full_name, items, self._klass, mode=mode)
+        self._sql_backend.save_table(self.full_name, items, self._klass, mode=mode)
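
For orientation, a minimal sketch of a CrawlerBase subclass under the renamed parameter and attribute; the Foo record, table name, and query are hypothetical, not from this repository:

# Hedged sketch: a hypothetical crawler built on CrawlerBase after the rename.
from collections.abc import Iterable
from dataclasses import dataclass

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.framework.crawlers import CrawlerBase


@dataclass
class Foo:  # hypothetical inventory record
    name: str


class FooCrawler(CrawlerBase[Foo]):
    def __init__(self, sql_backend: SqlBackend, schema: str):
        super().__init__(sql_backend, "hive_metastore", schema, "foos", Foo)

    def _crawl(self) -> Iterable[Foo]:
        # a real crawler would list workspace resources here
        yield Foo("example")

    def _try_fetch(self) -> Iterable[Foo]:
        for row in self._fetch(f"SELECT * FROM {self.full_name}"):
            yield Foo(*row)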
18 changes: 18 additions & 0 deletions src/databricks/labs/ucx/framework/owners.py
@@ -201,6 +201,9 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli
         super().__init__(administrator_locator)
         self._ws = ws

+    def owner_of_path(self, path: str) -> str:
+        return self.owner_of(WorkspacePath(self._ws, path))
+
     @retried(on=[InternalError], timeout=timedelta(minutes=1))
     def _maybe_direct_owner(self, record: WorkspacePath) -> str | None:
         maybe_type_and_id = self._maybe_type_and_id(record)
@@ -237,3 +240,18 @@ def _infer_from_first_can_manage(object_permissions):
                 return acl.group_name
             return acl.service_principal_name
         return None
+
+
+class LegacyQueryOwnership(Ownership[str]):
+    def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None:
+        super().__init__(administrator_locator)
+        self._workspace_client = workspace_client
+
+    def _maybe_direct_owner(self, record: str) -> str | None:
+        try:
+            legacy_query = self._workspace_client.queries.get(record)
+            return legacy_query.owner_user_name
+        except NotFound:
+            return None
+        except InternalError:  # redash is very naughty and throws 500s instead of proper 404s
+            return None
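
A hedged usage sketch of the new LegacyQueryOwnership: Ownership subclasses resolve a record's owner through owner_of, which falls back to a workspace administrator when _maybe_direct_owner returns None, so NotFound and InternalError both degrade gracefully. The locator construction and query ID below are assumptions:

# Hedged sketch: resolving the owner of a legacy (Redash) query by its ID.
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership

ws = WorkspaceClient()
admin_locator = AdministratorLocator(ws)  # assumed construction
ownership = LegacyQueryOwnership(admin_locator, ws)
print(ownership.owner_of("12345"))  # admin fallback on NotFound / InternalError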