Skip to content

Commit

Permalink
Combine static code analysis results with historical job snapshots
Browse files Browse the repository at this point in the history
Fix #3059
  • Loading branch information
nfx committed Oct 24, 2024
1 parent ea0217d commit 3e7f6a7
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder

# As with GlobalContext, service factories unavoidably have a lot of public methods.
Expand Down Expand Up @@ -199,10 +200,10 @@ def grants_progress(self) -> ProgressEncoder[Grant]:

@cached_property
def jobs_progress(self) -> ProgressEncoder[JobInfo]:
return ProgressEncoder(
return JobsProgressEncoder(
self.sql_backend,
self.job_ownership,
JobInfo,
self.inventory_database,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down
54 changes: 54 additions & 0 deletions src/databricks/labs/ucx/progress/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import collections
from dataclasses import replace
from functools import cached_property

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical
from databricks.labs.ucx.source_code.jobs import JobProblem


class JobsProgressEncoder(ProgressEncoder[JobInfo]):

def __init__(
self,
sql_backend: SqlBackend,
ownership: JobOwnership,
inventory_database: str,
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(
sql_backend,
ownership,
JobInfo,
run_id,
workspace_id,
catalog,
schema,
table,
)
self._inventory_database = inventory_database

@cached_property
def _job_problems(self) -> dict[int, list[str]]:
index = collections.defaultdict(list)
for row in self._sql_backend.fetch(
'SELECT * FROM workflow_problems',
catalog='hive_metastore',
schema=self._inventory_database,
):
job_problem = JobProblem(**row.asDict())
failure = f'{job_problem.code}: {job_problem.task_key} task: {job_problem.path}: {job_problem.message}'
index[job_problem.job_id].append(failure)
return index

def _encode_record_as_historical(self, record: JobInfo) -> Historical:
historical = super()._encode_record_as_historical(record)
failures = self._job_problems.get(int(record.job_id), [])
return replace(historical, failures=historical.failures + failures)
66 changes: 66 additions & 0 deletions tests/unit/progress/test_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from unittest.mock import create_autospec

from databricks.labs.lsql import Row
from databricks.labs.lsql.backends import MockBackend

from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx import __version__


def test_jobs_progress_encoder() -> None:
common = {
'message': 'some failure',
'job_name': 'job_name',
'start_line': 1,
'start_col': 2,
'end_line': 3,
'end_col': 4,
}
sql_backend = MockBackend(
rows={
"workflow_problems": [
Row(job_id=1, code="cannot-autofix-table-reference", task_key="a", path="/some/path", **common),
Row(job_id=1, code="catalog-api-in-shared-clusters", task_key="b", path="/some/other", **common),
Row(job_id=2, code="catalog-api-in-shared-clusters", task_key="c", path="/x", **common),
],
}
)
job_ownership = create_autospec(JobOwnership)
job_ownership.owner_of.return_value = "some_owner"
jobs_progress_encoder = JobsProgressEncoder(
sql_backend,
job_ownership,
"inventory",
2,
3,
"ucx",
)

jobs_progress_encoder.append_inventory_snapshot(
[
JobInfo(
job_id='1',
success=0,
failures='["some failure from config"]',
)
]
)

rows = sql_backend.rows_written_for('`ucx`.`multiworkspace`.`historical`', 'append')
assert rows == [
Row(
workspace_id=3,
job_run_id=2,
object_type='JobInfo',
object_id=['1'],
data={'job_id': '1', 'success': '0'},
failures=[
'some failure from config',
'cannot-autofix-table-reference: a task: /some/path: some failure',
'catalog-api-in-shared-clusters: b task: /some/other: some failure',
],
owner='some_owner',
ucx_version=__version__,
)
]

0 comments on commit 3e7f6a7

Please sign in to comment.