-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add and populate UCX
workflow_runs
table (#2754)
## Changes Add and populate workflow runs table ### Linked issues Resolves #2600 ### Functionality - [x] added relevant user documentation - [x] modified existing workflow: `migration-process-experimental` ### Tests - [ ] manually tested - [x] added unit tests - [x] added integration tests ### TODO - [ ] Handle concurrent writes, [see](#2754 (comment)) - [x] Decide on getting workflow run status from `parse_log_task` --> only add it to the migration progress workflow for now
- Loading branch information
1 parent
ab14010
commit cb40b60
Showing
15 changed files
with
265 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import logging | ||
|
||
from databricks.labs.lsql.backends import SqlBackend | ||
from databricks.labs.lsql.deployment import SchemaDeployer | ||
from databricks.labs.ucx.progress.workflow_runs import WorkflowRun | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class ProgressTrackingInstallation:
    """Install resources for UCX's progress tracking.

    The installation deploys the progress tracking schema into the given UCX
    catalog and creates the `workflow_runs` table inside that schema.
    """

    # Name of the schema that holds the progress tracking tables.
    _SCHEMA = "multiworkspace"

    def __init__(self, sql_backend: SqlBackend, ucx_catalog: str) -> None:
        """Prepare the schema deployer.

        Args:
            sql_backend: Backend handed to the schema deployer for executing statements.
            ucx_catalog: Name of the catalog in which the schema is deployed.
        """
        # `mod` is a required parameter, though, it's not used in this context without views.
        self._schema_deployer = SchemaDeployer(sql_backend, self._SCHEMA, mod=None, catalog=ucx_catalog)

    def run(self) -> None:
        """Deploy the schema and the `workflow_runs` table, then log completion."""
        self._schema_deployer.deploy_schema()
        self._schema_deployer.deploy_table("workflow_runs", WorkflowRun)
        logger.info("Installation completed successfully!")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import datetime as dt | ||
import logging | ||
from dataclasses import dataclass | ||
|
||
from databricks.labs.lsql.backends import SqlBackend | ||
from databricks.sdk.errors import NotFound | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@dataclass(frozen=True, kw_only=True) | ||
class WorkflowRun: | ||
started_at: dt.datetime | ||
"""The timestamp of the workflow run start.""" | ||
|
||
finished_at: dt.datetime | ||
"""The timestamp of the workflow run end.""" | ||
|
||
workspace_id: int | ||
"""The workspace id in which the workflow ran.""" | ||
|
||
workflow_name: str | ||
"""The workflow name that ran.""" | ||
|
||
workflow_id: int | ||
""""The workflow id of the workflow that ran.""" | ||
|
||
workflow_run_id: int | ||
"""The workflow run id.""" | ||
|
||
workflow_run_attempt: int | ||
"""The workflow run attempt.""" | ||
|
||
|
||
class WorkflowRunRecorder:
    """Record workflow runs in a database."""

    def __init__(
        self,
        sql_backend: SqlBackend,
        ucx_catalog: str,
        *,
        workspace_id: int,
        workflow_name: str,
        workflow_id: int,
        workflow_run_id: int,
        workflow_run_attempt: int,
        workflow_start_time: str,
    ):
        """Store the run details that are written when `record` is called.

        Args:
            sql_backend: Backend used to append the row to the workflow runs table.
            ucx_catalog: Catalog containing the `multiworkspace.workflow_runs` table.
            workspace_id: The workspace id in which the workflow ran.
            workflow_name: The workflow name that ran.
            workflow_id: The workflow id of the workflow that ran.
            workflow_run_id: The workflow run id.
            workflow_run_attempt: The workflow run attempt.
            workflow_start_time: Start of the run as an ISO 8601 formatted string; it is
                parsed with `datetime.fromisoformat` when the run is recorded.
        """
        self._sql_backend = sql_backend
        self._full_table_name = f"{ucx_catalog}.multiworkspace.workflow_runs"
        self._workspace_id = workspace_id
        self._workflow_name = workflow_name
        self._workflow_start_time = workflow_start_time
        self._workflow_id = workflow_id
        self._workflow_run_id = workflow_run_id
        self._workflow_run_attempt = workflow_run_attempt

    def record(self) -> None:
        """Record a workflow run.

        The finish time is the current UTC time truncated to whole seconds. A missing
        table is logged as an error instead of being raised.
        """
        workflow_run = WorkflowRun(
            started_at=dt.datetime.fromisoformat(self._workflow_start_time),
            finished_at=dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0),
            workspace_id=self._workspace_id,
            workflow_name=self._workflow_name,
            workflow_id=self._workflow_id,
            workflow_run_id=self._workflow_run_id,
            workflow_run_attempt=self._workflow_run_attempt,
        )
        try:
            self._sql_backend.save_table(
                self._full_table_name,
                [workflow_run],
                WorkflowRun,
                mode="append",
            )
        except NotFound as e:
            # Best effort: log the missing table rather than propagate the error.
            logger.error(f"Workflow run table not found: {self._full_table_name}", exc_info=e)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
