Skip to content

Commit

Permalink
Add 'eventual' flag to 'Connection.lookup()'/'Connection.run_query()'.
Browse files Browse the repository at this point in the history
Allows selecting EVENTUAL consistency, rather than the STRONG default.

Fixes #305.
  • Loading branch information
tseaver committed Dec 19, 2014
1 parent 21e5f45 commit a41ff58
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 14 deletions.
79 changes: 65 additions & 14 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs):
kwargs['connection'] = self
return Dataset(*args, **kwargs)

def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
def lookup(self, dataset_id, key_pbs,
missing=None, deferred=None, eventual=False):
"""Lookup keys from a dataset in the Cloud Datastore.
Maps the ``DatastoreService.Lookup`` protobuf RPC.
Expand Down Expand Up @@ -211,22 +212,40 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
by the backend as "deferred" will be copied into it.
Use only as a keyword param.
:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:raises: ValueError if ``eventual`` is True
"""
if missing is not None and missing != []:
raise ValueError('missing must be None or an empty list')

if deferred is not None and deferred != []:
raise ValueError('deferred must be None or an empty list')

transaction = self.transaction()
if eventual and transaction:
raise ValueError('eventual must be False when in a transaction')

lookup_request = datastore_pb.LookupRequest()

opts = lookup_request.read_options
ro_enum = datastore_pb.ReadOptions.ReadConsistency
if eventual:
opts.read_consistency = ro_enum.Value('EVENTUAL')
elif transaction:
opts.transaction = transaction

single_key = isinstance(key_pbs, datastore_pb.Key)

if single_key:
Expand All @@ -235,19 +254,42 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)

results, m_found, d_found = self._lookup(lookup_request, dataset_id,
deferred is not None)

if missing is not None:
missing.extend(m_found)

if deferred is not None:
deferred.extend(d_found)

if single_key:
if results:
return results[0]
else:
return None

return results

def _lookup(self, lookup_request, dataset_id, stop_on_deferred):
"""Repeat lookup until all keys found (unless stop requested).
Helper method for ``lookup()``.
"""
results = []
missing = []
deferred = []
while True: # loop against possible deferred.
lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results.extend(
[result.entity for result in lookup_response.found])

if missing is not None:
missing.extend(
[result.entity for result in lookup_response.missing])
missing.extend(
[result.entity for result in lookup_response.missing])

if deferred is not None:
if stop_on_deferred:
deferred.extend([key for key in lookup_response.deferred])
break

Expand All @@ -257,16 +299,9 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None):
# We have deferred keys, and the user didn't ask to know about
# them, so retry (but only with the deferred ones).
_copy_deferred_keys(lookup_request, lookup_response)
return results, missing, deferred

if single_key:
if results:
return results[0]
else:
return None

return results

def run_query(self, dataset_id, query_pb, namespace=None):
def run_query(self, dataset_id, query_pb, namespace=None, eventual=False):
"""Run a query on the Cloud Datastore.
Maps the ``DatastoreService.RunQuery`` protobuf RPC.
Expand Down Expand Up @@ -310,8 +345,24 @@ def run_query(self, dataset_id, query_pb, namespace=None):
:type namespace: string
:param namespace: The namespace over which to run the query.
:type eventual: bool
:param eventual: If False (the default), request ``STRONG`` read
consistency. If True, request ``EVENTUAL`` read
consistency. If the connection has a current
transaction, this value *must* be false.
"""
transaction = self.transaction()
if eventual and transaction:
raise ValueError('eventual must be False when in a transaction')

request = datastore_pb.RunQueryRequest()
opts = request.read_options
ro_enum = datastore_pb.ReadOptions.ReadConsistency
if eventual:
opts.read_consistency = ro_enum.Value('EVENTUAL')
elif transaction:
opts.transaction = transaction

if namespace:
request.partition_id.namespace = namespace
Expand Down
174 changes: 174 additions & 0 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,78 @@ def test_lookup_single_key_empty_response(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)

def test_lookup_single_key_empty_response_w_eventual(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
ro_enum = datastore_pb.ReadOptions.ReadConsistency
self.assertEqual(request.read_options.read_consistency,
ro_enum.Value('EVENTUAL'))
self.assertEqual(request.read_options.transaction, '')

def test_lookup_single_key_empty_response_w_eventual_and_transaction(self):
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True)

def test_lookup_single_key_empty_response_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key

DATASET_ID = 'DATASET'
TRANSACTION = 'TRANSACTION'
key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf()
rsp_pb = datastore_pb.LookupResponse()
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'lookup',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
self.assertEqual(conn.lookup(DATASET_ID, key_pb), None)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.LookupRequest
request = rq_class()
request.ParseFromString(cw['body'])
keys = list(request.key)
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb)
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_lookup_single_key_nonempty_response(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.key import Key
Expand Down Expand Up @@ -443,6 +515,108 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self):
self.assertEqual(len(keys), 1)
self.assertEqual(keys[0], key_pb2)

def test_run_query_w_eventual_no_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb,
eventual=True)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
ro_enum = datastore_pb.ReadOptions.ReadConsistency
self.assertEqual(request.read_options.read_consistency,
ro_enum.Value('EVENTUAL'))
self.assertEqual(request.read_options.transaction, '')

def test_run_query_wo_eventual_w_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
URI = '/'.join([
conn.API_BASE_URL,
'datastore',
conn.API_VERSION,
'datasets',
DATASET_ID,
'runQuery',
])
http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString())
pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb)
self.assertEqual(pbs, [])
self.assertEqual(end, CURSOR)
self.assertTrue(more)
self.assertEqual(skipped, 0)
cw = http._called_with
self._verifyProtobufCall(cw, URI, conn)
rq_class = datastore_pb.RunQueryRequest
request = rq_class()
request.ParseFromString(cw['body'])
self.assertEqual(request.partition_id.namespace, '')
self.assertEqual(request.query, q_pb)
ro_enum = datastore_pb.ReadOptions.ReadConsistency
self.assertEqual(request.read_options.read_consistency,
ro_enum.Value('DEFAULT'))
self.assertEqual(request.read_options.transaction, TRANSACTION)

def test_run_query_w_eventual_and_transaction(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query

DATASET_ID = 'DATASET'
KIND = 'Nonesuch'
CURSOR = b'\x00'
TRANSACTION = 'TRANSACTION'
q_pb = Query(KIND, DATASET_ID).to_protobuf()
rsp_pb = datastore_pb.RunQueryResponse()
rsp_pb.batch.end_cursor = CURSOR
no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS
rsp_pb.batch.more_results = no_more
rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL
conn = self._makeOne()
conn.transaction(TRANSACTION)
self.assertRaises(
ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True)

def test_run_query_wo_namespace_empty_result(self):
from gcloud.datastore.connection import datastore_pb
from gcloud.datastore.query import Query
Expand Down

0 comments on commit a41ff58

Please sign in to comment.