Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new subscription fields #3307

Merged
merged 1 commit into from
Apr 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions core/google/cloud/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,9 @@ def _timedelta_to_duration_pb(timedelta_val):
:rtype: :class:`google.protobuf.duration_pb2.Duration`
:returns: A duration object equivalent to the time delta.
"""
seconds_decimal = timedelta_val.total_seconds()
# Truncate the parts other than the integer.
seconds = int(seconds_decimal)
if seconds_decimal < 0:
signed_micros = timedelta_val.microseconds - 10**6
else:
signed_micros = timedelta_val.microseconds
# Convert nanoseconds to microseconds.
nanos = 1000 * signed_micros
return duration_pb2.Duration(seconds=seconds, nanos=nanos)
duration_pb = duration_pb2.Duration()
duration_pb.FromTimedelta(timedelta_val)
return duration_pb


def _duration_pb_to_timedelta(duration_pb):
Expand Down
26 changes: 22 additions & 4 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from google.cloud._helpers import _to_bytes
from google.cloud._helpers import _pb_timestamp_to_rfc3339
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud._helpers import make_secure_channel
from google.cloud._http import DEFAULT_USER_AGENT
from google.cloud.exceptions import Conflict
Expand Down Expand Up @@ -276,7 +277,9 @@ def list_subscriptions(self, project, page_size=0, page_token=None):
return GAXIterator(self._client, page_iter, item_to_value)

def subscription_create(self, subscription_path, topic_path,
ack_deadline=None, push_endpoint=None):
ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""API call: create a subscription

See:
Expand All @@ -302,6 +305,18 @@ def subscription_create(self, subscription_path, topic_path,
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.

:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.

:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.

:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
Expand All @@ -310,13 +325,16 @@ def subscription_create(self, subscription_path, topic_path,
else:
push_config = None

if ack_deadline is None:
ack_deadline = 0
if message_retention_duration is not None:
message_retention_duration = _timedelta_to_duration_pb(
message_retention_duration)

try:
sub_pb = self._gax_api.create_subscription(
subscription_path, topic_path,
push_config=push_config, ack_deadline_seconds=ack_deadline)
push_config=push_config, ack_deadline_seconds=ack_deadline,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
Expand Down
27 changes: 26 additions & 1 deletion pubsub/google/cloud/pubsub/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import os

from google.cloud import _http
from google.cloud._helpers import _timedelta_to_duration_pb
from google.cloud.environment_vars import PUBSUB_EMULATOR
from google.cloud.iterator import HTTPIterator

Expand Down Expand Up @@ -295,7 +296,9 @@ def list_subscriptions(self, project, page_size=None, page_token=None):
extra_params=extra_params)

def subscription_create(self, subscription_path, topic_path,
ack_deadline=None, push_endpoint=None):
ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""API call: create a subscription

See:
Expand All @@ -321,6 +324,18 @@ def subscription_create(self, subscription_path, topic_path,
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.

:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.

:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.

:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
Expand All @@ -333,6 +348,16 @@ def subscription_create(self, subscription_path, topic_path,
if push_endpoint is not None:
resource['pushConfig'] = {'pushEndpoint': push_endpoint}

if retain_acked_messages is not None:
resource['retainAckedMessages'] = retain_acked_messages

if message_retention_duration is not None:
pb = _timedelta_to_duration_pb(message_retention_duration)
resource['messageRetentionDuration'] = {
'seconds': pb.seconds,
'nanos': pb.nanos
}

return self.api_request(method='PUT', path=path, data=resource)

def subscription_get(self, subscription_path):
Expand Down
39 changes: 35 additions & 4 deletions pubsub/google/cloud/pubsub/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Define API Subscriptions."""

import datetime

from google.cloud.exceptions import NotFound
from google.cloud.pubsub._helpers import topic_name_from_path
from google.cloud.pubsub.iam import Policy
Expand Down Expand Up @@ -43,6 +45,19 @@ class Subscription(object):
(Optional) URL to which messages will be pushed by the back-end. If
not set, the application must pull messages.

:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.

:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.


