Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always retry on HTTP 429 and 503, even if Retry-After header is not available #396

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 2 additions & 65 deletions databricks/sdk/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from .azure import (ARM_DATABRICKS_RESOURCE_ID, ENVIRONMENTS, AzureEnvironment,
add_sp_management_token, add_workspace_id_header)
from .errors import DatabricksError, _error_mapper
from .oauth import (ClientCredentials, OAuthClient, OidcEndpoints, Refreshable,
Token, TokenCache, TokenSource)
from .retries import retried
Expand Down Expand Up @@ -892,70 +893,6 @@ def copy(self):
return cpy


class ErrorDetail:
    """One structured entry from the ``details`` list of an API error payload.

    Attributes:
        type: identifier of the detail kind; populated from the ``@type`` key
            when the instance is built through :meth:`from_dict`.
        reason: stored as given.
        domain: stored as given.
        metadata: stored as given.
    """

    def __init__(self,
                 type: str = None,
                 reason: str = None,
                 domain: str = None,
                 metadata: dict = None,
                 **kwargs):
        # Any other keyword (including the raw '@type' key) is accepted and
        # ignored, so unknown payload fields never raise.
        self.type = type
        self.reason = reason
        self.domain = domain
        self.metadata = metadata

    @classmethod
    def from_dict(cls, d: Dict[str, object]) -> 'ErrorDetail':
        """Build an ``ErrorDetail`` from a raw dict.

        The ``@type`` key, when present, takes precedence over ``type``.
        The input dict is left untouched.
        """
        # Shallow copy: the original implementation wrote 'type' back into the
        # caller's dict, silently altering the parsed response.
        fields = dict(d)
        if '@type' in fields:
            fields['type'] = fields['@type']
        return cls(**fields)


class DatabricksError(IOError):
    """Generic error from Databricks REST API.

    Normalises several response shapes into one exception:

    * ``error`` (API 1.2 format) is used as the message when given;
    * ``detail``/``status``/``scimType`` (SCIM format) override the message
      and set ``error_code`` to ``SCIM_<status>``.

    Attributes:
        error_code: error code from the payload, or ``SCIM_<status>``.
        retry_after_secs: value passed by the caller, stored as given.
        details: list of ``ErrorDetail`` built from the ``details`` payload;
            empty list when none were supplied.
        kwargs: any unrecognised keyword arguments, kept for inspection.
    """
    # Known ErrorDetail types
    _error_info_type = "type.googleapis.com/google.rpc.ErrorInfo"

    def __init__(self,
                 message: str = None,
                 *,
                 error_code: str = None,
                 detail: str = None,
                 status: str = None,
                 scimType: str = None,
                 error: str = None,
                 retry_after_secs: int = None,
                 details: List[Dict[str, object]] = None,
                 **kwargs):
        if error:
            # API 1.2 has different response format, let's adapt
            message = error
        if detail:
            # Handle SCIM error message details
            # @see https://tools.ietf.org/html/rfc7644#section-3.7.3
            if detail == "null":
                message = "SCIM API Internal Error"
            else:
                message = detail
            # Add more context from SCIM responses. scimType is optional, so
            # prepend it only when present instead of rendering "None".
            message = f"{scimType or ''} {message}".strip(" ")
            error_code = f"SCIM_{status}"
        super().__init__(message if message else error)
        self.error_code = error_code
        self.retry_after_secs = retry_after_secs
        self.details = [ErrorDetail.from_dict(item) for item in details] if details else []
        self.kwargs = kwargs

    def get_error_info(self) -> List['ErrorDetail']:
        """Return the details whose type is the known ``ErrorInfo`` type."""
        return self._get_details_by_type(DatabricksError._error_info_type)

    def _get_details_by_type(self, error_type) -> List['ErrorDetail']:
        """Return the details whose ``type`` equals ``error_type``."""
        # `details` may have been reset to None after construction.
        if self.details is None:
            return []
        return [item for item in self.details if item.type == error_type]


class ApiClient:
_cfg: Config

