Assess source code as part of the assessment #2678

Merged (13 commits) on Sep 23, 2024
12 changes: 12 additions & 0 deletions src/databricks/labs/ucx/assessment/workflows.py
@@ -181,6 +181,18 @@ def crawl_groups(self, ctx: RuntimeContext):
         """Scans all groups for the local group migration scope"""
         ctx.group_manager.snapshot()
 
+    @job_task
+    def assess_dashboards(self, ctx: RuntimeContext):
+        """Scans all dashboards for migration issues in SQL code of embedded widgets.
+        Also stores direct filesystem accesses for display in the migration dashboard."""
+        ctx.query_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+
+    @job_task
+    def assess_workflows(self, ctx: RuntimeContext):
+        """Scans all jobs for migration issues in notebooks.
+        Also stores direct filesystem accesses for display in the migration dashboard."""
+        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
+
 
 class Failing(Workflow):
     def __init__(self):
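The two tasks above simply join the existing `assessment` workflow, so nothing new has to be scheduled. A minimal sketch of driving them end to end, reusing the `installation_ctx` fixture and the `deployed_workflows` API that the integration tests below rely on:

```python
import datetime as dt

# Sketch only: `installation_ctx` is the installed-ucx test fixture from the
# integration tests in this PR; outside of tests the workflow is started from
# the workspace UI instead.
installation_ctx.deployed_workflows.run_workflow("assessment", max_wait=dt.timedelta(minutes=25))

# Both new tasks write their findings into the inventory database; the run is
# healthy when every step validates.
assert installation_ctx.deployed_workflows.validate_step("assessment")
```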
3 changes: 3 additions & 0 deletions src/databricks/labs/ucx/config.py
@@ -70,6 +70,9 @@ class WorkspaceConfig:  # pylint: disable=too-many-instance-attributes
     # [INTERNAL ONLY] Whether the assessment should capture only specific object permissions.
     include_object_permissions: list[str] | None = None
 
+    # [INTERNAL ONLY] Whether the assessment should lint only specific dashboards.
+    include_dashboard_ids: list[str] | None = None
+
     def replace_inventory_variable(self, text: str) -> str:
         return text.replace("$inventory", f"hive_metastore.{self.inventory_database}")
 
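A quick sketch of the new knob in use. This is a minimal construction: `WorkspaceConfig` has many more fields, and treating `inventory_database` as the only required one is an assumption here.

```python
from databricks.labs.ucx.config import WorkspaceConfig

# None (the default) keeps the old behavior: lint every dashboard and every
# legacy query. A list - even an empty one - restricts linting to exactly
# those dashboards.
config = WorkspaceConfig(
    inventory_database="ucx",
    include_dashboard_ids=["dashboard-id-1", "dashboard-id-2"],
)

# The existing helper expands $inventory against the configured database.
assert config.replace_inventory_variable("SELECT * FROM $inventory.tables") == (
    "SELECT * FROM hive_metastore.ucx.tables"
)
```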
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/contexts/application.py
@@ -435,6 +435,7 @@ def query_linter(self):
             self.workspace_client,
             TableMigrationIndex([]),  # TODO: bring back self.tables_migrator.index()
             self.directfs_access_crawler_for_queries,
+            self.config.include_dashboard_ids,
         )
 
     @cached_property
24 changes: 17 additions & 7 deletions src/databricks/labs/ucx/source_code/queries.py
@@ -40,10 +40,12 @@ def __init__(
         ws: WorkspaceClient,
         migration_index: TableMigrationIndex,
         directfs_crawler: DirectFsAccessCrawler,
+        include_dashboard_ids: list[str] | None,
     ):
         self._ws = ws
         self._migration_index = migration_index
         self._directfs_crawler = directfs_crawler
+        self._include_dashboard_ids = include_dashboard_ids
 
     def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
         assessment_start = datetime.now(timezone.utc)
@@ -53,16 +55,12 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
         all_problems: list[QueryProblem] = []
         all_dfsas: list[DirectFsAccess] = []
         # first lint and collect queries from dashboards
-        for dashboard in all_dashboards:
-            if not dashboard.id:
-                continue
-            dashboard = self._ws.dashboards.get(dashboard_id=dashboard.id)
+        for dashboard_id in self._dashboard_ids_in_scope():
+            dashboard = self._ws.dashboards.get(dashboard_id=dashboard_id)
             problems, dfsas = self._lint_and_collect_from_dashboard(dashboard, linted_queries)
             all_problems.extend(problems)
             all_dfsas.extend(dfsas)
-        for query in self._ws.queries_legacy.list():
-            if query.id is None:
-                continue
+        for query in self._queries_in_scope():
             if query.id in linted_queries:
                 continue
             linted_queries.add(query.id)
@@ -88,6 +86,18 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
         ]
         self._directfs_crawler.dump_all(all_dfsas)
 
+    def _dashboard_ids_in_scope(self) -> list[str]:
+        if self._include_dashboard_ids is not None:  # an empty list is accepted
+            return self._include_dashboard_ids
+        all_dashboards = self._ws.dashboards.list()
+        return [dashboard.id for dashboard in all_dashboards if dashboard.id]
+
+    def _queries_in_scope(self):
+        if self._include_dashboard_ids is not None:  # an empty list is accepted
+            return []
+        all_queries = self._ws.queries_legacy.list()
+        return [query for query in all_queries if query.id]
+
     def _lint_and_collect_from_dashboard(
         self, dashboard: Dashboard, linted_queries: set[str]
     ) -> tuple[Iterable[QueryProblem], Iterable[DirectFsAccess]]:
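The scoping rules the two helpers above encode are worth spelling out: `None` keeps the old behavior (lint every dashboard, then every legacy query not already covered by a dashboard), while any non-`None` list, including an empty one, pins linting to exactly those dashboards and skips standalone legacy queries entirely. A sketch under those rules, with stubs in the style of the unit tests below; the import paths are assumptions based on the names those tests use:

```python
from unittest.mock import create_autospec

from databricks.sdk import WorkspaceClient

# Assumed import paths; the tests below only show the bare names.
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
from databricks.labs.ucx.source_code.queries import QueryLinter

ws = create_autospec(WorkspaceClient)
crawler = create_autospec(DirectFsAccessCrawler)

# Explicit scope: only this dashboard is fetched; ws.dashboards.list() and
# ws.queries_legacy.list() are never called. "dashboard-id-1" is made up.
scoped = QueryLinter(ws, TableMigrationIndex([]), crawler, ["dashboard-id-1"])

# No scope: enumerate all dashboards, then all legacy queries.
unscoped = QueryLinter(ws, TableMigrationIndex([]), crawler, None)
```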
10 changes: 10 additions & 0 deletions tests/integration/assessment/test_ext_hms.py
@@ -1,5 +1,6 @@
 import dataclasses
 import datetime as dt
+import io
 
 from databricks.labs.lsql.backends import CommandExecutionBackend
 from databricks.sdk.service.iam import PermissionLevel
@@ -11,6 +12,9 @@ def test_running_real_assessment_job_ext_hms(
     env_or_skip,
     make_cluster_policy,
     make_cluster_policy_permissions,
+    make_notebook,
+    make_job,
+    make_dashboard,
 ):
     cluster_id = env_or_skip('TEST_EXT_HMS_CLUSTER_ID')
     ext_hms_ctx = installation_ctx.replace(
@@ -39,6 +43,12 @@ def test_running_real_assessment_job_ext_hms(
 
     # Under ideal circumstances this can take 10-16 minutes (depending on whether there are compute instances available
     # via the integration pool). Allow some margin to reduce spurious failures.
+    notebook_path = make_notebook(content=io.BytesIO(b"import xyz"))
+    job = make_job(notebook_path=notebook_path)
+    installation_ctx.config.include_job_ids = [job.job_id]
+
+    dashboard = make_dashboard()
+    installation_ctx.config.include_dashboard_ids = [dashboard.id]
     ext_hms_ctx.deployed_workflows.run_workflow("assessment", max_wait=dt.timedelta(minutes=25))
 
     # assert the workflow is successful. the tasks on sql warehouse will fail so skip checking them
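One detail that is easy to miss in the fixture setup above: the notebook body `import xyz` is deliberately unresolvable, so the pinned job is guaranteed to produce at least one lint finding. A hedged way to confirm that after the run; the `workflow_problems` table name is an assumption, mirroring the `query_problems` table that the query-linter test below reads:

```python
# Assumption: workflow lint findings land in a `workflow_problems` table in
# the inventory database, alongside `query_problems` for dashboards.
rows = list(
    sql_backend.fetch("SELECT * FROM workflow_problems", schema=installation_ctx.inventory_database)
)
assert rows, "the unresolvable `import xyz` should yield at least one finding"
```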
18 changes: 14 additions & 4 deletions tests/integration/assessment/test_workflows.py
@@ -1,3 +1,4 @@
+import io
 from datetime import timedelta
 
 from databricks.sdk.errors import NotFound, InvalidParameterValue
@@ -6,19 +7,28 @@
 
 
 @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=8))
-def test_running_real_assessment_job(ws, installation_ctx, make_cluster_policy, make_cluster_policy_permissions):
-    ws_group_a, _ = installation_ctx.make_ucx_group()
+def test_running_real_assessment_job(
+    ws, installation_ctx, make_cluster_policy, make_cluster_policy_permissions, make_job, make_notebook, make_dashboard
+):
+    ws_group, _ = installation_ctx.make_ucx_group()
     cluster_policy = make_cluster_policy()
     make_cluster_policy_permissions(
         object_id=cluster_policy.policy_id,
         permission_level=PermissionLevel.CAN_USE,
-        group_name=ws_group_a.display_name,
+        group_name=ws_group.display_name,
     )
     installation_ctx.__dict__['include_object_permissions'] = [f"cluster-policies:{cluster_policy.policy_id}"]
     installation_ctx.workspace_installation.run()
 
+    notebook_path = make_notebook(content=io.BytesIO(b"import xyz"))
+    job = make_job(notebook_path=notebook_path)
+    installation_ctx.config.include_job_ids = [job.job_id]
+
+    dashboard = make_dashboard()
+    installation_ctx.config.include_dashboard_ids = [dashboard.id]
+
     installation_ctx.deployed_workflows.run_workflow("assessment")
     assert installation_ctx.deployed_workflows.validate_step("assessment")
 
     after = installation_ctx.generic_permissions_support.load_as_dict("cluster-policies", cluster_policy.policy_id)
-    assert after[ws_group_a.display_name] == PermissionLevel.CAN_USE
+    assert after[ws_group.display_name] == PermissionLevel.CAN_USE
2 changes: 1 addition & 1 deletion tests/integration/source_code/test_queries.py
@@ -6,7 +6,7 @@
 def test_query_linter_lints_queries_and_stores_dfsas(simple_ctx, ws, sql_backend, make_query, make_dashboard):
     query = make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`")
     _dashboard = make_dashboard(query=query)
-    linter = QueryLinter(ws, TableMigrationIndex([]), simple_ctx.directfs_access_crawler_for_queries)
+    linter = QueryLinter(ws, TableMigrationIndex([]), simple_ctx.directfs_access_crawler_for_queries, None)
     linter.refresh_report(sql_backend, simple_ctx.inventory_database)
     all_problems = sql_backend.fetch("SELECT * FROM query_problems", schema=simple_ctx.inventory_database)
     problems = [row for row in all_problems if row["query_name"] == query.name]
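Besides `query_problems`, the same run hands the `dbfs://` access to the DFSA crawler passed into `QueryLinter`. A possible follow-up assertion; the `directfs_in_queries` table name and the `path` column are assumptions, since only `query_problems` is exercised by this test:

```python
# Assumption: DFSA records from query linting are dumped to
# `directfs_in_queries` in the same inventory database.
dfsas = list(
    sql_backend.fetch("SELECT * FROM directfs_in_queries", schema=simple_ctx.inventory_database)
)
assert any("dbfs://some_folder/some_file.csv" in row["path"] for row in dfsas)
```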
2 changes: 1 addition & 1 deletion tests/unit/source_code/test_queries.py
@@ -26,7 +26,7 @@ def test_query_linter_collects_dfsas_from_queries(
     ws = create_autospec(WorkspaceClient)
     crawlers = create_autospec(DirectFsAccessCrawler)
     query = LegacyQuery.from_dict({"parent": "workspace", "name": name, "query": query})
-    linter = QueryLinter(ws, migration_index, crawlers)
+    linter = QueryLinter(ws, migration_index, crawlers, None)
     dfsas = linter.collect_dfsas_from_query(query)
     ws.assert_not_called()
     crawlers.assert_not_called()