diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py index dfc4c90fbb..83d6ef99bc 100644 --- a/src/databricks/labs/ucx/install.py +++ b/src/databricks/labs/ucx/install.py @@ -23,7 +23,6 @@ AlreadyExists, BadRequest, Cancelled, - DatabricksError, DataLoss, DeadlineExceeded, InternalError, @@ -436,28 +435,20 @@ def run_workflow(self, step: str): except OperationFailed as err: # currently we don't have any good message from API, so we have to work around it. job_run = self._ws.jobs.get_run(job_run_waiter.run_id) - raise self._infer_nested_error(job_run) from err + raise self._infer_error_from_job_run(job_run) from err - def _infer_nested_error(self, job_run) -> Exception: - errors: list[DatabricksError] = [] + def _infer_error_from_job_run(self, job_run) -> Exception: + errors: list[Exception] = [] timeouts: list[DeadlineExceeded] = [] assert job_run.tasks is not None for run_task in job_run.tasks: - if not run_task.state: + error = self._infer_error_from_task_run(run_task) + if not error: continue - if run_task.state.result_state == jobs.RunResultState.TIMEDOUT: - msg = f"{run_task.task_key}: The run was stopped after reaching the timeout" - timeouts.append(DeadlineExceeded(msg)) + if isinstance(error, DeadlineExceeded): + timeouts.append(error) continue - if run_task.state.result_state != jobs.RunResultState.FAILED: - continue - assert run_task.run_id is not None - run_output = self._ws.jobs.get_run_output(run_task.run_id) - if logger.isEnabledFor(logging.DEBUG): - if run_output and run_output.error_trace: - sys.stderr.write(run_output.error_trace) - if run_output and run_output.error: - errors.append(self._infer_task_exception(f"{run_task.task_key}: {run_output.error}")) + errors.append(error) assert job_run.state is not None assert job_run.state.state_message is not None if len(errors) == 1: @@ -467,8 +458,29 @@ def _infer_nested_error(self, job_run) -> Exception: return Unknown(job_run.state.state_message) return ManyError(all_errors) + def _infer_error_from_task_run(self, run_task: jobs.RunTask) -> Exception | None: + if not run_task.state: + return None + if run_task.state.result_state == jobs.RunResultState.TIMEDOUT: + msg = f"{run_task.task_key}: The run was stopped after reaching the timeout" + return DeadlineExceeded(msg) + if run_task.state.result_state != jobs.RunResultState.FAILED: + return None + assert run_task.run_id is not None + run_output = self._ws.jobs.get_run_output(run_task.run_id) + if not run_output: + msg = f'No run output. {run_task.state.state_message}' + return InternalError(msg) + if logger.isEnabledFor(logging.DEBUG): + if run_output.error_trace: + sys.stderr.write(run_output.error_trace) + if not run_output.error: + msg = f'No error in run output. {run_task.state.state_message}' + return InternalError(msg) + return self._infer_task_exception(f"{run_task.task_key}: {run_output.error}") + @staticmethod - def _infer_task_exception(haystack: str) -> DatabricksError: + def _infer_task_exception(haystack: str) -> Exception: needles = [ BadRequest, Unauthenticated, @@ -490,8 +502,10 @@ def _infer_task_exception(haystack: str) -> DatabricksError: RequestLimitExceeded, Unknown, DataLoss, + ValueError, + KeyError, ] - constructors: dict[re.Pattern, type[DatabricksError]] = { + constructors: dict[re.Pattern, type[Exception]] = { re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound, re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound, } diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py index c59657ab23..ceed0c93d2 100644 --- a/src/databricks/labs/ucx/workspace_access/generic.py +++ b/src/databricks/labs/ucx/workspace_access/generic.py @@ -418,6 +418,7 @@ def inner() -> list[GenericPermissionsInfo]: result = ws.api_client.do( "GET", "/api/2.0/feature-store/feature-tables/search", query={"page_token": token, "max_results": 200} ) + assert isinstance(result, dict) for table in result.get("feature_tables", []): feature_tables.append(GenericPermissionsInfo(table["id"], "feature-tables")) diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py index 0148099056..463829b761 100644 --- a/tests/integration/test_installation.py +++ b/tests/integration/test_installation.py @@ -12,7 +12,7 @@ from databricks.labs.blueprint.parallel import Threads from databricks.labs.blueprint.tui import MockPrompts from databricks.labs.blueprint.wheels import WheelsV2 -from databricks.sdk.errors import InvalidParameterValue, NotFound, Unknown +from databricks.sdk.errors import InvalidParameterValue, NotFound from databricks.sdk.retries import retried from databricks.sdk.service import compute, sql from databricks.sdk.service.iam import PermissionLevel @@ -36,7 +36,7 @@ @pytest.fixture -def new_installation(ws, sql_backend, env_or_skip, inventory_schema, make_random, make_cluster_policy): +def new_installation(ws, sql_backend, env_or_skip, inventory_schema, make_random): cleanup = [] def factory(config_transform: Callable[[WorkspaceConfig], WorkspaceConfig] | None = None): @@ -98,7 +98,7 @@ def factory(config_transform: Callable[[WorkspaceConfig], WorkspaceConfig] | Non pending.uninstall() -@retried(on=[NotFound, Unknown, TimeoutError], timeout=timedelta(minutes=5)) +@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5)) def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend, new_installation): install = new_installation() @@ -113,7 +113,7 @@ def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend, assert len(workflow_run_logs) == 1 -@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=3)) +@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=3)) def test_job_cluster_policy(ws, new_installation): install = new_installation(lambda wc: replace(wc, override_clusters=None)) user_name = ws.current_user.me().user_name @@ -154,7 +154,7 @@ def test_new_job_cluster_with_policy_assessment( assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE -@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=10)) +@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=10)) def test_running_real_assessment_job( ws, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions ): @@ -175,7 +175,7 @@ def test_running_real_assessment_job( assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE -@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=5)) +@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5)) def test_running_real_migrate_groups_job( ws, sql_backend, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions ): @@ -208,7 +208,7 @@ def test_running_real_migrate_groups_job( assert found[f"{install.config.renamed_group_prefix}{ws_group_a.display_name}"] == PermissionLevel.CAN_USE -@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=5)) +@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5)) def test_running_real_validate_groups_permissions_job( ws, sql_backend, new_installation, make_group, make_query, make_query_permissions ): @@ -264,7 +264,7 @@ def test_running_real_validate_groups_permissions_job_fails( request_object_type="cluster-policies", request_object_id=cluster_policy.policy_id, access_control_list=[] ) - with pytest.raises(Unknown, match=r"Detected \d+ failures: ValueError"): + with pytest.raises(ValueError): install.run_workflow("validate-groups-permissions")