Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix and reformat jobs retry tests #1806

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
324 changes: 180 additions & 144 deletions tests/unit/test_job_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from google.cloud.bigquery.client import Client
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery.retry import DEFAULT_JOB_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY

from .helpers import make_connection

Expand All @@ -35,8 +35,7 @@
# - Pass NotFound retry to `result`.
# - Pass BadRequest retry to query, with the value passed to `result` overriding.
@pytest.mark.parametrize("job_retry_on_query", [None, "Query", "Result", "Both"])
@mock.patch("time.sleep")
def test_retry_failed_jobs(sleep, client, job_retry_on_query):
def test_retry_failed_jobs(job_retry_on_query):
"""
Test retry of job failures, as opposed to API-invocation failures.
"""
Expand All @@ -53,171 +52,208 @@ def test_retry_failed_jobs(sleep, client, job_retry_on_query):
)

if job_retry_on_query is None:
reason = "rateLimitExceeded"
errs = [{"reason": "rateLimitExceeded"}]
else:
reason = "notFound"

err = dict(reason=reason)
responses = [
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE")),
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]

def api_request(method, path, query_params=None, data=None, **kw):
response = responses.pop(0)
if data:
response["jobReference"] = data["jobReference"]
else:
response["jobReference"] = dict(
jobId=path.split("/")[-1], projectId="PROJECT"
)
return response

conn = client._connection = make_connection()
conn.api_request.side_effect = api_request
errs = [{"reason": "notFound"}]

freezegun.freeze_time(auto_tick_seconds=1)
client = mock.create_autospec(Client)
client._call_api.__name__ = "_call_api"
client._call_api.__qualname__ = "Client._call_api"
client._call_api.__annotations__ = {}
client._call_api.__type_params__ = ()
client._call_api.side_effect = (
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": False,
},
google.api_core.exceptions.InternalServerError("job_retry me", errors=errs),
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": True,
"schema": {
"fields": [
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INT64", "mode": "NULLABLE"},
],
},
"rows": [
{"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
{"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]},
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
],
},
)

if job_retry_on_query == "Query":
job_retry = dict(job_retry=retry_notfound)
job_retry = retry_notfound
elif job_retry_on_query == "Both":
# This will be overridden in `result`
job_retry = dict(job_retry=retry_badrequest)
job_retry = retry_badrequest
else:
job_retry = {}
job = client.query("select 1", **job_retry)

orig_job_id = job.job_id
job_retry = (
dict(job_retry=retry_notfound)
if job_retry_on_query in ("Result", "Both")
else {}
)
result = job.result(**job_retry)
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.

# The job adjusts it's job id based on the id of the last attempt.
assert job.job_id != orig_job_id
assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"]
job_retry = None

# We had to sleep three times
assert len(sleep.mock_calls) == 3

# Sleeps are random, however they're more than 0
assert min(c[1][0] for c in sleep.mock_calls) > 0

# They're at most 2 * (multiplier**(number of sleeps - 1)) * initial
# The default multiplier is 2
assert max(c[1][0] for c in sleep.mock_calls) <= 8

# We can ask for the result again:
responses = [
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]
orig_job_id = job.job_id
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.
rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_RETRY,
job_retry=job_retry,
)

# We wouldn't (and didn't) fail, because we're dealing with a successful job.
# So the job id hasn't changed.
assert job.job_id == orig_job_id
assert len(list(rows)) == 4


# With job_retry_on_query, we're testing 4 scenarios:
# - Pass None retry to `query`.
# - Pass None retry to `result`.
@pytest.mark.parametrize("job_retry_on_query", ["Query", "Result"])
@mock.patch("time.sleep")
def test_disable_retry_failed_jobs(sleep, client, job_retry_on_query):
def test_disable_retry_failed_jobs(job_retry_on_query):
"""
Test retry of job failures, as opposed to API-invocation failures.
"""
err = dict(reason="rateLimitExceeded")
responses = [dict(status=dict(state="DONE", errors=[err], errorResult=err))] * 3

def api_request(method, path, query_params=None, data=None, **kw):
response = responses.pop(0)
response["jobReference"] = data["jobReference"]
return response

conn = client._connection = make_connection()
conn.api_request.side_effect = api_request

if job_retry_on_query == "Query":
job_retry = dict(job_retry=None)
else:
job_retry = {}
job = client.query("select 1", **job_retry)
freezegun.freeze_time(auto_tick_seconds=1)
client = mock.create_autospec(Client)
client._call_api.__name__ = "_call_api"
client._call_api.__qualname__ = "Client._call_api"
client._call_api.__annotations__ = {}
client._call_api.__type_params__ = ()
client._call_api.side_effect = (
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": False,
},
google.api_core.exceptions.InternalServerError(
"job_retry me", errors=[{"reason": "rateLimitExceeded"}]
),
)

