Skip to content

Commit

Permalink
Add backoff for `run_in_transaction' when backend does not provide 'R…
Browse files Browse the repository at this point in the history
…etryInfo' in response. (#8461)
  • Loading branch information
Gurov Ilya authored and tseaver committed Jul 8, 2019
1 parent b504293 commit 56358af
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
22 changes: 17 additions & 5 deletions spanner/google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
import random

# pylint: enable=ungrouped-imports

Expand Down Expand Up @@ -283,6 +284,7 @@ def run_in_transaction(self, func, *args, **kw):
reraises any non-ABORT execptions raised by ``func``.
"""
deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS)
attempts = 0

while True:
if self._transaction is None:
Expand All @@ -291,11 +293,13 @@ def run_in_transaction(self, func, *args, **kw):
txn = self._transaction
if txn._transaction_id is None:
txn.begin()

try:
attempts += 1
return_value = func(txn, *args, **kw)
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline)
_delay_until_retry(exc, deadline, attempts)
continue
except GoogleAPICallError:
del self._transaction
Expand All @@ -308,7 +312,7 @@ def run_in_transaction(self, func, *args, **kw):
txn.commit()
except Aborted as exc:
del self._transaction
_delay_until_retry(exc, deadline)
_delay_until_retry(exc, deadline, attempts)
except GoogleAPICallError:
del self._transaction
raise
Expand All @@ -320,7 +324,7 @@ def run_in_transaction(self, func, *args, **kw):
#
# Rational: this function factors out complex shared deadline / retry
# handling from two `except:` clauses.
def _delay_until_retry(exc, deadline):
def _delay_until_retry(exc, deadline, attempts):
"""Helper for :meth:`Session.run_in_transaction`.
Detect retryable abort, and impose server-supplied delay.
Expand All @@ -330,6 +334,9 @@ def _delay_until_retry(exc, deadline):
:type deadline: float
:param deadline: maximum timestamp to continue retrying the transaction.
:type attempts: int
:param attempts: number of call retries
"""
cause = exc.errors[0]

Expand All @@ -338,7 +345,7 @@ def _delay_until_retry(exc, deadline):
if now >= deadline:
raise

delay = _get_retry_delay(cause)
delay = _get_retry_delay(cause, attempts)
if delay is not None:

if now + delay > deadline:
Expand All @@ -350,14 +357,17 @@ def _delay_until_retry(exc, deadline):
# pylint: enable=misplaced-bare-raise


def _get_retry_delay(cause):
def _get_retry_delay(cause, attempts):
"""Helper for :func:`_delay_until_retry`.
:type exc: :class:`grpc.Call`
:param exc: exception for aborted transaction
:rtype: float
:returns: seconds to wait before retrying the transaction.
:type attempts: int
:param attempts: number of call retries
"""
metadata = dict(cause.trailing_metadata())
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
Expand All @@ -366,3 +376,5 @@ def _get_retry_delay(cause):
retry_info.ParseFromString(retry_info_pb)
nanos = retry_info.retry_delay.nanos
return retry_info.retry_delay.seconds + nanos / 1.0e9

return 2 ** attempts + random.random()
44 changes: 37 additions & 7 deletions spanner/tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1033,18 +1033,22 @@ def unit_of_work(txn, *args, **kw):
called_with.append((txn, args, kw))
txn.insert(TABLE_NAME, COLUMNS, VALUES)

# retry once w/ timeout_secs=1
def _time(_results=[1, 1.5, 2.5]):
# retry several times to check backoff
def _time(_results=[1, 2, 4, 8]):
return _results.pop(0)

with mock.patch("time.time", _time):
with mock.patch("time.sleep") as sleep_mock:
with self.assertRaises(Aborted):
session.run_in_transaction(unit_of_work, timeout_secs=1)
session.run_in_transaction(unit_of_work, timeout_secs=8)

sleep_mock.assert_not_called()
# unpacking call args into list
call_args = [call_[0][0] for call_ in sleep_mock.call_args_list]
call_args = list(map(int, call_args))
assert call_args == [2, 4]
assert sleep_mock.call_count == 2

self.assertEqual(len(called_with), 2)
self.assertEqual(len(called_with), 3)
for txn, args, kw in called_with:
self.assertIsInstance(txn, Transaction)
self.assertIsNone(txn.committed)
Expand All @@ -1061,7 +1065,7 @@ def _time(_results=[1, 1.5, 2.5]):
metadata=[("google-cloud-resource-prefix", database.name)],
)
]
* 2,
* 3,
)
self.assertEqual(
gax_api.commit.call_args_list,
Expand All @@ -1073,5 +1077,31 @@ def _time(_results=[1, 1.5, 2.5]):
metadata=[("google-cloud-resource-prefix", database.name)],
)
]
* 2,
* 3,
)

def test_delay_helper_w_no_delay(self):
from google.cloud.spanner_v1.session import _delay_until_retry

metadata_mock = mock.Mock()
metadata_mock.trailing_metadata.return_value = {}

exc_mock = mock.Mock(errors=[metadata_mock])

def _time_func():
return 3

# check if current time > deadline
with mock.patch("time.time", _time_func):
with self.assertRaises(Exception):
_delay_until_retry(exc_mock, 2, 1)

with mock.patch("time.time", _time_func):
with mock.patch(
"google.cloud.spanner_v1.session._get_retry_delay"
) as get_retry_delay_mock:
with mock.patch("time.sleep") as sleep_mock:
get_retry_delay_mock.return_value = None

_delay_until_retry(exc_mock, 6, 1)
sleep_mock.assert_not_called()

0 comments on commit 56358af

Please sign in to comment.