diff --git a/databricks/sdk/core.py b/databricks/sdk/core.py
index b686bd7fd..e028e4b15 100644
--- a/databricks/sdk/core.py
+++ b/databricks/sdk/core.py
@@ -1,7 +1,5 @@
 import re
-import urllib.parse
 from datetime import timedelta
-from json import JSONDecodeError
 from types import TracebackType
 from typing import Any, BinaryIO, Iterator, Type
 from urllib.parse import urlencode
@@ -12,8 +10,8 @@
 from .config import *
 # To preserve backwards compatibility (as these definitions were previously in this module)
 from .credentials_provider import *
-from .errors import DatabricksError, error_mapper
-from .errors.private_link import _is_private_link_redirect
+from .errors import DatabricksError, get_api_error
+from .logger import RoundTrip
 from .oauth import retrieve_token
 from .retries import retried
 
@@ -262,134 +260,23 @@ def _perform(self,
                                          auth=auth,
                                          stream=raw,
                                          timeout=self._http_timeout_seconds)
-        try:
-            self._record_request_log(response, raw=raw or data is not None or files is not None)
-            if not response.ok: # internally calls response.raise_for_status()
-                # TODO: experiment with traceback pruning for better readability
-                # See https://stackoverflow.com/a/58821552/277035
-                payload = response.json()
-                raise self._make_nicer_error(response=response, **payload) from None
-            # Private link failures happen via a redirect to the login page. From a requests-perspective, the request
-            # is successful, but the response is not what we expect. We need to handle this case separately.
-            if _is_private_link_redirect(response):
-                raise self._make_nicer_error(response=response) from None
-            return response
-        except requests.exceptions.JSONDecodeError:
-            message = self._make_sense_from_html(response.text)
-            if not message:
-                message = response.reason
-            raise self._make_nicer_error(response=response, message=message) from None
-
-    @staticmethod
-    def _make_sense_from_html(txt: str) -> str:
-        matchers = [r'<pre>(.*)</pre>', r'<title>(.*)</title>']
-        for attempt in matchers:
-            expr = re.compile(attempt, re.MULTILINE)
-            match = expr.search(txt)
-            if not match:
-                continue
-            return match.group(1).strip()
-        return txt
-
-    def _make_nicer_error(self, *, response: requests.Response, **kwargs) -> DatabricksError:
-        status_code = response.status_code
-        message = kwargs.get('message', 'request failed')
-        is_http_unauthorized_or_forbidden = status_code in (401, 403)
-        is_too_many_requests_or_unavailable = status_code in (429, 503)
-        if is_http_unauthorized_or_forbidden:
-            message = self._cfg.wrap_debug_info(message)
-        if is_too_many_requests_or_unavailable:
-            kwargs['retry_after_secs'] = self._parse_retry_after(response)
-        kwargs['message'] = message
-        return error_mapper(response, kwargs)
-
-    def _record_request_log(self, response: requests.Response, raw=False):
+        self._record_request_log(response, raw=raw or data is not None or files is not None)
+        error = get_api_error(response)
+        if error is not None:
+            status_code = response.status_code
+            is_http_unauthorized_or_forbidden = status_code in (401, 403)
+            is_too_many_requests_or_unavailable = status_code in (429, 503)
+            if is_http_unauthorized_or_forbidden:
+                error.message = self._cfg.wrap_debug_info(error.message)
+            if is_too_many_requests_or_unavailable:
+                error.retry_after_secs = self._parse_retry_after(response)
+            raise error from None
+        return response
+
+    def _record_request_log(self, response: requests.Response, raw: bool = False) -> None:
         if not logger.isEnabledFor(logging.DEBUG):
             return
-        request = response.request
-        url = urllib.parse.urlparse(request.url)
-        query = ''
-        if url.query:
-            query = f'?{urllib.parse.unquote(url.query)}'
-        sb = [f'{request.method} {urllib.parse.unquote(url.path)}{query}']
-        if self._cfg.debug_headers:
-            if self._cfg.host:
-                sb.append(f'> * Host: {self._cfg.host}')
-            for k, v in request.headers.items():
-                sb.append(f'> * {k}: {self._only_n_bytes(v, self._debug_truncate_bytes)}')
-        if request.body:
-            sb.append("> [raw stream]" if raw else self._redacted_dump("> ", request.body))
-        sb.append(f'< {response.status_code} {response.reason}')
-        if raw and response.headers.get('Content-Type', None) != 'application/json':
-            # Raw streams with `Transfer-Encoding: chunked` do not have `Content-Type` header
-            sb.append("< [raw stream]")
-        elif response.content:
-            sb.append(self._redacted_dump("< ", response.content))
-        logger.debug("\n".join(sb))
-
-    @staticmethod
-    def _mask(m: Dict[str, any]):
-        for k in m:
-            if k in {'bytes_value', 'string_value', 'token_value', 'value', 'content'}:
-                m[k] = "**REDACTED**"
-
-    @staticmethod
-    def _map_keys(m: Dict[str, any]) -> List[str]:
-        keys = list(m.keys())
-        keys.sort()
-        return keys
-
-    @staticmethod
-    def _only_n_bytes(j: str, num_bytes: int = 96) -> str:
-        diff = len(j.encode('utf-8')) - num_bytes
-        if diff > 0:
-            return f"{j[:num_bytes]}... ({diff} more bytes)"
-        return j
-
-    def _recursive_marshal_dict(self, m, budget) -> dict:
-        out = {}
-        self._mask(m)
-        for k in sorted(m.keys()):
-            raw = self._recursive_marshal(m[k], budget)
-            out[k] = raw
-            budget -= len(str(raw))
-        return out
-
-    def _recursive_marshal_list(self, s, budget) -> list:
-        out = []
-        for i in range(len(s)):
-            if i > 0 >= budget:
-                out.append("... (%d additional elements)" % (len(s) - len(out)))
-                break
-            raw = self._recursive_marshal(s[i], budget)
-            out.append(raw)
-            budget -= len(str(raw))
-        return out
-
-    def _recursive_marshal(self, v: any, budget: int) -> any:
-        if isinstance(v, dict):
-            return self._recursive_marshal_dict(v, budget)
-        elif isinstance(v, list):
-            return self._recursive_marshal_list(v, budget)
-        elif isinstance(v, str):
-            return self._only_n_bytes(v, self._debug_truncate_bytes)
-        else:
-            return v
-
-    def _redacted_dump(self, prefix: str, body: str) -> str:
-        if len(body) == 0:
-            return ""
-        try:
-            # Unmarshal body into primitive types.
-            tmp = json.loads(body)
-            max_bytes = 96
-            if self._debug_truncate_bytes > max_bytes:
-                max_bytes = self._debug_truncate_bytes
-            # Re-marshal body taking redaction and character limit into account.
-            raw = self._recursive_marshal(tmp, max_bytes)
-            return "\n".join([f'{prefix}{line}' for line in json.dumps(raw, indent=2).split("\n")])
-        except JSONDecodeError:
-            return f'{prefix}[non-JSON document of {len(body)} bytes]'
+        logger.debug(RoundTrip(response, self._cfg.debug_headers, self._debug_truncate_bytes, raw).generate())
 
 
 class StreamingResponse(BinaryIO):
diff --git a/databricks/sdk/errors/__init__.py b/databricks/sdk/errors/__init__.py
index 749c95116..578406803 100644
--- a/databricks/sdk/errors/__init__.py
+++ b/databricks/sdk/errors/__init__.py
@@ -1,5 +1,6 @@
 from .base import DatabricksError, ErrorDetail
-from .mapper import error_mapper
+from .mapper import _error_mapper
+from .parser import get_api_error
 from .platform import *
 from .private_link import PrivateLinkValidationError
 from .sdk import *
diff --git a/databricks/sdk/errors/base.py b/databricks/sdk/errors/base.py
index 89be376b6..973c3644e 100644
--- a/databricks/sdk/errors/base.py
+++ b/databricks/sdk/errors/base.py
@@ -1,4 +1,5 @@
 import re
+import warnings
 from dataclasses import dataclass
 from typing import Dict, List, Optional
 
@@ -41,9 +42,38 @@ def __init__(self,
                  retry_after_secs: int = None,
                  details: List[Dict[str, any]] = None,
                  **kwargs):
+        """
+
+        :param message:
+        :param error_code:
+        :param detail: [Deprecated]
+        :param status: [Deprecated]
+        :param scimType: [Deprecated]
+        :param error: [Deprecated]
+        :param retry_after_secs:
+        :param details:
+        :param kwargs:
+        """
+        # SCIM-specific parameters are deprecated
+        if detail:
+            warnings.warn(
+                "The 'detail' parameter of DatabricksError is deprecated and will be removed in a future version."
+            )
+        if scimType:
+            warnings.warn(
+                "The 'scimType' parameter of DatabricksError is deprecated and will be removed in a future version."
+            )
+        if status:
+            warnings.warn(
+                "The 'status' parameter of DatabricksError is deprecated and will be removed in a future version."
+            )
+
+        # API 1.2-specific parameters are deprecated
         if error:
-            # API 1.2 has different response format, let's adapt
-            message = error
+            warnings.warn(
+                "The 'error' parameter of DatabricksError is deprecated and will be removed in a future version."
+            )
+
         if detail:
             # Handle SCIM error message details
             # @see https://tools.ietf.org/html/rfc7644#section-3.7.3
diff --git a/databricks/sdk/errors/mapper.py b/databricks/sdk/errors/mapper.py
index 0b809eb7e..282b09c76 100644
--- a/databricks/sdk/errors/mapper.py
+++ b/databricks/sdk/errors/mapper.py
@@ -4,11 +4,9 @@
 from databricks.sdk.errors.base import DatabricksError
 
 from .overrides import _ALL_OVERRIDES
-from .private_link import (_get_private_link_validation_error,
-                           _is_private_link_redirect)
 
 
-def error_mapper(response: requests.Response, raw: dict) -> DatabricksError:
+def _error_mapper(response: requests.Response, raw: dict) -> DatabricksError:
     for override in _ALL_OVERRIDES:
         if override.matches(response, raw):
             return override.custom_error(**raw)
@@ -23,8 +21,6 @@ def error_mapper(response: requests.Response, raw: dict) -> DatabricksError:
         # where there's a default exception class per HTTP status code, and we do
         # rely on Databricks platform exception mapper to do the right thing.
         return platform.STATUS_CODE_MAPPING[status_code](**raw)
 
-    if _is_private_link_redirect(response):
-        return _get_private_link_validation_error(response.url)
     # backwards-compatible error creation for cases like using older versions of
     # the SDK on way never releases of the platform.
diff --git a/databricks/sdk/errors/parser.py b/databricks/sdk/errors/parser.py
new file mode 100644
index 000000000..e2feb99d6
--- /dev/null
+++ b/databricks/sdk/errors/parser.py
@@ -0,0 +1,146 @@
+import abc
+import json
+import logging
+import re
+from typing import Optional
+
+import requests
+
+from ..logger import RoundTrip
+from .base import DatabricksError
+from .mapper import _error_mapper
+from .private_link import (_get_private_link_validation_error,
+                           _is_private_link_redirect)
+
+
+class _ErrorParser(abc.ABC):
+    """A parser for errors from the Databricks REST API."""
+
+    @abc.abstractmethod
+    def parse_error(self, response: requests.Response, response_body: bytes) -> Optional[dict]:
+        """Parses an error from the Databricks REST API. If the error cannot be parsed, returns None."""
+
+
+class _EmptyParser(_ErrorParser):
+    """A parser that handles empty responses."""
+
+    def parse_error(self, response: requests.Response, response_body: bytes) -> Optional[dict]:
+        if len(response_body) == 0:
+            return {'message': response.reason}
+        return None
+
+
+class _StandardErrorParser(_ErrorParser):
+    """
+    Parses errors from the Databricks REST API using the standard error format.
+    """
+
+    def parse_error(self, response: requests.Response, response_body: bytes) -> Optional[dict]:
+        try:
+            payload_str = response_body.decode('utf-8')
+            resp: dict = json.loads(payload_str)
+        except json.JSONDecodeError as e:
+            logging.debug('_StandardErrorParser: unable to deserialize response as json', exc_info=e)
+            return None
+
+        error_args = {
+            'message': resp.get('message', 'request failed'),
+            'error_code': resp.get('error_code'),
+            'details': resp.get('details'),
+        }
+
+        # Handle API 1.2-style errors
+        if 'error' in resp:
+            error_args['message'] = resp['error']
+
+        # Handle SCIM Errors
+        detail = resp.get('detail')
+        status = resp.get('status')
+        scim_type = resp.get('scimType')
+        if detail:
+            # Handle SCIM error message details
+            # @see https://tools.ietf.org/html/rfc7644#section-3.7.3
+            error_args[
+                'message'] = f"{scim_type} {error_args.get('message', 'SCIM API Internal Error')}".strip(" ")
+            error_args['error_code'] = f"SCIM_{status}"
+        return error_args
+
+
+class _StringErrorParser(_ErrorParser):
+    """
+    Parses errors from the Databricks REST API in the format "ERROR_CODE: MESSAGE".
+    """
+
+    __STRING_ERROR_REGEX = re.compile(r'([A-Z_]+): (.*)')
+
+    def parse_error(self, response: requests.Response, response_body: bytes) -> Optional[dict]:
+        payload_str = response_body.decode('utf-8')
+        match = self.__STRING_ERROR_REGEX.match(payload_str)
+        if not match:
+            logging.debug('_StringErrorParser: unable to parse response as string')
+            return None
+        error_code, message = match.groups()
+        return {'error_code': error_code, 'message': message, 'status': response.status_code, }
+
+
+class _HtmlErrorParser(_ErrorParser):
+    """
+    Parses errors from the Databricks REST API in HTML format.
+    """
+
+    __HTML_ERROR_REGEXES = [re.compile(r'<pre>(.*)</pre>'), re.compile(r'<title>(.*)</title>'), ]
+
+    def parse_error(self, response: requests.Response, response_body: bytes) -> Optional[dict]:
+        payload_str = response_body.decode('utf-8')
+        for regex in self.__HTML_ERROR_REGEXES:
+            match = regex.search(payload_str)
+            if match:
+                message = match.group(1) if match.group(1) else response.reason
+                return {
+                    'status': response.status_code,
+                    'message': message,
+                    'error_code': response.reason.upper().replace(' ', '_')
+                }
+        logging.debug('_HtmlErrorParser: no <pre> tag found in error response')
+        return None
+
+
+# A list of _ErrorParsers that are tried in order to parse an API error from a response body. Most errors should be
+# parsable by the _StandardErrorParser, but additional parsers can be added here for specific error formats. The
+# order of the parsers is unimportant, as the sets of errors each parser can handle should be disjoint.
+_error_parsers = [_EmptyParser(), _StandardErrorParser(), _StringErrorParser(), _HtmlErrorParser(), ]
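+
+# Illustrative examples (drawn from the test cases in tests/test_errors.py) of which parser
+# handles which kind of response body:
+#
+#   b''                                              -> _EmptyParser
+#   b'{"error_code": "INVALID_PARAMETER_VALUE", …}'  -> _StandardErrorParser
+#   b'MALFORMED_REQUEST: vpc_endpoints …'            -> _StringErrorParser
+#   b'<pre>Worker environment not ready</pre>'       -> _HtmlErrorParser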
+
+
+def _unknown_error(response: requests.Response) -> str:
+    """A standard error message that can be shown when an API response cannot be parsed.
+
+    The message includes a link to the SDK's issue tracker so that users can report the issue.
+    """
+    request_log = RoundTrip(response, debug_headers=True, debug_truncate_bytes=10 * 1024).generate()
+    return (
+        'This is likely a bug in the Databricks SDK for Python or the underlying '
+        'API. Please report this issue with the following debugging information to the SDK issue tracker at '
+        f'https://github.com/databricks/databricks-sdk-py/issues. Request log:```{request_log}```')
+
+
+def get_api_error(response: requests.Response) -> Optional[DatabricksError]:
+    """
+    Handles responses from the REST API and returns a DatabricksError if the response indicates an error.
+    :param response: The response from the REST API.
+    :return: A DatabricksError if the response indicates an error, otherwise None.
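+
+    Illustrative usage (a sketch, not part of the SDK; `session` and `host` are hypothetical
+    stand-ins for an authenticated requests.Session and a workspace URL):
+
+        response = session.get(f'{host}/api/2.0/clusters/get', params={'cluster_id': '1234'})
+        error = get_api_error(response)
+        if error is not None:
+            raise error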
+    """
+    if not response.ok:
+        content = response.content
+        for parser in _error_parsers:
+            try:
+                error_args = parser.parse_error(response, content)
+                if error_args:
+                    return _error_mapper(response, error_args)
+            except Exception as e:
+                logging.debug(f'Error parsing response with {parser}, continuing', exc_info=e)
+        return _error_mapper(response, {'message': 'unable to parse response. ' + _unknown_error(response)})
+
+    # Private link failures happen via a redirect to the login page. From the perspective of `requests`, the
+    # request is successful, but the response is not what we expect. We need to handle this case separately.
+    if _is_private_link_redirect(response):
+        return _get_private_link_validation_error(response.url)
diff --git a/databricks/sdk/errors/private_link.py b/databricks/sdk/errors/private_link.py
index e8cc5eadf..946b41b50 100644
--- a/databricks/sdk/errors/private_link.py
+++ b/databricks/sdk/errors/private_link.py
@@ -51,7 +51,7 @@ def _is_private_link_redirect(resp: requests.Response) -> bool:
     return parsed.path == '/login.html' and 'error=private-link-validation-error' in parsed.query
 
 
-def _get_private_link_validation_error(url: str) -> _PrivateLinkInfo:
+def _get_private_link_validation_error(url: str) -> PrivateLinkValidationError:
     parsed = parse.urlparse(url)
     env = get_environment_for_hostname(parsed.hostname)
     return PrivateLinkValidationError(message=_private_link_info_map[env.cloud].error_message(),
diff --git a/databricks/sdk/logger/__init__.py b/databricks/sdk/logger/__init__.py
new file mode 100644
index 000000000..f843f05f6
--- /dev/null
+++ b/databricks/sdk/logger/__init__.py
@@ -0,0 +1 @@
+from .round_trip_logger import RoundTrip
diff --git a/databricks/sdk/logger/round_trip_logger.py b/databricks/sdk/logger/round_trip_logger.py
new file mode 100644
index 000000000..f1d177aaa
--- /dev/null
+++ b/databricks/sdk/logger/round_trip_logger.py
@@ -0,0 +1,118 @@
+import json
+import urllib.parse
+from typing import Dict, List
+
+import requests
+
+
+class RoundTrip:
+    """
+    A utility class for converting HTTP requests and responses to strings.
+
+    :param response: The response object to stringify.
+    :param debug_headers: Whether to include headers in the generated string.
+    :param debug_truncate_bytes: The maximum number of bytes to include in the generated string.
+    :param raw: Whether the response is a raw stream. If True, request and response bodies are not logged
+        directly; a "[raw stream]" placeholder is logged instead.
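+
+    Illustrative usage (mirrors how core.py logs a completed request; `logger` and `cfg` are
+    stand-ins for the caller's logger and Config):
+
+        logger.debug(RoundTrip(response, cfg.debug_headers, cfg.debug_truncate_bytes, raw).generate())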
+    """
+
+    def __init__(self,
+                 response: requests.Response,
+                 debug_headers: bool,
+                 debug_truncate_bytes: int,
+                 raw: bool = False):
+        self._debug_headers = debug_headers
+        self._debug_truncate_bytes = max(debug_truncate_bytes, 96)
+        self._raw = raw
+        self._response = response
+
+    def generate(self) -> str:
+        """
+        Generate a string representation of the request and response. The string includes the request method, URL,
+        headers, and body, as well as the response status code, reason, and body. Outgoing information is
+        prefixed with `>`, and incoming information is prefixed with `<`.
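+
+        Illustrative output for a failed JSON GET with debug_headers=False (bodies are
+        redacted and truncated):
+
+            GET /api/2.0/clusters/get?cluster_id=1234
+            < 400 Bad Request
+            < {
+            <   "error_code": "INVALID_PARAMETER_VALUE",
+            <   "message": "Cluster 1234 does not exist"
+            < }
+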
+        :return: A string representation of the request.
+        """
+        request = self._response.request
+        url = urllib.parse.urlparse(request.url)
+        query = ''
+        if url.query:
+            query = f'?{urllib.parse.unquote(url.query)}'
+        sb = [f'{request.method} {urllib.parse.unquote(url.path)}{query}']
+        if self._debug_headers:
+            for k, v in request.headers.items():
+                sb.append(f'> * {k}: {self._only_n_bytes(v, self._debug_truncate_bytes)}')
+        if request.body:
+            sb.append("> [raw stream]" if self._raw else self._redacted_dump("> ", request.body))
+        sb.append(f'< {self._response.status_code} {self._response.reason}')
+        if self._raw and self._response.headers.get('Content-Type', None) != 'application/json':
+            # Raw streams with `Transfer-Encoding: chunked` do not have `Content-Type` header
+            sb.append("< [raw stream]")
+        elif self._response.content:
+            sb.append(self._redacted_dump("< ", self._response.content.decode('utf-8')))
+        return '\n'.join(sb)
+
+    @staticmethod
+    def _mask(m: Dict[str, any]):
+        for k in m:
+            if k in {'bytes_value', 'string_value', 'token_value', 'value', 'content'}:
+                m[k] = "**REDACTED**"
+
+    @staticmethod
+    def _map_keys(m: Dict[str, any]) -> List[str]:
+        keys = list(m.keys())
+        keys.sort()
+        return keys
+
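+    # e.g. (illustrative): _only_n_bytes('databricks', 4) -> 'data... (6 more bytes)'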
+    @staticmethod
+    def _only_n_bytes(j: str, num_bytes: int = 96) -> str:
+        diff = len(j.encode('utf-8')) - num_bytes
+        if diff > 0:
+            return f"{j[:num_bytes]}... ({diff} more bytes)"
+        return j
+
+    def _recursive_marshal_dict(self, m, budget) -> dict:
+        out = {}
+        self._mask(m)
+        for k in sorted(m.keys()):
+            raw = self._recursive_marshal(m[k], budget)
+            out[k] = raw
+            budget -= len(str(raw))
+        return out
+
+    def _recursive_marshal_list(self, s, budget) -> list:
+        out = []
+        for i in range(len(s)):
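+            # Note: `i > 0 >= budget` is a chained comparison, i.e. `i > 0 and budget <= 0`:
+            # once the byte budget is spent, summarize the remaining elements instead.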
+            if i > 0 >= budget:
+                out.append("... (%d additional elements)" % (len(s) - len(out)))
+                break
+            raw = self._recursive_marshal(s[i], budget)
+            out.append(raw)
+            budget -= len(str(raw))
+        return out
+
+    def _recursive_marshal(self, v: any, budget: int) -> any:
+        if isinstance(v, dict):
+            return self._recursive_marshal_dict(v, budget)
+        elif isinstance(v, list):
+            return self._recursive_marshal_list(v, budget)
+        elif isinstance(v, str):
+            return self._only_n_bytes(v, self._debug_truncate_bytes)
+        else:
+            return v
+
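+    # Pretty-prints a JSON body with one `prefix`ed line per output line, masking sensitive
+    # fields. e.g. (illustrative): _redacted_dump("> ", '{"token_value": "abc"}') yields:
+    #   > {
+    #   >   "token_value": "**REDACTED**"
+    #   > }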
+    def _redacted_dump(self, prefix: str, body: str) -> str:
+        if len(body) == 0:
+            return ""
+        try:
+            # Unmarshal body into primitive types.
+            tmp = json.loads(body)
+            max_bytes = 96
+            if self._debug_truncate_bytes > max_bytes:
+                max_bytes = self._debug_truncate_bytes
+            # Re-marshal body taking redaction and character limit into account.
+            raw = self._recursive_marshal(tmp, max_bytes)
+            return "\n".join([f'{prefix}{line}' for line in json.dumps(raw, indent=2).split("\n")])
+        except json.JSONDecodeError:
+            to_log = self._only_n_bytes(body, self._debug_truncate_bytes)
+            log_lines = [prefix + x.strip('\r') for x in to_log.split("\n")]
+            return '\n'.join(log_lines)
diff --git a/tests/test_errors.py b/tests/test_errors.py
index 87111c4b4..1dfcfaf26 100644
--- a/tests/test_errors.py
+++ b/tests/test_errors.py
@@ -1,97 +1,117 @@
+import http.client
+import json
+from typing import List, Optional, Tuple
+
 import pytest
 import requests
 
 from databricks.sdk import errors
 
 
-def fake_response(status_code: int) -> requests.Response:
+def fake_response(method: str,
+                  status_code: int,
+                  response_body: str,
+                  path: Optional[str] = None) -> requests.Response:
     resp = requests.Response()
     resp.status_code = status_code
-    resp.request = requests.Request('GET', 'https://databricks.com/api/2.0/service').prepare()
+    resp.reason = http.client.responses.get(status_code, '')
+    if path is None:
+        path = '/api/2.0/service'
+    resp.request = requests.Request(method, f"https://databricks.com{path}").prepare()
+    resp._content = response_body.encode('utf-8')
     return resp
 
 
-def test_error_code_has_precedence_over_http_status():
-    err = errors.error_mapper(fake_response(400), {
-        'error_code': 'INVALID_PARAMETER_VALUE',
-        'message': 'nope'
-    })
-    assert errors.InvalidParameterValue == type(err)
-
-
-def test_http_status_code_maps_fine():
-    err = errors.error_mapper(fake_response(400), {'error_code': 'MALFORMED_REQUEST', 'message': 'nope'})
-    assert errors.BadRequest == type(err)
-
-
-def test_other_errors_also_map_fine():
-    err = errors.error_mapper(fake_response(417), {'error_code': 'WHOOPS', 'message': 'nope'})
-    assert errors.DatabricksError == type(err)
-
+def fake_valid_response(method: str,
+                        status_code: int,
+                        error_code: str,
+                        message: str,
+                        path: Optional[str] = None) -> requests.Response:
+    body = {'message': message}
+    if error_code:
+        body['error_code'] = error_code
+    return fake_response(method, status_code, json.dumps(body), path)
 
-def test_missing_error_code():
-    err = errors.error_mapper(fake_response(522), {'message': 'nope'})
-    assert errors.DatabricksError == type(err)
 
-
-def test_private_link_error():
+def make_private_link_response() -> requests.Response:
     resp = requests.Response()
     resp.url = 'https://databricks.com/login.html?error=private-link-validation-error'
     resp.request = requests.Request('GET', 'https://databricks.com/api/2.0/service').prepare()
-    err = errors.error_mapper(resp, {})
-    assert errors.PrivateLinkValidationError == type(err)
-
-
-@pytest.mark.parametrize('status_code, error_code, klass',
-                         [(400, ..., errors.BadRequest), (400, 'INVALID_PARAMETER_VALUE', errors.BadRequest),
-                          (400, 'INVALID_PARAMETER_VALUE', errors.InvalidParameterValue),
-                          (400, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests), (400, ..., IOError),
-                          (401, ..., errors.Unauthenticated), (401, ..., IOError),
-                          (403, ..., errors.PermissionDenied),
-                          (403, ..., IOError), (404, ..., errors.NotFound), (404, ..., IOError),
-                          (409, ..., errors.ResourceConflict), (409, 'ABORTED', errors.Aborted),
-                          (409, 'ABORTED', errors.ResourceConflict),
-                          (409, 'ALREADY_EXISTS', errors.AlreadyExists),
-                          (409, 'ALREADY_EXISTS', errors.ResourceConflict), (409, ..., IOError),
-                          (429, ..., errors.TooManyRequests),
-                          (429, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests),
-                          (429, 'REQUEST_LIMIT_EXCEEDED', errors.RequestLimitExceeded),
-                          (429, 'RESOURCE_EXHAUSTED', errors.TooManyRequests),
-                          (429, 'RESOURCE_EXHAUSTED', errors.ResourceExhausted), (429, ..., IOError),
-                          (499, ..., errors.Cancelled), (499, ..., IOError), (500, ..., errors.InternalError),
-                          (500, 'UNKNOWN', errors.InternalError), (500, 'UNKNOWN', errors.Unknown),
-                          (500, 'DATA_LOSS', errors.InternalError), (500, 'DATA_LOSS', errors.DataLoss),
-                          (500, ..., IOError), (501, ..., errors.NotImplemented), (501, ..., IOError),
-                          (503, ..., errors.TemporarilyUnavailable), (503, ..., IOError),
-                          (504, ..., errors.DeadlineExceeded), (504, ..., IOError),
-                          (444, ..., errors.DatabricksError), (444, ..., IOError), ])
-def test_subclasses(status_code, error_code, klass):
-    try:
-        raise errors.error_mapper(fake_response(status_code), {'error_code': error_code, 'message': 'nope'})
-    except klass:
-        return
+    resp._content = b'{}'
+    resp.status_code = 200
+    return resp
 
 
-@pytest.mark.parametrize('verb, path, status_code, error_code, message, expected_error',
-                         [[
-                             'GET', '/api/2.0/clusters/get', 400, 'INVALID_PARAMETER_VALUE',
-                             'Cluster abcde does not exist', errors.ResourceDoesNotExist
-                         ],
-                          [
-                              'GET', '/api/2.0/jobs/get', 400, 'INVALID_PARAMETER_VALUE',
-                              'Job abcde does not exist', errors.ResourceDoesNotExist
-                          ],
-                          [
-                              'GET', '/api/2.1/jobs/get', 400, 'INVALID_PARAMETER_VALUE',
-                              'Job abcde does not exist', errors.ResourceDoesNotExist
-                          ],
-                          [
-                              'GET', '/api/2.1/jobs/get', 400, 'INVALID_PARAMETER_VALUE',
-                              'Invalid spark version', errors.InvalidParameterValue
-                          ], ])
-def test_error_overrides(verb, path, status_code, error_code, message, expected_error):
-    resp = requests.Response()
-    resp.status_code = status_code
-    resp.request = requests.Request(verb, f'https://databricks.com{path}').prepare()
-    with pytest.raises(expected_error):
-        raise errors.error_mapper(resp, {'error_code': error_code, 'message': message})
+# This should be `(int, str, type)` but doesn't work in Python 3.7-3.8.
+base_subclass_test_cases: List[Tuple[int, str,
+                                     type]] = [(400, '', errors.BadRequest),
+                                               (400, 'INVALID_PARAMETER_VALUE', errors.BadRequest),
+                                               (400, 'INVALID_PARAMETER_VALUE', errors.InvalidParameterValue),
+                                               (400, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests),
+                                               (400, '', IOError), (401, '', errors.Unauthenticated),
+                                               (401, '', IOError), (403, '', errors.PermissionDenied),
+                                               (403, '', IOError), (404, '', errors.NotFound),
+                                               (404, '', IOError), (409, '', errors.ResourceConflict),
+                                               (409, 'ABORTED', errors.Aborted),
+                                               (409, 'ABORTED', errors.ResourceConflict),
+                                               (409, 'ALREADY_EXISTS', errors.AlreadyExists),
+                                               (409, 'ALREADY_EXISTS', errors.ResourceConflict),
+                                               (409, '', IOError), (429, '', errors.TooManyRequests),
+                                               (429, 'REQUEST_LIMIT_EXCEEDED', errors.TooManyRequests),
+                                               (429, 'REQUEST_LIMIT_EXCEEDED', errors.RequestLimitExceeded),
+                                               (429, 'RESOURCE_EXHAUSTED', errors.TooManyRequests),
+                                               (429, 'RESOURCE_EXHAUSTED', errors.ResourceExhausted),
+                                               (429, '', IOError), (499, '', errors.Cancelled),
+                                               (499, '', IOError), (500, '', errors.InternalError),
+                                               (500, 'UNKNOWN', errors.InternalError),
+                                               (500, 'UNKNOWN', errors.Unknown),
+                                               (500, 'DATA_LOSS', errors.InternalError),
+                                               (500, 'DATA_LOSS', errors.DataLoss), (500, '', IOError),
+                                               (501, '', errors.NotImplemented), (501, '', IOError),
+                                               (503, '', errors.TemporarilyUnavailable), (503, '', IOError),
+                                               (504, '', errors.DeadlineExceeded), (504, '', IOError),
+                                               (444, '', errors.DatabricksError), (444, '', IOError), ]
+
+subclass_test_cases = [(fake_valid_response('GET', x[0], x[1], 'nope'), x[2], 'nope')
+                       for x in base_subclass_test_cases]
+
+
+@pytest.mark.parametrize(
+    'response, expected_error, expected_message', subclass_test_cases +
+    [(fake_response('GET', 400, ''), errors.BadRequest, 'Bad Request'),
+     (fake_valid_response('GET', 417, 'WHOOPS', 'nope'), errors.DatabricksError, 'nope'),
+     (fake_valid_response('GET', 522, '', 'nope'), errors.DatabricksError, 'nope'),
+     (make_private_link_response(), errors.PrivateLinkValidationError,
+      ('The requested workspace has AWS PrivateLink enabled and is not accessible from the current network. '
+       'Ensure that AWS PrivateLink is properly configured and that your device has access to the AWS VPC '
+       'endpoint. For more information, see '
+       'https://docs.databricks.com/en/security/network/classic/privatelink.html.'),
+      ),
+     (fake_valid_response(
+         'GET', 400, 'INVALID_PARAMETER_VALUE', 'Cluster abcde does not exist',
+         '/api/2.0/clusters/get'), errors.ResourceDoesNotExist, 'Cluster abcde does not exist'),
+     (fake_valid_response('GET', 400, 'INVALID_PARAMETER_VALUE', 'Job abcde does not exist',
+                          '/api/2.0/jobs/get'), errors.ResourceDoesNotExist, 'Job abcde does not exist'),
+     (fake_valid_response('GET', 400, 'INVALID_PARAMETER_VALUE', 'Job abcde does not exist',
+                          '/api/2.1/jobs/get'), errors.ResourceDoesNotExist, 'Job abcde does not exist'),
+     (fake_valid_response('GET', 400, 'INVALID_PARAMETER_VALUE', 'Invalid spark version',
+                          '/api/2.1/jobs/get'), errors.InvalidParameterValue, 'Invalid spark version'),
+     (fake_response(
+         'GET', 400,
+         'MALFORMED_REQUEST: vpc_endpoints malformed parameters: VPC Endpoint ... with use_case ... cannot be attached in ... list'
+     ), errors.BadRequest,
+      'vpc_endpoints malformed parameters: VPC Endpoint ... with use_case ... cannot be attached in ... list'
+      ),
+     (fake_response('GET', 400, '<pre>Worker environment not ready</pre>'), errors.BadRequest,
+      'Worker environment not ready'),
+     (fake_response('GET', 400, 'this is not a real response'), errors.BadRequest,
+      ('unable to parse response. This is likely a bug in the Databricks SDK for Python or the underlying API. '
+       'Please report this issue with the following debugging information to the SDK issue tracker at '
+       'https://github.com/databricks/databricks-sdk-py/issues. Request log:```GET /api/2.0/service\n'
+       '< 400 Bad Request\n'
+       '< this is not a real response```')), ])
+def test_get_api_error(response, expected_error, expected_message):
+    with pytest.raises(errors.DatabricksError) as e:
+        raise errors.get_api_error(response)
+    assert isinstance(e.value, expected_error)
+    assert str(e.value) == expected_message
'), errors.BadRequest, + 'Worker environment not ready'), + (fake_response('GET', 400, 'this is not a real response'), errors.BadRequest, + ('unable to parse response. This is likely a bug in the Databricks SDK for Python or the underlying API. ' + 'Please report this issue with the following debugging information to the SDK issue tracker at ' + 'https://github.com/databricks/databricks-sdk-go/issues. Request log:```GET /api/2.0/service\n' + '< 400 Bad Request\n' + '< this is not a real response```')), ]) +def test_get_api_error(response, expected_error, expected_message): + with pytest.raises(errors.DatabricksError) as e: + raise errors.get_api_error(response) + assert isinstance(e.value, expected_error) + assert str(e.value) == expected_message