Break Policy<->Consumer reference cycle. (#4552)
Make `Policy` the parent of `Consumer` and explicitly require
passing a `policy` into `Consumer.start_consuming()` (and its
helpers).
dhermes authored Dec 7, 2017
1 parent e7b6b98 commit 840df1b
Showing 7 changed files with 69 additions and 57 deletions.
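
For context, a minimal sketch (not part of the commit) of the calling convention this change introduces, using the mocked credentials and test subscription name from the updated unit tests below rather than real resources:

    import mock
    from google.auth import credentials
    from google.cloud.pubsub_v1 import subscriber
    from google.cloud.pubsub_v1.subscriber import _consumer

    # Mocked credentials and test subscription name, as in the unit tests.
    creds = mock.Mock(spec=credentials.Credentials)
    client = subscriber.Client(credentials=creds)
    policy = client.subscribe('sub_name_e')

    # Before: consumer = _consumer.Consumer(policy=policy); consumer.start_consuming()
    # After: the consumer no longer stores the policy; it is passed in explicitly,
    # so Policy -> Consumer is the only long-lived reference between the two.
    consumer = _consumer.Consumer()
    consumer.start_consuming(policy)

In normal use this wiring happens inside Policy.open(), which now calls self._consumer.start_consuming(self), as shown in the policy/thread.py diff below.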
49 changes: 32 additions & 17 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
@@ -176,13 +176,7 @@ class Consumer(object):
low. The Consumer and end-user can configure any sort of executor they want
for the actual processing of the responses, which may be CPU intensive.
"""
def __init__(self, policy):
"""
Args:
policy (Consumer): The consumer policy, which defines how
requests and responses are handled.
"""
self._policy = policy
def __init__(self):
self._request_queue = queue.Queue()
self.stopped = threading.Event()
self._put_lock = threading.Lock()
@@ -197,18 +191,24 @@ def send_request(self, request):
with self._put_lock:
self._request_queue.put(request)

def _request_generator_thread(self):
def _request_generator_thread(self, policy):
"""Generate requests for the stream.
This blocks for new requests on the request queue and yields them to
gRPC.
Args:
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy
that owns this consumer. A policy is used to create the
initial request used to open the streaming pull bidirectional
stream.
Yields:
google.cloud.pubsub_v1.types.StreamingPullRequest: Requests
"""
# First, yield the initial request. This occurs on every new
# connection, fundamentally including a resumed connection.
initial_request = self._policy.get_initial_request(ack_queue=True)
initial_request = policy.get_initial_request(ack_queue=True)
_LOGGER.debug('Sending initial request:\n%r', initial_request)
yield initial_request

@@ -290,8 +290,13 @@ def _stop_request_generator(self, request_generator):
_LOGGER.debug('Successfully closed request generator.')
return True

def _blocking_consume(self):
"""Consume the stream indefinitely."""
def _blocking_consume(self, policy):
"""Consume the stream indefinitely.
Args:
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy,
which defines how requests and responses are handled.
"""
while True:
# It is possible that a timeout can cause the stream to not
# exit cleanly when the user has called stop_consuming(). This
@@ -301,12 +306,12 @@ def _blocking_consume(self):
_LOGGER.debug('Event signalled consumer exit.')
break

request_generator = self._request_generator_thread()
response_generator = self._policy.call_rpc(request_generator)
request_generator = self._request_generator_thread(policy)
response_generator = policy.call_rpc(request_generator)
try:
for response in response_generator:
_LOGGER.debug('Received response:\n%r', response)
self._policy.on_response(response)
policy.on_response(response)

# If the loop above exits without an exception, then the
# request stream terminated cleanly, which should only happen
@@ -315,19 +320,29 @@ def _blocking_consume(self):
_LOGGER.debug('Clean RPC loop exit signalled consumer exit.')
break
except Exception as exc:
recover = self._policy.on_exception(exc)
recover = policy.on_exception(exc)
if recover:
recover = self._stop_request_generator(request_generator)
if not recover:
self._stop_no_join()
return

def start_consuming(self):
"""Start consuming the stream."""
def start_consuming(self, policy):
"""Start consuming the stream.
Sets the ``_consumer_thread`` member on the current consumer with
a newly started thread.
Args:
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy
that owns this consumer. A policy defines how requests and
responses are handled.
"""
self.stopped.clear()
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._blocking_consume,
args=(policy,),
)
thread.daemon = True
thread.start()
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/_histogram.py
@@ -23,7 +23,7 @@ class Histogram(object):
The default implementation uses the 99th percentile of previous ack
times to implicitly lease messages; however, custom
:class:`~.pubsub_v1.subscriber.consumer.base.BaseConsumer` subclasses
:class:`~.pubsub_v1.subscriber._consumer.Consumer` subclasses
are free to use a different formula.
The precision of data stored is to the nearest integer. Additionally,
4 changes: 2 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/client.py
@@ -97,7 +97,7 @@ def subscribe(self, subscription, callback=None, flow_control=()):
"""Return a representation of an individual subscription.
This method creates and returns a ``Consumer`` object (that is, a
:class:`~.pubsub_v1.subscriber.consumer.base.BaseConsumer`)
:class:`~.pubsub_v1.subscriber._consumer.Consumer`)
subclass) bound to the topic. It does `not` create the subcription
on the backend (or do any API call at all); it simply returns an
object capable of doing these things.
@@ -122,7 +122,7 @@ def subscribe(self, subscription, callback=None, flow_control=()):
inundated with too many messages at once.
Returns:
~.pubsub_v1.subscriber.consumer.base.BaseConsumer: An instance
~.pubsub_v1.subscriber._consumer.Consumer: An instance
of the defined ``consumer_class`` on the client.
Raises:
4 changes: 2 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/message.py
@@ -26,7 +26,7 @@ class Message(object):
them in callbacks on subscriptions; most users should never have a need
to instantiate them by hand. (The exception to this is if you are
implementing a custom subclass to
:class:`~.pubsub_v1.subscriber.consumer.BaseConsumer`.)
:class:`~.pubsub_v1.subscriber._consumer.Consumer`.)
Attributes:
message_id (str): The message ID. In general, you should not need
@@ -186,7 +186,7 @@ def modify_ack_deadline(self, seconds):
The default implementation handles this for you; you should not need
to manually deal with setting ack deadlines. The exception case is
if you are implementing your own custom subclass of
:class:`~.pubsub_v1.subcriber.consumer.BaseConsumer`.
:class:`~.pubsub_v1.subcriber._consumer.Consumer`.
.. note::
This is not an extension; it *sets* the deadline to the given
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
@@ -75,7 +75,7 @@ def __init__(self, client, subscription,
flow_control=types.FlowControl(), histogram_data=None):
self._client = client
self._subscription = subscription
self._consumer = _consumer.Consumer(self)
self._consumer = _consumer.Consumer()
self._ack_deadline = 10
self._last_histogram_size = 0
self._future = None
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
@@ -234,7 +234,7 @@ def open(self, callback):
self._callback = callback
self._start_dispatch()
# Actually start consuming messages.
self._consumer.start_consuming()
self._consumer.start_consuming(self)
self._start_lease_worker()

# Return the future.
63 changes: 30 additions & 33 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
@@ -27,24 +27,20 @@
from google.cloud.pubsub_v1.subscriber.policy import thread


def create_consumer():
creds = mock.Mock(spec=credentials.Credentials)
client = subscriber.Client(credentials=creds)
subscription = client.subscribe('sub_name_e')
return _consumer.Consumer(policy=subscription)


def test_send_request():
consumer = create_consumer()
consumer = _consumer.Consumer()
request = types.StreamingPullRequest(subscription='foo')
with mock.patch.object(queue.Queue, 'put') as put:
consumer.send_request(request)
put.assert_called_once_with(request)


def test_request_generator_thread():
consumer = create_consumer()
generator = consumer._request_generator_thread()
consumer = _consumer.Consumer()
creds = mock.Mock(spec=credentials.Credentials)
client = subscriber.Client(credentials=creds)
policy = client.subscribe('sub_name_e')
generator = consumer._request_generator_thread(policy)

# The first request that comes from the request generator thread
# should always be the initial request.
@@ -64,27 +60,24 @@ def test_request_generator_thread():


def test_blocking_consume():
consumer = create_consumer()
Policy = type(consumer._policy)
policy = mock.Mock(spec=('call_rpc', 'on_response'))
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)

# Establish that we get responses until we run out of them.
with mock.patch.object(Policy, 'call_rpc', autospec=True) as call_rpc:
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
with mock.patch.object(Policy, 'on_response', autospec=True) as on_res:
consumer._blocking_consume()
assert on_res.call_count == 2
assert on_res.mock_calls[0][1][1] == mock.sentinel.A
assert on_res.mock_calls[1][1][1] == mock.sentinel.B
consumer = _consumer.Consumer()
assert consumer._blocking_consume(policy) is None
policy.call_rpc.assert_called_once()
policy.on_response.assert_has_calls(
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])


@mock.patch.object(_consumer, '_LOGGER')
def test_blocking_consume_when_exiting(_LOGGER):
consumer = create_consumer()
consumer = _consumer.Consumer()
assert consumer.stopped.is_set() is False
consumer.stopped.set()

# Make sure method cleanly exits.
assert consumer._blocking_consume() is None
assert consumer._blocking_consume(None) is None

_LOGGER.debug.assert_called_once_with('Event signalled consumer exit.')

@@ -107,12 +100,12 @@ def test_blocking_consume_on_exception():
exc = TypeError('Bad things!')
policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
consumer = _consumer.Consumer()
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
policy.on_exception.side_effect = OnException()

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume()
consumer._blocking_consume(policy)
assert consumer._consumer_thread is None

# Check mocks.
@@ -131,12 +124,12 @@ def test_blocking_consume_two_exceptions():
exc2 = ValueError('Something grumble.')
policy.on_response.side_effect = (exc1, exc2)

consumer = _consumer.Consumer(policy=policy)
consumer = _consumer.Consumer()
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
policy.on_exception.side_effect = OnException(acceptable=exc1)

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume()
consumer._blocking_consume(policy)
assert consumer._consumer_thread is None

# Check mocks.
@@ -148,20 +141,24 @@ def test_blocking_consume_two_exceptions():


def test_start_consuming():
consumer = create_consumer()
creds = mock.Mock(spec=credentials.Credentials)
client = subscriber.Client(credentials=creds)
policy = client.subscribe('sub_name_e')
consumer = _consumer.Consumer()
with mock.patch.object(threading, 'Thread', autospec=True) as Thread:
consumer.start_consuming()
consumer.start_consuming(policy)

assert consumer.stopped.is_set() is False
Thread.assert_called_once_with(
name=_consumer._BIDIRECTIONAL_CONSUMER_NAME,
target=consumer._blocking_consume,
args=(policy,),
)
assert consumer._consumer_thread is Thread.return_value


def test_stop_consuming():
consumer = create_consumer()
consumer = _consumer.Consumer()
assert consumer.stopped.is_set() is False
thread = mock.Mock(spec=threading.Thread)
consumer._consumer_thread = thread
@@ -188,7 +185,7 @@ def test_stop_request_generator_not_running():
# - The request queue **is not** empty
# Expected result:
# - ``_stop_request_generator()`` successfully calls ``.close()``
consumer = create_consumer()
consumer = _consumer.Consumer()
queue_ = consumer._request_queue
received = queue.Queue()
request_generator = basic_queue_generator(queue_, received)
@@ -227,7 +224,7 @@ def test_stop_request_generator_close_failure():
# Expected result:
# - ``_stop_request_generator()`` falls through to the ``LOGGER.error``
# case and returns ``False``
consumer = create_consumer()
consumer = _consumer.Consumer()

request_generator = mock.Mock(spec=('close',))
request_generator.close.side_effect = TypeError('Really, not a generator')
@@ -247,7 +244,7 @@ def test_stop_request_generator_queue_non_empty():
# - ``_stop_request_generator()`` can't call ``.close()`` (since
# the generator is running) but then returns with ``False`` because
# the queue **is not** empty
consumer = create_consumer()
consumer = _consumer.Consumer()
# Attach a "fake" queue to the request generator so the generator can
# block on an empty queue while the consumer's queue is not empty.
queue_ = queue.Queue()
@@ -292,7 +289,7 @@ def test_stop_request_generator_running():
# the generator is running) but then verifies that the queue is
# empty and sends ``STOP`` into the queue to successfully stop
# the generator
consumer = create_consumer()
consumer = _consumer.Consumer()
queue_ = consumer._request_queue
received = queue.Queue()
request_generator = basic_queue_generator(queue_, received)
