diff --git a/README.md b/README.md
index e7a75f7363..93059d248b 100644
--- a/README.md
+++ b/README.md
@@ -396,6 +396,9 @@ which can be used for further analysis and decision-making through the [assessme
 9. `assess_pipelines`: This task scans through all the Pipelines and identifies those pipelines that have Azure Service Principals embedded in their configurations. A list of all the pipelines with matching configurations is stored in the `$inventory.pipelines` table.
 10. `assess_azure_service_principals`: This task scans through all the clusters configurations, cluster policies, job cluster configurations, Pipeline configurations, and Warehouse configuration and identifies all the Azure Service Principals who have been given access to the Azure storage accounts via spark configurations referred in those entities. The list of all the Azure Service Principals referred in those configurations is saved in the `$inventory.azure_service_principals` table.
 11. `assess_global_init_scripts`: This task scans through all the global init scripts and identifies if there is an Azure Service Principal who has been given access to the Azure storage accounts via spark configurations referred in those scripts.
+12. `assess_dashboards`: This task scans through all the dashboards and analyzes embedded queries for migration problems. It also collects direct filesystem access patterns that require attention.
+13. `assess_workflows`: This task scans through all the jobs and tasks and analyzes notebooks and files for migration problems. It also collects direct filesystem access patterns that require attention.
+
 ![report](docs/assessment-report.png)
 
@@ -711,11 +714,16 @@ in the Migration dashboard.
 
 > Please note that this is an experimental workflow.
 
-The `experimental-workflow-linter` workflow lints accessible code belonging to all workflows/jobs present in the
-workspace. The linting emits problems indicating what to resolve for making the code Unity Catalog compatible.
+The `experimental-workflow-linter` workflow lints accessible code from two sources:
+ - all workflows/jobs present in the workspace
+ - all dashboards/queries present in the workspace
+The linting emits problems indicating what to resolve to make the code Unity Catalog compatible.
+The linting also locates direct filesystem accesses that need to be migrated.
 
-Once the workflow completes, the output will be stored in `$inventory_database.workflow_problems` table, and displayed
-in the Migration dashboard.
+Once the workflow completes:
+ - problems are stored in the `$inventory_database.workflow_problems` and `$inventory_database.query_problems` tables
+ - direct filesystem accesses are stored in the `$inventory_database.directfs_in_paths` and `$inventory_database.directfs_in_queries` tables
+ - all the above are displayed in the Migration dashboard.
 
 ![code compatibility problems](docs/code_compatibility_problems.png)
 
diff --git a/src/databricks/labs/ucx/queries/assessment/main/35_0_code_compatibility_problems.md b/src/databricks/labs/ucx/queries/assessment/main/35_0_code_compatibility_problems.md
new file mode 100644
index 0000000000..851d95a766
--- /dev/null
+++ b/src/databricks/labs/ucx/queries/assessment/main/35_0_code_compatibility_problems.md
@@ -0,0 +1,8 @@
+## Code compatibility problems
+
+The tables below assist with verifying if workflows and dashboards are Unity Catalog compatible. They can be filtered on the path,
+problem code and workflow name.
+Each row:
+- Points to a problem detected in the code using the code path, query or workflow & task reference and start/end line & column;
+- Explains the problem with a human-readable message and a code.
+
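The tile above says the tables can be filtered; as a rough illustration of what such a filter looks like against the underlying table, here is a minimal sketch. It reuses the `inventory` schema placeholder and the column names from the widget query below, and borrows the `sql-parse-error` problem code from the tests in this diff:

```sql
-- Sketch: list workflow problems for a single problem code, grouped by offending file.
SELECT path, job_name, task_key, start_line, message
FROM inventory.workflow_problems
WHERE code = 'sql-parse-error'
ORDER BY path, start_line;
```
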
diff --git a/src/databricks/labs/ucx/queries/assessment/main/35_1_code_compatibility_problems_in_workflows.sql b/src/databricks/labs/ucx/queries/assessment/main/35_1_code_compatibility_problems_in_workflows.sql
new file mode 100644
index 0000000000..64a7e956a5
--- /dev/null
+++ b/src/databricks/labs/ucx/queries/assessment/main/35_1_code_compatibility_problems_in_workflows.sql
@@ -0,0 +1,35 @@
+/*
+--title 'Workflow migration problems'
+--width 6
+--overrides '{"spec":{
+  "encodings":{
+    "columns": [
+      {"fieldName": "path", "booleanValues": ["false", "true"], "linkUrlTemplate": "/#workspace/{{ link }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "path"},
+      {"fieldName": "code", "booleanValues": ["false", "true"], "type": "string", "displayAs": "string", "title": "code"},
+      {"fieldName": "message", "booleanValues": ["false", "true"], "type": "string", "displayAs": "string", "title": "message"},
+      {"fieldName": "workflow_name", "booleanValues": ["false", "true"], "linkUrlTemplate": "/jobs/{{ workflow_id }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "workflow_name"},
+      {"fieldName": "task_key", "booleanValues": ["false", "true"], "imageUrlTemplate": "{{ @ }}", "linkUrlTemplate": "/jobs/{{ workflow_id }}/tasks/{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "task_key"},
+      {"fieldName": "start_line", "booleanValues": ["false", "true"], "type": "integer", "displayAs": "number", "title": "start_line"},
+      {"fieldName": "start_col", "booleanValues": ["false", "true"], "type": "integer", "displayAs": "number", "title": "start_col"},
+      {"fieldName": "end_line", "booleanValues": ["false", "true"], "type": "integer", "displayAs": "number", "title": "end_line"},
+      {"fieldName": "end_col", "booleanValues": ["false", "true"], "type": "integer", "displayAs": "number", "title": "end_col"}
+    ]},
+    "invisibleColumns": [
+      {"name": "link", "booleanValues": ["false", "true"], "linkUrlTemplate": "{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "link"},
+      {"name": "workflow_id", "booleanValues": ["false", "true"], "linkUrlTemplate": "{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "workflow_id"}
+    ]
+  }}'
+*/
+SELECT
+  substring_index(path, '@databricks.com/', -1) as path,
+  path as link,
+  code,
+  message,
+  job_id AS workflow_id,
+  job_name AS workflow_name,
+  task_key,
+  start_line,
+  start_col,
+  end_line,
+  end_col
+FROM inventory.workflow_problems
diff --git a/src/databricks/labs/ucx/queries/assessment/main/35_2_code_compatibility_problems_in_dashboards.sql b/src/databricks/labs/ucx/queries/assessment/main/35_2_code_compatibility_problems_in_dashboards.sql
new file mode 100644
index 0000000000..a918e9682c
--- /dev/null
+++ b/src/databricks/labs/ucx/queries/assessment/main/35_2_code_compatibility_problems_in_dashboards.sql
@@ -0,0 +1,29 @@
+/*
+--title 'Dashboard compatibility problems'
+--width 6
+--overrides '{"spec":{
"encodings":{ + "columns": [ + {"fieldName": "code", "booleanValues": ["false", "true"], "type": "string", "displayAs": "string", "title": "code"}, + {"fieldName": "message", "booleanValues": ["false", "true"], "type": "string", "displayAs": "string", "title": "message"}, + {"fieldName": "dashboard_name", "booleanValues": ["false", "true"], "linkUrlTemplate": "/sql/dashboards/{{ dashboard_id }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "Dashboard"}, + {"fieldName": "query_name", "booleanValues": ["false", "true"], "linkUrlTemplate": "/sql/editor/{{ query_id }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "Query"} + ]}, + "invisibleColumns": [ + {"name": "dashboard_parent", "booleanValues": ["false", "true"], "linkUrlTemplate": "{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "dashboard_parent"}, + {"name": "dashboard_id", "booleanValues": ["false", "true"], "linkUrlTemplate": "{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "dashboard_id"}, + {"name": "query_parent", "booleanValues": ["false", "true"], "linkUrlTemplate": "{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "query_parent"}, + {"name": "query_id", "booleanValues": ["false", "true"], "linkUrlTemplate": "{{ @ }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "type": "string", "displayAs": "link", "title": "query_id"} + ] + }}' +*/ +SELECT + dashboard_id, + dashboard_parent, + dashboard_name, + query_id, + query_parent, + query_name, + code, + message +FROM inventory.query_problems diff --git a/src/databricks/labs/ucx/queries/assessment/main/36_0_direct_filesystem_access_problems.md b/src/databricks/labs/ucx/queries/assessment/main/36_0_direct_filesystem_access_problems.md new file mode 100644 index 0000000000..3f5b255681 --- /dev/null +++ b/src/databricks/labs/ucx/queries/assessment/main/36_0_direct_filesystem_access_problems.md @@ -0,0 +1,14 @@ +--- +height: 4 +--- + +# Direct filesystem access problems + +The table below assists with verifying if workflows and dashboards require direct filesystem access. +As a reminder, `dbfs:/` is not supported in Unity Catalog, and more generally direct filesystem access is discouraged. +Rather, data should be accessed via Unity tables. + +Each row: +- Points to a direct filesystem access detected in the code using the code path, query or workflow & task reference and start/end line & column; +- Provides the _lineage_ i.e. which `workflow -> task -> notebook...` execution sequence leads to that access. 
diff --git a/src/databricks/labs/ucx/queries/assessment/main/36_1_direct_filesystem_accesses.sql b/src/databricks/labs/ucx/queries/assessment/main/36_1_direct_filesystem_accesses.sql
new file mode 100644
index 0000000000..b038dbfbc8
--- /dev/null
+++ b/src/databricks/labs/ucx/queries/assessment/main/36_1_direct_filesystem_accesses.sql
@@ -0,0 +1,62 @@
+/*
+--title 'Direct filesystem access problems'
+--width 6
+--overrides '{"spec":{
+  "encodings":{
+    "columns": [
+      {"fieldName": "location", "title": "location", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]},
+      {"fieldName": "is_read", "title": "is_read", "type": "boolean", "displayAs": "boolean", "booleanValues": ["false", "true"]},
+      {"fieldName": "is_write", "title": "is_write", "type": "boolean", "displayAs": "boolean", "booleanValues": ["false", "true"]},
+      {"fieldName": "source", "title": "source", "type": "string", "displayAs": "link", "linkUrlTemplate": "{{ source_link }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "booleanValues": ["false", "true"]},
+      {"fieldName": "timestamp", "title": "last_modified", "type": "datetime", "displayAs": "datetime", "dateTimeFormat": "ll LTS (z)", "booleanValues": ["false", "true"]},
+      {"fieldName": "lineage", "title": "lineage", "type": "string", "displayAs": "link", "linkUrlTemplate": "{{ lineage_link }}", "linkTextTemplate": "{{ @ }}", "linkTitleTemplate": "{{ @ }}", "linkOpenInNewTab": true, "booleanValues": ["false", "true"]},
+      {"fieldName": "lineage_data", "title": "lineage_data", "type": "complex", "displayAs": "json", "booleanValues": ["false", "true"]},
+      {"fieldName": "assessment_start", "title": "assessment_start", "type": "datetime", "displayAs": "datetime", "dateTimeFormat": "ll LTS (z)", "booleanValues": ["false", "true"]},
+      {"fieldName": "assessment_end", "title": "assessment_end", "type": "datetime", "displayAs": "datetime", "dateTimeFormat": "ll LTS (z)", "booleanValues": ["false", "true"]}
+    ]},
+    "invisibleColumns": [
+      {"fieldName": "source_link", "title": "source_link", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]},
+      {"fieldName": "lineage_type", "title": "lineage_type", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]},
+      {"fieldName": "lineage_id", "title": "lineage_id", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]},
+      {"fieldName": "lineage_link", "title": "lineage_link", "type": "string", "displayAs": "string", "booleanValues": ["false", "true"]}
+    ]
+  }}'
+*/
+SELECT
+  path as location,
+  is_read,
+  is_write,
+  if(startswith(source_id, '/'), substring_index(source_id, '@databricks.com/', -1), split_part(source_id, '/', 2)) as source,
+  if(startswith(source_id, '/'), concat('/#workspace/', source_id), concat('/sql/editor/', split_part(source_id, '/', 2))) as source_link,
+  source_timestamp as `timestamp`,
+  case
+    when lineage.object_type = 'WORKFLOW' then concat('Workflow: ', lineage.other.name)
+    when lineage.object_type = 'TASK' then concat('Task: ', split_part(lineage.object_id, '/', 2))
+    when lineage.object_type = 'NOTEBOOK' then concat('Notebook: ', substring_index(lineage.object_id, '@databricks.com/', -1))
+    when lineage.object_type = 'FILE' then concat('File: ', substring_index(lineage.object_id, '@databricks.com/', -1))
+    when lineage.object_type = 'DASHBOARD' then concat('Dashboard: ', lineage.other.name)
+    when lineage.object_type = 'QUERY' then concat('Query: ', lineage.other.name)
+  end as lineage,
+  lineage.object_type as lineage_type,
+  lineage.object_id as lineage_id,
+  case
+    when lineage.object_type = 'WORKFLOW' then concat('/jobs/', lineage.object_id)
+    when lineage.object_type = 'TASK' then concat('/jobs/', split_part(lineage.object_id, '/', 1), '/tasks/', split_part(lineage.object_id, '/', 2))
+    when lineage.object_type = 'NOTEBOOK' then concat('/#workspace/', lineage.object_id)
+    when lineage.object_type = 'FILE' then concat('/#workspace/', lineage.object_id)
+    when lineage.object_type = 'DASHBOARD' then concat('/sql/dashboards/', lineage.object_id)
+    when lineage.object_type = 'QUERY' then concat('/sql/editor/', split_part(lineage.object_id, '/', 2))
+  end as lineage_link,
+  lineage.other as lineage_data,
+  assessment_start,
+  assessment_end
+from (SELECT
+    path,
+    is_read,
+    is_write,
+    source_id,
+    source_timestamp,
+    explode(source_lineage) as lineage,
+    assessment_start_timestamp as assessment_start,
+    assessment_end_timestamp as assessment_end
+  FROM inventory.directfs)
diff --git a/src/databricks/labs/ucx/source_code/graph.py b/src/databricks/labs/ucx/source_code/graph.py
index 347690f067..a722e9ef3c 100644
--- a/src/databricks/labs/ucx/source_code/graph.py
+++ b/src/databricks/labs/ucx/source_code/graph.py
@@ -337,7 +337,8 @@ def __repr__(self):
 
     @property
     def lineage(self) -> list[LineageAtom]:
-        return [LineageAtom(object_type="PATH", object_id=str(self.path))]
+        object_type = "NOTEBOOK" if is_a_notebook(self.path) else "FILE"
+        return [LineageAtom(object_type=object_type, object_id=str(self.path))]
 
 
 class SourceContainer(abc.ABC):
diff --git a/src/databricks/labs/ucx/source_code/jobs.py b/src/databricks/labs/ucx/source_code/jobs.py
index b5a4e47991..eb50b7ef31 100644
--- a/src/databricks/labs/ucx/source_code/jobs.py
+++ b/src/databricks/labs/ucx/source_code/jobs.py
@@ -86,8 +86,8 @@ def __repr__(self):
 
     @property
     def lineage(self) -> list[LineageAtom]:
         job_name = (None if self._job.settings is None else self._job.settings.name) or "unknown job"
-        job_lineage = LineageAtom("JOB", str(self._job.job_id), {"name": job_name})
-        task_lineage = LineageAtom("TASK", self._task.task_key)
+        job_lineage = LineageAtom("WORKFLOW", str(self._job.job_id), {"name": job_name})
+        task_lineage = LineageAtom("TASK", f"{self._job.job_id}/{self._task.task_key}")
         return [job_lineage, task_lineage]
 
@@ -469,8 +469,8 @@ def _collect_task_dfsas(
         job_name = job.settings.name if job.settings and job.settings.name else ""
         for dfsa in DfsaCollectorWalker(graph, set(), self._path_lookup, session_state):
             atoms = [
-                LineageAtom(object_type="JOB", object_id=job_id, other={"name": job_name}),
-                LineageAtom(object_type="TASK", object_id=task.task_key),
+                LineageAtom(object_type="WORKFLOW", object_id=job_id, other={"name": job_name}),
+                LineageAtom(object_type="TASK", object_id=f"{job_id}/{task.task_key}"),
             ]
             yield dataclasses.replace(dfsa, source_lineage=atoms + dfsa.source_lineage)
 
diff --git a/src/databricks/labs/ucx/source_code/queries.py b/src/databricks/labs/ucx/source_code/queries.py
index 5bd11b1bc7..26a2c14a26 100644
--- a/src/databricks/labs/ucx/source_code/queries.py
+++ b/src/databricks/labs/ucx/source_code/queries.py
@@ -66,7 +66,7 @@ def refresh_report(self, sql_backend: SqlBackend, inventory_database: str):
                 linted_queries.add(query.id)
                 problems = self.lint_query(query)
                 all_problems.extend(problems)
-                dfsas = self.collect_dfsas_from_query(query)
+                dfsas = self.collect_dfsas_from_query("no-dashboard-id", query)
                 all_dfsas.extend(dfsas)
         # dump problems
         logger.info(f"Saving {len(all_problems)} linting problems...")
@@ -123,7 +123,7 @@ def _lint_and_collect_from_dashboard(
                     dashboard_name=dashboard_name,
                 )
             )
-            dfsas = self.collect_dfsas_from_query(query)
+            dfsas = self.collect_dfsas_from_query(dashboard_id, query)
             for dfsa in dfsas:
                 atom = LineageAtom(
                     object_type="DASHBOARD",
@@ -155,11 +155,11 @@ def lint_query(self, query: LegacyQuery) -> Iterable[QueryProblem]:
         )
 
     @classmethod
-    def collect_dfsas_from_query(cls, query: LegacyQuery) -> Iterable[DirectFsAccess]:
+    def collect_dfsas_from_query(cls, dashboard_id: str, query: LegacyQuery) -> Iterable[DirectFsAccess]:
         if query.query is None:
             return
         linter = DirectFsAccessSqlLinter()
-        source_id = query.id or "no id"
+        source_id = f"{dashboard_id}/{query.id}"
         source_name = query.name or ""
         source_timestamp = cls._read_timestamp(query.updated_at)
         source_lineage = [LineageAtom(object_type="QUERY", object_id=source_id, other={"name": source_name})]
diff --git a/tests/integration/assessment/test_dashboards.py b/tests/integration/assessment/test_dashboards.py
new file mode 100644
index 0000000000..e95193faf5
--- /dev/null
+++ b/tests/integration/assessment/test_dashboards.py
@@ -0,0 +1,111 @@
+from datetime import datetime, timezone, timedelta
+
+import pytest
+
+from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess, LineageAtom
+from databricks.labs.ucx.source_code.jobs import JobProblem
+from databricks.sdk.service.iam import PermissionLevel
+
+from databricks.labs.ucx.source_code.queries import QueryProblem
+
+
+def _populate_workflow_problems(installation_ctx):
+    job_problems = [
+        JobProblem(
+            job_id=12345,
+            job_name="Peter the Job",
+            task_key="23456",
+            path="parent/child.py",
+            code="sql-parse-error",
+            message="Could not parse SQL",
+            start_line=1234,
+            start_col=22,
+            end_line=1234,
+            end_col=32,
+        )
+    ]
+    installation_ctx.sql_backend.save_table(
+        f'{installation_ctx.inventory_database}.workflow_problems',
+        job_problems,
+        JobProblem,
+        mode='overwrite',
+    )
+
+
+def _populate_dashboard_problems(installation_ctx):
+    query_problems = [
+        QueryProblem(
+            dashboard_id="12345",
+            dashboard_parent="dashboards/parent",
+            dashboard_name="my_dashboard",
+            query_id="23456",
+            query_parent="queries/parent",
+            query_name="my_query",
+            code="sql-parse-error",
+            message="Could not parse SQL",
+        )
+    ]
+    installation_ctx.sql_backend.save_table(
+        f'{installation_ctx.inventory_database}.query_problems',
+        query_problems,
+        QueryProblem,
+        mode='overwrite',
+    )
+
+
+def _populate_directfs_problems(installation_ctx):
+    dfsas = [
+        DirectFsAccess(
+            path="some_path",
+            is_read=False,
+            is_write=True,
+            source_id="xyz.py",
+            source_timestamp=datetime.now(timezone.utc) - timedelta(hours=2.0),
+            source_lineage=[
+                LineageAtom(object_type="WORKFLOW", object_id="my_workflow"),
+                LineageAtom(object_type="TASK", object_id="my_workflow/my_task"),
+                LineageAtom(object_type="NOTEBOOK", object_id="my_notebook"),
+                LineageAtom(object_type="FILE", object_id="my file"),
+            ],
+            assessment_start_timestamp=datetime.now(timezone.utc) - timedelta(minutes=5.0),
+            assessment_end_timestamp=datetime.now(timezone.utc) - timedelta(minutes=2.0),
+        )
+    ]
+    installation_ctx.directfs_access_crawler_for_paths.dump_all(dfsas)
+    dfsas = [
+        DirectFsAccess(
+            path="some_path",
+            is_read=False,
+            is_write=True,
+            source_id="xyz.py",
+            source_timestamp=datetime.now(timezone.utc) - timedelta(hours=2.0),
+            source_lineage=[
+                LineageAtom(object_type="DASHBOARD", object_id="my_dashboard"),
+                LineageAtom(object_type="QUERY", object_id="my_dashboard/my_query"),
object_id="my_dashboard/my_query"), + ], + assessment_start_timestamp=datetime.now(timezone.utc) - timedelta(minutes=5.0), + assessment_end_timestamp=datetime.now(timezone.utc) - timedelta(minutes=2.0), + ) + ] + installation_ctx.directfs_access_crawler_for_queries.dump_all(dfsas) + + +@pytest.mark.skip("Development tool") +def test_dashboard_with_prepopulated_data(installation_ctx, make_cluster_policy, make_cluster_policy_permissions): + """the purpose of this test is to prepopulate data used by the dashboard without running an actual -lengthy- assessment""" + ucx_group, _ = installation_ctx.make_ucx_group() + cluster_policy = make_cluster_policy() + make_cluster_policy_permissions( + object_id=cluster_policy.policy_id, + permission_level=PermissionLevel.CAN_USE, + group_name=ucx_group.display_name, + ) + installation_ctx.__dict__['include_object_permissions'] = [f"cluster-policies:{cluster_policy.policy_id}"] + installation_ctx.workspace_installation.run() + # populate data + _populate_workflow_problems(installation_ctx) + _populate_dashboard_problems(installation_ctx) + _populate_directfs_problems(installation_ctx) + print(f"\nInventory database is {installation_ctx.inventory_database}\n") + # put a breakpoint here + print("Put a breakpoint here! Then go check the dashboard in your workspace ;-)\n") diff --git a/tests/integration/assessment/test_workflows.py b/tests/integration/assessment/test_workflows.py index 7955e93c69..1110528bcf 100644 --- a/tests/integration/assessment/test_workflows.py +++ b/tests/integration/assessment/test_workflows.py @@ -5,7 +5,7 @@ from databricks.sdk.service.iam import PermissionLevel -@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=8)) +@retried(on=[NotFound, InvalidParameterValue]) def test_running_real_assessment_job( ws, installation_ctx, @@ -25,7 +25,7 @@ def test_running_real_assessment_job( populate_for_linting(installation_ctx.installation) - installation_ctx.deployed_workflows.run_workflow("assessment") + installation_ctx.deployed_workflows.run_workflow("assessment", max_wait=timedelta(minutes=25)) assert installation_ctx.deployed_workflows.validate_step("assessment") after = installation_ctx.generic_permissions_support.load_as_dict("cluster-policies", cluster_policy.policy_id) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index d2f48cdfa7..9e18b42284 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -28,8 +28,10 @@ from databricks.sdk.retries import retried from databricks.sdk.service import iam from databricks.sdk.service.catalog import FunctionInfo, SchemaInfo, TableInfo -from databricks.sdk.service.iam import Group +from databricks.sdk.service.compute import ClusterSpec from databricks.sdk.service.dashboards import Dashboard as SDKDashboard +from databricks.sdk.service.iam import Group +from databricks.sdk.service.jobs import Task, SparkPythonTask from databricks.sdk.service.sql import Dashboard, WidgetPosition, WidgetOptions, LegacyQuery from databricks.labs.ucx.__about__ import __version__ @@ -1236,19 +1238,65 @@ def _run(command: str) -> str: @pytest.fixture -def populate_for_linting(ws, make_random, make_job, make_notebook, make_query, make_dashboard, watchdog_purge_suffix): +def create_file_job(ws, make_random, watchdog_remove_after, watchdog_purge_suffix, log_workspace_link): + + def create(installation, **_kwargs): + # create args + data = {"name": f"dummy-{make_random(4)}"} + # create file to run + file_name = 
f"dummy_{make_random(4)}_{watchdog_purge_suffix}" + file_path = WorkspacePath(ws, installation.install_folder()) / file_name + file_path.write_text("spark.read.parquet('dbfs://mnt/foo/bar')") + task = Task( + task_key=make_random(4), + description=make_random(4), + new_cluster=ClusterSpec( + num_workers=1, + node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16), + spark_version=ws.clusters.select_spark_version(latest=True), + ), + spark_python_task=SparkPythonTask(python_file=str(file_path)), + timeout_seconds=0, + ) + data["tasks"] = [task] + # add RemoveAfter tag for job cleanup + data["tags"] = [{"key": "RemoveAfter", "value": watchdog_remove_after}] + job = ws.jobs.create(**data) + log_workspace_link(data["name"], f'job/{job.job_id}', anchor=False) + return job + + yield from factory("job", create, lambda item: ws.jobs.delete(item.job_id)) + + +@pytest.fixture +def populate_for_linting( + ws, + make_random, + make_job, + make_notebook, + make_query, + make_dashboard, + create_file_job, + watchdog_purge_suffix, +): + + def create_notebook_job(installation): + path = Path(installation.install_folder()) / f"dummy_{make_random(4)}_{watchdog_purge_suffix}" + notebook_text = "spark.read.parquet('dbfs://mnt/foo1/bar1')" + notebook_path = make_notebook(path=path, content=io.BytesIO(notebook_text.encode("utf-8"))) + return make_job(notebook_path=notebook_path) + def populate_workspace(installation): # keep linting scope to minimum to avoid test timeouts - path = Path(installation.install_folder()) / f"dummy-{make_random(4)}-{watchdog_purge_suffix}" - notebook_path = make_notebook(path=path, content=io.BytesIO(b"spark.read.parquet('dbfs://mnt/foo/bar')")) - job = make_job(notebook_path=notebook_path) - query = make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo/bar`') + file_job = create_file_job(installation=installation) + notebook_job = create_notebook_job(installation=installation) + query = make_query(sql_query='SELECT * from parquet.`dbfs://mnt/foo2/bar2`') dashboard = make_dashboard(query=query) # can't use installation.load(WorkspaceConfig)/installation.save() because they populate empty credentials config_path = WorkspacePath(ws, installation.install_folder()) / "config.yml" text = config_path.read_text() config = yaml.safe_load(text) - config["include_job_ids"] = [job.job_id] + config["include_job_ids"] = [file_job.job_id, notebook_job.job_id] config["include_dashboard_ids"] = [dashboard.id] text = yaml.dump(config) config_path.unlink() diff --git a/tests/integration/source_code/test_jobs.py b/tests/integration/source_code/test_jobs.py index 4c5ae4f758..451b5a3815 100644 --- a/tests/integration/source_code/test_jobs.py +++ b/tests/integration/source_code/test_jobs.py @@ -163,7 +163,7 @@ def test_job_linter_some_notebook_graph_with_problems( assert all(any(message.endswith(expected) for message in last_messages) for expected in expected_messages) assert len(dfsas) == 2 - task_keys = set(task.task_key for task in j.settings.tasks) + task_keys = set(f"{j.job_id}/{task.task_key}" for task in j.settings.tasks) yesterday = datetime.now(timezone.utc) - timedelta(days=1) for dfsa in dfsas: assert dfsa.source_id != DirectFsAccess.UNKNOWN @@ -172,7 +172,7 @@ def test_job_linter_some_notebook_graph_with_problems( assert dfsa.assessment_start_timestamp > yesterday assert dfsa.assessment_end_timestamp > yesterday assert dfsa.source_lineage[0] == LineageAtom( - object_type="JOB", object_id=str(j.job_id), other={"name": j.settings.name} + object_type="WORKFLOW", 
         )
         assert dfsa.source_lineage[1].object_type == "TASK"
         assert dfsa.source_lineage[1].object_id in task_keys
diff --git a/tests/integration/source_code/test_queries.py b/tests/integration/source_code/test_queries.py
index 5ecfbf45bd..10d4ded773 100644
--- a/tests/integration/source_code/test_queries.py
+++ b/tests/integration/source_code/test_queries.py
@@ -13,7 +13,8 @@ def test_query_linter_lints_queries_and_stores_dfsas(simple_ctx, ws, sql_backend
     assert len(problems) == 1
     crawler = DirectFsAccessCrawler.for_queries(sql_backend, simple_ctx.inventory_database)
     all_dfsas = crawler.snapshot()
-    dfsas = [dfsa for dfsa in all_dfsas if dfsa.source_id == query.id]
+    source_id = f"{_dashboard.id}/{query.id}"
+    dfsas = [dfsa for dfsa in all_dfsas if dfsa.source_id == source_id]
     assert len(dfsas) == 1
     dfsa = dfsas[0]
     assert len(dfsa.source_lineage) == 2
@@ -25,6 +26,6 @@
     assert lineage.other.get("name", None) == _dashboard.name
     lineage = dfsa.source_lineage[1]
     assert lineage.object_type == "QUERY"
-    assert lineage.object_id == query.id
+    assert lineage.object_id == source_id
     assert lineage.other
     assert lineage.other.get("name", None) == query.name
diff --git a/tests/unit/source_code/test_queries.py b/tests/unit/source_code/test_queries.py
index 216a4647a9..0ed1e187ba 100644
--- a/tests/unit/source_code/test_queries.py
+++ b/tests/unit/source_code/test_queries.py
@@ -27,7 +27,7 @@ def test_query_linter_collects_dfsas_from_queries(name, query, dfsa_paths, is_re
     crawlers = create_autospec(DirectFsAccessCrawler)
     query = LegacyQuery.from_dict({"parent": "workspace", "name": name, "query": query})
     linter = QueryLinter(ws, migration_index, crawlers, None)
-    dfsas = linter.collect_dfsas_from_query(query)
+    dfsas = linter.collect_dfsas_from_query("no-dashboard-id", query)
    ws.assert_not_called()
     crawlers.assert_not_called()
     assert set(dfsa.path for dfsa in dfsas) == set(dfsa_paths)
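A note on the composite `source_id` introduced by `collect_dfsas_from_query`: it now always takes the form `<dashboard_id>/<query_id>`, with the literal `no-dashboard-id` prefix for queries linted outside any dashboard, while workflow sources keep their workspace path (which starts with `/`). A sketch of how the two forms can be told apart and decomposed, using the same `split_part`/`startswith` tricks as the dashboard SQL above and assuming the `inventory.directfs` name from that SQL:

```sql
-- Sketch: decompose composite query source_ids; workspace paths start with '/'.
SELECT
  split_part(source_id, '/', 1) AS dashboard_id,  -- 'no-dashboard-id' for standalone queries
  split_part(source_id, '/', 2) AS query_id,
  path AS location
FROM inventory.directfs
WHERE NOT startswith(source_id, '/');
```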