Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify migration progress prerequisites during UCX catalog creation #2912

Merged
merged 29 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f123df6
Add VerifyProgressTracking
JCZuurmond Oct 10, 2024
e3bafd4
Move verification docs
JCZuurmond Oct 10, 2024
3557a4b
Add command to metastore not attached message
JCZuurmond Oct 10, 2024
959f73c
Normalize error message
JCZuurmond Oct 10, 2024
3458386
Add command to assessment workflow not run
JCZuurmond Oct 10, 2024
9d7b46a
Format
JCZuurmond Oct 10, 2024
eb79edd
Fix wrong import
JCZuurmond Oct 10, 2024
3f941b9
Add verify progress tracking to global context
JCZuurmond Oct 10, 2024
f8cf9cf
Reuse verify progress tracking in workflow
JCZuurmond Oct 10, 2024
3f862fa
Verify progress tracking in create-ucx-catalog command
JCZuurmond Oct 10, 2024
33e03f3
Format
JCZuurmond Oct 10, 2024
c6d3c6f
Fix unit test
JCZuurmond Oct 10, 2024
74cdc13
Remove jobs import
JCZuurmond Oct 10, 2024
f9c79fd
Fix test
JCZuurmond Oct 10, 2024
a3be010
Test runtime warning being raised
JCZuurmond Oct 10, 2024
fb64741
Test verify progress tracking
JCZuurmond Oct 10, 2024
16aa938
Remove parenthesis in warning message
JCZuurmond Oct 10, 2024
e07f3ce
Format
JCZuurmond Oct 10, 2024
ec48d1b
Change mock level for test_verify_progress_tracking_valid_prerequisites
JCZuurmond Oct 10, 2024
d9f183b
Change mock level for test_verify_progress_tracking_raises_runtime_er…
JCZuurmond Oct 10, 2024
d5052fd
Change mock level for test_verify_progress_tracking_raises_runtime_er…
JCZuurmond Oct 10, 2024
9f34ead
Change mock level for test_verify_progress_tracking_raises_runtime_er…
JCZuurmond Oct 10, 2024
724756b
Change mock level for test_verify_progress_tracking_raises_runtime_er…
JCZuurmond Oct 10, 2024
7ce4a3a
Rename test
JCZuurmond Oct 10, 2024
382a820
Remove redundant tests
JCZuurmond Oct 10, 2024
6fb73e3
Fix regex pattern
JCZuurmond Oct 10, 2024
28581df
Format
JCZuurmond Oct 10, 2024
b3368a7
Wait for one hour on the assessment run during migration progress
JCZuurmond Oct 10, 2024
4830ef5
Merge branch 'main' into feat/verify-migration-progress-prerequisites
nfx Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ def create_ucx_catalog(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceConte
workspace_context = ctx or WorkspaceContext(w)
workspace_context.catalog_schema.create_ucx_catalog(prompts)
workspace_context.progress_tracking_installation.run()
workspace_context.verify_progress_tracking.verify()


@ucx.command
Expand Down
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/contexts/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
from databricks.labs.ucx.progress.install import VerifyProgressTracking
from databricks.labs.ucx.source_code.graph import DependencyResolver
from databricks.labs.ucx.source_code.jobs import WorkflowLinter
from databricks.labs.ucx.source_code.known import KnownList
Expand Down Expand Up @@ -397,6 +398,10 @@ def verify_has_metastore(self) -> VerifyHasMetastore:
def verify_has_ucx_catalog(self) -> VerifyHasCatalog:
return VerifyHasCatalog(self.workspace_client, self.config.ucx_catalog)

@cached_property
def verify_progress_tracking(self) -> VerifyProgressTracking:
    """Build the verifier that checks the progress tracking prerequisites.

    The verifier is composed from this context's metastore check, UCX catalog check and
    deployed workflows; `cached_property` keeps the same instance on subsequent accesses.
    """
    metastore_check = self.verify_has_metastore
    catalog_check = self.verify_has_ucx_catalog
    workflows = self.deployed_workflows
    return VerifyProgressTracking(metastore_check, catalog_check, workflows)

@cached_property
def pip_resolver(self) -> PythonLibraryResolver:
    """Build the Python library resolver from this context's allow list."""
    allow_list = self.allow_list
    return PythonLibraryResolver(allow_list)
Expand Down
53 changes: 53 additions & 0 deletions src/databricks/labs/ucx/progress/install.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime as dt
import logging
from dataclasses import dataclass

from databricks.labs.lsql.backends import SqlBackend
from databricks.labs.lsql.deployment import SchemaDeployer

from databricks.labs.ucx.__about__ import __version__
from databricks.labs.ucx.hive_metastore.verification import MetastoreNotFoundError, VerifyHasCatalog, VerifyHasMetastore
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
from databricks.labs.ucx.progress.workflow_runs import WorkflowRun


Expand Down Expand Up @@ -52,3 +55,53 @@ def run(self) -> None:
self._schema_deployer.deploy_table("workflow_runs", WorkflowRun)
self._schema_deployer.deploy_table("historical", Historical)
logger.info("Installation completed successfully!")


class VerifyProgressTracking:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nfx : What do you think of these Verify... classes? They are small classes that mainly function as a wrapper around one method

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we'd merge those

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I can cover that in a separate PR. To be more specific, we have the following classes:

  • VerifyHasMetastore: Verifies if there is a (current) metastore accessible
  • VerifyHasCatalog : Verifies if there is a catalog accessible
  • VerifyProgressTracking : Verifies if the progress tracking pre-requisites are met.

VerifyHasMetastore and VerifyHasCatalog are logically closer and easier to merge. VerifyProgressTracking is more specific (to the progress tracking) and requires the DeployedWorkflows.

Would you prefer to merge all three and have a progress-tracking-specific method on the merged class? Or merge VerifyHasMetastore and VerifyHasCatalog and leave VerifyProgressTracking separate (for now)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VerifyHasMetastore and VerifyHasCatalog

let's start with merging those once we have cycles for it

"""Verify the progress tracking is ready to be used."""

def __init__(
    self,
    verify_has_metastore: VerifyHasMetastore,
    verify_has_ucx_catalog: VerifyHasCatalog,
    deployed_workflows: DeployedWorkflows,
) -> None:
    """Keep references to the collaborators that `verify` consults.

    Args :
        verify_has_metastore (VerifyHasMetastore) : Used for the metastore prerequisite.
        verify_has_ucx_catalog (VerifyHasCatalog) : Used for the UCX catalog prerequisite.
        deployed_workflows (DeployedWorkflows) : Used to validate the "assessment" workflow step.
    """
    # The assignments are independent of each other; nothing is called or validated here.
    self._deployed_workflows = deployed_workflows
    self._verify_has_ucx_catalog = verify_has_ucx_catalog
    self._verify_has_metastore = verify_has_metastore

def verify(self, timeout=dt.timedelta(seconds=0)) -> None:
    """Check that every prerequisite for progress tracking is fulfilled.

    The checks run in this order and stop at the first one that fails:
      1. A UC metastore is attached to the workspace.
      2. The UCX catalog exists.
      3. The "assessment" workflow step validates successfully. The `timeout` is handed to
         that validation; it is the time to wait for a pending or running assessment run.

    Args :
        timeout (datetime.timedelta) : Timeout forwarded to the assessment step validation.
            Defaults to zero.

    Raises :
        RuntimeWarning : Signalling a prerequisite is not met; the message names the command to run.
    """
    no_metastore = "Metastore not attached to workspace. Run `databricks labs ucx assign-metastore`"
    try:
        metastore_attached = self._verify_has_metastore.verify_metastore()
    except MetastoreNotFoundError as err:
        # A lookup failure is reported exactly like a missing metastore, keeping the cause chained.
        raise RuntimeWarning(no_metastore) from err
    if not metastore_attached:
        raise RuntimeWarning(no_metastore)

    catalog_exists = self._verify_has_ucx_catalog.verify()
    if not catalog_exists:
        raise RuntimeWarning("UCX catalog not configured. Run `databricks labs ucx create-ucx-catalog`")

    assessment_done = self._deployed_workflows.validate_step("assessment", timeout=timeout)
    if assessment_done:
        return
    raise RuntimeWarning(
        "Assessment workflow did not complete successfully yet. "
        "Run `databricks labs ucx ensure-assessment-run` command"
    )
25 changes: 2 additions & 23 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task
from databricks.labs.ucx.hive_metastore.verification import MetastoreNotFoundError


class MigrationProgress(Workflow):
Expand Down Expand Up @@ -111,29 +110,9 @@ def setup_table_migration(self, ctx: RuntimeContext) -> None:
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
    """Verify the prerequisites for running this job on the table migration cluster are fulfilled.

    The checks themselves are delegated to `ctx.verify_progress_tracking`, so this workflow
    task does not keep its own copy of the metastore, UCX catalog and assessment checks.

    We will wait up to 1 hour for the assessment run to finish if it is running or pending.

    Raises :
        RuntimeWarning : Signalling the prerequisites are not met.
    """
    ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[crawl_tables, verify_prerequisites], job_cluster="table_migration")
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
Expand Down
80 changes: 79 additions & 1 deletion tests/unit/progress/test_install.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
import datetime as dt
from unittest.mock import create_autospec