Expand Down Expand Up @@ -1180,7 +1117,7 @@ def _make_nicer_error(self, *, response: requests.Response, **kwargs) -> Databri
if is_too_many_requests_or_unavailable:
kwargs['retry_after_secs'] = self._parse_retry_after(response)
kwargs['message'] = message
return DatabricksError(**kwargs)
return _error_mapper(status_code, kwargs)

def _record_request_log(self, response: requests.Response, raw=False):
if not logger.isEnabledFor(logging.DEBUG):
Expand Down
210 changes: 208 additions & 2 deletions databricks/sdk/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,212 @@
from typing import Dict, List


class OperationFailed(RuntimeError):
    """Error type for a failed operation; catchable as ``RuntimeError``."""
    pass


class OperationTimeout(RuntimeError, TimeoutError):
    """Error type for an operation that timed out.

    Derives from both ``RuntimeError`` and the built-in ``TimeoutError``, so
    handlers written against either base class catch it.
    """


class ErrorDetail:
    """One structured entry from the ``details`` list of an API error payload.

    Attributes:
        type: identifier of the detail kind; populated from the ``@type`` key
            when the instance is built through :meth:`from_dict`.
        reason: stored as given.
        domain: stored as given.
        metadata: stored as given.
    """

    def __init__(self,
                 type: str = None,
                 reason: str = None,
                 domain: str = None,
                 metadata: dict = None,
                 **kwargs):
        # Any other keyword (including the raw '@type' key) is accepted and
        # ignored, so unknown payload fields never raise.
        self.type = type
        self.reason = reason
        self.domain = domain
        self.metadata = metadata

    @classmethod
    def from_dict(cls, d: Dict[str, object]) -> 'ErrorDetail':
        """Build an ``ErrorDetail`` from a raw dict.

        The ``@type`` key, when present, takes precedence over ``type``.
        The input dict is left untouched.
        """
        # Shallow copy: the original implementation wrote 'type' back into the
        # caller's dict, silently altering the parsed response.
        fields = dict(d)
        if '@type' in fields:
            fields['type'] = fields['@type']
        return cls(**fields)


class DatabricksError(IOError):
    """Generic error from Databricks REST API.

    Normalises several response shapes into one exception:

    * ``error`` (API 1.2 format) is used as the message when given;
    * ``detail``/``status``/``scimType`` (SCIM format) override the message
      and set ``error_code`` to ``SCIM_<status>``.

    Attributes:
        error_code: error code from the payload, or ``SCIM_<status>``.
        retry_after_secs: value passed by the caller, stored as given.
        details: list of ``ErrorDetail`` built from the ``details`` payload;
            empty list when none were supplied.
        kwargs: any unrecognised keyword arguments, kept for inspection.
    """
    # Known ErrorDetail types
    _error_info_type = "type.googleapis.com/google.rpc.ErrorInfo"

    def __init__(self,
                 message: str = None,
                 *,
                 error_code: str = None,
                 detail: str = None,
                 status: str = None,
                 scimType: str = None,
                 error: str = None,
                 retry_after_secs: int = None,
                 details: List[Dict[str, object]] = None,
                 **kwargs):
        if error:
            # API 1.2 has different response format, let's adapt
            message = error
        if detail:
            # Handle SCIM error message details
            # @see https://tools.ietf.org/html/rfc7644#section-3.7.3
            if detail == "null":
                message = "SCIM API Internal Error"
            else:
                message = detail
            # Add more context from SCIM responses. scimType is optional, so
            # prepend it only when present instead of rendering "None".
            message = f"{scimType or ''} {message}".strip(" ")
            error_code = f"SCIM_{status}"
        super().__init__(message if message else error)
        self.error_code = error_code
        self.retry_after_secs = retry_after_secs
        self.details = [ErrorDetail.from_dict(item) for item in details] if details else []
        self.kwargs = kwargs

    def get_error_info(self) -> List['ErrorDetail']:
        """Return the details whose type is the known ``ErrorInfo`` type."""
        return self._get_details_by_type(DatabricksError._error_info_type)

    def _get_details_by_type(self, error_type) -> List['ErrorDetail']:
        """Return the details whose ``type`` equals ``error_type``."""
        # `details` may have been reset to None after construction.
        if self.details is None:
            return []
        return [item for item in self.details if item.type == error_type]


