# Crawlers: append snapshots to history journal, if available (#2743)

## Changes

This PR introduces a history table to which crawler snapshots are journaled after each crawling operation.
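
For orientation, one journaled row can be pictured roughly as below. This is a sketch inferred from the constructor arguments and `__id_attributes__` markers in this diff, not the verbatim schema; all field names are assumptions.

```python
from dataclasses import dataclass

# Assumed shape of one history-journal entry (illustrative, not the real schema).
@dataclass(frozen=True)
class HistoricalRecord:
    workspace_id: int      # workspace the snapshot came from
    job_run_id: int        # workflow run that produced the snapshot
    object_type: str       # e.g. "ClusterInfo"
    object_id: list[str]   # values of the record's __id_attributes__
    data: dict[str, str]   # the snapshotted record, serialized
    owner: str             # resolved by the new *Ownership classes
```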

### Linked issues

Progresses #2572, resolves #2573.

### Functionality

- [X] modified existing workflow: `migration-progress-experimental`

### Tests

- [X] added unit tests
- [X] updated integration tests
asnare authored Oct 23, 2024
Parent: 9408ce2 · Commit: 2b4865e
Showing 24 changed files with 1,933 additions and 66 deletions.
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/assessment/clusters.py
@@ -3,6 +3,7 @@
import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import ClassVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
@@ -46,6 +47,8 @@ class ClusterInfo:
creator: str | None = None
"""User-name of the creator of the cluster, if known."""

__id_attributes__: ClassVar[tuple[str, ...]] = ("cluster_id",)


class CheckClusterMixin(CheckInitScriptMixin):
_ws: WorkspaceClient
@@ -203,6 +206,8 @@ class PolicyInfo:
creator: str | None = None
"""User-name of the creator of the cluster policy, if known."""

__id_attributes__: ClassVar[tuple[str, ...]] = ("policy_id",)


class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
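
The `__id_attributes__` class variable declares which fields form a record's identity in the history journal. A minimal sketch of how an encoder could consume it; the `object_id_of` helper is hypothetical:

```python
from dataclasses import dataclass
from typing import ClassVar

@dataclass
class ClusterInfo:  # trimmed to the fields relevant here
    cluster_id: str
    cluster_name: str | None = None

    __id_attributes__: ClassVar[tuple[str, ...]] = ("cluster_id",)

def object_id_of(record) -> list[str]:
    """Hypothetical helper: read the identity declared by the record's class."""
    return [str(getattr(record, name)) for name in type(record).__id_attributes__]

print(object_id_of(ClusterInfo(cluster_id="0123-456789-abcdef0")))  # ['0123-456789-abcdef0']
```
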
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/assessment/jobs.py
@@ -4,6 +4,7 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from typing import ClassVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
@@ -40,6 +41,8 @@ class JobInfo:
creator: str | None = None
"""User-name of the creator of the pipeline, if known."""

__id_attributes__: ClassVar[tuple[str, ...]] = ("job_id",)


class JobsMixin:
@classmethod
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/assessment/pipelines.py
@@ -2,6 +2,7 @@
import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import ClassVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
@@ -24,6 +25,8 @@ class PipelineInfo:
creator_name: str | None = None
"""User-name of the creator of the pipeline, if known."""

__id_attributes__: ClassVar[tuple[str, ...]] = ("pipeline_id",)


class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
22 changes: 20 additions & 2 deletions src/databricks/labs/ucx/contexts/application.py
@@ -38,17 +38,19 @@
ComputeLocations,
Grant,
GrantsCrawler,
GrantOwnership,
MigrateGrants,
PrincipalACL,
)
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership
from databricks.labs.ucx.hive_metastore.table_migrate import (
TableMigrationStatusRefresher,
TablesMigrator,
)
from databricks.labs.ucx.hive_metastore.table_move import TableMove
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
from databricks.labs.ucx.hive_metastore.tables import TableOwnership
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership
from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
from databricks.labs.ucx.progress.install import VerifyProgressTracking
@@ -243,14 +245,26 @@ def group_manager(self) -> GroupManager:
def grants_crawler(self) -> GrantsCrawler:
return GrantsCrawler(self.tables_crawler, self.udfs_crawler, self.config.include_databases)

@cached_property
def grant_ownership(self) -> GrantOwnership:
return GrantOwnership(self.administrator_locator)

@cached_property
def udfs_crawler(self) -> UdfsCrawler:
return UdfsCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)

@cached_property
def udf_ownership(self) -> UdfOwnership:
return UdfOwnership(self.administrator_locator)

@cached_property
def tables_crawler(self) -> TablesCrawler:
return TablesCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)

@cached_property
def table_ownership(self) -> TableOwnership:
return TableOwnership(self.administrator_locator)

@cached_property
def tables_migrator(self) -> TablesMigrator:
return TablesMigrator(
@@ -363,6 +377,10 @@ def migration_status_refresher(self) -> TableMigrationStatusRefresher:
self.tables_crawler,
)

@cached_property
def table_migration_ownership(self) -> TableMigrationOwnership:
return TableMigrationOwnership(self.tables_crawler, self.table_ownership)

@cached_property
def iam_credential_manager(self) -> CredentialManager:
return CredentialManager(self.workspace_client)
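
Every new `*Ownership` factory receives the `administrator_locator`, which suggests owner resolution falls back to a workspace administrator when a record has no known creator. A sketch of that assumed shape (inferred from the wiring above, not copied from the framework code):

```python
class Ownership:
    def __init__(self, administrator_locator):
        self._administrator_locator = administrator_locator

    def _maybe_direct_owner(self, record) -> str | None:
        return None  # subclasses override with a record-specific lookup

    def owner_of(self, record) -> str:
        # Prefer the record's own creator; fall back to a workspace admin.
        direct = self._maybe_direct_owner(record)
        return direct or self._administrator_locator.get_workspace_administrator()  # assumed API

class ClusterOwnership(Ownership):
    def _maybe_direct_owner(self, record) -> str | None:
        return record.creator  # ClusterInfo.creator, when known
```
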
134 changes: 129 additions & 5 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -3,21 +3,35 @@

from databricks.labs.blueprint.installation import Installation
from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
from databricks.sdk import WorkspaceClient, core

from databricks.labs.ucx.__about__ import __version__
from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler
from databricks.labs.ucx.assessment.clusters import (
ClustersCrawler,
PoliciesCrawler,
ClusterOwnership,
ClusterInfo,
ClusterPolicyOwnership,
PolicyInfo,
)
from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler
from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo, JobsCrawler, SubmitRunsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler, PipelineInfo, PipelineOwnership
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.history import HistoryLog
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder

# As with GlobalContext, service factories unavoidably have a lot of public methods.
# pylint: disable=too-many-public-methods


class RuntimeContext(GlobalContext):
@cached_property
@@ -54,6 +68,10 @@ def installation(self) -> Installation:
def jobs_crawler(self) -> JobsCrawler:
return JobsCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def job_ownership(self) -> JobOwnership:
return JobOwnership(self.administrator_locator)

@cached_property
def submit_runs_crawler(self) -> SubmitRunsCrawler:
return SubmitRunsCrawler(
@@ -67,10 +85,18 @@ def submit_runs_crawler(self) -> SubmitRunsCrawler:
def clusters_crawler(self) -> ClustersCrawler:
return ClustersCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def cluster_ownership(self) -> ClusterOwnership:
return ClusterOwnership(self.administrator_locator)

@cached_property
def pipelines_crawler(self) -> PipelinesCrawler:
return PipelinesCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def pipeline_ownership(self) -> PipelineOwnership:
return PipelineOwnership(self.administrator_locator)

@cached_property
def table_size_crawler(self) -> TableSizeCrawler:
return TableSizeCrawler(self.tables_crawler)
@@ -79,12 +105,18 @@ def table_size_crawler(self) -> TableSizeCrawler:
def policies_crawler(self) -> PoliciesCrawler:
return PoliciesCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def cluster_policy_ownership(self) -> ClusterPolicyOwnership:
return ClusterPolicyOwnership(self.administrator_locator)

@cached_property
def global_init_scripts_crawler(self) -> GlobalInitScriptCrawler:
return GlobalInitScriptCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def tables_crawler(self) -> TablesCrawler:
# Warning: Not all runtime contexts support the fast-scan implementation; it requires the JVM bridge to Spark
# and that's not always available.
return FasterTableScanCrawler(self.sql_backend, self.inventory_database, self.config.include_databases)

@cached_property
@@ -116,10 +148,102 @@ def workflow_run_recorder(self) -> WorkflowRunRecorder:
return WorkflowRunRecorder(
self.sql_backend,
self.config.ucx_catalog,
workspace_id=self.workspace_client.get_workspace_id(),
workspace_id=self.workspace_id,
workflow_name=self.named_parameters["workflow"],
workflow_id=int(self.named_parameters["job_id"]),
workflow_run_id=int(self.named_parameters["parent_run_id"]),
workflow_run_attempt=int(self.named_parameters.get("attempt", 0)),
workflow_start_time=self.named_parameters["start_time"],
)

@cached_property
def workspace_id(self) -> int:
return self.workspace_client.get_workspace_id()
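
Caching `workspace_id` means the eight history-log factories below share a single workspace lookup rather than each calling `get_workspace_id()`. A self-contained illustration of the `cached_property` behaviour:

```python
from functools import cached_property

class FakeClient:
    """Stand-in for WorkspaceClient, for illustration only."""
    def get_workspace_id(self) -> int:
        print("API call")
        return 12345

class Ctx:
    def __init__(self, client: FakeClient):
        self._client = client

    @cached_property
    def workspace_id(self) -> int:
        return self._client.get_workspace_id()

ctx = Ctx(FakeClient())
assert ctx.workspace_id == ctx.workspace_id  # "API call" is printed only once
```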

@cached_property
def historical_clusters_log(self) -> HistoryLog[ClusterInfo]:
return HistoryLog(
self.sql_backend,
self.cluster_ownership,
ClusterInfo,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]:
return HistoryLog(
self.sql_backend,
self.cluster_policy_ownership,
PolicyInfo,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_grants_log(self) -> HistoryLog[Grant]:
return HistoryLog(
self.sql_backend,
self.grant_ownership,
Grant,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_jobs_log(self) -> HistoryLog[JobInfo]:
return HistoryLog(
self.sql_backend,
self.job_ownership,
JobInfo,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_pipelines_log(self) -> HistoryLog[PipelineInfo]:
return HistoryLog(
self.sql_backend,
self.pipeline_ownership,
PipelineInfo,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_tables_log(self) -> HistoryLog[Table]:
return HistoryLog(
self.sql_backend,
self.table_ownership,
Table,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_table_migration_log(self) -> HistoryLog[TableMigrationStatus]:
return HistoryLog(
self.sql_backend,
self.table_migration_ownership,
TableMigrationStatus,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_udfs_log(self) -> HistoryLog[Udf]:
return HistoryLog(
self.sql_backend,
self.udf_ownership,
Udf,
int(self.named_parameters["parent_run_id"]),
self.workspace_id,
self.config.ucx_catalog,
)
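
Taken together, these factories let a workflow task journal a crawler's output in two lines. A sketch, assuming `append_inventory_snapshot` is the `HistoryLog` method these factories pair with, and that the crawler's public `snapshot()` forwards the `force_refresh` flag seen in `_snapshot` below:

```python
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext

def journal_clusters(ctx: RuntimeContext) -> None:
    # Re-crawl the inventory, then append the snapshot to the history journal.
    snapshot = ctx.clusters_crawler.snapshot(force_refresh=True)
    ctx.historical_clusters_log.append_inventory_snapshot(snapshot)  # assumed API
```
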
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/framework/crawlers.py
@@ -159,6 +159,6 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool)
self._update_snapshot(loaded_records, mode="overwrite")
return loaded_records

def _update_snapshot(self, items: Sequence[Result], mode: Literal["append", "overwrite"] = "append") -> None:
def _update_snapshot(self, items: Sequence[Result], *, mode: Literal["append", "overwrite"]) -> None:
logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}")
self._backend.save_table(self.full_name, items, self._klass, mode=mode)
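
Making `mode` keyword-only (and dropping its default) forces every call site to say whether it appends or overwrites, which matters now that history journaling appends to a shared table. A self-contained illustration of the calling convention:

```python
from typing import Literal

def update_snapshot(items: list[str], *, mode: Literal["append", "overwrite"]) -> None:
    print(f"saving {len(items)} records, mode={mode}")

update_snapshot(["a", "b"], mode="overwrite")  # OK: intent is explicit
update_snapshot(["c"], mode="append")          # OK
# update_snapshot(["c"], "append")             # TypeError: mode is keyword-only
```
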
9 changes: 8 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/grants.py
@@ -3,7 +3,7 @@
from collections.abc import Callable, Iterable
from dataclasses import dataclass, replace
from functools import partial, cached_property
from typing import Protocol
from typing import ClassVar, Protocol

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.parallel import ManyError, Threads
@@ -66,6 +66,8 @@ class Grant:
any_file: bool = False
anonymous_function: bool = False

__id_attributes__: ClassVar[tuple[str, ...]] = ("object_type", "object_key", "action_type", "principal")

@staticmethod
def type_and_key(
*,
@@ -105,6 +107,11 @@ def type_and_key(
)
raise ValueError(msg)

@property
def object_type(self) -> str:
this_type, _ = self.this_type_and_key()
return this_type

@property
def object_key(self) -> str:
_, key = self.this_type_and_key()
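
A grant has no single natural key, so its journal identity combines the securable's type and key with the action and the principal. The values below are illustrative; the exact strings come from `this_type_and_key()`:

```python
from databricks.labs.ucx.hive_metastore.grants import Grant

grant = Grant(principal="alice", action_type="SELECT",
              catalog="hive_metastore", database="sales", table="orders")
# With __id_attributes__ = ("object_type", "object_key", "action_type", "principal"),
# the identity is roughly ("TABLE", "hive_metastore.sales.orders", "SELECT", "alice").
```
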
src/databricks/labs/ucx/hive_metastore/table_migration_status.py
@@ -2,6 +2,7 @@
import logging
from dataclasses import dataclass, replace
from collections.abc import Iterable, KeysView
from typing import ClassVar

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
@@ -25,6 +26,8 @@ class TableMigrationStatus:
dst_table: str | None = None
update_ts: str | None = None

__id_attributes__: ClassVar[tuple[str, ...]] = ("src_schema", "src_table")

def destination(self):
return f"{self.dst_catalog}.{self.dst_schema}.{self.dst_table}".lower()

4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/tables.py
@@ -1,7 +1,7 @@
import logging
import re
import typing
from collections.abc import Iterable, Iterator, Collection
from collections.abc import Collection, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum, auto
from functools import cached_property, partial
@@ -64,6 +64,8 @@ class Table: # pylint: disable=too-many-public-methods
storage_properties: str | None = None
is_partitioned: bool = False

__id_attributes__: typing.ClassVar[tuple[str, ...]] = ("catalog", "database", "name")

DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [
"/dbfs/",
"dbfs:/",
(Diff truncated: the remaining changed files in this commit are not shown.)