From 20b717d637435b6765fbd7fee09dff26b2f0c5a4 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 23 Sep 2019 17:49:29 +0200 Subject: [PATCH 1/2] feat(pubsub): set default stream ACK deadline to subscriptions' When subscribing, it makes sense to use the configured subscription's maximum ACK deadline for the streaming pull, instead of an optimistic minimum of 10 seconds. Using an optimistic deadline affects messages that are put on hold and are not lease managed, because by the time the client dispatches them to the user's callback, the optimistic ACK deadline could have already been missed, resulting in the backend unnecessary re-sending those messages, even if the subscription's ACK deadline has not been hit yet. --- .../_protocol/streaming_pull_manager.py | 41 +++++++++++----- pubsub/tests/system.py | 49 +++++++++++++++++++ .../subscriber/test_streaming_pull_manager.py | 27 ++++++---- 3 files changed, 95 insertions(+), 22 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index af6883fd067e..b393cbfd5ec6 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -208,7 +208,7 @@ def load(self): float: The load value. """ if self._leaser is None: - return 0 + return 0.0 return max( [ @@ -384,14 +384,26 @@ def open(self, callback, on_callback_error): ) # Create the RPC + subscription = self._client.api.get_subscription(self._subscription) + stream_ack_deadline_seconds = subscription.ack_deadline_seconds + + get_initial_request = functools.partial( + self._get_initial_request, stream_ack_deadline_seconds + ) self._rpc = bidi.ResumableBidiRpc( start_rpc=self._client.api.streaming_pull, - initial_request=self._get_initial_request, + initial_request=get_initial_request, should_recover=self._should_recover, throttle_reopen=True, ) self._rpc.add_done_callback(self._on_rpc_done) + _LOGGER.debug( + "Creating a stream, default ACK deadline set to {} seconds.".format( + stream_ack_deadline_seconds + ) + ) + # Create references to threads self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue) self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) @@ -462,12 +474,16 @@ def close(self, reason=None): for callback in self._close_callbacks: callback(self, reason) - def _get_initial_request(self): + def _get_initial_request(self, stream_ack_deadline_seconds): """Return the initial request for the RPC. This defines the initial request that must always be sent to Pub/Sub immediately upon opening the subscription. + Args: + stream_ack_deadline_seconds (int): + The default message acknowledge deadline for the stream. + Returns: google.cloud.pubsub_v1.types.StreamingPullRequest: A request suitable for being the first request on the stream (and not @@ -486,7 +502,7 @@ def _get_initial_request(self): request = types.StreamingPullRequest( modify_deadline_ack_ids=list(lease_ids), modify_deadline_seconds=[self.ack_deadline] * len(lease_ids), - stream_ack_deadline_seconds=self.ack_histogram.percentile(99), + stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, ) @@ -511,14 +527,6 @@ def _on_response(self, response): self._messages_on_hold.qsize(), ) - # Immediately modack the messages we received, as this tells the server - # that we've received them. - items = [ - requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) - for message in response.received_messages - ] - self._dispatcher.modify_ack_deadline(items) - invoke_callbacks_for = [] for received_message in response.received_messages: @@ -535,6 +543,15 @@ def _on_response(self, response): else: self._messages_on_hold.put(message) + # Immediately (i.e. without waiting for the auto lease management) + # modack the messages we received and not put on hold, as this tells + # the server that we've received them. + items = [ + requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) + for message in invoke_callbacks_for + ] + self._dispatcher.modify_ack_deadline(items) + _LOGGER.debug( "Scheduling callbacks for %s new messages, new total on hold %s.", len(invoke_callbacks_for), diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 7ffb4a580194..8dbf92dbbbba 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -381,6 +381,55 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) + def test_streaming_pull_ack_deadline( + self, publisher, subscriber, project, topic_path, subscription_path, cleanup + ): + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # Create a topic and a subscription, then subscribe to the topic. This + # must happen before the messages are published. + publisher.create_topic(topic_path) + + # Subscribe to the topic. This must happen before the messages + # are published. + subscriber.create_subscription( + subscription_path, topic_path, ack_deadline_seconds=60 + ) + + # publish some messages and wait for completion + self._publish_messages(publisher, topic_path, batch_sizes=[2]) + + # subscribe to the topic + callback = StreamingPullCallback( + processing_time=15, # more than the default ACK deadline of 10 seconds + resolve_at_msg_count=3, # one more than the published messages count + ) + flow_control = types.FlowControl(max_messages=1) + sub_future = subscriber.subscribe( + subscription_path, callback, flow_control=flow_control + ) + + # We expect to process the first two messages in 2 * 15 seconds, and + # any duplicate message that is re-sent by the backend in additional + # 15 seconds, totalling 45 seconds (+ overhead) --> if there have been + # no duplicates in 60 seconds, we can reasonably assume that there + # won't be any. + try: + callback.done_future.result(timeout=60) + except exceptions.TimeoutError: + # future timed out, because we received no excessive messages + assert sorted(callback.seen_message_ids) == [1, 2] + else: + pytest.fail( + "Expected to receive 2 messages, but got at least {}.".format( + len(callback.seen_message_ids) + ) + ) + finally: + sub_future.cancel() + def test_streaming_pull_max_messages( self, publisher, topic_path, subscriber, subscription_path, cleanup ): diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 877ccf97fd9a..352b09ba83fc 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -405,6 +405,11 @@ def test_heartbeat_inactive(): ) def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): manager = make_manager() + manager._client.api.get_subscription.return_value = types.Subscription( + name="projects/foo/subscriptions/bar", + topic="projects/foo/topics/baz", + ack_deadline_seconds=123, + ) manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) @@ -426,10 +431,14 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi resumable_bidi_rpc.assert_called_once_with( start_rpc=manager._client.api.streaming_pull, - initial_request=manager._get_initial_request, + initial_request=mock.ANY, should_recover=manager._should_recover, throttle_reopen=True, ) + initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] + assert initial_request_arg.func == manager._get_initial_request + assert initial_request_arg.args[0] == 123 + resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with( manager._on_rpc_done ) @@ -574,11 +583,11 @@ def test__get_initial_request(): manager._leaser = mock.create_autospec(leaser.Leaser, instance=True) manager._leaser.ack_ids = ["1", "2"] - initial_request = manager._get_initial_request() + initial_request = manager._get_initial_request(123) assert isinstance(initial_request, types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" - assert initial_request.stream_ack_deadline_seconds == 10 + assert initial_request.stream_ack_deadline_seconds == 123 assert initial_request.modify_deadline_ack_ids == ["1", "2"] assert initial_request.modify_deadline_seconds == [10, 10] @@ -587,11 +596,11 @@ def test__get_initial_request_wo_leaser(): manager = make_manager() manager._leaser = None - initial_request = manager._get_initial_request() + initial_request = manager._get_initial_request(123) assert isinstance(initial_request, types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" - assert initial_request.stream_ack_deadline_seconds == 10 + assert initial_request.stream_ack_deadline_seconds == 123 assert initial_request.modify_deadline_ack_ids == [] assert initial_request.modify_deadline_seconds == [] @@ -660,12 +669,10 @@ def test__on_response_with_leaser_overload(): # are called in the expected way. manager._on_response(response) + # only the messages that are added to the lease management and dispatched to + # callbacks should have their ACK deadline extended dispatcher.modify_ack_deadline.assert_called_once_with( - [ - requests.ModAckRequest("fack", 10), - requests.ModAckRequest("back", 10), - requests.ModAckRequest("zack", 10), - ] + [requests.ModAckRequest("fack", 10)] ) # one message should be scheduled, the leaser capacity allows for it From dd9ca040ba123b61f44f301d4265706d938e0612 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 26 Sep 2019 09:47:41 +0200 Subject: [PATCH 2/2] Rename sub_future to subscription_future --- pubsub/tests/system.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 8dbf92dbbbba..c8030a5773db 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -407,7 +407,7 @@ def test_streaming_pull_ack_deadline( resolve_at_msg_count=3, # one more than the published messages count ) flow_control = types.FlowControl(max_messages=1) - sub_future = subscriber.subscribe( + subscription_future = subscriber.subscribe( subscription_path, callback, flow_control=flow_control ) @@ -428,7 +428,7 @@ def test_streaming_pull_ack_deadline( ) ) finally: - sub_future.cancel() + subscription_future.cancel() def test_streaming_pull_max_messages( self, publisher, topic_path, subscriber, subscription_path, cleanup