class InternalError(DatabricksError):
    """Some invariants expected by the underlying system have been broken.
    This error generally cannot be resolved by the user.

    Default class for HTTP 500 responses in the status-code mapping."""


class DataLoss(InternalError):
    """Unrecoverable data loss or corruption.

    Selected by the `DATA_LOSS` error code."""


class Unknown(InternalError):
    """This error is used as a fallback if the platform-side mapping is missing for some reason;
    it can also be used to indicate an unknown error between the client library and the server.

    Selected by the `UNKNOWN` error code."""


class TemporarilyUnavailable(DatabricksError):
    """The service is currently unavailable. This is most likely a transient condition, which can be
    corrected by retrying with a backoff. Note that it is not always safe to retry non-idempotent
    operations.

    Default class for HTTP 503 responses in the status-code mapping."""


class BadRequest(DatabricksError):
    """The request is invalid.

    Default class for HTTP 400 responses in the status-code mapping."""


class InvalidParameterValue(BadRequest):
    """Supplied value for a parameter was invalid (e.g., giving a number for a string parameter).

    Selected by the `INVALID_PARAMETER_VALUE` error code."""


class DeadlineExceeded(DatabricksError, TimeoutError):
    """The deadline expired before the operation could complete. For operations that change the state
    of the system, this error may be returned even if the operation has completed successfully.

    For example, a successful response from a server could have been delayed long enough for
    the deadline to expire. When possible, implementations should make sure further processing of
    the request is aborted, e.g. by throwing an exception instead of making the RPC request,
    making the database query, etc.

    Default class for HTTP 504 responses in the status-code mapping; also catchable as the
    built-in `TimeoutError`."""


class Cancelled(DatabricksError):
    """The operation was explicitly canceled by the caller.

    Default class for HTTP 499 responses in the status-code mapping."""


class NotFound(DatabricksError, LookupError):
    """Operation was performed on a resource that does not exist, e.g. file or directory was not found.

    Default class for HTTP 404 responses in the status-code mapping; also catchable as the
    built-in `LookupError`."""


class Unauthenticated(DatabricksError, PermissionError):
    """The request does not have valid authentication (AuthN) credentials for the operation.

    Default class for HTTP 401 responses in the status-code mapping; also catchable as the
    built-in `PermissionError`."""


class PermissionDenied(DatabricksError, PermissionError):
    """The caller does not have permission to execute the specified operation.
    This error code does not imply the request is valid or the requested entity exists or
    satisfies other pre-conditions.

    Default class for HTTP 403 responses in the status-code mapping; also catchable as the
    built-in `PermissionError`."""


class TooManyRequests(DatabricksError, ResourceWarning):
    """Maps to HTTP code: 429 Too Many Requests.

    Default class for HTTP 429 responses in the status-code mapping. Also derives from the
    built-in `ResourceWarning`, so handlers for that class catch it too."""


class ResourceExhausted(TooManyRequests):
    """Operation is rejected due to per-user rate limiting, e.g. some resource has been exhausted,
    per-user quota triggered, or the entire file system is out of space.

    Selected by the `RESOURCE_EXHAUSTED` error code."""


class RequestLimitExceeded(TooManyRequests):
    """Cluster request was rejected because it would exceed a resource limit.

    Selected by the `REQUEST_LIMIT_EXCEEDED` error code."""


class ResourceConflict(DatabricksError):
    """Maps to all HTTP 409 (Conflict) responses.

    Default class for HTTP 409 responses in the status-code mapping."""


class AlreadyExists(ResourceConflict):
    """Operation was rejected due to a conflict with an existing resource, e.g. attempted to create
    file or directory that already exists.

    Selected by the `ALREADY_EXISTS` and `RESOURCE_ALREADY_EXISTS` error codes."""


