Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-352] catch and retry malformed json #4982

Merged
merged 14 commits into from
Apr 5, 2022
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20220331-143923.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Catch more cases to retry package retrieval for deps pointing to the hub. Also start to cache the package requests.
time: 2022-03-31T14:39:23.952705-05:00
custom:
Author: emmyoop
Issue: "4849"
PR: "4982"
146 changes: 111 additions & 35 deletions core/dbt/clients/registry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
import functools
from typing import Any, Dict, List
import requests
from dbt.events.functions import fire_event
from dbt.events.types import RegistryProgressMakingGETRequest, RegistryProgressGETResponse
from dbt.events.types import (
RegistryProgressMakingGETRequest,
RegistryProgressGETResponse,
RegistryIndexProgressMakingGETRequest,
RegistryIndexProgressGETResponse,
RegistryResponseUnexpectedType,
RegistryResponseMissingTopKeys,
RegistryResponseMissingNestedKeys,
RegistryResponseExtraNestedKeys,
)
from dbt.utils import memoized, _connection_exception_retry as connection_exception_retry
from dbt import deprecations
import os
Expand All @@ -12,55 +22,77 @@
DEFAULT_REGISTRY_BASE_URL = "https://hub.getdbt.com/"


def _get_url(url, registry_base_url=None):
def _get_url(name, registry_base_url=None):
if registry_base_url is None:
registry_base_url = DEFAULT_REGISTRY_BASE_URL
url = "api/v1/{}.json".format(name)

return "{}{}".format(registry_base_url, url)


def _get_with_retries(path, registry_base_url=None):
get_fn = functools.partial(_get, path, registry_base_url)
def _get_with_retries(package_name, registry_base_url=None):
get_fn = functools.partial(_get_cached, package_name, registry_base_url)
return connection_exception_retry(get_fn, 5)


