From 492917b1be784e63361bb34821569136e0a30a77 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 10 Apr 2015 15:14:10 -0400 Subject: [PATCH 1/3] #797: add sortable 'Message.timestamp' property. Allows subscribers of timestamping topics to sort messages. --- gcloud/pubsub/message.py | 19 ++++++++++++++++++ gcloud/pubsub/test_message.py | 36 +++++++++++++++++++++++++++++++++++ regression/pubsub.py | 25 ++++++++++++++++-------- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/gcloud/pubsub/message.py b/gcloud/pubsub/message.py index 38b907e62b69..009ca7ad8946 100644 --- a/gcloud/pubsub/message.py +++ b/gcloud/pubsub/message.py @@ -15,6 +15,11 @@ """Define API Topics.""" import base64 +import datetime + +import pytz + +RFC3369 = '%Y-%m-%dT%H:%M:%S.%fZ' class Message(object): @@ -44,6 +49,20 @@ def attributes(self): self._attributes = {} return self._attributes + @property + def timestamp(self): + """Return timestamp from attributes, if passed. + + :rtype: datetime + :returns: timestamp (in UTC timezone) parsed from RFC 3369 timestamp + :raises: ValueError if timestamp not in ``attributes``, or malformed + """ + stamp = self.attributes.get('timestamp') + if stamp is None: + raise ValueError('No timestamp') + return datetime.datetime.strptime(stamp, RFC3369).replace( + tzinfo=pytz.UTC) + @classmethod def from_api_repr(cls, api_repr): """Factory: construct message from API representation. diff --git a/gcloud/pubsub/test_message.py b/gcloud/pubsub/test_message.py index 32b2854a4eaa..58af1dbe8242 100644 --- a/gcloud/pubsub/test_message.py +++ b/gcloud/pubsub/test_message.py @@ -66,3 +66,39 @@ def test_from_api_repr_w_attributes(self): self.assertEqual(message.data, DATA) self.assertEqual(message.message_id, MESSAGE_ID) self.assertEqual(message.attributes, ATTRS) + + def test_timestamp_no_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + message = self._makeOne(data=DATA, message_id=MESSAGE_ID) + + def _to_fail(): + return message.timestamp + + self.assertRaises(ValueError, _to_fail) + + def test_timestamp_wo_timestamp_in_attributes(self): + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + ATTRS = {'a': 'b'} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + + def _to_fail(): + return message.timestamp + + self.assertRaises(ValueError, _to_fail) + + def test_timestamp_w_timestamp_in_attributes(self): + from datetime import datetime + import pytz + DATA = b'DEADBEEF' + MESSAGE_ID = b'12345' + TIMESTAMP = '2015-04-10T18:42:27.131956Z' + RFC3369 = '%Y-%m-%dT%H:%M:%S.%fZ' + naive = datetime.strptime(TIMESTAMP, RFC3369) + timestamp = naive.replace(tzinfo=pytz.utc) + ATTRS = {'timestamp': TIMESTAMP} + message = self._makeOne(data=DATA, message_id=MESSAGE_ID, + attributes=ATTRS) + self.assertEqual(message.timestamp, timestamp) diff --git a/regression/pubsub.py b/regression/pubsub.py index 208f10f6d0fa..7c0db3cffc6e 100644 --- a/regression/pubsub.py +++ b/regression/pubsub.py @@ -103,7 +103,7 @@ def test_list_subscriptions(self): def test_message_pull_mode_e2e(self): TOPIC_NAME = 'subscribe-me' - topic = Topic(TOPIC_NAME) + topic = Topic(TOPIC_NAME, timestamp_messages=True) self.assertFalse(topic.exists()) topic.create() self.to_delete.append(topic) @@ -113,14 +113,23 @@ def test_message_pull_mode_e2e(self): subscription.create() self.to_delete.append(subscription) - MESSAGE = b'MESSAGE' - EXTRA = 'EXTRA' - topic.publish(MESSAGE, extra=EXTRA) + MESSAGE_1 = b'MESSAGE ONE' + MESSAGE_2 = b'MESSAGE ONE' + EXTRA_1 = 'EXTRA 1' + EXTRA_2 = 'EXTRA 2' + topic.publish(MESSAGE_1, extra=EXTRA_1) + topic.publish(MESSAGE_2, extra=EXTRA_2) - received = subscription.pull() + received = subscription.pull(max_messages=2) ack_ids = [recv[0] for recv in received] subscription.acknowledge(ack_ids) messages = [recv[1] for recv in received] - message, = messages - self.assertEqual(message.data, MESSAGE) - self.assertEqual(message.attributes, {'extra': EXTRA}) + + def _by_timestamp(message): + return message.timestamp + + message1, message2 = sorted(messages, key=_by_timestamp) + self.assertEqual(message1.data, MESSAGE_1) + self.assertEqual(message1.attributes['extra'], EXTRA_1) + self.assertEqual(message2.data, MESSAGE_2) + self.assertEqual(message2.attributes['extra'], EXTRA_2) From ddb06dbc46dceae67554c1fe988c659d93b845c9 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 10 Apr 2015 15:35:20 -0400 Subject: [PATCH 2/3] Rename strptime format constant: - Ref. correct RFC. - Private - Add '_MICROS' to indicate which format from the sheaf allowed by the RFC. Addresses points 1 and 2 from: https://github.com/GoogleCloudPlatform/gcloud-python/pull/816#discussion_r28172973. and: https://github.com/GoogleCloudPlatform/gcloud-python/pull/816#discussion_r28173336 --- gcloud/pubsub/message.py | 6 +++--- gcloud/pubsub/test_message.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/gcloud/pubsub/message.py b/gcloud/pubsub/message.py index 009ca7ad8946..b83c82a0def2 100644 --- a/gcloud/pubsub/message.py +++ b/gcloud/pubsub/message.py @@ -19,7 +19,7 @@ import pytz -RFC3369 = '%Y-%m-%dT%H:%M:%S.%fZ' +_RFC3339_MICROS = '%Y-%m-%dT%H:%M:%S.%fZ' class Message(object): @@ -54,13 +54,13 @@ def timestamp(self): """Return timestamp from attributes, if passed. :rtype: datetime - :returns: timestamp (in UTC timezone) parsed from RFC 3369 timestamp + :returns: timestamp (in UTC timezone) parsed from RFC 3339 timestamp :raises: ValueError if timestamp not in ``attributes``, or malformed """ stamp = self.attributes.get('timestamp') if stamp is None: raise ValueError('No timestamp') - return datetime.datetime.strptime(stamp, RFC3369).replace( + return datetime.datetime.strptime(stamp, _RFC3339_MICROS).replace( tzinfo=pytz.UTC) @classmethod diff --git a/gcloud/pubsub/test_message.py b/gcloud/pubsub/test_message.py index 58af1dbe8242..6cf97f677ec2 100644 --- a/gcloud/pubsub/test_message.py +++ b/gcloud/pubsub/test_message.py @@ -91,13 +91,13 @@ def _to_fail(): def test_timestamp_w_timestamp_in_attributes(self): from datetime import datetime - import pytz + from pytz import utc + from gcloud.pubsub.message import _RFC3339_MICROS DATA = b'DEADBEEF' MESSAGE_ID = b'12345' TIMESTAMP = '2015-04-10T18:42:27.131956Z' - RFC3369 = '%Y-%m-%dT%H:%M:%S.%fZ' - naive = datetime.strptime(TIMESTAMP, RFC3369) - timestamp = naive.replace(tzinfo=pytz.utc) + naive = datetime.strptime(TIMESTAMP, _RFC3339_MICROS) + timestamp = naive.replace(tzinfo=utc) ATTRS = {'timestamp': TIMESTAMP} message = self._makeOne(data=DATA, message_id=MESSAGE_ID, attributes=ATTRS) From 04ced702a2d3189d79224fc9b3cb1bef10f8d242 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Fri, 10 Apr 2015 19:50:21 -0400 Subject: [PATCH 3/3] Add note about sortability to docstring for 'Message.timestamp'. --- gcloud/pubsub/message.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/gcloud/pubsub/message.py b/gcloud/pubsub/message.py index b83c82a0def2..1a04c3ee6fa7 100644 --- a/gcloud/pubsub/message.py +++ b/gcloud/pubsub/message.py @@ -51,11 +51,15 @@ def attributes(self): @property def timestamp(self): - """Return timestamp from attributes, if passed. + """Return sortable timestamp from attributes, if passed. + + Allows sorting messages in publication order (assuming consistent + clocks across all publishers). :rtype: datetime :returns: timestamp (in UTC timezone) parsed from RFC 3339 timestamp - :raises: ValueError if timestamp not in ``attributes``, or malformed + :raises: ValueError if timestamp not in ``attributes``, or if it does + not match the RFC 3339 format. """ stamp = self.attributes.get('timestamp') if stamp is None: