From 7475da9cc1f93f5c26592300340237dc97f9dc20 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 4 May 2017 11:35:09 -0700 Subject: [PATCH] Adding subscription factory on Pub / Sub client. Fixes #3369. --- docs/pubsub_snippets.py | 22 ++++++++++++ pubsub/google/cloud/pubsub/client.py | 47 ++++++++++++++++++++++++++ pubsub/tests/unit/test_client.py | 50 ++++++++++++++++++++++------ 3 files changed, 108 insertions(+), 11 deletions(-) diff --git a/docs/pubsub_snippets.py b/docs/pubsub_snippets.py index 8584dedcd34dd..8c970c57cef3f 100644 --- a/docs/pubsub_snippets.py +++ b/docs/pubsub_snippets.py @@ -65,6 +65,28 @@ def do_something_with(sub): # pylint: disable=unused-argument # [END client_list_subscriptions] +@snippet +def client_topic(client, to_delete): # pylint: disable=unused-argument + """Topic factory.""" + TOPIC_NAME = 'topic_factory-%d' % (_millis(),) + + # [START client_topic] + topic = client.topic(TOPIC_NAME) + # [END client_topic] + + +@snippet +def client_subscription(client, to_delete): # pylint: disable=unused-argument + """Subscription factory.""" + SUBSCRIPTION_NAME = 'subscription_factory-%d' % (_millis(),) + + # [START client_subscription] + subscription = client.topic( + SUBSCRIPTION_NAME, ack_deadline=60, + retain_acked_messages=True) + # [END client_subscription] + + @snippet def topic_create(client, to_delete): """Create a topic.""" diff --git a/pubsub/google/cloud/pubsub/client.py b/pubsub/google/cloud/pubsub/client.py index cccecd27f4c40..1b38780da7fe2 100644 --- a/pubsub/google/cloud/pubsub/client.py +++ b/pubsub/google/cloud/pubsub/client.py @@ -22,6 +22,7 @@ from google.cloud.pubsub._http import _PublisherAPI as JSONPublisherAPI from google.cloud.pubsub._http import _SubscriberAPI as JSONSubscriberAPI from google.cloud.pubsub._http import _IAMPolicyAPI +from google.cloud.pubsub.subscription import Subscription from google.cloud.pubsub.topic import Topic try: @@ -225,6 +226,7 @@ def topic(self, name, timestamp_messages=False): .. literalinclude:: pubsub_snippets.py :start-after: [START client_topic] :end-before: [END client_topic] + :dedent: 4 :type name: str :param name: the name of the topic to be constructed. @@ -236,3 +238,48 @@ def topic(self, name, timestamp_messages=False): :returns: Topic created with the current client. """ return Topic(name, client=self, timestamp_messages=timestamp_messages) + + def subscription(self, name, ack_deadline=None, push_endpoint=None, + retain_acked_messages=None, + message_retention_duration=None): + """Creates a subscription bound to the current client. + + Example: + + .. literalinclude:: pubsub_snippets.py + :start-after: [START client_subscription] + :end-before: [END client_subscription] + :dedent: 4 + + :type name: str + :param name: the name of the subscription to be constructed. + + :type ack_deadline: int + :param ack_deadline: (Optional) The deadline (in seconds) by which + messages pulledfrom the back-end must be + acknowledged. + + :type push_endpoint: str + :param push_endpoint: + (Optional) URL to which messages will be pushed by the back-end. + If not set, the application must pull messages. + + :type retain_acked_messages: bool + :param retain_acked_messages: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by ``message_retention_duration``. + + :type message_retention_duration: :class:`datetime.timedelta` + :param message_retention_duration: + (Optional) Whether to retain acked messages. If set, acked messages + are retained in the subscription's backlog for a duration indicated + by ``message_retention_duration``. If unset, defaults to 7 days. + + :rtype: :class:`~google.cloud.pubsub.subscription.Subscription` + :returns: Subscription created with the current client. + """ + return Subscription( + name, ack_deadline=ack_deadline, push_endpoint=push_endpoint, + retain_acked_messages=retain_acked_messages, + message_retention_duration=message_retention_duration, client=self) diff --git a/pubsub/tests/unit/test_client.py b/pubsub/tests/unit/test_client.py index e251a0632dc85..f71e9ba21d0bd 100644 --- a/pubsub/tests/unit/test_client.py +++ b/pubsub/tests/unit/test_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import unittest import mock @@ -365,7 +366,17 @@ def test_list_subscriptions_w_missing_key(self): self.assertEqual(api._listed_subscriptions, (self.PROJECT, None, None)) - def test_topic(self): + def test_list_snapshots(self): + creds = _make_credentials() + client = self._make_one(project=self.PROJECT, credentials=creds) + client._connection = object() + api = _FauxSubscriberAPI() + response = api._list_snapshots_response = object() + client._subscriber_api = api + self.assertEqual(client.list_snapshots(), response) + self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None)) + + def test_topic_factory(self): PROJECT = 'PROJECT' TOPIC_NAME = 'TOPIC_NAME' creds = _make_credentials() @@ -379,17 +390,33 @@ def test_topic(self): 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) self.assertFalse(new_topic.timestamp_messages) - def test_list_snapshots(self): + def test_subscription_factory(self): + project = 'PROJECT' creds = _make_credentials() - client = self._make_one(project=self.PROJECT, credentials=creds) - client._connection = object() - api = _FauxSubscriberAPI() - response = api._list_snapshots_response = object() - client._subscriber_api = api - self.assertEqual(client.list_snapshots(), response) - self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None)) + client_obj = self._make_one(project=project, credentials=creds) + + sub_name = 'hoot-n-holler' + ack_deadline = 60, + push_endpoint = 'https://api.example.com/push' + message_retention_duration = datetime.timedelta(3600) + new_subscription = client_obj.subscription( + sub_name, ack_deadline=ack_deadline, + push_endpoint=push_endpoint, + retain_acked_messages=True, + message_retention_duration=message_retention_duration) + + self.assertEqual(new_subscription.name, sub_name) + self.assertIsNone(new_subscription.topic) + self.assertIs(new_subscription._client, client_obj) + self.assertEqual(new_subscription._project, project) + self.assertEqual(new_subscription.ack_deadline, ack_deadline) + self.assertEqual(new_subscription.push_endpoint, push_endpoint) + self.assertTrue(new_subscription.retain_acked_messages) + self.assertEqual( + new_subscription.message_retention_duration, + message_retention_duration) + - class _Iterator(object): def __init__(self, items, token): @@ -419,7 +446,8 @@ def list_subscriptions(self, project, page_size, page_token): def list_snapshots(self, project, page_size, page_token): self._listed_snapshots = (project, page_size, page_token) - return self._list_snapshots_response + return self._list_snapshots_response + class _Connection(object):