Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that pipeline assessment doesn't fail if a pipeline is deleted… #3034

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions src/databricks/labs/ucx/assessment/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound

from databricks.labs.ucx.assessment.clusters import CheckClusterMixin
from databricks.labs.ucx.framework.crawlers import CrawlerBase
Expand All @@ -31,37 +32,35 @@ def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):

def _crawl(self) -> Iterable[PipelineInfo]:
    """Snapshot all DLT pipelines in the workspace and assess each one.

    The full listing is materialized up front so that the (potentially slow)
    per-pipeline assessment iterates over a stable collection.
    """
    listed = list(self._ws.pipelines.list_pipelines())
    assessed = self._assess_pipelines(listed)
    return list(assessed)

def _assess_pipelines(self, all_pipelines) -> Iterable[PipelineInfo]:
    """Yield a :class:`PipelineInfo` assessment record for each pipeline.

    Args:
        all_pipelines: pipeline state entries previously obtained from
            ``WorkspaceClient.pipelines.list_pipelines()``.

    Yields:
        One ``PipelineInfo`` per pipeline that still exists, with ``success``
        set to 0 and ``failures`` populated when compatibility issues are found.

    Notes:
        A pipeline can be deleted between the ``list_pipelines()`` call and the
        per-pipeline ``get()`` here; such pipelines are logged and skipped
        instead of failing the whole crawl.
    """
    for pipeline in all_pipelines:
        creator_name = pipeline.creator_user_name or None
        if not creator_name:
            logger.warning(
                f"Pipeline {pipeline.name} have Unknown creator, it means that the original creator "
                f"has been deleted and should be re-created"
            )

        try:
            # The listing API should always return an id; guard before the typed get().
            assert pipeline.pipeline_id is not None
            pipeline_response = self._ws.pipelines.get(pipeline.pipeline_id)
        except NotFound:
            # Deleted after listing but before assessment: skip rather than crash.
            logger.warning(f"Pipeline disappeared, cannot assess: {pipeline.name} (id={pipeline.pipeline_id})")
            continue
        assert pipeline_response.spec is not None
        pipeline_config = pipeline_response.spec.configuration
        failures = []
        if pipeline_config:
            failures.extend(self._check_spark_conf(pipeline_config, "pipeline"))
        clusters = pipeline_response.spec.clusters
        if clusters:
            # Cluster-level checks append their findings into `failures` in place.
            self._pipeline_clusters(clusters, failures)
        failures_as_json = json.dumps(failures)
        yield PipelineInfo(
            pipeline_id=pipeline.pipeline_id,
            pipeline_name=pipeline.name,
            creator_name=creator_name,
            success=int(not failures),
            failures=failures_as_json,
        )

def _pipeline_clusters(self, clusters, failures):
for cluster in clusters:
Expand Down
27 changes: 26 additions & 1 deletion tests/unit/assessment/test_pipelines.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from unittest.mock import create_autospec

from databricks.labs.lsql.backends import MockBackend
from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo
from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo, PipelineSpec
from databricks.sdk.errors import ResourceDoesNotExist

from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelineInfo, PipelinesCrawler
Expand Down Expand Up @@ -47,6 +49,29 @@ def test_pipeline_list_with_no_config():
assert len(crawler) == 0


def test_pipeline_disappears_during_crawl(ws, mock_backend, caplog) -> None:
    """Verify the crawler tolerates a pipeline being deleted mid-crawl.

    Listing returns two pipelines, but the second one no longer exists by the
    time it is fetched: the crawl must still succeed, report the surviving
    pipeline, and warn about the one that vanished.
    """
    ws.pipelines.list_pipelines.return_value = (
        PipelineStateInfo(pipeline_id="1", name="will_remain"),
        PipelineStateInfo(pipeline_id="2", name="will_disappear"),
    )

    def fake_get(pipeline_id: str) -> GetPipelineResponse:
        # Pipeline "2" was deleted between list() and get().
        if pipeline_id != "2":
            return GetPipelineResponse(pipeline_id=pipeline_id, spec=PipelineSpec(id=pipeline_id))
        raise ResourceDoesNotExist("Simulated disappearance")

    ws.pipelines.get = fake_get

    with caplog.at_level(logging.WARNING):
        results = PipelinesCrawler(ws, mock_backend, "a_schema").snapshot()

    assert "Pipeline disappeared, cannot assess: will_disappear (id=2)" in caplog.messages
    assert results == [
        PipelineInfo(pipeline_id="1", pipeline_name="will_remain", creator_name=None, success=1, failures="[]")
    ]


def test_pipeline_crawler_creator():
ws = mock_workspace_client()
ws.pipelines.list_pipelines.return_value = (
Expand Down
Loading