From f2704ad3691e04ed7124f335a15844749b145d8a Mon Sep 17 00:00:00 2001
From: Alan Wu
Date: Fri, 19 Jan 2018 00:43:55 -0800
Subject: [PATCH] BigTable: PartialRowsData iterator

---
 bigtable/google/cloud/bigtable/row_data.py | 127 ++++++++++++---------
 bigtable/tests/system.py                   |  17 +++
 bigtable/tests/unit/test_row_data.py       |  46 +++++++-
 3 files changed, 131 insertions(+), 59 deletions(-)

diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py
index 0c4ecdacced9b..82017ae963807 100644
--- a/bigtable/google/cloud/bigtable/row_data.py
+++ b/bigtable/google/cloud/bigtable/row_data.py
@@ -221,6 +221,72 @@ def __eq__(self, other):
     def __ne__(self, other):
         return not self == other
 
+    def __iter__(self):
+        return self._consume_next(True)
+
+    def _consume_next(self, yield_=False):
+        """Helper for ``consume_next`` and ``__iter__``.
+
+        :type yield_: bool
+        :param yield_: if True, yield rows as they complete; if False,
+                       pause after each response (used by ``consume_next``)
+        """
+        while True:
+            response = six.next(self._response_iterator)
+            self._counter += 1
+
+            if self._last_scanned_row_key is None:  # first response
+                if response.last_scanned_row_key:
+                    raise InvalidReadRowsResponse()
+
+            self._last_scanned_row_key = response.last_scanned_row_key
+
+            row = self._row
+            cell = self._cell
+
+            for chunk in response.chunks:
+
+                self._validate_chunk(chunk)
+
+                if chunk.reset_row:
+                    row = self._row = None
+                    cell = self._cell = self._previous_cell = None
+                    continue
+
+                if row is None:
+                    row = self._row = PartialRowData(chunk.row_key)
+
+                if cell is None:
+                    qualifier = None
+                    if chunk.HasField('qualifier'):
+                        qualifier = chunk.qualifier.value
+
+                    cell = self._cell = PartialCellData(
+                        chunk.row_key,
+                        chunk.family_name.value,
+                        qualifier,
+                        chunk.timestamp_micros,
+                        chunk.labels,
+                        chunk.value)
+                    self._copy_from_previous(cell)
+                else:
+                    cell.append_value(chunk.value)
+
+                if chunk.commit_row:
+                    self._save_current_row()
+                    if yield_:
+                        yield self._previous_row
+                    row = cell = None
+                    continue
+
+                if chunk.value_size == 0:
+                    self._save_current_cell()
+                    cell = None
+
+            if not yield_:
+                yield  # pause: one response consumed per call
+                break
+
     @property
     def state(self):
         """State machine state.
@@ -262,54 +328,10 @@ def consume_next(self):
         Parse the response and its chunks into a new/existing row in
         :attr:`_rows`. Rows are returned in order by row key.
         """
-        response = six.next(self._response_iterator)
-        self._counter += 1
-
-        if self._last_scanned_row_key is None:  # first response
-            if response.last_scanned_row_key:
-                raise InvalidReadRowsResponse()
-
-        self._last_scanned_row_key = response.last_scanned_row_key
-
-        row = self._row
-        cell = self._cell
-
-        for chunk in response.chunks:
-
-            self._validate_chunk(chunk)
-
-            if chunk.reset_row:
-                row = self._row = None
-                cell = self._cell = self._previous_cell = None
-                continue
-
-            if row is None:
-                row = self._row = PartialRowData(chunk.row_key)
-
-            if cell is None:
-                qualifier = None
-                if chunk.HasField('qualifier'):
-                    qualifier = chunk.qualifier.value
-
-                cell = self._cell = PartialCellData(
-                    chunk.row_key,
-                    chunk.family_name.value,
-                    qualifier,
-                    chunk.timestamp_micros,
-                    chunk.labels,
-                    chunk.value)
-                self._copy_from_previous(cell)
-            else:
-                cell.append_value(chunk.value)
-
-            if chunk.commit_row:
-                self._save_current_row()
-                row = cell = None
-                continue
-
-            if chunk.value_size == 0:
-                self._save_current_cell()
-                cell = None
+        try:
+            next(self._consume_next(False))
+        except StopIteration:
+            return False
 
     def consume_all(self, max_loops=None):
         """Consume the streamed responses until there are no more.
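
The change above funnels both entry points through one generator:
``__iter__`` passes ``yield_=True`` and streams rows as their commit
chunks arrive, while ``consume_next`` builds a fresh generator with
``yield_=False`` and advances it a single step, so each call consumes
exactly one response (and returns ``False`` once the stream is
exhausted). A standalone sketch of that dual-mode pattern, using toy
lists in place of responses and chunks (not library code):

    def _consume(responses, yield_=False):
        for response in responses:
            for item in response:   # stands in for chunk processing
                if yield_:
                    yield item      # iterator mode: emit completed rows
            if not yield_:
                yield               # step mode: pause after one response
                break               # caller builds a fresh generator per step

    # Iterator mode walks every response:
    assert list(_consume(iter([[1, 2], [3]]), yield_=True)) == [1, 2, 3]

    # Step mode consumes exactly one response per generator:
    stream = iter([[1, 2], [3]])
    next(_consume(stream))          # processes [1, 2] only
    assert next(stream) == [3]      # the second response is untouched
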
@@ -324,13 +346,12 @@ def consume_all(self, max_loops=None):
         """
         curr_loop = 0
         if max_loops is None:
-            max_loops = float('inf')
+            while True:
+                if self.consume_next() is False:  # guard against None
+                    return
         while curr_loop < max_loops:
             curr_loop += 1
-            try:
-                self.consume_next()
-            except StopIteration:
-                break
+            self.consume_next()
 
     @staticmethod
     def _validate_chunk_status(chunk):
diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py
index c889b181673ec..7fc0dad0dcf66 100644
--- a/bigtable/tests/system.py
+++ b/bigtable/tests/system.py
@@ -401,6 +401,23 @@ def test_read_row(self):
         }
         self.assertEqual(partial_row_data.cells, expected_row_contents)
 
+    def test_read_rows_iter(self):
+        row = self._table.row(ROW_KEY)
+        row_alt = self._table.row(ROW_KEY_ALT)
+        self.rows_to_delete.extend([row, row_alt])
+
+        cell1, cell2, cell3, cell4 = self._write_to_row(row, row_alt,
+                                                        row, row_alt)
+        row.commit()
+        row_alt.commit()
+        keys = [ROW_KEY, ROW_KEY_ALT]
+        rows_data = self._table.read_rows()
+        self.assertEqual(rows_data.rows, {})
+        for data, key in zip(rows_data, keys):
+            self.assertEqual(data.row_key, key)
+            self.assertEqual(data, self._table.read_row(key))
+            self.assertEqual(data.cells, self._table.read_row(key).cells)
+
     def test_read_rows(self):
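
The system test above pins down the user-visible contract: ``read_rows()``
returns at once with an empty ``rows`` dict, and iterating the returned
``PartialRowsData`` yields one ``PartialRowData`` per committed row, in
row key order. A minimal usage sketch (``table`` is assumed to be a
connected ``Table``; ``process`` is a placeholder):

    rows_data = table.read_rows()    # no responses pulled yet
    assert rows_data.rows == {}      # nothing buffered before iteration
    for row in rows_data:            # new: rows stream in as they commit
        process(row.row_key, row.cells)
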
diff --git a/bigtable/tests/unit/test_row_data.py b/bigtable/tests/unit/test_row_data.py
index 375097c54e79d..594b3a6d1938d 100644
--- a/bigtable/tests/unit/test_row_data.py
+++ b/bigtable/tests/unit/test_row_data.py
@@ -203,9 +203,12 @@ def __init__(self, *args, **kwargs):
             self._consumed = []
 
         def consume_next(self):
-            value = self._response_iterator.next()
-            self._consumed.append(value)
-            return value
+            try:
+                value = self._response_iterator.next()
+                self._consumed.append(value)
+                return value
+            except StopIteration:
+                return False
 
     return FakePartialRowsData
 
@@ -522,6 +525,21 @@ def test_invalid_last_row_missing_commit(self):
 
     # Non-error cases
 
+    def test_iter(self):
+        values = [mock.Mock()] * 3
+        chunks, results = self._load_json_test('two rows')
+
+        for value in values:
+            value.chunks = chunks
+        response_iterator = _MockCancellableIterator(*values)
+
+        partial_rows = self._make_one(response_iterator)
+        partial_rows._last_scanned_row_key = 'BEFORE'
+
+        for data, value in zip(partial_rows, results):
+            flattened = self._sort_flattend_cells(_flatten_cells(data))
+            self.assertEqual(flattened[0], value)
+
     _marker = object()
 
     def _match_results(self, testcase_name, expected_result=_marker):
@@ -641,12 +659,28 @@ def _flatten_cells(prd):
     from google.cloud._helpers import _bytes_to_unicode
     from google.cloud._helpers import _microseconds_from_datetime
 
-    for row_key, row in prd.rows.items():
-        for family_name, family in row.cells.items():
+    try:
+        # Flatten PartialRowsData
+        for row_key, row in prd.rows.items():
+            for family_name, family in row.cells.items():
+                for qualifier, column in family.items():
+                    for cell in column:
+                        yield {
+                            u'rk': _bytes_to_unicode(row_key),
+                            u'fm': family_name,
+                            u'qual': _bytes_to_unicode(qualifier),
+                            u'ts': _microseconds_from_datetime(cell.timestamp),
+                            u'value': _bytes_to_unicode(cell.value),
+                            u'label': u' '.join(cell.labels),
+                            u'error': False,
+                        }
+    except AttributeError:
+        # Flatten PartialRowData
+        for family_name, family in prd.cells.items():
             for qualifier, column in family.items():
                 for cell in column:
                     yield {
-                        u'rk': _bytes_to_unicode(row_key),
+                        u'rk': _bytes_to_unicode(prd.row_key),
                         u'fm': family_name,
                         u'qual': _bytes_to_unicode(qualifier),
                         u'ts': _microseconds_from_datetime(cell.timestamp),
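
The reworked ``_flatten_cells`` accepts either shape: ``PartialRowsData``
exposes ``.rows``, so ``prd.rows`` raises ``AttributeError`` for a single
``PartialRowData`` and selects the fallback branch. One caveat is that an
``AttributeError`` raised anywhere inside the first loop is caught the
same way. A narrower dispatch, sketched with a hypothetical helper (not
part of the patch) under the same attribute assumptions:

    def _iter_row_items(prd):
        # Hypothetical helper: normalize both shapes to (row_key, cells).
        if hasattr(prd, 'rows'):        # PartialRowsData
            for row_key, row in prd.rows.items():
                yield row_key, row.cells
        else:                           # single PartialRowData
            yield prd.row_key, prd.cells

``_flatten_cells`` could then keep a single set of nested loops over
``_iter_row_items(prd)`` instead of duplicating the ``yield`` block.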