Skip to content

Commit

Permalink
Re-enable bundling. Closes #1911
Browse files Browse the repository at this point in the history
  • Loading branch information
daspecster committed Jul 1, 2016
1 parent 7d29482 commit ed0f77a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
14 changes: 14 additions & 0 deletions gcloud/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,17 @@ def __init__(self, items, page_token):
def next(self):
items, self._items = self._items, None
return items


class _GAXBundlingEvent(object):

result = None

def __init__(self, result):
self._result = result

def is_set(self):
return self.result is not None

def wait(self, *_):
self.result = self._result

This comment has been minimized.

Copy link
@bjwatson

bjwatson Jul 1, 2016

This might not be needed if the commented-out code below is re-enabled.

14 changes: 10 additions & 4 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,23 @@ def topic_publish(self, topic_path, messages):
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_bundling=False)
options = CallOptions(is_bundling=True)
message_pbs = [_message_pb_from_dict(message)
for message in messages]
try:
result = self._gax_api.publish(topic_path, message_pbs,
options=options)
# result = self._gax_api.publish(topic_path, message_pbs,
# options=options)

event = self._gax_api.publish(topic_path, message_pbs,
options=options)
if not event.is_set():
event.wait()

This comment has been minimized.

Copy link
@bjwatson

bjwatson Jul 1, 2016

I think the commented-out code above is still desired. Is that what you were saying @geigerj?

This comment has been minimized.

Copy link
@geigerj

geigerj Jul 6, 2016

Contributor

Well, this code is putting a single Publish call into a bundle, then waiting until the timeout causes the bundle to send. So, it's effectively just an inefficient version of the commented-out code with bundling disabled (unless it's being used in a threaded environment).

This comment has been minimized.

Copy link
@daspecster

daspecster Jul 6, 2016

Author Contributor

Ok cool, so we can just set options.is_bundling = True and we should be good?

This comment has been minimized.

Copy link
@geigerj

geigerj Jul 6, 2016

Contributor

Not sure I understand -- if options.is_bundling=True, then the _gax_api.publish() will return an Event rather than a proto response, so the code will have to be different.

To be clear, is this method topic_publish intended to immediately return a response?

This comment has been minimized.

Copy link
@geigerj

geigerj Jul 6, 2016

Contributor

(I think I'm missing some of the context of this change -- I'm just trying to figure out how this _gax.py wrapper is actually using or exposing the GAX bundling feature.)

This comment has been minimized.

Copy link
@daspecster

daspecster Jul 6, 2016

Author Contributor

I think it might be clearer to discuss in #1950.

This comment has been minimized.

Copy link
@tseaver

tseaver Jul 6, 2016

Contributor

@geigerj This method is indeed supposed to return the message IDs immediately. I'm sure we don't understand GAX bundling well enough to figure out whether / how to be using it here, so maybe we should just leave it disabled. Is the bundling feature documented somewhere?

This comment has been minimized.

Copy link
@geigerj

geigerj Jul 6, 2016

Contributor

If you want to return message IDs immediately, you should disable bundling. The benefit of bundling is in a situation where you're willing to wait for a response asynchronously (or ignore the response altogether) so that you can fit multiple calls to publish into a single RPC.

Our documentation really isn't sufficient in this regard; I've filed googleapis/gapic-generator#287 to improve it.

As @daspecster suggested, we can continue the discussion on #1950.

except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return result.message_ids
# return result.message_ids
return event.result.message_ids

def topic_list_subscriptions(self, topic_path, page_size=0,
page_token=None):
Expand Down
33 changes: 29 additions & 4 deletions gcloud/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,15 @@ def test_topic_delete_error(self):

def test_topic_publish_hit(self):
import base64
from gcloud._testing import _GAXBundlingEvent
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64, 'attributes': {}}
response = _PublishResponsePB([MSGID])
gax_api = _GAXPublisherAPI(_publish_response=response)
event = _GAXBundlingEvent(response)
event.wait() # already received result
gax_api = _GAXPublisherAPI(_publish_response=event)
api = self._makeOne(gax_api)

resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
Expand All @@ -220,7 +223,29 @@ def test_topic_publish_hit(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options.is_bundling, True)

def test_topic_publish_hit_with_wait(self):
import base64
from gcloud._testing import _GAXBundlingEvent
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64, 'attributes': {}}
response = _PublishResponsePB([MSGID])
event = _GAXBundlingEvent(response)
gax_api = _GAXPublisherAPI(_publish_response=event)
api = self._makeOne(gax_api)

resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])

self.assertEqual(resource, [MSGID])
topic_path, message_pbs, options = gax_api._publish_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, True)

def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
import base64
Expand All @@ -239,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {'foo': 'bar'})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options.is_bundling, True)

def test_topic_publish_error(self):
import base64
Expand All @@ -258,7 +283,7 @@ def test_topic_publish_error(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options.is_bundling, True)

def test_topic_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
Expand Down

0 comments on commit ed0f77a

Please sign in to comment.