Skip to content

Commit

Permalink
Improved installation integration test flakiness (#998)
Browse files Browse the repository at this point in the history
- improved `_infer_error_from_job_run` and `_infer_error_from_task_run`
to also catch `KeyError` and `ValueError`
- removed retries for `Unknown` errors for installation tests
  • Loading branch information
nfx authored Mar 2, 2024
1 parent 5773358 commit c849cdf
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 27 deletions.
52 changes: 33 additions & 19 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
AlreadyExists,
BadRequest,
Cancelled,
DatabricksError,
DataLoss,
DeadlineExceeded,
InternalError,
Expand Down Expand Up @@ -436,28 +435,20 @@ def run_workflow(self, step: str):
except OperationFailed as err:
# currently we don't have any good message from API, so we have to work around it.
job_run = self._ws.jobs.get_run(job_run_waiter.run_id)
raise self._infer_nested_error(job_run) from err
raise self._infer_error_from_job_run(job_run) from err

def _infer_nested_error(self, job_run) -> Exception:
errors: list[DatabricksError] = []
def _infer_error_from_job_run(self, job_run) -> Exception:
errors: list[Exception] = []
timeouts: list[DeadlineExceeded] = []
assert job_run.tasks is not None
for run_task in job_run.tasks:
if not run_task.state:
error = self._infer_error_from_task_run(run_task)
if not error:
continue
if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
timeouts.append(DeadlineExceeded(msg))
if isinstance(error, DeadlineExceeded):
timeouts.append(error)
continue
if run_task.state.result_state != jobs.RunResultState.FAILED:
continue
assert run_task.run_id is not None
run_output = self._ws.jobs.get_run_output(run_task.run_id)
if logger.isEnabledFor(logging.DEBUG):
if run_output and run_output.error_trace:
sys.stderr.write(run_output.error_trace)
if run_output and run_output.error:
errors.append(self._infer_task_exception(f"{run_task.task_key}: {run_output.error}"))
errors.append(error)
assert job_run.state is not None
assert job_run.state.state_message is not None
if len(errors) == 1:
Expand All @@ -467,8 +458,29 @@ def _infer_nested_error(self, job_run) -> Exception:
return Unknown(job_run.state.state_message)
return ManyError(all_errors)

def _infer_error_from_task_run(self, run_task: jobs.RunTask) -> Exception | None:
    """Translate a finished task run into an exception, or None if the task needs no report.

    A timed-out task maps to DeadlineExceeded. A failed task is resolved by
    fetching its run output from the Jobs API and inferring a concrete error
    from the output's error message; when the output or its error message is
    missing, an InternalError carrying the task's state message is returned.
    Tasks in any other result state yield None.
    """
    state = run_task.state
    if not state:
        return None
    result = state.result_state
    if result == jobs.RunResultState.TIMEDOUT:
        return DeadlineExceeded(f"{run_task.task_key}: The run was stopped after reaching the timeout")
    if result != jobs.RunResultState.FAILED:
        # not failed and not timed out — nothing to surface for this task
        return None
    assert run_task.run_id is not None
    output = self._ws.jobs.get_run_output(run_task.run_id)
    if not output:
        return InternalError(f'No run output. {state.state_message}')
    # dump the remote stack trace to stderr only when debug logging is on
    if logger.isEnabledFor(logging.DEBUG) and output.error_trace:
        sys.stderr.write(output.error_trace)
    if not output.error:
        return InternalError(f'No error in run output. {state.state_message}')
    return self._infer_task_exception(f"{run_task.task_key}: {output.error}")

@staticmethod
def _infer_task_exception(haystack: str) -> DatabricksError:
def _infer_task_exception(haystack: str) -> Exception:
needles = [
BadRequest,
Unauthenticated,
Expand All @@ -490,8 +502,10 @@ def _infer_task_exception(haystack: str) -> DatabricksError:
RequestLimitExceeded,
Unknown,
DataLoss,
ValueError,
KeyError,
]
constructors: dict[re.Pattern, type[DatabricksError]] = {
constructors: dict[re.Pattern, type[Exception]] = {
re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound,
re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound,
}
Expand Down
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/workspace_access/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ def inner() -> list[GenericPermissionsInfo]:
result = ws.api_client.do(
"GET", "/api/2.0/feature-store/feature-tables/search", query={"page_token": token, "max_results": 200}
)
assert isinstance(result, dict)
for table in result.get("feature_tables", []):
feature_tables.append(GenericPermissionsInfo(table["id"], "feature-tables"))

Expand Down
16 changes: 8 additions & 8 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from databricks.labs.blueprint.parallel import Threads
from databricks.labs.blueprint.tui import MockPrompts
from databricks.labs.blueprint.wheels import WheelsV2
from databricks.sdk.errors import InvalidParameterValue, NotFound, Unknown
from databricks.sdk.errors import InvalidParameterValue, NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service import compute, sql
from databricks.sdk.service.iam import PermissionLevel
Expand All @@ -36,7 +36,7 @@


@pytest.fixture
def new_installation(ws, sql_backend, env_or_skip, inventory_schema, make_random, make_cluster_policy):
def new_installation(ws, sql_backend, env_or_skip, inventory_schema, make_random):
cleanup = []

def factory(config_transform: Callable[[WorkspaceConfig], WorkspaceConfig] | None = None):
Expand Down Expand Up @@ -98,7 +98,7 @@ def factory(config_transform: Callable[[WorkspaceConfig], WorkspaceConfig] | Non
pending.uninstall()


@retried(on=[NotFound, Unknown, TimeoutError], timeout=timedelta(minutes=5))
@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend, new_installation):
install = new_installation()

Expand All @@ -113,7 +113,7 @@ def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend,
assert len(workflow_run_logs) == 1


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=3))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=3))
def test_job_cluster_policy(ws, new_installation):
install = new_installation(lambda wc: replace(wc, override_clusters=None))
user_name = ws.current_user.me().user_name
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_new_job_cluster_with_policy_assessment(
assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=10))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=10))
def test_running_real_assessment_job(
ws, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions
):
Expand All @@ -175,7 +175,7 @@ def test_running_real_assessment_job(
assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=5))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_running_real_migrate_groups_job(
ws, sql_backend, new_installation, make_ucx_group, make_cluster_policy, make_cluster_policy_permissions
):
Expand Down Expand Up @@ -208,7 +208,7 @@ def test_running_real_migrate_groups_job(
assert found[f"{install.config.renamed_group_prefix}{ws_group_a.display_name}"] == PermissionLevel.CAN_USE


@retried(on=[NotFound, Unknown, InvalidParameterValue], timeout=timedelta(minutes=5))
@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_running_real_validate_groups_permissions_job(
ws, sql_backend, new_installation, make_group, make_query, make_query_permissions
):
Expand Down Expand Up @@ -264,7 +264,7 @@ def test_running_real_validate_groups_permissions_job_fails(
request_object_type="cluster-policies", request_object_id=cluster_policy.policy_id, access_control_list=[]
)

with pytest.raises(Unknown, match=r"Detected \d+ failures: ValueError"):
with pytest.raises(ValueError):
install.run_workflow("validate-groups-permissions")


Expand Down

0 comments on commit c849cdf

Please sign in to comment.