diff --git a/databricks/sdk/core.py b/databricks/sdk/core.py index 372288e0d..a9418870b 100644 --- a/databricks/sdk/core.py +++ b/databricks/sdk/core.py @@ -24,6 +24,7 @@ from .azure import (ARM_DATABRICKS_RESOURCE_ID, ENVIRONMENTS, AzureEnvironment, add_sp_management_token, add_workspace_id_header) +from .errors import DatabricksError, _error_mapper from .oauth import (ClientCredentials, OAuthClient, OidcEndpoints, Refreshable, Token, TokenCache, TokenSource) from .retries import retried @@ -892,70 +893,6 @@ def copy(self): return cpy -class ErrorDetail: - - def __init__(self, - type: str = None, - reason: str = None, - domain: str = None, - metadata: dict = None, - **kwargs): - self.type = type - self.reason = reason - self.domain = domain - self.metadata = metadata - - @classmethod - def from_dict(cls, d: Dict[str, any]) -> 'ErrorDetail': - if '@type' in d: - d['type'] = d['@type'] - return cls(**d) - - -class DatabricksError(IOError): - """ Generic error from Databricks REST API """ - # Known ErrorDetail types - _error_info_type = "type.googleapis.com/google.rpc.ErrorInfo" - - def __init__(self, - message: str = None, - *, - error_code: str = None, - detail: str = None, - status: str = None, - scimType: str = None, - error: str = None, - retry_after_secs: int = None, - details: List[Dict[str, any]] = None, - **kwargs): - if error: - # API 1.2 has different response format, let's adapt - message = error - if detail: - # Handle SCIM error message details - # @see https://tools.ietf.org/html/rfc7644#section-3.7.3 - if detail == "null": - message = "SCIM API Internal Error" - else: - message = detail - # add more context from SCIM responses - message = f"{scimType} {message}".strip(" ") - error_code = f"SCIM_{status}" - super().__init__(message if message else error) - self.error_code = error_code - self.retry_after_secs = retry_after_secs - self.details = [ErrorDetail.from_dict(detail) for detail in details] if details else [] - self.kwargs = kwargs - - def 
get_error_info(self) -> List[ErrorDetail]: - return self._get_details_by_type(DatabricksError._error_info_type) - - def _get_details_by_type(self, error_type) -> List[ErrorDetail]: - if self.details == None: - return [] - return [detail for detail in self.details if detail.type == error_type] - - class ApiClient: _cfg: Config @@ -1180,7 +1117,7 @@ def _make_nicer_error(self, *, response: requests.Response, **kwargs) -> Databri if is_too_many_requests_or_unavailable: kwargs['retry_after_secs'] = self._parse_retry_after(response) kwargs['message'] = message - return DatabricksError(**kwargs) + return _error_mapper(status_code, kwargs) def _record_request_log(self, response: requests.Response, raw=False): if not logger.isEnabledFor(logging.DEBUG): diff --git a/databricks/sdk/errors.py b/databricks/sdk/errors.py index 7fe5dab39..b03d7a50d 100644 --- a/databricks/sdk/errors.py +++ b/databricks/sdk/errors.py @@ -1,6 +1,212 @@ +from typing import Dict, List + + class OperationFailed(RuntimeError): pass -class OperationTimeout(RuntimeError): - pass \ No newline at end of file +class OperationTimeout(RuntimeError, TimeoutError): + pass + + +class ErrorDetail: + + def __init__(self, + type: str = None, + reason: str = None, + domain: str = None, + metadata: dict = None, + **kwargs): + self.type = type + self.reason = reason + self.domain = domain + self.metadata = metadata + + @classmethod + def from_dict(cls, d: Dict[str, any]) -> 'ErrorDetail': + if '@type' in d: + d['type'] = d['@type'] + return cls(**d) + + +class DatabricksError(IOError): + """ Generic error from Databricks REST API """ + # Known ErrorDetail types + _error_info_type = "type.googleapis.com/google.rpc.ErrorInfo" + + def __init__(self, + message: str = None, + *, + error_code: str = None, + detail: str = None, + status: str = None, + scimType: str = None, + error: str = None, + retry_after_secs: int = None, + details: List[Dict[str, any]] = None, + **kwargs): + if error: + # API 1.2 has different response 
format, let's adapt + message = error + if detail: + # Handle SCIM error message details + # @see https://tools.ietf.org/html/rfc7644#section-3.7.3 + if detail == "null": + message = "SCIM API Internal Error" + else: + message = detail + # add more context from SCIM responses + message = f"{scimType} {message}".strip(" ") + error_code = f"SCIM_{status}" + super().__init__(message if message else error) + self.error_code = error_code + self.retry_after_secs = retry_after_secs + self.details = [ErrorDetail.from_dict(detail) for detail in details] if details else [] + self.kwargs = kwargs + + def get_error_info(self) -> List[ErrorDetail]: + return self._get_details_by_type(DatabricksError._error_info_type) + + def _get_details_by_type(self, error_type) -> List[ErrorDetail]: + if self.details == None: + return [] + return [detail for detail in self.details if detail.type == error_type] + + +class InternalError(DatabricksError): + """Some invariants expected by the underlying system have been broken. + This error generally cannot be resolved by the user.""" + + +class DataLoss(InternalError): + """Unrecoverable data loss or corruption.""" + + +class Unknown(InternalError): + """This error is used as a fallback if the platform-side mapping is missing some reason, also it can be used + to indicate an unknown error between the client library and the server.""" + + +class TemporarilyUnavailable(DatabricksError): + """The service is currently unavailable. This is most likely a transient condition, which can be + corrected by retrying with a backoff. Note that it is not always safe to retry non-idempotent + operations.""" + + +class BadRequest(DatabricksError): + """The request is invalid.""" + + +class InvalidParameterValue(BadRequest): + """Supplied value for a parameter was invalid (e.g., giving a number for a string parameter).""" + + +class DeadlineExceeded(DatabricksError, TimeoutError): + """The deadline expired before the operation could complete.
For operations that change the state + of the system, this error may be returned even if the operation has completed successfully. + + For example, a successful response from a server could have been delayed long enough for + the deadline to expire. When possible - implementations should make sure further processing of + the request is aborted, e.g. by throwing an exception instead of making the RPC request, + making the database query, etc.""" + + +class Cancelled(DatabricksError): + """The operation was explicitly canceled by the caller.""" + + +class NotFound(DatabricksError, LookupError): + """Operation was performed on a resource that does not exist, e.g. file or directory was not found.""" + + +class Unauthenticated(DatabricksError, PermissionError): + """The request does not have valid authentication (AuthN) credentials for the operation.""" + + +class PermissionDenied(DatabricksError, PermissionError): + """The caller does not have permission to execute the specified operation. + This error code does not imply the request is valid or the requested entity exists or + satisfies other pre-conditions.""" + + +class TooManyRequests(DatabricksError, ResourceWarning): + """Maps to HTTP code: 429 Too Many Requests""" + + +class ResourceExhausted(TooManyRequests): + """Operation is rejected due to per-user rate limiting, e.g. some resource has been exhausted, + per-user quota triggered, or the entire file system is out of space.""" + + +class RequestLimitExceeded(TooManyRequests): + """Cluster request was rejected because it would exceed a resource limit.""" + + +class ResourceConflict(DatabricksError): + """Maps to all HTTP 409 (Conflict) responses.""" + + +class AlreadyExists(ResourceConflict): + """Operation was rejected due to a conflict with an existing resource, e.g. attempted to create + file or directory that already exists.""" + + +# TODO: or should it be class Aborted(AlreadyExists): ?...
+class Aborted(ResourceConflict): + """The operation was aborted, typically due to a concurrency issue such as a sequencer check failure, + transaction abort, or transaction conflict.""" + + +class OperationNotImplemented(DatabricksError, NotImplementedError): + """The operation is not implemented or is not supported/enabled in this service. + This exception extends `NotImplementedError` from Python Standard Library.""" + + +_STATUS_CODE_MAPPING = { + 400: BadRequest, # also InvalidParameterValue + 401: Unauthenticated, + 403: PermissionDenied, + 404: NotFound, + 409: ResourceConflict, # also Aborted, AlreadyExists + 429: TooManyRequests, # also RequestLimitExceeded, ResourceExhausted + 499: Cancelled, + 500: InternalError, + 501: OperationNotImplemented, + 503: TemporarilyUnavailable, + 504: DeadlineExceeded, +} + +_ERROR_CODE_MAPPING = { + # HTTP 400 variants + 'INVALID_PARAMETER_VALUE': InvalidParameterValue, + + # HTTP 409 variants + 'ABORTED': Aborted, + 'ALREADY_EXISTS': AlreadyExists, + 'RESOURCE_ALREADY_EXISTS': AlreadyExists, + + # HTTP 429 variants + 'RESOURCE_EXHAUSTED': ResourceExhausted, + 'REQUEST_LIMIT_EXCEEDED': RequestLimitExceeded, + + # HTTP 500 variants + 'UNKNOWN': Unknown, + 'DATA_LOSS': DataLoss, +} + + +def _error_mapper(status_code: int, raw: dict) -> DatabricksError: + error_code = raw.get('error_code', None) + if error_code in _ERROR_CODE_MAPPING: + # more specific error codes override more generic HTTP status codes + return _ERROR_CODE_MAPPING[error_code](**raw) + + if status_code in _STATUS_CODE_MAPPING: + # more generic HTTP status codes matched after more specific error codes, + # where there's a default exception class per HTTP status code, and we do + # rely on Databricks platform exception mapper to do the right thing. + return _STATUS_CODE_MAPPING[status_code](**raw) + + # backwards-compatible error creation for cases like using older versions of + # the SDK on way newer releases of the platform.
+ return DatabricksError(**raw) diff --git a/databricks/sdk/retries.py b/databricks/sdk/retries.py index bf36264e0..3bc9ccf26 100644 --- a/databricks/sdk/retries.py +++ b/databricks/sdk/retries.py @@ -5,6 +5,8 @@ from random import random from typing import Callable, List, Optional, Type +from databricks.sdk.errors import TemporarilyUnavailable, TooManyRequests + logger = logging.getLogger('databricks.sdk') @@ -30,12 +32,14 @@ def wrapper(*args, **kwargs): except Exception as err: last_err = err retry_reason = None - # sleep 10s max per attempt, unless it's HTTP 429 or 503 + # sleep 10s max per attempt sleep = min(10, attempt) - retry_after_secs = getattr(err, 'retry_after_secs', None) - if retry_after_secs is not None: - # cannot depend on DatabricksError directly because of circular dependency - sleep = retry_after_secs + if isinstance(err, (TooManyRequests, TemporarilyUnavailable)): + if err.retry_after_secs is not None: + # unless a valid `Retry-After` header is present in + # HTTP 429 or 503 responses, and it's greater than + # current sleep interval. 
+ sleep = max(err.retry_after_secs, sleep) retry_reason = 'throttled by platform' elif is_retryable is not None: retry_reason = is_retryable(err) diff --git a/tests/test_errors.py b/tests/test_errors.py new file mode 100644 index 000000000..2fde75252 --- /dev/null +++ b/tests/test_errors.py @@ -0,0 +1,55 @@ +import pytest + +from databricks.sdk import errors + + +def test_error_code_has_precedence_over_http_status(): + err = errors._error_mapper(400, {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'nope'}) + assert errors.InvalidParameterValue == type(err) + + +def test_http_status_code_maps_fine(): + err = errors._error_mapper(400, {'error_code': 'MALFORMED_REQUEST', 'message': 'nope'}) + assert errors.BadRequest == type(err) + + +def test_other_errors_also_map_fine(): + err = errors._error_mapper(417, {'error_code': 'WHOOPS', 'message': 'nope'}) + assert errors.DatabricksError == type(err) + + +def test_missing_error_code(): + err = errors._error_mapper(522, {'message': 'nope'}) + assert errors.DatabricksError == type(err) + + +@pytest.mark.parametrize('status_code, error_code, klass', + [(400, ..., errors.BadRequest), (400, 'INVALID_PARAMETER_VALUE', errors.BadRequest), + (400, 'INVALID_PARAMETER_VALUE', errors.InvalidParameterValue), + (400, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests), (400, ..., IOError), + (401, ..., errors.Unauthenticated), (401, ..., PermissionError), + (401, ..., IOError), (403, ..., errors.PermissionDenied), + (403, ..., PermissionError), (403, ..., IOError), (404, ..., errors.NotFound), + (404, ..., LookupError), (404, ..., IOError), (409, ..., errors.ResourceConflict), + (409, 'ABORTED', errors.Aborted), (409, 'ABORTED', errors.ResourceConflict), + (409, 'ALREADY_EXISTS', errors.AlreadyExists), + (409, 'ALREADY_EXISTS', errors.ResourceConflict), (409, ..., IOError), + (429, ..., errors.TooManyRequests), + (429, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests), + (429, 'REQUEST_LIMIT_EXCEEDED', errors.RequestLimitExceeded), + 
(429, 'RESOURCE_EXHAUSTED', errors.TooManyRequests), + (429, 'RESOURCE_EXHAUSTED', errors.ResourceExhausted), (429, ..., ResourceWarning), + (417, 'REQUEST_LIMIT_EXCEEDED', ResourceWarning), (429, ..., IOError), + (499, ..., errors.Cancelled), (499, ..., IOError), (500, ..., errors.InternalError), + (500, 'UNKNOWN', errors.InternalError), (500, 'UNKNOWN', errors.Unknown), + (500, 'DATA_LOSS', errors.InternalError), (500, 'DATA_LOSS', errors.DataLoss), + (500, ..., IOError), (501, ..., errors.OperationNotImplemented), + (501, ..., NotImplementedError), (501, ..., IOError), + (503, ..., errors.TemporarilyUnavailable), (503, ..., IOError), + (504, ..., errors.DeadlineExceeded), (504, ..., TimeoutError), (504, ..., IOError), + (444, ..., errors.DatabricksError), (444, ..., IOError), ]) +def test_subclasses(status_code, error_code, klass): + try: + raise errors._error_mapper(status_code, {'error_code': error_code, 'message': 'nope'}) + except klass: + return