-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bigtable records bulk update (MutateRows RPC call) #3401
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
It would be good to add a system test or two, which can run against the emulator or the live service.
https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigtable/tests/system.py
|
||
|
||
def _mutate_rows_request(table_name, rows): | ||
"""Creates a request to read rows in a table. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
"""Creates a request to read rows in a table. | ||
|
||
:type table_name: str | ||
:param table_name: The name of the table to read from. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
mutations_count += 1 | ||
entry.mutations.add().CopyFrom(mutation) | ||
if mutations_count > 100000: | ||
raise TooManyMutationsError('Maximum number of the entries mutations ' |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
|
||
def _check_rows(table_name, rows): | ||
"""Checks that all rows belong to the table. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -29,6 +29,14 @@ | |||
from google.cloud.bigtable.row_data import PartialRowsData | |||
|
|||
|
|||
class RowBelongingError(Exception): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
@property | ||
def table(self): | ||
return self._table |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
|
||
class TooManyMutationsError(Exception): | ||
"""The number of mutations for bulk request is too big.""" |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
:rtype: list | ||
:returns: A list of tuples (``MutateRowsResponse.Entry`` protobuf | ||
corresponding to the errors, :class:`.DirectRow`) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
for response in responses: | ||
for entry in response.entries: | ||
if not entry.status.code: | ||
rows[entry.index].clear() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
responses = client._data_stub.MutateRows(mutate_rows_request) | ||
for response in responses: | ||
for entry in response.entries: | ||
if not entry.status.code: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/unit/test_row.py
Outdated
@@ -16,6 +16,26 @@ | |||
import unittest | |||
|
|||
|
|||
class TestRow(unittest.TestCase): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/unit/test_table.py
Outdated
def test__mutate_rows_too_many_mutations(self): | ||
from google.cloud.bigtable.table import TooManyMutationsError | ||
from google.cloud.bigtable.row import DirectRow | ||
from google.cloud.bigtable._generated.data_pb2 import Mutation |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/unit/test_table.py
Outdated
rows = [DirectRow(row_key=b'row_key', table='table'), | ||
DirectRow(row_key=b'row_key_2', table='table')] | ||
rows[0]._pb_mutations = [mutation for _ in range(0, 50000)] | ||
rows[1]._pb_mutations = [mutation for _ in range(0, 50001)] |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/unit/test_table.py
Outdated
@@ -569,6 +616,11 @@ def _SampleRowKeysRequestPB(*args, **kw): | |||
|
|||
return messages_v2_pb2.SampleRowKeysRequest(*args, **kw) | |||
|
|||
def _MutateRowsRequestPB(*args, **kw): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/unit/test_table.py
Outdated
@@ -16,6 +16,53 @@ | |||
import unittest | |||
|
|||
|
|||
class Test___mutate_rows_request(unittest.TestCase): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# Conflicts: # bigtable/google/cloud/bigtable/row.py
Also, could you please take a look why cover failed? Strings it points to are covered... Can't figure out. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very close from my perspective!
:rtype: :class:`data_messages_v2_pb2.MutateRowsRequest` | ||
:returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs. | ||
:raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is | ||
grater than 100,000 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -276,6 +288,34 @@ def read_rows(self, start_key=None, end_key=None, limit=None, | |||
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` | |||
return PartialRowsData(response_iterator) | |||
|
|||
def mutate_rows(self, rows): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Each element of the list contains a status of the corresponding row mutations.
Fixed. Could you please take another look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good from my side! We'll just need @dhermes or someone from the python team to approve.
@@ -300,21 +300,19 @@ def mutate_rows(self, rows): | |||
:param rows: List or other iterable of :class:`.DirectRow` instances. | |||
|
|||
:rtype: list | |||
:returns: A list of tuples (``MutateRowsResponse.Entry`` protobuf | |||
corresponding to the errors, :class:`.DirectRow`) | |||
:returns: A list of corresponding to each row statuses. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Thanks a lot @dhermes. We appreciate all the effort. |
Sorry I didn't have a chance today guys. I'll try to take a look this weekend / ASAP Tuesday. |
No worries @dhermes, next week is fine! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Why are only
DirectRow
instances allowed? Is that part of the
method or just part of the implementation? - ISTM that the most "robust" way to actually turn the PB response
into a list of status codes would be to keep a list of tuples
(index, status)
and then before returning from the function,
call a helper to coerce that into a list. (You could also use
a dictionary rather than a list of tuples but you want to make
sure you don't set a key that's already in the dictionary.)
for entry in response.entries: | ||
responses_statuses[entry.index] = entry.status | ||
if entry.status.code == 0: | ||
rows[entry.index].clear() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
entry.mutations.add().CopyFrom(mutation) | ||
if mutations_count > _MAX_BULK_MUTATIONS: | ||
raise TooManyMutationsError('Maximum number of mutations is %s' % | ||
_MAX_BULK_MUTATIONS) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
_check_row_type(row) | ||
entry = request_pb.entries.add() | ||
entry.row_key = row.row_key | ||
for mutation in row._get_mutations(None): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
|
||
def _check_row_table_name(table_name, row): | ||
"""Checks that a row belong to the table. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/system.py
Outdated
row_2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2) | ||
row_2.commit() | ||
rows = [row_1, row_2] | ||
self.rows_to_delete.extend(rows) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable/tests/unit/test_table.py
Outdated
row_2 = DirectRow(row_key=b'row_key_2', table=table) | ||
row_2.set_cell('cf', b'col', b'value2') | ||
|
||
response = MutateRowsResponse() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
client = self._instance._client | ||
responses = client._data_stub.MutateRows(mutate_rows_request) | ||
|
||
responses_statuses = [None for _ in range(len(rows))] |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
responses_statuses[entry.index] = entry.status | ||
if entry.status.code == 0: | ||
rows[entry.index].clear() | ||
return responses_statuses |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
responses_statuses = [None for _ in range(len(rows))] | ||
for response in responses: | ||
for entry in response.entries: | ||
responses_statuses[entry.index] = entry.status |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
:param rows: List or other iterable of :class:`.DirectRow` instances. | ||
|
||
:rtype: list | ||
:returns: A list of corresponding to each row statuses. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
if not isinstance(row, DirectRow): | ||
raise TypeError('Bulk processing can not be applied for ' | ||
'conditional or append mutations.') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigtable_pb2 as data_messages_v2_pb2) | ||
|
||
return data_messages_v2_pb2.MutateRowsRequest(*args, **kw) | ||
|
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -29,6 +29,18 @@ | |||
from google.cloud.bigtable.row_data import PartialRowsData | |||
|
|||
|
|||
# Maximum number of mutations in bulk | |||
_MAX_BULK_MUTATIONS = 100000 |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Hey! Could you please take another look? @dhermes asked: "Why are only DirectRow instances allowed? Is that part of the method or just part of the implementation?" |
So there's good news and bad news. 👍 The good news is that everyone that needs to sign a CLA (the pull request submitter and all commit authors) have done so. Everything is all good there. 😕 The bad news is that it appears that one or more commits were authored by someone other than the pull request submitter. We need to confirm that they're okay with their commits being contributed to this project. Please have them confirm that here in the pull request. Note to project maintainer: This is a terminal state, meaning the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation of bulk updates for DirectRow (MutateRows RPC call)