import pytest

from databricks.labs.ucx.hive_metastore.verification import MetastoreNotFoundError, VerifyHasCatalog, VerifyHasMetastore
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
from databricks.labs.ucx.progress.install import ProgressTrackingInstallation, VerifyProgressTracking


def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None:
Expand All @@ -12,3 +19,74 @@ def test_progress_tracking_installation_run_creates_tables(mock_backend) -> None
installation.run()
# Dataclass to schema conversion is tested within the lsql package
assert sum("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries) == 2


def test_verify_progress_tracking_valid_prerequisites() -> None:
    """Verify succeeds, consulting each prerequisite exactly once, when all checks pass."""
    # Unconfigured autospec'd methods return truthy mocks, so every prerequisite counts as met.
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)
    timeout = dt.timedelta(hours=1)

    # Unmet prerequisites are signalled with RuntimeWarning, which derives from Warning and
    # not from RuntimeError, so that is the exception type to intercept here.
    try:
        verify_progress_tracking.verify(timeout=timeout)
    except RuntimeWarning as e:
        pytest.fail(f"Verify progress tracking raises: {e}")

    verify_has_metastore.verify_metastore.assert_called_once()
    verify_has_catalog.verify.assert_called_once()
    deployed_workflows.validate_step.assert_called_once_with("assessment", timeout=timeout)


