From 061011d0213f82ca5ccaa9dec0a12713faaa2899 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 7 Dec 2017 14:49:57 -0800 Subject: [PATCH] Checking _Rendezvous.done() when stopping Pub / Sub request generator. (#4554) --- .../cloud/pubsub_v1/subscriber/_consumer.py | 16 ++++- .../pubsub_v1/subscriber/test_consumer.py | 65 +++++++++++++++---- 2 files changed, 66 insertions(+), 15 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py index 88d1b19ab468..a0db4c547070 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py @@ -223,7 +223,7 @@ def _request_generator_thread(self, policy): _LOGGER.debug('Sending request:\n%r', request) yield request - def _stop_request_generator(self, request_generator): + def _stop_request_generator(self, request_generator, response_generator): """Ensure a request generator is closed. This **must** be done when recovering from a retry-able exception. @@ -237,12 +237,23 @@ def _stop_request_generator(self, request_generator): Args: request_generator (Generator): A streaming pull request generator returned from :meth:`_request_generator_thread`. + response_generator (grpc.Future): The gRPC bidirectional stream + object that **was** consuming the ``request_generator``. (It + will actually spawn a thread to consume the requests, but + that thread will stop once the rendezvous has a status code + set.) Returns: bool: Indicates if the generator was successfully stopped. Will be :data:`True` unless the queue is not empty and the generator is running. """ + if not response_generator.done(): + _LOGGER.debug( + 'Response generator must be done before stopping ' + 'request generator.') + return False + with self._put_lock: try: request_generator.close() @@ -322,7 +333,8 @@ def _blocking_consume(self, policy): except Exception as exc: recover = policy.on_exception(exc) if recover: - recover = self._stop_request_generator(request_generator) + recover = self._stop_request_generator( + request_generator, response_generator) if not recover: self._stop_no_join() return diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py index 047607e7f562..a05f50b412ca 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py @@ -115,18 +115,21 @@ def test_blocking_consume_on_exception(): def test_blocking_consume_two_exceptions(): - policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception')) - policy.call_rpc.side_effect = ( - (mock.sentinel.A,), - (mock.sentinel.B,), - ) + policy = mock.Mock(spec=('call_rpc', 'on_exception')) + exc1 = NameError('Oh noes.') exc2 = ValueError('Something grumble.') - policy.on_response.side_effect = (exc1, exc2) + policy.on_exception.side_effect = OnException(acceptable=exc1) + + response_generator1 = mock.MagicMock(spec=('__iter__', 'done')) + response_generator1.__iter__.side_effect = exc1 + response_generator1.done.return_value = True + response_generator2 = mock.MagicMock(spec=('__iter__', 'done')) + response_generator2.__iter__.side_effect = exc2 + policy.call_rpc.side_effect = (response_generator1, response_generator2) consumer = _consumer.Consumer() consumer._consumer_thread = mock.Mock(spec=threading.Thread) - policy.on_exception.side_effect = OnException(acceptable=exc1) # Establish that we get responses until we are sent the exiting event. consumer._blocking_consume(policy) @@ -134,8 +137,10 @@ def test_blocking_consume_two_exceptions(): # Check mocks. assert policy.call_rpc.call_count == 2 - policy.on_response.assert_has_calls( - [mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)]) + response_generator1.__iter__.assert_called_once_with() + response_generator1.done.assert_called_once_with() + response_generator2.__iter__.assert_called_once_with() + response_generator2.done.assert_not_called() policy.on_exception.assert_has_calls( [mock.call(exc1), mock.call(exc2)]) @@ -179,6 +184,18 @@ def basic_queue_generator(queue, received): yield value +def test_stop_request_generator_response_not_done(): + consumer = _consumer.Consumer() + + response_generator = mock.Mock(spec=('done',)) + response_generator.done.return_value = False + stopped = consumer._stop_request_generator(None, response_generator) + assert stopped is False + + # Check mocks. + response_generator.done.assert_called_once_with() + + def test_stop_request_generator_not_running(): # Model scenario tested: # - The request generator **is not** running @@ -207,7 +224,10 @@ def test_stop_request_generator_not_running(): # Make sure it **isn't** done. assert request_generator.gi_frame is not None - stopped = consumer._stop_request_generator(request_generator) + response_generator = mock.Mock(spec=('done',)) + response_generator.done.return_value = True + stopped = consumer._stop_request_generator( + request_generator, response_generator) assert stopped is True # Make sure it **is** done. @@ -217,6 +237,9 @@ def test_stop_request_generator_not_running(): assert queue_.get() == item2 assert queue_.empty() + # Check mocks. + response_generator.done.assert_called_once_with() + def test_stop_request_generator_close_failure(): # Model scenario tested: @@ -229,11 +252,15 @@ def test_stop_request_generator_close_failure(): request_generator = mock.Mock(spec=('close',)) request_generator.close.side_effect = TypeError('Really, not a generator') - stopped = consumer._stop_request_generator(request_generator) + response_generator = mock.Mock(spec=('done',)) + response_generator.done.return_value = True + stopped = consumer._stop_request_generator( + request_generator, response_generator) assert stopped is False # Make sure close() was only called once. request_generator.close.assert_called_once_with() + response_generator.done.assert_called_once_with() def test_stop_request_generator_queue_non_empty(): @@ -264,7 +291,10 @@ def test_stop_request_generator_queue_non_empty(): assert received.empty() assert request_generator.gi_frame is not None - stopped = consumer._stop_request_generator(request_generator) + response_generator = mock.Mock(spec=('done',)) + response_generator.done.return_value = True + stopped = consumer._stop_request_generator( + request_generator, response_generator) assert stopped is False # Make sure the generator is **still** not finished. @@ -279,6 +309,9 @@ def test_stop_request_generator_queue_non_empty(): pass assert received.get() == item2 + # Check mocks. + response_generator.done.assert_called_once_with() + def test_stop_request_generator_running(): # Model scenario tested: @@ -304,7 +337,10 @@ def test_stop_request_generator_running(): assert received.empty() assert request_generator.gi_frame is not None - stopped = consumer._stop_request_generator(request_generator) + response_generator = mock.Mock(spec=('done',)) + response_generator.done.return_value = True + stopped = consumer._stop_request_generator( + request_generator, response_generator) assert stopped is True # Make sure it **is** done, though we may have to wait until @@ -316,3 +352,6 @@ def test_stop_request_generator_running(): assert request_generator.gi_frame is None assert received.get() == _helper_threads.STOP assert queue_.empty() + + # Check mocks. + response_generator.done.assert_called_once_with()