diff --git a/tests/unit/test_job_retry.py b/tests/unit/test_job_retry.py index d7049c5ca..998e0e9f6 100644 --- a/tests/unit/test_job_retry.py +++ b/tests/unit/test_job_retry.py @@ -24,7 +24,7 @@ from google.cloud.bigquery.client import Client from google.cloud.bigquery import _job_helpers -from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY +from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY from .helpers import make_connection @@ -35,8 +35,7 @@ # - Pass NotFound retry to `result`. # - Pass BadRequest retry to query, with the value passed to `result` overriding. @pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"]) -@mock.patch("time.sleep") -def test_retry_failed_jobs(sleep, client, job_retry_on_query): +def test_retry_failed_jobs(job_retry_on_query): """ Test retry of job failures, as opposed to API-invocation failures. """ @@ -53,171 +52,208 @@ def test_retry_failed_jobs(sleep, client, job_retry_on_query): ) if job_retry_on_query is None: - reason = "rateLimitExceeded" + errs = [{"reason": "rateLimitExceeded"}] else: - reason = "notFound" - - err = dict(reason=reason) - responses = [ - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE")), - dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - - def api_request(method, path, query_params=None, data=None, **kw): - response = responses.pop(0) - if data: - response["jobReference"] = data["jobReference"] - else: - response["jobReference"] = dict( - jobId=path.split("/")[-1], projectId="PROJECT" - ) - return response - - conn = client._connection = make_connection() - conn.api_request.side_effect = api_request + errs = [{"reason": "notFound"}] + + freezegun.freeze_time(auto_tick_seconds=1) + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + client._call_api.side_effect = ( + { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": False, + }, + google.api_core.exceptions.InternalServerError("job_retry me", errors=errs), + { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], + }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], + }, + ) if job_retry_on_query == "Query": - job_retry = dict(job_retry=retry_notfound) + job_retry = retry_notfound elif job_retry_on_query == "Both": # This will be overridden in `result` - job_retry = dict(job_retry=retry_badrequest) + job_retry = retry_badrequest else: - job_retry = {} - job = client.query("select 1", **job_retry) - - orig_job_id = job.job_id - job_retry = ( - dict(job_retry=retry_notfound) - if job_retry_on_query in ("Result", "Both") - else {} - ) - result = job.result(**job_retry) - assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. - - # The job adjusts it's job id based on the id of the last attempt. - assert job.job_id != orig_job_id - assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"] + job_retry = None - # We had to sleep three times - assert len(sleep.mock_calls) == 3 - - # Sleeps are random, however they're more than 0 - assert min(c[1][0] for c in sleep.mock_calls) > 0 - - # They're at most 2 * (multiplier**(number of sleeps - 1)) * initial - # The default multiplier is 2 - assert max(c[1][0] for c in sleep.mock_calls) <= 8 - - # We can ask for the result again: - responses = [ - dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - orig_job_id = job.job_id - result = job.result() - assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + page_size=None, + max_results=None, + retry=DEFAULT_RETRY, + job_retry=job_retry, + ) - # We wouldn't (and didn't) fail, because we're dealing with a successful job. - # So the job id hasn't changed. - assert job.job_id == orig_job_id + assert len(list(rows)) == 4 # With job_retry_on_query, we're testing 4 scenarios: # - Pass None retry to `query`. # - Pass None retry to `result`. @pytest.mark.parametrize("job_retry_on_query", ["Query", "Result"]) -@mock.patch("time.sleep") -def test_disable_retry_failed_jobs(sleep, client, job_retry_on_query): +def test_disable_retry_failed_jobs(job_retry_on_query): """ Test retry of job failures, as opposed to API-invocation failures. """ - err = dict(reason="rateLimitExceeded") - responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err))] * 3 - - def api_request(method, path, query_params=None, data=None, **kw): - response = responses.pop(0) - response["jobReference"] = data["jobReference"] - return response - - conn = client._connection = make_connection() - conn.api_request.side_effect = api_request - if job_retry_on_query == "Query": - job_retry = dict(job_retry=None) - else: - job_retry = {} - job = client.query("select 1", **job_retry) + freezegun.freeze_time(auto_tick_seconds=1) + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + client._call_api.side_effect = ( + { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": False, + }, + google.api_core.exceptions.InternalServerError( + "job_retry me", errors=[{"reason": "rateLimitExceeded"}] + ), + ) - orig_job_id = job.job_id - job_retry = dict(job_retry=None) if job_retry_on_query == "Result" else {} - with pytest.raises(google.api_core.exceptions.Forbidden): - job.result(**job_retry) + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + page_size=None, + max_results=None, + retry=None, # Explicitly disable retry + job_retry=None, + ) - assert job.job_id == orig_job_id - assert len(sleep.mock_calls) == 0 + with pytest.raises(google.api_core.exceptions.InternalServerError): + list(rows) # Raise the last error -@mock.patch("time.sleep") -def test_retry_failed_jobs_after_retry_failed(sleep, client): +def test_retry_failed_jobs_after_retry_failed(client): """ If at first you don't succeed, maybe you will later. :) """ - conn = client._connection = make_connection() - - with freezegun.freeze_time("2024-01-01 00:00:00") as frozen_datetime: - err = dict(reason="rateLimitExceeded") - - def api_request(method, path, query_params=None, data=None, **kw): - calls = sleep.mock_calls - if calls: - frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0])) - response = dict(status=dict(state="DONE", errors=[err], errorResult=err)) - response["jobReference"] = data["jobReference"] - return response - - conn.api_request.side_effect = api_request - - job = client.query("select 1") - orig_job_id = job.job_id - - with pytest.raises(google.api_core.exceptions.RetryError): - job.result() - - # We never got a successful job, so the job id never changed: - assert job.job_id == orig_job_id - - # We failed because we couldn't succeed after 120 seconds. - # But we can try again: - err2 = dict(reason="backendError") # We also retry on this - responses = [ - dict(status=dict(state="DONE", errors=[err2], errorResult=err2)), - dict(status=dict(state="DONE", errors=[err], errorResult=err)), - dict(status=dict(state="DONE", errors=[err2], errorResult=err2)), - dict(status=dict(state="DONE")), - dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"), - ] - - def api_request(method, path, query_params=None, data=None, **kw): - calls = sleep.mock_calls - frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0])) - response = responses.pop(0) - if data: - response["jobReference"] = data["jobReference"] - else: - response["jobReference"] = dict( - jobId=path.split("/")[-1], projectId="PROJECT" - ) - return response - - conn.api_request.side_effect = api_request - result = job.result() - assert result.total_rows == 1 - assert not responses # We made all the calls we expected to. - assert job.job_id != orig_job_id + + freezegun.freeze_time(auto_tick_seconds=1) + client = mock.create_autospec(Client) + client._call_api.__name__ = "_call_api" + client._call_api.__qualname__ = "Client._call_api" + client._call_api.__annotations__ = {} + client._call_api.__type_params__ = () + client._call_api.side_effect = ( + { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": False, + }, + google.api_core.exceptions.InternalServerError( + "job_retry me", errors=[{"reason": "rateLimitExceeded"}] + ), + # Responses for subsequent success + { + "jobReference": { + "jobId": "job1", + "projectId": "project", + "location": "location", + }, + "jobComplete": False, + }, + google.api_core.exceptions.BadRequest( + "job_retry me", errors=[{"reason": "backendError"}] + ), + google.api_core.exceptions.InternalServerError( + "job_retry me", errors=[{"reason": "rateLimitExceeded"}] + ), + google.api_core.exceptions.BadRequest( + "job_retry me", errors=[{"reason": "backendError"}] + ), + { + "jobReference": { + "projectId": "response-project", + "jobId": "abc", + "location": "response-location", + }, + "jobComplete": True, + "schema": { + "fields": [ + {"name": "full_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "age", "type": "INT64", "mode": "NULLABLE"}, + ], + }, + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ], + }, + ) + + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + page_size=None, + max_results=None, + retry=DEFAULT_RETRY, + job_retry=DEFAULT_JOB_RETRY, + ) + # TODO: different test to test if it retries until it times out + with pytest.raises(google.api_core.exceptions.RetryError): + list(rows) # Trigger the initial retry failure + + # Second attempt with successful retries + rows = _job_helpers.query_and_wait( + client, + query="SELECT 1", + location="request-location", + project="request-project", + job_config=None, + page_size=None, + max_results=None, + retry=DEFAULT_RETRY, + job_retry=DEFAULT_RETRY, + ) + + assert len(list(rows)) == 4 def test_raises_on_job_retry_on_query_with_non_retryable_jobs(client): @@ -301,7 +337,7 @@ def test_query_and_wait_retries_job_for_DDL_queries(): job_config=None, page_size=None, max_results=None, - retry=DEFAULT_JOB_RETRY, + retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY, ) assert len(list(rows)) == 4