From 69ddb8f455152aa2cb0f654bdfe4b2680f1213d5 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 11:52:52 +0200 Subject: [PATCH 01/65] Add history install --- src/databricks/labs/ucx/progress/install.py | 56 +++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/databricks/labs/ucx/progress/install.py diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py new file mode 100644 index 0000000000..2799319914 --- /dev/null +++ b/src/databricks/labs/ucx/progress/install.py @@ -0,0 +1,56 @@ +import datetime as dt +import logging +from dataclasses import dataclass + +from databricks.labs.lsql.backends import Dataclass, SqlBackend +from databricks.sdk.errors import InternalError +from databricks.sdk.retries import retried + +from databricks.labs.ucx.framework.utils import escape_sql_identifier + + +logger = logging.getLogger(__name__) + + +@dataclass +class Record: + workspace_id: int # The workspace id + run_id: int # The workflow run id that crawled the objects + run_start_time: dt.datetime # The workflow run timestamp that crawled the objects + object_type: str # The object type, e.g. TABLE, VIEW. Forms a composite key together with object_id + object_id: str # The object id, e.g. hive_metastore.database.table. Forms a composite key together with object_id + object_data: str # The object data; the attributes of the corresponding ucx data class, e.g. table name, table ... + failures: list # The failures indicating the object is not UC compatible + owner: str # The object owner + ucx_version: str # The ucx semantic version + snapshot_id: int # An identifier for the snapshot + + +class HistoryInstallation: + """Install resources for UCX's artifacts history. + + `InternalError` are retried on create statements for resilience on sporadic Databricks issues. 
+ """ + + _SCHEMA = "history" + + def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: + self._backend = sql_backend + self._ucx_catalog = ucx_catalog + + def run(self) -> None: + self._create_schema() + self._create_table("records", Record) + logger.info(f"Installation completed successfully!") + + @retried(on=[InternalError], timeout=dt.timedelta(minutes=1)) + def _create_schema(self) -> None: + schema = f"{self._ucx_catalog}.{self._SCHEMA}" + logger.info(f"Creating {schema} database...") + self._backend.execute(f"CREATE SCHEMA IF NOT EXISTS {escape_sql_identifier(schema, maxsplit=1)}") + + @retried(on=[InternalError], timeout=dt.timedelta(minutes=1)) + def _create_table(self, name: str, klass: Dataclass) -> None: + full_name = f"{self._ucx_catalog}.{self._SCHEMA}.{name}" + logger.info(f"Create {full_name} table ...") + self._backend.create_table(escape_sql_identifier(full_name), klass) From 02d8a302b2a838b282ce8931e98a07d7415bf706 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 11:55:56 +0200 Subject: [PATCH 02/65] Add history install to `WorkspaceContext` --- src/databricks/labs/ucx/contexts/workspace_cli.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index 0ef0edc3aa..da1ed70242 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -18,6 +18,7 @@ from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources from databricks.labs.ucx.contexts.application import CliContext from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.progress.install import HistoryInstallation from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.linters.context import LinterContext from databricks.labs.ucx.source_code.linters.files import 
LocalFileMigrator, LocalCodeLinter @@ -179,6 +180,10 @@ def iam_role_creation(self): def notebook_loader(self) -> NotebookLoader: return NotebookLoader() + @cached_property + def history_installation(self) -> HistoryInstallation: + return HistoryInstallation(self.sql_backend, self.config.ucx_catalog) + class LocalCheckoutContext(WorkspaceContext): """Local context extends Workspace context to provide extra properties From 26314ea619d4b3bafcd5a5e80174ef5af3d08393 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 11:58:28 +0200 Subject: [PATCH 03/65] Run history installation in create UCX catalog --- src/databricks/labs/ucx/cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index 04f0b22d0b..ce72e610aa 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -611,6 +611,7 @@ def create_ucx_catalog(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceConte """ workspace_context = ctx or WorkspaceContext(w) workspace_context.catalog_schema.create_ucx_catalog(prompts) + workspace_context.history_installation.run() @ucx.command From 786f09785f16c40e3726bf779e5ca13f855f4cae Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 11:59:46 +0200 Subject: [PATCH 04/65] Fix test --- tests/unit/test_cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index c0fcf5caed..5e2a766d53 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -33,6 +33,7 @@ create_missing_principals, create_table_mapping, create_uber_principal, + create_ucx_catalog, download, ensure_assessment_run, installations, @@ -888,7 +889,7 @@ def test_assign_metastore_logs_account_id_and_assigns_metastore(caplog, acc_clie def test_create_ucx_catalog_calls_create_catalog(ws) -> None: prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"}) - create_catalogs_schemas(ws, 
prompts, ctx=WorkspaceContext(ws)) + create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws)) ws.catalogs.create.assert_called_once() From 0180114d467f324f74109ba0e01e7385b63e7a01 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 12:04:42 +0200 Subject: [PATCH 05/65] Add unit test for creating UCX history schema and table --- tests/unit/test_cli.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 5e2a766d53..4b3c510c65 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -894,6 +894,15 @@ def test_create_ucx_catalog_calls_create_catalog(ws) -> None: ws.catalogs.create.assert_called_once() +def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) -> None: + prompts = MockPrompts({"Please provide storage location url for catalog: .*": "metastore"}) + + create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws).replace(sql_backend=mock_backend)) + + assert "CREATE SCHEMA" in mock_backend.queries[0] + assert "CREATE TABLE" in mock_backend.queries[1] + + @pytest.mark.parametrize("run_as_collection", [False, True]) def test_migrate_tables_calls_migrate_table_job_run_now( run_as_collection, From 1e7cdac089c92ebc7a337982b232af0f48e311ea Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 12:07:07 +0200 Subject: [PATCH 06/65] Test history installation to create history schema --- tests/unit/progress/test_install.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 tests/unit/progress/test_install.py diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py new file mode 100644 index 0000000000..bef7243d53 --- /dev/null +++ b/tests/unit/progress/test_install.py @@ -0,0 +1,7 @@ +from databricks.labs.ucx.progress.install import HistoryInstallation + + +def test_history_installation_run_creates_history_schema(mock_backend) -> None: + installation = HistoryInstallation(mock_backend, "ucx") + 
installation.run() + assert "CREATE SCHEMA IF NOT EXISTS `ucx`.`history`" == mock_backend.queries[0] From 50fb6e51e0aeb52bb9eafc44dda42ee0807c0881 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 12:11:44 +0200 Subject: [PATCH 07/65] Format --- src/databricks/labs/ucx/progress/install.py | 2 +- tests/unit/progress/test_install.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index 2799319914..b0e3ae3159 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -41,7 +41,7 @@ def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: def run(self) -> None: self._create_schema() self._create_table("records", Record) - logger.info(f"Installation completed successfully!") + logger.info("Installation completed successfully!") @retried(on=[InternalError], timeout=dt.timedelta(minutes=1)) def _create_schema(self) -> None: diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index bef7243d53..c0954d45ae 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -4,4 +4,4 @@ def test_history_installation_run_creates_history_schema(mock_backend) -> None: installation = HistoryInstallation(mock_backend, "ucx") installation.run() - assert "CREATE SCHEMA IF NOT EXISTS `ucx`.`history`" == mock_backend.queries[0] + assert mock_backend.queries[0] == "CREATE SCHEMA IF NOT EXISTS `ucx`.`history`" From a3cd1c5af5a4ef8d04698917d0ce03f1dcbd6661 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 25 Sep 2024 13:22:52 +0200 Subject: [PATCH 08/65] Create UCX catalog on `MockWorkspaceContext` --- tests/integration/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c4dc8f4c33..b0c87d9293 100644 --- a/tests/integration/conftest.py +++ 
b/tests/integration/conftest.py @@ -748,6 +748,7 @@ def config(self) -> WorkspaceConfig: return WorkspaceConfig( warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"), inventory_database=self.inventory_database, + ucx_catalog=self.ucx_catalog, connect=self.workspace_client.config, renamed_group_prefix=f'tmp-{self.inventory_database}-', ) From 60389b53432ef92236964fb42c120a0c2f95ad55 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 13:12:35 +0200 Subject: [PATCH 09/65] Remove `Record` data class and table creation --- src/databricks/labs/ucx/progress/install.py | 16 ---------------- tests/unit/test_cli.py | 1 - 2 files changed, 17 deletions(-) diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index b0e3ae3159..817823bc9f 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -1,6 +1,5 @@ import datetime as dt import logging -from dataclasses import dataclass from databricks.labs.lsql.backends import Dataclass, SqlBackend from databricks.sdk.errors import InternalError @@ -12,20 +11,6 @@ logger = logging.getLogger(__name__) -@dataclass -class Record: - workspace_id: int # The workspace id - run_id: int # The workflow run id that crawled the objects - run_start_time: dt.datetime # The workflow run timestamp that crawled the objects - object_type: str # The object type, e.g. TABLE, VIEW. Forms a composite key together with object_id - object_id: str # The object id, e.g. hive_metastore.database.table. Forms a composite key together with object_id - object_data: str # The object data; the attributes of the corresponding ucx data class, e.g. table name, table ... 
- failures: list # The failures indicating the object is not UC compatible - owner: str # The object owner - ucx_version: str # The ucx semantic version - snapshot_id: int # An identifier for the snapshot - - class HistoryInstallation: """Install resources for UCX's artifacts history. @@ -40,7 +25,6 @@ def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: def run(self) -> None: self._create_schema() - self._create_table("records", Record) logger.info("Installation completed successfully!") @retried(on=[InternalError], timeout=dt.timedelta(minutes=1)) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 4b3c510c65..6c02404ebf 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -900,7 +900,6 @@ def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) - create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws).replace(sql_backend=mock_backend)) assert "CREATE SCHEMA" in mock_backend.queries[0] - assert "CREATE TABLE" in mock_backend.queries[1] @pytest.mark.parametrize("run_as_collection", [False, True]) From 8aeba4070f633082f2faee4d67a14094d71bcd1e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 13:15:09 +0200 Subject: [PATCH 10/65] Make test more robust --- tests/unit/test_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index 6c02404ebf..1191e469b7 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -899,6 +899,7 @@ def test_create_ucx_catalog_creates_history_schema_and_table(ws, mock_backend) - create_ucx_catalog(ws, prompts, ctx=WorkspaceContext(ws).replace(sql_backend=mock_backend)) + assert len(mock_backend.queries) > 0, "No queries executed on backend" assert "CREATE SCHEMA" in mock_backend.queries[0] From 516107449856feae4206555cefe43a6b7f7e10d6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 13:18:41 +0200 Subject: [PATCH 11/65] Use `SchemaDeployer` to deploy schema --- 
src/databricks/labs/ucx/progress/install.py | 31 ++++----------------- tests/unit/progress/test_install.py | 2 +- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index 817823bc9f..e452c68d04 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -1,40 +1,21 @@ -import datetime as dt import logging -from databricks.labs.lsql.backends import Dataclass, SqlBackend -from databricks.sdk.errors import InternalError -from databricks.sdk.retries import retried - -from databricks.labs.ucx.framework.utils import escape_sql_identifier +from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.lsql.deployment import SchemaDeployer logger = logging.getLogger(__name__) class HistoryInstallation: - """Install resources for UCX's artifacts history. - - `InternalError` are retried on create statements for resilience on sporadic Databricks issues. - """ + """Install resources for UCX's artifacts history.""" _SCHEMA = "history" def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: - self._backend = sql_backend - self._ucx_catalog = ucx_catalog + # `mod` is required parameter, but it's not used in this context. 
+ self._schema_deployer = SchemaDeployer(sql_backend, self._SCHEMA, mod=None, catalog=ucx_catalog) def run(self) -> None: - self._create_schema() + self._schema_deployer.deploy_schema() logger.info("Installation completed successfully!") - - @retried(on=[InternalError], timeout=dt.timedelta(minutes=1)) - def _create_schema(self) -> None: - schema = f"{self._ucx_catalog}.{self._SCHEMA}" - logger.info(f"Creating {schema} database...") - self._backend.execute(f"CREATE SCHEMA IF NOT EXISTS {escape_sql_identifier(schema, maxsplit=1)}") - - @retried(on=[InternalError], timeout=dt.timedelta(minutes=1)) - def _create_table(self, name: str, klass: Dataclass) -> None: - full_name = f"{self._ucx_catalog}.{self._SCHEMA}.{name}" - logger.info(f"Create {full_name} table ...") - self._backend.create_table(escape_sql_identifier(full_name), klass) diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index c0954d45ae..6bff481600 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -4,4 +4,4 @@ def test_history_installation_run_creates_history_schema(mock_backend) -> None: installation = HistoryInstallation(mock_backend, "ucx") installation.run() - assert mock_backend.queries[0] == "CREATE SCHEMA IF NOT EXISTS `ucx`.`history`" + assert mock_backend.queries[0] == "CREATE SCHEMA IF NOT EXISTS ucx.history" From 4fe1de64049e905e839108520be73df9bd8557b3 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 13:22:44 +0200 Subject: [PATCH 12/65] Add `WorkflowRun` dataclass --- src/databricks/labs/ucx/progress/install.py | 29 +++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index e452c68d04..81a50d9b2a 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -1,4 +1,6 @@ +import datetime as dt import logging +from dataclasses import dataclass 
from databricks.labs.lsql.backends import SqlBackend from databricks.labs.lsql.deployment import SchemaDeployer @@ -7,6 +9,33 @@ logger = logging.getLogger(__name__) +@dataclass(frozen=True, kw_only=True) +class WorkflowRun: + started_at: dt.datetime + """The timestamp of the workflow run start.""" + + finished_at: dt.datetime + """The timestamp of the workflow run end.""" + + workspace_id: int + """The workspace id in which the workflow ran.""" + + workflow_name: str + """The workflow name that ran.""" + + workflow_id: int + """"The workflow id of the workflow that ran.""" + + workflow_run_id: int + """The workflow run id.""" + + run_as: str + """The identity the workflow was run as`""" + + status: str + """The workflow run final status""" + + class HistoryInstallation: """Install resources for UCX's artifacts history.""" From 11c3cb99ee757a66abf2455ec7f66ed2fccc05d5 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 13:24:24 +0200 Subject: [PATCH 13/65] Make test more robust --- tests/unit/progress/test_install.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index 6bff481600..38f847f809 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -4,4 +4,4 @@ def test_history_installation_run_creates_history_schema(mock_backend) -> None: installation = HistoryInstallation(mock_backend, "ucx") installation.run() - assert mock_backend.queries[0] == "CREATE SCHEMA IF NOT EXISTS ucx.history" + assert "CREATE SCHEMA IF NOT EXISTS ucx.history" in mock_backend.queries[0] From 1ad8bb4e3a344f2d05888d736c6acd0414d88bcf Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 13:26:22 +0200 Subject: [PATCH 14/65] Create `workflow_runs` table --- src/databricks/labs/ucx/progress/install.py | 1 + tests/unit/progress/test_install.py | 7 +++++++ 2 files changed, 8 insertions(+) diff --git 
a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index 81a50d9b2a..a5a889784a 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -47,4 +47,5 @@ def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: def run(self) -> None: self._schema_deployer.deploy_schema() + self._schema_deployer.deploy_table("workflow_runs", WorkflowRun) logger.info("Installation completed successfully!") diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index 38f847f809..d5967aa3c8 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -5,3 +5,10 @@ def test_history_installation_run_creates_history_schema(mock_backend) -> None: installation = HistoryInstallation(mock_backend, "ucx") installation.run() assert "CREATE SCHEMA IF NOT EXISTS ucx.history" in mock_backend.queries[0] + + +def test_history_installation_run_creates_workflow_runs_table(mock_backend) -> None: + installation = HistoryInstallation(mock_backend, "ucx") + installation.run() + # Dataclass to schema conversion is tested within the lsql package + assert any("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries) From 68145f98308658aa3cbcc9e73dcc8051150cf81b Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:09:33 +0200 Subject: [PATCH 15/65] Rename `HistoryInstallation` to `ProgressTrackingInstaller` --- src/databricks/labs/ucx/cli.py | 2 +- src/databricks/labs/ucx/contexts/workspace_cli.py | 6 +++--- src/databricks/labs/ucx/progress/install.py | 4 ++-- tests/unit/progress/test_install.py | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index ce72e610aa..beafd6425a 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -611,7 +611,7 @@ def create_ucx_catalog(w: WorkspaceClient, 
prompts: Prompts, ctx: WorkspaceConte """ workspace_context = ctx or WorkspaceContext(w) workspace_context.catalog_schema.create_ucx_catalog(prompts) - workspace_context.history_installation.run() + workspace_context.progress_tracking_installer.run() @ucx.command diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index da1ed70242..c3bebeeeef 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -18,7 +18,7 @@ from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources from databricks.labs.ucx.contexts.application import CliContext from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.progress.install import HistoryInstallation +from databricks.labs.ucx.progress.install import ProgressTrackingInstaller from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.linters.context import LinterContext from databricks.labs.ucx.source_code.linters.files import LocalFileMigrator, LocalCodeLinter @@ -181,8 +181,8 @@ def notebook_loader(self) -> NotebookLoader: return NotebookLoader() @cached_property - def history_installation(self) -> HistoryInstallation: - return HistoryInstallation(self.sql_backend, self.config.ucx_catalog) + def progress_tracking_installer(self) -> ProgressTrackingInstaller: + return ProgressTrackingInstaller(self.sql_backend, self.config.ucx_catalog) class LocalCheckoutContext(WorkspaceContext): diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index a5a889784a..e469dfbd5e 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -36,8 +36,8 @@ class WorkflowRun: """The workflow run final status""" -class HistoryInstallation: - """Install resources for UCX's artifacts history.""" +class 
ProgressTrackingInstaller: """Install resources for UCX's progress tracking.""" _SCHEMA = "history" diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index d5967aa3c8..bb2132f39a 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -1,14 +1,14 @@ -from databricks.labs.ucx.progress.install import HistoryInstallation +from databricks.labs.ucx.progress.install import ProgressTrackingInstaller -def test_history_installation_run_creates_history_schema(mock_backend) -> None: - installation = HistoryInstallation(mock_backend, "ucx") +def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None: + installation = ProgressTrackingInstaller(mock_backend, "ucx") installation.run() assert "CREATE SCHEMA IF NOT EXISTS ucx.history" in mock_backend.queries[0] -def test_history_installation_run_creates_workflow_runs_table(mock_backend) -> None: - installation = HistoryInstallation(mock_backend, "ucx") +def test_progress_tracking_installation_run_creates_workflow_runs_table(mock_backend) -> None: + installation = ProgressTrackingInstaller(mock_backend, "ucx") installation.run() # Dataclass to schema conversion is tested within the lsql package assert any("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries) From 9c08d3910671d5ce77940fc1eef508585025b7f3 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:12:17 +0200 Subject: [PATCH 16/65] Rename progress tracking schema to 'multiworkspace' --- src/databricks/labs/ucx/progress/install.py | 2 +- tests/unit/progress/test_install.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index e469dfbd5e..10222de786 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -39,7 +39,7 @@ class WorkflowRun: class
ProgressTrackingInstaller: """Install resources for UCX's progress tracking.""" - _SCHEMA = "history" + _SCHEMA = "multiworkspace" def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: # `mod` is required parameter, but it's not used in this context. diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index bb2132f39a..a62fae6e75 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -4,7 +4,7 @@ def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None: installation = ProgressTrackingInstaller(mock_backend, "ucx") installation.run() - assert "CREATE SCHEMA IF NOT EXISTS ucx.history" in mock_backend.queries[0] + assert "CREATE SCHEMA IF NOT EXISTS ucx.multiworkspace" in mock_backend.queries[0] def test_progress_tracking_installation_run_creates_workflow_runs_table(mock_backend) -> None: From 2f0e01910c935171e645d3196a290e5e5533f638 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:21:43 +0200 Subject: [PATCH 17/65] Move WorkflowRun to separate module --- src/databricks/labs/ucx/progress/install.py | 31 +------------------ .../labs/ucx/progress/workflow_runs.py | 29 +++++++++++++++++ 2 files changed, 30 insertions(+), 30 deletions(-) create mode 100644 src/databricks/labs/ucx/progress/workflow_runs.py diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index 10222de786..932fdc5d1b 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -1,41 +1,12 @@ -import datetime as dt import logging -from dataclasses import dataclass from databricks.labs.lsql.backends import SqlBackend from databricks.labs.lsql.deployment import SchemaDeployer - +from databricks.labs.ucx.progress.workflow_runs import WorkflowRun logger = logging.getLogger(__name__) -@dataclass(frozen=True, kw_only=True) -class WorkflowRun: - started_at: 
dt.datetime - """The timestamp of the workflow run start.""" - - finished_at: dt.datetime - """The timestamp of the workflow run end.""" - - workspace_id: int - """The workspace id in which the workflow ran.""" - - workflow_name: str - """The workflow name that ran.""" - - workflow_id: int - """"The workflow id of the workflow that ran.""" - - workflow_run_id: int - """The workflow run id.""" - - run_as: str - """The identity the workflow was run as`""" - - status: str - """The workflow run final status""" - - class ProgressTrackingInstaller: """Install resources for UCX's progress tracking.""" diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py new file mode 100644 index 0000000000..eb81d832e9 --- /dev/null +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -0,0 +1,29 @@ +import datetime as dt +from dataclasses import dataclass + + +@dataclass(frozen=True, kw_only=True) +class WorkflowRun: + started_at: dt.datetime + """The timestamp of the workflow run start.""" + + finished_at: dt.datetime + """The timestamp of the workflow run end.""" + + workspace_id: int + """The workspace id in which the workflow ran.""" + + workflow_name: str + """The workflow name that ran.""" + + workflow_id: int + """"The workflow id of the workflow that ran.""" + + workflow_run_id: int + """The workflow run id.""" + + run_as: str + """The identity the workflow was run as`""" + + status: str + """The workflow run final status""" From 302fc1df59e40150ea606dd005a8511ce73b55e0 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:42:47 +0200 Subject: [PATCH 18/65] Add workflow run recorder --- .../labs/ucx/progress/workflow_runs.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index eb81d832e9..8c996047db 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ 
b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -1,6 +1,9 @@ import datetime as dt from dataclasses import dataclass +from databricks.labs.lsql.backends import SqlBackend +from databricks.sdk import WorkspaceClient + @dataclass(frozen=True, kw_only=True) class WorkflowRun: @@ -27,3 +30,43 @@ class WorkflowRun: status: str """The workflow run final status""" + + +class WorkflowRunRecorder: + """Record workflow runs in a database.""" + + def __init__( + self, + ws: WorkspaceClient, + sql_backend: SqlBackend, + *, + workflow_name: str, + workflow_id: int, + workflow_run_id: int, + attempt: int, + ): + self._ws = ws + self._sql_backend = sql_backend + self._full_table_name = f"{self._catalog}.multiworkspace.workflow_runs" + self._workflow_name = workflow_name + self._workflow_id = workflow_id + self._workflow_run_id = workflow_run_id + + def record(self) -> None: + """Record a workflow run in the database.""" + workflow_run = WorkflowRun( + started_at=dt.datetime.now(), # TODO: Update this + finished_at=dt.datetime.now(), + workspace_id=self._ws.get_workspace_id(), + workflow_name=self._workflow_name, + workflow_id=self._workflow_id, + workflow_run_id=self._workflow_run_id, + run_as="UNKOWN", # TODO Update this, + status="RUNNING", # Always RUNNING as it is called during a running workflow + ) + self._sql_backend.save_table( + self._full_table_name, + [workflow_run], + WorkflowRun, + mode="append", + ) From 8054949746c3cd470dc470cd30167987d22e7f72 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:44:18 +0200 Subject: [PATCH 19/65] Add start time to workflow --- src/databricks/labs/ucx/framework/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/databricks/labs/ucx/framework/tasks.py b/src/databricks/labs/ucx/framework/tasks.py index 4fee024058..3ada818644 100644 --- a/src/databricks/labs/ucx/framework/tasks.py +++ b/src/databricks/labs/ucx/framework/tasks.py @@ -1,3 +1,4 @@ +import datetime as dt from collections.abc import 
Callable, Iterable from dataclasses import dataclass @@ -65,6 +66,7 @@ def parse_args(*argv) -> dict[str, str]: class Workflow: def __init__(self, name: str): self._name = name + self._start_time = dt.datetime.now() @property def name(self): From c58d9ed4fe73dbdae16ddc861fa6931ff9ab1337 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:45:17 +0200 Subject: [PATCH 20/65] Expect start time on record --- src/databricks/labs/ucx/progress/workflow_runs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 8c996047db..aa48cdbb41 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -52,10 +52,10 @@ def __init__( self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id - def record(self) -> None: + def record(self, *, start_time: dt.datetime) -> None: """Record a workflow run in the database.""" workflow_run = WorkflowRun( - started_at=dt.datetime.now(), # TODO: Update this + started_at=start_time, finished_at=dt.datetime.now(), workspace_id=self._ws.get_workspace_id(), workflow_name=self._workflow_name, From af119bba6e63b763c3fc1a794203c736531cffa2 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 16:48:04 +0200 Subject: [PATCH 21/65] Add attempt to workflow run --- src/databricks/labs/ucx/progress/workflow_runs.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index aa48cdbb41..dac7a3e244 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -25,6 +25,9 @@ class WorkflowRun: workflow_run_id: int """The workflow run id.""" + workflow_run_attempt: int + """The workflow run attempt.""" + run_as: str """The identity the workflow was run 
as`""" @@ -43,7 +46,7 @@ def __init__( workflow_name: str, workflow_id: int, workflow_run_id: int, - attempt: int, + workflow_run_attempt: int, ): self._ws = ws self._sql_backend = sql_backend @@ -51,6 +54,7 @@ def __init__( self._workflow_name = workflow_name self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id + self._workflow_run_attempt = workflow_run_attempt def record(self, *, start_time: dt.datetime) -> None: """Record a workflow run in the database.""" @@ -61,6 +65,7 @@ def record(self, *, start_time: dt.datetime) -> None: workflow_name=self._workflow_name, workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, + workflow_run_attempt=self._workflow_run_attempt, run_as="UNKOWN", # TODO Update this, status="RUNNING", # Always RUNNING as it is called during a running workflow ) From 01b438f17895e6716f89f1d14fe714702bdb6c28 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:06:10 +0200 Subject: [PATCH 22/65] Add job name and start time to task parameters --- src/databricks/labs/ucx/installer/workflows.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index e6148a0da5..aa01c9441d 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -55,9 +55,12 @@ TEST_RESOURCE_PURGE_TIMEOUT = timedelta(hours=1) TEST_NIGHTLY_CI_RESOURCES_PURGE_TIMEOUT = timedelta(hours=3) # Buffer for debugging nightly integration test runs +# See https://docs.databricks.com/en/jobs/parameter-value-references.html#supported-value-references EXTRA_TASK_PARAMS = { "job_id": "{{job_id}}", + "job_name": "{{job.name}}", "run_id": "{{run_id}}", + "start_time": "{{job.start_time.iso_datetime}}", "attempt": "{{job.repair_count}}", "parent_run_id": "{{parent_run_id}}", } From 1c9b31dcde086893e460bf7269d26c2d8800226c Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:08:45 
+0200 Subject: [PATCH 23/65] Add `WorkflowRunRecorder` to `RuntimeContext` --- src/databricks/labs/ucx/contexts/workflow_task.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 488c224243..ef23a227d0 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -16,6 +16,7 @@ from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder +from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder class RuntimeContext(GlobalContext): @@ -109,3 +110,14 @@ def task_run_warning_recorder(self): self.inventory_database, int(self.named_parameters.get("attempt", "0")), ) + + @cached_property + def workflow_run_recorder(self): + return WorkflowRunRecorder( + self.workspace_client, + self.sql_backend, + workflow_name=self.named_parameters["job_name"], + workflow_id=int(self.named_parameters["job_id"]), + workflow_run_id=int(self.named_parameters["parent_run_id"]), + workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), + ) From 48a1a86dced213f06c919295316f9f7cb747e6c9 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:11:21 +0200 Subject: [PATCH 24/65] Expect workflow start time on initialisation --- src/databricks/labs/ucx/contexts/workflow_task.py | 1 + src/databricks/labs/ucx/progress/workflow_runs.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index ef23a227d0..01218f4c27 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -120,4 +120,5 @@ def workflow_run_recorder(self): 
workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), + workflow_start_time=self.named_parameters["workflow_start_time"], ) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index dac7a3e244..ab4f126688 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -47,19 +47,21 @@ def __init__( workflow_id: int, workflow_run_id: int, workflow_run_attempt: int, + workflow_start_time: str, ): self._ws = ws self._sql_backend = sql_backend self._full_table_name = f"{self._catalog}.multiworkspace.workflow_runs" + self._workflow_start_time = workflow_start_time, self._workflow_name = workflow_name self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id self._workflow_run_attempt = workflow_run_attempt - def record(self, *, start_time: dt.datetime) -> None: + def record(self) -> None: """Record a workflow run in the database.""" workflow_run = WorkflowRun( - started_at=start_time, + started_at=dt.datetime.fromisoformat(self._workflow_start_time), finished_at=dt.datetime.now(), workspace_id=self._ws.get_workspace_id(), workflow_name=self._workflow_name, From c7d8dff9abd1df9ee5af29bdf194e75aaabfb59a Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:14:52 +0200 Subject: [PATCH 25/65] Record workflow run --- src/databricks/labs/ucx/progress/workflows.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index cc1d414be6..35e4578f06 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -111,3 +111,17 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: The results of the scan are stored in the 
`$inventory.migration_status` inventory table. """ ctx.migration_status_refresher.snapshot(force_refresh=True) + + @job_task( + depends_on=[ + crawl_grants, + assess_jobs, + assess_clusters, + assess_pipelines, + crawl_cluster_policies, + refresh_table_migration_status, + ] + ) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run (of this workflow.""" + ctx.workflow_run_recorder.record() From f9671945232f854149e3ae6d2ea0a2711ad8c6bd Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:15:18 +0200 Subject: [PATCH 26/65] Add record workflow run task to migration progress workflow --- src/databricks/labs/ucx/progress/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 35e4578f06..0b6bd8a41f 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -123,5 +123,5 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: ] ) def record_workflow_run(self, ctx: RuntimeContext) -> None: - """Record the workflow run (of this workflow.""" + """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() From b84e60a0cc1252e5cd3e1ad80f0f4b4a9f769955 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:15:50 +0200 Subject: [PATCH 27/65] Format --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index ab4f126688..35b230751a 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -52,7 +52,7 @@ def __init__( self._ws = ws self._sql_backend = sql_backend self._full_table_name = f"{self._catalog}.multiworkspace.workflow_runs" - self._workflow_start_time = 
workflow_start_time, + self._workflow_start_time = workflow_start_time self._workflow_name = workflow_name self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id From 9c25c06a015c1a063547c0f20bb2c1bc19f5d4f4 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:16:07 +0200 Subject: [PATCH 28/65] Fix reference to catalog --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 35b230751a..950943120c 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -51,7 +51,7 @@ def __init__( ): self._ws = ws self._sql_backend = sql_backend - self._full_table_name = f"{self._catalog}.multiworkspace.workflow_runs" + self._full_table_name = f"{catalog}.multiworkspace.workflow_runs" self._workflow_start_time = workflow_start_time self._workflow_name = workflow_name self._workflow_id = workflow_id From fdb643d355664d517f0fa12efd3ebfe6fd56e105 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 26 Sep 2024 17:16:13 +0200 Subject: [PATCH 29/65] Fix TODO --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 950943120c..3b84e04567 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -68,7 +68,7 @@ def record(self) -> None: workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, workflow_run_attempt=self._workflow_run_attempt, - run_as="UNKOWN", # TODO Update this, + run_as="UNKOWN", # TODO Update this status="RUNNING", # Always RUNNING as it is called during a running workflow ) self._sql_backend.save_table( From 34d44e56c8a88229493820381e811f6af8b65fbe Mon Sep 17 
00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 08:57:11 +0200 Subject: [PATCH 30/65] Remove start time on Workflow --- src/databricks/labs/ucx/framework/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/databricks/labs/ucx/framework/tasks.py b/src/databricks/labs/ucx/framework/tasks.py index 3ada818644..4fee024058 100644 --- a/src/databricks/labs/ucx/framework/tasks.py +++ b/src/databricks/labs/ucx/framework/tasks.py @@ -1,4 +1,3 @@ -import datetime as dt from collections.abc import Callable, Iterable from dataclasses import dataclass @@ -66,7 +65,6 @@ def parse_args(*argv) -> dict[str, str]: class Workflow: def __init__(self, name: str): self._name = name - self._start_time = dt.datetime.now() @property def name(self): From 6cf4c9759e114e30a362ca6f296e67c52979a4f2 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:06:02 +0200 Subject: [PATCH 31/65] Record UCX internal workflow name --- src/databricks/labs/ucx/contexts/workflow_task.py | 1 - src/databricks/labs/ucx/progress/workflow_runs.py | 12 +++++++----- src/databricks/labs/ucx/progress/workflows.py | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 01218f4c27..730044c1bd 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -116,7 +116,6 @@ def workflow_run_recorder(self): return WorkflowRunRecorder( self.workspace_client, self.sql_backend, - workflow_name=self.named_parameters["job_name"], workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 3b84e04567..e7e86b0d71 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ 
b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -43,7 +43,6 @@ def __init__( ws: WorkspaceClient, sql_backend: SqlBackend, *, - workflow_name: str, workflow_id: int, workflow_run_id: int, workflow_run_attempt: int, @@ -53,18 +52,21 @@ def __init__( self._sql_backend = sql_backend self._full_table_name = f"{catalog}.multiworkspace.workflow_runs" self._workflow_start_time = workflow_start_time - self._workflow_name = workflow_name self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id self._workflow_run_attempt = workflow_run_attempt - def record(self) -> None: - """Record a workflow run in the database.""" + def record(self, *, workflow_name: str) -> None: + """Record a workflow run in the database. + + Args: + workflow_name (str): The UCX internal workflow name. + """ workflow_run = WorkflowRun( started_at=dt.datetime.fromisoformat(self._workflow_start_time), finished_at=dt.datetime.now(), workspace_id=self._ws.get_workspace_id(), - workflow_name=self._workflow_name, + workflow_name=workflow_name, workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, workflow_run_attempt=self._workflow_run_attempt, diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 0b6bd8a41f..c608f45f62 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -124,4 +124,4 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" - ctx.workflow_run_recorder.record() + ctx.workflow_run_recorder.record(self._name) From 097727c5cf705ff6295073372baf79bf7adc3ee6 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:06:11 +0200 Subject: [PATCH 32/65] Remove job name from job parameters --- src/databricks/labs/ucx/installer/workflows.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index aa01c9441d..df4dcf1d48 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -58,7 +58,6 @@ # See https://docs.databricks.com/en/jobs/parameter-value-references.html#supported-value-references EXTRA_TASK_PARAMS = { "job_id": "{{job_id}}", - "job_name": "{{job.name}}", "run_id": "{{run_id}}", "start_time": "{{job.start_time.iso_datetime}}", "attempt": "{{job.repair_count}}", From cfb44adcd7996a5c049398e30da9dd33369b96d9 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:07:07 +0200 Subject: [PATCH 33/65] Remove WorkflowRun.status --- src/databricks/labs/ucx/progress/workflow_runs.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index e7e86b0d71..5e28f66dd9 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -31,9 +31,6 @@ class WorkflowRun: run_as: str """The identity the workflow was run as`""" - status: str - """The workflow run final status""" - class WorkflowRunRecorder: """Record workflow runs in a database.""" @@ -71,7 +68,6 @@ def record(self, *, workflow_name: str) -> None: workflow_run_id=self._workflow_run_id, workflow_run_attempt=self._workflow_run_attempt, run_as="UNKOWN", # TODO Update this - status="RUNNING", # Always RUNNING as it is called during a running workflow ) self._sql_backend.save_table( self._full_table_name, From 4c8242c3e48c5f0bdffd5c3f2b1cf5613eb45929 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:18:28 +0200 Subject: [PATCH 34/65] Add missing UCX catalog parameter --- src/databricks/labs/ucx/contexts/workflow_task.py | 1 + src/databricks/labs/ucx/progress/workflow_runs.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git 
a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 730044c1bd..c5665c8f18 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -116,6 +116,7 @@ def workflow_run_recorder(self): return WorkflowRunRecorder( self.workspace_client, self.sql_backend, + self.config.ucx_catalog, workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 5e28f66dd9..e2566fbac0 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -39,6 +39,7 @@ def __init__( self, ws: WorkspaceClient, sql_backend: SqlBackend, + ucx_catalog: str, *, workflow_id: int, workflow_run_id: int, @@ -47,7 +48,7 @@ def __init__( ): self._ws = ws self._sql_backend = sql_backend - self._full_table_name = f"{catalog}.multiworkspace.workflow_runs" + self._full_table_name = f"{ucx_catalog}.multiworkspace.workflow_runs" self._workflow_start_time = workflow_start_time self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id From 472136441674606667ff227a01c046bcabb73bd8 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:33:51 +0200 Subject: [PATCH 35/65] Test progress tracking installer creates workflow runs table --- tests/integration/progress/test_install.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 tests/integration/progress/test_install.py diff --git a/tests/integration/progress/test_install.py b/tests/integration/progress/test_install.py new file mode 100644 index 0000000000..d45e0b46a9 --- /dev/null +++ b/tests/integration/progress/test_install.py @@ -0,0 +1,7 @@ +def 
test_progress_tracking_installer_creates_workflow_runs_table(az_cli_ctx) -> None: + az_cli_ctx.progress_tracking_installer.run() + query = ( + f"SELECT 1 FROM tables WHERE table_catalog = '{az_cli_ctx.config.ucx_catalog}' " + "AND table_schema = 'multiworkspace' AND table_name = 'workflow_runs'" + ) + assert any(az_cli_ctx.sql_backend.fetch(query, catalog="system", schema="information_schema")) From e249dae6146e610ae4fac975955a0d47aa3207a4 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:44:26 +0200 Subject: [PATCH 36/65] Log when workflow run table is not found --- .../labs/ucx/progress/workflow_runs.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index e2566fbac0..ad872cb1fd 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -1,8 +1,13 @@ import datetime as dt +import logging from dataclasses import dataclass from databricks.labs.lsql.backends import SqlBackend from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import NotFound + + +logger = logging.getLogger(__name__) @dataclass(frozen=True, kw_only=True) @@ -70,9 +75,12 @@ def record(self, *, workflow_name: str) -> None: workflow_run_attempt=self._workflow_run_attempt, run_as="UNKOWN", # TODO Update this ) - self._sql_backend.save_table( - self._full_table_name, - [workflow_run], - WorkflowRun, - mode="append", - ) + try: + self._sql_backend.save_table( + self._full_table_name, + [workflow_run], + WorkflowRun, + mode="append", + ) + except NotFound as e: + logger.error(f"Workflow run table not found: {self._full_table_name}", exc_info=e) From 01f207a2af1791929b6a6d594ef151b7e5aa10c7 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 09:47:32 +0200 Subject: [PATCH 37/65] Test workflow run recorder records a record --- 
tests/integration/progress/test_workflow_runs.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 tests/integration/progress/test_workflow_runs.py diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py new file mode 100644 index 0000000000..2bc83d0cc8 --- /dev/null +++ b/tests/integration/progress/test_workflow_runs.py @@ -0,0 +1,14 @@ +def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> None: + az_cli_ctx.progress_tracking_installer.run() + query = f"SELECT 1 FROM {az_cli_ctx.ucx_catalog}.multiworkspace.workflow_runs" + assert not any(az_cli_ctx.sql_backend.fetch(query)) + + named_parameters = { + "job_id": "123", + "parent_run_id": "456", + "workflow_start_time": "2024-10-11T01:42:02.000000", + } + ctx = runtime_ctx.replace(named_parameters=named_parameters, ucx_catalog=az_cli_ctx.ucx_catalog) + ctx.workflow_run_recorder.record(workflow_name="test") + + assert any(az_cli_ctx.sql_backend.fetch(query)) From ea672e17979b8b734393ef71749b51c41a96ccdc Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 10:01:42 +0200 Subject: [PATCH 38/65] Make finished at timestamp more precise --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index ad872cb1fd..740b32c2f0 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -67,7 +67,7 @@ def record(self, *, workflow_name: str) -> None: """ workflow_run = WorkflowRun( started_at=dt.datetime.fromisoformat(self._workflow_start_time), - finished_at=dt.datetime.now(), + finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0), workspace_id=self._ws.get_workspace_id(), workflow_name=workflow_name, workflow_id=self._workflow_id, From 
ee194eadfab5e3722a8b2610128bd563c3b9f96b Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 10:02:05 +0200 Subject: [PATCH 39/65] Fix UNKNOWN typo --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 740b32c2f0..bbb7cb8215 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -73,7 +73,7 @@ def record(self, *, workflow_name: str) -> None: workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, workflow_run_attempt=self._workflow_run_attempt, - run_as="UNKOWN", # TODO Update this + run_as="UNKNOWN", # TODO Update this ) try: self._sql_backend.save_table( From c60cc31cd9b8a231d86d419103c928551ad30aaa Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 10:02:17 +0200 Subject: [PATCH 40/65] Test workflow run values --- .../progress/test_workflow_runs.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index 2bc83d0cc8..8b2985dbdf 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -1,14 +1,27 @@ +import datetime as dt + + def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> None: az_cli_ctx.progress_tracking_installer.run() - query = f"SELECT 1 FROM {az_cli_ctx.ucx_catalog}.multiworkspace.workflow_runs" + query = f"SELECT * FROM {az_cli_ctx.ucx_catalog}.multiworkspace.workflow_runs" assert not any(az_cli_ctx.sql_backend.fetch(query)) + start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) named_parameters = { "job_id": "123", "parent_run_id": "456", - "workflow_start_time": "2024-10-11T01:42:02.000000", + "workflow_start_time": 
start_time.isoformat(), } ctx = runtime_ctx.replace(named_parameters=named_parameters, ucx_catalog=az_cli_ctx.ucx_catalog) ctx.workflow_run_recorder.record(workflow_name="test") - assert any(az_cli_ctx.sql_backend.fetch(query)) + rows = list(az_cli_ctx.sql_backend.fetch(query)) + assert len(rows) == 1 + assert rows[0].started_at == start_time + assert start_time < rows[0].finished_at < dt.datetime.now(tz=dt.timezone.utc) + assert rows[0].workspace_id == ctx.workspace_client.get_workspace_id() + assert rows[0].workflow_name == "test" + assert rows[0].workflow_id == 123 + assert rows[0].workflow_run_id == 456 + assert rows[0].workflow_run_attempt == 0 + assert rows[0].run_as == "UNKNOWN" # TODO: Update this From 34975bfd6be738e5f1257f40e4051d2e5965ed70 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 10:02:34 +0200 Subject: [PATCH 41/65] Use non-escaping characters in UCX test catalog --- tests/integration/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b0c87d9293..19d8b5e592 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -445,7 +445,7 @@ def inventory_database(self) -> str: @cached_property def ucx_catalog(self) -> str: - return self._make_catalog(name=f"ucx-{self._make_random()}").name + return self._make_catalog(name=f"ucx_{self._make_random()}").name @cached_property def workspace_client(self) -> WorkspaceClient: From 2d830962443e1b829efc12cb845304e28b1ab7af Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 10:52:41 +0200 Subject: [PATCH 42/65] Test workflow runs is populated in the during real migration progress --- tests/integration/progress/test_workflows.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/progress/test_workflows.py b/tests/integration/progress/test_workflows.py index 3b049f802b..11b1aad9a9 100644 --- a/tests/integration/progress/test_workflows.py +++ 
b/tests/integration/progress/test_workflows.py @@ -19,3 +19,7 @@ def test_running_real_migration_progress_job(installation_ctx: MockInstallationC # Run the migration-progress workflow until completion. installation_ctx.deployed_workflows.run_workflow("migration-progress-experimental") assert installation_ctx.deployed_workflows.validate_step("migration-progress-experimental") + + # Ensure that the migration-progress workflow populated the `workflow_runs` table. + query = f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs" + assert any(installation_ctx.sql_backend.fetch(query)) From da602c989de2c83bfaefcd2f37a64dd44da8ea24 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 10:56:46 +0200 Subject: [PATCH 43/65] Use workflow from named parameters at init --- src/databricks/labs/ucx/contexts/workflow_task.py | 1 + src/databricks/labs/ucx/progress/workflow_runs.py | 12 +++++------- tests/integration/progress/test_workflow_runs.py | 3 ++- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index c5665c8f18..ce36119c09 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -117,6 +117,7 @@ def workflow_run_recorder(self): self.workspace_client, self.sql_backend, self.config.ucx_catalog, + workflow_name=self.named_parameters["workflow"], workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index bbb7cb8215..68c26faf46 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -46,6 +46,7 @@ def __init__( sql_backend: SqlBackend, ucx_catalog: str, *, + 
workflow_name, workflow_id: int, workflow_run_id: int, workflow_run_attempt: int, @@ -54,22 +55,19 @@ def __init__( self._ws = ws self._sql_backend = sql_backend self._full_table_name = f"{ucx_catalog}.multiworkspace.workflow_runs" + self._workflow_name = workflow_name self._workflow_start_time = workflow_start_time self._workflow_id = workflow_id self._workflow_run_id = workflow_run_id self._workflow_run_attempt = workflow_run_attempt - def record(self, *, workflow_name: str) -> None: - """Record a workflow run in the database. - - Args: - workflow_name (str): The UCX internal workflow name. - """ + def record(self) -> None: + """Record a workflow run in the database.""" workflow_run = WorkflowRun( started_at=dt.datetime.fromisoformat(self._workflow_start_time), finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0), workspace_id=self._ws.get_workspace_id(), - workflow_name=workflow_name, + workflow_name=self._workflow_name, workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, workflow_run_attempt=self._workflow_run_attempt, diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index 8b2985dbdf..f8e63ddf3e 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -8,12 +8,13 @@ def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) named_parameters = { + "workflow": "test", "job_id": "123", "parent_run_id": "456", "workflow_start_time": start_time.isoformat(), } ctx = runtime_ctx.replace(named_parameters=named_parameters, ucx_catalog=az_cli_ctx.ucx_catalog) - ctx.workflow_run_recorder.record(workflow_name="test") + ctx.workflow_run_recorder.record() rows = list(az_cli_ctx.sql_backend.fetch(query)) assert len(rows) == 1 From 950d2f0be28138000c2f9e83e990bad9263c998e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 
27 Sep 2024 10:57:23 +0200 Subject: [PATCH 44/65] Add workspace id as task parameter --- src/databricks/labs/ucx/installer/workflows.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index df4dcf1d48..4fa0fb1628 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -62,6 +62,7 @@ "start_time": "{{job.start_time.iso_datetime}}", "attempt": "{{job.repair_count}}", "parent_run_id": "{{parent_run_id}}", + "workspace_id": "{{workspace.id}}", } DEBUG_NOTEBOOK = """# Databricks notebook source # MAGIC %md From 4be94542f8ad4b71633b87ba14ad0604131f8028 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:00:19 +0200 Subject: [PATCH 45/65] Pass new parameters to main --- src/databricks/labs/ucx/installer/workflows.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index 4fa0fb1628..452ca3eaaa 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -111,8 +111,10 @@ f'--task=' + dbutils.widgets.get('task'), f'--job_id=' + dbutils.widgets.get('job_id'), f'--run_id=' + dbutils.widgets.get('run_id'), + f'--start_time=' + dbutils.widgets.get('start_time'), f'--attempt=' + dbutils.widgets.get('attempt'), - f'--parent_run_id=' + dbutils.widgets.get('parent_run_id')) + f'--parent_run_id=' + dbutils.widgets.get('parent_run_id'), + f'--workspace_id=' + dbutils.widgets.get('workspace_id')) """ From 08aef94f96c1ba6e94c2dcf432b212d69fa11f13 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:03:24 +0200 Subject: [PATCH 46/65] Remove WorkflowRuns.run_as --- src/databricks/labs/ucx/progress/workflow_runs.py | 4 ---- tests/integration/progress/test_workflow_runs.py | 1 - 2 files changed, 5 deletions(-) diff --git 
a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 68c26faf46..26854f226f 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -33,9 +33,6 @@ class WorkflowRun: workflow_run_attempt: int """The workflow run attempt.""" - run_as: str - """The identity the workflow was run as`""" - class WorkflowRunRecorder: """Record workflow runs in a database.""" @@ -71,7 +68,6 @@ def record(self) -> None: workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, workflow_run_attempt=self._workflow_run_attempt, - run_as="UNKNOWN", # TODO Update this ) try: self._sql_backend.save_table( diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index f8e63ddf3e..e81e5dafd3 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -25,4 +25,3 @@ def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> assert rows[0].workflow_id == 123 assert rows[0].workflow_run_id == 456 assert rows[0].workflow_run_attempt == 0 - assert rows[0].run_as == "UNKNOWN" # TODO: Update this From d33986cfa1aae0184f021ffa656d3acbd05ff975 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:05:46 +0200 Subject: [PATCH 47/65] Get workspace id from job parameters --- src/databricks/labs/ucx/contexts/workflow_task.py | 2 +- src/databricks/labs/ucx/progress/workflow_runs.py | 6 +++--- tests/integration/progress/test_workflow_runs.py | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index ce36119c09..ce88f7987f 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -114,9 +114,9 @@ def task_run_warning_recorder(self): @cached_property 
def workflow_run_recorder(self): return WorkflowRunRecorder( - self.workspace_client, self.sql_backend, self.config.ucx_catalog, + workspace_id=int(self.named_parameters["workspace_id"]), workflow_name=self.named_parameters["workflow"], workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 26854f226f..560ccbd72b 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -39,19 +39,19 @@ class WorkflowRunRecorder: def __init__( self, - ws: WorkspaceClient, sql_backend: SqlBackend, ucx_catalog: str, *, + workspace_id: int, workflow_name, workflow_id: int, workflow_run_id: int, workflow_run_attempt: int, workflow_start_time: str, ): - self._ws = ws self._sql_backend = sql_backend self._full_table_name = f"{ucx_catalog}.multiworkspace.workflow_runs" + self._workspace_id = workspace_id self._workflow_name = workflow_name self._workflow_start_time = workflow_start_time self._workflow_id = workflow_id @@ -63,7 +63,7 @@ def record(self) -> None: workflow_run = WorkflowRun( started_at=dt.datetime.fromisoformat(self._workflow_start_time), finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0), - workspace_id=self._ws.get_workspace_id(), + workspace_id=self._workspace_id, workflow_name=self._workflow_name, workflow_id=self._workflow_id, workflow_run_id=self._workflow_run_id, diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index e81e5dafd3..3a7d60f35a 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -8,6 +8,7 @@ def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) named_parameters = { + 
"workspace_id": 123456789, "workflow": "test", "job_id": "123", "parent_run_id": "456", @@ -20,7 +21,7 @@ def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> assert len(rows) == 1 assert rows[0].started_at == start_time assert start_time < rows[0].finished_at < dt.datetime.now(tz=dt.timezone.utc) - assert rows[0].workspace_id == ctx.workspace_client.get_workspace_id() + assert rows[0].workspace_id == 123456789 assert rows[0].workflow_name == "test" assert rows[0].workflow_id == 123 assert rows[0].workflow_run_id == 456 From 44f5f660728f8eafb8b5652402f593e72c4b4e3b Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:08:51 +0200 Subject: [PATCH 48/65] Set test local ucx catalog on installation context config --- tests/integration/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 19d8b5e592..1b7e0265c3 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -949,6 +949,7 @@ def config(self) -> WorkspaceConfig: include_databases=self.created_databases, include_object_permissions=self.include_object_permissions, warehouse_id=self._env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"), + ucx_catalog=self.ucx_catalog, ) workspace_config = self.config_transform(workspace_config) self.installation.save(workspace_config) From 965a8fcb05172fd195d7814517d724eb1cabe5d0 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:17:00 +0200 Subject: [PATCH 49/65] Rename progress tracking installer to installation --- src/databricks/labs/ucx/cli.py | 2 +- src/databricks/labs/ucx/contexts/workspace_cli.py | 6 +++--- src/databricks/labs/ucx/progress/install.py | 2 +- tests/integration/progress/test_install.py | 2 +- tests/integration/progress/test_workflow_runs.py | 2 +- tests/unit/progress/test_install.py | 6 +++--- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/databricks/labs/ucx/cli.py 
b/src/databricks/labs/ucx/cli.py index beafd6425a..b9b75b1640 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -611,7 +611,7 @@ def create_ucx_catalog(w: WorkspaceClient, prompts: Prompts, ctx: WorkspaceConte """ workspace_context = ctx or WorkspaceContext(w) workspace_context.catalog_schema.create_ucx_catalog(prompts) - workspace_context.progress_tracking_installer.run() + workspace_context.progress_tracking_installation.run() @ucx.command diff --git a/src/databricks/labs/ucx/contexts/workspace_cli.py b/src/databricks/labs/ucx/contexts/workspace_cli.py index c3bebeeeef..2c4830a863 100644 --- a/src/databricks/labs/ucx/contexts/workspace_cli.py +++ b/src/databricks/labs/ucx/contexts/workspace_cli.py @@ -18,7 +18,7 @@ from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources from databricks.labs.ucx.contexts.application import CliContext from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex -from databricks.labs.ucx.progress.install import ProgressTrackingInstaller +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation from databricks.labs.ucx.source_code.base import CurrentSessionState from databricks.labs.ucx.source_code.linters.context import LinterContext from databricks.labs.ucx.source_code.linters.files import LocalFileMigrator, LocalCodeLinter @@ -181,8 +181,8 @@ def notebook_loader(self) -> NotebookLoader: return NotebookLoader() @cached_property - def progress_tracking_installer(self) -> ProgressTrackingInstaller: - return ProgressTrackingInstaller(self.sql_backend, self.config.ucx_catalog) + def progress_tracking_installation(self) -> ProgressTrackingInstallation: + return ProgressTrackingInstallation(self.sql_backend, self.config.ucx_catalog) class LocalCheckoutContext(WorkspaceContext): diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index 932fdc5d1b..193df72c50 100644 --- 
a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -7,7 +7,7 @@ logger = logging.getLogger(__name__) -class ProgressTrackingInstaller: +class ProgressTrackingInstallation: """Install resources for UCX's progress tracking.""" _SCHEMA = "multiworkspace" diff --git a/tests/integration/progress/test_install.py b/tests/integration/progress/test_install.py index d45e0b46a9..02f77d82b2 100644 --- a/tests/integration/progress/test_install.py +++ b/tests/integration/progress/test_install.py @@ -1,5 +1,5 @@ def test_progress_tracking_installer_creates_workflow_runs_table(az_cli_ctx) -> None: - az_cli_ctx.progress_tracking_installer.run() + az_cli_ctx.progress_tracking_installation.run() query = ( f"SELECT 1 FROM tables WHERE table_catalog = '{az_cli_ctx.config.ucx_catalog}' " "AND table_schema = 'multiworkspace' AND table_name = 'workflow_runs'" diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index 3a7d60f35a..98eeedf06b 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -2,7 +2,7 @@ def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> None: - az_cli_ctx.progress_tracking_installer.run() + az_cli_ctx.progress_tracking_installation.run() query = f"SELECT * FROM {az_cli_ctx.ucx_catalog}.multiworkspace.workflow_runs" assert not any(az_cli_ctx.sql_backend.fetch(query)) diff --git a/tests/unit/progress/test_install.py b/tests/unit/progress/test_install.py index a62fae6e75..d7013c316c 100644 --- a/tests/unit/progress/test_install.py +++ b/tests/unit/progress/test_install.py @@ -1,14 +1,14 @@ -from databricks.labs.ucx.progress.install import ProgressTrackingInstaller +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None: - installation = 
ProgressTrackingInstaller(mock_backend, "ucx") + installation = ProgressTrackingInstallation(mock_backend, "ucx") installation.run() assert "CREATE SCHEMA IF NOT EXISTS ucx.multiworkspace" in mock_backend.queries[0] def test_progress_tracking_installation_run_creates_workflow_runs_table(mock_backend) -> None: - installation = ProgressTrackingInstaller(mock_backend, "ucx") + installation = ProgressTrackingInstallation(mock_backend, "ucx") installation.run() # Dataclass to schema conversion is tested within the lsql package assert any("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries) From e4e84fc324809af651396c2dbd90ac429c8057f1 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:23:51 +0200 Subject: [PATCH 50/65] Add ProgressTrackingInstallation to MockInstallationContext --- tests/integration/conftest.py | 7 ++++++- tests/integration/progress/test_workflow_runs.py | 16 +++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1b7e0265c3..c8ee042ca6 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -53,7 +53,7 @@ from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller, AccountInstaller from databricks.labs.ucx.installer.workflows import WorkflowsDeployment - +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation from databricks.labs.ucx.runtime import Workflows from databricks.labs.ucx.workspace_access.groups import MigratedGroup, GroupManager @@ -988,6 +988,11 @@ def workspace_installation(self): self.product_info, ) + @cached_property + def progress_tracking_installation(self) -> ProgressTrackingInstallation: + return ProgressTrackingInstallation(self.sql_backend, self.ucx_catalog) + + @cached_property def extend_prompts(self): return {} diff --git 
a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index 98eeedf06b..db81181a09 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -1,11 +1,8 @@ import datetime as dt -def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> None: - az_cli_ctx.progress_tracking_installation.run() - query = f"SELECT * FROM {az_cli_ctx.ucx_catalog}.multiworkspace.workflow_runs" - assert not any(az_cli_ctx.sql_backend.fetch(query)) - +def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None: + """Ensure that the workflow run recorder records a workflow run""" start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) named_parameters = { "workspace_id": 123456789, @@ -14,10 +11,15 @@ def test_workflow_run_recorder_records_workflow_run(az_cli_ctx, runtime_ctx) -> "parent_run_id": "456", "workflow_start_time": start_time.isoformat(), } - ctx = runtime_ctx.replace(named_parameters=named_parameters, ucx_catalog=az_cli_ctx.ucx_catalog) + ctx = installation_ctx.replace(named_parameters=named_parameters) + ctx.progress_tracking_installation.run() + select_workflow_runs_query = f"SELECT * FROM {ctx.ucx_catalog}.multiworkspace.workflow_runs" + # Be confident that we are not selecting any workflow runs before the to-be-tested code + assert not any(ctx.sql_backend.fetch(select_workflow_runs_query)) + ctx.workflow_run_recorder.record() - rows = list(az_cli_ctx.sql_backend.fetch(query)) + rows = list(ctx.sql_backend.fetch(select_workflow_runs_query)) assert len(rows) == 1 assert rows[0].started_at == start_time assert start_time < rows[0].finished_at < dt.datetime.now(tz=dt.timezone.utc) From 7070bba1c279655c22d663f2da3b06bd556c5b73 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:26:34 +0200 Subject: [PATCH 51/65] Format --- src/databricks/labs/ucx/progress/workflow_runs.py | 1 - 
tests/integration/conftest.py | 1 - 2 files changed, 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 560ccbd72b..324630da18 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -3,7 +3,6 @@ from dataclasses import dataclass from databricks.labs.lsql.backends import SqlBackend -from databricks.sdk import WorkspaceClient from databricks.sdk.errors import NotFound diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c8ee042ca6..2fc3f47b08 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -992,7 +992,6 @@ def workspace_installation(self): def progress_tracking_installation(self) -> ProgressTrackingInstallation: return ProgressTrackingInstallation(self.sql_backend, self.ucx_catalog) - @cached_property def extend_prompts(self): return {} From fe943a01b1d67eb2db6d2cb3dbd8deac1c764824 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:31:43 +0200 Subject: [PATCH 52/65] Add unit test for workflow runs --- tests/unit/progress/test_workflow_runs.py | 31 +++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 tests/unit/progress/test_workflow_runs.py diff --git a/tests/unit/progress/test_workflow_runs.py b/tests/unit/progress/test_workflow_runs.py new file mode 100644 index 0000000000..2a30ac090a --- /dev/null +++ b/tests/unit/progress/test_workflow_runs.py @@ -0,0 +1,31 @@ +import datetime as dt + +from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder + + +def test_workflow_run_record_records_workflow_run(mock_backend) -> None: + """Ensure that the workflow run recorder records a workflow run""" + start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) + workflow_run_recorder = WorkflowRunRecorder( + mock_backend, + ucx_catalog="ucx", + workspace_id=123456789, + workflow_name="workflow", 
+ workflow_id=123, + workflow_run_id=456, + workflow_run_attempt=0, + workflow_start_time=start_time.isoformat(), + ) + + workflow_run_recorder.record() + + rows = mock_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append") + assert len(rows) == 1 + assert rows[0].started_at == start_time + assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc) + assert rows[0].workspace_id == 123456789 + assert rows[0].workflow_name == "workflow" + assert rows[0].workflow_id == 123 + assert rows[0].workflow_run_id == 456 + assert rows[0].workflow_run_attempt == 0 + From 64d8e43bc844e7f06b5f4d53908c9c6d8c229d90 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:31:54 +0200 Subject: [PATCH 53/65] Be saver with finished at test --- tests/integration/progress/test_workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index db81181a09..be866a4949 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -22,7 +22,7 @@ def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None: rows = list(ctx.sql_backend.fetch(select_workflow_runs_query)) assert len(rows) == 1 assert rows[0].started_at == start_time - assert start_time < rows[0].finished_at < dt.datetime.now(tz=dt.timezone.utc) + assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc) assert rows[0].workspace_id == 123456789 assert rows[0].workflow_name == "test" assert rows[0].workflow_id == 123 From ec2aada07c2fd39c767516f01fbb60048bacc412 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:35:05 +0200 Subject: [PATCH 54/65] Format README --- README.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 787fe09cd0..487aa93519 100644 --- a/README.md +++ b/README.md @@ 
-1029,11 +1029,16 @@ listed with the [`workflows` command](#workflows-command). databricks labs ucx update-migration-progress ``` -This command updates a subset of the inventory tables that are used to track workspace resources that need to be migrated. It does this by triggering the `migration-process-experimental` workflow to run on a workspace and waiting for it to complete. This can be used to ensure that dashboards and associated reporting are updated to reflect the current state of the workspace. +This command updates a subset of the inventory tables that are used to track workspace resources that need to be +migrated. It does this by triggering the `migration-process-experimental` workflow to run on a workspace and waiting for +it to complete. This can be used to ensure that dashboards and associated reporting are updated to reflect the current +state of the workspace. -_Note: Only a subset of the inventory is updated, *not* the complete inventory that is initially gathered by the [assessment workflow](#assessment-workflow)._ +_Note: Only a subset of the inventory is updated, *not* the complete inventory that is initially gathered by +the [assessment workflow](#assessment-workflow)._ -Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can be fixed with the [`repair-run` command](#repair-run-command). +Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can +be fixed with the [`repair-run` command](#repair-run-command). 
[[back to top](#databricks-labs-ucx)] From 3280e85645e1b200ee22e3edea0486d80800ba7d Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 11:47:31 +0200 Subject: [PATCH 55/65] Explain workflow runs in the README --- README.md | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 487aa93519..f6fd957fe7 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project. * [`table-migrated-to-uc`](#table-migrated-to-uc) * [`to-json-in-shared-clusters`](#to-json-in-shared-clusters) * [`unsupported-magic-line`](#unsupported-magic-line) + * [(EXPERIMENTAL) Migration Progress Workflow](#experimental-migration-progress-workflow) * [Utility commands](#utility-commands) * [`logs` command](#logs-command) * [`ensure-assessment-run` command](#ensure-assessment-run-command) @@ -126,7 +127,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project. * [`revert-cluster-remap` command](#revert-cluster-remap-command) * [`upload` command](#upload-command) * [`download` command](#download-command) - * [`join-collection` command](#join-collection command) + * [`join-collection` command](#join-collection-command) * [collection eligible command](#collection-eligible-command) * [Common Challenges and the Solutions](#common-challenges-and-the-solutions) * [Network Connectivity Issues](#network-connectivity-issues) @@ -994,6 +995,19 @@ This message indicates the code that could not be analysed by UCX. User must che [[back to top](#databricks-labs-ucx)] +## [EXPERIMENTAL] Migration Progress Workflow + +The `migration-process-experimental` workflow updates a subset of the inventory tables to track migration status of +workspace resources that need to be migrated. 
Besides updating the inventory tables, this workflow tracks the migration +progress by updating the following [UCX catalog](#create-ucx-catalog-command) tables: + +- `workflow_runs`: Tracks the status of the workflow runs. + +_Note: A subset of the inventory is updated, *not* the complete inventory that is initially gathered by +the [assessment workflow](#assessment-workflow)._ + +[[back to top](#databricks-labs-ucx)] + # Utility commands ## `logs` command @@ -1029,13 +1043,10 @@ listed with the [`workflows` command](#workflows-command). databricks labs ucx update-migration-progress ``` -This command updates a subset of the inventory tables that are used to track workspace resources that need to be -migrated. It does this by triggering the `migration-process-experimental` workflow to run on a workspace and waiting for -it to complete. This can be used to ensure that dashboards and associated reporting are updated to reflect the current -state of the workspace. - -_Note: Only a subset of the inventory is updated, *not* the complete inventory that is initially gathered by -the [assessment workflow](#assessment-workflow)._ +This command runs the [(experimental) migration progress workflow](#experimental-migration-progress-workflow) to update +the migration status of worksapce resources that need to be migrated. It does this by triggering +the `migration-process-experimental` workflow to run on a workspace and waiting for +it to complete. Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can be fixed with the [`repair-run` command](#repair-run-command). 
From b3146b167fb84a4e5ace15801b127cd8b6fa8635 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 12:05:29 +0200 Subject: [PATCH 56/65] Fix `workflow_start_time` should be `start_time` --- src/databricks/labs/ucx/contexts/workflow_task.py | 2 +- tests/integration/progress/test_workflow_runs.py | 2 +- tests/integration/progress/test_workflows.py | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index ce88f7987f..424bfc8ec8 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -121,5 +121,5 @@ def workflow_run_recorder(self): workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), - workflow_start_time=self.named_parameters["workflow_start_time"], + workflow_start_time=self.named_parameters["start_time"], ) diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index be866a4949..bad303de2e 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -9,7 +9,7 @@ def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None: "workflow": "test", "job_id": "123", "parent_run_id": "456", - "workflow_start_time": start_time.isoformat(), + "start_time": start_time.isoformat(), } ctx = installation_ctx.replace(named_parameters=named_parameters) ctx.progress_tracking_installation.run() diff --git a/tests/integration/progress/test_workflows.py b/tests/integration/progress/test_workflows.py index 11b1aad9a9..e8b0e8fc32 100644 --- a/tests/integration/progress/test_workflows.py +++ b/tests/integration/progress/test_workflows.py @@ -16,6 +16,9 @@ def test_running_real_migration_progress_job(installation_ctx: MockInstallationC 
installation_ctx.deployed_workflows.run_workflow("assessment") assert installation_ctx.deployed_workflows.validate_step("assessment") + # After the assessment, a user (maybe) installs the progress tracking + installation_ctx.progress_tracking_installation.run() + # Run the migration-progress workflow until completion. installation_ctx.deployed_workflows.run_workflow("migration-progress-experimental") assert installation_ctx.deployed_workflows.validate_step("migration-progress-experimental") From 950299159850f92811bc3a8008d146a581e995a9 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 12:15:04 +0200 Subject: [PATCH 57/65] Format --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- tests/unit/progress/test_workflow_runs.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 324630da18..66b4a81a57 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -42,7 +42,7 @@ def __init__( ucx_catalog: str, *, workspace_id: int, - workflow_name, + workflow_name: str, workflow_id: int, workflow_run_id: int, workflow_run_attempt: int, diff --git a/tests/unit/progress/test_workflow_runs.py b/tests/unit/progress/test_workflow_runs.py index 2a30ac090a..9ebd9527a4 100644 --- a/tests/unit/progress/test_workflow_runs.py +++ b/tests/unit/progress/test_workflow_runs.py @@ -28,4 +28,3 @@ def test_workflow_run_record_records_workflow_run(mock_backend) -> None: assert rows[0].workflow_id == 123 assert rows[0].workflow_run_id == 456 assert rows[0].workflow_run_attempt == 0 - From 57b7d8c367befc5d3e8e46004d9aa9b47c386ab1 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Fri, 27 Sep 2024 15:39:32 +0200 Subject: [PATCH 58/65] Get workspace id from workspace client --- src/databricks/labs/ucx/contexts/workflow_task.py | 2 +- src/databricks/labs/ucx/installer/workflows.py | 4 +--- 2 
files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 424bfc8ec8..68e66cca80 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -116,7 +116,7 @@ def workflow_run_recorder(self): return WorkflowRunRecorder( self.sql_backend, self.config.ucx_catalog, - workspace_id=int(self.named_parameters["workspace_id"]), + workspace_id=self.workspace_client.get_workspace_id(), workflow_name=self.named_parameters["workflow"], workflow_id=int(self.named_parameters["job_id"]), workflow_run_id=int(self.named_parameters["parent_run_id"]), diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index 452ca3eaaa..945a79d861 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -62,7 +62,6 @@ "start_time": "{{job.start_time.iso_datetime}}", "attempt": "{{job.repair_count}}", "parent_run_id": "{{parent_run_id}}", - "workspace_id": "{{workspace.id}}", } DEBUG_NOTEBOOK = """# Databricks notebook source # MAGIC %md @@ -113,8 +112,7 @@ f'--run_id=' + dbutils.widgets.get('run_id'), f'--start_time=' + dbutils.widgets.get('start_time'), f'--attempt=' + dbutils.widgets.get('attempt'), - f'--parent_run_id=' + dbutils.widgets.get('parent_run_id'), - f'--workspace_id=' + dbutils.widgets.get('workspace_id')) + f'--parent_run_id=' + dbutils.widgets.get('parent_run_id')) """ From c8de2c968bcea52c9777b00cdf645eaf409ace23 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Tue, 1 Oct 2024 11:16:24 +0200 Subject: [PATCH 59/65] Fix typo's --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f6fd957fe7..676739ec22 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project. 
* [`table-migrated-to-uc`](#table-migrated-to-uc) * [`to-json-in-shared-clusters`](#to-json-in-shared-clusters) * [`unsupported-magic-line`](#unsupported-magic-line) - * [(EXPERIMENTAL) Migration Progress Workflow](#experimental-migration-progress-workflow) + * [[EXPERIMENTAL] Migration Progress Workflow](#experimental-migration-progress-workflow) * [Utility commands](#utility-commands) * [`logs` command](#logs-command) * [`ensure-assessment-run` command](#ensure-assessment-run-command) @@ -997,7 +997,7 @@ This message indicates the code that could not be analysed by UCX. User must che ## [EXPERIMENTAL] Migration Progress Workflow -The `migration-process-experimental` workflow updates a subset of the inventory tables to track migration status of +The `migration-progress-experimental` workflow updates a subset of the inventory tables to track migration status of workspace resources that need to be migrated. Besides updating the inventory tables, this workflow tracks the migration progress by updating the following [UCX catalog](#create-ucx-catalog-command) tables: @@ -1044,8 +1044,8 @@ databricks labs ucx update-migration-progress ``` This command runs the [(experimental) migration progress workflow](#experimental-migration-progress-workflow) to update -the migration status of worksapce resources that need to be migrated. It does this by triggering -the `migration-process-experimental` workflow to run on a workspace and waiting for +the migration status of workspace resources that need to be migrated. It does this by triggering +the `migration-progress-experimental` workflow to run on a workspace and waiting for it to complete. 
Workflows and their status can be listed with the [`workflows` command](#workflows-commandr), while failed workflows can From 32c1d5e1b3ac249ed796da65e1e24c36368239f5 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 2 Oct 2024 12:00:10 +0200 Subject: [PATCH 60/65] Add type hint for `WorkflowRunRecorder` --- src/databricks/labs/ucx/contexts/workflow_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 68e66cca80..715fb5c67d 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -112,7 +112,7 @@ def task_run_warning_recorder(self): ) @cached_property - def workflow_run_recorder(self): + def workflow_run_recorder(self) -> WorkflowRunRecorder: return WorkflowRunRecorder( self.sql_backend, self.config.ucx_catalog, From 931dfeb099213a3a87768fa974d8b43b33608b04 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 2 Oct 2024 12:02:26 +0200 Subject: [PATCH 61/65] Improve comment --- src/databricks/labs/ucx/progress/install.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/install.py b/src/databricks/labs/ucx/progress/install.py index 193df72c50..5f2ab69f7e 100644 --- a/src/databricks/labs/ucx/progress/install.py +++ b/src/databricks/labs/ucx/progress/install.py @@ -13,7 +13,7 @@ class ProgressTrackingInstallation: _SCHEMA = "multiworkspace" def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None: - # `mod` is required parameter, but it's not used in this context. + # `mod` is a required parameter, though, it's not used in this context without views. 
self._schema_deployer = SchemaDeployer(sql_backend, self._SCHEMA, mod=None, catalog=ucx_catalog) def run(self) -> None: From 92fb2fb0502588ddb576031af98fb7b6227eca83 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 2 Oct 2024 12:03:29 +0200 Subject: [PATCH 62/65] Improve docstring --- src/databricks/labs/ucx/progress/workflow_runs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflow_runs.py b/src/databricks/labs/ucx/progress/workflow_runs.py index 66b4a81a57..d92a59961b 100644 --- a/src/databricks/labs/ucx/progress/workflow_runs.py +++ b/src/databricks/labs/ucx/progress/workflow_runs.py @@ -58,7 +58,7 @@ def __init__( self._workflow_run_attempt = workflow_run_attempt def record(self) -> None: - """Record a workflow run in the database.""" + """Record a workflow run.""" workflow_run = WorkflowRun( started_at=dt.datetime.fromisoformat(self._workflow_start_time), finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0), From 5adfc6a54d3dcecd98794b425a415cbffa3003e4 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 2 Oct 2024 12:09:33 +0200 Subject: [PATCH 63/65] Remove redundant argument --- src/databricks/labs/ucx/progress/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index c608f45f62..0b6bd8a41f 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -124,4 +124,4 @@ def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" - ctx.workflow_run_recorder.record(self._name) + ctx.workflow_run_recorder.record() From b3f9be2217101e6234e8abaa0b3cd5ffee1864c7 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 2 Oct 2024 13:59:21 +0200 Subject: [PATCH 64/65] Add assert message --- 
tests/integration/progress/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/progress/test_workflows.py b/tests/integration/progress/test_workflows.py index e8b0e8fc32..54c99db4f7 100644 --- a/tests/integration/progress/test_workflows.py +++ b/tests/integration/progress/test_workflows.py @@ -25,4 +25,4 @@ def test_running_real_migration_progress_job(installation_ctx: MockInstallationC # Ensure that the migration-progress workflow populated the `workflow_runs` table. query = f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs" - assert any(installation_ctx.sql_backend.fetch(query)) + assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" From d127cecedae783516ef3e4c12244a5dbedd67dd8 Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Wed, 2 Oct 2024 14:26:09 +0200 Subject: [PATCH 65/65] Get test workspace id from workspace client --- tests/integration/progress/test_workflow_runs.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/progress/test_workflow_runs.py b/tests/integration/progress/test_workflow_runs.py index bad303de2e..cc7c11a680 100644 --- a/tests/integration/progress/test_workflow_runs.py +++ b/tests/integration/progress/test_workflow_runs.py @@ -5,7 +5,6 @@ def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None: """Ensure that the workflow run recorder records a workflow run""" start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0) named_parameters = { - "workspace_id": 123456789, "workflow": "test", "job_id": "123", "parent_run_id": "456", @@ -23,7 +22,7 @@ def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None: assert len(rows) == 1 assert rows[0].started_at == start_time assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc) - assert rows[0].workspace_id == 123456789 + assert rows[0].workspace_id == 
installation_ctx.workspace_client.get_workspace_id() assert rows[0].workflow_name == "test" assert rows[0].workflow_id == 123 assert rows[0].workflow_run_id == 456