Skip to content

Commit

Permalink
Checking _Rendezvous.done() when stopping Pub / Sub request generator. (
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes authored Dec 7, 2017
1 parent 840df1b commit 061011d
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 15 deletions.
16 changes: 14 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def _request_generator_thread(self, policy):
_LOGGER.debug('Sending request:\n%r', request)
yield request

def _stop_request_generator(self, request_generator):
def _stop_request_generator(self, request_generator, response_generator):
"""Ensure a request generator is closed.
This **must** be done when recovering from a retry-able exception.
Expand All @@ -237,12 +237,23 @@ def _stop_request_generator(self, request_generator):
Args:
request_generator (Generator): A streaming pull request generator
returned from :meth:`_request_generator_thread`.
response_generator (grpc.Future): The gRPC bidirectional stream
object that **was** consuming the ``request_generator``. (It
will actually spawn a thread to consume the requests, but
that thread will stop once the rendezvous has a status code
set.)
Returns:
bool: Indicates if the generator was successfully stopped. Will
be :data:`True` unless the queue is not empty and the generator
is running.
"""
if not response_generator.done():
_LOGGER.debug(
'Response generator must be done before stopping '
'request generator.')
return False

with self._put_lock:
try:
request_generator.close()
Expand Down Expand Up @@ -322,7 +333,8 @@ def _blocking_consume(self, policy):
except Exception as exc:
recover = policy.on_exception(exc)
if recover:
recover = self._stop_request_generator(request_generator)
recover = self._stop_request_generator(
request_generator, response_generator)
if not recover:
self._stop_no_join()
return
Expand Down
65 changes: 52 additions & 13 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,27 +115,32 @@ def test_blocking_consume_on_exception():


def test_blocking_consume_two_exceptions():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.side_effect = (
(mock.sentinel.A,),
(mock.sentinel.B,),
)
policy = mock.Mock(spec=('call_rpc', 'on_exception'))

exc1 = NameError('Oh noes.')
exc2 = ValueError('Something grumble.')
policy.on_response.side_effect = (exc1, exc2)
policy.on_exception.side_effect = OnException(acceptable=exc1)

response_generator1 = mock.MagicMock(spec=('__iter__', 'done'))
response_generator1.__iter__.side_effect = exc1
response_generator1.done.return_value = True
response_generator2 = mock.MagicMock(spec=('__iter__', 'done'))
response_generator2.__iter__.side_effect = exc2
policy.call_rpc.side_effect = (response_generator1, response_generator2)

consumer = _consumer.Consumer()
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
policy.on_exception.side_effect = OnException(acceptable=exc1)

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume(policy)
assert consumer._consumer_thread is None

# Check mocks.
assert policy.call_rpc.call_count == 2
policy.on_response.assert_has_calls(
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])
response_generator1.__iter__.assert_called_once_with()
response_generator1.done.assert_called_once_with()
response_generator2.__iter__.assert_called_once_with()
response_generator2.done.assert_not_called()
policy.on_exception.assert_has_calls(
[mock.call(exc1), mock.call(exc2)])

Expand Down Expand Up @@ -179,6 +184,18 @@ def basic_queue_generator(queue, received):
yield value


def test_stop_request_generator_response_not_done():
consumer = _consumer.Consumer()

response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = False
stopped = consumer._stop_request_generator(None, response_generator)
assert stopped is False

# Check mocks.
response_generator.done.assert_called_once_with()


def test_stop_request_generator_not_running():
# Model scenario tested:
# - The request generator **is not** running
Expand Down Expand Up @@ -207,7 +224,10 @@ def test_stop_request_generator_not_running():
# Make sure it **isn't** done.
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is True

# Make sure it **is** done.
Expand All @@ -217,6 +237,9 @@ def test_stop_request_generator_not_running():
assert queue_.get() == item2
assert queue_.empty()

# Check mocks.
response_generator.done.assert_called_once_with()


def test_stop_request_generator_close_failure():
# Model scenario tested:
Expand All @@ -229,11 +252,15 @@ def test_stop_request_generator_close_failure():
request_generator = mock.Mock(spec=('close',))
request_generator.close.side_effect = TypeError('Really, not a generator')

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is False

# Make sure close() was only called once.
request_generator.close.assert_called_once_with()
response_generator.done.assert_called_once_with()


def test_stop_request_generator_queue_non_empty():
Expand Down Expand Up @@ -264,7 +291,10 @@ def test_stop_request_generator_queue_non_empty():
assert received.empty()
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is False

# Make sure the generator is **still** not finished.
Expand All @@ -279,6 +309,9 @@ def test_stop_request_generator_queue_non_empty():
pass
assert received.get() == item2

# Check mocks.
response_generator.done.assert_called_once_with()


def test_stop_request_generator_running():
# Model scenario tested:
Expand All @@ -304,7 +337,10 @@ def test_stop_request_generator_running():
assert received.empty()
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is True

# Make sure it **is** done, though we may have to wait until
Expand All @@ -316,3 +352,6 @@ def test_stop_request_generator_running():
assert request_generator.gi_frame is None
assert received.get() == _helper_threads.STOP
assert queue_.empty()

# Check mocks.
response_generator.done.assert_called_once_with()

0 comments on commit 061011d

Please sign in to comment.