# TODO: or should it be class Aborted(AlreadyExists): ?...
class Aborted(ResourceConflict):
    """The operation was aborted, typically due to a concurrency issue such as a sequencer check failure,
    transaction abort, or transaction conflict.

    Selected by the `ABORTED` error code."""


class OperationNotImplemented(DatabricksError, NotImplementedError):
    """The operation is not implemented or is not supported/enabled in this service.
    This exception extends `NotImplementedError` from Python Standard Library.

    Default class for HTTP 501 responses in the status-code mapping."""


# Default exception class per HTTP status code. Consulted by `_error_mapper`
# only when the payload's `error_code` has no entry in `_ERROR_CODE_MAPPING`.
# The trailing comments list the more specific subclasses that an error code
# can select for the same status.
_STATUS_CODE_MAPPING = {
    400: BadRequest, # also InvalidParameterValue
    401: Unauthenticated,
    403: PermissionDenied,
    404: NotFound,
    409: ResourceConflict, # also Aborted, AlreadyExists
    429: TooManyRequests, # also RequestLimitExceeded, ResourceExhausted
    499: Cancelled,
    500: InternalError,
    501: OperationNotImplemented,
    503: TemporarilyUnavailable,
    504: DeadlineExceeded,
}

# Exception class per platform `error_code` string. `_error_mapper` checks
# this table first, so an entry here wins over the HTTP status code of the
# response. Entries are grouped by the HTTP status their class descends from.
_ERROR_CODE_MAPPING = {
    # HTTP 400 variants
    'INVALID_PARAMETER_VALUE': InvalidParameterValue,

    # HTTP 409 variants
    'ABORTED': Aborted,
    'ALREADY_EXISTS': AlreadyExists,
    'RESOURCE_ALREADY_EXISTS': AlreadyExists,

    # HTTP 429 variants
    'RESOURCE_EXHAUSTED': ResourceExhausted,
    'REQUEST_LIMIT_EXCEEDED': RequestLimitExceeded,

    # HTTP 500 variants
    'UNKNOWN': Unknown,
    'DATA_LOSS': DataLoss,
}


def _error_mapper(status_code: int, raw: dict) -> DatabricksError:
    """Build the most specific exception for an error response.

    :param status_code: HTTP status code of the response.
    :param raw: keyword arguments for the exception constructor; its
        optional ``error_code`` entry drives the class selection.
    :return: an instance of the selected ``DatabricksError`` subclass,
        constructed as ``cls(**raw)``.
    """
    # Precedence, most specific first:
    #   1. the platform error code, which overrides the HTTP status;
    #   2. the default class for the HTTP status code, relying on the
    #      platform's own exception mapping to do the right thing;
    #   3. plain DatabricksError, for backwards compatibility when an older
    #      SDK talks to a way newer release of the platform.
    error_class = (_ERROR_CODE_MAPPING.get(raw.get('error_code'))
                   or _STATUS_CODE_MAPPING.get(status_code)
                   or DatabricksError)
    return error_class(**raw)
14 changes: 9 additions & 5 deletions databricks/sdk/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from random import random
from typing import Callable, List, Optional, Type

from databricks.sdk.errors import TemporarilyUnavailable, TooManyRequests

logger = logging.getLogger('databricks.sdk')


Expand All @@ -30,12 +32,14 @@ def wrapper(*args, **kwargs):
except Exception as err:
last_err = err
retry_reason = None
# sleep 10s max per attempt, unless it's HTTP 429 or 503
# sleep 10s max per attempt
sleep = min(10, attempt)
retry_after_secs = getattr(err, 'retry_after_secs', None)
if retry_after_secs is not None:
# cannot depend on DatabricksError directly because of circular dependency
sleep = retry_after_secs
if isinstance(err, (TooManyRequests, TemporarilyUnavailable)):
if err.retry_after_secs is not None:
# unless a valid `Retry-After` header is present in
# HTTP 429 or 503 responses, and it's greater than
# current sleep interval.
sleep = max(err.retry_after_secs, sleep)
retry_reason = 'throttled by platform'
elif is_retryable is not None:
retry_reason = is_retryable(err)
Expand Down
55 changes: 55 additions & 0 deletions tests/test_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest

