diff --git a/gcloud/datastore/connection.py b/gcloud/datastore/connection.py index 97735be78135..52f269141b69 100644 --- a/gcloud/datastore/connection.py +++ b/gcloud/datastore/connection.py @@ -167,7 +167,8 @@ def dataset(self, *args, **kwargs): kwargs['connection'] = self return Dataset(*args, **kwargs) - def lookup(self, dataset_id, key_pbs, missing=None, deferred=None): + def lookup(self, dataset_id, key_pbs, + missing=None, deferred=None, eventual=False): """Lookup keys from a dataset in the Cloud Datastore. Maps the ``DatastoreService.Lookup`` protobuf RPC. @@ -211,6 +212,12 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None): by the backend as "deferred" will be copied into it. Use only as a keyword param. + :type eventual: bool + :param eventual: If False (the default), request ``STRONG`` read + consistency. If True, request ``EVENTUAL`` read + consistency. If the connection has a current + transaction, this value *must* be false. + :rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity` (or a single Entity) :returns: The entities corresponding to the keys provided. @@ -218,6 +225,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None): this will return None. If multiple keys were provided and no results matched, this will return an empty list. + :raises: ValueError if ``eventual`` is True """ if missing is not None and missing != []: raise ValueError('missing must be None or an empty list') @@ -226,6 +234,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None): raise ValueError('deferred must be None or an empty list') lookup_request = datastore_pb.LookupRequest() + self._set_read_options(lookup_request, eventual) single_key = isinstance(key_pbs, datastore_pb.Key) @@ -235,28 +244,14 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None): for key_pb in key_pbs: lookup_request.key.add().CopyFrom(key_pb) - results = [] - while True: # loop against possible deferred. - lookup_response = self._rpc(dataset_id, 'lookup', lookup_request, - datastore_pb.LookupResponse) + results, missing_found, deferred_found = self._lookup( + lookup_request, dataset_id, deferred is not None) - results.extend( - [result.entity for result in lookup_response.found]) + if missing is not None: + missing.extend(missing_found) - if missing is not None: - missing.extend( - [result.entity for result in lookup_response.missing]) - - if deferred is not None: - deferred.extend([key for key in lookup_response.deferred]) - break - - if not lookup_response.deferred: - break - - # We have deferred keys, and the user didn't ask to know about - # them, so retry (but only with the deferred ones). - _copy_deferred_keys(lookup_request, lookup_response) + if deferred is not None: + deferred.extend(deferred_found) if single_key: if results: @@ -266,7 +261,7 @@ def lookup(self, dataset_id, key_pbs, missing=None, deferred=None): return results - def run_query(self, dataset_id, query_pb, namespace=None): + def run_query(self, dataset_id, query_pb, namespace=None, eventual=False): """Run a query on the Cloud Datastore. Maps the ``DatastoreService.RunQuery`` protobuf RPC. @@ -310,8 +305,15 @@ def run_query(self, dataset_id, query_pb, namespace=None): :type namespace: string :param namespace: The namespace over which to run the query. + + :type eventual: bool + :param eventual: If False (the default), request ``STRONG`` read + consistency. If True, request ``EVENTUAL`` read + consistency. If the connection has a current + transaction, this value *must* be false. """ request = datastore_pb.RunQueryRequest() + self._set_read_options(request, eventual) if namespace: request.partition_id.namespace = namespace @@ -514,6 +516,51 @@ def delete_entities(self, dataset_id, key_pbs): return True + def _lookup(self, lookup_request, dataset_id, stop_on_deferred): + """Repeat lookup until all keys found (unless stop requested). + + Helper method for ``lookup()``. + """ + results = [] + missing = [] + deferred = [] + while True: # loop against possible deferred. + lookup_response = self._rpc(dataset_id, 'lookup', lookup_request, + datastore_pb.LookupResponse) + + results.extend( + [result.entity for result in lookup_response.found]) + + missing.extend( + [result.entity for result in lookup_response.missing]) + + if stop_on_deferred: + deferred.extend([key for key in lookup_response.deferred]) + break + + if not lookup_response.deferred: + break + + # We have deferred keys, and the user didn't ask to know about + # them, so retry (but only with the deferred ones). + _copy_deferred_keys(lookup_request, lookup_response) + return results, missing, deferred + + def _set_read_options(self, request, eventual): + """Validate rules for read options, and assign to the request. + + Helper method for ``lookup()`` and ``run_query``. + """ + transaction = self.transaction() + if eventual and transaction: + raise ValueError('eventual must be False when in a transaction') + + opts = request.read_options + if eventual: + opts.read_consistency = datastore_pb.ReadOptions.EVENTUAL + elif transaction: + opts.transaction = transaction + def _copy_deferred_keys(lookup_request, lookup_response): """Clear requested keys and copy deferred keys back in. diff --git a/gcloud/datastore/test_connection.py b/gcloud/datastore/test_connection.py index 752caa6be8cc..ba68138bd1b8 100644 --- a/gcloud/datastore/test_connection.py +++ b/gcloud/datastore/test_connection.py @@ -231,6 +231,77 @@ def test_lookup_single_key_empty_response(self): self.assertEqual(len(keys), 1) self.assertEqual(keys[0], key_pb) + def test_lookup_single_key_empty_response_w_eventual(self): + from gcloud.datastore.connection import datastore_pb + from gcloud.datastore.key import Key + + DATASET_ID = 'DATASET' + key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf() + rsp_pb = datastore_pb.LookupResponse() + conn = self._makeOne() + URI = '/'.join([ + conn.API_BASE_URL, + 'datastore', + conn.API_VERSION, + 'datasets', + DATASET_ID, + 'lookup', + ]) + http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) + self.assertEqual(conn.lookup(DATASET_ID, key_pb, eventual=True), None) + cw = http._called_with + self._verifyProtobufCall(cw, URI, conn) + rq_class = datastore_pb.LookupRequest + request = rq_class() + request.ParseFromString(cw['body']) + keys = list(request.key) + self.assertEqual(len(keys), 1) + self.assertEqual(keys[0], key_pb) + self.assertEqual(request.read_options.read_consistency, + datastore_pb.ReadOptions.EVENTUAL) + self.assertEqual(request.read_options.transaction, '') + + def test_lookup_single_key_empty_response_w_eventual_and_transaction(self): + from gcloud.datastore.key import Key + + DATASET_ID = 'DATASET' + TRANSACTION = 'TRANSACTION' + key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf() + conn = self._makeOne() + conn.transaction(TRANSACTION) + self.assertRaises( + ValueError, conn.lookup, DATASET_ID, key_pb, eventual=True) + + def test_lookup_single_key_empty_response_w_transaction(self): + from gcloud.datastore.connection import datastore_pb + from gcloud.datastore.key import Key + + DATASET_ID = 'DATASET' + TRANSACTION = 'TRANSACTION' + key_pb = Key(path=[{'kind': 'Kind', 'id': 1234}]).to_protobuf() + rsp_pb = datastore_pb.LookupResponse() + conn = self._makeOne() + conn.transaction(TRANSACTION) + URI = '/'.join([ + conn.API_BASE_URL, + 'datastore', + conn.API_VERSION, + 'datasets', + DATASET_ID, + 'lookup', + ]) + http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) + self.assertEqual(conn.lookup(DATASET_ID, key_pb), None) + cw = http._called_with + self._verifyProtobufCall(cw, URI, conn) + rq_class = datastore_pb.LookupRequest + request = rq_class() + request.ParseFromString(cw['body']) + keys = list(request.key) + self.assertEqual(len(keys), 1) + self.assertEqual(keys[0], key_pb) + self.assertEqual(request.read_options.transaction, TRANSACTION) + def test_lookup_single_key_nonempty_response(self): from gcloud.datastore.connection import datastore_pb from gcloud.datastore.key import Key @@ -443,6 +514,106 @@ def test_lookup_multiple_keys_w_deferred_from_backend_but_not_passed(self): self.assertEqual(len(keys), 1) self.assertEqual(keys[0], key_pb2) + def test_run_query_w_eventual_no_transaction(self): + from gcloud.datastore.connection import datastore_pb + from gcloud.datastore.query import Query + + DATASET_ID = 'DATASET' + KIND = 'Nonesuch' + CURSOR = b'\x00' + q_pb = Query(KIND, DATASET_ID).to_protobuf() + rsp_pb = datastore_pb.RunQueryResponse() + rsp_pb.batch.end_cursor = CURSOR + no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS + rsp_pb.batch.more_results = no_more + rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL + conn = self._makeOne() + URI = '/'.join([ + conn.API_BASE_URL, + 'datastore', + conn.API_VERSION, + 'datasets', + DATASET_ID, + 'runQuery', + ]) + http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) + pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb, + eventual=True) + self.assertEqual(pbs, []) + self.assertEqual(end, CURSOR) + self.assertTrue(more) + self.assertEqual(skipped, 0) + cw = http._called_with + self._verifyProtobufCall(cw, URI, conn) + rq_class = datastore_pb.RunQueryRequest + request = rq_class() + request.ParseFromString(cw['body']) + self.assertEqual(request.partition_id.namespace, '') + self.assertEqual(request.query, q_pb) + self.assertEqual(request.read_options.read_consistency, + datastore_pb.ReadOptions.EVENTUAL) + self.assertEqual(request.read_options.transaction, '') + + def test_run_query_wo_eventual_w_transaction(self): + from gcloud.datastore.connection import datastore_pb + from gcloud.datastore.query import Query + + DATASET_ID = 'DATASET' + KIND = 'Nonesuch' + CURSOR = b'\x00' + TRANSACTION = 'TRANSACTION' + q_pb = Query(KIND, DATASET_ID).to_protobuf() + rsp_pb = datastore_pb.RunQueryResponse() + rsp_pb.batch.end_cursor = CURSOR + no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS + rsp_pb.batch.more_results = no_more + rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL + conn = self._makeOne() + conn.transaction(TRANSACTION) + URI = '/'.join([ + conn.API_BASE_URL, + 'datastore', + conn.API_VERSION, + 'datasets', + DATASET_ID, + 'runQuery', + ]) + http = conn._http = Http({'status': '200'}, rsp_pb.SerializeToString()) + pbs, end, more, skipped = conn.run_query(DATASET_ID, q_pb) + self.assertEqual(pbs, []) + self.assertEqual(end, CURSOR) + self.assertTrue(more) + self.assertEqual(skipped, 0) + cw = http._called_with + self._verifyProtobufCall(cw, URI, conn) + rq_class = datastore_pb.RunQueryRequest + request = rq_class() + request.ParseFromString(cw['body']) + self.assertEqual(request.partition_id.namespace, '') + self.assertEqual(request.query, q_pb) + self.assertEqual(request.read_options.read_consistency, + datastore_pb.ReadOptions.DEFAULT) + self.assertEqual(request.read_options.transaction, TRANSACTION) + + def test_run_query_w_eventual_and_transaction(self): + from gcloud.datastore.connection import datastore_pb + from gcloud.datastore.query import Query + + DATASET_ID = 'DATASET' + KIND = 'Nonesuch' + CURSOR = b'\x00' + TRANSACTION = 'TRANSACTION' + q_pb = Query(KIND, DATASET_ID).to_protobuf() + rsp_pb = datastore_pb.RunQueryResponse() + rsp_pb.batch.end_cursor = CURSOR + no_more = datastore_pb.QueryResultBatch.NO_MORE_RESULTS + rsp_pb.batch.more_results = no_more + rsp_pb.batch.entity_result_type = datastore_pb.EntityResult.FULL + conn = self._makeOne() + conn.transaction(TRANSACTION) + self.assertRaises( + ValueError, conn.run_query, DATASET_ID, q_pb, eventual=True) + def test_run_query_wo_namespace_empty_result(self): from gcloud.datastore.connection import datastore_pb from gcloud.datastore.query import Query