Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using protobuf CommitRequest in datastore Connection.commit. #1341

Merged
merged 1 commit into from
Jan 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions gcloud/datastore/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ class Batch(object):
:type client: :class:`gcloud.datastore.client.Client`
:param client: The client used to connect to datastore.
"""

_id = None # "protected" attribute, always None for non-transactions

def __init__(self, client):
self._client = client
self._mutation = _datastore_pb2.Mutation()
self._commit_request = _datastore_pb2.CommitRequest()
self._partial_key_entities = []

def current(self):
Expand Down Expand Up @@ -114,6 +115,9 @@ def _add_complete_key_entity_pb(self):
:returns: The newly created entity protobuf that will be
updated and sent with a commit.
"""
# We use ``upsert`` for entities with completed keys, rather than
# ``insert`` or ``update``, in order not to create race conditions
# based on prior existence / removal of the entity.
return self.mutations.upsert.add()

def _add_delete_key_pb(self):
Expand All @@ -129,17 +133,16 @@ def _add_delete_key_pb(self):
def mutations(self):
"""Getter for the changes accumulated by this batch.

Every batch is committed with a single Mutation
representing the 'work' to be done as part of the batch.
Inside a batch, calling :meth:`put` with an entity, or
:meth:`delete` with a key, builds up the mutation.
This getter returns the Mutation protobuf that
has been built-up so far.
Every batch is committed with a single commit request containing all
the work to be done as mutations. Inside a batch, calling :meth:`put`
with an entity, or :meth:`delete` with a key, builds up the request by
adding a new mutation. This getter returns the protobuf that has been
built-up so far.

:rtype: :class:`gcloud.datastore._generated.datastore_pb2.Mutation`
:returns: The Mutation protobuf to be sent in the commit request.
"""
return self._mutation
return self._commit_request.mutation

def put(self, entity):
"""Remember an entity's state to be saved during :meth:`commit`.
Expand All @@ -156,8 +159,8 @@ def put(self, entity):
"bytes" ('str' in Python2, 'bytes' in Python3) map to 'blob_value'.

When an entity has a partial key, calling :meth:`commit` sends it as
an ``insert_auto_id`` mutation and the key is completed. On return, the
key for the ``entity`` passed in as updated to match the key ID
an ``insert_auto_id`` mutation and the key is completed. On return,
the key for the ``entity`` passed in is updated to match the key ID
assigned by the server.

:type entity: :class:`gcloud.datastore.entity.Entity`
Expand Down Expand Up @@ -212,11 +215,10 @@ def commit(self):
context manager.
"""
_, updated_keys = self.connection.commit(
self.dataset_id, self.mutations, self._id)
self.dataset_id, self._commit_request, self._id)
# If the back-end returns without error, we are guaranteed that
# the response's 'insert_auto_id_key' will match (length and order)
# the request's 'insert_auto_id` entities, which are derived from
# our '_partial_key_entities' (no partial success).
# :meth:`Connection.commit` will return keys that match (length and
# order) directly ``_partial_key_entities``.
for new_key_pb, entity in zip(updated_keys,
self._partial_key_entities):
new_id = new_key_pb.path_element[-1].id
Expand Down
10 changes: 5 additions & 5 deletions gcloud/datastore/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,16 @@ def begin_transaction(self, dataset_id):
_datastore_pb2.BeginTransactionResponse)
return response.transaction

def commit(self, dataset_id, mutation_pb, transaction_id):
"""Commit dataset mutations in context of current transation (if any).
def commit(self, dataset_id, commit_request, transaction_id):
"""Commit mutations in context of current transation (if any).

Maps the ``DatastoreService.Commit`` protobuf RPC.

:type dataset_id: string
:param dataset_id: The ID dataset to which the transaction applies.

:type mutation_pb: :class:`._generated.datastore_pb2.Mutation`
:param mutation_pb: The protobuf for the mutations being saved.
:type commit_request: :class:`._generated.datastore_pb2.CommitRequest`
:param commit_request: The protobuf with the mutations being committed.