orig_job_id = job.job_id
job_retry = dict(job_retry=None) if job_retry_on_query == "Result" else {}
with pytest.raises(google.api_core.exceptions.Forbidden):
job.result(**job_retry)
rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=None, # Explicitly disable retry
job_retry=None,
)

assert job.job_id == orig_job_id
assert len(sleep.mock_calls) == 0
with pytest.raises(google.api_core.exceptions.InternalServerError):
list(rows) # Raise the last error


@mock.patch("time.sleep")
def test_retry_failed_jobs_after_retry_failed(sleep, client):
def test_retry_failed_jobs_after_retry_failed(client):
"""
If at first you don't succeed, maybe you will later. :)
"""
conn = client._connection = make_connection()

with freezegun.freeze_time("2024-01-01 00:00:00") as frozen_datetime:
err = dict(reason="rateLimitExceeded")

def api_request(method, path, query_params=None, data=None, **kw):
calls = sleep.mock_calls
if calls:
frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0]))
response = dict(status=dict(state="DONE", errors=[err], errorResult=err))
response["jobReference"] = data["jobReference"]
return response

conn.api_request.side_effect = api_request

job = client.query("select 1")
orig_job_id = job.job_id

with pytest.raises(google.api_core.exceptions.RetryError):
job.result()

# We never got a successful job, so the job id never changed:
assert job.job_id == orig_job_id

# We failed because we couldn't succeed after 120 seconds.
# But we can try again:
err2 = dict(reason="backendError") # We also retry on this
responses = [
dict(status=dict(state="DONE", errors=[err2], errorResult=err2)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err2], errorResult=err2)),
dict(status=dict(state="DONE")),
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]

def api_request(method, path, query_params=None, data=None, **kw):
calls = sleep.mock_calls
frozen_datetime.tick(delta=datetime.timedelta(seconds=calls[-1][1][0]))
response = responses.pop(0)
if data:
response["jobReference"] = data["jobReference"]
else:
response["jobReference"] = dict(
jobId=path.split("/")[-1], projectId="PROJECT"
)
return response

conn.api_request.side_effect = api_request
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.
assert job.job_id != orig_job_id

freezegun.freeze_time(auto_tick_seconds=1)
client = mock.create_autospec(Client)
client._call_api.__name__ = "_call_api"
client._call_api.__qualname__ = "Client._call_api"
client._call_api.__annotations__ = {}
client._call_api.__type_params__ = ()
client._call_api.side_effect = (
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": False,
},
google.api_core.exceptions.InternalServerError(
"job_retry me", errors=[{"reason": "rateLimitExceeded"}]
),
# Responses for subsequent success
{
"jobReference": {
"jobId": "job1",
"projectId": "project",
"location": "location",
},
"jobComplete": False,
},
google.api_core.exceptions.BadRequest(
"job_retry me", errors=[{"reason": "backendError"}]
),
google.api_core.exceptions.InternalServerError(
"job_retry me", errors=[{"reason": "rateLimitExceeded"}]
),
google.api_core.exceptions.BadRequest(
"job_retry me", errors=[{"reason": "backendError"}]
),
{
"jobReference": {
"projectId": "response-project",
"jobId": "abc",
"location": "response-location",
},
"jobComplete": True,
"schema": {
"fields": [
{"name": "full_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "age", "type": "INT64", "mode": "NULLABLE"},
],
},
"rows": [
{"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]},
{"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]},
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
],
},
)

rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_RETRY,
job_retry=DEFAULT_JOB_RETRY,
)
# TODO: different test to test if it retries until it times out
with pytest.raises(google.api_core.exceptions.RetryError):
list(rows) # Trigger the initial retry failure

# Second attempt with successful retries
rows = _job_helpers.query_and_wait(
client,
query="SELECT 1",
location="request-location",
project="request-project",
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_RETRY,
job_retry=DEFAULT_RETRY,
)

assert len(list(rows)) == 4


def test_raises_on_job_retry_on_query_with_non_retryable_jobs(client):
Expand Down Expand Up @@ -301,7 +337,7 @@ def test_query_and_wait_retries_job_for_DDL_queries():
job_config=None,
page_size=None,
max_results=None,
retry=DEFAULT_JOB_RETRY,
retry=DEFAULT_RETRY,
job_retry=DEFAULT_JOB_RETRY,
)
assert len(list(rows)) == 4
Expand Down