def test_verify_progress_tracking_raises_runtime_error_if_metastore_not_attached_to_workspace(
    mock_installation,
) -> None:
    """A `MetastoreNotFoundError` from the metastore check surfaces as a `RuntimeWarning`.

    The later prerequisites must not be consulted once the metastore check has failed.
    """
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_metastore.verify_metastore.side_effect = MetastoreNotFoundError
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)

    with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
        verify_progress_tracking.verify()

    verify_has_metastore.verify_metastore.assert_called_once()
    # Assert on the methods: `assert_not_called()` on the autospec'd class itself only checks
    # that the class mock was never invoked, which holds regardless of what `verify` did.
    verify_has_catalog.verify.assert_not_called()
    deployed_workflows.validate_step.assert_not_called()


def test_verify_progress_tracking_raises_runtime_error_if_no_metastore(mock_installation) -> None:
    """A falsy metastore check result surfaces as a `RuntimeWarning`.

    The later prerequisites must not be consulted once the metastore check has failed.
    """
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_metastore.verify_metastore.return_value = False
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)

    with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
        verify_progress_tracking.verify()

    verify_has_metastore.verify_metastore.assert_called_once()
    # Assert on the methods: `assert_not_called()` on the autospec'd class itself only checks
    # that the class mock was never invoked, which holds regardless of what `verify` did.
    verify_has_catalog.verify.assert_not_called()
    deployed_workflows.validate_step.assert_not_called()


def test_verify_progress_tracking_raises_runtime_error_if_missing_ucx_catalog(mock_installation) -> None:
    """A falsy UCX catalog check result surfaces as a `RuntimeWarning`.

    The assessment workflow must not be validated once the catalog check has failed.
    """
    verify_has_metastore = create_autospec(VerifyHasMetastore)
    verify_has_catalog = create_autospec(VerifyHasCatalog)
    verify_has_catalog.verify.return_value = False
    deployed_workflows = create_autospec(DeployedWorkflows)
    verify_progress_tracking = VerifyProgressTracking(verify_has_metastore, verify_has_catalog, deployed_workflows)

    with pytest.raises(RuntimeWarning, match="UCX catalog not configured."):
        verify_progress_tracking.verify()

    verify_has_metastore.verify_metastore.assert_called_once()
    verify_has_catalog.verify.assert_called_once()
    # Assert on the method: `assert_not_called()` on the autospec'd class itself only checks
    # that the class mock was never invoked, which holds regardless of what `verify` did.
    deployed_workflows.validate_step.assert_not_called()


def test_verify_progress_tracking_raises_runtime_error_if_assessment_workflow_did_not_run(mock_installation) -> None:
    """A failed assessment step validation surfaces as a `RuntimeWarning`.

    All three prerequisites are consulted, and the assessment validation receives the
    default zero timeout.
    """
    metastore_check = create_autospec(VerifyHasMetastore)
    catalog_check = create_autospec(VerifyHasCatalog)
    workflows = create_autospec(DeployedWorkflows)
    workflows.validate_step.return_value = False
    verifier = VerifyProgressTracking(metastore_check, catalog_check, workflows)

    with pytest.raises(RuntimeWarning, match="Assessment workflow did not complete successfully yet."):
        verifier.verify()

    metastore_check.verify_metastore.assert_called_once()
    catalog_check.verify.assert_called_once()
    no_wait = dt.timedelta(seconds=0)
    workflows.validate_step.assert_called_once_with("assessment", timeout=no_wait)
36 changes: 3 additions & 33 deletions tests/unit/progress/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pytest
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied
from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment
from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState

Expand Down Expand Up @@ -55,39 +54,10 @@ def test_migration_progress_with_valid_prerequisites(run_workflow) -> None:
assert True, "Valid prerequisites found"


