diff --git a/gcloud/_testing.py b/gcloud/_testing.py index 0a440e817436..15ef5dd298e1 100644 --- a/gcloud/_testing.py +++ b/gcloud/_testing.py @@ -59,3 +59,17 @@ def __init__(self, items, page_token): def next(self): items, self._items = self._items, None return items + + +class _GAXBundlingEvent(object): + + result = None + + def __init__(self, result): + self._result = result + + def is_set(self): + return self.result is not None + + def wait(self, *_): + self.result = self._result diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 28ac6c23e294..0639833feb73 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -162,17 +162,17 @@ def topic_publish(self, topic_path, messages): :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not exist """ - options = CallOptions(is_bundling=False) message_pbs = [_message_pb_from_dict(message) for message in messages] try: - result = self._gax_api.publish(topic_path, message_pbs, - options=options) + event = self._gax_api.publish(topic_path, message_pbs) + if not event.is_set(): + event.wait() except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) raise - return result.message_ids + return event.result.message_ids def topic_list_subscriptions(self, topic_path, page_size=0, page_token=None): diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 2426d2dfb7e8..d285cb6e3260 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -204,12 +204,15 @@ def test_topic_delete_error(self): def test_topic_publish_hit(self): import base64 + from gcloud._testing import _GAXBundlingEvent PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} response = _PublishResponsePB([MSGID]) - gax_api = _GAXPublisherAPI(_publish_response=response) + event = _GAXBundlingEvent(response) + event.wait() # already received result + gax_api = _GAXPublisherAPI(_publish_response=event) api = self._makeOne(gax_api) resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) @@ -220,7 +223,29 @@ def test_topic_publish_hit(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, False) + self.assertEqual(options, None) + + def test_topic_publish_hit_with_wait(self): + import base64 + from gcloud._testing import _GAXBundlingEvent + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, 'attributes': {}} + response = _PublishResponsePB([MSGID]) + event = _GAXBundlingEvent(response) + gax_api = _GAXPublisherAPI(_publish_response=event) + api = self._makeOne(gax_api) + + resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(resource, [MSGID]) + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options, None) def test_topic_publish_miss_w_attrs_w_bytes_payload(self): import base64 @@ -239,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {'foo': 'bar'}) - self.assertEqual(options.is_bundling, False) + self.assertEqual(options, None) def test_topic_publish_error(self): import base64 @@ -258,7 +283,7 @@ def test_topic_publish_error(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, False) + self.assertEqual(options, None) def test_topic_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE