Skip to content

Commit

Permalink
fix: retry query job after ambiguous failures
Browse files Browse the repository at this point in the history
  • Loading branch information
tswast committed Apr 16, 2024
1 parent 6a43cfd commit 12fa9fb
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 5 deletions.
26 changes: 24 additions & 2 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
# Allow for a few retries after the API request times out. This is relevant for
# rateLimitExceeded errors, which can be raised either by the Google load
# balancer or the BigQuery job server.
_DEFAULT_JOB_DEADLINE = 3.0 * _DEFAULT_RETRY_DEADLINE
_DEFAULT_JOB_DEADLINE = 4.0 * _DEFAULT_RETRY_DEADLINE


def _should_retry(exc):
Expand Down Expand Up @@ -73,10 +73,32 @@ def _should_retry(exc):
deadline on the retry object.
"""

job_retry_reasons = "rateLimitExceeded", "backendError", "jobRateLimitExceeded"
job_retry_reasons = (
"rateLimitExceeded",
"backendError",
"internalError",
"jobRateLimitExceeded",
)


def _job_should_retry(exc):
# Sometimes we have ambiguous errors, such as 'backendError' which could
# be due to an API problem or a job problem. For these, make sure we retry
# our is_job_done function.
#
# Note: This won't restart the job unless we know for sure it's because of
# the job status and set restart_query_job = True in that loop. This means
# that we might end up calling this predicate twice for the same job
# but from different paths: (1) from jobs.getQueryResults RetryError and
# (2) from translating the job error from the body of a jobs.get response.
#
# Note: If we start retrying job types other than queries where we don't
# call the problematic getQueryResults API to check the status, we need
# to provide a different predicate, as there shouldn't be ambiguous
# errors in those cases.
if isinstance(exc, exceptions.RetryError):
exc = exc.cause

if not hasattr(exc, "errors") or len(exc.errors) == 0:
return False

Expand Down
168 changes: 165 additions & 3 deletions tests/unit/test_job_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from google.cloud.bigquery.client import Client
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY
import google.cloud.bigquery.retry

from .helpers import make_connection

Expand Down Expand Up @@ -126,6 +126,168 @@ def api_request(method, path, query_params=None, data=None, **kw):
assert job.job_id == orig_job_id


def test_query_retry_with_default_retry_and_ambiguous_errors_only_retries_with_failed_job(
client, monkeypatch
):
"""
Some errors like 'rateLimitExceeded' can be ambiguous. Make sure we only
retry the job when we know for sure that the job has failed for a retriable
reason. We can only be sure after a "successful" call to jobs.get to fetch
the failed job status.
"""
job_counter = 0

def make_job_id(*args, **kwargs):
nonlocal job_counter
job_counter += 1
return f"{job_counter}"

monkeypatch.setattr(_job_helpers, "make_job_id", make_job_id)

project = client.project
job_reference_1 = {"projectId": project, "jobId": "1", "location": "test-loc"}
job_reference_2 = {"projectId": project, "jobId": "2", "location": "test-loc"}
NUM_API_RETRIES = 2

# This error is modeled after a real customer exception in
# https://github.com/googleapis/python-bigquery/issues/707.
internal_error = google.api_core.exceptions.InternalServerError(
"Job failed just because...",
errors=[
{"reason": "internalError"},
],
)
responses = [
# jobs.insert
{"jobReference": job_reference_1, "status": {"state": "PENDING"}},
# jobs.get
{"jobReference": job_reference_1, "status": {"state": "RUNNING"}},
# jobs.getQueryResults x2
#
# Note: internalError is ambiguous in jobs.getQueryResults. The
# problem could be at the Google Frontend level or it could be because
# the job has failed due to some transient issues and the BigQuery
# REST API is translating the job failed status into failure HTTP
# codes.
#
# TODO(GH#1903): We shouldn't retry nearly this many times when we get
# ambiguous errors from jobs.getQueryResults.
# See: https://github.com/googleapis/python-bigquery/issues/1903
internal_error,
internal_error,
# jobs.get -- the job has failed
{
"jobReference": job_reference_1,
"status": {"state": "DONE", "errorResult": {"reason": "internalError"}},
},
# jobs.insert
{"jobReference": job_reference_2, "status": {"state": "PENDING"}},
# jobs.get
{"jobReference": job_reference_2, "status": {"state": "RUNNING"}},
# jobs.getQueryResults
{"jobReference": job_reference_2, "jobComplete": True},
# jobs.get
{"jobReference": job_reference_2, "status": {"state": "DONE"}},
]

conn = client._connection = make_connection()
conn.api_request.side_effect = responses

with freezegun.freeze_time(
# Note: because of exponential backoff and a bit of jitter,
# NUM_API_RETRIES will get less accurate the greater the value.
# We add 1 because we know there will be at least some additional
# calls to fetch the time / sleep before the retry deadline is hit.
auto_tick_seconds=(
google.cloud.bigquery.retry._DEFAULT_RETRY_DEADLINE / NUM_API_RETRIES
)
+ 1,
):
job = client.query("select 1")
job.result()

conn.api_request.assert_has_calls(
[
# jobs.insert
mock.call(
method="POST",
path="/projects/PROJECT/jobs",
data={
"jobReference": {"jobId": "1", "projectId": "PROJECT"},
"configuration": {
"query": {"useLegacySql": False, "query": "select 1"}
},
},
timeout=None,
),
# jobs.get
mock.call(
method="GET",
path="/projects/PROJECT/jobs/1",
query_params={"location": "test-loc"},
timeout=None,
),
# jobs.getQueryResults x2
mock.call(
method="GET",
path="/projects/PROJECT/queries/1",
query_params={"maxResults": 0, "location": "test-loc"},
timeout=None,
),
mock.call(
method="GET",
path="/projects/PROJECT/queries/1",
query_params={"maxResults": 0, "location": "test-loc"},
timeout=None,
),
# jobs.get -- verify that the job has failed
mock.call(
method="GET",
path="/projects/PROJECT/jobs/1",
query_params={"location": "test-loc"},
timeout=None,
),
# jobs.insert
mock.call(
method="POST",
path="/projects/PROJECT/jobs",
data={
"jobReference": {
# Make sure that we generated a new job ID.
"jobId": "2",
"projectId": "PROJECT",
},
"configuration": {
"query": {"useLegacySql": False, "query": "select 1"}
},
},
timeout=None,
),
# jobs.get
mock.call(
method="GET",
path="/projects/PROJECT/jobs/2",
query_params={"location": "test-loc"},
timeout=None,
),
# jobs.getQueryResults
mock.call(
method="GET",
path="/projects/PROJECT/queries/2",
query_params={"maxResults": 0, "location": "test-loc"},
timeout=None,
),
# jobs.get
mock.call(
method="GET",
path="/projects/PROJECT/jobs/2",
query_params={"location": "test-loc"},
timeout=None,
),
]
)


# With job_retry_on_query, we're testing 4 scenarios:
# - Pass None retry to `query`.
# - Pass None retry to `result`.
Expand Down Expand Up @@ -301,8 +463,8 @@ def test_query_and_wait_retries_job_for_DDL_queries():
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_JOB_RETRY,
job_retry=DEFAULT_JOB_RETRY,
retry=google.cloud.bigquery.retry.DEFAULT_RETRY,
job_retry=google.cloud.bigquery.retry.DEFAULT_JOB_RETRY,
)
assert len(list(rows)) == 4

Expand Down

0 comments on commit 12fa9fb

Please sign in to comment.