diff --git a/src/databricks/labs/ucx/assessment/pipelines.py b/src/databricks/labs/ucx/assessment/pipelines.py
index 19bc8c558b..75cf643ddd 100644
--- a/src/databricks/labs/ucx/assessment/pipelines.py
+++ b/src/databricks/labs/ucx/assessment/pipelines.py
@@ -5,6 +5,7 @@
 
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk import WorkspaceClient
+from databricks.sdk.errors import NotFound
 
 from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
 from databricks.labs.ucx.framework.crawlers import CrawlerBase
@@ -31,9 +32,6 @@ def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
 
     def _crawl(self) -> Iterable[PipelineInfo]:
         all_pipelines = list(self._ws.pipelines.list_pipelines())
-        return list(self._assess_pipelines(all_pipelines))
-
-    def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]:
         for pipeline in all_pipelines:
             creator_name = pipeline.creator_user_name or None
             if not creator_name:
@@ -41,27 +39,28 @@ def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]:
                 f"Pipeline {pipeline.name} have Unknown creator, it means that the original creator "
                 f"has been deleted and should be re-created"
             )
-            pipeline_info = PipelineInfo(
-                pipeline_id=pipeline.pipeline_id,
-                pipeline_name=pipeline.name,
-                creator_name=creator_name,
-                success=1,
-                failures="[]",
-            )
-
-            failures = []
-            pipeline_response = self._ws.pipelines.get(pipeline.pipeline_id)
+            try:
+                assert pipeline.pipeline_id is not None
+                pipeline_response = self._ws.pipelines.get(pipeline.pipeline_id)
+            except NotFound:
+                logger.warning(f"Pipeline disappeared, cannot assess: {pipeline.name} (id={pipeline.pipeline_id})")
+                continue
             assert pipeline_response.spec is not None
             pipeline_config = pipeline_response.spec.configuration
+            failures = []
             if pipeline_config:
                 failures.extend(self._check_spark_conf(pipeline_config, "pipeline"))
             clusters = pipeline_response.spec.clusters
             if clusters:
                 self._pipeline_clusters(clusters, failures)
-            pipeline_info.failures = json.dumps(failures)
-            if len(failures) > 0:
-                pipeline_info.success = 0
-            yield pipeline_info
+            failures_as_json = json.dumps(failures)
+            yield PipelineInfo(
+                pipeline_id=pipeline.pipeline_id,
+                pipeline_name=pipeline.name,
+                creator_name=creator_name,
+                success=int(not failures),
+                failures=failures_as_json,
+            )
 
     def _pipeline_clusters(self, clusters, failures):
         for cluster in clusters:
diff --git a/tests/unit/assessment/test_pipelines.py b/tests/unit/assessment/test_pipelines.py
index 949e441f78..9a637538c2 100644
--- a/tests/unit/assessment/test_pipelines.py
+++ b/tests/unit/assessment/test_pipelines.py
@@ -1,7 +1,9 @@
+import logging
 from unittest.mock import create_autospec
 
 from databricks.labs.lsql.backends import MockBackend
-from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo
+from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo, PipelineSpec
+from databricks.sdk.errors import ResourceDoesNotExist
 
 from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
 from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelineInfo, PipelinesCrawler
@@ -47,6 +49,29 @@
     assert len(crawler) == 0
 
 
+def test_pipeline_disappears_during_crawl(ws, mock_backend, caplog) -> None:
+    """Check that crawling doesn't fail if a pipeline is deleted after we list the pipelines but before we assess it."""
+    ws.pipelines.list_pipelines.return_value = (
+        PipelineStateInfo(pipeline_id="1", name="will_remain"),
+        PipelineStateInfo(pipeline_id="2", name="will_disappear"),
+    )
+
+    def mock_get(pipeline_id: str) -> GetPipelineResponse:
+        if pipeline_id == "2":
+            raise ResourceDoesNotExist("Simulated disappearance")
+        return GetPipelineResponse(pipeline_id=pipeline_id, spec=PipelineSpec(id=pipeline_id))
+
+    ws.pipelines.get = mock_get
+
+    with caplog.at_level(logging.WARNING):
+        results = PipelinesCrawler(ws, mock_backend, "a_schema").snapshot()
+
+    assert results == [
+        PipelineInfo(pipeline_id="1", pipeline_name="will_remain", creator_name=None, success=1, failures="[]")
+    ]
+    assert "Pipeline disappeared, cannot assess: will_disappear (id=2)" in caplog.messages
+
+
 def test_pipeline_crawler_creator():
     ws = mock_workspace_client()
     ws.pipelines.list_pipelines.return_value = (