def test_progress_tracking_installer_creates_workflow_runs_table(az_cli_ctx) -> None:
    """Ensure that running the installation creates the workflow runs table."""
    az_cli_ctx.progress_tracking_installation.run()
    # Look the table up in the information schema instead of querying the table itself.
    query = (
        f"SELECT 1 FROM tables WHERE table_catalog = '{az_cli_ctx.config.ucx_catalog}' "
        "AND table_schema = 'multiworkspace' AND table_name = 'workflow_runs'"
    )
    assert any(az_cli_ctx.sql_backend.fetch(query, catalog="system", schema="information_schema"))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import datetime as dt | ||
|
||
|
||
def test_workflow_run_recorder_records_workflow_run(installation_ctx) -> None:
    """Ensure that the workflow run recorder records a workflow run"""
    # Whole seconds only, matching the precision the recorder uses for the finish time.
    start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0)
    named_parameters = {
        "workflow": "test",
        "job_id": "123",
        "parent_run_id": "456",
        "start_time": start_time.isoformat(),
    }
    ctx = installation_ctx.replace(named_parameters=named_parameters)
    ctx.progress_tracking_installation.run()
    select_workflow_runs_query = f"SELECT * FROM {ctx.ucx_catalog}.multiworkspace.workflow_runs"
    # Be confident that we are not selecting any workflow runs before the to-be-tested code
    assert not any(ctx.sql_backend.fetch(select_workflow_runs_query))

    ctx.workflow_run_recorder.record()

    rows = list(ctx.sql_backend.fetch(select_workflow_runs_query))
    assert len(rows) == 1
    assert rows[0].started_at == start_time
    assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc)
    assert rows[0].workspace_id == installation_ctx.workspace_client.get_workspace_id()
    assert rows[0].workflow_name == "test"
    assert rows[0].workflow_id == 123
    assert rows[0].workflow_run_id == 456
    assert rows[0].workflow_run_attempt == 0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from databricks.labs.ucx.progress.install import ProgressTrackingInstallation | ||
|
||
|
||
def test_progress_tracking_installation_run_creates_progress_tracking_schema(mock_backend) -> None:
    """Ensure that the first query issued by the installation creates the schema."""
    installation = ProgressTrackingInstallation(mock_backend, "ucx")
    installation.run()
    assert "CREATE SCHEMA IF NOT EXISTS ucx.multiworkspace" in mock_backend.queries[0]
|
||
|
||
def test_progress_tracking_installation_run_creates_workflow_runs_table(mock_backend) -> None:
    """Ensure that the installation issues a query creating a table."""
    installation = ProgressTrackingInstallation(mock_backend, "ucx")
    installation.run()
    # Dataclass to schema conversion is tested within the lsql package
    assert any("CREATE TABLE IF NOT EXISTS" in query for query in mock_backend.queries)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import datetime as dt | ||
|
||
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder | ||
|
||
|
||
def test_workflow_run_record_records_workflow_run(mock_backend) -> None:
    """Ensure that the workflow run recorder records a workflow run"""
    # Whole seconds only, matching the precision the recorder uses for the finish time.
    start_time = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0)
    workflow_run_recorder = WorkflowRunRecorder(
        mock_backend,
        ucx_catalog="ucx",
        workspace_id=123456789,
        workflow_name="workflow",
        workflow_id=123,
        workflow_run_id=456,
        workflow_run_attempt=0,
        workflow_start_time=start_time.isoformat(),
    )

    workflow_run_recorder.record()

    rows = mock_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append")
    assert len(rows) == 1
    assert rows[0].started_at == start_time
    assert start_time <= rows[0].finished_at <= dt.datetime.now(tz=dt.timezone.utc)
    assert rows[0].workspace_id == 123456789
    assert rows[0].workflow_name == "workflow"
    assert rows[0].workflow_id == 123
    assert rows[0].workflow_run_id == 456
    assert rows[0].workflow_run_attempt == 0
Oops, something went wrong.