diff --git a/pubsub/CHANGELOG.md b/pubsub/CHANGELOG.md index 51adc776fffc..446162ace07f 100644 --- a/pubsub/CHANGELOG.md +++ b/pubsub/CHANGELOG.md @@ -17,8 +17,8 @@ names (#4476). - Logging changes - Adding debug logs when lease management exits (#4484) - - Adding debug logs when hen `QueueCallbackThread` exits (#4494). - Instances handle theprocessing of messages in a + - Adding debug logs when `QueueCallbackThread` exits (#4494). + Instances handle the processing of messages in a subscription (e.g. to `ack`). - Using a named logger in `publisher.batch.thread` (#4473) - Adding newlines before logging protobuf payloads (#4471) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py index 394c6e67d39f..737bfcd1fe88 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py @@ -96,21 +96,21 @@ "gRPC C Core" -> "gRPC Python" [label="queue", dir="both"] "gRPC Python" -> "Consumer" [label="responses", color="red"] "Consumer" -> "request generator thread" [label="starts", color="gray"] - "Policy" -> "QueueCallbackThread" [label="starts", color="gray"] + "Policy" -> "QueueCallbackWorker" [label="starts", color="gray"] "request generator thread" -> "gRPC Python" [label="requests", color="blue"] "Consumer" -> "Policy" [label="responses", color="red"] "Policy" -> "futures.Executor" [label="response", color="red"] "futures.Executor" -> "callback" [label="response", color="red"] "callback" -> "callback_request_queue" [label="requests", color="blue"] - "callback_request_queue" -> "QueueCallbackThread" + "callback_request_queue" -> "QueueCallbackWorker" [label="consumed by", color="blue"] - "QueueCallbackThread" -> "Consumer" + "QueueCallbackWorker" -> "Consumer" [label="send_response", color="blue"] } This part is actually up to the Policy to enable. The consumer just provides a -thread-safe queue for requests. The :cls:`QueueCallbackThread` can be used by +thread-safe queue for requests. The :cls:`QueueCallbackWorker` can be used by the Policy implementation to spin up the worker thread to pump the concurrency-safe queue. See the Pub/Sub subscriber implementation for an example of this. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py b/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py index e1a5a1d15968..b0f166e1a3fa 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py @@ -21,7 +21,7 @@ __all__ = ( 'HelperThreadRegistry', - 'QueueCallbackThread', + 'QueueCallbackWorker', 'STOP', ) @@ -125,14 +125,9 @@ def stop_all(self): self.stop(name) -class QueueCallbackThread(object): +class QueueCallbackWorker(object): """A helper that executes a callback for every item in the queue. - .. note:: - - This is not actually a thread, but it is intended to be a target - for a thread. - Calls a blocking ``get()`` on the ``queue`` until it encounters :attr:`STOP`. @@ -141,8 +136,10 @@ class QueueCallbackThread(object): concurrency boundary implemented by ``executor``. Items will be popped off (with a blocking ``get()``) until :attr:`STOP` is encountered. - callback (Callable): A callback that can process items pulled off - of the queue. + callback (Callable[[str, Dict], Any]): A callback that can process + items pulled off of the queue. Items are assumed to be a pair + of a method name to be invoked and a dictionary of keyword + arguments for that method. """ def __init__(self, queue, callback): @@ -153,12 +150,13 @@ def __call__(self): while True: item = self.queue.get() if item == STOP: - _LOGGER.debug('Exiting the QueueCallbackThread.') + _LOGGER.debug('Exiting the QueueCallbackWorker.') return # Run the callback. If any exceptions occur, log them and # continue. try: - self._callback(item) + action, kwargs = item + self._callback(action, kwargs) except Exception as exc: _LOGGER.error('%s: %s', exc.__class__.__name__, exc) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 3ccf454fd58a..5fffcd09f068 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -117,9 +117,9 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(), ) self._executor = executor _LOGGER.debug('Creating callback requests thread (not starting).') - self._callback_requests = _helper_threads.QueueCallbackThread( + self._callback_requests = _helper_threads.QueueCallbackWorker( self._request_queue, - self.on_callback_request, + self.dispatch_callback, ) def close(self): @@ -180,10 +180,33 @@ def open(self, callback): # Return the future. return self._future - def on_callback_request(self, callback_request): - """Map the callback request to the appropriate gRPC request.""" - action, kwargs = callback_request[0], callback_request[1] - getattr(self, action)(**kwargs) + def dispatch_callback(self, action, kwargs): + """Map the callback request to the appropriate gRPC request. + + Args: + action (str): The method to be invoked. + kwargs (Dict[str, Any]): The keyword arguments for the method + specified by ``action``. + + Raises: + ValueError: If ``action`` isn't one of the expected actions + "ack", "drop", "lease", "modify_ack_deadline" or "nack". + """ + if action == 'ack': + self.ack(**kwargs) + elif action == 'drop': + self.drop(**kwargs) + elif action == 'lease': + self.lease(**kwargs) + elif action == 'modify_ack_deadline': + self.modify_ack_deadline(**kwargs) + elif action == 'nack': + self.nack(**kwargs) + else: + raise ValueError( + 'Unexpected action', action, + 'Must be one of "ack", "drop", "lease", ' + '"modify_ack_deadline" or "nack".') def on_exception(self, exception): """Handle the exception. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py index 5f5ca33d8efc..ec889b7fc2fd 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_helper_threads.py @@ -117,33 +117,35 @@ def test_stop_all_noop(): assert len(registry._helper_threads) == 0 -def test_queue_callback_thread(): +def test_queue_callback_worker(): queue_ = queue.Queue() callback = mock.Mock(spec=()) - qct = _helper_threads.QueueCallbackThread(queue_, callback) + qct = _helper_threads.QueueCallbackWorker(queue_, callback) # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: - get.side_effect = (mock.sentinel.A, _helper_threads.STOP) + item1 = ('action', mock.sentinel.A) + get.side_effect = (item1, _helper_threads.STOP) qct() # Assert that we got the expected calls. assert get.call_count == 2 - callback.assert_called_once_with(mock.sentinel.A) + callback.assert_called_once_with('action', mock.sentinel.A) -def test_queue_callback_thread_exception(): +def test_queue_callback_worker_exception(): queue_ = queue.Queue() callback = mock.Mock(spec=(), side_effect=(Exception,)) - qct = _helper_threads.QueueCallbackThread(queue_, callback) + qct = _helper_threads.QueueCallbackWorker(queue_, callback) # Set up an appropriate mock for the queue, and call the queue callback # thread. with mock.patch.object(queue.Queue, 'get') as get: - get.side_effect = (mock.sentinel.A, _helper_threads.STOP) + item1 = ('action', mock.sentinel.A) + get.side_effect = (item1, _helper_threads.STOP) qct() # Assert that we got the expected calls. assert get.call_count == 2 - callback.assert_called_once_with(mock.sentinel.A) + callback.assert_called_once_with('action', mock.sentinel.A) diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py index fef7df01dea0..f73bfea21e12 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py @@ -81,11 +81,30 @@ def test_open(thread_start, htr_start): thread_start.assert_called() -def test_on_callback_request(): +def test_dispatch_callback_valid_actions(): policy = create_policy() - with mock.patch.object(policy, 'call_rpc') as call_rpc: - policy.on_callback_request(('call_rpc', {'something': 42})) - call_rpc.assert_called_once_with(something=42) + kwargs = {'foo': 10, 'bar': 13.37} + actions = ( + 'ack', + 'drop', + 'lease', + 'modify_ack_deadline', + 'nack', + ) + for action in actions: + with mock.patch.object(policy, action) as mocked: + policy.dispatch_callback(action, kwargs) + mocked.assert_called_once_with(**kwargs) + + +def test_dispatch_callback_invalid_action(): + policy = create_policy() + with pytest.raises(ValueError) as exc_info: + policy.dispatch_callback('gecko', {}) + + assert len(exc_info.value.args) == 3 + assert exc_info.value.args[0] == 'Unexpected action' + assert exc_info.value.args[1] == 'gecko' def test_on_exception_deadline_exceeded():