diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index b393cbfd5ec6..e9414247f9cd 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -50,6 +50,13 @@ _RESUME_THRESHOLD = 0.8 """The load threshold below which to resume the incoming message stream.""" +_DEFAULT_STREAM_ACK_DEADLINE = 60 +"""The default message acknowledge deadline in seconds for incoming message stream. + +This default deadline is dynamically modified for the messages that are added +to the lease management. +""" + def _maybe_wrap_exception(exception): """Wraps a gRPC exception class, if needed.""" @@ -384,8 +391,17 @@ def open(self, callback, on_callback_error): ) # Create the RPC - subscription = self._client.api.get_subscription(self._subscription) - stream_ack_deadline_seconds = subscription.ack_deadline_seconds + + # We must use a fixed value for the ACK deadline, as we cannot read it + # from the subscription. The latter would require `pubsub.subscriptions.get` + # permission, which is not granted to the default subscriber role + # `roles/pubsub.subscriber`. + # See also https://github.com/googleapis/google-cloud-python/issues/9339 + # + # When dynamic lease management is enabled for the "on hold" messages, + # the default stream ACK deadline should again be set based on the + # historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`. + stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE get_initial_request = functools.partial( self._get_initial_request, stream_ack_deadline_seconds diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index c8030a5773db..cb00a4b91ecd 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -381,6 +381,10 @@ class CallbackError(Exception): with pytest.raises(CallbackError): future.result(timeout=30) + @pytest.mark.xfail( + reason="The default stream ACK deadline is static and received messages " + "exceeding FlowControl.max_messages are currently not lease managed." + ) def test_streaming_pull_ack_deadline( self, publisher, subscriber, project, topic_path, subscription_path, cleanup ): @@ -395,7 +399,7 @@ def test_streaming_pull_ack_deadline( # Subscribe to the topic. This must happen before the messages # are published. subscriber.create_subscription( - subscription_path, topic_path, ack_deadline_seconds=60 + subscription_path, topic_path, ack_deadline_seconds=240 ) # publish some messages and wait for completion @@ -403,7 +407,7 @@ def test_streaming_pull_ack_deadline( # subscribe to the topic callback = StreamingPullCallback( - processing_time=15, # more than the default ACK deadline of 10 seconds + processing_time=70, # more than the default stream ACK deadline (60s) resolve_at_msg_count=3, # one more than the published messages count ) flow_control = types.FlowControl(max_messages=1) @@ -411,13 +415,13 @@ def test_streaming_pull_ack_deadline( subscription_path, callback, flow_control=flow_control ) - # We expect to process the first two messages in 2 * 15 seconds, and + # We expect to process the first two messages in 2 * 70 seconds, and # any duplicate message that is re-sent by the backend in additional - # 15 seconds, totalling 45 seconds (+ overhead) --> if there have been - # no duplicates in 60 seconds, we can reasonably assume that there + # 70 seconds, totalling 210 seconds (+ overhead) --> if there have been + # no duplicates in 240 seconds, we can reasonably assume that there # won't be any. try: - callback.done_future.result(timeout=60) + callback.done_future.result(timeout=240) except exceptions.TimeoutError: # future timed out, because we received no excessive messages assert sorted(callback.seen_message_ids) == [1, 2] diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 352b09ba83fc..2bd20caa04d1 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -404,12 +404,9 @@ def test_heartbeat_inactive(): "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True ) def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): + stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE + manager = make_manager() - manager._client.api.get_subscription.return_value = types.Subscription( - name="projects/foo/subscriptions/bar", - topic="projects/foo/topics/baz", - ack_deadline_seconds=123, - ) manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error) @@ -437,7 +434,8 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi ) initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"] assert initial_request_arg.func == manager._get_initial_request - assert initial_request_arg.args[0] == 123 + assert initial_request_arg.args[0] == stream_ack_deadline + assert not manager._client.api.get_subscription.called resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with( manager._on_rpc_done