From ed0f77a0cf99affbe331b35fc45dde16b370a2a4 Mon Sep 17 00:00:00 2001 From: Thomas Schultz Date: Fri, 1 Jul 2016 11:20:48 -0600 Subject: [PATCH 1/2] Re-enable bundling. Closes #1911 --- gcloud/_testing.py | 14 ++++++++++++++ gcloud/pubsub/_gax.py | 14 ++++++++++---- gcloud/pubsub/test__gax.py | 33 +++++++++++++++++++++++++++++---- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/gcloud/_testing.py b/gcloud/_testing.py index 0a440e817436..15ef5dd298e1 100644 --- a/gcloud/_testing.py +++ b/gcloud/_testing.py @@ -59,3 +59,17 @@ def __init__(self, items, page_token): def next(self): items, self._items = self._items, None return items + + +class _GAXBundlingEvent(object): + + result = None + + def __init__(self, result): + self._result = result + + def is_set(self): + return self.result is not None + + def wait(self, *_): + self.result = self._result diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 28ac6c23e294..ce105f69fc5a 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -162,17 +162,23 @@ def topic_publish(self, topic_path, messages): :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not exist """ - options = CallOptions(is_bundling=False) + options = CallOptions(is_bundling=True) message_pbs = [_message_pb_from_dict(message) for message in messages] try: - result = self._gax_api.publish(topic_path, message_pbs, - options=options) + # result = self._gax_api.publish(topic_path, message_pbs, + # options=options) + + event = self._gax_api.publish(topic_path, message_pbs, + options=options) + if not event.is_set(): + event.wait() except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) raise - return result.message_ids + # return result.message_ids + return event.result.message_ids def topic_list_subscriptions(self, topic_path, page_size=0, page_token=None): diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 2426d2dfb7e8..98d4d4e7bbcd 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -204,12 +204,15 @@ def test_topic_delete_error(self): def test_topic_publish_hit(self): import base64 + from gcloud._testing import _GAXBundlingEvent PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MSGID = 'DEADBEEF' MESSAGE = {'data': B64, 'attributes': {}} response = _PublishResponsePB([MSGID]) - gax_api = _GAXPublisherAPI(_publish_response=response) + event = _GAXBundlingEvent(response) + event.wait() # already received result + gax_api = _GAXPublisherAPI(_publish_response=event) api = self._makeOne(gax_api) resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) @@ -220,7 +223,29 @@ def test_topic_publish_hit(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, False) + self.assertEqual(options.is_bundling, True) + + def test_topic_publish_hit_with_wait(self): + import base64 + from gcloud._testing import _GAXBundlingEvent + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, 'attributes': {}} + response = _PublishResponsePB([MSGID]) + event = _GAXBundlingEvent(response) + gax_api = _GAXPublisherAPI(_publish_response=event) + api = self._makeOne(gax_api) + + resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(resource, [MSGID]) + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options.is_bundling, True) def test_topic_publish_miss_w_attrs_w_bytes_payload(self): import base64 @@ -239,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {'foo': 'bar'}) - self.assertEqual(options.is_bundling, False) + self.assertEqual(options.is_bundling, True) def test_topic_publish_error(self): import base64 @@ -258,7 +283,7 @@ def test_topic_publish_error(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, False) + self.assertEqual(options.is_bundling, True) def test_topic_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE From da259a8ec77caa424d14d7e2f4f86b5a666cff50 Mon Sep 17 00:00:00 2001 From: Thomas Schultz Date: Fri, 1 Jul 2016 12:20:39 -0600 Subject: [PATCH 2/2] Remove CallOptions and missed commented code. --- gcloud/pubsub/_gax.py | 8 +------- gcloud/pubsub/test__gax.py | 8 ++++---- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index ce105f69fc5a..0639833feb73 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -162,22 +162,16 @@ def topic_publish(self, topic_path, messages): :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not exist """ - options = CallOptions(is_bundling=True) message_pbs = [_message_pb_from_dict(message) for message in messages] try: - # result = self._gax_api.publish(topic_path, message_pbs, - # options=options) - - event = self._gax_api.publish(topic_path, message_pbs, - options=options) + event = self._gax_api.publish(topic_path, message_pbs) if not event.is_set(): event.wait() except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) raise - # return result.message_ids return event.result.message_ids def topic_list_subscriptions(self, topic_path, page_size=0, diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 98d4d4e7bbcd..d285cb6e3260 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -223,7 +223,7 @@ def test_topic_publish_hit(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, True) + self.assertEqual(options, None) def test_topic_publish_hit_with_wait(self): import base64 @@ -245,7 +245,7 @@ def test_topic_publish_hit_with_wait(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, True) + self.assertEqual(options, None) def test_topic_publish_miss_w_attrs_w_bytes_payload(self): import base64 @@ -264,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {'foo': 'bar'}) - self.assertEqual(options.is_bundling, True) + self.assertEqual(options, None) def test_topic_publish_error(self): import base64 @@ -283,7 +283,7 @@ def test_topic_publish_error(self): message_pb, = message_pbs self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {}) - self.assertEqual(options.is_bundling, True) + self.assertEqual(options, None) def test_topic_list_subscriptions_no_paging(self): from google.gax import INITIAL_PAGE