diff --git a/.changes/unreleased/Fixes-20220331-143923.yaml b/.changes/unreleased/Fixes-20220331-143923.yaml new file mode 100644 index 00000000000..f42d0a00241 --- /dev/null +++ b/.changes/unreleased/Fixes-20220331-143923.yaml @@ -0,0 +1,7 @@ +kind: Fixes +body: Catch more cases to retry package retrieval for deps pointing to the hub. Also start to cache the package requests. +time: 2022-03-31T14:39:23.952705-05:00 +custom: + Author: emmyoop + Issue: "4849" + PR: "4982" diff --git a/core/dbt/clients/registry.py b/core/dbt/clients/registry.py index cf9003e4f2e..af68dae55fe 100644 --- a/core/dbt/clients/registry.py +++ b/core/dbt/clients/registry.py @@ -1,9 +1,16 @@ import functools +from typing import Any, Dict, List import requests from dbt.events.functions import fire_event from dbt.events.types import ( RegistryProgressMakingGETRequest, - RegistryProgressGETResponse + RegistryProgressGETResponse, + RegistryIndexProgressMakingGETRequest, + RegistryIndexProgressGETResponse, + RegistryResponseUnexpectedType, + RegistryResponseMissingTopKeys, + RegistryResponseMissingNestedKeys, + RegistryResponseExtraNestedKeys, ) from dbt.utils import memoized, _connection_exception_retry as connection_exception_retry from dbt import deprecations @@ -15,56 +22,78 @@ DEFAULT_REGISTRY_BASE_URL = 'https://hub.getdbt.com/' -def _get_url(url, registry_base_url=None): +def _get_url(name, registry_base_url=None): if registry_base_url is None: registry_base_url = DEFAULT_REGISTRY_BASE_URL + url = "api/v1/{}.json".format(name) return '{}{}'.format(registry_base_url, url) -def _get_with_retries(path, registry_base_url=None): - get_fn = functools.partial(_get, path, registry_base_url) +def _get_with_retries(package_name, registry_base_url=None): + get_fn = functools.partial(_get_cached, package_name, registry_base_url) return connection_exception_retry(get_fn, 5) -def _get(path, registry_base_url=None): - url = _get_url(path, registry_base_url) +def _get(package_name, registry_base_url=None): + url = _get_url(package_name, registry_base_url) fire_event(RegistryProgressMakingGETRequest(url=url)) + # all exceptions from requests get caught in the retry logic so no need to wrap this here resp = requests.get(url, timeout=30) fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code)) resp.raise_for_status() - # It is unexpected for the content of the response to be None so if it is, raising this error - # will cause this function to retry (if called within _get_with_retries) and hopefully get - # a response. This seems to happen when there's an issue with the Hub. + # The response should always be a dictionary. Anything else is unexpected, raise error. + # Raising this error will cause this function to retry (if called within _get_with_retries) + # and hopefully get a valid response. This seems to happen when there's an issue with the Hub. + # Since we control what we expect the HUB to return, this is safe. # See https://github.com/dbt-labs/dbt-core/issues/4577 - if resp.json() is None: - raise requests.exceptions.ContentDecodingError( - 'Request error: The response is None', response=resp - ) - return resp.json() - - -def index(registry_base_url=None): - return _get_with_retries('api/v1/index.json', registry_base_url) - - -index_cached = memoized(index) - - -def packages(registry_base_url=None): - return _get_with_retries('api/v1/packages.json', registry_base_url) - + # and https://github.com/dbt-labs/dbt-core/issues/4849 + response = resp.json() -def package(name, registry_base_url=None): - response = _get_with_retries('api/v1/{}.json'.format(name), registry_base_url) + if not isinstance(response, dict): # This will also catch Nonetype + error_msg = ( + f"Request error: Expected a response type of but got {type(response)} instead" + ) + fire_event(RegistryResponseUnexpectedType(response=response)) + raise requests.exceptions.ContentDecodingError(error_msg, response=resp) + + # check for expected top level keys + expected_keys = {"name", "versions"} + if not expected_keys.issubset(response): + error_msg = ( + f"Request error: Expected the response to contain keys {expected_keys} " + f"but is missing {expected_keys.difference(set(response))}" + ) + fire_event(RegistryResponseMissingTopKeys(response=response)) + raise requests.exceptions.ContentDecodingError(error_msg, response=resp) + + # check for the keys we need nested under each version + expected_version_keys = {"name", "packages", "downloads"} + all_keys = set().union(*(response["versions"][d] for d in response["versions"])) + if not expected_version_keys.issubset(all_keys): + error_msg = ( + "Request error: Expected the response for the version to contain keys " + f"{expected_version_keys} but is missing {expected_version_keys.difference(all_keys)}" + ) + fire_event(RegistryResponseMissingNestedKeys(response=response)) + raise requests.exceptions.ContentDecodingError(error_msg, response=resp) + + # all version responses should contain identical keys. + has_extra_keys = set().difference(*(response["versions"][d] for d in response["versions"])) + if has_extra_keys: + error_msg = ( + "Request error: Keys for all versions do not match. Found extra key(s) " + f"of {has_extra_keys}." + ) + fire_event(RegistryResponseExtraNestedKeys(response=response)) + raise requests.exceptions.ContentDecodingError(error_msg, response=resp) # Either redirectnamespace or redirectname in the JSON response indicate a redirect # redirectnamespace redirects based on package ownership # redirectname redirects based on package name # Both can be present at the same time, or neither. Fails gracefully to old name - - if ('redirectnamespace' in response) or ('redirectname' in response): + if ("redirectnamespace" in response) or ("redirectname" in response): if ('redirectnamespace' in response) and response['redirectnamespace'] is not None: use_namespace = response['redirectnamespace'] @@ -77,15 +106,59 @@ def package(name, registry_base_url=None): use_name = response['name'] new_nwo = use_namespace + "/" + use_name - deprecations.warn('package-redirect', old_name=name, new_name=new_nwo) + deprecations.warn("package-redirect", old_name=package_name, new_name=new_nwo) + + return response + + +_get_cached = memoized(_get) + + +def package(package_name, registry_base_url=None) -> Dict[str, Any]: + # returns a dictionary of metadata for all versions of a package + response = _get_with_retries(package_name, registry_base_url) + return response["versions"] + + +def package_version(package_name, version, registry_base_url=None) -> Dict[str, Any]: + # returns the metadata of a specific version of a package + response = package(package_name, registry_base_url) + return response[version] + + +def get_available_versions(package_name) -> List["str"]: + # returns a list of all available versions of a package + response = package(package_name) + return list(response) + + +def _get_index(registry_base_url=None): + + url = _get_url("index", registry_base_url) + fire_event(RegistryIndexProgressMakingGETRequest(url=url)) + # all exceptions from requests get caught in the retry logic so no need to wrap this here + resp = requests.get(url, timeout=30) + fire_event(RegistryIndexProgressGETResponse(url=url, resp_code=resp.status_code)) + resp.raise_for_status() + + # The response should be a list. Anything else is unexpected, raise an error. + # Raising this error will cause this function to retry and hopefully get a valid response. + + response = resp.json() + + if not isinstance(response, list): # This will also catch Nonetype + error_msg = ( + f"Request error: The response type of {type(response)} is not valid: {resp.text}" + ) + raise requests.exceptions.ContentDecodingError(error_msg, response=resp) return response -def package_version(name, version, registry_base_url=None): - return _get_with_retries('api/v1/{}/{}.json'.format(name, version), registry_base_url) +def index(registry_base_url=None) -> List[str]: + # this returns a list of all packages on the Hub + get_index_fn = functools.partial(_get_index, registry_base_url) + return connection_exception_retry(get_index_fn, 5) -def get_available_versions(name): - response = package(name) - return list(response['versions']) +index_cached = memoized(index) diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index 8b377bf9073..f1b178ca28a 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -301,6 +301,25 @@ def message(self) -> str: return f" Checked out at {self.end_sha}." +@dataclass +class RegistryIndexProgressMakingGETRequest(DebugLevel, Cli, File): + url: str + code: str = "M022" + + def message(self) -> str: + return f"Making package index registry request: GET {self.url}" + + +@dataclass +class RegistryIndexProgressGETResponse(DebugLevel, Cli, File): + url: str + resp_code: int + code: str = "M023" + + def message(self) -> str: + return f"Response from registry index: GET {self.url} {self.resp_code}" + + @dataclass class RegistryProgressMakingGETRequest(DebugLevel, Cli, File): url: str @@ -320,6 +339,45 @@ def message(self) -> str: return f"Response from registry: GET {self.url} {self.resp_code}" +@dataclass +class RegistryResponseUnexpectedType(DebugLevel, File): + response: str + code: str = "M024" + + def message(self) -> str: + return f"Response was None: {self.response}" + + +@dataclass +class RegistryResponseMissingTopKeys(DebugLevel, File): + response: str + code: str = "M025" + + def message(self) -> str: + # expected/actual keys logged in exception + return f"Response missing top level keys: {self.response}" + + +@dataclass +class RegistryResponseMissingNestedKeys(DebugLevel, File): + response: str + code: str = "M026" + + def message(self) -> str: + # expected/actual keys logged in exception + return f"Response missing nested keys: {self.response}" + + +@dataclass +class RegistryResponseExtraNestedKeys(DebugLevel, File): + response: str + code: str = "M027" + + def message(self) -> str: + # expected/actual keys logged in exception + return f"Response contained inconsistent keys: {self.response}" + + # TODO this was actually `logger.exception(...)` not `logger.error(...)` @dataclass class SystemErrorRetrievingModTime(ErrorLevel, Cli, File): @@ -2457,6 +2515,14 @@ def message(self) -> str: GitNothingToDo(sha="") GitProgressUpdatedCheckoutRange(start_sha="", end_sha="") GitProgressCheckedOutAt(end_sha="") + RegistryIndexProgressMakingGETRequest(url="") + RegistryIndexProgressGETResponse(url="", resp_code=1234) + RegistryProgressMakingGETRequest(url="") + RegistryProgressGETResponse(url="", resp_code=1234) + RegistryResponseUnexpectedType(response=""), + RegistryResponseMissingTopKeys(response=""), + RegistryResponseMissingNestedKeys(response=""), + RegistryResponseExtraNestedKeys(response=""), SystemErrorRetrievingModTime(path="") SystemCouldNotWrite(path="", reason="", exc=Exception("")) SystemExecutingCmd(cmd=[""]) diff --git a/test/unit/test_events.py b/test/unit/test_events.py index cdfb65ff963..a1b390fa8ce 100644 --- a/test/unit/test_events.py +++ b/test/unit/test_events.py @@ -389,6 +389,12 @@ def MockNode(): PrintDebugStackTrace(), MainReportArgs(Namespace()), RegistryProgressMakingGETRequest(''), + RegistryIndexProgressMakingGETRequest(''), + RegistryIndexProgressGETResponse(url="", resp_code=1), + RegistryResponseUnexpectedType(''), + RegistryResponseMissingTopKeys(''), + RegistryResponseMissingNestedKeys(''), + RegistryResponseExtraNestedKeys(''), DepsUTD(), PartialParsingNotEnabled(), SQlRunnerException(Exception('')),