Skip to content

Commit

Permalink
Add timeout + retry settings to Sessions/Snapshots (#6536)
Browse files Browse the repository at this point in the history
  • Loading branch information
crwilcox authored and tseaver committed Nov 20, 2018
1 parent 74dc48d commit 6eecfdc
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
7 changes: 5 additions & 2 deletions spanner/google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

# pylint: disable=ungrouped-imports
from google.api_core.exceptions import Aborted, GoogleAPICallError, NotFound
import google.api_core.gapic_v1.method
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
Expand Down Expand Up @@ -197,7 +198,9 @@ def read(self, table, columns, keyset, index='', limit=0):
"""
return self.snapshot().read(table, columns, keyset, index, limit)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None):
def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT):
"""Perform an ``ExecuteStreamingSql`` API request.
:type sql: str
Expand All @@ -222,7 +225,7 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None):
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().execute_sql(
sql, params, param_types, query_mode)
sql, params, param_types, query_mode, retry=retry, timeout=timeout)

def batch(self):
"""Factory to create a batch for this session.
Expand Down
9 changes: 7 additions & 2 deletions spanner/google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.cloud.spanner_v1.proto.transaction_pb2 import TransactionSelector

from google.api_core.exceptions import ServiceUnavailable
import google.api_core.gapic_v1.method
from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud.spanner_v1._helpers import _make_value_pb
Expand Down Expand Up @@ -143,7 +144,9 @@ def read(self, table, columns, keyset, index='', limit=0, partition=None):
return StreamedResultSet(iterator)

def execute_sql(self, sql, params=None, param_types=None,
query_mode=None, partition=None):
query_mode=None, partition=None,
retry=google.api_core.gapic_v1.method.DEFAULT,
timeout=google.api_core.gapic_v1.method.DEFAULT):
"""Perform an ``ExecuteStreamingSql`` API request.
:type sql: str
Expand Down Expand Up @@ -204,7 +207,9 @@ def execute_sql(self, sql, params=None, param_types=None,
query_mode=query_mode,
partition_token=partition,
seqno=self._execute_sql_count,
metadata=metadata)
metadata=metadata,
retry=retry,
timeout=timeout)

iterator = _restart_on_unavailable(restart)

Expand Down
34 changes: 33 additions & 1 deletion spanner/tests/unit/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


import unittest

import google.api_core.gapic_v1.method
import mock


Expand Down Expand Up @@ -371,6 +371,36 @@ def test_execute_sql_defaults(self):
None,
None,
None,
timeout=google.api_core.gapic_v1.method.DEFAULT,
retry=google.api_core.gapic_v1.method.DEFAULT,
)

def test_execute_sql_non_default_retry(self):
from google.protobuf.struct_pb2 import Struct, Value
from google.cloud.spanner_v1.proto.type_pb2 import STRING

SQL = 'SELECT first_name, age FROM citizens'
database = self._make_database()
session = self._make_one(database)
session._session_id = 'DEADBEEF'

params = Struct(fields={'foo': Value(string_value='bar')})
param_types = {'foo': STRING}

with mock.patch(
'google.cloud.spanner_v1.session.Snapshot') as snapshot:
found = session.execute_sql(
SQL, params, param_types, 'PLAN', retry=None, timeout=None)

self.assertIs(found, snapshot().execute_sql.return_value)

snapshot().execute_sql.assert_called_once_with(
SQL,
params,
param_types,
'PLAN',
timeout=None,
retry=None
)

def test_execute_sql_explicit(self):
Expand All @@ -397,6 +427,8 @@ def test_execute_sql_explicit(self):
params,
param_types,
'PLAN',
timeout=google.api_core.gapic_v1.method.DEFAULT,
retry=google.api_core.gapic_v1.method.DEFAULT,
)

def test_batch_not_created(self):
Expand Down
16 changes: 13 additions & 3 deletions spanner/tests/unit/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


import unittest

import google.api_core.gapic_v1.method
import mock


Expand Down Expand Up @@ -333,7 +333,9 @@ def test_execute_sql_w_params_wo_param_types(self):
derived.execute_sql(SQL_QUERY_WITH_PARAM, PARAMS)

def _execute_sql_helper(
self, multi_use, first=True, count=0, partition=None, sql_count=0):
self, multi_use, first=True, count=0, partition=None, sql_count=0,
timeout=google.api_core.gapic_v1.method.DEFAULT,
retry=google.api_core.gapic_v1.method.DEFAULT):
from google.protobuf.struct_pb2 import Struct
from google.cloud.spanner_v1.proto.result_set_pb2 import (
PartialResultSet, ResultSetMetadata, ResultSetStats)
Expand Down Expand Up @@ -380,7 +382,7 @@ def _execute_sql_helper(

result_set = derived.execute_sql(
SQL_QUERY_WITH_PARAM, PARAMS, PARAM_TYPES,
query_mode=MODE, partition=partition)
query_mode=MODE, partition=partition, retry=retry, timeout=timeout)

self.assertEqual(derived._read_request_count, count + 1)

Expand Down Expand Up @@ -417,6 +419,8 @@ def _execute_sql_helper(
partition_token=partition,
seqno=sql_count,
metadata=[('google-cloud-resource-prefix', database.name)],
timeout=timeout,
retry=retry,
)

self.assertEqual(derived._execute_sql_count, sql_count + 1)
Expand All @@ -441,6 +445,12 @@ def test_execute_sql_w_multi_use_w_first_w_count_gt_0(self):
with self.assertRaises(ValueError):
self._execute_sql_helper(multi_use=True, first=True, count=1)

def test_execute_sql_w_retry(self):
self._execute_sql_helper(multi_use=False, retry=None)

def test_execute_sql_w_timeout(self):
self._execute_sql_helper(multi_use=False, timeout=None)

def _partition_read_helper(
self, multi_use, w_txn,
size=None, max_partitions=None, index=None):
Expand Down

0 comments on commit 6eecfdc

Please sign in to comment.