Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spanner: add backoff for `run_in_transaction' when backend does not provide 'RetryInfo' in response. #8461

Merged
merged 6 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions spanner/google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def run_in_transaction(self, func, *args, **kw):
reraises any non-ABORT execptions raised by ``func``.
"""
deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS)
attempts = 0

while True:
if self._transaction is None:
Expand All @@ -291,11 +292,13 @@ def run_in_transaction(self, func, *args, **kw):
txn = self._transaction
if txn._transaction_id is None:
txn.begin()

try:
attempts += 1
return_value = func(txn, *args, **kw)
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline)
_delay_until_retry(exc, deadline, attempts)
continue
except GoogleAPICallError:
del self._transaction
Expand All @@ -308,7 +311,7 @@ def run_in_transaction(self, func, *args, **kw):
txn.commit()
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline)
_delay_until_retry(exc, deadline, attempts)
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
except GoogleAPICallError:
del self._transaction
raise
Expand All @@ -320,7 +323,7 @@ def run_in_transaction(self, func, *args, **kw):
#
# Rational: this function factors out complex shared deadline / retry
# handling from two `except:` clauses.
def _delay_until_retry(exc, deadline):
def _delay_until_retry(exc, deadline, attempts):
"""Helper for :meth:`Session.run_in_transaction`.

Detect retryable abort, and impose server-supplied delay.
Expand All @@ -330,6 +333,9 @@ def _delay_until_retry(exc, deadline):

:type deadline: float
:param deadline: maximum timestamp to continue retrying the transaction.

:type attempts: int
:param attempts: number of call retries
"""
cause = exc.errors[0]

Expand All @@ -338,7 +344,7 @@ def _delay_until_retry(exc, deadline):
if now >= deadline:
raise

delay = _get_retry_delay(cause)
delay = _get_retry_delay(cause, attempts)
if delay is not None:

if now + delay > deadline:
Expand All @@ -350,14 +356,17 @@ def _delay_until_retry(exc, deadline):
# pylint: enable=misplaced-bare-raise


def _get_retry_delay(cause):
def _get_retry_delay(cause, attempts):
"""Helper for :func:`_delay_until_retry`.

:type exc: :class:`grpc.Call`
:param exc: exception for aborted transaction

:rtype: float
:returns: seconds to wait before retrying the transaction.

:type attempts: int
:param attempts: number of call retries
"""
metadata = dict(cause.trailing_metadata())
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
Expand All @@ -366,3 +375,5 @@ def _get_retry_delay(cause):
retry_info.ParseFromString(retry_info_pb)
nanos = retry_info.retry_delay.nanos
return retry_info.retry_delay.seconds + nanos / 1.0e9

return 2 ** attempts
crwilcox marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 9 additions & 7 deletions spanner/tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,18 +1033,20 @@ def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)

# retry once w/ timeout_secs=1
def _time(_results=[1, 1.5, 2.5]):
# retry several times to check backoff
def _time(_results=[1, 2, 4, 8]):
return _results.pop(0)

with mock.patch("time.time", _time):
with mock.patch("time.sleep") as sleep_mock:
with self.assertRaises(Aborted):
session.run_in_transaction(unit_of_work, timeout_secs=1)
session.run_in_transaction(unit_of_work, timeout_secs=8)

sleep_mock.assert_not_called()
sleep_mock.assert_has_calls(
(mock.call(2), mock.call(4))
)

self.assertEqual(len(called_with), 2)
self.assertEqual(len(called_with), 3)
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
for txn, args, kw in called_with:
self.assertIsInstance(txn, Transaction)
self.assertIsNone(txn.committed)
Expand All @@ -1061,7 +1063,7 @@ def _time(_results=[1, 1.5, 2.5]):
metadata=[("google-cloud-resource-prefix", database.name)],
)
]
* 2,
* 3,
)
self.assertEqual(
gax_api.commit.call_args_list,
Expand All @@ -1073,5 +1075,5 @@ def _time(_results=[1, 1.5, 2.5]):
metadata=[("google-cloud-resource-prefix", database.name)],
)
]
* 2,
* 3,
)