:type client: :class:`~google.cloud.pubsub.client.Client`
:param client:
(Optional) The client to use. If not passed, falls back to the
Expand All @@ -57,6 +72,7 @@ class Subscription(object):
"""

def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None,
retain_acked_messages=None, message_retention_duration=None,
client=None):

if client is None and topic is None:
Expand All @@ -71,6 +87,8 @@ def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None,
self._project = self._client.project
self.ack_deadline = ack_deadline
self.push_endpoint = push_endpoint
self.retain_acked_messages = retain_acked_messages
self.message_retention_duration = message_retention_duration

@classmethod
def from_api_repr(cls, resource, client, topics=None):
Expand Down Expand Up @@ -107,10 +125,21 @@ def from_api_repr(cls, resource, client, topics=None):
ack_deadline = resource.get('ackDeadlineSeconds')
push_config = resource.get('pushConfig', {})
push_endpoint = push_config.get('pushEndpoint')
retain_acked_messages = resource.get('retainAckedMessages')
resource_duration = resource.get('duration', {})
message_retention_duration = datetime.timedelta(
seconds=resource_duration.get('seconds', 0),
microseconds=resource_duration.get('nanos', 0) / 1000)
if topic is None:
return cls(name, ack_deadline=ack_deadline,
push_endpoint=push_endpoint, client=client)
return cls(name, topic, ack_deadline, push_endpoint)
push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration,
client=client)
return cls(name, topic=topic, ack_deadline=ack_deadline,
push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)

@property
def project(self):
Expand Down Expand Up @@ -182,8 +211,10 @@ def create(self, client=None):
client = self._require_client(client)
api = client.subscriber_api
api.subscription_create(
self.full_name, self.topic.full_name, self.ack_deadline,
self.push_endpoint)
self.full_name, self.topic.full_name,
ack_deadline=self.ack_deadline, push_endpoint=self.push_endpoint,
retain_acked_messages=self.retain_acked_messages,
message_retention_duration=self.message_retention_duration)

def exists(self, client=None):
"""API call: test existence of the subscription via a GET request
Expand Down
22 changes: 19 additions & 3 deletions pubsub/google/cloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def __init__(self, name, client, timestamp_messages=False):
self._client = client
self.timestamp_messages = timestamp_messages

def subscription(self, name, ack_deadline=None, push_endpoint=None):
def subscription(self, name, ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""Creates a subscription bound to the current topic.

Example: pull-mode subcription, default paramter values
Expand Down Expand Up @@ -85,11 +87,25 @@ def subscription(self, name, ack_deadline=None, push_endpoint=None):
back-end. If not set, the application must pull
messages.

:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`.

:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by `message_retention_duration`. If unset, defaults to 7 days.

:rtype: :class:`Subscription`
:returns: The subscription created with the passed in arguments.
"""
return Subscription(name, self, ack_deadline=ack_deadline,
push_endpoint=push_endpoint)
return Subscription(
name, self, ack_deadline=ack_deadline, push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration)

@classmethod
def from_api_repr(cls, resource, client):
Expand Down
24 changes: 24 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
import unittest

Expand Down Expand Up @@ -155,6 +156,26 @@ def test_create_subscription_w_ack_deadline(self):
self.assertEqual(subscription.ack_deadline, 120)
self.assertIs(subscription.topic, topic)

def test_create_subscription_w_message_retention(self):
TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-')
topic = Config.CLIENT.topic(TOPIC_NAME)
self.assertFalse(topic.exists())
topic.create()
self.to_delete.append(topic)
SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id()
duration = datetime.timedelta(hours=12)
subscription = topic.subscription(
SUBSCRIPTION_NAME, retain_acked_messages=True,
message_retention_duration=duration)
self.assertFalse(subscription.exists())
subscription.create()
self.to_delete.append(subscription)
self.assertTrue(subscription.exists())
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
self.assertTrue(subscription.retain_acked_messages)
self.assertEqual(subscription.message_retention_duration, duration)
self.assertIs(subscription.topic, topic)

def test_list_subscriptions(self):
TOPIC_NAME = 'list-sub' + unique_resource_id('-')
topic = Config.CLIENT.topic(TOPIC_NAME)
Expand Down Expand Up @@ -283,3 +304,6 @@ def test_subscription_iam_policy(self):
policy.viewers.add(policy.user('jjg@google.com'))
new_policy = subscription.set_iam_policy(policy)
self.assertEqual(new_policy.viewers, policy.viewers)

# TODO(geigerj): set retain_acked_messages=True in snapshot system test once
# PR #3303 is merged
Loading