From 867ffe9a89d2d883603a92dc88610319851bbf00 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 27 Apr 2016 15:33:29 -0400 Subject: [PATCH 01/13] Normalize JSON API method docstrings. --- gcloud/pubsub/connection.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index 196d58cea7a6..525cd7b9d451 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -101,7 +101,7 @@ def __init__(self, connection): self._connection = connection def list_topics(self, project, page_size=None, page_token=None): - """List topics for the project associated with this API. + """API call: list topics for a given project See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list @@ -138,7 +138,7 @@ def list_topics(self, project, page_size=None, page_token=None): return resp.get('topics', ()), resp.get('nextPageToken') def topic_create(self, topic_path): - """API call: create a topic via a PUT request + """API call: create a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create @@ -154,7 +154,7 @@ def topic_create(self, topic_path): return conn.api_request(method='PUT', path='/%s' % (topic_path,)) def topic_get(self, topic_path): - """API call: retrieve a topic via a GET request + """API call: retrieve a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get @@ -170,7 +170,7 @@ def topic_get(self, topic_path): return conn.api_request(method='GET', path='/%s' % (topic_path,)) def topic_delete(self, topic_path): - """API call: delete a topic via a DELETE request + """API call: delete a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete @@ -183,7 +183,7 @@ def topic_delete(self, topic_path): conn.api_request(method='DELETE', path='/%s' % (topic_path,)) def topic_publish(self, topic_path, messages): - """API call: publish a message to a topic via a POST request + """API call: publish one or more messages to a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish @@ -206,7 +206,7 @@ def topic_publish(self, topic_path, messages): def topic_list_subscriptions(self, topic_path, page_size=None, page_token=None): - """API call: list subscriptions bound to a topic via a GET request + """API call: list subscriptions bound to a topic See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list @@ -253,7 +253,7 @@ def __init__(self, connection): self._connection = connection def list_subscriptions(self, project, page_size=None, page_token=None): - """List subscriptions for the project associated with this API. + """API call: list subscriptions for a given project See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list @@ -291,7 +291,7 @@ def list_subscriptions(self, project, page_size=None, page_token=None): def subscription_create(self, subscription_path, topic_path, ack_deadline=None, push_endpoint=None): - """API call: create a subscription via a PUT request + """API call: create a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create @@ -331,7 +331,7 @@ def subscription_create(self, subscription_path, topic_path, return conn.api_request(method='PUT', path=path, data=resource) def subscription_get(self, subscription_path): - """API call: retrieve a subscription via a GET request + """API call: retrieve a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get @@ -349,7 +349,7 @@ def subscription_get(self, subscription_path): return conn.api_request(method='GET', path=path) def subscription_delete(self, subscription_path): - """API call: delete a subscription via a DELETE request + """API call: delete a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete @@ -365,7 +365,7 @@ def subscription_delete(self, subscription_path): def subscription_modify_push_config(self, subscription_path, push_endpoint): - """API call: update push config of a subscription via a POST request + """API call: update push config of a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig @@ -387,7 +387,7 @@ def subscription_modify_push_config(self, subscription_path, def subscription_pull(self, subscription_path, return_immediately=False, max_messages=1): - """API call: update push config of a subscription via a POST request + """API call: retrieve messages for a subscription See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig @@ -419,7 +419,7 @@ def subscription_pull(self, subscription_path, return_immediately=False, return response.get('receivedMessages', ()) def subscription_acknowledge(self, subscription_path, ack_ids): - """API call: acknowledge retrieved messages for the subscription. + """API call: acknowledge retrieved messages See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig @@ -441,7 +441,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids): def subscription_modify_ack_deadline(self, subscription_path, ack_ids, ack_deadline): - """API call: acknowledge retrieved messages for the subscription. + """API call: update ack deadline for retrieved messages See: https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline @@ -478,7 +478,7 @@ def __init__(self, connection): self._connection = connection def get_iam_policy(self, target_path): - """Fetch the IAM policy for the target. + """API call: fetch the IAM policy for the target See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/getIamPolicy @@ -495,7 +495,7 @@ def get_iam_policy(self, target_path): return conn.api_request(method='GET', path=path) def set_iam_policy(self, target_path, policy): - """Update the IAM policy for the target. + """API call: update the IAM policy for the target See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/setIamPolicy @@ -516,7 +516,7 @@ def set_iam_policy(self, target_path, policy): return conn.api_request(method='POST', path=path, data=wrapped) def test_iam_permissions(self, target_path, permissions): - """Update the IAM policy for the target. + """API call: test permissions See: https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/testIamPermissions From 6b6b7acf4d2d28e1f0ef63cb6303add76b73db26 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Wed, 27 Apr 2016 17:10:15 -0400 Subject: [PATCH 02/13] Implement a GAX version of '_PublisherAPI'. Note: paging methods ('list_topics' and 'list_topic_subscriptions') are only partially implemented, because the GAX API does not permit passing in a page token (it can be coerced into *returning* only a single page of results, and its token, though). See: - https://github.com/googleapis/gax-python/issues/86 - https://github.com/googleapis/gax-python/pull/94 --- gcloud/pubsub/_gax.py | 178 +++++++++++++++++++++++ gcloud/pubsub/test__gax.py | 282 +++++++++++++++++++++++++++++++++++++ 2 files changed, 460 insertions(+) create mode 100644 gcloud/pubsub/_gax.py create mode 100644 gcloud/pubsub/test__gax.py diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py new file mode 100644 index 000000000000..96a78a3c2441 --- /dev/null +++ b/gcloud/pubsub/_gax.py @@ -0,0 +1,178 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GAX wrapper for Pubsub API requests.""" + +try: + # pylint: disable=no-name-in-module + from google.gax import CallOptions + from google.gax.errors import GaxError + from google.pubsub.v1.pubsub_pb2 import PubsubMessage + # pylint: enable=no-name-in-module +except ImportError: # pragma: NO COVER + _HAVE_GAX = False +else: + _HAVE_GAX = True + + from gcloud.exceptions import Conflict + from gcloud.exceptions import NotFound + + class _PublisherAPI(object): + """Helper mapping publisher-related APIs. + + :type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi` + :param gax_api: API object used to make GAX requests. + """ + def __init__(self, gax_api): + self._gax_api = gax_api + + def list_topics(self, project): + """List topics for the project associated with this API. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list + + :type project: string + :param project: project ID + + :rtype: tuple, (list, str) + :returns: list of ``Topic`` resource dicts, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + options = CallOptions(is_page_streaming=False) + path = 'projects/%s' % (project,) + response = self._gax_api.list_topics(path, options) + topics = [{'name': topic_pb.name} for topic_pb in response.topics] + return topics, response.next_page_token + + def topic_create(self, topic_path): + """API call: create a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: fully-qualfied path of the new topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + :raises: :exc:`gcloud.exceptions.Conflict` if the topic already + exists + """ + try: + topic_pb = self._gax_api.create_topic(topic_path) + except GaxError: + raise Conflict(topic_path) + return {'name': topic_pb.name} + + def topic_get(self, topic_path): + """API call: retrieve a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get + + :type topic_path: string + :param topic_path: fully-qualfied path of the topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + try: + topic_pb = self._gax_api.get_topic(topic_path) + except GaxError: + raise NotFound(topic_path) + return {'name': topic_pb.name} + + def topic_delete(self, topic_path): + """API call: delete a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: fully-qualfied path of the new topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + """ + try: + self._gax_api.delete_topic(topic_path) + except GaxError: + pass + + def topic_publish(self, topic_path, messages): + """API call: publish one or more messages to a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish + + :type topic_path: string + :param topic_path: fully-qualfied path of the topic, in format + ``projects//topics/``. + + :type messages: list of dict + :param messages: messages to be published. + + :rtype: list of string + :returns: list of opaque IDs for published messages. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + message_pbs = [_message_pb_from_dict(message) + for message in messages] + try: + response = self._gax_api.publish(topic_path, message_pbs) + except GaxError: + raise NotFound(topic_path) + return response.message_ids + + def topic_list_subscriptions(self, topic_path): + """API call: list subscriptions bound to a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list + + :type topic_path: string + :param topic_path: fully-qualfied path of the topic, in format + ``projects//topics/``. + + :rtype: list of strings + :returns: fully-qualified names of subscriptions for the supplied + topic. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + options = CallOptions(is_page_streaming=False) + try: + response = self._gax_api.list_topic_subscriptions( + topic_path, options) + except GaxError: + raise NotFound(topic_path) + subs = [{'topic': topic_path, 'name': subscription} + for subscription in response.subscriptions] + return subs, response.next_page_token + + +def _message_pb_from_dict(message): + data = message['data'] + if not isinstance(data, str): + data = data.encode('ascii') + return PubsubMessage(data=data, attributes=message['attributes']) diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py new file mode 100644 index 000000000000..95801d5a97b3 --- /dev/null +++ b/gcloud/pubsub/test__gax.py @@ -0,0 +1,282 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +try: + # pylint: disable=no-name-in-module + from gcloud.pubsub._gax import _HAVE_GAX + # pylint: enable=no-name-in-module +except ImportError: # pragma: NO COVER + _HAVE_GAX = False + + +@unittest2.skipUnless(_HAVE_GAX, 'No gax-python') +class Test_PublisherAPI(unittest2.TestCase): + PROJECT = 'PROJECT' + PROJECT_PATH = 'projects/%s' % (PROJECT,) + LIST_TOPICS_PATH = '%s/topics' % (PROJECT_PATH,) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + LIST_TOPIC_SUBSCRIPTIONS_PATH = '%s/subscriptions' % (TOPIC_PATH,) + SUB_NAME = 'sub_name' + SUB_PATH = '%s/subscriptions/%s' % (TOPIC_PATH, SUB_NAME) + + def _getTargetClass(self): + from gcloud.pubsub._gax import _PublisherAPI + return _PublisherAPI + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor(self): + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + self.assertTrue(api._gax_api is gax_api) + + def test_list_topics_no_paging(self): + response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)]) + gax_api = _GAXPublisherAPI(_list_topics_response=response) + api = self._makeOne(gax_api) + + topics, next_token = api.list_topics(self.PROJECT) + + self.assertEqual(len(topics), 1) + topic = topics[0] + self.assertIsInstance(topic, dict) + self.assertEqual(topic['name'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + name, options = gax_api._list_topics_called_with + self.assertEqual(name, self.PROJECT_PATH) + self.assertFalse(options.is_page_streaming) + + def test_topic_create(self): + topic_pb = _TopicPB(self.TOPIC_PATH) + gax_api = _GAXPublisherAPI(_create_topic_response=topic_pb) + api = self._makeOne(gax_api) + + resource = api.topic_create(self.TOPIC_PATH) + + self.assertEqual(resource, {'name': self.TOPIC_PATH}) + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_create_already_exists(self): + from gcloud.exceptions import Conflict + gax_api = _GAXPublisherAPI(_create_topic_conflict=True) + api = self._makeOne(gax_api) + + with self.assertRaises(Conflict): + api.topic_create(self.TOPIC_PATH) + + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_get_hit(self): + topic_pb = _TopicPB(self.TOPIC_PATH) + gax_api = _GAXPublisherAPI(_get_topic_response=topic_pb) + api = self._makeOne(gax_api) + + resource = api.topic_get(self.TOPIC_PATH) + + self.assertEqual(resource, {'name': self.TOPIC_PATH}) + topic_path, options = gax_api._get_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_get_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_get(self.TOPIC_PATH) + + topic_path, options = gax_api._get_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_delete_hit(self): + gax_api = _GAXPublisherAPI(_delete_topic_ok=True) + api = self._makeOne(gax_api) + + api.topic_delete(self.TOPIC_PATH) + + topic_path, options = gax_api._delete_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_delete_miss(self): + gax_api = _GAXPublisherAPI(_delete_topic_ok=False) + api = self._makeOne(gax_api) + + api.topic_delete(self.TOPIC_PATH) + + topic_path, options = gax_api._delete_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + + def test_topic_publish_hit(self): + import base64 + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, 'attributes': {}} + response = _PublishResponsePB([MSGID]) + gax_api = _GAXPublisherAPI(_publish_response=response) + api = self._makeOne(gax_api) + + resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(resource, [MSGID]) + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options, None) + + def test_topic_publish_miss_w_attrs_w_bytes_payload(self): + import base64 + from gcloud.exceptions import NotFound + PAYLOAD = u'This is the message text' + B64 = base64.b64encode(PAYLOAD) + MESSAGE = {'data': B64, 'attributes': {'foo': 'bar'}} + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {'foo': 'bar'}) + self.assertEqual(options, None) + + def test_topic_list_subscriptions_no_paging(self): + response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH]) + gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) + api = self._makeOne(gax_api) + + subscriptions, next_token = api.topic_list_subscriptions( + self.TOPIC_PATH) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, None) + + topic_path, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertFalse(options.is_page_streaming) + + def test_topic_list_subscriptions_miss(self): + from gcloud.exceptions import NotFound + gax_api = _GAXPublisherAPI() + api = self._makeOne(gax_api) + + with self.assertRaises(NotFound): + api.topic_list_subscriptions(self.TOPIC_PATH) + + topic_path, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertFalse(options.is_page_streaming) + + +class _GAXPublisherAPI(object): + + _create_topic_conflict = False + + def __init__(self, **kw): + self.__dict__.update(kw) + + def list_topics(self, name, options): + self._list_topics_called_with = name, options + return self._list_topics_response + + def create_topic(self, name, options=None): + # pylint: disable=no-name-in-module + from google.gax.errors import GaxError + self._create_topic_called_with = name, options + if self._create_topic_conflict: + raise GaxError('conflict') + return self._create_topic_response + + def get_topic(self, name, options=None): + # pylint: disable=no-name-in-module + from google.gax.errors import GaxError + self._get_topic_called_with = name, options + try: + return self._get_topic_response + except AttributeError: + raise GaxError('miss') + + def delete_topic(self, name, options=None): + # pylint: disable=no-name-in-module + from google.gax.errors import GaxError + self._delete_topic_called_with = name, options + if not self._delete_topic_ok: + raise GaxError('miss') + + def publish(self, topic, messages, options=None): + # pylint: disable=no-name-in-module + from google.gax.errors import GaxError + self._publish_called_with = topic, messages, options + try: + return self._publish_response + except AttributeError: + raise GaxError('miss') + + def list_topic_subscriptions(self, topic, options=None): + # pylint: disable=no-name-in-module + from google.gax.errors import GaxError + self._list_topic_subscriptions_called_with = topic, options + try: + return self._list_topic_subscriptions_response + except AttributeError: + raise GaxError('miss') + + +class _TopicPB(object): + + def __init__(self, name): + self.name = name + + +class _PublishResponsePB(object): + + def __init__(self, message_ids): + self.message_ids = message_ids + + +class _ListTopicsResponsePB(object): + + def __init__(self, topic_pbs, next_page_token=None): + self.topics = topic_pbs + self.next_page_token = next_page_token + + +class _ListTopicSubscriptionsResponsePB(object): + + def __init__(self, subscriptions, next_page_token=None): + self.subscriptions = subscriptions + self.next_page_token = next_page_token From 9962e363384564a3e78c803bf6f687e25a0486ad Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 28 Apr 2016 12:44:09 -0400 Subject: [PATCH 03/13] s/fully-qualfied/fully-qualified/g Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1764#discussion_r61443954 --- gcloud/pubsub/_gax.py | 10 +++++----- gcloud/pubsub/connection.py | 26 +++++++++++++------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 96a78a3c2441..98a7f14d821c 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -65,7 +65,7 @@ def topic_create(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create :type topic_path: string - :param topic_path: fully-qualfied path of the new topic, in format + :param topic_path: fully-qualified path of the new topic, in format ``projects//topics/``. :rtype: dict @@ -86,7 +86,7 @@ def topic_get(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get :type topic_path: string - :param topic_path: fully-qualfied path of the topic, in format + :param topic_path: fully-qualified path of the topic, in format ``projects//topics/``. :rtype: dict @@ -107,7 +107,7 @@ def topic_delete(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create :type topic_path: string - :param topic_path: fully-qualfied path of the new topic, in format + :param topic_path: fully-qualified path of the new topic, in format ``projects//topics/``. :rtype: dict @@ -125,7 +125,7 @@ def topic_publish(self, topic_path, messages): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish :type topic_path: string - :param topic_path: fully-qualfied path of the topic, in format + :param topic_path: fully-qualified path of the topic, in format ``projects//topics/``. :type messages: list of dict @@ -151,7 +151,7 @@ def topic_list_subscriptions(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list :type topic_path: string - :param topic_path: fully-qualfied path of the topic, in format + :param topic_path: fully-qualified path of the topic, in format ``projects//topics/``. :rtype: list of strings diff --git a/gcloud/pubsub/connection.py b/gcloud/pubsub/connection.py index 525cd7b9d451..76c232e45a10 100644 --- a/gcloud/pubsub/connection.py +++ b/gcloud/pubsub/connection.py @@ -144,7 +144,7 @@ def topic_create(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create :type topic_path: string - :param topic_path: the fully-qualfied path of the new topic, in format + :param topic_path: the fully-qualified path of the new topic, in format ``projects//topics/``. :rtype: dict @@ -160,7 +160,7 @@ def topic_get(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. :rtype: dict @@ -176,7 +176,7 @@ def topic_delete(self, topic_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/delete :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. """ conn = self._connection @@ -189,7 +189,7 @@ def topic_publish(self, topic_path, messages): https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. :type messages: list of dict @@ -212,7 +212,7 @@ def topic_list_subscriptions(self, topic_path, page_size=None, https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list :type topic_path: string - :param topic_path: the fully-qualfied path of the topic, in format + :param topic_path: the fully-qualified path of the topic, in format ``projects//topics/``. :type page_size: int @@ -297,12 +297,12 @@ def subscription_create(self, subscription_path, topic_path, https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. :type topic_path: string - :param topic_path: the fully-qualfied path of the topic being + :param topic_path: the fully-qualified path of the topic being subscribed, in format ``projects//topics/``. @@ -337,7 +337,7 @@ def subscription_get(self, subscription_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get :type subscription_path: string - :param subscription_path: the fully-qualfied path of the subscription, + :param subscription_path: the fully-qualified path of the subscription, in format ``projects//subscriptions/``. @@ -355,7 +355,7 @@ def subscription_delete(self, subscription_path): https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete :type subscription_path: string - :param subscription_path: the fully-qualfied path of the subscription, + :param subscription_path: the fully-qualified path of the subscription, in format ``projects//subscriptions/``. """ @@ -371,7 +371,7 @@ def subscription_modify_push_config(self, subscription_path, https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -393,7 +393,7 @@ def subscription_pull(self, subscription_path, return_immediately=False, https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -425,7 +425,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids): https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. @@ -447,7 +447,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline :type subscription_path: string - :param subscription_path: the fully-qualfied path of the new + :param subscription_path: the fully-qualified path of the new subscription, in format ``projects//subscriptions/``. From 03d55a12efdb8ec501dfe370888e9b8fcfb03f44 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 28 Apr 2016 12:49:13 -0400 Subject: [PATCH 04/13] Use 'gcloud._helpers._to_bytes'. Also, add docstring for private helper function. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1764#discussion_r61444148 https://github.com/GoogleCloudPlatform/gcloud-python/pull/1764#discussion_r61444278 --- gcloud/pubsub/_gax.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 98a7f14d821c..39531477cdd3 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -27,6 +27,7 @@ from gcloud.exceptions import Conflict from gcloud.exceptions import NotFound + from gcloud._helpers import _to_bytes class _PublisherAPI(object): """Helper mapping publisher-related APIs. @@ -172,7 +173,6 @@ def topic_list_subscriptions(self, topic_path): def _message_pb_from_dict(message): - data = message['data'] - if not isinstance(data, str): - data = data.encode('ascii') - return PubsubMessage(data=data, attributes=message['attributes']) + """Helper for :meth:`_PublisherAPI.topic_publish`.""" + return PubsubMessage(data=_to_bytes(message['data']), + attributes=message['attributes']) From ec045695504db81c068c9e5e5ef63024af5e5158 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 28 Apr 2016 13:09:13 -0400 Subject: [PATCH 05/13] Install 'gcloud[grpc]' deps for coveralls run. --- tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/tox.ini b/tox.ini index 76d7576a249a..1a7bfef15472 100644 --- a/tox.ini +++ b/tox.ini @@ -54,6 +54,7 @@ deps = [testenv:coveralls] basepython = {[testenv:cover]basepython} commands = + pip install gcloud[grpc] {[testenv]covercmd} coveralls deps = From 6b30128100189e2359bfb73b34ec76327be5e406 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 3 May 2016 16:25:13 -0400 Subject: [PATCH 06/13] Add tests for miss/conflict failure cases to JSON Topic API helper. --- gcloud/pubsub/test_connection.py | 82 ++++++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 4 deletions(-) diff --git a/gcloud/pubsub/test_connection.py b/gcloud/pubsub/test_connection.py index 03895dda0bab..fe641aad9e7c 100644 --- a/gcloud/pubsub/test_connection.py +++ b/gcloud/pubsub/test_connection.py @@ -199,7 +199,20 @@ def test_topic_create(self): path = '/%s' % (self.TOPIC_PATH,) self.assertEqual(connection._called_with['path'], path) - def test_topic_get(self): + def test_topic_create_already_exists(self): + from gcloud.exceptions import Conflict + connection = _Connection() + connection._no_response_error = Conflict + api = self._makeOne(connection) + + with self.assertRaises(Conflict): + api.topic_create(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_topic_get_hit(self): RETURNED = {'name': self.TOPIC_PATH} connection = _Connection(RETURNED) api = self._makeOne(connection) @@ -211,7 +224,19 @@ def test_topic_get(self): path = '/%s' % (self.TOPIC_PATH,) self.assertEqual(connection._called_with['path'], path) - def test_topic_delete(self): + def test_topic_get_miss(self): + from gcloud.exceptions import NotFound + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_get(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_topic_delete_hit(self): RETURNED = {} connection = _Connection(RETURNED) api = self._makeOne(connection) @@ -222,7 +247,19 @@ def test_topic_delete(self): path = '/%s' % (self.TOPIC_PATH,) self.assertEqual(connection._called_with['path'], path) - def test_topic_publish(self): + def test_topic_delete_miss(self): + from gcloud.exceptions import NotFound + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_delete(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'DELETE') + path = '/%s' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + + def test_topic_publish_hit(self): import base64 PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') @@ -241,6 +278,24 @@ def test_topic_publish(self): self.assertEqual(connection._called_with['data'], {'messages': [MESSAGE]}) + def test_topic_publish_miss(self): + import base64 + from gcloud.exceptions import NotFound + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, 'attributes': {}} + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:publish' % (self.TOPIC_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], + {'messages': [MESSAGE]}) + def test_topic_list_subscriptions_no_paging(self): SUB_INFO = {'name': self.SUB_PATH, 'topic': self.TOPIC_PATH} RETURNED = {'subscriptions': [SUB_INFO]} @@ -306,6 +361,19 @@ def test_topic_list_subscriptions_missing_key(self): self.assertEqual(connection._called_with['path'], path) self.assertEqual(connection._called_with['query_params'], {}) + def test_topic_list_subscriptions_miss(self): + from gcloud.exceptions import NotFound + connection = _Connection() + api = self._makeOne(connection) + + with self.assertRaises(NotFound): + api.topic_list_subscriptions(self.TOPIC_PATH) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_TOPIC_SUBSCRIPTIONS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) + class Test_SubscriberAPI(_Base): @@ -665,11 +733,17 @@ def test_test_iam_permissions_missing_key(self): class _Connection(object): _called_with = None + _no_response_error = None def __init__(self, *responses): self._responses = responses def api_request(self, **kw): + from gcloud.exceptions import NotFound self._called_with = kw - response, self._responses = self._responses[0], self._responses[1:] + try: + response, self._responses = self._responses[0], self._responses[1:] + except IndexError: + err_class = self._no_response_error or NotFound + raise err_class('miss') return response From 609f323e33ad35c1fde4efc837d907bad4718f82 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 3 May 2016 16:26:52 -0400 Subject: [PATCH 07/13] Make GAX 'topic_delete' consistent w/ JSON version. I.e., raise 'NotFound' on a miss. --- gcloud/pubsub/_gax.py | 2 +- gcloud/pubsub/test__gax.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 39531477cdd3..2746f6ffaf3e 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -117,7 +117,7 @@ def topic_delete(self, topic_path): try: self._gax_api.delete_topic(topic_path) except GaxError: - pass + raise NotFound(topic_path) def topic_publish(self, topic_path, messages): """API call: publish one or more messages to a topic diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 95801d5a97b3..2c60fc726f43 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -122,10 +122,12 @@ def test_topic_delete_hit(self): self.assertEqual(options, None) def test_topic_delete_miss(self): + from gcloud.exceptions import NotFound gax_api = _GAXPublisherAPI(_delete_topic_ok=False) api = self._makeOne(gax_api) - api.topic_delete(self.TOPIC_PATH) + with self.assertRaises(NotFound): + api.topic_delete(self.TOPIC_PATH) topic_path, options = gax_api._delete_topic_called_with self.assertEqual(topic_path, self.TOPIC_PATH) From cbc24de1005e4abcca01d0c36184808a12b19485 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 10 May 2016 18:20:24 -0400 Subject: [PATCH 08/13] Pin grpcio at 0.13.1. 0.14.0 causes google.pubsub.v1 to barf (see: https://github.com/googleapis/googleapis/issues/17). --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c3c20735bc5a..ff2af731ec0c 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ ] GRPC_EXTRAS = [ - 'grpcio >= 0.13.1', + 'grpcio == 0.13.1', 'gax-google-pubsub-v1', ] From e5f034bc5b9eebf658cde6f46eacb39630ddb86f Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Tue, 10 May 2016 18:22:18 -0400 Subject: [PATCH 09/13] Narrow redirected GaxErrors using 'cause.code'. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1764/#discussion_r61443749 et ff. --- gcloud/pubsub/_gax.py | 36 ++++++++++++++++++++++++------------ gcloud/pubsub/test__gax.py | 29 ++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 2746f6ffaf3e..000c15d5509d 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -13,18 +13,20 @@ # limitations under the License. """GAX wrapper for Pubsub API requests.""" - try: # pylint: disable=no-name-in-module from google.gax import CallOptions from google.gax.errors import GaxError - from google.pubsub.v1.pubsub_pb2 import PubsubMessage + from google.gax.grpc import exc_to_code # pylint: enable=no-name-in-module except ImportError: # pragma: NO COVER _HAVE_GAX = False else: _HAVE_GAX = True + from google.pubsub.v1.pubsub_pb2 import PubsubMessage + from grpc.beta.interfaces import StatusCode + from gcloud.exceptions import Conflict from gcloud.exceptions import NotFound from gcloud._helpers import _to_bytes @@ -76,8 +78,10 @@ def topic_create(self, topic_path): """ try: topic_pb = self._gax_api.create_topic(topic_path) - except GaxError: - raise Conflict(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + raise Conflict(topic_path) + raise # pragma: NO COVER return {'name': topic_pb.name} def topic_get(self, topic_path): @@ -97,8 +101,10 @@ def topic_get(self, topic_path): """ try: topic_pb = self._gax_api.get_topic(topic_path) - except GaxError: - raise NotFound(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER return {'name': topic_pb.name} def topic_delete(self, topic_path): @@ -116,8 +122,10 @@ def topic_delete(self, topic_path): """ try: self._gax_api.delete_topic(topic_path) - except GaxError: - raise NotFound(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER def topic_publish(self, topic_path, messages): """API call: publish one or more messages to a topic @@ -141,8 +149,10 @@ def topic_publish(self, topic_path, messages): for message in messages] try: response = self._gax_api.publish(topic_path, message_pbs) - except GaxError: - raise NotFound(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER return response.message_ids def topic_list_subscriptions(self, topic_path): @@ -165,8 +175,10 @@ def topic_list_subscriptions(self, topic_path): try: response = self._gax_api.list_topic_subscriptions( topic_path, options) - except GaxError: - raise NotFound(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER subs = [{'topic': topic_path, 'name': subscription} for subscription in response.subscriptions] return subs, response.next_page_token diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 2c60fc726f43..a9c88a314ce2 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -215,12 +215,31 @@ def list_topics(self, name, options): self._list_topics_called_with = name, options return self._list_topics_response + def _make_grpc_error(self, status_code): + from grpc.framework.interfaces.face.face import AbortionError + + class _DummyException(AbortionError): + code = status_code + + def __init__(self): + pass + + return _DummyException() + + def _make_grpc_not_found(self): + from grpc.beta.interfaces import StatusCode + return self._make_grpc_error(StatusCode.NOT_FOUND) + + def _make_grpc_failed_precondition(self): + from grpc.beta.interfaces import StatusCode + return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) + def create_topic(self, name, options=None): # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._create_topic_called_with = name, options if self._create_topic_conflict: - raise GaxError('conflict') + raise GaxError('conflict', self._make_grpc_failed_precondition()) return self._create_topic_response def get_topic(self, name, options=None): @@ -230,14 +249,14 @@ def get_topic(self, name, options=None): try: return self._get_topic_response except AttributeError: - raise GaxError('miss') + raise GaxError('miss', self._make_grpc_not_found()) def delete_topic(self, name, options=None): # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._delete_topic_called_with = name, options if not self._delete_topic_ok: - raise GaxError('miss') + raise GaxError('miss', self._make_grpc_not_found()) def publish(self, topic, messages, options=None): # pylint: disable=no-name-in-module @@ -246,7 +265,7 @@ def publish(self, topic, messages, options=None): try: return self._publish_response except AttributeError: - raise GaxError('miss') + raise GaxError('miss', self._make_grpc_not_found()) def list_topic_subscriptions(self, topic, options=None): # pylint: disable=no-name-in-module @@ -255,7 +274,7 @@ def list_topic_subscriptions(self, topic, options=None): try: return self._list_topic_subscriptions_response except AttributeError: - raise GaxError('miss') + raise GaxError('miss', self._make_grpc_not_found()) class _TopicPB(object): From 20bea751681899e51534a9adf3007a45957cd344 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 12 May 2016 14:00:54 -0400 Subject: [PATCH 10/13] Don't shadow 'grpc' via '_testing' stubs. --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index 1a7bfef15472..f177130cb6ac 100644 --- a/tox.ini +++ b/tox.ini @@ -50,6 +50,8 @@ commands = deps = {[testenv]deps} coverage +setenv = + PYTHONPATH = [testenv:coveralls] basepython = {[testenv:cover]basepython} From 4877f5bda1de4052977fe5c5e2067d8dc1b1f8c6 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 12 May 2016 14:13:42 -0400 Subject: [PATCH 11/13] Move GAX-dependent class to top-level scope. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1764#issuecomment-217948111 Drop local 'no-name-in-module' pylint suppression (master now disables it in global config). --- gcloud/pubsub/_gax.py | 328 ++++++++++++++++++------------------- gcloud/pubsub/test__gax.py | 13 +- 2 files changed, 167 insertions(+), 174 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 000c15d5509d..26821bdcc2e4 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -13,175 +13,171 @@ # limitations under the License. """GAX wrapper for Pubsub API requests.""" -try: - # pylint: disable=no-name-in-module - from google.gax import CallOptions - from google.gax.errors import GaxError - from google.gax.grpc import exc_to_code - # pylint: enable=no-name-in-module -except ImportError: # pragma: NO COVER - _HAVE_GAX = False -else: - _HAVE_GAX = True - - from google.pubsub.v1.pubsub_pb2 import PubsubMessage - from grpc.beta.interfaces import StatusCode - - from gcloud.exceptions import Conflict - from gcloud.exceptions import NotFound - from gcloud._helpers import _to_bytes - - class _PublisherAPI(object): - """Helper mapping publisher-related APIs. - - :type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi` - :param gax_api: API object used to make GAX requests. + +# pylint: disable=import-error +from google.gax import CallOptions +from google.gax.errors import GaxError +from google.gax.grpc import exc_to_code +from google.pubsub.v1.pubsub_pb2 import PubsubMessage +from grpc.beta.interfaces import StatusCode +# pylint: enable=import-error + +from gcloud.exceptions import Conflict +from gcloud.exceptions import NotFound +from gcloud._helpers import _to_bytes + + +class _PublisherAPI(object): + """Helper mapping publisher-related APIs. + + :type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi` + :param gax_api: API object used to make GAX requests. + """ + def __init__(self, gax_api): + self._gax_api = gax_api + + def list_topics(self, project): + """List topics for the project associated with this API. + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list + + :type project: string + :param project: project ID + + :rtype: tuple, (list, str) + :returns: list of ``Topic`` resource dicts, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). """ - def __init__(self, gax_api): - self._gax_api = gax_api - - def list_topics(self, project): - """List topics for the project associated with this API. - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list - - :type project: string - :param project: project ID - - :rtype: tuple, (list, str) - :returns: list of ``Topic`` resource dicts, plus a - "next page token" string: if not None, indicates that - more topics can be retrieved with another call (pass that - value as ``page_token``). - """ - options = CallOptions(is_page_streaming=False) - path = 'projects/%s' % (project,) - response = self._gax_api.list_topics(path, options) - topics = [{'name': topic_pb.name} for topic_pb in response.topics] - return topics, response.next_page_token - - def topic_create(self, topic_path): - """API call: create a topic - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create - - :type topic_path: string - :param topic_path: fully-qualified path of the new topic, in format - ``projects//topics/``. - - :rtype: dict - :returns: ``Topic`` resource returned from the API. - :raises: :exc:`gcloud.exceptions.Conflict` if the topic already - exists - """ - try: - topic_pb = self._gax_api.create_topic(topic_path) - except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: - raise Conflict(topic_path) - raise # pragma: NO COVER - return {'name': topic_pb.name} - - def topic_get(self, topic_path): - """API call: retrieve a topic - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get - - :type topic_path: string - :param topic_path: fully-qualified path of the topic, in format + options = CallOptions(is_page_streaming=False) + path = 'projects/%s' % (project,) + response = self._gax_api.list_topics(path, options) + topics = [{'name': topic_pb.name} for topic_pb in response.topics] + return topics, response.next_page_token + + def topic_create(self, topic_path): + """API call: create a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: fully-qualified path of the new topic, in format ``projects//topics/``. - :rtype: dict - :returns: ``Topic`` resource returned from the API. - :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not - exist - """ - try: - topic_pb = self._gax_api.get_topic(topic_path) - except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: - raise NotFound(topic_path) - raise # pragma: NO COVER - return {'name': topic_pb.name} - - def topic_delete(self, topic_path): - """API call: delete a topic - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create - - :type topic_path: string - :param topic_path: fully-qualified path of the new topic, in format - ``projects//topics/``. - - :rtype: dict - :returns: ``Topic`` resource returned from the API. - """ - try: - self._gax_api.delete_topic(topic_path) - except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: - raise NotFound(topic_path) - raise # pragma: NO COVER - - def topic_publish(self, topic_path, messages): - """API call: publish one or more messages to a topic - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish - - :type topic_path: string - :param topic_path: fully-qualified path of the topic, in format - ``projects//topics/``. - - :type messages: list of dict - :param messages: messages to be published. - - :rtype: list of string - :returns: list of opaque IDs for published messages. - :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not - exist - """ - message_pbs = [_message_pb_from_dict(message) - for message in messages] - try: - response = self._gax_api.publish(topic_path, message_pbs) - except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: - raise NotFound(topic_path) - raise # pragma: NO COVER - return response.message_ids - - def topic_list_subscriptions(self, topic_path): - """API call: list subscriptions bound to a topic - - See: - https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list - - :type topic_path: string - :param topic_path: fully-qualified path of the topic, in format - ``projects//topics/``. - - :rtype: list of strings - :returns: fully-qualified names of subscriptions for the supplied - topic. - :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not - exist - """ - options = CallOptions(is_page_streaming=False) - try: - response = self._gax_api.list_topic_subscriptions( - topic_path, options) - except GaxError as exc: - if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: - raise NotFound(topic_path) - raise # pragma: NO COVER - subs = [{'topic': topic_path, 'name': subscription} - for subscription in response.subscriptions] - return subs, response.next_page_token + :rtype: dict + :returns: ``Topic`` resource returned from the API. + :raises: :exc:`gcloud.exceptions.Conflict` if the topic already + exists + """ + try: + topic_pb = self._gax_api.create_topic(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + raise Conflict(topic_path) + raise # pragma: NO COVER + return {'name': topic_pb.name} + + def topic_get(self, topic_path): + """API call: retrieve a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get + + :type topic_path: string + :param topic_path: fully-qualified path of the topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + try: + topic_pb = self._gax_api.get_topic(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER + return {'name': topic_pb.name} + + def topic_delete(self, topic_path): + """API call: delete a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create + + :type topic_path: string + :param topic_path: fully-qualified path of the new topic, in format + ``projects//topics/``. + + :rtype: dict + :returns: ``Topic`` resource returned from the API. + """ + try: + self._gax_api.delete_topic(topic_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER + + def topic_publish(self, topic_path, messages): + """API call: publish one or more messages to a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish + + :type topic_path: string + :param topic_path: fully-qualified path of the topic, in format + ``projects//topics/``. + + :type messages: list of dict + :param messages: messages to be published. + + :rtype: list of string + :returns: list of opaque IDs for published messages. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + message_pbs = [_message_pb_from_dict(message) + for message in messages] + try: + response = self._gax_api.publish(topic_path, message_pbs) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER + return response.message_ids + + def topic_list_subscriptions(self, topic_path): + """API call: list subscriptions bound to a topic + + See: + https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list + + :type topic_path: string + :param topic_path: fully-qualified path of the topic, in format + ``projects//topics/``. + + :rtype: list of strings + :returns: fully-qualified names of subscriptions for the supplied + topic. + :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not + exist + """ + options = CallOptions(is_page_streaming=False) + try: + response = self._gax_api.list_topic_subscriptions( + topic_path, options) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(topic_path) + raise # pragma: NO COVER + subs = [{'topic': topic_path, 'name': subscription} + for subscription in response.subscriptions] + return subs, response.next_page_token def _message_pb_from_dict(message): diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index a9c88a314ce2..c5718efab0fc 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -16,11 +16,13 @@ try: - # pylint: disable=no-name-in-module - from gcloud.pubsub._gax import _HAVE_GAX - # pylint: enable=no-name-in-module + # pylint: disable=unused-import + import gcloud.pubsub._gax + # pylint: enable=unused-import except ImportError: # pragma: NO COVER _HAVE_GAX = False +else: + _HAVE_GAX = True @unittest2.skipUnless(_HAVE_GAX, 'No gax-python') @@ -235,7 +237,6 @@ def _make_grpc_failed_precondition(self): return self._make_grpc_error(StatusCode.FAILED_PRECONDITION) def create_topic(self, name, options=None): - # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._create_topic_called_with = name, options if self._create_topic_conflict: @@ -243,7 +244,6 @@ def create_topic(self, name, options=None): return self._create_topic_response def get_topic(self, name, options=None): - # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._get_topic_called_with = name, options try: @@ -252,14 +252,12 @@ def get_topic(self, name, options=None): raise GaxError('miss', self._make_grpc_not_found()) def delete_topic(self, name, options=None): - # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._delete_topic_called_with = name, options if not self._delete_topic_ok: raise GaxError('miss', self._make_grpc_not_found()) def publish(self, topic, messages, options=None): - # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._publish_called_with = topic, messages, options try: @@ -268,7 +266,6 @@ def publish(self, topic, messages, options=None): raise GaxError('miss', self._make_grpc_not_found()) def list_topic_subscriptions(self, topic, options=None): - # pylint: disable=no-name-in-module from google.gax.errors import GaxError self._list_topic_subscriptions_called_with = topic, options try: From a3cc3b11b59438ff95af15dd7151f2d62430ae8c Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 12 May 2016 14:23:45 -0400 Subject: [PATCH 12/13] Coverage for unexpected GaxError. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1764#discussion_r62888373 --- gcloud/pubsub/_gax.py | 10 ++--- gcloud/pubsub/test__gax.py | 78 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index 26821bdcc2e4..4b4832dbce37 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -77,7 +77,7 @@ def topic_create(self, topic_path): except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: raise Conflict(topic_path) - raise # pragma: NO COVER + raise return {'name': topic_pb.name} def topic_get(self, topic_path): @@ -100,7 +100,7 @@ def topic_get(self, topic_path): except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) - raise # pragma: NO COVER + raise return {'name': topic_pb.name} def topic_delete(self, topic_path): @@ -121,7 +121,7 @@ def topic_delete(self, topic_path): except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) - raise # pragma: NO COVER + raise def topic_publish(self, topic_path, messages): """API call: publish one or more messages to a topic @@ -148,7 +148,7 @@ def topic_publish(self, topic_path, messages): except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) - raise # pragma: NO COVER + raise return response.message_ids def topic_list_subscriptions(self, topic_path): @@ -174,7 +174,7 @@ def topic_list_subscriptions(self, topic_path): except GaxError as exc: if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: raise NotFound(topic_path) - raise # pragma: NO COVER + raise subs = [{'topic': topic_path, 'name': subscription} for subscription in response.subscriptions] return subs, response.next_page_token diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index c5718efab0fc..2feae097c76f 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -89,6 +89,18 @@ def test_topic_create_already_exists(self): self.assertEqual(topic_path, self.TOPIC_PATH) self.assertEqual(options, None) + def test_topic_create_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_create(self.TOPIC_PATH) + + topic_path, options = gax_api._create_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + def test_topic_get_hit(self): topic_pb = _TopicPB(self.TOPIC_PATH) gax_api = _GAXPublisherAPI(_get_topic_response=topic_pb) @@ -113,6 +125,18 @@ def test_topic_get_miss(self): self.assertEqual(topic_path, self.TOPIC_PATH) self.assertEqual(options, None) + def test_topic_get_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_get(self.TOPIC_PATH) + + topic_path, options = gax_api._get_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + def test_topic_delete_hit(self): gax_api = _GAXPublisherAPI(_delete_topic_ok=True) api = self._makeOne(gax_api) @@ -135,6 +159,18 @@ def test_topic_delete_miss(self): self.assertEqual(topic_path, self.TOPIC_PATH) self.assertEqual(options, None) + def test_topic_delete_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_delete(self.TOPIC_PATH) + + topic_path, options = gax_api._delete_topic_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertEqual(options, None) + def test_topic_publish_hit(self): import base64 PAYLOAD = b'This is the message text' @@ -174,6 +210,25 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): self.assertEqual(message_pb.attributes, {'foo': 'bar'}) self.assertEqual(options, None) + def test_topic_publish_error(self): + import base64 + from google.gax.errors import GaxError + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MESSAGE = {'data': B64, 'attributes': {}} + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + + topic_path, message_pbs, options = gax_api._publish_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + message_pb, = message_pbs + self.assertEqual(message_pb.data, B64) + self.assertEqual(message_pb.attributes, {}) + self.assertEqual(options, None) + def test_topic_list_subscriptions_no_paging(self): response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH]) gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) @@ -205,10 +260,23 @@ def test_topic_list_subscriptions_miss(self): self.assertEqual(topic_path, self.TOPIC_PATH) self.assertFalse(options.is_page_streaming) + def test_topic_list_subscriptions_error(self): + from google.gax.errors import GaxError + gax_api = _GAXPublisherAPI(_random_gax_error=True) + api = self._makeOne(gax_api) + + with self.assertRaises(GaxError): + api.topic_list_subscriptions(self.TOPIC_PATH) + + topic_path, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(topic_path, self.TOPIC_PATH) + self.assertFalse(options.is_page_streaming) + class _GAXPublisherAPI(object): _create_topic_conflict = False + _random_gax_error = False def __init__(self, **kw): self.__dict__.update(kw) @@ -239,6 +307,8 @@ def _make_grpc_failed_precondition(self): def create_topic(self, name, options=None): from google.gax.errors import GaxError self._create_topic_called_with = name, options + if self._random_gax_error: + raise GaxError('error') if self._create_topic_conflict: raise GaxError('conflict', self._make_grpc_failed_precondition()) return self._create_topic_response @@ -246,6 +316,8 @@ def create_topic(self, name, options=None): def get_topic(self, name, options=None): from google.gax.errors import GaxError self._get_topic_called_with = name, options + if self._random_gax_error: + raise GaxError('error') try: return self._get_topic_response except AttributeError: @@ -254,12 +326,16 @@ def get_topic(self, name, options=None): def delete_topic(self, name, options=None): from google.gax.errors import GaxError self._delete_topic_called_with = name, options + if self._random_gax_error: + raise GaxError('error') if not self._delete_topic_ok: raise GaxError('miss', self._make_grpc_not_found()) def publish(self, topic, messages, options=None): from google.gax.errors import GaxError self._publish_called_with = topic, messages, options + if self._random_gax_error: + raise GaxError('error') try: return self._publish_response except AttributeError: @@ -268,6 +344,8 @@ def publish(self, topic, messages, options=None): def list_topic_subscriptions(self, topic, options=None): from google.gax.errors import GaxError self._list_topic_subscriptions_called_with = topic, options + if self._random_gax_error: + raise GaxError('error') try: return self._list_topic_subscriptions_response except AttributeError: From 46720fa66c2bcc6ca1fd863f5ef4a834ce9069a6 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Thu, 12 May 2016 22:16:30 -0400 Subject: [PATCH 13/13] coveralls: Don't shadow 'grpc' via '_testing' stubs. --- tox.ini | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tox.ini b/tox.ini index f177130cb6ac..37dbbfbedac2 100644 --- a/tox.ini +++ b/tox.ini @@ -62,6 +62,8 @@ commands = deps = {[testenv:cover]deps} coveralls +setenv = + PYTHONPATH = passenv = {[testenv:system-tests]passenv} [testenv:docs]