From cb40b60a6cdd97f28c36b9031cc6eb6ccb62d6ac Mon Sep 17 00:00:00 2001 From: Cor Date: Wed, 2 Oct 2024 17:11:03 +0200 Subject: [PATCH] Add and populate UCX `workflow_runs` table (#2754) ## Changes Add and populate workflow runs table ### Linked issues Resolves #2600 ### Functionality - [x] added relevant user documentation - [x] modified existing workflow: `migration-process-experimental` ### Tests - [ ] manually tested - [x] added unit tests - [x] added integration tests ### TODO - [ ] Handle concurrent writes, [see](https://github.com/databrickslabs/ucx/pull/2754#discussion_r1778259805) - [x] Decide on getting workflow run status from `parse_log_task` --> only add it to the migration progress workflow for now --- README.md | 26 ++++-- src/databricks/labs/ucx/cli.py | 1 + .../labs/ucx/contexts/workflow_task.py | 14 ++++ .../labs/ucx/contexts/workspace_cli.py | 5 ++ .../labs/ucx/installer/workflows.py | 3 + src/databricks/labs/ucx/progress/install.py | 22 ++++++ .../labs/ucx/progress/workflow_runs.py | 79 +++++++++++++++++++ src/databricks/labs/ucx/progress/workflows.py | 14 ++++ tests/integration/conftest.py | 10 ++- tests/integration/progress/test_install.py | 7 ++ .../progress/test_workflow_runs.py | 29 +++++++ tests/integration/progress/test_workflows.py | 7 ++ tests/unit/progress/test_install.py | 14 ++++ tests/unit/progress/test_workflow_runs.py | 30 +++++++ tests/unit/test_cli.py | 12 ++- 15 files changed, 265 insertions(+), 8 deletions(-) create mode 100644 src/databricks/labs/ucx/progress/install.py create mode 100644 src/databricks/labs/ucx/progress/workflow_runs.py create mode 100644 tests/integration/progress/test_install.py create mode 100644 tests/integration/progress/test_workflow_runs.py create mode 100644 tests/unit/progress/test_install.py create mode 100644 tests/unit/progress/test_workflow_runs.py diff --git a/README.md b/README.md index 787fe09cd0..676739ec22 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,7 @@ See [contributing 
instructions](CONTRIBUTING.md) to help improve this project. * [`table-migrated-to-uc`](#table-migrated-to-uc) * [`to-json-in-shared-clusters`](#to-json-in-shared-clusters) * [`unsupported-magic-line`](#unsupported-magic-line) + * [[EXPERIMENTAL] Migration Progress Workflow](#experimental-migration-progress-workflow) * [Utility commands](#utility-commands) * [`logs` command](#logs-command) * [`ensure-assessment-run` command](#ensure-assessment-run-command) @@ -126,7 +127,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project. * [`revert-cluster-remap` command](#revert-cluster-remap-command) * [`upload` command](#upload-command) * [`download` command](#download-command) - * [`join-collection` command](#join-collection command) + * [`join-collection` command](#join-collection-command) * [collection eligible command](#collection-eligible-command) * [Common Challenges and the Solutions](#common-challenges-and-the-solutions) * [Network Connectivity Issues](#network-connectivity-issues) @@ -994,6 +995,19 @@ This message indicates the code that could not be analysed by UCX. User must che [[back to top](#databricks-labs-ucx)] +## [EXPERIMENTAL] Migration Progress Workflow + +The `migration-progress-experimental` workflow updates a subset of the inventory tables to track migration status of +workspace resources that need to be migrated. Besides updating the inventory tables, this workflow tracks the migration +progress by updating the following [UCX catalog](#create-ucx-catalog-command) tables: + +- `workflow_runs`: Tracks the status of the workflow runs. + +_Note: A subset of the inventory is updated, *not* the complete inventory that is initially gathered by +the [assessment workflow](#assessment-workflow)._ + +[[back to top](#databricks-labs-ucx)] + # Utility commands ## `logs` command @@ -1029,11 +1043,13 @@ listed with the [`workflows` command](#workflows-command). 
databricks labs ucx update-migration-progress ``` -This command updates a subset of the inventory tables that are used to track workspace resources that need to be migrated. It does this by triggering the `migration-process-experimental` workflow to run on a workspace and waiting for it to complete. This can be used to ensure that dashboards and associated reporting are updated to reflect the current state of the workspace. - -_Note: Only a subset of the inventory is updated, *not* the complete inventory that is initially gathered by the [assessment workflow](#assessment-workflow)._ +This command runs the [(experimental) migration progress workflow](#experimental-migration-progress-workflow) to update +the migration status of workspace resources that need to be migrated. It does this by triggering +the `migration-progress-experimental` workflow to run on a workspace and waiting for +it to complete. -Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can be fixed with the [`repair-run` command](#repair-run-command). +Workflows and their status can be listed with the [`workflows` command](#workflows-command), while failed workflows can +be fixed with the [`repair-run` command](#repair-run-command). 
[[back to top](#databricks-labs-ucx)] diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 04f0b22d0b..b9b75b1640 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -611,6 +611,7 @@ def create_ucx_catalog(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceConte """ workspace_context = ctx or WorkspaceContext(w) workspace_context.catalog_schema.create_ucx_catalog(prompts) + workspace_context.progress_tracking_installation.run() @ucx.command diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 488c224243..715fb5c67d 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -16,6 +16,7 @@ from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder +from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder class RuntimeContext(GlobalContext): @@ -109,3 +110,16 @@ def task_run_warning_recorder(self): self.inventory_database, int(self.named_parameters.get("attempt", "0")), ) + + @cached_property + def workflow_run_recorder(self) -> WorkflowRunRecorder: + return WorkflowRunRecorder( + self.sql_backend, + self.config.ucx_catalog, + workspace_id=self.workspace_client.get_workspace_id(), + workflow_name=self.named_parameters["workflow"], + workflow_id=int(self.named_parameters["job_id"]), + workflow_run_id=int(self.named_parameters["parent_run_id"]), + workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), + workflow_start_time=self.named_parameters["start_time"], + ) diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index 0ef0edc3aa..2c4830a863 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ 
b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -18,6 +18,7 @@ from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources from databricks.labs.ucx.contexts.application import CliContext from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.linters.context import LinterContext from databricks.labs.ucx.source_code.linters.files import LocalFileMigrator, LocalCodeLinter @@ -179,6 +180,10 @@ def iam_role_creation(self): def notebook_loader(self) -> NotebookLoader: return NotebookLoader() + @cached_property + def progress_tracking_installation(self) -> ProgressTrackingInstallation: + return ProgressTrackingInstallation(self.sql_backend, self.config.ucx_catalog) + class LocalCheckoutContext(WorkspaceContext): """Local context extends Workspace context to provide extra properties diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index e6148a0da5..945a79d861 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -55,9 +55,11 @@ TEST_RESOURCE_PURGE_TIMEOUT = timedelta(hours=1) TEST_NIGHTLY_CI_RESOURCES_PURGE_TIMEOUT = timedelta(hours=3) # Buffer for debugging nightly integration test runs +# See https://docs.databricks.com/en/jobs/parameter-value-references.html#supported-value-references EXTRA_TASK_PARAMS = { "job_id": "{{job_id}}", "run_id": "{{run_id}}", + "start_time": "{{job.start_time.iso_datetime}}", "attempt": "{{job.repair_count}}", "parent_run_id": "{{parent_run_id}}", } @@ -108,6 +110,7 @@ f'--task=' + dbutils.widgets.get('task'), f'--job_id=' + dbutils.widgets.get('job_id'), f'--run_id=' + dbutils.widgets.get('run_id'), + f'--start_time=' + dbutils.widgets.get('start_time'), f'--attempt=' + 
dbutils.widgets.get('attempt'), f'--parent_run_id=' + dbutils.widgets.get('parent_run_id')) """ diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py new file mode 100644 index 0000000000..5f2ab69f7e --- /dev/null +++ b/src/databricks/labs/ucx/progress/install.py @@ -0,0 +1,22 @@ +import logging + +from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.lsql.deployment import SchemaDeployer +from databricks.labs.ucx.progress.workflow_runs import WorkflowRun + +logger = logging.getLogger(__name__) + + +class ProgressTrackingInstallation: + """Install resources for UCX's progress tracking.""" + + _SCHEMA = "multiworkspace" + + def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: + # `mod` is a required parameter, though, it's not used in this context without views. + self._schema_deployer = SchemaDeployer(sql_backend, self._SCHEMA, mod=None, catalog=ucx_catalog) + + def run(self) -> None: + self._schema_deployer.deploy_schema() + self._schema_deployer.deploy_table("workflow_runs", WorkflowRun) + logger.info("Installation completed successfully!") diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py new file mode 100644 index 0000000000..d92a59961b --- /dev/null +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -0,0 +1,79 @@ +import datetime as dt +import logging +from dataclasses import dataclass + +from databricks.labs.lsql.backends import SqlBackend +from databricks.sdk.errors import NotFound + + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True, kw_only=True) +class WorkflowRun: + started_at: dt.datetime + """The timestamp of the workflow run start.""" + + finished_at: dt.datetime + """The timestamp of the workflow run end.""" + + workspace_id: int + """The workspace id in which the workflow ran.""" + + workflow_name: str + """The workflow name that ran.""" + + workflow_id: int + """The 
workflow id of the workflow that ran.""" + + workflow_run_id: int + """The workflow run id.""" + + workflow_run_attempt: int + """The workflow run attempt.""" + + +class WorkflowRunRecorder: + """Record workflow runs in a database.""" + + def __init__( + self, + sql_backend: SqlBackend, + ucx_catalog: str, + *, + workspace_id: int, + workflow_name: str, + workflow_id: int, + workflow_run_id: int, + workflow_run_attempt: int, + workflow_start_time: str, + ): + self._sql_backend = sql_backend + self._full_table_name = f"{ucx_catalog}.multiworkspace.workflow_runs" + self._workspace_id = workspace_id + self._workflow_name = workflow_name + self._workflow_start_time = workflow_start_time + self._workflow_id = workflow_id + self._workflow_run_id = workflow_run_id + self._workflow_run_attempt = workflow_run_attempt + + def record(self) -> None: + """Record a workflow run.""" + workflow_run = WorkflowRun( + started_at=dt.datetime.fromisoformat(self._workflow_start_time), + finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0), + workspace_id=self._workspace_id, + workflow_name=self._workflow_name, + workflow_id=self._workflow_id, + workflow_run_id=self._workflow_run_id, + workflow_run_attempt=self._workflow_run_attempt, + ) + try: + self._sql_backend.save_table( + self._full_table_name, + [workflow_run], + WorkflowRun, + mode="append", + ) + except NotFound as e: + logger.error(f"Workflow run table not found: {self._full_table_name}", exc_info=e) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index cc1d414be6..0b6bd8a41f 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -111,3 +111,17 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: The results of the scan are stored in the `$inventory.migration_status` inventory table. 
""" ctx.migration_status_refresher.snapshot(force_refresh=True) + + @job_task( + depends_on=[ + crawl_grants, + assess_jobs, + assess_clusters, + assess_pipelines, + crawl_cluster_policies, + refresh_table_migration_status, + ] + ) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run of this workflow.""" + ctx.workflow_run_recorder.record() diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c4dc8f4c33..2fc3f47b08 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -53,7 +53,7 @@ from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller, AccountInstaller from databricks.labs.ucx.installer.workflows import WorkflowsDeployment - +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation from databricks.labs.ucx.runtime import Workflows from databricks.labs.ucx.workspace_access.groups import MigratedGroup, GroupManager @@ -445,7 +445,7 @@ def inventory_database(self) -> str: @cached_property def ucx_catalog(self) -> str: - return self._make_catalog(name=f"ucx-{self._make_random()}").name + return self._make_catalog(name=f"ucx_{self._make_random()}").name @cached_property def workspace_client(self) -> WorkspaceClient: @@ -748,6 +748,7 @@ def config(self) -> WorkspaceConfig: return WorkspaceConfig( warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"), inventory_database=self.inventory_database, + ucx_catalog=self.ucx_catalog, connect=self.workspace_client.config, renamed_group_prefix=f'tmp-{self.inventory_database}-', ) @@ -948,6 +949,7 @@ def config(self) -> WorkspaceConfig: include_databases=self.created_databases, include_object_permissions=self.include_object_permissions, warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"), + ucx_catalog=self.ucx_catalog, ) workspace_config = self.config_transform(workspace_config) 
self.installation.save(workspace_config) @@ -986,6 +988,10 @@ def workspace_installation(self): self.product_info, ) + @cached_property + def progress_tracking_installation(self) -> ProgressTrackingInstallation: + return ProgressTrackingInstallation(self.sql_backend, self.ucx_catalog) + @cached_property def extend_prompts(self): return {} diff --git a/tests/integration/progress/test_install.py b/tests/integration/progress/test_install.py new file mode 100644 index 0000000000..02f77d82b2 --- /dev/null +++ b/tests/integration/progress/test_install.py @@ -0,0 +1,7 @@ +def test_progress_tracking_installer_creates_workflow_runs_table(az_cli_ctx) -> None: + az_cli_ctx.progress_tracking_installation.run() + query = ( + f"SELECT 1 FROM tables WHERE table_catalog = '{az_cli_ctx.config.ucx_catalog}' " + "AND table_schema = 'multiworkspace' AND table_name = 'workflow_runs'" + ) + assert any(az_cli_ctx.sql_backend.fetch(query, catalog="system", schema="information_schema")) diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py new file mode 100644 index 0000000000..cc7c11a680 --- /dev/null +++ b/tests/integration/progress/test_workflow_runs.py @@ -0,0 +1,29 @@ +import datetime as dt + + +def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None: + """Ensure that the workflow run recorder records a workflow run""" + start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) + named_parameters = { + "workflow": "test", + "job_id": "123", + "parent_run_id": "456", + "start_time": start_time.isoformat(), + } + ctx = installation_ctx.replace(named_parameters=named_parameters) + ctx.progress_tracking_installation.run() + select_workflow_runs_query = f"SELECT * FROM {ctx.ucx_catalog}.multiworkspace.workflow_runs" + # Be confident that we are not selecting any workflow runs before the to-be-tested code + assert not any(ctx.sql_backend.fetch(select_workflow_runs_query)) + + 
ctx.workflow_run_recorder.record() + + rows = list(ctx.sql_backend.fetch(select_workflow_runs_query)) + assert len(rows) == 1 + assert rows[0].started_at == start_time + assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc) + assert rows[0].workspace_id == installation_ctx.workspace_client.get_workspace_id() + assert rows[0].workflow_name == "test" + assert rows[0].workflow_id == 123 + assert rows[0].workflow_run_id == 456 + assert rows[0].workflow_run_attempt == 0 diff --git a/tests/integration/progress/test_workflows.py b/tests/integration/progress/test_workflows.py index 3b049f802b..54c99db4f7 100644 --- a/tests/integration/progress/test_workflows.py +++ b/tests/integration/progress/test_workflows.py @@ -16,6 +16,13 @@ def test_running_real_migration_progress_job(installation_ctx: MockInstallationC installation_ctx.deployed_workflows.run_workflow("assessment") assert installation_ctx.deployed_workflows.validate_step("assessment") + # After the assessment, a user (maybe) installs the progress tracking + installation_ctx.progress_tracking_installation.run() + # Run the migration-progress workflow until completion. installation_ctx.deployed_workflows.run_workflow("migration-progress-experimental") assert installation_ctx.deployed_workflows.validate_step("migration-progress-experimental") + + # Ensure that the migration-progress workflow populated the `workflow_runs` table. 
+ query = f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs" + assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py new file mode 100644 index 0000000000..d7013c316c --- /dev/null +++ b/tests/unit/progress/test_install.py @@ -0,0 +1,14 @@ +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation + + +def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None: + installation = ProgressTrackingInstallation(mock_backend, "ucx") + installation.run() + assert "CREATE SCHEMA IF NOT EXISTS ucx.multiworkspace" in mock_backend.queries[0] + + +def test_progress_tracking_installation_run_creates_workflow_runs_table(mock_backend) -> None: + installation = ProgressTrackingInstallation(mock_backend, "ucx") + installation.run() + # Dataclass to schema conversion is tested within the lsql package + assert any("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries) diff --git a/tests/unit/progress/test_workflow_runs.py b/tests/unit/progress/test_workflow_runs.py new file mode 100644 index 0000000000..9ebd9527a4 --- /dev/null +++ b/tests/unit/progress/test_workflow_runs.py @@ -0,0 +1,30 @@ +import datetime as dt + +from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder + + +def test_workflow_run_record_records_workflow_run(mock_backend) -> None: + """Ensure that the workflow run recorder records a workflow run""" + start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) + workflow_run_recorder = WorkflowRunRecorder( + mock_backend, + ucx_catalog="ucx", + workspace_id=123456789, + workflow_name="workflow", + workflow_id=123, + workflow_run_id=456, + workflow_run_attempt=0, + workflow_start_time=start_time.isoformat(), + ) + + workflow_run_recorder.record() + + rows = 
mock_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append") + assert len(rows) == 1 + assert rows[0].started_at == start_time + assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc) + assert rows[0].workspace_id == 123456789 + assert rows[0].workflow_name == "workflow" + assert rows[0].workflow_id == 123 + assert rows[0].workflow_run_id == 456 + assert rows[0].workflow_run_attempt == 0 diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index c0fcf5caed..1191e469b7 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -33,6 +33,7 @@ create_missing_principals, create_table_mapping, create_uber_principal, + create_ucx_catalog, download, ensure_assessment_run, installations, @@ -888,11 +889,20 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie def test_create_ucx_catalog_calls_create_catalog(ws) -> None: prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"}) - create_catalogs_schemas(ws, prompts, ctx=WorkspaceContext(ws)) + create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws)) ws.catalogs.create.assert_called_once() +def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None: + prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"}) + + create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws).replace(sql_backend=mock_backend)) + + assert len(mock_backend.queries) > 0, "No queries executed on backend" + assert "CREATE SCHEMA" in mock_backend.queries[0] + + @pytest.mark.parametrize("run_as_collection", [False, True]) def test_migrate_tables_calls_migrate_table_job_run_now( run_as_collection,