From 12fa9fb5823a8d66ddd2fbfdd6dc252388bf3cda Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 16 Apr 2024 12:09:07 -0500
Subject: [PATCH] fix: retry query job after ambiguous failures

---
 google/cloud/bigquery/retry.py |  26 ++++-
 tests/unit/test_job_retry.py   | 168 ++++++++++++++++++++++++++++++++-
 2 files changed, 189 insertions(+), 5 deletions(-)

diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py
index 01b127972..9e3489f6d 100644
--- a/google/cloud/bigquery/retry.py
+++ b/google/cloud/bigquery/retry.py
@@ -39,7 +39,7 @@
 # Allow for a few retries after the API request times out. This is relevant for
 # rateLimitExceeded errors, which can be raised either by the Google load
 # balancer or the BigQuery job server.
-_DEFAULT_JOB_DEADLINE = 3.0 * _DEFAULT_RETRY_DEADLINE
+_DEFAULT_JOB_DEADLINE = 4.0 * _DEFAULT_RETRY_DEADLINE
 
 
 def _should_retry(exc):
@@ -73,10 +73,32 @@ def _should_retry(exc):
 deadline on the retry object.
 """
 
-job_retry_reasons = "rateLimitExceeded", "backendError", "jobRateLimitExceeded"
+job_retry_reasons = (
+    "rateLimitExceeded",
+    "backendError",
+    "internalError",
+    "jobRateLimitExceeded",
+)
 
 
 def _job_should_retry(exc):
+    # Sometimes we have ambiguous errors, such as 'backendError', which could
+    # be due to an API problem or a job problem. For these, make sure we retry
+    # our is_job_done function.
+    #
+    # Note: This won't restart the job unless we know for sure it's because of
+    # the job status and set restart_query_job = True in that loop. This means
+    # that we might end up calling this predicate twice for the same job,
+    # but from different paths: (1) from a jobs.getQueryResults RetryError and
+    # (2) from translating the job error from the body of a jobs.get response.
+    #
+    # Note: If we start retrying job types other than queries, where we don't
+    # call the problematic getQueryResults API to check the status, we need
+    # to provide a different predicate, as there shouldn't be ambiguous
+    # errors in those cases.
+    if isinstance(exc, exceptions.RetryError):
+        exc = exc.cause
+
     if not hasattr(exc, "errors") or len(exc.errors) == 0:
         return False
 
diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py
index 52c994ca4..fcb2180c3 100644
--- a/tests/unit/test_job_retry.py
+++ b/tests/unit/test_job_retry.py
@@ -24,7 +24,7 @@
 from google.cloud.bigquery.client import Client
 from google.cloud.bigquery import _job_helpers
-from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY
+import google.cloud.bigquery.retry
 
 from .helpers import make_connection
 
 
@@ -126,6 +126,168 @@ def api_request(method, path, query_params=None, data=None, **kw):
     assert job.job_id == orig_job_id
 
 
+def test_query_retry_with_default_retry_and_ambiguous_errors_only_retries_with_failed_job(
+    client, monkeypatch
+):
+    """
+    Some errors like 'rateLimitExceeded' can be ambiguous. Make sure we only
+    retry the job when we know for sure that the job has failed for a retriable
+    reason. We can only be sure after a "successful" call to jobs.get to fetch
+    the failed job status.
+    """
+    job_counter = 0
+
+    def make_job_id(*args, **kwargs):
+        nonlocal job_counter
+        job_counter += 1
+        return f"{job_counter}"
+
+    monkeypatch.setattr(_job_helpers, "make_job_id", make_job_id)
+
+    project = client.project
+    job_reference_1 = {"projectId": project, "jobId": "1", "location": "test-loc"}
+    job_reference_2 = {"projectId": project, "jobId": "2", "location": "test-loc"}
+    NUM_API_RETRIES = 2
+
+    # This error is modeled after a real customer exception in
+    # https://github.com/googleapis/python-bigquery/issues/707.
+    internal_error = google.api_core.exceptions.InternalServerError(
+        "Job failed just because...",
+        errors=[
+            {"reason": "internalError"},
+        ],
+    )
+    responses = [
+        # jobs.insert
+        {"jobReference": job_reference_1, "status": {"state": "PENDING"}},
+        # jobs.get
+        {"jobReference": job_reference_1, "status": {"state": "RUNNING"}},
+        # jobs.getQueryResults x2
+        #
+        # Note: internalError is ambiguous in jobs.getQueryResults. The
+        # problem could be at the Google Frontend level, or it could be because
+        # the job has failed due to some transient issue and the BigQuery
+        # REST API is translating the failed job status into failure HTTP
+        # codes.
+        #
+        # TODO(GH#1903): We shouldn't retry nearly this many times when we get
+        # ambiguous errors from jobs.getQueryResults.
+        # See: https://github.com/googleapis/python-bigquery/issues/1903
+        internal_error,
+        internal_error,
+        # jobs.get -- the job has failed
+        {
+            "jobReference": job_reference_1,
+            "status": {"state": "DONE", "errorResult": {"reason": "internalError"}},
+        },
+        # jobs.insert
+        {"jobReference": job_reference_2, "status": {"state": "PENDING"}},
+        # jobs.get
+        {"jobReference": job_reference_2, "status": {"state": "RUNNING"}},
+        # jobs.getQueryResults
+        {"jobReference": job_reference_2, "jobComplete": True},
+        # jobs.get
+        {"jobReference": job_reference_2, "status": {"state": "DONE"}},
+    ]
+
+    conn = client._connection = make_connection()
+    conn.api_request.side_effect = responses
+
+    with freezegun.freeze_time(
+        # Note: because of exponential backoff and a bit of jitter,
+        # NUM_API_RETRIES will get less accurate the greater the value.
+        # We add 1 because we know there will be at least some additional
+        # calls to fetch the time / sleep before the retry deadline is hit.
+        auto_tick_seconds=(
+            google.cloud.bigquery.retry._DEFAULT_RETRY_DEADLINE / NUM_API_RETRIES
+        )
+        + 1,
+    ):
+        job = client.query("select 1")
+        job.result()
+
+    conn.api_request.assert_has_calls(
+        [
+            # jobs.insert
+            mock.call(
+                method="POST",
+                path="/projects/PROJECT/jobs",
+                data={
+                    "jobReference": {"jobId": "1", "projectId": "PROJECT"},
+                    "configuration": {
+                        "query": {"useLegacySql": False, "query": "select 1"}
+                    },
+                },
+                timeout=None,
+            ),
+            # jobs.get
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/jobs/1",
+                query_params={"location": "test-loc"},
+                timeout=None,
+            ),
+            # jobs.getQueryResults x2
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/queries/1",
+                query_params={"maxResults": 0, "location": "test-loc"},
+                timeout=None,
+            ),
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/queries/1",
+                query_params={"maxResults": 0, "location": "test-loc"},
+                timeout=None,
+            ),
+            # jobs.get -- verify that the job has failed
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/jobs/1",
+                query_params={"location": "test-loc"},
+                timeout=None,
+            ),
+            # jobs.insert
+            mock.call(
+                method="POST",
+                path="/projects/PROJECT/jobs",
+                data={
+                    "jobReference": {
+                        # Make sure that we generated a new job ID.
+                        "jobId": "2",
+                        "projectId": "PROJECT",
+                    },
+                    "configuration": {
+                        "query": {"useLegacySql": False, "query": "select 1"}
+                    },
+                },
+                timeout=None,
+            ),
+            # jobs.get
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/jobs/2",
+                query_params={"location": "test-loc"},
+                timeout=None,
+            ),
+            # jobs.getQueryResults
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/queries/2",
+                query_params={"maxResults": 0, "location": "test-loc"},
+                timeout=None,
+            ),
+            # jobs.get
+            mock.call(
+                method="GET",
+                path="/projects/PROJECT/jobs/2",
+                query_params={"location": "test-loc"},
+                timeout=None,
+            ),
+        ]
+    )
+
+
 # With job_retry_on_query, we're testing 4 scenarios:
 # - Pass None retry to `query`.
 # - Pass None retry to `result`.
@@ -301,8 +463,8 @@ def test_query_and_wait_retries_job_for_DDL_queries():
         job_config=None,
         page_size=None,
         max_results=None,
-        retry=DEFAULT_JOB_RETRY,
-        job_retry=DEFAULT_JOB_RETRY,
+        retry=google.cloud.bigquery.retry.DEFAULT_RETRY,
+        job_retry=google.cloud.bigquery.retry.DEFAULT_JOB_RETRY,
     )
     assert len(list(rows)) == 4
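Reviewer note (not part of the patch): below is a minimal sketch of the predicate behavior this change enables. It is an illustration under two assumptions -- that the private _job_should_retry helper remains importable from google.cloud.bigquery.retry, and that the unchanged remainder of its body still reads exc.errors[0]["reason"] and checks it against job_retry_reasons, as the context lines above suggest.

# Illustrative sketch only; exercises the updated predicate, not the full
# query retry loop.
import google.api_core.exceptions

from google.cloud.bigquery.retry import _job_should_retry

# An ambiguous server-side failure, mirroring the error used in the new test.
internal_error = google.api_core.exceptions.InternalServerError(
    "Job failed just because...",
    errors=[{"reason": "internalError"}],
)

# Raised directly (for example, surfaced from a jobs.get response): retried,
# because "internalError" is now listed in job_retry_reasons.
assert _job_should_retry(internal_error)

# Wrapped in a RetryError after jobs.getQueryResults exhausts its own API
# retries: the predicate now unwraps the cause before checking the reason.
wrapped = google.api_core.exceptions.RetryError(
    "Deadline exceeded while polling jobs.getQueryResults",
    cause=internal_error,
)
assert _job_should_retry(wrapped)

If both assertions hold, the job-level retry loop can re-issue the query with a fresh job ID once jobs.get confirms the job itself failed for a retriable reason, which is what the new test verifies end to end.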