def _get(path, registry_base_url=None):
url = _get_url(path, registry_base_url)
def _get(package_name, registry_base_url=None):
url = _get_url(package_name, registry_base_url)
fire_event(RegistryProgressMakingGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this line throw at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But we're catching all exceptions from requests in the retry logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add a comment to that effect then.

fire_event(RegistryProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()

# It is unexpected for the content of the response to be None so if it is, raising this error
# will cause this function to retry (if called within _get_with_retries) and hopefully get
# a response. This seems to happen when there's an issue with the Hub.
# The response should always be a dictionary. Anything else is unexpected, raise error.
# Raising this error will cause this function to retry (if called within _get_with_retries)
# and hopefully get a valid response. This seems to happen when there's an issue with the Hub.
# Since we control what we expect the HUB to return, this is safe.
# See https://github.com/dbt-labs/dbt-core/issues/4577
if resp.json() is None:
raise requests.exceptions.ContentDecodingError(
"Request error: The response is None", response=resp
)
return resp.json()


def index(registry_base_url=None):
return _get_with_retries("api/v1/index.json", registry_base_url)


index_cached = memoized(index)


def packages(registry_base_url=None):
return _get_with_retries("api/v1/packages.json", registry_base_url)

# and https://github.com/dbt-labs/dbt-core/issues/4849
response = resp.json()

def package(name, registry_base_url=None):
response = _get_with_retries("api/v1/{}.json".format(name), registry_base_url)
if not isinstance(response, dict): # This will also catch Nonetype
error_msg = (
f"Request error: Expected a response type of <dict> but got {type(response)} instead"
)
fire_event(RegistryResponseUnexpectedType(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# check for expected top level keys
expected_keys = {"name", "versions"}
if not expected_keys.issubset(response):
error_msg = (
f"Request error: Expected the response to contain keys {expected_keys} "
f"but is missing {expected_keys.difference(set(response))}"
)
fire_event(RegistryResponseMissingTopKeys(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# check for the keys we need nested under each version
expected_version_keys = {"name", "packages", "downloads"}
all_keys = set().union(*(response["versions"][d] for d in response["versions"]))
if not expected_version_keys.issubset(all_keys):
error_msg = (
"Request error: Expected the response for the version to contain keys "
f"{expected_version_keys} but is missing {expected_version_keys.difference(all_keys)}"
)
fire_event(RegistryResponseMissingNestedKeys(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# all version responses should contain identical keys.
has_extra_keys = set().difference(*(response["versions"][d] for d in response["versions"]))
if has_extra_keys:
error_msg = (
"Request error: Keys for all versions do not match. Found extra key(s) "
f"of {has_extra_keys}."
)
fire_event(RegistryResponseExtraNestedKeys(response=response))
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

# Either redirectnamespace or redirectname in the JSON response indicate a redirect
# redirectnamespace redirects based on package ownership
# redirectname redirects based on package name
# Both can be present at the same time, or neither. Fails gracefully to old name

if ("redirectnamespace" in response) or ("redirectname" in response):

if ("redirectnamespace" in response) and response["redirectnamespace"] is not None:
Expand All @@ -74,15 +106,59 @@ def package(name, registry_base_url=None):
use_name = response["name"]

new_nwo = use_namespace + "/" + use_name
deprecations.warn("package-redirect", old_name=name, new_name=new_nwo)
deprecations.warn("package-redirect", old_name=package_name, new_name=new_nwo)

return response


_get_cached = memoized(_get)


def package(package_name, registry_base_url=None) -> Dict[str, Any]:
# returns a dictionary of metadata for all versions of a package
response = _get_with_retries(package_name, registry_base_url)
return response["versions"]


def package_version(package_name, version, registry_base_url=None) -> Dict[str, Any]:
# returns the metadata of a specific version of a package
response = package(package_name, registry_base_url)
return response[version]


def get_available_versions(package_name) -> List["str"]:
# returns a list of all available versions of a package
response = package(package_name)
return list(response)


def _get_index(registry_base_url=None):

url = _get_url("index", registry_base_url)
fire_event(RegistryIndexProgressMakingGETRequest(url=url))
# all exceptions from requests get caught in the retry logic so no need to wrap this here
resp = requests.get(url, timeout=30)
fire_event(RegistryIndexProgressGETResponse(url=url, resp_code=resp.status_code))
resp.raise_for_status()

# The response should be a list. Anything else is unexpected, raise an error.
# Raising this error will cause this function to retry and hopefully get a valid response.

response = resp.json()

if not isinstance(response, list): # This will also catch Nonetype
error_msg = (
f"Request error: The response type of {type(response)} is not valid: {resp.text}"
)
raise requests.exceptions.ContentDecodingError(error_msg, response=resp)

return response


def package_version(name, version, registry_base_url=None):
return _get_with_retries("api/v1/{}/{}.json".format(name, version), registry_base_url)
def index(registry_base_url=None) -> List[str]:
# this returns a list of all packages on the Hub
get_index_fn = functools.partial(_get_index, registry_base_url)
return connection_exception_retry(get_index_fn, 5)


def get_available_versions(name):
response = package(name)
return list(response["versions"])
index_cached = memoized(index)
66 changes: 66 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,25 @@ def message(self) -> str:
return f" Checked out at {self.end_sha}."


@dataclass
class RegistryIndexProgressMakingGETRequest(DebugLevel):
url: str
code: str = "M022"

def message(self) -> str:
return f"Making package index registry request: GET {self.url}"


@dataclass
class RegistryIndexProgressGETResponse(DebugLevel):
url: str
resp_code: int
code: str = "M023"

def message(self) -> str:
return f"Response from registry index: GET {self.url} {self.resp_code}"


@dataclass
class RegistryProgressMakingGETRequest(DebugLevel):
url: str
Expand All @@ -310,6 +329,45 @@ def message(self) -> str:
return f"Response from registry: GET {self.url} {self.resp_code}"


@dataclass
class RegistryResponseUnexpectedType(DebugLevel):
response: str
code: str = "M024"

def message(self) -> str:
return f"Response was None: {self.response}"


@dataclass
class RegistryResponseMissingTopKeys(DebugLevel):
response: str
code: str = "M025"

def message(self) -> str:
# expected/actual keys logged in exception
return f"Response missing top level keys: {self.response}"


@dataclass
class RegistryResponseMissingNestedKeys(DebugLevel):
response: str
code: str = "M026"

def message(self) -> str:
# expected/actual keys logged in exception
return f"Response missing nested keys: {self.response}"


@dataclass
class RegistryResponseExtraNestedKeys(DebugLevel):
response: str
code: str = "M027"

def message(self) -> str:
# expected/actual keys logged in exception
return f"Response contained inconsistent keys: {self.response}"


# TODO this was actually `logger.exception(...)` not `logger.error(...)`
@dataclass
class SystemErrorRetrievingModTime(ErrorLevel):
Expand Down Expand Up @@ -2422,6 +2480,14 @@ def message(self) -> str:
GitNothingToDo(sha="")
GitProgressUpdatedCheckoutRange(start_sha="", end_sha="")
GitProgressCheckedOutAt(end_sha="")
RegistryIndexProgressMakingGETRequest(url="")
RegistryIndexProgressGETResponse(url="", resp_code=1234)
RegistryProgressMakingGETRequest(url="")
RegistryProgressGETResponse(url="", resp_code=1234)
RegistryResponseUnexpectedType(response=""),
RegistryResponseMissingTopKeys(response=""),
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
SystemErrorRetrievingModTime(path="")
SystemCouldNotWrite(path="", reason="", exc=Exception(""))
SystemExecutingCmd(cmd=[""])
Expand Down
6 changes: 6 additions & 0 deletions test/unit/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ def MockNode():
PrintDebugStackTrace(),
MainReportArgs(args={}),
RegistryProgressMakingGETRequest(url=''),
RegistryIndexProgressMakingGETRequest(url=""),
RegistryIndexProgressGETResponse(url="", resp_code=1),
RegistryResponseUnexpectedType(response=""),
RegistryResponseMissingTopKeys(response=""),
RegistryResponseMissingNestedKeys(response=""),
RegistryResponseExtraNestedKeys(response=""),
DepsUTD(),
PartialParsingNotEnabled(),
SQlRunnerException(exc=Exception('')),
Expand Down
1 change: 0 additions & 1 deletion tests/functional/permission/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def test_create_schema_permissions(
project,
):
# now it should work!
# breakpoint()
project.run_sql("grant create on database {} to noaccess".format(project.database))
project.run_sql(
'grant usage, create on schema "{}" to noaccess'.format(project.test_schema)
Expand Down