diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index af6883fd067e..b393cbfd5ec6 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -208,7 +208,7 @@ def load(self): float: The load value. """ if self._leaser is None: - return 0 + return 0.0 return max( [ @@ -384,14 +384,26 @@ def open(self, callback, on_callback_error): ) # Create the RPC + subscription = self._client.api.get_subscription(self._subscription) + stream_ack_deadline_seconds = subscription.ack_deadline_seconds + + get_initial_request = functools.partial( + self._get_initial_request, stream_ack_deadline_seconds + ) self._rpc = bidi.ResumableBidiRpc( start_rpc=self._client.api.streaming_pull, - initial_request=self._get_initial_request, + initial_request=get_initial_request, should_recover=self._should_recover, throttle_reopen=True, ) self._rpc.add_done_callback(self._on_rpc_done) + _LOGGER.debug( + "Creating a stream, default ACK deadline set to {} seconds.".format( + stream_ack_deadline_seconds + ) + ) + # Create references to threads self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue) self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) @@ -462,12 +474,16 @@ def close(self, reason=None): for callback in self._close_callbacks: callback(self, reason) - def _get_initial_request(self): + def _get_initial_request(self, stream_ack_deadline_seconds): """Return the initial request for the RPC. This defines the initial request that must always be sent to Pub/Sub immediately upon opening the subscription. + Args: + stream_ack_deadline_seconds (int): + The default message acknowledge deadline for the stream. 
+ Returns: google.cloud.pubsub_v1.types.StreamingPullRequest: A request suitable for being the first request on the stream (and not @@ -486,7 +502,7 @@ def _get_initial_request(self): request = types.StreamingPullRequest( modify_deadline_ack_ids=list(lease_ids), modify_deadline_seconds=[self.ack_deadline] * len(lease_ids), - stream_ack_deadline_seconds=self.ack_histogram.percentile(99), + stream_ack_deadline_seconds=stream_ack_deadline_seconds, subscription=self._subscription, ) @@ -511,14 +527,6 @@ def _on_response(self, response): self._messages_on_hold.qsize(), ) - # Immediately modack the messages we received, as this tells the server - # that we've received them. - items = [ - requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) - for message in response.received_messages - ] - self._dispatcher.modify_ack_deadline(items) - invoke_callbacks_for = [] for received_message in response.received_messages: @@ -535,6 +543,15 @@ def _on_response(self, response): else: self._messages_on_hold.put(message) + # Immediately (i.e. without waiting for the auto lease management) + # modack the messages we received and not put on hold, as this tells + # the server that we've received them. + items = [ + requests.ModAckRequest(message.ack_id, self._ack_histogram.percentile(99)) + for message in invoke_callbacks_for + ] + self._dispatcher.modify_ack_deadline(items) + _LOGGER.debug( "Scheduling callbacks for %s new messages, new total on hold %s.", len(invoke_callbacks_for), diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 7ffb4a580194..c8030a5773db 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -381,6 +381,55 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) + def test_streaming_pull_ack_deadline( + self, publisher, subscriber, project, topic_path, subscription_path, cleanup + ): + # Make sure the topic and subscription get deleted. 
+ cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # Create the topic. The subscription created below attaches to it, so + # the topic must exist first. + publisher.create_topic(topic_path) + + # Create the subscription. This must happen before the messages + # are published. + subscriber.create_subscription( + subscription_path, topic_path, ack_deadline_seconds=60 + ) + + # publish some messages and wait for completion + self._publish_messages(publisher, topic_path, batch_sizes=[2]) + + # subscribe to the topic + callback = StreamingPullCallback( + processing_time=15, # more than the default ACK deadline of 10 seconds + resolve_at_msg_count=3, # one more than the published messages count + ) + flow_control = types.FlowControl(max_messages=1) + subscription_future = subscriber.subscribe( + subscription_path, callback, flow_control=flow_control + ) + + # We expect to process the first two messages in 2 * 15 seconds, and + # any duplicate message that is re-sent by the backend in additional + # 15 seconds, totalling 45 seconds (+ overhead) --> if there have been + # no duplicates in 60 seconds, we can reasonably assume that there + # won't be any. 
+ try: + callback.done_future.result(timeout=60) + except exceptions.TimeoutError: + # future timed out, because we received no excessive messages + assert sorted(callback.seen_message_ids) == [1, 2] + else: + pytest.fail( + "Expected to receive 2 messages, but got at least {}.".format( + len(callback.seen_message_ids) + ) + ) + finally: + subscription_future.cancel() + def test_streaming_pull_max_messages( self, publisher, topic_path, subscriber, subscription_path, cleanup ): diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 877ccf97fd9a..352b09ba83fc 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -405,6 +405,11 @@ def test_heartbeat_inactive(): ) def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): manager = make_manager() + manager._client.api.get_subscription.return_value = types.Subscription( + name="projects/foo/subscriptions/bar", + topic="projects/foo/topics/baz", + ack_deadline_seconds=123, + ) manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) @@ -426,10 +431,14 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi resumable_bidi_rpc.assert_called_once_with( start_rpc=manager._client.api.streaming_pull, - initial_request=manager._get_initial_request, + initial_request=mock.ANY, should_recover=manager._should_recover, throttle_reopen=True, ) + initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] + assert initial_request_arg.func == manager._get_initial_request + assert initial_request_arg.args[0] == 123 + resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with( manager._on_rpc_done ) @@ -574,11 +583,11 @@ def test__get_initial_request(): manager._leaser = mock.create_autospec(leaser.Leaser, instance=True) 
manager._leaser.ack_ids = ["1", "2"] - initial_request = manager._get_initial_request() + initial_request = manager._get_initial_request(123) assert isinstance(initial_request, types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" - assert initial_request.stream_ack_deadline_seconds == 10 + assert initial_request.stream_ack_deadline_seconds == 123 assert initial_request.modify_deadline_ack_ids == ["1", "2"] assert initial_request.modify_deadline_seconds == [10, 10] @@ -587,11 +596,11 @@ def test__get_initial_request_wo_leaser(): manager = make_manager() manager._leaser = None - initial_request = manager._get_initial_request() + initial_request = manager._get_initial_request(123) assert isinstance(initial_request, types.StreamingPullRequest) assert initial_request.subscription == "subscription-name" - assert initial_request.stream_ack_deadline_seconds == 10 + assert initial_request.stream_ack_deadline_seconds == 123 assert initial_request.modify_deadline_ack_ids == [] assert initial_request.modify_deadline_seconds == [] @@ -660,12 +669,10 @@ def test__on_response_with_leaser_overload(): # are called in the expected way. manager._on_response(response) + # only the messages that are added to the lease management and dispatched to + # callbacks should have their ACK deadline extended dispatcher.modify_ack_deadline.assert_called_once_with( - [ - requests.ModAckRequest("fack", 10), - requests.ModAckRequest("back", 10), - requests.ModAckRequest("zack", 10), - ] + [requests.ModAckRequest("fack", 10)] ) # one message should be scheduled, the leaser capacity allows for it