:type transaction_id: string or None
:param transaction_id: The transaction ID returned from
Expand All @@ -315,14 +315,14 @@ def commit(self, dataset_id, mutation_pb, transaction_id):
that was completed in the commit.
"""
request = _datastore_pb2.CommitRequest()
request.CopyFrom(commit_request)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


if transaction_id:
request.mode = _datastore_pb2.CommitRequest.TRANSACTIONAL
request.transaction = transaction_id
else:
request.mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL

request.mutation.CopyFrom(mutation_pb)
response = self._rpc(dataset_id, 'commit', request,
_datastore_pb2.CommitResponse)
return _parse_commit_response(response)
Expand Down
14 changes: 7 additions & 7 deletions gcloud/datastore/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def test_commit(self):
batch.commit()

self.assertEqual(connection._committed,
[(_DATASET, batch.mutations, None)])
[(_DATASET, batch._commit_request, None)])

def test_commit_w_partial_key_entities(self):
_DATASET = 'DATASET'
Expand All @@ -225,7 +225,7 @@ def test_commit_w_partial_key_entities(self):
batch.commit()

self.assertEqual(connection._committed,
[(_DATASET, batch.mutations, None)])
[(_DATASET, batch._commit_request, None)])
self.assertFalse(entity.key.is_partial)
self.assertEqual(entity.key._id, _NEW_ID)

Expand All @@ -248,7 +248,7 @@ def test_as_context_mgr_wo_error(self):
mutated_entity = _mutated_pb(self, batch.mutations, 'upsert')
self.assertEqual(mutated_entity.key, key._key)
self.assertEqual(connection._committed,
[(_DATASET, batch.mutations, None)])
[(_DATASET, batch._commit_request, None)])

def test_as_context_mgr_nested(self):
_DATASET = 'DATASET'
Expand Down Expand Up @@ -280,8 +280,8 @@ def test_as_context_mgr_nested(self):
self.assertEqual(mutated_entity2.key, key2._key)

self.assertEqual(connection._committed,
[(_DATASET, batch2.mutations, None),
(_DATASET, batch1.mutations, None)])
[(_DATASET, batch2._commit_request, None),
(_DATASET, batch1._commit_request, None)])

def test_as_context_mgr_w_error(self):
_DATASET = 'DATASET'
Expand Down Expand Up @@ -329,8 +329,8 @@ def __init__(self, *new_keys):
self._committed = []
self._index_updates = 0

def commit(self, dataset_id, mutation, transaction_id):
self._committed.append((dataset_id, mutation, transaction_id))
def commit(self, dataset_id, commit_request, transaction_id):
self._committed.append((dataset_id, commit_request, transaction_id))
return self._index_updates, self._completed_keys


Expand Down
14 changes: 8 additions & 6 deletions gcloud/datastore/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,9 +625,10 @@ def test_put_multi_no_batch_w_partial_key(self):
self.assertTrue(result is None)

self.assertEqual(len(client.connection._commit_cw), 1)
dataset_id, mutation, transaction_id = client.connection._commit_cw[0]
(dataset_id,
commit_req, transaction_id) = client.connection._commit_cw[0]
self.assertEqual(dataset_id, self.DATASET_ID)
inserts = list(mutation.insert_auto_id)
inserts = list(commit_req.mutation.insert_auto_id)
self.assertEqual(len(inserts), 1)
self.assertEqual(inserts[0].key, key.to_protobuf())

Expand Down Expand Up @@ -697,9 +698,10 @@ def test_delete_multi_no_batch(self):
result = client.delete_multi([key])
self.assertEqual(result, None)
self.assertEqual(len(client.connection._commit_cw), 1)
dataset_id, mutation, transaction_id = client.connection._commit_cw[0]
(dataset_id,
commit_req, transaction_id) = client.connection._commit_cw[0]
self.assertEqual(dataset_id, self.DATASET_ID)
self.assertEqual(list(mutation.delete), [key.to_protobuf()])
self.assertEqual(list(commit_req.mutation.delete), [key.to_protobuf()])
self.assertTrue(transaction_id is None)

def test_delete_multi_w_existing_batch(self):
Expand Down Expand Up @@ -1012,8 +1014,8 @@ def lookup(self, dataset_id, key_pbs, eventual=False, transaction_id=None):
results, missing, deferred = triple
return results, missing, deferred

def commit(self, dataset_id, mutation, transaction_id):
self._commit_cw.append((dataset_id, mutation, transaction_id))
def commit(self, dataset_id, commit_request, transaction_id):
self._commit_cw.append((dataset_id, commit_request, transaction_id))
response, self._commit = self._commit[0], self._commit[1:]
return self._index_updates, response

Expand Down
10 changes: 6 additions & 4 deletions gcloud/datastore/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,8 @@ def test_commit_wo_transaction(self):
DATASET_ID = 'DATASET'
key_pb = self._make_key_pb(DATASET_ID)
rsp_pb = datastore_pb2.CommitResponse()
mutation = datastore_pb2.Mutation()
req_pb = datastore_pb2.CommitRequest()
mutation = req_pb.mutation
insert = mutation.upsert.add()
insert.key.CopyFrom(key_pb)
value_pb = _new_value_pb(insert, 'foo')
Expand All @@ -700,7 +701,7 @@ def mock_parse(response):
return expected_result

with _Monkey(MUT, _parse_commit_response=mock_parse):
result = conn.commit(DATASET_ID, mutation, None)
result = conn.commit(DATASET_ID, req_pb, None)

self.assertTrue(result is expected_result)
cw = http._called_with
Expand All @@ -722,7 +723,8 @@ def test_commit_w_transaction(self):
DATASET_ID = 'DATASET'
key_pb = self._make_key_pb(DATASET_ID)
rsp_pb = datastore_pb2.CommitResponse()
mutation = datastore_pb2.Mutation()
req_pb = datastore_pb2.CommitRequest()
mutation = req_pb.mutation
insert = mutation.upsert.add()
insert.key.CopyFrom(key_pb)
value_pb = _new_value_pb(insert, 'foo')
Expand All @@ -747,7 +749,7 @@ def mock_parse(response):
return expected_result

with _Monkey(MUT, _parse_commit_response=mock_parse):
result = conn.commit(DATASET_ID, mutation, b'xact')
result = conn.commit(DATASET_ID, req_pb, b'xact')

self.assertTrue(result is expected_result)
cw = http._called_with
Expand Down
19 changes: 11 additions & 8 deletions gcloud/datastore/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ def test_commit_no_partial_keys(self):
connection = _Connection(234)
client = _Client(_DATASET, connection)
xact = self._makeOne(client)
xact._mutation = mutation = object()
xact._commit_request = commit_request = object()
xact.begin()
xact.commit()
self.assertEqual(connection._committed, (_DATASET, mutation, 234))
self.assertEqual(connection._committed,
(_DATASET, commit_request, 234))
self.assertEqual(xact.id, None)

def test_commit_w_partial_keys(self):
Expand All @@ -118,10 +119,11 @@ def test_commit_w_partial_keys(self):
xact = self._makeOne(client)
entity = _Entity()
xact.put(entity)
xact._mutation = mutation = object()
xact._commit_request = commit_request = object()
xact.begin()
xact.commit()
self.assertEqual(connection._committed, (_DATASET, mutation, 234))
self.assertEqual(connection._committed,
(_DATASET, commit_request, 234))
self.assertEqual(xact.id, None)
self.assertEqual(entity.key.path, [{'kind': _KIND, 'id': _ID}])

Expand All @@ -130,11 +132,12 @@ def test_context_manager_no_raise(self):
connection = _Connection(234)
client = _Client(_DATASET, connection)
xact = self._makeOne(client)
xact._mutation = mutation = object()
xact._commit_request = commit_request = object()
with xact:
self.assertEqual(xact.id, 234)
self.assertEqual(connection._begun, _DATASET)
self.assertEqual(connection._committed, (_DATASET, mutation, 234))
self.assertEqual(connection._committed,
(_DATASET, commit_request, 234))
self.assertEqual(xact.id, None)

def test_context_manager_w_raise(self):
Expand Down Expand Up @@ -186,8 +189,8 @@ def begin_transaction(self, dataset_id):
def rollback(self, dataset_id, transaction_id):
self._rolled_back = dataset_id, transaction_id

def commit(self, dataset_id, mutation, transaction_id):
self._committed = (dataset_id, mutation, transaction_id)
def commit(self, dataset_id, commit_request, transaction_id):
self._committed = (dataset_id, commit_request, transaction_id)
return self._index_updates, self._completed_keys


Expand Down