diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py
index 1fc1a7d5aa..7243567fdf 100644
--- a/src/databricks/labs/ucx/contexts/workflow_task.py
+++ b/src/databricks/labs/ucx/contexts/workflow_task.py
@@ -27,6 +27,7 @@
 from databricks.labs.ucx.hive_metastore.udfs import Udf
 from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
 from databricks.labs.ucx.progress.history import ProgressEncoder
+from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
 from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder
 
 # As with GlobalContext, service factories unavoidably have a lot of public methods.
@@ -199,10 +200,10 @@ def grants_progress(self) -> ProgressEncoder[Grant]:
 
     @cached_property
     def jobs_progress(self) -> ProgressEncoder[JobInfo]:
-        return ProgressEncoder(
+        return JobsProgressEncoder(
             self.sql_backend,
             self.job_ownership,
-            JobInfo,
+            self.inventory_database,
             self.parent_run_id,
             self.workspace_id,
             self.config.ucx_catalog,
diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py
index e69de29bb2..198139543c 100644
--- a/src/databricks/labs/ucx/progress/jobs.py
+++ b/src/databricks/labs/ucx/progress/jobs.py
@@ -0,0 +1,54 @@
+import collections
+from dataclasses import replace
+from functools import cached_property
+
+from databricks.labs.lsql.backends import SqlBackend
+
+from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
+from databricks.labs.ucx.progress.history import ProgressEncoder
+from databricks.labs.ucx.progress.install import Historical
+from databricks.labs.ucx.source_code.jobs import JobProblem
+
+
+class JobsProgressEncoder(ProgressEncoder[JobInfo]):
+
+    def __init__(
+        self,
+        sql_backend: SqlBackend,
+        ownership: JobOwnership,
+        inventory_database: str,
+        run_id: int,
+        workspace_id: int,
+        catalog: str,
+        schema: str = "multiworkspace",
+        table: str = "historical",
+    ) -> None:
+        super().__init__(
+            sql_backend,
+            ownership,
+            JobInfo,
+            run_id,
+            workspace_id,
+            catalog,
+            schema,
+            table,
+        )
+        self._inventory_database = inventory_database
+
+    @cached_property
+    def _job_problems(self) -> dict[int, list[str]]:
+        index = collections.defaultdict(list)
+        for row in self._sql_backend.fetch(
+            'SELECT * FROM workflow_problems',
+            catalog='hive_metastore',
+            schema=self._inventory_database,
+        ):
+            job_problem = JobProblem(**row.asDict())
+            failure = f'{job_problem.code}: {job_problem.task_key} task: {job_problem.path}: {job_problem.message}'
+            index[job_problem.job_id].append(failure)
+        return index
+
+    def _encode_record_as_historical(self, record: JobInfo) -> Historical:
+        historical = super()._encode_record_as_historical(record)
+        failures = self._job_problems.get(int(record.job_id), [])
+        return replace(historical, failures=historical.failures + failures)
diff --git a/tests/unit/progress/test_jobs.py b/tests/unit/progress/test_jobs.py
new file mode 100644
index 0000000000..23c2aa1435
--- /dev/null
+++ b/tests/unit/progress/test_jobs.py
@@ -0,0 +1,66 @@
+from unittest.mock import create_autospec
+
+from databricks.labs.lsql import Row
+from databricks.labs.lsql.backends import MockBackend
+
+from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
+from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
+from databricks.labs.ucx import __version__
+
+
+def test_jobs_progress_encoder() -> None:
+    common = {
+        'message': 'some failure',
+        'job_name': 'job_name',
+        'start_line': 1,
+        'start_col': 2,
+        'end_line': 3,
+        'end_col': 4,
+    }
+    sql_backend = MockBackend(
+        rows={
+            "workflow_problems": [
+                Row(job_id=1, code="cannot-autofix-table-reference", task_key="a", path="/some/path", **common),
+                Row(job_id=1, code="catalog-api-in-shared-clusters", task_key="b", path="/some/other", **common),
+                Row(job_id=2, code="catalog-api-in-shared-clusters", task_key="c", path="/x", **common),
+            ],
+        }
+    )
+    job_ownership = create_autospec(JobOwnership)
+    job_ownership.owner_of.return_value = "some_owner"
+    jobs_progress_encoder = JobsProgressEncoder(
+        sql_backend,
+        job_ownership,
+        "inventory",
+        2,
+        3,
+        "ucx",
+    )
+
+    jobs_progress_encoder.append_inventory_snapshot(
+        [
+            JobInfo(
+                job_id='1',
+                success=0,
+                failures='["some failure from config"]',
+            )
+        ]
+    )
+
+    rows = sql_backend.rows_written_for('`ucx`.`multiworkspace`.`historical`', 'append')
+    assert rows == [
+        Row(
+            workspace_id=3,
+            job_run_id=2,
+            object_type='JobInfo',
+            object_id=['1'],
+            data={'job_id': '1', 'success': '0'},
+            failures=[
+                'some failure from config',
+                'cannot-autofix-table-reference: a task: /some/path: some failure',
+                'catalog-api-in-shared-clusters: b task: /some/other: some failure',
+            ],
+            owner='some_owner',
+            ucx_version=__version__,
+        )
+    ]