From 6c7677ecb259672bbb9b6f7646919e602c698570 Mon Sep 17 00:00:00 2001 From: Gurov Ilya Date: Wed, 10 Jun 2020 00:13:52 +0300 Subject: [PATCH] refactor: incorporate will_accept() checks into publish() (#108) --- .../cloud/pubsub_v1/publisher/_batch/base.py | 26 ---------- .../pubsub_v1/publisher/_batch/thread.py | 4 +- .../pubsub_v1/publisher/batch/test_base.py | 30 ----------- .../pubsub_v1/publisher/batch/test_thread.py | 52 ++++++++++++++++--- .../publisher/test_publisher_client.py | 4 -- 5 files changed, 47 insertions(+), 69 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/base.py b/google/cloud/pubsub_v1/publisher/_batch/base.py index 53d3dee5b..212a4b277 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -109,32 +109,6 @@ def status(self): """ raise NotImplementedError - def will_accept(self, message): - """Return True if the batch is able to accept the message. - - In concurrent implementations, the attributes on the current batch - may be modified by other workers. With this in mind, the caller will - likely want to hold a lock that will make sure the state remains - the same after the "will accept?" question is answered. - - Args: - message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message. - - Returns: - bool: Whether this batch can accept the message. - """ - # If this batch is not accepting messages generally, return False. - if self.status != BatchStatus.ACCEPTING_MESSAGES: - return False - - # If this message will make the batch exceed the ``max_messages`` - # setting, return False. - if len(self.messages) >= self.settings.max_messages: - return False - - # Okay, everything is good. - return True - def cancel(self, cancellation_reason): """Complete pending futures with an exception. diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 73210011d..67c9f2de3 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -333,8 +333,8 @@ def publish(self, message): self._status != base.BatchStatus.ERROR ), "Publish after stop() or publish error." - if not self.will_accept(message): - return future + if self.status != base.BatchStatus.ACCEPTING_MESSAGES: + return size_increase = types.PublishRequest(messages=[message]).ByteSize() diff --git a/tests/unit/pubsub_v1/publisher/batch/test_base.py b/tests/unit/pubsub_v1/publisher/batch/test_base.py index 96f18451d..f10b54ee5 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_base.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_base.py @@ -46,33 +46,3 @@ def test_len(): assert len(batch) == 0 batch.publish(types.PubsubMessage(data=b"foo")) assert len(batch) == 1 - - -def test_will_accept(): - batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES) - message = types.PubsubMessage() - assert batch.will_accept(message) is True - - -def test_will_accept_oversize(): - batch = create_batch( - settings=types.BatchSettings(max_bytes=10), - status=BatchStatus.ACCEPTING_MESSAGES, - ) - message = types.PubsubMessage(data=b"abcdefghijklmnopqrstuvwxyz") - assert batch.will_accept(message) is True - - -def test_will_not_accept_status(): - batch = create_batch(status="talk to the hand") - message = types.PubsubMessage() - assert batch.will_accept(message) is False - - -def test_will_not_accept_number(): - batch = create_batch( - settings=types.BatchSettings(max_messages=-1), - status=BatchStatus.ACCEPTING_MESSAGES, - ) - message = types.PubsubMessage(data=b"abc") - assert batch.will_accept(message) is False diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index ce288a48e..e9d2b09c0 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -287,18 +287,56 @@ def test_publish_updating_batch_size(): assert batch.size > 0 # I do not always trust protobuf. -def test_publish_not_will_accept(): +def test_publish(): + batch = create_batch() + message = types.PubsubMessage() + future = batch.publish(message) + + assert len(batch.messages) == 1 + assert batch._futures == [future] + + +def test_publish_max_messages_zero(): batch = create_batch(topic="topic_foo", max_messages=0) - base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() - # Publish the message. message = types.PubsubMessage(data=b"foobarbaz") + with mock.patch.object(batch, "commit") as commit: + future = batch.publish(message) + + assert future is not None + assert len(batch.messages) == 1 + assert batch._futures == [future] + commit.assert_called_once() + + +def test_publish_max_messages_enforced(): + batch = create_batch(topic="topic_foo", max_messages=1) + + message = types.PubsubMessage(data=b"foobarbaz") + message2 = types.PubsubMessage(data=b"foobarbaz2") + + future = batch.publish(message) + future2 = batch.publish(message2) + + assert future is not None + assert future2 is None + assert len(batch.messages) == 1 + assert len(batch._futures) == 1 + + +def test_publish_max_bytes_enforced(): + batch = create_batch(topic="topic_foo", max_bytes=15) + + message = types.PubsubMessage(data=b"foobarbaz") + message2 = types.PubsubMessage(data=b"foobarbaz2") + future = batch.publish(message) + future2 = batch.publish(message2) - assert future is None - assert batch.size == base_request_size - assert batch.messages == [] - assert batch._futures == [] + assert future is not None + assert future2 is None + assert len(batch.messages) == 1 + assert len(batch._futures) == 1 def test_publish_exceed_max_messages(): diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 4e3a3870f..b58ed133f 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -135,7 +135,6 @@ def test_publish(): batch = mock.Mock(spec=client._batch_class) # Set the mock up to claim indiscriminately that it accepts all messages. - batch.will_accept.return_value = True batch.publish.side_effect = (future1, future2) topic = "topic/path" @@ -169,7 +168,6 @@ def test_publish_error_exceeding_flow_control_limits(): client = publisher.Client(credentials=creds, publisher_options=publisher_options) mock_batch = mock.Mock(spec=client._batch_class) - mock_batch.will_accept.return_value = True topic = "topic/path" client._set_batch(topic, mock_batch) @@ -216,7 +214,6 @@ def test_publish_attrs_bytestring(): # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) # Set the mock up to claim indiscriminately that it accepts all messages. - batch.will_accept.return_value = True topic = "topic/path" client._set_batch(topic, batch) @@ -431,7 +428,6 @@ def test_publish_with_ordering_key(): future1.add_done_callback = mock.Mock(spec=["__call__"]) future2.add_done_callback = mock.Mock(spec=["__call__"]) - batch.will_accept.return_value = True batch.publish.side_effect = (future1, future2) topic = "topic/path"