Skip to content

Commit

Permalink
Adding subscription factory on Pub / Sub client.
Browse files Browse the repository at this point in the history
Fixes #3369.
  • Loading branch information
dhermes committed May 4, 2017
1 parent 520637c commit 7475da9
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 11 deletions.
22 changes: 22 additions & 0 deletions docs/pubsub_snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ def do_something_with(sub): # pylint: disable=unused-argument
# [END client_list_subscriptions]


@snippet
def client_topic(client, to_delete): # pylint: disable=unused-argument
"""Topic factory."""
TOPIC_NAME = 'topic_factory-%d' % (_millis(),)

# [START client_topic]
topic = client.topic(TOPIC_NAME)
# [END client_topic]


@snippet
def client_subscription(client, to_delete): # pylint: disable=unused-argument
"""Subscription factory."""
SUBSCRIPTION_NAME = 'subscription_factory-%d' % (_millis(),)

# [START client_subscription]
subscription = client.topic(
SUBSCRIPTION_NAME, ack_deadline=60,
retain_acked_messages=True)
# [END client_subscription]


@snippet
def topic_create(client, to_delete):
"""Create a topic."""
Expand Down
47 changes: 47 additions & 0 deletions pubsub/google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.cloud.pubsub._http import _PublisherAPI as JSONPublisherAPI
from google.cloud.pubsub._http import _SubscriberAPI as JSONSubscriberAPI
from google.cloud.pubsub._http import _IAMPolicyAPI
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

try:
Expand Down Expand Up @@ -225,6 +226,7 @@ def topic(self, name, timestamp_messages=False):
.. literalinclude:: pubsub_snippets.py
:start-after: [START client_topic]
:end-before: [END client_topic]
:dedent: 4
:type name: str
:param name: the name of the topic to be constructed.
Expand All @@ -236,3 +238,48 @@ def topic(self, name, timestamp_messages=False):
:returns: Topic created with the current client.
"""
return Topic(name, client=self, timestamp_messages=timestamp_messages)

def subscription(self, name, ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""Creates a subscription bound to the current client.
Example:
.. literalinclude:: pubsub_snippets.py
:start-after: [START client_subscription]
:end-before: [END client_subscription]
:dedent: 4
:type name: str
:param name: the name of the subscription to be constructed.
:type ack_deadline: int
:param ack_deadline: (Optional) The deadline (in seconds) by which
messages pulledfrom the back-end must be
acknowledged.
:type push_endpoint: str
:param push_endpoint:
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.
:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by ``message_retention_duration``.
:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by ``message_retention_duration``. If unset, defaults to 7 days.
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: Subscription created with the current client.
"""
return Subscription(
name, ack_deadline=ack_deadline, push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration, client=self)
50 changes: 39 additions & 11 deletions pubsub/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import unittest

import mock
Expand Down Expand Up @@ -365,7 +366,17 @@ def test_list_subscriptions_w_missing_key(self):
self.assertEqual(api._listed_subscriptions,
(self.PROJECT, None, None))

def test_topic(self):
def test_list_snapshots(self):
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)
client._connection = object()
api = _FauxSubscriberAPI()
response = api._list_snapshots_response = object()
client._subscriber_api = api
self.assertEqual(client.list_snapshots(), response)
self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None))

def test_topic_factory(self):
PROJECT = 'PROJECT'
TOPIC_NAME = 'TOPIC_NAME'
creds = _make_credentials()
Expand All @@ -379,17 +390,33 @@ def test_topic(self):
'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
self.assertFalse(new_topic.timestamp_messages)

def test_list_snapshots(self):
def test_subscription_factory(self):
project = 'PROJECT'
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)
client._connection = object()
api = _FauxSubscriberAPI()
response = api._list_snapshots_response = object()
client._subscriber_api = api
self.assertEqual(client.list_snapshots(), response)
self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None))
client_obj = self._make_one(project=project, credentials=creds)

sub_name = 'hoot-n-holler'
ack_deadline = 60,
push_endpoint = 'https://api.example.com/push'
message_retention_duration = datetime.timedelta(3600)
new_subscription = client_obj.subscription(
sub_name, ack_deadline=ack_deadline,
push_endpoint=push_endpoint,
retain_acked_messages=True,
message_retention_duration=message_retention_duration)

self.assertEqual(new_subscription.name, sub_name)
self.assertIsNone(new_subscription.topic)
self.assertIs(new_subscription._client, client_obj)
self.assertEqual(new_subscription._project, project)
self.assertEqual(new_subscription.ack_deadline, ack_deadline)
self.assertEqual(new_subscription.push_endpoint, push_endpoint)
self.assertTrue(new_subscription.retain_acked_messages)
self.assertEqual(
new_subscription.message_retention_duration,
message_retention_duration)



class _Iterator(object):

def __init__(self, items, token):
Expand Down Expand Up @@ -419,7 +446,8 @@ def list_subscriptions(self, project, page_size, page_token):

def list_snapshots(self, project, page_size, page_token):
self._listed_snapshots = (project, page_size, page_token)
return self._list_snapshots_response
return self._list_snapshots_response


class _Connection(object):

Expand Down

0 comments on commit 7475da9

Please sign in to comment.