from databricks.sdk import errors


def test_error_code_has_precedence_over_http_status():
    """A known error code picks its own class, not the HTTP 400 default."""
    payload = {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'nope'}
    mapped = errors._error_mapper(400, payload)
    assert type(mapped) is errors.InvalidParameterValue


def test_http_status_code_maps_fine():
    """An unmapped error code falls back to the class for the HTTP status."""
    payload = {'error_code': 'MALFORMED_REQUEST', 'message': 'nope'}
    mapped = errors._error_mapper(400, payload)
    assert type(mapped) is errors.BadRequest


def test_other_errors_also_map_fine():
    """Unmapped error code and unmapped status yield plain DatabricksError."""
    payload = {'error_code': 'WHOOPS', 'message': 'nope'}
    mapped = errors._error_mapper(417, payload)
    assert type(mapped) is errors.DatabricksError


def test_missing_error_code():
    """A payload without any error code still maps on an unmapped status."""
    payload = {'message': 'nope'}
    mapped = errors._error_mapper(522, payload)
    assert type(mapped) is errors.DatabricksError


@pytest.mark.parametrize('status_code, error_code, klass', [
    (400, ..., errors.BadRequest),
    (400, 'INVALID_PARAMETER_VALUE', errors.BadRequest),
    (400, 'INVALID_PARAMETER_VALUE', errors.InvalidParameterValue),
    (400, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests),
    (400, ..., IOError),
    (401, ..., errors.Unauthenticated),
    (401, ..., PermissionError),
    (401, ..., IOError),
    (403, ..., errors.PermissionDenied),
    (403, ..., PermissionError),
    (403, ..., IOError),
    (404, ..., errors.NotFound),
    (404, ..., LookupError),
    (404, ..., IOError),
    (409, ..., errors.ResourceConflict),
    (409, 'ABORTED', errors.Aborted),
    (409, 'ABORTED', errors.ResourceConflict),
    (409, 'ALREADY_EXISTS', errors.AlreadyExists),
    (409, 'ALREADY_EXISTS', errors.ResourceConflict),
    (409, ..., IOError),
    (429, ..., errors.TooManyRequests),
    (429, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests),
    (429, 'REQUEST_LIMIT_EXCEEDED', errors.RequestLimitExceeded),
    (429, 'RESOURCE_EXHAUSTED', errors.TooManyRequests),
    (429, 'RESOURCE_EXHAUSTED', errors.ResourceExhausted),
    (429, ..., ResourceWarning),
    (417, 'REQUEST_LIMIT_EXCEEDED', ResourceWarning),
    (429, ..., IOError),
    (499, ..., errors.Cancelled),
    (499, ..., IOError),
    (500, ..., errors.InternalError),
    (500, 'UNKNOWN', errors.InternalError),
    (500, 'UNKNOWN', errors.Unknown),
    (500, 'DATA_LOSS', errors.InternalError),
    (500, 'DATA_LOSS', errors.DataLoss),
    (500, ..., IOError),
    (501, ..., errors.OperationNotImplemented),
    (501, ..., NotImplementedError),
    (501, ..., IOError),
    (503, ..., errors.TemporarilyUnavailable),
    (503, ..., IOError),
    (504, ..., errors.DeadlineExceeded),
    (504, ..., TimeoutError),
    (504, ..., IOError),
    (444, ..., errors.DatabricksError),
    (444, ..., IOError),
])
def test_subclasses(status_code, error_code, klass):
    """The mapped error must be catchable as ``klass``.

    ``...`` (Ellipsis) stands in for an error code that is in no mapping
    table, so only the HTTP status decides the class.
    """
    payload = {'error_code': error_code, 'message': 'nope'}
    # Any exception that is not an instance of klass propagates out of the
    # context manager and fails the test.
    with pytest.raises(klass):
        raise errors._error_mapper(status_code, payload)