From f7aa293a8c461ce9a56f4e0647e9fae8b32002b5 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 3 Nov 2017 16:27:26 -0700 Subject: [PATCH] Closes #4340 - Specify Read Consistency --- datastore/google/cloud/datastore/client.py | 83 +++++++++------------ datastore/google/cloud/datastore/helpers.py | 40 +++++++++- datastore/google/cloud/datastore/query.py | 36 ++++++--- datastore/tests/unit/test_client.py | 8 +- 4 files changed, 103 insertions(+), 64 deletions(-) diff --git a/datastore/google/cloud/datastore/client.py b/datastore/google/cloud/datastore/client.py index 4a4228a6b7f2..c137e2a4b8fd 100644 --- a/datastore/google/cloud/datastore/client.py +++ b/datastore/google/cloud/datastore/client.py @@ -15,23 +15,24 @@ import os -from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2 - from google.cloud._helpers import _LocalStack -from google.cloud._helpers import ( - _determine_default_project as _base_default_project) +from google.cloud._helpers import (_determine_default_project as + _default_project) + from google.cloud.client import ClientWithProject -from google.cloud.environment_vars import DISABLE_GRPC -from google.cloud.environment_vars import GCD_DATASET -from google.cloud.environment_vars import GCD_HOST -from google.cloud.datastore._http import HTTPDatastoreAPI from google.cloud.datastore import helpers +from google.cloud.datastore._http import HTTPDatastoreAPI from google.cloud.datastore.batch import Batch from google.cloud.datastore.entity import Entity from google.cloud.datastore.key import Key from google.cloud.datastore.query import Query from google.cloud.datastore.transaction import Transaction + +from google.cloud.environment_vars import DISABLE_GRPC +from google.cloud.environment_vars import GCD_DATASET +from google.cloud.environment_vars import GCD_HOST + try: from google.cloud.datastore._gax import make_datastore_api _HAVE_GRPC = True @@ -74,7 +75,7 @@ def _determine_default_project(project=None): project = _get_gcd_project() if project is None: - project = _base_default_project(project=project) + project = _default_project(project=project) return project @@ -131,7 +132,7 @@ def _extended_lookup(datastore_api, project, key_pbs, results = [] loop_num = 0 - read_options = _get_read_options(eventual, transaction_id) + read_options = helpers.get_read_options(eventual, transaction_id) while loop_num < _MAX_LOOPS: # loop against possible deferred. loop_num += 1 lookup_response = datastore_api.lookup( @@ -276,7 +277,8 @@ def current_transaction(self): if isinstance(transaction, Transaction): return transaction - def get(self, key, missing=None, deferred=None, transaction=None): + def get(self, key, missing=None, deferred=None, + transaction=None, eventual=False): """Retrieve an entity from a single key (if it exists). .. note:: @@ -302,15 +304,27 @@ def get(self, key, missing=None, deferred=None, transaction=None): :param transaction: (Optional) Transaction to use for read consistency. If not passed, uses current transaction, if set. + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. + :rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType`` :returns: The requested entity if it exists. + + :raises: :class:`ValueError` if eventual is True and in a transaction. """ - entities = self.get_multi(keys=[key], missing=missing, - deferred=deferred, transaction=transaction) + entities = self.get_multi(keys=[key], + missing=missing, + deferred=deferred, + transaction=transaction, + eventual=eventual) if entities: return entities[0] - def get_multi(self, keys, missing=None, deferred=None, transaction=None): + def get_multi(self, keys, missing=None, deferred=None, + transaction=None, eventual=False): """Retrieve entities, along with their attributes. :type keys: list of :class:`google.cloud.datastore.key.Key` @@ -331,10 +345,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None): :param transaction: (Optional) Transaction to use for read consistency. If not passed, uses current transaction, if set. + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. + :rtype: list of :class:`google.cloud.datastore.entity.Entity` :returns: The requested entities. :raises: :class:`ValueError` if one or more of ``keys`` has a project which does not match our project. + :raises: :class:`ValueError` if eventual is True and in a transaction. """ if not keys: return [] @@ -350,7 +371,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None): entity_pbs = _extended_lookup( datastore_api=self._datastore_api, project=self.project, - key_pbs=[k.to_protobuf() for k in keys], + key_pbs=[key.to_protobuf() for key in keys], + eventual=eventual, missing=missing, deferred=deferred, transaction_id=transaction and transaction.id, @@ -578,34 +600,3 @@ def do_something(entity): if 'namespace' not in kwargs: kwargs['namespace'] = self.namespace return Query(self, **kwargs) - - -def _get_read_options(eventual, transaction_id): - """Validate rules for read options, and assign to the request. - - Helper method for ``lookup()`` and ``run_query``. - - :type eventual: bool - :param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG`` - consistency should be used. - - :type transaction_id: bytes - :param transaction_id: A transaction identifier (may be null). - - :rtype: :class:`.datastore_pb2.ReadOptions` - :returns: The read options corresponding to the inputs. - :raises: :class:`ValueError` if ``eventual`` is ``True`` and the - ``transaction_id`` is not ``None``. - """ - if transaction_id is None: - if eventual: - return _datastore_pb2.ReadOptions( - read_consistency=_datastore_pb2.ReadOptions.EVENTUAL) - else: - return _datastore_pb2.ReadOptions() - else: - if eventual: - raise ValueError('eventual must be False when in a transaction') - else: - return _datastore_pb2.ReadOptions( - transaction=transaction_id) diff --git a/datastore/google/cloud/datastore/helpers.py b/datastore/google/cloud/datastore/helpers.py index 056376965725..d059ba81bafd 100644 --- a/datastore/google/cloud/datastore/helpers.py +++ b/datastore/google/cloud/datastore/helpers.py @@ -19,16 +19,17 @@ import datetime import itertools - -from google.protobuf import struct_pb2 -from google.type import latlng_pb2 import six from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud._helpers import _pb_timestamp_to_datetime -from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2 from google.cloud.datastore.entity import Entity from google.cloud.datastore.key import Key +from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2 +from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2 + +from google.protobuf import struct_pb2 +from google.type import latlng_pb2 def _get_meaning(value_pb, is_list=False): @@ -233,6 +234,37 @@ def entity_to_protobuf(entity): return entity_pb +def get_read_options(eventual, transaction_id): + """Validate rules for read options, and assign to the request. + + Helper method for ``lookup()`` and ``run_query``. + + :type eventual: bool + :param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG`` + consistency should be used. + + :type transaction_id: bytes + :param transaction_id: A transaction identifier (may be null). + + :rtype: :class:`.datastore_pb2.ReadOptions` + :returns: The read options corresponding to the inputs. + :raises: :class:`ValueError` if ``eventual`` is ``True`` and the + ``transaction_id`` is not ``None``. + """ + if transaction_id is None: + if eventual: + return _datastore_pb2.ReadOptions( + read_consistency=_datastore_pb2.ReadOptions.EVENTUAL) + else: + return _datastore_pb2.ReadOptions() + else: + if eventual: + raise ValueError('eventual must be False when in a transaction') + else: + return _datastore_pb2.ReadOptions( + transaction=transaction_id) + + def key_from_protobuf(pb): """Factory method for creating a key based on a protobuf. diff --git a/datastore/google/cloud/datastore/query.py b/datastore/google/cloud/datastore/query.py index 477eccb04395..bfb82fa68e80 100644 --- a/datastore/google/cloud/datastore/query.py +++ b/datastore/google/cloud/datastore/query.py @@ -19,7 +19,6 @@ from google.api_core import page_iterator from google.cloud._helpers import _ensure_tuple_or_list -from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2 from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2 from google.cloud.proto.datastore.v1 import query_pb2 as _query_pb2 from google.cloud.datastore import helpers @@ -331,7 +330,7 @@ def distinct_on(self, value): self._distinct_on[:] = value def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None, - client=None): + client=None, eventual=False): """Execute the Query; return an iterator for the matching entities. For example:: @@ -358,18 +357,28 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None, :param end_cursor: (Optional) cursor passed through to the iterator. :type client: :class:`google.cloud.datastore.client.Client` - :param client: client used to connect to datastore. + :param client: (Optional) client used to connect to datastore. If not supplied, uses the query's value. + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. + :rtype: :class:`Iterator` :returns: The iterator for the query. """ if client is None: client = self._client - return Iterator( - self, client, limit=limit, offset=offset, - start_cursor=start_cursor, end_cursor=end_cursor) + return Iterator(self, + client, + limit=limit, + offset=offset, + start_cursor=start_cursor, + end_cursor=end_cursor, + eventual=eventual) class Iterator(page_iterator.Iterator): @@ -396,18 +405,25 @@ class Iterator(page_iterator.Iterator): :type end_cursor: bytes :param end_cursor: (Optional) Cursor to end paging through query results. + + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. """ next_page_token = None def __init__(self, query, client, limit=None, offset=None, - start_cursor=None, end_cursor=None): + start_cursor=None, end_cursor=None, eventual=False): super(Iterator, self).__init__( client=client, item_to_value=_item_to_entity, page_token=start_cursor, max_results=limit) self._query = query self._offset = offset self._end_cursor = end_cursor + self._eventual = eventual # The attributes below will change over the life of the iterator. self._more_results = True self._skipped_results = 0 @@ -483,10 +499,10 @@ def _next_page(self): query_pb = self._build_protobuf() transaction = self.client.current_transaction if transaction is None: - read_options = _datastore_pb2.ReadOptions() + transaction_id = None else: - read_options = _datastore_pb2.ReadOptions( - transaction=transaction.id) + transaction_id = transaction.id + read_options = helpers.get_read_options(self._eventual, transaction_id) partition_id = _entity_pb2.PartitionId( project_id=self._query.project, diff --git a/datastore/tests/unit/test_client.py b/datastore/tests/unit/test_client.py index 6477f53c5fa7..8bc70f2e3e80 100644 --- a/datastore/tests/unit/test_client.py +++ b/datastore/tests/unit/test_client.py @@ -85,7 +85,7 @@ def fallback_mock(project=None): patch = mock.patch.multiple( 'google.cloud.datastore.client', _get_gcd_project=gcd_mock, - _base_default_project=fallback_mock) + _default_project=fallback_mock) with patch: returned_project = self._call_fut(project_called) @@ -138,7 +138,7 @@ def test_constructor_w_project_no_environ(self): # Some environments (e.g. AppVeyor CI) run in GCE, so # this test would fail artificially. patch = mock.patch( - 'google.cloud.datastore.client._base_default_project', + 'google.cloud.datastore.client._default_project', return_value=None) with patch: self.assertRaises(EnvironmentError, self._make_one, None) @@ -1013,9 +1013,9 @@ def test_query_w_namespace_collision(self): class Test__get_read_options(unittest.TestCase): def _call_fut(self, eventual, transaction_id): - from google.cloud.datastore.client import _get_read_options + from google.cloud.datastore.helpers import get_read_options - return _get_read_options(eventual, transaction_id) + return get_read_options(eventual, transaction_id) def test_eventual_w_transaction(self): with self.assertRaises(ValueError):