Skip to content

Commit

Permalink
Drop 'Database.read' and 'Database.execute_sql' convenience methods. (g…
Browse files Browse the repository at this point in the history
…oogleapis#3787)

Because the context managers they use returned the session to the database's
pool, application code could not safely iterate over the result sets
returned by the methods.

Update docs for 'Snapshot.read' and 'Snapshot.execute_sql' to emphasize
iteration of their results sets before the session is returned to the
database pool (i.e., within the 'with' block which constructs the snapshot).

Closes googleapis#3769.
  • Loading branch information
tseaver authored and landrito committed Aug 22, 2017
1 parent b2927df commit 909ca9c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 138 deletions.
44 changes: 32 additions & 12 deletions docs/spanner/snapshot-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,22 @@ fails if the result set is too large,

.. code:: python
result = snapshot.read(
table='table-name', columns=['first_name', 'last_name', 'age'],
key_set=['phred@example.com', 'bharney@example.com'])
with database.snapshot() as snapshot:
result = snapshot.read(
table='table-name', columns=['first_name', 'last_name', 'age'],
key_set=['phred@example.com', 'bharney@example.com'])
for row in result.rows:
print(row)
for row in result.rows:
print(row)
.. note::

The result set returned by
:meth:`~google.cloud.spanner.snapshot.Snapshot.execute_sql` *must not* be
iterated after the snapshot's session has been returned to the database's
session pool. Therefore, unless your application creates sessions
manually, perform all iteration within the context of the
``with database.snapshot()`` block.

.. note::

Expand All @@ -68,14 +78,24 @@ fails if the result set is too large,

.. code:: python
QUERY = (
'SELECT e.first_name, e.last_name, p.telephone '
'FROM employees as e, phones as p '
'WHERE p.employee_id == e.employee_id')
result = snapshot.execute_sql(QUERY)
with database.snapshot() as snapshot:
QUERY = (
'SELECT e.first_name, e.last_name, p.telephone '
'FROM employees as e, phones as p '
'WHERE p.employee_id == e.employee_id')
result = snapshot.execute_sql(QUERY)
for row in result.rows:
print(row)
.. note::

for row in result.rows:
print(row)
The result set returned by
:meth:`~google.cloud.spanner.snapshot.Snapshot.execute_sql` *must not* be
iterated after the snapshot's session has been returned to the database's
session pool. Therefore, unless your application creates sessions
manually, perform all iteration within the context of the
``with database.snapshot()`` block.


Next Step
Expand Down
62 changes: 0 additions & 62 deletions spanner/google/cloud/spanner/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,68 +313,6 @@ def session(self):
"""
return Session(self)

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
"""Perform a ``StreamingRead`` API request for rows in a table.
:type table: str
:param table: name of the table from which to fetch data
:type columns: list of str
:param columns: names of columns to be retrieved
:type keyset: :class:`~google.cloud.spanner.keyset.KeySet`
:param keyset: keys / ranges identifying rows to be retrieved
:type index: str
:param index: (Optional) name of index to use, rather than the
table's primary key
:type limit: int
:param limit: (Optional) maxiumn number of rows to return
:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted read
:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
with SessionCheckout(self._pool) as session:
return session.read(
table, columns, keyset, index, limit, resume_token)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):
"""Perform an ``ExecuteStreamingSql`` API request.
:type sql: str
:param sql: SQL query statement
:type params: dict, {str -> column value}
:param params: values for parameter replacement. Keys must match
the names used in ``sql``.
:type param_types:
dict, {str -> :class:`google.spanner.v1.type_pb2.TypeCode`}
:param param_types: (Optional) explicit types for one or more param
values; overrides default type detection on the
back-end.
:type query_mode:
:class:`google.spanner.v1.spanner_pb2.ExecuteSqlRequest.QueryMode`
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1
:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted query
:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
with SessionCheckout(self._pool) as session:
return session.execute_sql(
sql, params, param_types, query_mode, resume_token)

def run_in_transaction(self, func, *args, **kw):
"""Perform a unit of work in a transaction, retrying on abort.
Expand Down
19 changes: 10 additions & 9 deletions spanner/tests/system/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def test_update_database_ddl(self):

self.assertEqual(len(temp_db.ddl_statements), len(DDL_STATEMENTS))

def test_db_batch_insert_then_db_snapshot_read_and_db_read(self):
def test_db_batch_insert_then_db_snapshot_read(self):
retry = RetryInstanceState(_has_all_ddl)
retry(self._db.reload)()

Expand All @@ -310,10 +310,7 @@ def test_db_batch_insert_then_db_snapshot_read_and_db_read(self):

self._check_row_data(from_snap)

from_db = list(self._db.read(self.TABLE, self.COLUMNS, self.ALL))
self._check_row_data(from_db)

