Skip to content

Commit

Permalink
Move GAX-dependent class to top-level scope.
Browse files Browse the repository at this point in the history
Addresses:

#1764 (comment)

Drop local 'no-name-in-module' pylint suppression (master now disables
it in global config).
  • Loading branch information
tseaver committed May 12, 2016
1 parent 20bea75 commit 4877f5b
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 174 deletions.
328 changes: 162 additions & 166 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,175 +13,171 @@
# limitations under the License.

"""GAX wrapper for Pubsub API requests."""
try:
# pylint: disable=no-name-in-module
from google.gax import CallOptions
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
# pylint: enable=no-name-in-module
except ImportError: # pragma: NO COVER
_HAVE_GAX = False
else:
_HAVE_GAX = True

from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from grpc.beta.interfaces import StatusCode

from gcloud.exceptions import Conflict
from gcloud.exceptions import NotFound
from gcloud._helpers import _to_bytes

class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.
:type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi`
:param gax_api: API object used to make GAX requests.

# pylint: disable=import-error
from google.gax import CallOptions
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from grpc.beta.interfaces import StatusCode
# pylint: enable=import-error

from gcloud.exceptions import Conflict
from gcloud.exceptions import NotFound
from gcloud._helpers import _to_bytes


class _PublisherAPI(object):
"""Helper mapping publisher-related APIs.
:type gax_api: :class:`google.pubsub.v1.publisher_api.PublisherApi`
:param gax_api: API object used to make GAX requests.
"""
def __init__(self, gax_api):
self._gax_api = gax_api

def list_topics(self, project):
"""List topics for the project associated with this API.
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list
:type project: string
:param project: project ID
:rtype: tuple, (list, str)
:returns: list of ``Topic`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
def __init__(self, gax_api):
self._gax_api = gax_api

def list_topics(self, project):
"""List topics for the project associated with this API.
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/list
:type project: string
:param project: project ID
:rtype: tuple, (list, str)
:returns: list of ``Topic`` resource dicts, plus a
"next page token" string: if not None, indicates that
more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
path = 'projects/%s' % (project,)
response = self._gax_api.list_topics(path, options)
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
return topics, response.next_page_token

def topic_create(self, topic_path):
"""API call: create a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create
:type topic_path: string
:param topic_path: fully-qualified path of the new topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
:raises: :exc:`gcloud.exceptions.Conflict` if the topic already
exists
"""
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise # pragma: NO COVER
return {'name': topic_pb.name}

def topic_get(self, topic_path):
"""API call: retrieve a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
options = CallOptions(is_page_streaming=False)
path = 'projects/%s' % (project,)
response = self._gax_api.list_topics(path, options)
topics = [{'name': topic_pb.name} for topic_pb in response.topics]
return topics, response.next_page_token

def topic_create(self, topic_path):
"""API call: create a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create
:type topic_path: string
:param topic_path: fully-qualified path of the new topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
try:
topic_pb = self._gax_api.get_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER
return {'name': topic_pb.name}

def topic_delete(self, topic_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create
:type topic_path: string
:param topic_path: fully-qualified path of the new topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
"""
try:
self._gax_api.delete_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER

def topic_publish(self, topic_path, messages):
"""API call: publish one or more messages to a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:type messages: list of dict
:param messages: messages to be published.
:rtype: list of string
:returns: list of opaque IDs for published messages.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
message_pbs = [_message_pb_from_dict(message)
for message in messages]
try:
response = self._gax_api.publish(topic_path, message_pbs)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER
return response.message_ids

def topic_list_subscriptions(self, topic_path):
"""API call: list subscriptions bound to a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: list of strings
:returns: fully-qualified names of subscriptions for the supplied
topic.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_page_streaming=False)
try:
response = self._gax_api.list_topic_subscriptions(
topic_path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER
subs = [{'topic': topic_path, 'name': subscription}
for subscription in response.subscriptions]
return subs, response.next_page_token
:rtype: dict
:returns: ``Topic`` resource returned from the API.
:raises: :exc:`gcloud.exceptions.Conflict` if the topic already
exists
"""
try:
topic_pb = self._gax_api.create_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise # pragma: NO COVER
return {'name': topic_pb.name}

def topic_get(self, topic_path):
"""API call: retrieve a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/get
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
try:
topic_pb = self._gax_api.get_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER
return {'name': topic_pb.name}

def topic_delete(self, topic_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/create
:type topic_path: string
:param topic_path: fully-qualified path of the new topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: dict
:returns: ``Topic`` resource returned from the API.
"""
try:
self._gax_api.delete_topic(topic_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER

def topic_publish(self, topic_path, messages):
"""API call: publish one or more messages to a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics/publish
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:type messages: list of dict
:param messages: messages to be published.
:rtype: list of string
:returns: list of opaque IDs for published messages.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
message_pbs = [_message_pb_from_dict(message)
for message in messages]
try:
response = self._gax_api.publish(topic_path, message_pbs)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER
return response.message_ids

def topic_list_subscriptions(self, topic_path):
"""API call: list subscriptions bound to a topic
See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.topics.subscriptions/list
:type topic_path: string
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
:rtype: list of strings
:returns: fully-qualified names of subscriptions for the supplied
topic.
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_page_streaming=False)
try:
response = self._gax_api.list_topic_subscriptions(
topic_path, options)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise # pragma: NO COVER
subs = [{'topic': topic_path, 'name': subscription}
for subscription in response.subscriptions]
return subs, response.next_page_token


def _message_pb_from_dict(message):
Expand Down
13 changes: 5 additions & 8 deletions gcloud/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@


try:
# pylint: disable=no-name-in-module
from gcloud.pubsub._gax import _HAVE_GAX
# pylint: enable=no-name-in-module
# pylint: disable=unused-import
import gcloud.pubsub._gax
# pylint: enable=unused-import
except ImportError: # pragma: NO COVER
_HAVE_GAX = False
else:
_HAVE_GAX = True


@unittest2.skipUnless(_HAVE_GAX, 'No gax-python')
Expand Down Expand Up @@ -235,15 +237,13 @@ def _make_grpc_failed_precondition(self):
return self._make_grpc_error(StatusCode.FAILED_PRECONDITION)

def create_topic(self, name, options=None):
# pylint: disable=no-name-in-module
from google.gax.errors import GaxError
self._create_topic_called_with = name, options
if self._create_topic_conflict:
raise GaxError('conflict', self._make_grpc_failed_precondition())
return self._create_topic_response

def get_topic(self, name, options=None):
# pylint: disable=no-name-in-module
from google.gax.errors import GaxError
self._get_topic_called_with = name, options
try:
Expand All @@ -252,14 +252,12 @@ def get_topic(self, name, options=None):
raise GaxError('miss', self._make_grpc_not_found())

def delete_topic(self, name, options=None):
# pylint: disable=no-name-in-module
from google.gax.errors import GaxError
self._delete_topic_called_with = name, options
if not self._delete_topic_ok:
raise GaxError('miss', self._make_grpc_not_found())

def publish(self, topic, messages, options=None):
# pylint: disable=no-name-in-module
from google.gax.errors import GaxError
self._publish_called_with = topic, messages, options
try:
Expand All @@ -268,7 +266,6 @@ def publish(self, topic, messages, options=None):
raise GaxError('miss', self._make_grpc_not_found())

def list_topic_subscriptions(self, topic, options=None):
# pylint: disable=no-name-in-module
from google.gax.errors import GaxError
self._list_topic_subscriptions_called_with = topic, options
try:
Expand Down

0 comments on commit 4877f5b

Please sign in to comment.