Skip to content

Commit

Permalink
Closes #4340 - Specify Read Consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
chemelnucfin committed Nov 4, 2017
1 parent 8806a6a commit b1258fe
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 64 deletions.
83 changes: 37 additions & 46 deletions datastore/google/cloud/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@

import os

from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2

from google.cloud._helpers import _LocalStack
from google.cloud._helpers import (
_determine_default_project as _base_default_project)
from google.cloud._helpers import (_determine_default_project as
_default_project)

from google.cloud.client import ClientWithProject
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore import helpers
from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore.batch import Batch
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.cloud.datastore.query import Query
from google.cloud.datastore.transaction import Transaction

from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

try:
from google.cloud.datastore._gax import make_datastore_api
_HAVE_GRPC = True
Expand Down Expand Up @@ -74,7 +75,7 @@ def _determine_default_project(project=None):
project = _get_gcd_project()

if project is None:
project = _base_default_project(project=project)
project = _default_project(project=project)

return project

Expand Down Expand Up @@ -131,7 +132,7 @@ def _extended_lookup(datastore_api, project, key_pbs,
results = []

loop_num = 0
read_options = _get_read_options(eventual, transaction_id)
read_options = helpers.get_read_options(eventual, transaction_id)
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1
lookup_response = datastore_api.lookup(
Expand Down Expand Up @@ -276,7 +277,8 @@ def current_transaction(self):
if isinstance(transaction, Transaction):
return transaction

def get(self, key, missing=None, deferred=None, transaction=None):
def get(self, key, missing=None, deferred=None,
transaction=None, eventual=False):
"""Retrieve an entity from a single key (if it exists).
.. note::
Expand All @@ -302,15 +304,27 @@ def get(self, key, missing=None, deferred=None, transaction=None):
:param transaction: (Optional) Transaction to use for read consistency.
If not passed, uses current transaction, if set.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
:returns: The requested entity if it exists.
:raises: :class:`ValueError` if eventual is True and in a transaction.
"""
entities = self.get_multi(keys=[key], missing=missing,
deferred=deferred, transaction=transaction)
entities = self.get_multi(keys=[key],
missing=missing,
deferred=deferred,
transaction=transaction,
eventual=eventual)
if entities:
return entities[0]

def get_multi(self, keys, missing=None, deferred=None, transaction=None):
def get_multi(self, keys, missing=None, deferred=None,
transaction=None, eventual=False):
"""Retrieve entities, along with their attributes.
:type keys: list of :class:`google.cloud.datastore.key.Key`
Expand All @@ -331,10 +345,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
:param transaction: (Optional) Transaction to use for read consistency.
If not passed, uses current transaction, if set.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
:rtype: list of :class:`google.cloud.datastore.entity.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if one or more of ``keys`` has a project
which does not match our project.
:raises: :class:`ValueError` if eventual is True and in a transaction.
"""
if not keys:
return []
Expand All @@ -350,7 +371,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
entity_pbs = _extended_lookup(
datastore_api=self._datastore_api,
project=self.project,
key_pbs=[k.to_protobuf() for k in keys],
key_pbs=[key.to_protobuf() for key in keys],
eventual=eventual,
missing=missing,
deferred=deferred,
transaction_id=transaction and transaction.id,
Expand Down Expand Up @@ -578,34 +600,3 @@ def do_something(entity):
if 'namespace' not in kwargs:
kwargs['namespace'] = self.namespace
return Query(self, **kwargs)


def _get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).
:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return _datastore_pb2.ReadOptions(
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
else:
return _datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return _datastore_pb2.ReadOptions(
transaction=transaction_id)
40 changes: 36 additions & 4 deletions datastore/google/cloud/datastore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@

import datetime
import itertools

from google.protobuf import struct_pb2
from google.type import latlng_pb2
import six

from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _pb_timestamp_to_datetime
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2

from google.protobuf import struct_pb2
from google.type import latlng_pb2


def _get_meaning(value_pb, is_list=False):
Expand Down Expand Up @@ -233,6 +234,37 @@ def entity_to_protobuf(entity):
return entity_pb


def get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).
:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return _datastore_pb2.ReadOptions(
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
else:
return _datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return _datastore_pb2.ReadOptions(
transaction=transaction_id)


def key_from_protobuf(pb):
"""Factory method for creating a key based on a protobuf.
Expand Down
36 changes: 26 additions & 10 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from google.api_core import page_iterator
from google.cloud._helpers import _ensure_tuple_or_list

from google.cloud.proto.datastore.v1 import datastore_pb2 as _datastore_pb2
from google.cloud.proto.datastore.v1 import entity_pb2 as _entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2 as _query_pb2
from google.cloud.datastore import helpers
Expand Down Expand Up @@ -331,7 +330,7 @@ def distinct_on(self, value):
self._distinct_on[:] = value

def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
client=None):
client=None, eventual=False):
"""Execute the Query; return an iterator for the matching entities.
For example::
Expand All @@ -358,18 +357,28 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
:param end_cursor: (Optional) cursor passed through to the iterator.
:type client: :class:`google.cloud.datastore.client.Client`
:param client: client used to connect to datastore.
:param client: (Optional) client used to connect to datastore.
If not supplied, uses the query's value.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
:rtype: :class:`Iterator`
:returns: The iterator for the query.
"""
if client is None:
client = self._client

return Iterator(
self, client, limit=limit, offset=offset,
start_cursor=start_cursor, end_cursor=end_cursor)
return Iterator(self,
client,
limit=limit,
offset=offset,
start_cursor=start_cursor,
end_cursor=end_cursor,
eventual=eventual)


class Iterator(page_iterator.Iterator):
Expand All @@ -396,18 +405,25 @@ class Iterator(page_iterator.Iterator):
:type end_cursor: bytes
:param end_cursor: (Optional) Cursor to end paging through
query results.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
"""

next_page_token = None

def __init__(self, query, client, limit=None, offset=None,
start_cursor=None, end_cursor=None):
start_cursor=None, end_cursor=None, eventual=False):
super(Iterator, self).__init__(
client=client, item_to_value=_item_to_entity,
page_token=start_cursor, max_results=limit)
self._query = query
self._offset = offset
self._end_cursor = end_cursor
self._eventual = eventual
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0
Expand Down Expand Up @@ -483,10 +499,10 @@ def _next_page(self):
query_pb = self._build_protobuf()
transaction = self.client.current_transaction
if transaction is None:
read_options = _datastore_pb2.ReadOptions()
transaction_id = None
else:
read_options = _datastore_pb2.ReadOptions(
transaction=transaction.id)
transaction_id = transaction.id
read_options = helpers.get_read_options(self._eventual, transaction_id)

partition_id = _entity_pb2.PartitionId(
project_id=self._query.project,
Expand Down
8 changes: 4 additions & 4 deletions datastore/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def fallback_mock(project=None):
patch = mock.patch.multiple(
'google.cloud.datastore.client',
_get_gcd_project=gcd_mock,
_base_default_project=fallback_mock)
_default_project=fallback_mock)
with patch:
returned_project = self._call_fut(project_called)

Expand Down Expand Up @@ -138,7 +138,7 @@ def test_constructor_w_project_no_environ(self):
# Some environments (e.g. AppVeyor CI) run in GCE, so
# this test would fail artificially.
patch = mock.patch(
'google.cloud.datastore.client._base_default_project',
'google.cloud.datastore.client._default_project',
return_value=None)
with patch:
self.assertRaises(EnvironmentError, self._make_one, None)
Expand Down Expand Up @@ -1013,9 +1013,9 @@ def test_query_w_namespace_collision(self):
class Test__get_read_options(unittest.TestCase):

def _call_fut(self, eventual, transaction_id):
from google.cloud.datastore.client import _get_read_options
from google.cloud.datastore.helpers import get_read_options

return _get_read_options(eventual, transaction_id)
return get_read_options(eventual, transaction_id)

def test_eventual_w_transaction(self):
with self.assertRaises(ValueError):
Expand Down

0 comments on commit b1258fe

Please sign in to comment.