def test_db_run_in_transaction_then_db_execute_sql(self):
def test_db_run_in_transaction_then_snapshot_execute_sql(self):
retry = RetryInstanceState(_has_all_ddl)
retry(self._db.reload)()

Expand All @@ -329,7 +326,8 @@ def _unit_of_work(transaction, test):

self._db.run_in_transaction(_unit_of_work, test=self)

rows = list(self._db.execute_sql(self.SQL))
with self._db.snapshot() as after:
rows = list(after.execute_sql(self.SQL))
self._check_row_data(rows)

def test_db_run_in_transaction_twice(self):
Expand All @@ -346,7 +344,8 @@ def _unit_of_work(transaction, test):
self._db.run_in_transaction(_unit_of_work, test=self)
self._db.run_in_transaction(_unit_of_work, test=self)

rows = list(self._db.execute_sql(self.SQL))
with self._db.snapshot() as after:
rows = list(after.execute_sql(self.SQL))
self._check_row_data(rows)


Expand Down Expand Up @@ -1085,15 +1084,17 @@ def setUpClass(cls):

def _verify_one_column(self, table_desc):
sql = 'SELECT chunk_me FROM {}'.format(table_desc.table)
rows = list(self._db.execute_sql(sql))
with self._db.snapshot() as snapshot:
rows = list(snapshot.execute_sql(sql))
self.assertEqual(len(rows), table_desc.row_count)
expected = table_desc.value()
for row in rows:
self.assertEqual(row[0], expected)

def _verify_two_columns(self, table_desc):
sql = 'SELECT chunk_me, chunk_me_2 FROM {}'.format(table_desc.table)
rows = list(self._db.execute_sql(sql))
with self._db.snapshot() as snapshot:
rows = list(snapshot.execute_sql(sql))
self.assertEqual(len(rows), table_desc.row_count)
expected = table_desc.value()
for row in rows:
Expand Down
55 changes: 0 additions & 55 deletions spanner/tests/unit/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,21 +621,6 @@ def test_session_factory(self):
self.assertIs(session.session_id, None)
self.assertIs(session._database, database)

def test_execute_sql_defaults(self):
QUERY = 'SELECT * FROM employees'
client = _Client()
instance = _Instance(self.INSTANCE_NAME, client=client)
pool = _Pool()
session = _Session()
pool.put(session)
session._execute_result = []
database = self._make_one(self.DATABASE_ID, instance, pool=pool)

rows = list(database.execute_sql(QUERY))

self.assertEqual(rows, [])
self.assertEqual(session._executed, (QUERY, None, None, None, b''))

def test_run_in_transaction_wo_args(self):
import datetime

Expand Down Expand Up @@ -678,38 +663,6 @@ def test_run_in_transaction_w_args(self):
self.assertEqual(session._retried,
(_unit_of_work, (SINCE,), {'until': UNTIL}))

def test_read(self):
from google.cloud.spanner.keyset import KeySet

TABLE_NAME = 'citizens'
COLUMNS = ['email', 'first_name', 'last_name', 'age']
KEYS = ['bharney@example.com', 'phred@example.com']
KEYSET = KeySet(keys=KEYS)
INDEX = 'email-address-index'
LIMIT = 20
TOKEN = b'DEADBEEF'
client = _Client()
instance = _Instance(self.INSTANCE_NAME, client=client)
pool = _Pool()
session = _Session()
pool.put(session)
database = self._make_one(self.DATABASE_ID, instance, pool=pool)

rows = list(database.read(
TABLE_NAME, COLUMNS, KEYSET, INDEX, LIMIT, TOKEN))

self.assertEqual(rows, [])

(table, columns, key_set, index, limit,
resume_token) = session._read_with

self.assertEqual(table, TABLE_NAME)
self.assertEqual(columns, COLUMNS)
self.assertEqual(key_set, KEYSET)
self.assertEqual(index, INDEX)
self.assertEqual(limit, LIMIT)
self.assertEqual(resume_token, TOKEN)

def test_batch(self):
from google.cloud.spanner.database import BatchCheckout

Expand Down Expand Up @@ -951,18 +904,10 @@ def __init__(self, database=None, name=_BaseTest.SESSION_NAME):
self._database = database
self.name = name

def execute_sql(self, sql, params, param_types, query_mode, resume_token):
self._executed = (sql, params, param_types, query_mode, resume_token)
return iter(self._rows)

def run_in_transaction(self, func, *args, **kw):
self._retried = (func, args, kw)
return self._committed

def read(self, table, columns, keyset, index, limit, resume_token):
self._read_with = (table, columns, keyset, index, limit, resume_token)
return iter(self._rows)


class _SessionPB(object):
name = TestDatabase.SESSION_NAME
Expand Down

0 comments on commit 909ca9c

Please sign in to comment.