Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow specifying read consistency #4343

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 33 additions & 46 deletions datastore/google/cloud/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,21 @@

import os

from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2

from google.cloud._helpers import _LocalStack
from google.cloud._helpers import (
_determine_default_project as _base_default_project)
from google.cloud._helpers import (_determine_default_project as
_base_default_project)

This comment was marked as spam.

from google.cloud.client import ClientWithProject
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore import helpers
from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore.batch import Batch
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.cloud.datastore.query import Query
from google.cloud.datastore.transaction import Transaction
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

try:
from google.cloud.datastore._gax import make_datastore_api
_HAVE_GRPC = True
Expand Down Expand Up @@ -131,7 +129,7 @@ def _extended_lookup(datastore_api, project, key_pbs,
results = []

loop_num = 0
read_options = _get_read_options(eventual, transaction_id)
read_options = helpers.get_read_options(eventual, transaction_id)
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1
lookup_response = datastore_api.lookup(
Expand Down Expand Up @@ -279,7 +277,8 @@ def current_transaction(self):
if isinstance(transaction, Transaction):
return transaction

def get(self, key, missing=None, deferred=None, transaction=None):
def get(self, key, missing=None, deferred=None,
transaction=None, eventual=False):
"""Retrieve an entity from a single key (if it exists).

.. note::
Expand All @@ -305,15 +304,26 @@ def get(self, key, missing=None, deferred=None, transaction=None):
:param transaction: (Optional) Transaction to use for read consistency.
If not passed, uses current transaction, if set.

:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency, but cannot
be used inside a transaction or will raise ValueError.

:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
:returns: The requested entity if it exists.

:raises: :class:`ValueError` if eventual is True and in a transaction.
"""
entities = self.get_multi(keys=[key], missing=missing,
deferred=deferred, transaction=transaction)
entities = self.get_multi(keys=[key],
missing=missing,
deferred=deferred,
transaction=transaction,
eventual=eventual)
if entities:
return entities[0]

def get_multi(self, keys, missing=None, deferred=None, transaction=None):
def get_multi(self, keys, missing=None, deferred=None,
transaction=None, eventual=False):
"""Retrieve entities, along with their attributes.

:type keys: list of :class:`google.cloud.datastore.key.Key`
Expand All @@ -334,10 +344,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
:param transaction: (Optional) Transaction to use for read consistency.
If not passed, uses current transaction, if set.

:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.

:rtype: list of :class:`google.cloud.datastore.entity.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if one or more of ``keys`` has a project
which does not match our project.
:raises: :class:`ValueError` if eventual is True and in a transaction.
"""
if not keys:
return []
Expand All @@ -353,7 +370,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
entity_pbs = _extended_lookup(
datastore_api=self._datastore_api,
project=self.project,
key_pbs=[k.to_protobuf() for k in keys],
key_pbs=[key.to_protobuf() for key in keys],

This comment was marked as spam.

eventual=eventual,
missing=missing,
deferred=deferred,
transaction_id=transaction and transaction.id,
Expand Down Expand Up @@ -581,34 +599,3 @@ def do_something(entity):
if 'namespace' not in kwargs:
kwargs['namespace'] = self.namespace
return Query(self, **kwargs)


def _get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.

Helper method for ``lookup()`` and ``run_query``.

:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.

:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).

:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return _datastore_pb2.ReadOptions(
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
else:
return _datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return _datastore_pb2.ReadOptions(
transaction=transaction_id)
42 changes: 37 additions & 5 deletions datastore/google/cloud/datastore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@

import datetime
import itertools

from google.protobuf import struct_pb2
from google.type import latlng_pb2
import six

from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _pb_timestamp_to_datetime
from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2
from google.cloud.datastore_v1.proto import datastore_pb2
from google.cloud.datastore_v1.proto import entity_pb2
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key

from google.protobuf import struct_pb2
from google.type import latlng_pb2

This comment was marked as spam.



def _get_meaning(value_pb, is_list=False):
"""Get the meaning from a protobuf value.
Expand Down Expand Up @@ -204,7 +205,7 @@ def entity_to_protobuf(entity):
:rtype: :class:`.entity_pb2.Entity`
:returns: The protobuf representing the entity.
"""
entity_pb = _entity_pb2.Entity()
entity_pb = entity_pb2.Entity()
if entity.key is not None:
key_pb = entity.key.to_protobuf()
entity_pb.key.CopyFrom(key_pb)
Expand Down Expand Up @@ -233,6 +234,37 @@ def entity_to_protobuf(entity):
return entity_pb


def get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.

Helper method for ``lookup()`` and ``run_query``.

:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.

:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).

:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return datastore_pb2.ReadOptions(
read_consistency=datastore_pb2.ReadOptions.EVENTUAL)
else:
return datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return datastore_pb2.ReadOptions(
transaction=transaction_id)


def key_from_protobuf(pb):
"""Factory method for creating a key based on a protobuf.

Expand Down
66 changes: 41 additions & 25 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
from google.api_core import page_iterator
from google.cloud._helpers import _ensure_tuple_or_list

from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2
from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2
from google.cloud.datastore_v1.proto import query_pb2 as _query_pb2
from google.cloud.datastore_v1.proto import entity_pb2
from google.cloud.datastore_v1.proto import query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key


_NOT_FINISHED = _query_pb2.QueryResultBatch.NOT_FINISHED
_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED

_FINISHED = (
_query_pb2.QueryResultBatch.NO_MORE_RESULTS,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
query_pb2.QueryResultBatch.NO_MORE_RESULTS,
query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
)


Expand Down Expand Up @@ -81,11 +80,11 @@ class Query(object):
"""

OPERATORS = {
'<=': _query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL,
'>=': _query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL,
'<': _query_pb2.PropertyFilter.LESS_THAN,
'>': _query_pb2.PropertyFilter.GREATER_THAN,
'=': _query_pb2.PropertyFilter.EQUAL,
'<=': query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL,
'>=': query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL,
'<': query_pb2.PropertyFilter.LESS_THAN,
'>': query_pb2.PropertyFilter.GREATER_THAN,
'=': query_pb2.PropertyFilter.EQUAL,
}
"""Mapping of operator strings and their protobuf equivalents."""

Expand Down Expand Up @@ -331,7 +330,7 @@ def distinct_on(self, value):
self._distinct_on[:] = value

def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
client=None):
client=None, eventual=False):
"""Execute the Query; return an iterator for the matching entities.

For example::
Expand All @@ -358,18 +357,28 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
:param end_cursor: (Optional) cursor passed through to the iterator.

:type client: :class:`google.cloud.datastore.client.Client`
:param client: client used to connect to datastore.
:param client: (Optional) client used to connect to datastore.
If not supplied, uses the query's value.

:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.

:rtype: :class:`Iterator`
:returns: The iterator for the query.
"""
if client is None:
client = self._client