def test_migration_progress_raises_runtime_error_if_metastore_not_attached_to_workflow(run_workflow) -> None:
def test_migration_progress_with_invalid_prerequisites(run_workflow) -> None:
"""All invalid prerequisites permutations are tested for `VerifyProgressTracking` separately."""
ws = create_autospec(WorkspaceClient)
ws.metastores.current.return_value = None
task = MigrationProgress.verify_prerequisites
with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace."):
run_workflow(task, workspace_client=ws)


def test_migration_progress_raises_runtime_error_if_missing_permissions_to_access_metastore(run_workflow) -> None:
ws = create_autospec(WorkspaceClient)
ws.metastores.current.side_effect = PermissionDenied
task = MigrationProgress.verify_prerequisites
with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace"):
run_workflow(task, workspace_client=ws)


def test_migration_progress_raises_runtime_error_if_missing_ucx_catalog(run_workflow) -> None:
ws = create_autospec(WorkspaceClient)
ws.catalogs.get.return_value = None
task = MigrationProgress.verify_prerequisites
with pytest.raises(RuntimeWarning, match="UCX catalog not configured. .*"):
run_workflow(task, workspace_client=ws)


def test_migration_progress_raises_runtime_error_if_missing_permissions_to_access_ucx_catalog(run_workflow) -> None:
ws = create_autospec(WorkspaceClient)
ws.catalogs.get.side_effect = PermissionDenied
task = MigrationProgress.verify_prerequisites
with pytest.raises(RuntimeWarning, match="UCX catalog not configured. .*"):
run_workflow(task, workspace_client=ws)


def test_migration_progress_raises_runtime_error_if_assessment_workflow_did_not_run(run_workflow) -> None:
task = MigrationProgress.verify_prerequisites
with pytest.raises(RuntimeWarning, match="Assessment workflow not completed successfully"):
run_workflow(task)
17 changes: 13 additions & 4 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.errors.platform import BadRequest
from databricks.sdk.service import jobs, sql
from databricks.sdk.service import sql
from databricks.sdk.service.catalog import ExternalLocationInfo, MetastoreInfo
from databricks.sdk.service.compute import ClusterDetails, ClusterSource
from databricks.sdk.service.iam import ComplexValue, User
Expand Down Expand Up @@ -842,8 +842,8 @@ def test_revert_cluster_remap_empty(ws, caplog):
ws.workspace.list.assert_called_once()


def test_relay_logs(ws, caplog):
ws.jobs.list_runs.return_value = [jobs.BaseRun(run_id=123, start_time=int(time.time()))]
def test_relay_logs(ws, caplog) -> None:
ws.jobs.list_runs.return_value = [Run(run_id=123, start_time=int(time.time()))]
ws.workspace.list.side_effect = [
[
ObjectInfo(path='/Users/foo/.ucx/logs/run-123-0', object_type=ObjectType.DIRECTORY),
Expand Down Expand Up @@ -889,21 +889,30 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie

def test_create_ucx_catalog_calls_get_catalog(ws) -> None:
    """Creating the UCX catalog looks the catalog up through the workspace client."""
    prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"})
    # A successful run is reported so the progress tracking verification in the command passes.
    ws.jobs.list_runs.return_value = [Run(state=RunState(result_state=RunResultState.SUCCESS))]

    create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws))

    # NOTE(review): the command also verifies the progress tracking prerequisites, which
    # presumably looks the catalog up again, so no exact call count is asserted - confirm.
    ws.catalogs.get.assert_called()


def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None:
    """Creating the UCX catalog issues SQL on the backend, starting with a schema creation."""
    ws.jobs.list_runs.return_value = [Run(state=RunState(result_state=RunResultState.SUCCESS))]
    answers = {"Please provide storage location url for catalog: .*": "metastore"}
    prompts = MockPrompts(answers)
    context = WorkspaceContext(ws).replace(sql_backend=mock_backend)

    create_ucx_catalog(ws, prompts, ctx=context)

    assert len(mock_backend.queries) > 0, "No queries executed on backend"
    first_query = mock_backend.queries[0]
    assert "CREATE SCHEMA" in first_query


def test_create_ucx_catalog_raises_runtime_error_because_progress_tracking_prerequisites_are_not_met(ws) -> None:
    """The command raises a `RuntimeWarning` when the progress tracking prerequisites are unmet."""
    answers = {"Please provide storage location url for catalog: .*": "metastore"}
    prompts = MockPrompts(answers)

    # Specific warning is not important here
    with pytest.raises(RuntimeWarning):
        create_ucx_catalog(ws, prompts)


@pytest.mark.parametrize("run_as_collection", [False, True])
def test_migrate_tables_calls_migrate_table_job_run_now(
run_as_collection,
Expand Down