diff --git a/docs/pubsub-api.rst b/docs/pubsub-api.rst index 0346bd1b09dc..434cd7c2c5aa 100644 --- a/docs/pubsub-api.rst +++ b/docs/pubsub-api.rst @@ -5,25 +5,18 @@ Pub/Sub ------- -:mod:`gcloud.pubsub` -~~~~~~~~~~~~~~~~~~~~~~~ - -.. automodule:: gcloud.pubsub - :members: get_connection, get_default_connection, - set_default_connection, set_defaults - -Connections +Client ~~~~~~~~~~~ -.. automodule:: gcloud.pubsub.connection +.. automodule:: gcloud.pubsub.client :members: :undoc-members: :show-inheritance: -Interacting with the API -~~~~~~~~~~~~~~~~~~~~~~~~ +Connections +~~~~~~~~~~~ -.. automodule:: gcloud.pubsub.api +.. automodule:: gcloud.pubsub.connection :members: :undoc-members: :show-inheritance: diff --git a/docs/pubsub-usage.rst b/docs/pubsub-usage.rst index 5c6e80d7338a..4ef7355425ec 100644 --- a/docs/pubsub-usage.rst +++ b/docs/pubsub-usage.rst @@ -1,16 +1,28 @@ Using the API ============= -Connection / Authorization --------------------------- +Authorization / Configuration +----------------------------- -- Inferred defaults used to create connection if none configured explicitly: +- Use :class:`Client ` objects to configure + your applications. - - credentials (derived from GAE / GCE environ if present). +- :class:`Client ` objects hold both a ``project`` + and an authenticated connection to the PubSub service. - - ``project`` (derived from GAE / GCE environ if present). +- The authentication credentials can be implicitly determined from the + environment or directly via + :meth:`from_service_account_json ` + and + :meth:`from_service_account_p12 `. - - ``scopes`` +- After setting ``GOOGLE_APPLICATION_CREDENTIALS`` and ``GCLOUD_PROJECT`` + environment variables, create a :class:`Client ` + + .. doctest:: + + >>> from gcloud import pubsub + >>> client = pubsub.Client() Manage topics for a project @@ -20,24 +32,18 @@ Create a new topic for the default project: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') - >>> topic.create() # API request - -Create a new topic for an explicit project: - -.. doctest:: - - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name', project='my.project') + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') >>> topic.create() # API request -Check for the existance of a topic: +Check for the existence of a topic: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') >>> topic.exists() # API request True @@ -45,17 +51,9 @@ List topics for the default project: .. doctest:: - >>> from gcloud.pubsub import list_topics - >>> topics, next_page_token = list_topics() # API request - >>> [topic.name for topic in topics] - ['topic_name'] - -List topics for an explicit project: - -.. doctest:: - - >>> from gcloud.pubsub import list_topics - >>> topics, next_page_token = list_topics(project='my.project') # API request + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topics, next_page_token = client.list_topics() # API request >>> [topic.name for topic in topics] ['topic_name'] @@ -63,8 +61,9 @@ Delete a topic: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') >>> topic.delete() # API request @@ -75,8 +74,9 @@ Publish a single message to a topic, without attributes: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') >>> topic.publish('this is the message_payload') # API request @@ -84,8 +84,9 @@ Publish a single message to a topic, with attributes: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') >>> topic.publish('this is another message_payload', ... attr1='value1', attr2='value2') # API request @@ -94,8 +95,9 @@ Publish a set of messages to a topic (as a single request): .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') >>> with topic.batch() as batch: ... batch.publish('this is the first message_payload') ... batch.publish('this is the second message_payload', @@ -116,41 +118,43 @@ Create a new pull subscription for a topic: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) >>> subscription.create() # API request Create a new pull subscription for a topic with a non-default ACK deadline: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', ack_deadline=90) + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic, + ... ack_deadline=90) >>> subscription.create() # API request Create a new push subscription for a topic: .. doctest:: + >>> from gcloud import pubsub >>> ENDPOINT = 'https://example.com/hook' - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', push_endpoint=ENDPOINT) + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic, + ... push_endpoint=ENDPOINT) >>> subscription.create() # API request Check for the existence of a subscription: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) >>> subscription.exists() # API request True @@ -158,31 +162,33 @@ Convert a pull subscription to push: .. doctest:: + >>> from gcloud import pubsub >>> ENDPOINT = 'https://example.com/hook' - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) >>> subscription.modify_push_configuration(push_endpoint=ENDPOINT) # API request Convert a push subscription to pull: .. doctest:: + >>> from gcloud import pubsub >>> ENDPOINT = 'https://example.com/hook' - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic, - ... push_endpoint=ENDPOINT) + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubusb.Subscription('subscription_name', topic, + ... push_endpoint=ENDPOINT) >>> subscription.modify_push_configuration(push_endpoint=None) # API request List subscriptions for a topic: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> topic = Topic('topic_name') - >>> subscriptions, next_page_token = topic.list_subscriptions() # API request + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> subscriptions, next_page_token = client.list_subscriptions( + ... topic_name='topic_name') # API request >>> [subscription.name for subscription in subscriptions] ['subscription_name'] @@ -190,8 +196,9 @@ List all subscriptions for the default project: .. doctest:: - >>> from gcloud.pubsub import list_subscriptions - >>> subscription, next_page_tokens = list_subscriptions() # API request + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> subscription, next_page_tokens = client.list_subscriptions() # API request >>> [subscription.name for subscription in subscriptions] ['subscription_name'] @@ -199,10 +206,10 @@ Delete a subscription: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) >>> subscription.delete() # API request @@ -213,13 +220,13 @@ Fetch pending messages for a pull subscription: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) - >>> with topic: - ... topic.publish('this is the first message_payload') - ... topic.publish('this is the second message_payload', + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> with topic.batch() as batch: + ... batch.publish('this is the first message_payload') + ... batch.publish('this is the second message_payload', ... attr1='value1', attr2='value2') >>> received = subscription.pull() # API request >>> messages = [recv[1] for recv in received] @@ -242,13 +249,13 @@ Fetch a limited number of pending messages for a pull subscription: .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) - >>> with topic: - ... topic.publish('this is the first message_payload') - ... topic.publish('this is the second message_payload', + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> with topic.batch() as batch: + ... batch.publish('this is the first message_payload') + ... batch.publish('this is the second message_payload', ... attr1='value1', attr2='value2') >>> received = subscription.pull(max_messages=1) # API request >>> messages = [recv[1] for recv in received] @@ -258,10 +265,10 @@ Fetch messages for a pull subscription without blocking (none pending): .. doctest:: - >>> from gcloud.pubsub import Topic - >>> from gcloud.pubsub import Subscription - >>> topic = Topic('topic_name') - >>> subscription = Subscription('subscription_name', topic) + >>> from gcloud import pubsub + >>> client = pubsub.Client() + >>> topic = client.topic('topic_name') + >>> subscription = pubsub.Subscription('subscription_name', topic) >>> received = subscription.pull(max_messages=1) # API request >>> messages = [recv[1] for recv in received] >>> [message.id for message in messages] diff --git a/gcloud/pubsub/__init__.py b/gcloud/pubsub/__init__.py index 69bca901f2a9..155454c30ab0 100644 --- a/gcloud/pubsub/__init__.py +++ b/gcloud/pubsub/__init__.py @@ -14,7 +14,6 @@ """GCloud Pubsub API wrapper. - The main concepts with this API are: - :class:`gcloud.pubsub.topic.Topic` represents an endpoint to which messages @@ -24,54 +23,7 @@ subscription (either pull or push) to a topic. """ -from gcloud._helpers import get_default_project -from gcloud._helpers import set_default_project -from gcloud.pubsub import _implicit_environ -from gcloud.pubsub._implicit_environ import get_default_connection -from gcloud.pubsub.api import list_subscriptions -from gcloud.pubsub.api import list_topics +from gcloud.pubsub.client import Client from gcloud.pubsub.connection import SCOPE -from gcloud.pubsub.connection import Connection from gcloud.pubsub.subscription import Subscription from gcloud.pubsub.topic import Topic - - -def set_default_connection(connection=None): - """Set default connection either explicitly or implicitly as fall-back. - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: A connection provided to be the default. - """ - _implicit_environ._DEFAULTS.connection = connection or get_connection() - - -def set_defaults(project=None, connection=None): - """Set defaults either explicitly or implicitly as fall-back. - - Uses the arguments to call the individual default methods. - - :type project: string - :param project: Optional. The name of the project to connect to. - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: Optional. A connection provided to be the default. - """ - set_default_project(project=project) - set_default_connection(connection=connection) - - -def get_connection(): - """Shortcut method to establish a connection to Cloud Storage. - - Use this if you are going to access several buckets with the same - set of credentials: - - >>> from gcloud import pubsub - >>> connection = pubsub.get_connection() - >>> bucket1 = pubsub.get_bucket('bucket1', connection=connection) - >>> bucket2 = pubsub.get_bucket('bucket2', connection=connection) - - :rtype: :class:`gcloud.pubsub.connection.Connection` - :returns: A connection defined with the proper credentials. - """ - return Connection.from_environment() diff --git a/gcloud/pubsub/_implicit_environ.py b/gcloud/pubsub/_implicit_environ.py deleted file mode 100644 index 9e8b90cbeb67..000000000000 --- a/gcloud/pubsub/_implicit_environ.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright 2015 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Module to provide implicit behavior based on enviroment. - -Allows the pubsub package to infer the default connection from the enviroment. -""" - - -class _DefaultsContainer(object): - """Container for defaults. - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: Persistent implied connection from environment. - """ - - def __init__(self, connection=None): - self.connection = connection - - -def get_default_connection(): - """Get default connection. - - :rtype: :class:`gcloud.pubsub.connection.Connection` or ``NoneType`` - :returns: The default connection if one has been set. - """ - return _DEFAULTS.connection - - -def _require_connection(connection=None): - """Infer a connection from the environment, if not passed explicitly. - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: Optional. - - :rtype: :class:`gcloud.pubsub.connection.Connection` - :returns: A connection based on the current environment. - :raises: :class:`EnvironmentError` if ``connection`` is ``None``, and - cannot be inferred from the environment. - """ - if connection is None: - connection = get_default_connection() - - if connection is None: - raise EnvironmentError('Connection could not be inferred.') - - return connection - - -_DEFAULTS = _DefaultsContainer() diff --git a/gcloud/pubsub/_testing.py b/gcloud/pubsub/_testing.py deleted file mode 100644 index 26a69ec95a3a..000000000000 --- a/gcloud/pubsub/_testing.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Shared pubsub testing utilities.""" - -from gcloud._testing import _Monkey -from gcloud.pubsub import _implicit_environ -from gcloud.pubsub._implicit_environ import _DefaultsContainer - - -def _monkey_defaults(*args, **kwargs): - mock_defaults = _DefaultsContainer(*args, **kwargs) - return _Monkey(_implicit_environ, _DEFAULTS=mock_defaults) - - -def _setup_defaults(test_case, *args, **kwargs): - test_case._replaced_defaults = _implicit_environ._DEFAULTS - _implicit_environ._DEFAULTS = _DefaultsContainer(*args, **kwargs) - - -def _tear_down_defaults(test_case): - _implicit_environ._DEFAULTS = test_case._replaced_defaults diff --git a/gcloud/pubsub/api.py b/gcloud/pubsub/api.py deleted file mode 100644 index 6849e17a98d9..000000000000 --- a/gcloud/pubsub/api.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright 2015 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Define API functions (not bound to classes).""" - -from gcloud._helpers import get_default_project -from gcloud.pubsub._implicit_environ import _require_connection -from gcloud.pubsub.subscription import Subscription -from gcloud.pubsub.topic import Topic - - -def list_topics(page_size=None, page_token=None, - project=None, connection=None): - """List topics for a given project. - - See: - https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list - - :type page_size: int - :param page_size: maximum number of topics to return, If not passed, - defaults to a value set by the API. - - :type page_token: string - :param page_token: opaque marker for the next "page" of topics. If not - passed, the API will return the first page of topics. - - :type project: string - :param project: project ID to query. If not passed, defaults to the - project ID inferred from the environment. - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: connection to use for the query. If not passed, - defaults to the connection inferred from the - environment. - - :rtype: tuple, (list, str) - :returns: list of :class:`gcloud.pubsub.topic.Topic`, plus a - "next page token" string: if not None, indicates that - more topics can be retrieved with another call (pass that - value as ``page_token``). - """ - if project is None: - project = get_default_project() - - connection = _require_connection(connection) - - params = {} - - if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - - path = '/projects/%s/topics' % project - resp = connection.api_request(method='GET', path=path, query_params=params) - topics = [Topic.from_api_repr(resource) for resource in resp['topics']] - return topics, resp.get('nextPageToken') - - -def list_subscriptions(page_size=None, page_token=None, topic_name=None, - project=None, connection=None): - """List subscriptions for a given project. - - See: - https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list - - and (where ``topic_name`` is passed): - https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/subscriptions/list - - :type page_size: int - :param page_size: maximum number of topics to return, If not passed, - defaults to a value set by the API. - - :type page_token: string - :param page_token: opaque marker for the next "page" of topics. If not - passed, the API will return the first page of topics. - - :type topic_name: string - :param topic_name: limit results to subscriptions bound to the given topic. - - :type project: string - :param project: project ID to query. If not passed, defaults to the - project ID inferred from the environment. - - :type connection: :class:`gcloud.pubsub.connection.Connection` - :param connection: connection to use for the query. If not passed, - defaults to the connection inferred from the - environment. - - :rtype: tuple, (list, str) - :returns: list of :class:`gcloud.pubsub.subscription.Subscription`, plus a - "next page token" string: if not None, indicates that - more topics can be retrieved with another call (pass that - value as ``page_token``). - """ - if project is None: - project = get_default_project() - - connection = _require_connection(connection) - - params = {} - - if page_size is not None: - params['pageSize'] = page_size - - if page_token is not None: - params['pageToken'] = page_token - - if topic_name is None: - path = '/projects/%s/subscriptions' % project - else: - path = '/projects/%s/topics/%s/subscriptions' % (project, topic_name) - - resp = connection.api_request(method='GET', path=path, query_params=params) - topics = {} - subscriptions = [Subscription.from_api_repr(resource, topics=topics) - for resource in resp['subscriptions']] - return subscriptions, resp.get('nextPageToken') diff --git a/gcloud/pubsub/client.py b/gcloud/pubsub/client.py new file mode 100644 index 000000000000..cca81da176c6 --- /dev/null +++ b/gcloud/pubsub/client.py @@ -0,0 +1,213 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""gcloud pubsub client for interacting with API.""" + + +from gcloud._helpers import _get_production_project +from gcloud.credentials import get_credentials +from gcloud.credentials import get_for_service_account_json +from gcloud.credentials import get_for_service_account_p12 +from gcloud.pubsub.connection import Connection +from gcloud.pubsub.subscription import Subscription +from gcloud.pubsub.topic import Topic + + +class Client(object): + """Client to bundle configuration needed for API requests. + + :type project: string + :param project: the project which the client acts on behalf of. Will be + passed when creating a topic. If not passed, + falls back to the default inferred from the environment. + + :type credentials: :class:`oauth2client.client.OAuth2Credentials` or + :class:`NoneType` + :param credentials: The OAuth2 Credentials to use for the connection + owned by this client. If not passed (and if no ``http`` + object is passed), falls back to the default inferred + from the environment. + + :type http: :class:`httplib2.Http` or class that defines ``request()``. + :param http: An optional HTTP object to make requests. If not passed, an + ``http`` object is created that is bound to the + ``credentials`` for the current object. + + :raises: :class:`ValueError` if the project is neither passed in nor + set in the environment. + """ + def __init__(self, project=None, credentials=None, http=None): + if project is None: + project = _get_production_project() + if project is None: + raise ValueError('Project was not passed and could not be ' + 'determined from the environment.') + self.project = project + + if credentials is None and http is None: + credentials = get_credentials() + self.connection = Connection(credentials=credentials, http=http) + + @classmethod + def from_service_account_json(cls, json_credentials_path, project=None): + """Factory to retrieve JSON credentials while creating client. + + :type json_credentials_path: string + :param json_credentials_path: The path to a private key file (this file + was given to you when you created the + service account). This file must contain + a JSON object with a private key and + other credentials information (downloaded + from the Google APIs console). + + :type project: string + :param project: the project which the client acts on behalf of. Will be + passed when creating a topic. If not passed, falls + back to the default inferred from the environment. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client created with the retrieved JSON credentials. + """ + credentials = get_for_service_account_json(json_credentials_path) + return cls(project=project, credentials=credentials) + + @classmethod + def from_service_account_p12(cls, client_email, private_key_path, + project=None): + """Factory to retrieve P12 credentials while creating client. + + .. note:: + Unless you have an explicit reason to use a PKCS12 key for your + service account, we recommend using a JSON key. + + :type client_email: string + :param client_email: The e-mail attached to the service account. + + :type private_key_path: string + :param private_key_path: The path to a private key file (this file was + given to you when you created the service + account). This file must be in P12 format. + + :type project: string + :param project: the project which the client acts on behalf of. Will be + passed when creating a topic. If not passed, falls + back to the default inferred from the environment. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client created with the retrieved P12 credentials. + """ + credentials = get_for_service_account_p12(client_email, + private_key_path) + return cls(project=project, credentials=credentials) + + def list_topics(self, page_size=None, page_token=None): + """List topics for the project associated with this client. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: tuple, (list, str) + :returns: list of :class:`gcloud.pubsub.topic.Topic`, plus a + "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + path = '/projects/%s/topics' % (self.project,) + resp = self.connection.api_request(method='GET', path=path, + query_params=params) + topics = [Topic.from_api_repr(resource, self) + for resource in resp['topics']] + return topics, resp.get('nextPageToken') + + def list_subscriptions(self, page_size=None, page_token=None, + topic_name=None): + """List subscriptions for the project associated with this client. + + See: + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/list + + and (where ``topic_name`` is passed): + https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/subscriptions/list + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :type topic_name: string + :param topic_name: limit results to subscriptions bound to the given + topic. + + :rtype: tuple, (list, str) + :returns: list of :class:`gcloud.pubsub.subscription.Subscription`, + plus a "next page token" string: if not None, indicates that + more topics can be retrieved with another call (pass that + value as ``page_token``). + """ + params = {} + + if page_size is not None: + params['pageSize'] = page_size + + if page_token is not None: + params['pageToken'] = page_token + + if topic_name is None: + path = '/projects/%s/subscriptions' % (self.project,) + else: + path = '/projects/%s/topics/%s/subscriptions' % (self.project, + topic_name) + + resp = self.connection.api_request(method='GET', path=path, + query_params=params) + topics = {} + subscriptions = [Subscription.from_api_repr(resource, self, + topics=topics) + for resource in resp['subscriptions']] + return subscriptions, resp.get('nextPageToken') + + def topic(self, name, timestamp_messages=False): + """Creates a topic bound to the current client. + + :type name: string + :param name: the name of the topic to be constructed. + + :type timestamp_messages: boolean + :param timestamp_messages: To be passed to ``Topic`` constructor. + + :rtype: :class:`gcloud.pubsub.topic.Topic` + :returns: Topic created with the current client. + """ + return Topic(name, client=self, timestamp_messages=timestamp_messages) diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index 51154f5bb239..cb3023a286d7 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -17,7 +17,6 @@ from gcloud.exceptions import NotFound from gcloud.pubsub.message import Message from gcloud.pubsub.topic import Topic -from gcloud.pubsub._implicit_environ import _require_connection class Subscription(object): @@ -47,24 +46,30 @@ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None): self.push_endpoint = push_endpoint @classmethod - def from_api_repr(cls, resource, topics=None): + def from_api_repr(cls, resource, client, topics=None): """Factory: construct a topic given its API representation :type resource: dict :param resource: topic resource representation returned from the API + :type client: :class:`gcloud.pubsub.client.Client` + :param client: Client which holds credentials and project + configuration for a topic. + :type topics: dict or None :param topics: A mapping of topic names -> topics. If not passed, the subscription will have a newly-created topic. :rtype: :class:`gcloud.pubsub.subscription.Subscription` + :returns: Subscription parsed from ``resource``. """ if topics is None: topics = {} t_name = resource['topic'] topic = topics.get(t_name) if topic is None: - topic = topics[t_name] = Topic.from_api_repr({'name': t_name}) + topic = topics[t_name] = Topic.from_api_repr({'name': t_name}, + client) _, _, _, name = resource['name'].split('/') ack_deadline = resource.get('ackDeadlineSeconds') push_config = resource.get('pushConfig', {}) @@ -77,15 +82,30 @@ def path(self): project = self.topic.project return '/projects/%s/subscriptions/%s' % (project, self.name) - def create(self, connection=None): + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the topic of the + current subscription. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self.topic._client + return client + + def create(self, client=None): """API call: create the subscription via a PUT request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ data = {'topic': self.topic.full_name} @@ -95,44 +115,44 @@ def create(self, connection=None): if self.push_endpoint is not None: data['pushConfig'] = {'pushEndpoint': self.push_endpoint} - connection = _require_connection(connection) - connection.api_request(method='PUT', path=self.path, data=data) + client = self._require_client(client) + client.connection.api_request(method='PUT', path=self.path, data=data) - def exists(self, connection=None): + def exists(self, client=None): """API call: test existence of the subscription via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ - connection = _require_connection(connection) + client = self._require_client(client) try: - connection.api_request(method='GET', path=self.path) + client.connection.api_request(method='GET', path=self.path) except NotFound: return False else: return True - def reload(self, connection=None): + def reload(self, client=None): """API call: sync local subscription configuration via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ - connection = _require_connection(connection) - data = connection.api_request(method='GET', path=self.path) + client = self._require_client(client) + data = client.connection.api_request(method='GET', path=self.path) self.ack_deadline = data.get('ackDeadline') push_config = data.get('pushConfig', {}) self.push_endpoint = push_config.get('pushEndpoint') - def modify_push_configuration(self, push_endpoint, connection=None): + def modify_push_configuration(self, push_endpoint, client=None): """API call: update the push endpoint for the subscription. See: @@ -143,21 +163,21 @@ def modify_push_configuration(self, push_endpoint, connection=None): back-end. If None, the application must pull messages. - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ - connection = _require_connection(connection) + client = self._require_client(client) data = {} config = data['pushConfig'] = {} if push_endpoint is not None: config['pushEndpoint'] = push_endpoint - connection.api_request(method='POST', - path='%s:modifyPushConfig' % self.path, - data=data) + client.connection.api_request( + method='POST', path='%s:modifyPushConfig' % (self.path,), + data=data) self.push_endpoint = push_endpoint - def pull(self, return_immediately=False, max_messages=1, connection=None): + def pull(self, return_immediately=False, max_messages=1, client=None): """API call: retrieve messages for the subscription. See: @@ -172,25 +192,24 @@ def pull(self, return_immediately=False, max_messages=1, connection=None): :type max_messages: int :param max_messages: the maximum number of messages to return. - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. :rtype: list of (ack_id, message) tuples :returns: sequence of tuples: ``ack_id`` is the ID to be used in a subsequent call to :meth:`acknowledge`, and ``message`` is an instance of :class:`gcloud.pubsub.message.Message`. """ - connection = _require_connection(connection) + client = self._require_client(client) data = {'returnImmediately': return_immediately, 'maxMessages': max_messages} - response = connection.api_request(method='POST', - path='%s:pull' % self.path, - data=data) + response = client.connection.api_request( + method='POST', path='%s:pull' % (self.path,), data=data) return [(info['ackId'], Message.from_api_repr(info['message'])) for info in response.get('receivedMessages', ())] - def acknowledge(self, ack_ids, connection=None): + def acknowledge(self, ack_ids, client=None): """API call: acknowledge retrieved messages for the subscription. See: @@ -199,17 +218,16 @@ def acknowledge(self, ack_ids, connection=None): :type ack_ids: list of string :param ack_ids: ack IDs of messages being acknowledged - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ - connection = _require_connection(connection) + client = self._require_client(client) data = {'ackIds': ack_ids} - connection.api_request(method='POST', - path='%s:acknowledge' % self.path, - data=data) + client.connection.api_request( + method='POST', path='%s:acknowledge' % (self.path,), data=data) - def modify_ack_deadline(self, ack_id, ack_deadline, connection=None): + def modify_ack_deadline(self, ack_id, ack_deadline, client=None): """API call: update acknowledgement deadline for a retrieved message. See: @@ -221,25 +239,25 @@ def modify_ack_deadline(self, ack_id, ack_deadline, connection=None): :type ack_deadline: int :param ack_deadline: new deadline for the message, in seconds - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ - connection = _require_connection(connection) + client = self._require_client(client) data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline} - connection.api_request(method='POST', - path='%s:modifyAckDeadline' % self.path, - data=data) + client.connection.api_request( + method='POST', path='%s:modifyAckDeadline' % (self.path,), + data=data) - def delete(self, connection=None): + def delete(self, client=None): """API call: delete the subscription via a DELETE request. See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the topic's connection. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. """ - connection = _require_connection(connection) - connection.api_request(method='DELETE', path=self.path) + client = self._require_client(client) + client.connection.api_request(method='DELETE', path=self.path) diff --git a/gcloud/pubsub/test___init__.py b/gcloud/pubsub/test___init__.py deleted file mode 100644 index 19197c7105f3..000000000000 --- a/gcloud/pubsub/test___init__.py +++ /dev/null @@ -1,114 +0,0 @@ -# Copyright 2015 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest2 - - -class Test_set_default_connection(unittest2.TestCase): - - def setUp(self): - from gcloud.pubsub._testing import _setup_defaults - _setup_defaults(self) - - def tearDown(self): - from gcloud.pubsub._testing import _tear_down_defaults - _tear_down_defaults(self) - - def _callFUT(self, connection=None): - from gcloud.pubsub import set_default_connection - return set_default_connection(connection=connection) - - def test_set_explicit(self): - from gcloud.pubsub import _implicit_environ - - self.assertEqual(_implicit_environ.get_default_connection(), None) - fake_cnxn = object() - self._callFUT(connection=fake_cnxn) - self.assertEqual(_implicit_environ.get_default_connection(), fake_cnxn) - - def test_set_implicit(self): - from gcloud._testing import _Monkey - from gcloud import pubsub - from gcloud.pubsub import _implicit_environ - - self.assertEqual(_implicit_environ.get_default_connection(), None) - - fake_cnxn = object() - _called_args = [] - _called_kwargs = [] - - def mock_get_connection(*args, **kwargs): - _called_args.append(args) - _called_kwargs.append(kwargs) - return fake_cnxn - - with _Monkey(pubsub, get_connection=mock_get_connection): - self._callFUT() - - self.assertEqual(_implicit_environ.get_default_connection(), fake_cnxn) - self.assertEqual(_called_args, [()]) - self.assertEqual(_called_kwargs, [{}]) - - -class Test_set_defaults(unittest2.TestCase): - - def _callFUT(self, project=None, connection=None): - from gcloud.pubsub import set_defaults - return set_defaults(project=project, connection=connection) - - def test_it(self): - from gcloud._testing import _Monkey - from gcloud import pubsub - - PROJECT = object() - CONNECTION = object() - - SET_PROJECT_CALLED = [] - - def call_set_project(project=None): - SET_PROJECT_CALLED.append(project) - - SET_CONNECTION_CALLED = [] - - def call_set_connection(connection=None): - SET_CONNECTION_CALLED.append(connection) - - with _Monkey(pubsub, - set_default_connection=call_set_connection, - set_default_project=call_set_project): - self._callFUT(project=PROJECT, connection=CONNECTION) - - self.assertEqual(SET_PROJECT_CALLED, [PROJECT]) - self.assertEqual(SET_CONNECTION_CALLED, [CONNECTION]) - - -class Test_get_connection(unittest2.TestCase): - - def _callFUT(self, *args, **kw): - from gcloud.pubsub import get_connection - return get_connection(*args, **kw) - - def test_it(self): - from gcloud import credentials - from gcloud.pubsub import SCOPE - from gcloud.pubsub.connection import Connection - from gcloud.test_credentials import _Client - from gcloud._testing import _Monkey - client = _Client() - with _Monkey(credentials, client=client): - found = self._callFUT() - self.assertTrue(isinstance(found, Connection)) - self.assertTrue(found._credentials is client._signed) - self.assertEqual(found._credentials._scopes, SCOPE) - self.assertTrue(client._get_app_default_called) diff --git a/gcloud/pubsub/test__implicit_environ.py b/gcloud/pubsub/test__implicit_environ.py deleted file mode 100644 index 4e03cb205886..000000000000 --- a/gcloud/pubsub/test__implicit_environ.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest2 - - -class Test_get_default_connection(unittest2.TestCase): - - def _callFUT(self): - from gcloud.pubsub._implicit_environ import get_default_connection - return get_default_connection() - - def test_wo_override(self): - self.assertTrue(self._callFUT() is None) - - -class Test__require_connection(unittest2.TestCase): - - def _callFUT(self, connection=None): - from gcloud.pubsub._implicit_environ import _require_connection - return _require_connection(connection=connection) - - def _monkey(self, connection): - from gcloud.pubsub._testing import _monkey_defaults - return _monkey_defaults(connection=connection) - - def test_implicit_unset(self): - with self._monkey(None): - with self.assertRaises(EnvironmentError): - self._callFUT() - - def test_implicit_unset_passed_explicitly(self): - CONNECTION = object() - with self._monkey(None): - self.assertTrue(self._callFUT(CONNECTION) is CONNECTION) - - def test_implicit_set(self): - IMPLICIT_CONNECTION = object() - with self._monkey(IMPLICIT_CONNECTION): - self.assertTrue(self._callFUT() is IMPLICIT_CONNECTION) - - def test_implicit_set_passed_explicitly(self): - IMPLICIT_CONNECTION = object() - CONNECTION = object() - with self._monkey(IMPLICIT_CONNECTION): - self.assertTrue(self._callFUT(CONNECTION) is CONNECTION) diff --git a/gcloud/pubsub/test_api.py b/gcloud/pubsub/test_api.py deleted file mode 100644 index e48622b37529..000000000000 --- a/gcloud/pubsub/test_api.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright 2015 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest2 - - -class Test_list_topics(unittest2.TestCase): - - def _callFUT(self, *args, **kw): - from gcloud.pubsub.api import list_topics - return list_topics(*args, **kw) - - def test_w_explicit_connection_no_paging(self): - from gcloud.pubsub.topic import Topic - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - returned = {'topics': [{'name': TOPIC_PATH}]} - conn = _Connection(returned) - topics, next_page_token = self._callFUT(project=PROJECT, - connection=conn) - self.assertEqual(len(topics), 1) - self.assertTrue(isinstance(topics[0], Topic)) - self.assertEqual(topics[0].name, TOPIC_NAME) - self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req['query_params'], {}) - - def test_w_implicit_connection_and_project_wo_paging(self): - from gcloud._testing import _monkey_defaults as _monkey_base_defaults - from gcloud.pubsub._testing import _monkey_defaults - from gcloud.pubsub.topic import Topic - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - TOKEN = 'TOKEN' - returned = {'topics': [{'name': TOPIC_PATH}], - 'nextPageToken': TOKEN} - conn = _Connection(returned) - with _monkey_base_defaults(project=PROJECT): - with _monkey_defaults(connection=conn): - topics, next_page_token = self._callFUT() - self.assertEqual(len(topics), 1) - self.assertTrue(isinstance(topics[0], Topic)) - self.assertEqual(topics[0].name, TOPIC_NAME) - self.assertEqual(next_page_token, TOKEN) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req['query_params'], {}) - - def test_w_explicit_connection_and_project_w_paging(self): - from gcloud.pubsub.topic import Topic - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - TOKEN1 = 'TOKEN1' - TOKEN2 = 'TOKEN2' - SIZE = 1 - returned = {'topics': [{'name': TOPIC_PATH}], - 'nextPageToken': TOKEN2} - conn = _Connection(returned) - topics, next_page_token = self._callFUT(SIZE, TOKEN1, PROJECT, conn) - self.assertEqual(len(topics), 1) - self.assertTrue(isinstance(topics[0], Topic)) - self.assertEqual(topics[0].name, TOPIC_NAME) - self.assertEqual(next_page_token, TOKEN2) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) - self.assertEqual(req['query_params'], - {'pageSize': SIZE, 'pageToken': TOKEN1}) - - -class Test_list_subscriptions(unittest2.TestCase): - - def _callFUT(self, *args, **kw): - from gcloud.pubsub.api import list_subscriptions - return list_subscriptions(*args, **kw) - - def test_w_implicit_connection_wo_paging(self): - from gcloud._testing import _monkey_defaults as _monkey_base_defaults - from gcloud.pubsub._testing import _monkey_defaults - from gcloud.pubsub.subscription import Subscription - PROJECT = 'PROJECT' - SUB_NAME = 'subscription_name' - SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - SUB_INFO = [{'name': SUB_PATH, 'topic': TOPIC_PATH}] - returned = {'subscriptions': SUB_INFO} - conn = _Connection(returned) - with _monkey_base_defaults(project=PROJECT): - with _monkey_defaults(connection=conn): - subscriptions, next_page_token = self._callFUT() - self.assertEqual(len(subscriptions), 1) - self.assertTrue(isinstance(subscriptions[0], Subscription)) - self.assertEqual(subscriptions[0].name, SUB_NAME) - self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) - self.assertEqual(next_page_token, None) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) - self.assertEqual(req['query_params'], {}) - - def test_w_explicit_connection_and_project_w_paging(self): - from gcloud.pubsub.subscription import Subscription - PROJECT = 'PROJECT' - SUB_NAME = 'subscription_name' - SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - ACK_DEADLINE = 42 - PUSH_ENDPOINT = 'https://push.example.com/endpoint' - TOKEN1 = 'TOKEN1' - TOKEN2 = 'TOKEN2' - SIZE = 1 - SUB_INFO = [{'name': SUB_PATH, - 'topic': TOPIC_PATH, - 'ackDeadlineSeconds': ACK_DEADLINE, - 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}}] - returned = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN2} - conn = _Connection(returned) - subscriptions, next_page_token = self._callFUT(SIZE, TOKEN1, - project=PROJECT, - connection=conn) - self.assertEqual(len(subscriptions), 1) - self.assertTrue(isinstance(subscriptions[0], Subscription)) - self.assertEqual(subscriptions[0].name, SUB_NAME) - self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) - self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE) - self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT) - self.assertEqual(next_page_token, TOKEN2) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) - self.assertEqual(req['query_params'], - {'pageSize': SIZE, 'pageToken': TOKEN1}) - - def test_w_topic_name(self): - from gcloud.pubsub.subscription import Subscription - PROJECT = 'PROJECT' - SUB_NAME_1 = 'subscription_1' - SUB_PATH_1 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_1) - SUB_NAME_2 = 'subscription_2' - SUB_PATH_2 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_2) - TOPIC_NAME = 'topic_name' - TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - SUB_INFO = [{'name': SUB_PATH_1, 'topic': TOPIC_PATH}, - {'name': SUB_PATH_2, 'topic': TOPIC_PATH}] - TOKEN = 'TOKEN' - returned = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN} - conn = _Connection(returned) - subscriptions, next_page_token = self._callFUT(topic_name=TOPIC_NAME, - project=PROJECT, - connection=conn) - self.assertEqual(len(subscriptions), 2) - self.assertTrue(isinstance(subscriptions[0], Subscription)) - self.assertEqual(subscriptions[0].name, SUB_NAME_1) - self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) - self.assertTrue(isinstance(subscriptions[1], Subscription)) - self.assertEqual(subscriptions[1].name, SUB_NAME_2) - self.assertEqual(subscriptions[1].topic.name, TOPIC_NAME) - self.assertTrue(subscriptions[1].topic is subscriptions[0].topic) - self.assertEqual(next_page_token, TOKEN) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] - self.assertEqual(req['method'], 'GET') - self.assertEqual(req['path'], - '/projects/%s/topics/%s/subscriptions' - % (PROJECT, TOPIC_NAME)) - self.assertEqual(req['query_params'], {}) - - -class _Connection(object): - - def __init__(self, *responses): - self._responses = responses - self._requested = [] - - def api_request(self, **kw): - self._requested.append(kw) - response, self._responses = self._responses[0], self._responses[1:] - return response diff --git a/gcloud/pubsub/test_client.py b/gcloud/pubsub/test_client.py new file mode 100644 index 000000000000..f0f1d30a81be --- /dev/null +++ b/gcloud/pubsub/test_client.py @@ -0,0 +1,342 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class TestClient(unittest2.TestCase): + + def _getTargetClass(self): + from gcloud.pubsub.client import Client + return Client + + def _makeOne(self, *args, **kw): + return self._getTargetClass()(*args, **kw) + + def test_ctor_defaults(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import SCOPE + from gcloud.pubsub import client + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + CREDS = _Credentials() + FUNC_CALLS = [] + + def mock_get_proj(): + FUNC_CALLS.append('_get_production_project') + return PROJECT + + def mock_get_credentials(): + FUNC_CALLS.append('get_credentials') + return CREDS + + with _Monkey(client, get_credentials=mock_get_credentials, + _get_production_project=mock_get_proj): + client_obj = self._makeOne() + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(client_obj.connection._credentials._scopes, SCOPE) + self.assertEqual(FUNC_CALLS, + ['_get_production_project', 'get_credentials']) + + def test_ctor_missing_project(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import client + + FUNC_CALLS = [] + + def mock_get_proj(): + FUNC_CALLS.append('_get_production_project') + return None + + with _Monkey(client, _get_production_project=mock_get_proj): + self.assertRaises(ValueError, self._makeOne) + + self.assertEqual(FUNC_CALLS, ['_get_production_project']) + + def test_ctor_explicit(self): + from gcloud.pubsub import SCOPE + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + CREDS = _Credentials() + + client_obj = self._makeOne(project=PROJECT, credentials=CREDS) + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(CREDS._scopes, SCOPE) + + def test_from_service_account_json(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import client + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + KLASS = self._getTargetClass() + CREDS = _Credentials() + _CALLED = [] + + def mock_creds(arg1): + _CALLED.append((arg1,)) + return CREDS + + BOGUS_ARG = object() + with _Monkey(client, get_for_service_account_json=mock_creds): + client_obj = KLASS.from_service_account_json( + BOGUS_ARG, project=PROJECT) + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(_CALLED, [(BOGUS_ARG,)]) + + def test_from_service_account_p12(self): + from gcloud._testing import _Monkey + from gcloud.pubsub import client + from gcloud.pubsub.connection import Connection + + PROJECT = 'PROJECT' + KLASS = self._getTargetClass() + CREDS = _Credentials() + _CALLED = [] + + def mock_creds(arg1, arg2): + _CALLED.append((arg1, arg2)) + return CREDS + + BOGUS_ARG1 = object() + BOGUS_ARG2 = object() + with _Monkey(client, get_for_service_account_p12=mock_creds): + client_obj = KLASS.from_service_account_p12( + BOGUS_ARG1, BOGUS_ARG2, project=PROJECT) + + self.assertEqual(client_obj.project, PROJECT) + self.assertTrue(isinstance(client_obj.connection, Connection)) + self.assertTrue(client_obj.connection._credentials is CREDS) + self.assertEqual(_CALLED, [(BOGUS_ARG1, BOGUS_ARG2)]) + + def test_list_topics_no_paging(self): + from gcloud.pubsub.topic import Topic + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + + RETURNED = {'topics': [{'name': TOPIC_PATH}]} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + topics, next_page_token = CLIENT_OBJ.list_topics() + # Test values are correct. + self.assertEqual(len(topics), 1) + self.assertTrue(isinstance(topics[0], Topic)) + self.assertEqual(topics[0].name, TOPIC_NAME) + self.assertEqual(next_page_token, None) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) + self.assertEqual(req['query_params'], {}) + + def test_list_topics_with_paging(self): + from gcloud.pubsub.topic import Topic + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + RETURNED = {'topics': [{'name': TOPIC_PATH}], + 'nextPageToken': TOKEN2} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + topics, next_page_token = CLIENT_OBJ.list_topics(SIZE, TOKEN1) + # Test values are correct. + self.assertEqual(len(topics), 1) + self.assertTrue(isinstance(topics[0], Topic)) + self.assertEqual(topics[0].name, TOPIC_NAME) + self.assertEqual(next_page_token, TOKEN2) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/topics' % PROJECT) + self.assertEqual(req['query_params'], + {'pageSize': SIZE, 'pageToken': TOKEN1}) + + def test_list_subscriptions_no_paging(self): + from gcloud.pubsub.subscription import Subscription + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_INFO = [{'name': SUB_PATH, 'topic': TOPIC_PATH}] + RETURNED = {'subscriptions': SUB_INFO} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions() + # Test values are correct. + self.assertEqual(len(subscriptions), 1) + self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertEqual(subscriptions[0].name, SUB_NAME) + self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertEqual(next_page_token, None) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) + self.assertEqual(req['query_params'], {}) + + def test_list_subscriptions_with_paging(self): + from gcloud.pubsub.subscription import Subscription + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + ACK_DEADLINE = 42 + PUSH_ENDPOINT = 'https://push.example.com/endpoint' + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + SUB_INFO = [{'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadlineSeconds': ACK_DEADLINE, + 'pushConfig': {'pushEndpoint': PUSH_ENDPOINT}}] + RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN2} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions( + SIZE, TOKEN1) + # Test values are correct. + self.assertEqual(len(subscriptions), 1) + self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertEqual(subscriptions[0].name, SUB_NAME) + self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertEqual(subscriptions[0].ack_deadline, ACK_DEADLINE) + self.assertEqual(subscriptions[0].push_endpoint, PUSH_ENDPOINT) + self.assertEqual(next_page_token, TOKEN2) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], '/projects/%s/subscriptions' % PROJECT) + self.assertEqual(req['query_params'], + {'pageSize': SIZE, 'pageToken': TOKEN1}) + + def test_list_subscriptions_with_topic_name(self): + from gcloud.pubsub.subscription import Subscription + PROJECT = 'PROJECT' + CREDS = _Credentials() + + CLIENT_OBJ = self._makeOne(project=PROJECT, credentials=CREDS) + + SUB_NAME_1 = 'subscription_1' + SUB_PATH_1 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_1) + SUB_NAME_2 = 'subscription_2' + SUB_PATH_2 = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME_2) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SUB_INFO = [{'name': SUB_PATH_1, 'topic': TOPIC_PATH}, + {'name': SUB_PATH_2, 'topic': TOPIC_PATH}] + TOKEN = 'TOKEN' + RETURNED = {'subscriptions': SUB_INFO, 'nextPageToken': TOKEN} + # Replace the connection on the client with one of our own. + CLIENT_OBJ.connection = _Connection(RETURNED) + + # Execute request. + subscriptions, next_page_token = CLIENT_OBJ.list_subscriptions( + topic_name=TOPIC_NAME) + # Test values are correct. + self.assertEqual(len(subscriptions), 2) + self.assertTrue(isinstance(subscriptions[0], Subscription)) + self.assertEqual(subscriptions[0].name, SUB_NAME_1) + self.assertEqual(subscriptions[0].topic.name, TOPIC_NAME) + self.assertTrue(isinstance(subscriptions[1], Subscription)) + self.assertEqual(subscriptions[1].name, SUB_NAME_2) + self.assertEqual(subscriptions[1].topic.name, TOPIC_NAME) + self.assertTrue(subscriptions[1].topic is subscriptions[0].topic) + self.assertEqual(next_page_token, TOKEN) + self.assertEqual(len(CLIENT_OBJ.connection._requested), 1) + req = CLIENT_OBJ.connection._requested[0] + self.assertEqual(req['method'], 'GET') + self.assertEqual(req['path'], + '/projects/%s/topics/%s/subscriptions' + % (PROJECT, TOPIC_NAME)) + self.assertEqual(req['query_params'], {}) + + def test_topic(self): + PROJECT = 'PROJECT' + TOPIC_NAME = 'TOPIC_NAME' + CREDS = _Credentials() + + client_obj = self._makeOne(project=PROJECT, credentials=CREDS) + new_topic = client_obj.topic(TOPIC_NAME) + self.assertEqual(new_topic.name, TOPIC_NAME) + self.assertTrue(new_topic._client is client_obj) + self.assertEqual(new_topic.project, PROJECT) + self.assertEqual(new_topic.full_name, + 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) + self.assertFalse(new_topic.timestamp_messages) + + +class _Credentials(object): + + _scopes = None + + @staticmethod + def create_scoped_required(): + return True + + def create_scoped(self, scope): + self._scopes = scope + return self + + +class _Connection(object): + + def __init__(self, *responses): + self._responses = responses + self._requested = [] + + def api_request(self, **kw): + self._requested.append(kw) + response, self._responses = self._responses[0], self._responses[1:] + return response diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index 45d2a97ead34..db966ac3c12a 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -58,7 +58,8 @@ def test_from_api_repr_no_topics(self): 'ackDeadlineSeconds': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}} klass = self._getTargetClass() - subscription = klass.from_api_repr(resource) + client = _Client(project=PROJECT) + subscription = klass.from_api_repr(resource, client) self.assertEqual(subscription.name, SUB_NAME) topic = subscription.topic self.assertTrue(isinstance(topic, Topic)) @@ -82,7 +83,8 @@ def test_from_api_repr_w_topics_no_topic_match(self): 'pushConfig': {'pushEndpoint': ENDPOINT}} topics = {} klass = self._getTargetClass() - subscription = klass.from_api_repr(resource, topics=topics) + client = _Client(project=PROJECT) + subscription = klass.from_api_repr(resource, client, topics=topics) self.assertEqual(subscription.name, SUB_NAME) topic = subscription.topic self.assertTrue(isinstance(topic, Topic)) @@ -107,14 +109,14 @@ def test_from_api_repr_w_topics_w_topic_match(self): topic = object() topics = {TOPIC_PATH: topic} klass = self._getTargetClass() - subscription = klass.from_api_repr(resource, topics=topics) + client = _Client(project=PROJECT) + subscription = klass.from_api_repr(resource, client, topics=topics) self.assertEqual(subscription.name, SUB_NAME) self.assertTrue(subscription.topic is topic) self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) - def test_create_pull_wo_ack_deadline_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_create_pull_wo_ack_deadline_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -122,17 +124,17 @@ def test_create_pull_wo_ack_deadline_w_implicit_connection(self): TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) BODY = {'topic': TOPIC_PATH} conn = _Connection({'name': SUB_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - subscription.create() + subscription.create() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req['data'], BODY) - def test_create_push_w_ack_deadline_w_explicit_connection(self): + def test_create_push_w_ack_deadline_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -143,51 +145,57 @@ def test_create_push_w_ack_deadline_w_explicit_connection(self): BODY = {'topic': TOPIC_PATH, 'ackDeadline': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}} - conn = _Connection({'name': SUB_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection({'name': SUB_PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': SUB_PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic, DEADLINE, ENDPOINT) - subscription.create(connection=conn) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + subscription.create(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req['data'], BODY) - def test_exists_miss_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_exists_miss_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' conn = _Connection() - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - self.assertFalse(subscription.exists()) + self.assertFalse(subscription.exists()) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req.get('query_params'), None) - def test_exists_hit_w_explicit_connection(self): + def test_exists_hit_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': SUB_PATH, 'topic': TOPIC_PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic) - self.assertTrue(subscription.exists(connection=conn)) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertTrue(subscription.exists(client=CLIENT2)) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) self.assertEqual(req.get('query_params'), None) - def test_reload_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_reload_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -199,10 +207,10 @@ def test_reload_w_implicit_connection(self): 'topic': TOPIC_PATH, 'ackDeadline': DEADLINE, 'pushConfig': {'pushEndpoint': ENDPOINT}}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - subscription.reload() + subscription.reload() self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) self.assertEqual(len(conn._requested), 1) @@ -210,7 +218,7 @@ def test_reload_w_implicit_connection(self): self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) - def test_reload_w_explicit_connection(self): + def test_reload_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -218,32 +226,35 @@ def test_reload_w_explicit_connection(self): TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) DEADLINE = 42 ENDPOINT = 'https://api.example.com/push' - conn = _Connection({'name': SUB_PATH, - 'topic': TOPIC_PATH, - 'ackDeadline': DEADLINE, - 'pushConfig': {'pushEndpoint': ENDPOINT}}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection() + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': SUB_PATH, + 'topic': TOPIC_PATH, + 'ackDeadline': DEADLINE, + 'pushConfig': {'pushEndpoint': ENDPOINT}}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic) - subscription.reload(connection=conn) + subscription.reload(client=CLIENT2) self.assertEqual(subscription.ack_deadline, DEADLINE) self.assertEqual(subscription.push_endpoint, ENDPOINT) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % SUB_PATH) - def test_modify_push_config_w_endpoint_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_modify_push_config_w_endpoint_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ENDPOINT = 'https://api.example.com/push' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - subscription.modify_push_configuration(push_endpoint=ENDPOINT) + subscription.modify_push_configuration(push_endpoint=ENDPOINT) self.assertEqual(subscription.push_endpoint, ENDPOINT) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -252,28 +263,31 @@ def test_modify_push_config_w_endpoint_w_implicit_connection(self): self.assertEqual(req['data'], {'pushConfig': {'pushEndpoint': ENDPOINT}}) - def test_modify_push_config_wo_endpoint_w_explicit_connection(self): + def test_modify_push_config_wo_endpoint_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ENDPOINT = 'https://api.example.com/push' - conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic, push_endpoint=ENDPOINT) subscription.modify_push_configuration(push_endpoint=None, - connection=conn) + client=CLIENT2) self.assertEqual(subscription.push_endpoint, None) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:modifyPushConfig' % SUB_PATH) self.assertEqual(req['data'], {'pushConfig': {}}) - def test_pull_wo_return_immediately_max_messages_w_implicit_conn(self): + def test_pull_wo_return_immediately_max_messages_w_bound_client(self): import base64 from gcloud.pubsub.message import Message - from gcloud.pubsub._testing import _monkey_defaults PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -285,10 +299,10 @@ def test_pull_wo_return_immediately_max_messages_w_implicit_conn(self): MESSAGE = {'messageId': MSG_ID, 'data': B64} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} conn = _Connection({'receivedMessages': [REC_MESSAGE]}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - pulled = subscription.pull() + pulled = subscription.pull() self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -303,7 +317,7 @@ def test_pull_wo_return_immediately_max_messages_w_implicit_conn(self): self.assertEqual(req['data'], {'returnImmediately': False, 'maxMessages': 1}) - def test_pull_w_return_immediately_w_max_messages_w_explicit_conn(self): + def test_pull_w_return_immediately_w_max_messages_w_alt_client(self): import base64 from gcloud.pubsub.message import Message PROJECT = 'PROJECT' @@ -316,11 +330,14 @@ def test_pull_w_return_immediately_w_max_messages_w_explicit_conn(self): B64 = base64.b64encode(PAYLOAD) MESSAGE = {'messageId': MSG_ID, 'data': B64, 'attributes': {'a': 'b'}} REC_MESSAGE = {'ackId': ACK_ID, 'message': MESSAGE} - conn = _Connection({'receivedMessages': [REC_MESSAGE]}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection() + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'receivedMessages': [REC_MESSAGE]}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic) pulled = subscription.pull(return_immediately=True, max_messages=3, - connection=conn) + client=CLIENT2) self.assertEqual(len(pulled), 1) ack_id, message = pulled[0] self.assertEqual(ack_id, ACK_ID) @@ -328,8 +345,9 @@ def test_pull_w_return_immediately_w_max_messages_w_explicit_conn(self): self.assertEqual(message.data, PAYLOAD) self.assertEqual(message.message_id, MSG_ID) self.assertEqual(message.attributes, {'a': 'b'}) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:pull' % SUB_PATH) self.assertEqual(req['data'], @@ -341,9 +359,10 @@ def test_pull_wo_receivedMessages(self): SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - pulled = subscription.pull(return_immediately=False, connection=conn) + pulled = subscription.pull(return_immediately=False) self.assertEqual(len(pulled), 0) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -352,8 +371,7 @@ def test_pull_wo_receivedMessages(self): self.assertEqual(req['data'], {'returnImmediately': False, 'maxMessages': 1}) - def test_acknowledge_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_acknowledge_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -361,35 +379,38 @@ def test_acknowledge_w_implicit_connection(self): ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - subscription.acknowledge([ACK_ID1, ACK_ID2]) + subscription.acknowledge([ACK_ID1, ACK_ID2]) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) - def test_acknowledge_w_explicit_connection(self): + def test_acknowledge_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ACK_ID1 = 'DEADBEEF' ACK_ID2 = 'BEADCAFE' - conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic) - subscription.acknowledge([ACK_ID1, ACK_ID2], connection=conn) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + subscription.acknowledge([ACK_ID1, ACK_ID2], client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:acknowledge' % SUB_PATH) self.assertEqual(req['data'], {'ackIds': [ACK_ID1, ACK_ID2]}) - def test_modify_ack_deadline_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_modify_ack_deadline_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -397,10 +418,10 @@ def test_modify_ack_deadline_w_implicit_connection(self): ACK_ID = 'DEADBEEF' DEADLINE = 42 conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - subscription.modify_ack_deadline(ACK_ID, DEADLINE) + subscription.modify_ack_deadline(ACK_ID, DEADLINE) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'POST') @@ -408,51 +429,58 @@ def test_modify_ack_deadline_w_implicit_connection(self): self.assertEqual(req['data'], {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) - def test_modify_ack_deadline_w_explicit_connection(self): + def test_modify_ack_deadline_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' ACK_ID = 'DEADBEEF' DEADLINE = 42 - conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic) - subscription.modify_ack_deadline(ACK_ID, DEADLINE, connection=conn) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + subscription.modify_ack_deadline(ACK_ID, DEADLINE, client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:modifyAckDeadline' % SUB_PATH) self.assertEqual(req['data'], {'ackId': ACK_ID, 'ackDeadlineSeconds': DEADLINE}) - def test_delete_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_delete_w_bound_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = _Topic(TOPIC_NAME, client=CLIENT) subscription = self._makeOne(SUB_NAME, topic) - with _monkey_defaults(connection=conn): - subscription.delete() + subscription.delete() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'DELETE') self.assertEqual(req['path'], '/%s' % SUB_PATH) - def test_delete_w_explicit_connection(self): + def test_delete_w_alternate_client(self): PROJECT = 'PROJECT' SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) TOPIC_NAME = 'topic_name' - conn = _Connection({}) - topic = _Topic(TOPIC_NAME, project=PROJECT) + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = _Topic(TOPIC_NAME, client=CLIENT1) subscription = self._makeOne(SUB_NAME, topic) - subscription.delete(connection=conn) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + subscription.delete(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'DELETE') self.assertEqual(req['path'], '/%s' % SUB_PATH) @@ -477,8 +505,16 @@ def api_request(self, **kw): class _Topic(object): - def __init__(self, name, project): + def __init__(self, name, client): self.name = name + self._client = client + self.project = client.project + self.full_name = 'projects/%s/topics/%s' % (client.project, name) + self.path = '/projects/%s/topics/%s' % (client.project, name) + + +class _Client(object): + + def __init__(self, project, connection=None): self.project = project - self.full_name = 'projects/%s/topics/%s' % (project, name) - self.path = '/projects/%s/topics/%s' % (project, name) + self.connection = connection diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index af90c2705c10..4d4942db6a23 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -24,23 +24,12 @@ def _getTargetClass(self): def _makeOne(self, *args, **kw): return self._getTargetClass()(*args, **kw) - def test_ctor_wo_inferred_project(self): - from gcloud._testing import _monkey_defaults - TOPIC_NAME = 'topic_name' - PROJECT = 'PROJECT' - with _monkey_defaults(project=PROJECT): - topic = self._makeOne(TOPIC_NAME) - self.assertEqual(topic.name, TOPIC_NAME) - self.assertEqual(topic.project, PROJECT) - self.assertEqual(topic.full_name, - 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) - self.assertFalse(topic.timestamp_messages) - - def test_ctor_w_explicit_project_and_timestamp(self): + def test_ctor_w_explicit_timestamp(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' + CLIENT = _Client(project=PROJECT) topic = self._makeOne(TOPIC_NAME, - project=PROJECT, + client=CLIENT, timestamp_messages=True) self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) @@ -49,73 +38,88 @@ def test_ctor_w_explicit_project_and_timestamp(self): self.assertTrue(topic.timestamp_messages) def test_from_api_repr(self): - from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' + CLIENT = _Client(project=PROJECT) PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) resource = {'name': PATH} klass = self._getTargetClass() - conn = _Connection() - with _monkey_defaults(connection=conn): - topic = klass.from_api_repr(resource) + topic = klass.from_api_repr(resource, client=CLIENT) self.assertEqual(topic.name, TOPIC_NAME) + self.assertTrue(topic._client is CLIENT) self.assertEqual(topic.project, PROJECT) self.assertEqual(topic.full_name, PATH) - def test_create_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_from_api_repr_with_bad_client(self): + TOPIC_NAME = 'topic_name' + PROJECT1 = 'PROJECT1' + PROJECT2 = 'PROJECT2' + CLIENT = _Client(project=PROJECT1) + PATH = 'projects/%s/topics/%s' % (PROJECT2, TOPIC_NAME) + resource = {'name': PATH} + klass = self._getTargetClass() + self.assertRaises(ValueError, klass.from_api_repr, + resource, client=CLIENT) + + def test_create_w_bound_client(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'name': PATH}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with _monkey_defaults(connection=conn): - topic.create() + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + topic.create() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % PATH) - def test_create_w_explicit_connection(self): + def test_create_w_alternate_client(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'name': PATH}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - topic.create(connection=conn) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + conn1 = _Connection({'name': PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + topic.create(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'PUT') self.assertEqual(req['path'], '/%s' % PATH) - def test_exists_miss_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_exists_miss_w_bound_client(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection() - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with _monkey_defaults(connection=conn): - self.assertFalse(topic.exists()) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + self.assertFalse(topic.exists()) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % PATH) - def test_exists_hit_w_explicit_connection(self): + def test_exists_hit_w_alternate_client(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'name': PATH}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - self.assertTrue(topic.exists(connection=conn)) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + conn1 = _Connection({'name': PATH}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'name': PATH}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + self.assertTrue(topic.exists(client=CLIENT2)) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'GET') self.assertEqual(req['path'], '/%s' % PATH) - def test_publish_single_bytes_wo_attrs_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_publish_single_bytes_wo_attrs_w_bound_client(self): import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' @@ -126,9 +130,9 @@ def test_publish_single_bytes_wo_attrs_w_implicit_connection(self): 'attributes': {}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with _monkey_defaults(connection=conn): - msgid = topic.publish(PAYLOAD) + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + msgid = topic.publish(PAYLOAD) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -136,13 +140,12 @@ def test_publish_single_bytes_wo_attrs_w_implicit_connection(self): self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE]}) - def test_publish_single_bytes_wo_attrs_w_add_timestamp_explicit_conn(self): + def test_publish_single_bytes_wo_attrs_w_add_timestamp_alt_client(self): import base64 import datetime from gcloud.pubsub import topic as MUT from gcloud._helpers import _RFC3339_MICROS from gcloud._testing import _Monkey - from gcloud.pubsub._testing import _monkey_defaults NOW = datetime.datetime.utcnow() def _utcnow(): @@ -156,22 +159,26 @@ def _utcnow(): MESSAGE = {'data': B64, 'attributes': {'timestamp': NOW.strftime(_RFC3339_MICROS)}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, + conn1 = _Connection({'messageIds': [MSGID]}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'messageIds': [MSGID]}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + + topic = self._makeOne(TOPIC_NAME, client=CLIENT1, timestamp_messages=True) with _Monkey(MUT, _NOW=_utcnow): - with _monkey_defaults(connection=conn): - msgid = topic.publish(PAYLOAD, connection=conn) + msgid = topic.publish(PAYLOAD, client=CLIENT2) + self.assertEqual(msgid, MSGID) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE]}) def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' @@ -182,10 +189,10 @@ def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): 'attributes': {'timestamp': OVERRIDE}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT, + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT, timestamp_messages=True) - with _monkey_defaults(connection=conn): - msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) + msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -195,7 +202,6 @@ def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): def test_publish_single_w_attrs(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PAYLOAD = b'This is the message text' @@ -205,9 +211,9 @@ def test_publish_single_w_attrs(self): 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with _monkey_defaults(connection=conn): - msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + msgid = topic.publish(PAYLOAD, attr1='value1', attr2='value2') self.assertEqual(msgid, MSGID) self.assertEqual(len(conn._requested), 1) req = conn._requested[0] @@ -215,9 +221,8 @@ def test_publish_single_w_attrs(self): self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE]}) - def test_publish_multiple_w_implicit_connection(self): + def test_publish_multiple_w_bound_client(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PAYLOAD1 = b'This is the first message text' @@ -232,11 +237,11 @@ def test_publish_multiple_w_implicit_connection(self): 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with _monkey_defaults(connection=conn): - with topic.batch() as batch: - batch.publish(PAYLOAD1) - batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + with topic.batch() as batch: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) self.assertEqual(len(conn._requested), 1) @@ -245,7 +250,7 @@ def test_publish_multiple_w_implicit_connection(self): self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) - def test_publish_multiple_w_explicit_connection(self): + def test_publish_multiple_w_alternate_client(self): import base64 TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' @@ -260,15 +265,19 @@ def test_publish_multiple_w_explicit_connection(self): MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with topic.batch(connection=conn) as batch: + conn1 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + with topic.batch(client=CLIENT2) as batch: batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) @@ -281,41 +290,44 @@ def test_publish_multiple_error(self): MSGID1 = 'DEADBEEF' MSGID2 = 'BEADCAFE' conn = _Connection({'messageIds': [MSGID1, MSGID2]}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) + CLIENT = _Client(project=PROJECT) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) try: with topic.batch() as batch: - batch.publish(PAYLOAD1, connection=conn) - batch.publish(PAYLOAD2, attr1='value1', attr2='value2', - connection=conn) + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') raise _Bugout() except _Bugout: pass self.assertEqual(list(batch), []) self.assertEqual(len(conn._requested), 0) - def test_delete_w_implicit_connection(self): - from gcloud.pubsub._testing import _monkey_defaults + def test_delete_w_bound_client(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) conn = _Connection({}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - with _monkey_defaults(connection=conn): - topic.delete() + CLIENT = _Client(project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, client=CLIENT) + topic.delete() self.assertEqual(len(conn._requested), 1) req = conn._requested[0] self.assertEqual(req['method'], 'DELETE') self.assertEqual(req['path'], '/%s' % PATH) - def test_delete_w_explicit_connection(self): + def test_delete_w_alternate_client(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) - conn = _Connection({}) - topic = self._makeOne(TOPIC_NAME, project=PROJECT) - topic.delete(connection=conn) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + conn1 = _Connection({}) + CLIENT1 = _Client(project=PROJECT, connection=conn1) + conn2 = _Connection({}) + CLIENT2 = _Client(project=PROJECT, connection=conn2) + topic = self._makeOne(TOPIC_NAME, client=CLIENT1) + topic.delete(client=CLIENT2) + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'DELETE') self.assertEqual(req['path'], '/%s' % PATH) @@ -326,60 +338,61 @@ def _getTargetClass(self): from gcloud.pubsub.topic import Batch return Batch - def _makeOne(self, topic): - return self._getTargetClass()(topic) + def _makeOne(self, *args, **kwargs): + return self._getTargetClass()(*args, **kwargs) def test_ctor_defaults(self): topic = _Topic() - batch = self._makeOne(topic) + CLIENT = _Client(project='PROJECT') + batch = self._makeOne(topic, CLIENT) self.assertTrue(batch.topic is topic) + self.assertTrue(batch.client is CLIENT) self.assertEqual(len(batch.messages), 0) self.assertEqual(len(batch.message_ids), 0) def test___iter___empty(self): topic = _Topic() - batch = self._makeOne(topic) + client = object() + batch = self._makeOne(topic, client) self.assertEqual(list(batch), []) def test___iter___non_empty(self): topic = _Topic() - batch = self._makeOne(topic) + client = object() + batch = self._makeOne(topic, client) batch.message_ids[:] = ['ONE', 'TWO', 'THREE'] self.assertEqual(list(batch), ['ONE', 'TWO', 'THREE']) def test_publish_bytes_wo_attrs(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MESSAGE = {'data': B64, 'attributes': {}} connection = _Connection() + CLIENT = _Client(project='PROJECT', connection=connection) topic = _Topic() - batch = self._makeOne(topic) - with _monkey_defaults(connection=connection): - batch.publish(PAYLOAD) + batch = self._makeOne(topic, client=CLIENT) + batch.publish(PAYLOAD) self.assertEqual(len(connection._requested), 0) self.assertEqual(batch.messages, [MESSAGE]) def test_publish_bytes_w_add_timestamp(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD).decode('ascii') MESSAGE = {'data': B64, 'attributes': {'timestamp': 'TIMESTAMP'}} connection = _Connection() + CLIENT = _Client(project='PROJECT', connection=connection) topic = _Topic(timestamp_messages=True) - with _monkey_defaults(connection=connection): - batch = self._makeOne(topic) + batch = self._makeOne(topic, client=CLIENT) batch.publish(PAYLOAD) self.assertEqual(len(connection._requested), 0) self.assertEqual(batch.messages, [MESSAGE]) - def test_commit_w_implicit_connection(self): + def test_commit_w_bound_client(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' B64_1 = base64.b64encode(PAYLOAD1) @@ -391,12 +404,12 @@ def test_commit_w_implicit_connection(self): MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project='PROJECT', connection=conn) topic = _Topic() - batch = self._makeOne(topic) - with _monkey_defaults(connection=conn): - batch.publish(PAYLOAD1) - batch.publish(PAYLOAD2, attr1='value1', attr2='value2') - batch.commit() + batch = self._makeOne(topic, client=CLIENT) + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + batch.commit() self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) self.assertEqual(len(conn._requested), 1) @@ -405,7 +418,7 @@ def test_commit_w_implicit_connection(self): self.assertEqual(req['path'], '%s:publish' % topic.path) self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) - def test_commit_w_explicit_connection(self): + def test_commit_w_alternate_client(self): import base64 PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' @@ -417,23 +430,26 @@ def test_commit_w_explicit_connection(self): 'attributes': {}} MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} - conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + conn1 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT1 = _Client(project='PROJECT', connection=conn1) + conn2 = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT2 = _Client(project='PROJECT', connection=conn2) topic = _Topic() - batch = self._makeOne(topic) + batch = self._makeOne(topic, client=CLIENT1) batch.publish(PAYLOAD1) batch.publish(PAYLOAD2, attr1='value1', attr2='value2') - batch.commit(connection=conn) + batch.commit(client=CLIENT2) self.assertEqual(list(batch), [MSGID1, MSGID2]) self.assertEqual(list(batch.messages), []) - self.assertEqual(len(conn._requested), 1) - req = conn._requested[0] + self.assertEqual(len(conn1._requested), 0) + self.assertEqual(len(conn2._requested), 1) + req = conn2._requested[0] self.assertEqual(req['method'], 'POST') self.assertEqual(req['path'], '%s:publish' % topic.path) self.assertEqual(req['data'], {'messages': [MESSAGE1, MESSAGE2]}) def test_context_mgr_success(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' B64_1 = base64.b64encode(PAYLOAD1) @@ -445,13 +461,13 @@ def test_context_mgr_success(self): MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project='PROJECT', connection=conn) topic = _Topic() - batch = self._makeOne(topic) + batch = self._makeOne(topic, client=CLIENT) - with _monkey_defaults(connection=conn): - with batch as other: - batch.publish(PAYLOAD1) - batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + with batch as other: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') self.assertTrue(other is batch) self.assertEqual(list(batch), [MSGID1, MSGID2]) @@ -464,7 +480,6 @@ def test_context_mgr_success(self): def test_context_mgr_failure(self): import base64 - from gcloud.pubsub._testing import _monkey_defaults PAYLOAD1 = b'This is the first message text' PAYLOAD2 = b'This is the second message text' B64_1 = base64.b64encode(PAYLOAD1) @@ -476,15 +491,15 @@ def test_context_mgr_failure(self): MESSAGE2 = {'data': B64_2.decode('ascii'), 'attributes': {'attr1': 'value1', 'attr2': 'value2'}} conn = _Connection({'messageIds': [MSGID1, MSGID2]}) + CLIENT = _Client(project='PROJECT', connection=conn) topic = _Topic() - batch = self._makeOne(topic) + batch = self._makeOne(topic, client=CLIENT) try: - with _monkey_defaults(connection=conn): - with batch as other: - batch.publish(PAYLOAD1) - batch.publish(PAYLOAD2, attr1='value1', attr2='value2') - raise _Bugout() + with batch as other: + batch.publish(PAYLOAD1) + batch.publish(PAYLOAD2, attr1='value1', attr2='value2') + raise _Bugout() except _Bugout: pass @@ -524,5 +539,12 @@ def _timestamp_message(self, attrs): attrs['timestamp'] = 'TIMESTAMP' +class _Client(object): + + def __init__(self, project, connection=None): + self.project = project + self.connection = connection + + class _Bugout(Exception): pass diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index ab8a61e7779e..9a0af105747d 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -17,10 +17,8 @@ import base64 import datetime -from gcloud._helpers import get_default_project from gcloud._helpers import _RFC3339_MICROS from gcloud.exceptions import NotFound -from gcloud.pubsub._implicit_environ import _require_connection _NOW = datetime.datetime.utcnow @@ -36,33 +34,47 @@ class Topic(object): :type name: string :param name: the name of the topic - :type project: string - :param project: the project to which the topic belongs. If not passed, - falls back to the default inferred from the environment. + :type client: :class:`gcloud.pubsub.client.Client` + :param client: A client which holds credentials and project configuration + for the topic (which requires a project). :type timestamp_messages: boolean :param timestamp_messages: If true, the topic will add a ``timestamp`` key to the attributes of each published message: the value will be an RFC 3339 timestamp. """ - def __init__(self, name, project=None, timestamp_messages=False): - if project is None: - project = get_default_project() + def __init__(self, name, client, timestamp_messages=False): self.name = name - self.project = project + self._client = client self.timestamp_messages = timestamp_messages @classmethod - def from_api_repr(cls, resource): + def from_api_repr(cls, resource, client): """Factory: construct a topic given its API representation :type resource: dict :param resource: topic resource representation returned from the API + :type client: :class:`gcloud.pubsub.client.Client` + :param client: Client which holds credentials and project + configuration for the topic. + :rtype: :class:`gcloud.pubsub.topic.Topic` + :returns: Topic parsed from ``resource``. + :raises: :class:`ValueError` if ``client`` is not ``None`` and the + project from the resource does not agree with the project + from the client. """ _, project, _, name = resource['name'].split('/') - return cls(name, project) + if client.project != project: + raise ValueError('Project from clientshould agree with ' + 'project from resource.') + return cls(name, client=client) + + @property + def project(self): + """Project bound to the topic.""" + return self._client.project @property def full_name(self): @@ -74,33 +86,47 @@ def path(self): """URL path for the topic's APIs""" return '/%s' % (self.full_name) - def create(self, connection=None): + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + + :rtype: :class:`gcloud.pubsub.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self._client + return client + + def create(self, client=None): """API call: create the topic via a PUT request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/create - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the ``connection`` attribute. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. """ - connection = _require_connection(connection) - connection.api_request(method='PUT', path=self.path) + client = self._require_client(client) + client.connection.api_request(method='PUT', path=self.path) - def exists(self, connection=None): + def exists(self, client=None): """API call: test for the existence of the topic via a GET request See https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/get - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the ``connection`` attribute. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. """ - connection = _require_connection(connection) + client = self._require_client(client) try: - connection.api_request(method='GET', path=self.path) + client.connection.api_request(method='GET', path=self.path) except NotFound: return False else: @@ -116,7 +142,7 @@ def _timestamp_message(self, attrs): if self.timestamp_messages and 'timestamp' not in attrs: attrs['timestamp'] = _NOW().strftime(_RFC3339_MICROS) - def publish(self, message, connection=None, **attrs): + def publish(self, message, client=None, **attrs): """API call: publish a message to a topic via a POST request See: @@ -125,9 +151,9 @@ def publish(self, message, connection=None, **attrs): :type message: bytes :param message: the message payload - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the ``connection`` attribute. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. :type attrs: dict (string -> string) :message attrs: key-value pairs to send as message attributes @@ -135,36 +161,41 @@ def publish(self, message, connection=None, **attrs): :rtype: str :returns: message ID assigned by the server to the published message """ - connection = _require_connection(connection) + client = self._require_client(client) self._timestamp_message(attrs) message_b = base64.b64encode(message).decode('ascii') message_data = {'data': message_b, 'attributes': attrs} data = {'messages': [message_data]} - response = connection.api_request(method='POST', - path='%s:publish' % self.path, - data=data) + response = client.connection.api_request( + method='POST', path='%s:publish' % (self.path,), data=data) return response['messageIds'][0] - def batch(self, connection=None): + def batch(self, client=None): """Return a batch to use as a context manager. - :rtype: :class:Batch + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. + + :rtype: :class:`Batch` + :returns: A batch to use as a context manager. """ - return Batch(self, connection=connection) + client = self._require_client(client) + return Batch(self, client) - def delete(self, connection=None): + def delete(self, client=None): """API call: delete the topic via a DELETE request See: https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/topics/delete - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the ``connection`` attribute. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current topic. """ - connection = _require_connection(connection) - connection.api_request(method='DELETE', path=self.path) + client = self._require_client(client) + client.connection.api_request(method='DELETE', path=self.path) class Batch(object): @@ -175,15 +206,14 @@ class Batch(object): :type topic: :class:`gcloud.pubsub.topic.Topic` :param topic: the topic being published - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the implicit default. + :type client: :class:`gcloud.pubsub.client.Client` + :param client: The client to use. """ - def __init__(self, topic, connection=None): + def __init__(self, topic, client): self.topic = topic self.messages = [] self.message_ids = [] - self.connection = connection + self.client = client def __enter__(self): return self @@ -209,18 +239,17 @@ def publish(self, message, **attrs): {'data': base64.b64encode(message).decode('ascii'), 'attributes': attrs}) - def commit(self, connection=None): + def commit(self, client=None): """Send saved messages as a single API call. - :type connection: :class:`gcloud.pubsub.connection.Connection` or None - :param connection: the connection to use. If not passed, - falls back to the ``connection`` attribute. + :type client: :class:`gcloud.pubsub.client.Client` or ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current batch. """ - if connection is None and self.connection is not None: - connection = self.connection - connection = _require_connection(connection) - response = connection.api_request(method='POST', - path='%s:publish' % self.topic.path, - data={'messages': self.messages[:]}) + if client is None: + client = self.client + response = client.connection.api_request( + method='POST', path='%s:publish' % self.topic.path, + data={'messages': self.messages[:]}) self.message_ids.extend(response['messageIds']) del self.messages[:] diff --git a/regression/pubsub.py b/regression/pubsub.py index 2c86148ac4da..a3f83cf60d24 100644 --- a/regression/pubsub.py +++ b/regression/pubsub.py @@ -19,11 +19,10 @@ from gcloud import _helpers from gcloud import pubsub from gcloud.pubsub.subscription import Subscription -from gcloud.pubsub.topic import Topic _helpers._PROJECT_ENV_VAR_NAME = 'GCLOUD_TESTS_PROJECT_ID' -pubsub.set_defaults() +CLIENT = pubsub.Client() class TestPubsub(unittest2.TestCase): @@ -37,7 +36,7 @@ def tearDown(self): def test_create_topic(self): TOPIC_NAME = 'a-new-topic' - topic = Topic(TOPIC_NAME) + topic = CLIENT.topic(TOPIC_NAME) self.assertFalse(topic.exists()) topic.create() self.to_delete.append(topic) @@ -51,21 +50,20 @@ def test_list_topics(self): 'newest%d' % (1000 * time.time(),), ] for topic_name in topics_to_create: - topic = Topic(topic_name) + topic = CLIENT.topic(topic_name) topic.create() self.to_delete.append(topic) # Retrieve the topics. - all_topics, _ = pubsub.list_topics() - project = pubsub.get_default_project() + all_topics, _ = CLIENT.list_topics() created = [topic for topic in all_topics if topic.name in topics_to_create and - topic.project == project] + topic.project == CLIENT.project] self.assertEqual(len(created), len(topics_to_create)) def test_create_subscription(self): TOPIC_NAME = 'subscribe-me' - topic = Topic(TOPIC_NAME) + topic = CLIENT.topic(TOPIC_NAME) self.assertFalse(topic.exists()) topic.create() self.to_delete.append(topic) @@ -80,7 +78,7 @@ def test_create_subscription(self): def test_list_subscriptions(self): TOPIC_NAME = 'subscribe-me' - topic = Topic(TOPIC_NAME) + topic = CLIENT.topic(TOPIC_NAME) self.assertFalse(topic.exists()) topic.create() self.to_delete.append(topic) @@ -95,7 +93,7 @@ def test_list_subscriptions(self): self.to_delete.append(subscription) # Retrieve the subscriptions. - all_subscriptions, _ = pubsub.list_subscriptions() + all_subscriptions, _ = CLIENT.list_subscriptions() created = [subscription for subscription in all_subscriptions if subscription.name in subscriptions_to_create and subscription.topic.name == TOPIC_NAME] @@ -103,7 +101,7 @@ def test_list_subscriptions(self): def test_message_pull_mode_e2e(self): TOPIC_NAME = 'subscribe-me' - topic = Topic(TOPIC_NAME, timestamp_messages=True) + topic = CLIENT.topic(TOPIC_NAME, timestamp_messages=True) self.assertFalse(topic.exists()) topic.create() self.to_delete.append(topic)