return Iterator(
self, client, limit=limit, offset=offset,
start_cursor=start_cursor, end_cursor=end_cursor)
return Iterator(self,
client,
limit=limit,
offset=offset,
start_cursor=start_cursor,
end_cursor=end_cursor,
eventual=eventual)


class Iterator(page_iterator.Iterator):
Expand All @@ -396,18 +405,25 @@ class Iterator(page_iterator.Iterator):
:type end_cursor: bytes
:param end_cursor: (Optional) Cursor to end paging through
query results.

:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
"""

next_page_token = None

def __init__(self, query, client, limit=None, offset=None,
start_cursor=None, end_cursor=None):
start_cursor=None, end_cursor=None, eventual=False):
super(Iterator, self).__init__(
client=client, item_to_value=_item_to_entity,
page_token=start_cursor, max_results=limit)
self._query = query
self._offset = offset
self._end_cursor = end_cursor
self._eventual = eventual
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0
Expand Down Expand Up @@ -483,12 +499,12 @@ def _next_page(self):
query_pb = self._build_protobuf()
transaction = self.client.current_transaction
if transaction is None:
read_options = _datastore_pb2.ReadOptions()
transaction_id = None
else:
read_options = _datastore_pb2.ReadOptions(
transaction=transaction.id)
transaction_id = transaction.id
read_options = helpers.get_read_options(self._eventual, transaction_id)

partition_id = _entity_pb2.PartitionId(
partition_id = entity_pb2.PartitionId(
project_id=self._query.project,
namespace_id=self._query.namespace)
response_pb = self.client._datastore_api.run_query(
Expand All @@ -512,7 +528,7 @@ def _pb_from_query(query):
it does not contain "in-flight" fields for ongoing query
executions (cursors, offset, limit).
"""
pb = _query_pb2.Query()
pb = query_pb2.Query()

for projection_name in query.projection:
pb.projection.add().property.name = projection_name
Expand All @@ -521,15 +537,15 @@ def _pb_from_query(query):
pb.kind.add().name = query.kind

composite_filter = pb.filter.composite_filter
composite_filter.op = _query_pb2.CompositeFilter.AND
composite_filter.op = query_pb2.CompositeFilter.AND

if query.ancestor:
ancestor_pb = query.ancestor.to_protobuf()

# Filter on __key__ HAS_ANCESTOR == ancestor.
ancestor_filter = composite_filter.filters.add().property_filter
ancestor_filter.property.name = '__key__'
ancestor_filter.op = _query_pb2.PropertyFilter.HAS_ANCESTOR
ancestor_filter.op = query_pb2.PropertyFilter.HAS_ANCESTOR
ancestor_filter.value.key_value.CopyFrom(ancestor_pb)

for property_name, operator, value in query.filters:
Expand Down
Loading