Skip to content

Commit

Permalink
fix(pubsub): streaming pull shouldn't need subscriptions.get permission
Browse files Browse the repository at this point in the history
Pulling the messages with the streaming pull should work with the
default pubsub.subscriber role.

This commit removes the call to fetch a subscription, and replaces the
subscription's ACK deadline with a fixed deadline of 60 seconds.

That *will* re-introduce the issue googleapis#9252, but at least in a less severe
manner.
  • Loading branch information
plamut committed Sep 30, 2019
1 parent 1026350 commit 99a0bfa
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_DEFAULT_STREAM_ACK_DEADLINE = 60
"""The default message acknowledge deadline in seconds for incoming message stream.
This default deadline is dynamically modified for the messages that are added
to the lease management.
"""


def _maybe_wrap_exception(exception):
"""Wraps a gRPC exception class, if needed."""
Expand Down Expand Up @@ -384,8 +391,17 @@ def open(self, callback, on_callback_error):
)

# Create the RPC
subscription = self._client.api.get_subscription(self._subscription)
stream_ack_deadline_seconds = subscription.ack_deadline_seconds

# We must use a fixed value for the ACK deadline, as we cannot read it
# from the subscription. The latter would require `pubsub.subscriptions.get`
# permission, which is not granted to the default subscriber role
# `roles/pubsub.subscriber`.
# See also https://github.com/googleapis/google-cloud-python/issues/9339
#
# When dynamic lease management is enabled for the "on hold" messages,
# the default stream ACK deadline should again be set based on the
# historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
Expand Down
16 changes: 10 additions & 6 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

@pytest.mark.xfail(
reason="The default stream ACK deadline is static and received messages "
"exceeding FlowControl.max_messages are currently not lease managed."
)
def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
):
Expand All @@ -395,29 +399,29 @@ def test_streaming_pull_ack_deadline(
# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(
subscription_path, topic_path, ack_deadline_seconds=60
subscription_path, topic_path, ack_deadline_seconds=240
)

# publish some messages and wait for completion
self._publish_messages(publisher, topic_path, batch_sizes=[2])

# subscribe to the topic
callback = StreamingPullCallback(
processing_time=15, # more than the default ACK deadline of 10 seconds
processing_time=70, # more than the default stream ACK deadline (60s)
resolve_at_msg_count=3, # one more than the published messages count
)
flow_control = types.FlowControl(max_messages=1)
subscription_future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# We expect to process the first two messages in 2 * 15 seconds, and
# We expect to process the first two messages in 2 * 70 seconds, and
# any duplicate message that is re-sent by the backend in additional
# 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
# no duplicates in 60 seconds, we can reasonably assume that there
# 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
# no duplicates in 240 seconds, we can reasonably assume that there
# won't be any.
try:
callback.done_future.result(timeout=60)
callback.done_future.result(timeout=240)
except exceptions.TimeoutError:
# future timed out, because we received no excessive messages
assert sorted(callback.seen_message_ids) == [1, 2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,9 @@ def test_heartbeat_inactive():
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
)
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE

manager = make_manager()
manager._client.api.get_subscription.return_value = types.Subscription(
name="projects/foo/subscriptions/bar",
topic="projects/foo/topics/baz",
ack_deadline_seconds=123,
)

manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)

Expand Down Expand Up @@ -437,7 +434,8 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
assert initial_request_arg.func == manager._get_initial_request
assert initial_request_arg.args[0] == 123
assert initial_request_arg.args[0] == stream_ack_deadline
assert not manager._client.api.get_subscription.called

resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
manager._on_rpc_done
Expand Down

0 comments on commit 99a0bfa

Please sign in to comment.