diff --git a/core/google/cloud/_helpers.py b/core/google/cloud/_helpers.py index f94df65c167f..2c2f08dcfb45 100644 --- a/core/google/cloud/_helpers.py +++ b/core/google/cloud/_helpers.py @@ -439,16 +439,9 @@ def _timedelta_to_duration_pb(timedelta_val): :rtype: :class:`google.protobuf.duration_pb2.Duration` :returns: A duration object equivalent to the time delta. """ - seconds_decimal = timedelta_val.total_seconds() - # Truncate the parts other than the integer. - seconds = int(seconds_decimal) - if seconds_decimal < 0: - signed_micros = timedelta_val.microseconds - 10**6 - else: - signed_micros = timedelta_val.microseconds - # Convert nanoseconds to microseconds. - nanos = 1000 * signed_micros - return duration_pb2.Duration(seconds=seconds, nanos=nanos) + duration_pb = duration_pb2.Duration() + duration_pb.FromTimedelta(timedelta_val) + return duration_pb def _duration_pb_to_timedelta(duration_pb): diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 599c99ef1d44..6b47b42524a3 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -30,6 +30,7 @@ from google.cloud._helpers import _to_bytes from google.cloud._helpers import _pb_timestamp_to_rfc3339 +from google.cloud._helpers import _timedelta_to_duration_pb from google.cloud._helpers import make_secure_channel from google.cloud._http import DEFAULT_USER_AGENT from google.cloud.exceptions import Conflict @@ -276,7 +277,9 @@ def list_subscriptions(self, project, page_size=0, page_token=None): return GAXIterator(self._client, page_iter, item_to_value) def subscription_create(self, subscription_path, topic_path, - ack_deadline=None, push_endpoint=None): + ack_deadline=None, push_endpoint=None, + retain_acked_messages=None, + message_retention_duration=None): """API call: create a subscription See: @@ -302,6 +305,18 @@ def subscription_create(self, subscription_path, topic_path, (Optional) URL to which messages will be pushed by the back-end. If not set, the application must pull messages. + :type retain_acked_messages: bool + :param retain_acked_messages: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. + + :type message_retention_duration: :class:`datetime.timedelta` + :param message_retention_duration: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. If unset, defaults to 7 days. + :rtype: dict :returns: ``Subscription`` resource returned from the API. """ @@ -310,13 +325,16 @@ def subscription_create(self, subscription_path, topic_path, else: push_config = None - if ack_deadline is None: - ack_deadline = 0 + if message_retention_duration is not None: + message_retention_duration = _timedelta_to_duration_pb( + message_retention_duration) try: sub_pb = self._gax_api.create_subscription( subscription_path, topic_path, - push_config=push_config, ack_deadline_seconds=ack_deadline) + push_config=push_config, ack_deadline_seconds=ack_deadline, + retain_acked_messages=retain_acked_messages, + message_retention_duration=message_retention_duration) except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: raise Conflict(topic_path) diff --git a/pubsub/google/cloud/pubsub/_http.py b/pubsub/google/cloud/pubsub/_http.py index e9538dce22d4..47fa7015c60d 100644 --- a/pubsub/google/cloud/pubsub/_http.py +++ b/pubsub/google/cloud/pubsub/_http.py @@ -20,6 +20,7 @@ import os from google.cloud import _http +from google.cloud._helpers import _timedelta_to_duration_pb from google.cloud.environment_vars import PUBSUB_EMULATOR from google.cloud.iterator import HTTPIterator @@ -295,7 +296,9 @@ def list_subscriptions(self, project, page_size=None, page_token=None): extra_params=extra_params) def subscription_create(self, subscription_path, topic_path, - ack_deadline=None, push_endpoint=None): + ack_deadline=None, push_endpoint=None, + retain_acked_messages=None, + message_retention_duration=None): """API call: create a subscription See: @@ -321,6 +324,18 @@ def subscription_create(self, subscription_path, topic_path, (Optional) URL to which messages will be pushed by the back-end. If not set, the application must pull messages. + :type retain_acked_messages: bool + :param retain_acked_messages: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. + + :type message_retention_duration: :class:`datetime.timedelta` + :param message_retention_duration: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. If unset, defaults to 7 days. + :rtype: dict :returns: ``Subscription`` resource returned from the API. """ @@ -333,6 +348,16 @@ def subscription_create(self, subscription_path, topic_path, if push_endpoint is not None: resource['pushConfig'] = {'pushEndpoint': push_endpoint} + if retain_acked_messages is not None: + resource['retainAckedMessages'] = retain_acked_messages + + if message_retention_duration is not None: + pb = _timedelta_to_duration_pb(message_retention_duration) + resource['messageRetentionDuration'] = { + 'seconds': pb.seconds, + 'nanos': pb.nanos + } + return self.api_request(method='PUT', path=path, data=resource) def subscription_get(self, subscription_path): diff --git a/pubsub/google/cloud/pubsub/subscription.py b/pubsub/google/cloud/pubsub/subscription.py index 19089380cf41..100ac13474b6 100644 --- a/pubsub/google/cloud/pubsub/subscription.py +++ b/pubsub/google/cloud/pubsub/subscription.py @@ -14,6 +14,8 @@ """Define API Subscriptions.""" +import datetime + from google.cloud.exceptions import NotFound from google.cloud.pubsub._helpers import topic_name_from_path from google.cloud.pubsub.iam import Policy @@ -43,6 +45,19 @@ class Subscription(object): (Optional) URL to which messages will be pushed by the back-end. If not set, the application must pull messages. + :type retain_acked_messages: bool + :param retain_acked_messages: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. + + :type message_retention_duration: :class:`datetime.timedelta` + :param message_retention_duration: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. If unset, defaults to 7 days. + + :type client: :class:`~google.cloud.pubsub.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the @@ -57,6 +72,7 @@ class Subscription(object): """ def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None, + retain_acked_messages=None, message_retention_duration=None, client=None): if client is None and topic is None: @@ -71,6 +87,8 @@ def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None, self._project = self._client.project self.ack_deadline = ack_deadline self.push_endpoint = push_endpoint + self.retain_acked_messages = retain_acked_messages + self.message_retention_duration = message_retention_duration @classmethod def from_api_repr(cls, resource, client, topics=None): @@ -107,10 +125,21 @@ def from_api_repr(cls, resource, client, topics=None): ack_deadline = resource.get('ackDeadlineSeconds') push_config = resource.get('pushConfig', {}) push_endpoint = push_config.get('pushEndpoint') + retain_acked_messages = resource.get('retainAckedMessages') + resource_duration = resource.get('duration', {}) + message_retention_duration = datetime.timedelta( + seconds=resource_duration.get('seconds', 0), + microseconds=resource_duration.get('nanos', 0) / 1000) if topic is None: return cls(name, ack_deadline=ack_deadline, - push_endpoint=push_endpoint, client=client) - return cls(name, topic, ack_deadline, push_endpoint) + push_endpoint=push_endpoint, + retain_acked_messages=retain_acked_messages, + message_retention_duration=message_retention_duration, + client=client) + return cls(name, topic=topic, ack_deadline=ack_deadline, + push_endpoint=push_endpoint, + retain_acked_messages=retain_acked_messages, + message_retention_duration=message_retention_duration) @property def project(self): @@ -182,8 +211,10 @@ def create(self, client=None): client = self._require_client(client) api = client.subscriber_api api.subscription_create( - self.full_name, self.topic.full_name, self.ack_deadline, - self.push_endpoint) + self.full_name, self.topic.full_name, + ack_deadline=self.ack_deadline, push_endpoint=self.push_endpoint, + retain_acked_messages=self.retain_acked_messages, + message_retention_duration=self.message_retention_duration) def exists(self, client=None): """API call: test existence of the subscription via a GET request diff --git a/pubsub/google/cloud/pubsub/topic.py b/pubsub/google/cloud/pubsub/topic.py index 5490617a3ea5..4e038370c535 100644 --- a/pubsub/google/cloud/pubsub/topic.py +++ b/pubsub/google/cloud/pubsub/topic.py @@ -52,7 +52,9 @@ def __init__(self, name, client, timestamp_messages=False): self._client = client self.timestamp_messages = timestamp_messages - def subscription(self, name, ack_deadline=None, push_endpoint=None): + def subscription(self, name, ack_deadline=None, push_endpoint=None, + retain_acked_messages=None, + message_retention_duration=None): """Creates a subscription bound to the current topic. Example: pull-mode subcription, default paramter values @@ -85,11 +87,25 @@ def subscription(self, name, ack_deadline=None, push_endpoint=None): back-end. If not set, the application must pull messages. + :type retain_acked_messages: bool + :param retain_acked_messages: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. + + :type message_retention_duration: :class:`datetime.timedelta` + :param message_retention_duration: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by `message_retention_duration`. If unset, defaults to 7 days. + :rtype: :class:`Subscription` :returns: The subscription created with the passed in arguments. """ - return Subscription(name, self, ack_deadline=ack_deadline, - push_endpoint=push_endpoint) + return Subscription( + name, self, ack_deadline=ack_deadline, push_endpoint=push_endpoint, + retain_acked_messages=retain_acked_messages, + message_retention_duration=message_retention_duration) @classmethod def from_api_repr(cls, resource, client): diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 93ad19b76647..5327d0418114 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import os import unittest @@ -155,6 +156,26 @@ def test_create_subscription_w_ack_deadline(self): self.assertEqual(subscription.ack_deadline, 120) self.assertIs(subscription.topic, topic) + def test_create_subscription_w_message_retention(self): + TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-') + topic = Config.CLIENT.topic(TOPIC_NAME) + self.assertFalse(topic.exists()) + topic.create() + self.to_delete.append(topic) + SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id() + duration = datetime.timedelta(hours=12) + subscription = topic.subscription( + SUBSCRIPTION_NAME, retain_acked_messages=True, + message_retention_duration=duration) + self.assertFalse(subscription.exists()) + subscription.create() + self.to_delete.append(subscription) + self.assertTrue(subscription.exists()) + self.assertEqual(subscription.name, SUBSCRIPTION_NAME) + self.assertTrue(subscription.retain_acked_messages) + self.assertEqual(subscription.message_retention_duration, duration) + self.assertIs(subscription.topic, topic) + def test_list_subscriptions(self): TOPIC_NAME = 'list-sub' + unique_resource_id('-') topic = Config.CLIENT.topic(TOPIC_NAME) @@ -287,3 +308,6 @@ def test_subscription_iam_policy(self): policy.viewers = viewers new_policy = subscription.set_iam_policy(policy) self.assertEqual(new_policy.viewers, policy.viewers) + + # TODO(geigerj): set retain_acked_messages=True in snapshot system test once + # PR #3303 is merged diff --git a/pubsub/tests/unit/test__gax.py b/pubsub/tests/unit/test__gax.py index 8a41d3ff70f8..2da629e92bc8 100644 --- a/pubsub/tests/unit/test__gax.py +++ b/pubsub/tests/unit/test__gax.py @@ -543,12 +543,54 @@ def test_subscription_create(self): 'topic': self.TOPIC_PATH, } self.assertEqual(resource, expected) - name, topic, push_config, ack_deadline, options = ( + (name, topic, push_config, ack_deadline, retain_acked_messages, + message_retention_duration, options) = ( gax_api._create_subscription_called_with) self.assertEqual(name, self.SUB_PATH) self.assertEqual(topic, self.TOPIC_PATH) self.assertIsNone(push_config) - self.assertEqual(ack_deadline, 0) + self.assertEqual(ack_deadline, None) + self.assertIsNone(retain_acked_messages) + self.assertIsNone(message_retention_duration) + self.assertIsNone(options) + + def test_subscription_create_optional_params(self): + import datetime + + from google.cloud.proto.pubsub.v1.pubsub_pb2 import Subscription + + sub_pb = Subscription(name=self.SUB_PATH, topic=self.TOPIC_PATH) + gax_api = _GAXSubscriberAPI(_create_subscription_response=sub_pb) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + expected_ack_deadline = 1729 + expected_push_endpoint = 'push-endpoint' + expected_retain_acked_messages = True + expected_message_retention_duration = datetime.timedelta( + days=1, hours=7, minutes=2, seconds=9) + + resource = api.subscription_create( + self.SUB_PATH, self.TOPIC_PATH, ack_deadline=expected_ack_deadline, + push_endpoint=expected_push_endpoint, + retain_acked_messages=expected_retain_acked_messages, + message_retention_duration=expected_message_retention_duration) + + expected = { + 'name': self.SUB_PATH, + 'topic': self.TOPIC_PATH, + } + self.assertEqual(resource, expected) + (name, topic, push_config, ack_deadline, retain_acked_messages, + message_retention_duration, options) = ( + gax_api._create_subscription_called_with) + print(gax_api._create_subscription_called_with) + self.assertEqual(name, self.SUB_PATH) + self.assertEqual(topic, self.TOPIC_PATH) + self.assertEqual(push_config.push_endpoint, expected_push_endpoint) + self.assertEqual(ack_deadline, expected_ack_deadline) + self.assertEqual(retain_acked_messages, expected_retain_acked_messages) + self.assertEqual(message_retention_duration.seconds, + expected_message_retention_duration.total_seconds()) self.assertIsNone(options) def test_subscription_create_already_exists(self): @@ -563,12 +605,15 @@ def test_subscription_create_already_exists(self): api.subscription_create( self.SUB_PATH, self.TOPIC_PATH, DEADLINE, self.PUSH_ENDPOINT) - name, topic, push_config, ack_deadline, options = ( + (name, topic, push_config, ack_deadline, retain_acked_messages, + message_retention_duration, options) = ( gax_api._create_subscription_called_with) self.assertEqual(name, self.SUB_PATH) self.assertEqual(topic, self.TOPIC_PATH) self.assertEqual(push_config.push_endpoint, self.PUSH_ENDPOINT) self.assertEqual(ack_deadline, DEADLINE) + self.assertIsNone(retain_acked_messages) + self.assertIsNone(message_retention_duration) self.assertIsNone(options) def test_subscription_create_error(self): @@ -581,12 +626,15 @@ def test_subscription_create_error(self): with self.assertRaises(GaxError): api.subscription_create(self.SUB_PATH, self.TOPIC_PATH) - name, topic, push_config, ack_deadline, options = ( + (name, topic, push_config, ack_deadline, retain_acked_messages, + message_retention_duration, options) = ( gax_api._create_subscription_called_with) self.assertEqual(name, self.SUB_PATH) self.assertEqual(topic, self.TOPIC_PATH) self.assertIsNone(push_config) - self.assertEqual(ack_deadline, 0) + self.assertEqual(ack_deadline, None) + self.assertIsNone(retain_acked_messages) + self.assertIsNone(message_retention_duration) self.assertIsNone(options) def test_subscription_get_hit(self): @@ -1158,13 +1206,16 @@ def list_subscriptions(self, project, page_size, options=None): self._list_subscriptions_called_with = (project, page_size, options) return self._list_subscriptions_response - def create_subscription(self, name, topic, - push_config, ack_deadline_seconds, + def create_subscription(self, name, topic, push_config=None, + ack_deadline_seconds=None, + retain_acked_messages=None, + message_retention_duration=None, options=None): from google.gax.errors import GaxError self._create_subscription_called_with = ( - name, topic, push_config, ack_deadline_seconds, options) + name, topic, push_config, ack_deadline_seconds, + retain_acked_messages, message_retention_duration, options) if self._random_gax_error: raise GaxError('error') if self._create_subscription_conflict: diff --git a/pubsub/tests/unit/test__http.py b/pubsub/tests/unit/test__http.py index 3f94b966dd66..8d6d6f05fcf6 100644 --- a/pubsub/tests/unit/test__http.py +++ b/pubsub/tests/unit/test__http.py @@ -607,6 +607,34 @@ def test_subscription_create_defaults(self): self.assertEqual(connection._called_with['path'], path) self.assertEqual(connection._called_with['data'], RESOURCE) + def test_subscription_create_retain_messages(self): + import datetime + + RESOURCE = {'topic': self.TOPIC_PATH, + 'retainAckedMessages': True, + 'messageRetentionDuration': { + 'seconds': 1729, + 'nanos': 2718 * 1000 + } + } + RETURNED = RESOURCE.copy() + RETURNED['name'] = self.SUB_PATH + connection = _Connection(RETURNED) + client = _Client(connection, self.PROJECT) + api = self._make_one(client) + + resource = api.subscription_create( + self.SUB_PATH, self.TOPIC_PATH, + retain_acked_messages=True, + message_retention_duration=datetime.timedelta( + seconds=1729, microseconds=2718)) + + self.assertEqual(resource, RETURNED) + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], RESOURCE) + def test_subscription_create_explicit(self): ACK_DEADLINE = 90 PUSH_ENDPOINT = 'https://api.example.com/push' diff --git a/pubsub/tests/unit/test_subscription.py b/pubsub/tests/unit/test_subscription.py index f16f0ad9126e..feebf069915d 100644 --- a/pubsub/tests/unit/test_subscription.py +++ b/pubsub/tests/unit/test_subscription.py @@ -189,8 +189,9 @@ def test_create_pull_wo_ack_deadline_w_bound_client(self): subscription.create() - self.assertEqual(api._subscription_created, - (self.SUB_PATH, self.TOPIC_PATH, None, None)) + self.assertEqual( + api._subscription_created, + (self.SUB_PATH, self.TOPIC_PATH, None, None, None, None)) def test_create_push_w_ack_deadline_w_alternate_client(self): RESPONSE = { @@ -211,7 +212,8 @@ def test_create_push_w_ack_deadline_w_alternate_client(self): self.assertEqual( api._subscription_created, - (self.SUB_PATH, self.TOPIC_PATH, self.DEADLINE, self.ENDPOINT)) + (self.SUB_PATH, self.TOPIC_PATH, self.DEADLINE, self.ENDPOINT, + None, None)) def test_exists_miss_w_bound_client(self): client = _Client(project=self.PROJECT) @@ -648,9 +650,12 @@ def test_check_iam_permissions_w_alternate_client(self): class _FauxSubscribererAPI(object): def subscription_create(self, subscription_path, topic_path, - ack_deadline=None, push_endpoint=None): + ack_deadline=None, push_endpoint=None, + retain_acked_messages=None, + message_retention_duration=None): self._subscription_created = ( - subscription_path, topic_path, ack_deadline, push_endpoint) + subscription_path, topic_path, ack_deadline, push_endpoint, + retain_acked_messages, message_retention_duration) return self._subscription_create_response def subscription_get(self, subscription_path):