Skip to content

Commit

Permalink
Merge pull request #299 from tseaver/272-add-allocate_ids-rpc
Browse files Browse the repository at this point in the history
Fix #272: add 'allocate_ids' RPC to connection
  • Loading branch information
tseaver committed Oct 28, 2014
2 parents d094d30 + 162f508 commit 7d155f7
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 195 deletions.
202 changes: 117 additions & 85 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,52 +153,77 @@ def dataset(self, *args, **kwargs):
kwargs['connection'] = self
return Dataset(*args, **kwargs)

def begin_transaction(self, dataset_id, serializable=False):
"""Begin a transaction.
:type dataset_id: string
:param dataset_id: The dataset over which to execute the transaction.
"""
def lookup(self, dataset_id, key_pbs):
"""Lookup keys from a dataset in the Cloud Datastore.
if self.transaction():
raise ValueError('Cannot start a transaction with another already '
'in progress.')
Maps the ``DatastoreService.Lookup`` protobuf RPC.
request = datastore_pb.BeginTransactionRequest()
This method deals only with protobufs
(:class:`gcloud.datastore.datastore_v1_pb2.Key`
and
:class:`gcloud.datastore.datastore_v1_pb2.Entity`)
and is used under the hood for methods like
:func:`gcloud.datastore.dataset.Dataset.get_entity`:
if serializable:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SERIALIZABLE)
else:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SNAPSHOT)
>>> from gcloud import datastore
>>> from gcloud.datastore.key import Key
>>> connection = datastore.get_connection(email, key_path)
>>> dataset = connection.dataset('dataset-id')
>>> key = Key(dataset=dataset).kind('MyKind').id(1234)
response = self._rpc(dataset_id, 'beginTransaction', request,
datastore_pb.BeginTransactionResponse)
Using the :class:`gcloud.datastore.dataset.Dataset` helper:
return response.transaction
>>> dataset.get_entity(key)
<Entity object>
def rollback_transaction(self, dataset_id):
"""Rollback the connection's existing transaction.
Using the ``connection`` class directly:
Raises a ``ValueError``
if the connection isn't currently in a transaction.
>>> connection.lookup('dataset-id', key.to_protobuf())
<Entity protobuf>
:type dataset_id: string
:param dataset_id: The dataset to which the transaction belongs.
:param dataset_id: The dataset to look up the keys.
:type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
(or a single Key)
:param key_pbs: The key (or keys) to retrieve from the datastore.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
"""
if not self.transaction() or not self.transaction().id():
raise ValueError('No transaction to rollback.')
lookup_request = datastore_pb.LookupRequest()

request = datastore_pb.RollbackRequest()
request.transaction = self.transaction().id()
# Nothing to do with this response, so just execute the method.
self._rpc(dataset_id, 'rollback', request,
datastore_pb.RollbackResponse)
single_key = isinstance(key_pbs, datastore_pb.Key)

if single_key:
key_pbs = [key_pbs]

for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)

lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)

results = [result.entity for result in lookup_response.found]

if single_key:
if results:
return results[0]
else:
return None

return results

def run_query(self, dataset_id, query_pb, namespace=None):
"""Run a query on the Cloud Datastore.
Maps the ``DatastoreService.RunQuery`` protobuf RPC.
Given a Query protobuf,
sends a ``runQuery`` request to the Cloud Datastore API
and returns a list of entity protobufs matching the query.
Expand Down Expand Up @@ -250,73 +275,38 @@ def run_query(self, dataset_id, query_pb, namespace=None):
response.batch.skipped_results,
)

def lookup(self, dataset_id, key_pbs):
"""Lookup keys from a dataset in the Cloud Datastore.
This method deals only with protobufs
(:class:`gcloud.datastore.datastore_v1_pb2.Key`
and
:class:`gcloud.datastore.datastore_v1_pb2.Entity`)
and is used under the hood for methods like
:func:`gcloud.datastore.dataset.Dataset.get_entity`:
>>> from gcloud import datastore
>>> from gcloud.datastore.key import Key
>>> connection = datastore.get_connection(email, key_path)
>>> dataset = connection.dataset('dataset-id')
>>> key = Key(dataset=dataset).kind('MyKind').id(1234)
Using the :class:`gcloud.datastore.dataset.Dataset` helper:
>>> dataset.get_entity(key)
<Entity object>
Using the ``connection`` class directly:
def begin_transaction(self, dataset_id, serializable=False):
"""Begin a transaction.
>>> connection.lookup('dataset-id', key.to_protobuf())
<Entity protobuf>
Maps the ``DatastoreService.BeginTransaction`` protobuf RPC.
:type dataset_id: string
:param dataset_id: The dataset to look up the keys.
:type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
(or a single Key)
:param key_pbs: The key (or keys) to retrieve from the datastore.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Entity`
(or a single Entity)
:returns: The entities corresponding to the keys provided.
If a single key was provided and no results matched,
this will return None.
If multiple keys were provided and no results matched,
this will return an empty list.
:param dataset_id: The dataset over which to execute the transaction.
"""
lookup_request = datastore_pb.LookupRequest()

single_key = isinstance(key_pbs, datastore_pb.Key)

if single_key:
key_pbs = [key_pbs]

for key_pb in key_pbs:
lookup_request.key.add().CopyFrom(key_pb)
if self.transaction():
raise ValueError('Cannot start a transaction with another already '
'in progress.')

lookup_response = self._rpc(dataset_id, 'lookup', lookup_request,
datastore_pb.LookupResponse)
request = datastore_pb.BeginTransactionRequest()

results = [result.entity for result in lookup_response.found]
if serializable:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SERIALIZABLE)
else:
request.isolation_level = (
datastore_pb.BeginTransactionRequest.SNAPSHOT)

if single_key:
if results:
return results[0]
else:
return None
response = self._rpc(dataset_id, 'beginTransaction', request,
datastore_pb.BeginTransactionResponse)

return results
return response.transaction

def commit(self, dataset_id, mutation_pb):
"""Commit dataset mutations in context of current transation (if any).
Maps the ``DatastoreService.Commit`` protobuf RPC.
:type dataset_id: string
:param dataset_id: The dataset in which to perform the changes.
Expand All @@ -339,6 +329,48 @@ def commit(self, dataset_id, mutation_pb):
datastore_pb.CommitResponse)
return response.mutation_result

def rollback(self, dataset_id):
"""Rollback the connection's existing transaction.
Maps the ``DatastoreService.Rollback`` protobuf RPC.
Raises a ``ValueError``
if the connection isn't currently in a transaction.
:type dataset_id: string
:param dataset_id: The dataset to which the transaction belongs.
"""
if not self.transaction() or not self.transaction().id():
raise ValueError('No transaction to rollback.')

request = datastore_pb.RollbackRequest()
request.transaction = self.transaction().id()
# Nothing to do with this response, so just execute the method.
self._rpc(dataset_id, 'rollback', request,
datastore_pb.RollbackResponse)

def allocate_ids(self, dataset_id, key_pbs):
"""Obtain backend-generated IDs for a set of keys.
Maps the ``DatastoreService.AllocateIds`` protobuf RPC.
:type dataset_id: string
:param dataset_id: The dataset to which the transaction belongs.
:type key_pbs: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
:param key_pbs: The keys for which the backend should allocate IDs.
:rtype: list of :class:`gcloud.datastore.datastore_v1_pb2.Key`
:returns: An equal number of keys, with IDs filled in by the backend.
"""
request = datastore_pb.AllocateIdsRequest()
for key_pb in key_pbs:
request.key.add().CopyFrom(key_pb)
# Nothing to do with this response, so just execute the method.
response = self._rpc(dataset_id, 'allocateIds', request,
datastore_pb.AllocateIdsResponse)
return list(response.key)

def save_entity(self, dataset_id, key_pb, properties):
"""Save an entity to the Cloud Datastore with the provided properties.
Expand Down
Loading

0 comments on commit 7d155f7

Please sign in to comment.