diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS deleted file mode 100644 index cf01548a9..000000000 --- a/.github/CODEOWNERS +++ /dev/null @@ -1,11 +0,0 @@ -# Code owners file. -# This file controls who is tagged for review for any given pull request. -# -# For syntax help see: -# https://help.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners#codeowners-syntax - - -# The python-samples-owners team is the default owner for anything not -# explicitly taken by someone else. - - /samples/ @anguillanneuf @hongalex @googleapis/python-samples-owners diff --git a/samples/AUTHORING_GUIDE.md b/samples/AUTHORING_GUIDE.md deleted file mode 100644 index 55c97b32f..000000000 --- a/samples/AUTHORING_GUIDE.md +++ /dev/null @@ -1 +0,0 @@ -See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md \ No newline at end of file diff --git a/samples/CONTRIBUTING.md b/samples/CONTRIBUTING.md deleted file mode 100644 index 34c882b6f..000000000 --- a/samples/CONTRIBUTING.md +++ /dev/null @@ -1 +0,0 @@ -See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/CONTRIBUTING.md \ No newline at end of file diff --git a/samples/snippets/README.rst b/samples/snippets/README.rst deleted file mode 100644 index 2676680af..000000000 --- a/samples/snippets/README.rst +++ /dev/null @@ -1,282 +0,0 @@ - -.. This file is automatically generated. Do not edit this file directly. - -Google Cloud Pub/Sub Python Samples -=============================================================================== - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/README.rst - - -This directory contains samples for Google Cloud Pub/Sub. `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. - - - - -.. _Google Cloud Pub/Sub: https://cloud.google.com/pubsub/docs - - -Setup -------------------------------------------------------------------------------- - - - -Authentication -++++++++++++++ - -This sample requires you to have authentication setup. Refer to the -`Authentication Getting Started Guide`_ for instructions on setting up -credentials for applications. - -.. _Authentication Getting Started Guide: - https://cloud.google.com/docs/authentication/getting-started - - - - -Install Dependencies -++++++++++++++++++++ - -#. Clone python-docs-samples and change directory to the sample directory you want to use. - - .. code-block:: bash - - $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git - -#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions. - - .. _Python Development Environment Setup Guide: - https://cloud.google.com/python/setup - -#. Create a virtualenv. Samples are compatible with Python 3.6+. - - .. code-block:: bash - - $ virtualenv env - $ source env/bin/activate - -#. Install the dependencies needed to run the samples. - - .. code-block:: bash - - $ pip install -r requirements.txt - -.. _pip: https://pip.pypa.io/ -.. _virtualenv: https://virtualenv.pypa.io/ - - - - - - -Samples -------------------------------------------------------------------------------- - - -Quickstart -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/quickstart.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python quickstart.py - - - - -Publisher -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/publisher.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python publisher.py - - - usage: publisher.py [-h] - project_id - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} - ... - - This application demonstrates how to perform basic operations on topics - with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project_id Your Google Cloud project ID - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} - list Lists all Pub/Sub topics in the given project. - create Create a new Pub/Sub topic. - delete Deletes an existing Pub/Sub topic. - publish Publishes multiple messages to a Pub/Sub topic. - publish-with-custom-attributes - Publishes multiple messages with custom attributes to - a Pub/Sub topic. - publish-with-error-handler - Publishes multiple messages to a Pub/Sub topic with an - error handler. - publish-with-batch-settings - Publishes multiple messages to a Pub/Sub topic with - batch settings. - publish-with-retry-settings - Publishes messages with custom retry settings. - - optional arguments: - -h, --help show this help message and exit - - - - - -Subscribers -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/subscriber.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python subscriber.py - - - usage: subscriber.py [-h] - project_id - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} - ... - - This application demonstrates how to perform basic operations on - subscriptions with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project_id Your Google Cloud project ID - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} - list-in-topic Lists all subscriptions for a given topic. - list-in-project Lists all subscriptions in the current project. - create Create a new pull subscription on the given topic. - create-with-dead-letter-policy - Create a subscription with dead letter policy. - create-push Create a new push subscription on the given topic. - delete Deletes an existing Pub/Sub topic. - update-push Updates an existing Pub/Sub subscription's push - endpoint URL. Note that certain properties of a - subscription, such as its topic, are not modifiable. - update-dead-letter-policy - Update a subscription's dead letter policy. - remove-dead-letter-policy - Remove dead letter policy from a subscription. - receive Receives messages from a pull subscription. - receive-custom-attributes - Receives messages from a pull subscription. - receive-flow-control - Receives messages from a pull subscription with flow - control. - receive-synchronously - Pulling messages synchronously. - receive-synchronously-with-lease - Pulling messages synchronously with lease management - listen-for-errors Receives messages and catches errors from a pull - subscription. - receive-messages-with-delivery-attempts - - optional arguments: - -h, --help show this help message and exit - - - - - -Identity and Access Management -+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - -.. image:: https://gstatic.com/cloudssh/images/open-btn.png - :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/iam.py,pubsub/cloud-client/README.rst - - - - -To run this sample: - -.. code-block:: bash - - $ python iam.py - - - usage: iam.py [-h] - project - {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} - ... - - This application demonstrates how to perform basic operations on IAM - policies with the Cloud Pub/Sub API. - - For more information, see the README.md under /pubsub and the documentation - at https://cloud.google.com/pubsub/docs. - - positional arguments: - project Your Google Cloud project ID - {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} - get-topic-policy Prints the IAM policy for the given topic. - get-subscription-policy - Prints the IAM policy for the given subscription. - set-topic-policy Sets the IAM policy for a topic. - set-subscription-policy - Sets the IAM policy for a topic. - check-topic-permissions - Checks to which permissions are available on the given - topic. - check-subscription-permissions - Checks to which permissions are available on the given - subscription. - - optional arguments: - -h, --help show this help message and exit - - - - - - - - - -The client library -------------------------------------------------------------------------------- - -This sample uses the `Google Cloud Client Library for Python`_. -You can read the documentation for more details on API usage and use GitHub -to `browse the source`_ and `report issues`_. - -.. _Google Cloud Client Library for Python: - https://googlecloudplatform.github.io/google-cloud-python/ -.. _browse the source: - https://github.com/GoogleCloudPlatform/google-cloud-python -.. _report issues: - https://github.com/GoogleCloudPlatform/google-cloud-python/issues - - - -.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/samples/snippets/README.rst.in b/samples/snippets/README.rst.in deleted file mode 100644 index ddbc64712..000000000 --- a/samples/snippets/README.rst.in +++ /dev/null @@ -1,30 +0,0 @@ -# This file is used to generate README.rst - -product: - name: Google Cloud Pub/Sub - short_name: Cloud Pub/Sub - url: https://cloud.google.com/pubsub/docs - description: > - `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that - allows you to send and receive messages between independent applications. - -setup: -- auth -- install_deps - -samples: -- name: Quickstart - file: quickstart.py -- name: Publisher - file: publisher.py - show_help: true -- name: Subscribers - file: subscriber.py - show_help: true -- name: Identity and Access Management - file: iam.py - show_help: true - -cloud_client_library: true - -folder: pubsub/cloud-client \ No newline at end of file diff --git a/samples/snippets/iam.py b/samples/snippets/iam.py deleted file mode 100644 index 71c55d764..000000000 --- a/samples/snippets/iam.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This application demonstrates how to perform basic operations on IAM -policies with the Cloud Pub/Sub API. - -For more information, see the README.md under /pubsub and the documentation -at https://cloud.google.com/pubsub/docs. -""" - -import argparse - - -def get_topic_policy(project, topic_id): - """Prints the IAM policy for the given topic.""" - # [START pubsub_get_topic_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - client = pubsub_v1.PublisherClient() - topic_path = client.topic_path(project, topic_id) - - policy = client.get_iam_policy(topic_path) - - print("Policy for topic {}:".format(topic_path)) - for binding in policy.bindings: - print("Role: {}, Members: {}".format(binding.role, binding.members)) - # [END pubsub_get_topic_policy] - - -def get_subscription_policy(project, subscription_id): - """Prints the IAM policy for the given subscription.""" - # [START pubsub_get_subscription_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path(project, subscription_id) - - policy = client.get_iam_policy(subscription_path) - - print("Policy for subscription {}:".format(subscription_path)) - for binding in policy.bindings: - print("Role: {}, Members: {}".format(binding.role, binding.members)) - - client.close() - # [END pubsub_get_subscription_policy] - - -def set_topic_policy(project, topic_id): - """Sets the IAM policy for a topic.""" - # [START pubsub_set_topic_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - client = pubsub_v1.PublisherClient() - topic_path = client.topic_path(project, topic_id) - - policy = client.get_iam_policy(topic_path) - - # Add all users as viewers. - policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) - - # Add a group as a publisher. - policy.bindings.add( - role="roles/pubsub.publisher", members=["group:cloud-logs@google.com"] - ) - - # Set the policy - policy = client.set_iam_policy(topic_path, policy) - - print("IAM policy for topic {} set: {}".format(topic_id, policy)) - # [END pubsub_set_topic_policy] - - -def set_subscription_policy(project, subscription_id): - """Sets the IAM policy for a topic.""" - # [START pubsub_set_subscription_policy] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path(project, subscription_id) - - policy = client.get_iam_policy(subscription_path) - - # Add all users as viewers. - policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) - - # Add a group as an editor. - policy.bindings.add(role="roles/editor", members=["group:cloud-logs@google.com"]) - - # Set the policy - policy = client.set_iam_policy(subscription_path, policy) - - print("IAM policy for subscription {} set: {}".format(subscription_id, policy)) - - client.close() - # [END pubsub_set_subscription_policy] - - -def check_topic_permissions(project, topic_id): - """Checks to which permissions are available on the given topic.""" - # [START pubsub_test_topic_permissions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - client = pubsub_v1.PublisherClient() - topic_path = client.topic_path(project, topic_id) - - permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"] - - allowed_permissions = client.test_iam_permissions(topic_path, permissions_to_check) - - print( - "Allowed permissions for topic {}: {}".format(topic_path, allowed_permissions) - ) - # [END pubsub_test_topic_permissions] - - -def check_subscription_permissions(project, subscription_id): - """Checks to which permissions are available on the given subscription.""" - # [START pubsub_test_subscription_permissions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - client = pubsub_v1.SubscriberClient() - subscription_path = client.subscription_path(project, subscription_id) - - permissions_to_check = [ - "pubsub.subscriptions.consume", - "pubsub.subscriptions.update", - ] - - allowed_permissions = client.test_iam_permissions( - subscription_path, permissions_to_check - ) - - print( - "Allowed permissions for subscription {}: {}".format( - subscription_path, allowed_permissions - ) - ) - - client.close() - # [END pubsub_test_subscription_permissions] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project", help="Your Google Cloud project ID") - - subparsers = parser.add_subparsers(dest="command") - - get_topic_policy_parser = subparsers.add_parser( - "get-topic-policy", help=get_topic_policy.__doc__ - ) - get_topic_policy_parser.add_argument("topic_id") - - get_subscription_policy_parser = subparsers.add_parser( - "get-subscription-policy", help=get_subscription_policy.__doc__ - ) - get_subscription_policy_parser.add_argument("subscription_id") - - set_topic_policy_parser = subparsers.add_parser( - "set-topic-policy", help=set_topic_policy.__doc__ - ) - set_topic_policy_parser.add_argument("topic_id") - - set_subscription_policy_parser = subparsers.add_parser( - "set-subscription-policy", help=set_subscription_policy.__doc__ - ) - set_subscription_policy_parser.add_argument("subscription_id") - - check_topic_permissions_parser = subparsers.add_parser( - "check-topic-permissions", help=check_topic_permissions.__doc__ - ) - check_topic_permissions_parser.add_argument("topic_id") - - check_subscription_permissions_parser = subparsers.add_parser( - "check-subscription-permissions", help=check_subscription_permissions.__doc__, - ) - check_subscription_permissions_parser.add_argument("subscription_id") - - args = parser.parse_args() - - if args.command == "get-topic-policy": - get_topic_policy(args.project, args.topic_id) - elif args.command == "get-subscription-policy": - get_subscription_policy(args.project, args.subscription_id) - elif args.command == "set-topic-policy": - set_topic_policy(args.project, args.topic_id) - elif args.command == "set-subscription-policy": - set_subscription_policy(args.project, args.subscription_id) - elif args.command == "check-topic-permissions": - check_topic_permissions(args.project, args.topic_id) - elif args.command == "check-subscription-permissions": - check_subscription_permissions(args.project, args.subscription_id) diff --git a/samples/snippets/iam_test.py b/samples/snippets/iam_test.py deleted file mode 100644 index d196953f6..000000000 --- a/samples/snippets/iam_test.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import uuid - -from google.cloud import pubsub_v1 -import pytest - -import iam - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "iam-test-topic-" + UUID -SUBSCRIPTION = "iam-test-subscription-" + UUID - - -@pytest.fixture(scope="module") -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture(scope="module") -def topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - publisher_client.delete_topic(topic_path) - except Exception: - pass - - publisher_client.create_topic(topic_path) - - yield topic_path - - publisher_client.delete_topic(topic_path) - - -@pytest.fixture(scope="module") -def subscriber_client(): - subscriber_client = pubsub_v1.SubscriberClient() - yield subscriber_client - subscriber_client.close() - - -@pytest.fixture -def subscription(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) - - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber_client.create_subscription(subscription_path, topic=topic) - - yield subscription_path - - subscriber_client.delete_subscription(subscription_path) - - -def test_get_topic_policy(topic, capsys): - iam.get_topic_policy(PROJECT, TOPIC) - - out, _ = capsys.readouterr() - assert topic in out - - -def test_get_subscription_policy(subscription, capsys): - iam.get_subscription_policy(PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - assert subscription in out - - -def test_set_topic_policy(publisher_client, topic): - iam.set_topic_policy(PROJECT, TOPIC) - - policy = publisher_client.get_iam_policy(topic) - assert "roles/pubsub.publisher" in str(policy) - assert "allUsers" in str(policy) - - -def test_set_subscription_policy(subscriber_client, subscription): - iam.set_subscription_policy(PROJECT, SUBSCRIPTION) - - policy = subscriber_client.get_iam_policy(subscription) - assert "roles/pubsub.viewer" in str(policy) - assert "allUsers" in str(policy) - - -def test_check_topic_permissions(topic, capsys): - iam.check_topic_permissions(PROJECT, TOPIC) - - out, _ = capsys.readouterr() - - assert topic in out - assert "pubsub.topics.publish" in out - - -def test_check_subscription_permissions(subscription, capsys): - iam.check_subscription_permissions(PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - - assert subscription in out - assert "pubsub.subscriptions.consume" in out diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py deleted file mode 100644 index ba55d7ce5..000000000 --- a/samples/snippets/noxfile.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import os -from pathlib import Path -import sys - -import nox - - -# WARNING - WARNING - WARNING - WARNING - WARNING -# WARNING - WARNING - WARNING - WARNING - WARNING -# DO NOT EDIT THIS FILE EVER! -# WARNING - WARNING - WARNING - WARNING - WARNING -# WARNING - WARNING - WARNING - WARNING - WARNING - -# Copy `noxfile_config.py` to your directory and modify it instead. - - -# `TEST_CONFIG` dict is a configuration hook that allows users to -# modify the test configurations. The values here should be in sync -# with `noxfile_config.py`. Users will copy `noxfile_config.py` into -# their directory and modify it. - -TEST_CONFIG = { - # You can opt out from the test for specific Python versions. - 'ignored_versions': ["2.7"], - - # An envvar key for determining the project id to use. Change it - # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a - # build specific Cloud project. You can also use your own string - # to use your own Cloud project. - 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', - # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', - - # A dictionary you want to inject into your test. Don't put any - # secrets here. These values will override predefined values. - 'envs': {}, -} - - -try: - # Ensure we can import noxfile_config in the project's directory. - sys.path.append('.') - from noxfile_config import TEST_CONFIG_OVERRIDE -except ImportError as e: - print("No user noxfile_config found: detail: {}".format(e)) - TEST_CONFIG_OVERRIDE = {} - -# Update the TEST_CONFIG with the user supplied values. -TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) - - -def get_pytest_env_vars(): - """Returns a dict for pytest invocation.""" - ret = {} - - # Override the GCLOUD_PROJECT and the alias. - env_key = TEST_CONFIG['gcloud_project_env'] - # This should error out if not set. - ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key] - - # Apply user supplied envs. - ret.update(TEST_CONFIG['envs']) - return ret - - -# DO NOT EDIT - automatically generated. -# All versions used to tested samples. -ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"] - -# Any default versions that should be ignored. -IGNORED_VERSIONS = TEST_CONFIG['ignored_versions'] - -TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) - -INSTALL_LIBRARY_FROM_SOURCE = bool(os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False)) -# -# Style Checks -# - - -def _determine_local_import_names(start_dir): - """Determines all import names that should be considered "local". - - This is used when running the linter to insure that import order is - properly checked. - """ - file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)] - return [ - basename - for basename, extension in file_ext_pairs - if extension == ".py" - or os.path.isdir(os.path.join(start_dir, basename)) - and basename not in ("__pycache__") - ] - - -# Linting with flake8. -# -# We ignore the following rules: -# E203: whitespace before ‘:’ -# E266: too many leading ‘#’ for block comment -# E501: line too long -# I202: Additional newline in a section of imports -# -# We also need to specify the rules which are ignored by default: -# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] -FLAKE8_COMMON_ARGS = [ - "--show-source", - "--builtin=gettext", - "--max-complexity=20", - "--import-order-style=google", - "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", - "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", - "--max-line-length=88", -] - - -@nox.session -def lint(session): - session.install("flake8", "flake8-import-order") - - local_names = _determine_local_import_names(".") - args = FLAKE8_COMMON_ARGS + [ - "--application-import-names", - ",".join(local_names), - "." - ] - session.run("flake8", *args) - - -# -# Sample Tests -# - - -PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] - - -def _session_tests(session, post_install=None): - """Runs py.test for a particular project.""" - if os.path.exists("requirements.txt"): - session.install("-r", "requirements.txt") - - if os.path.exists("requirements-test.txt"): - session.install("-r", "requirements-test.txt") - - if INSTALL_LIBRARY_FROM_SOURCE: - session.install("-e", _get_repo_root()) - - if post_install: - post_install(session) - - session.run( - "pytest", - *(PYTEST_COMMON_ARGS + session.posargs), - # Pytest will return 5 when no tests are collected. This can happen - # on travis where slow and flaky tests are excluded. - # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html - success_codes=[0, 5], - env=get_pytest_env_vars() - ) - - -@nox.session(python=ALL_VERSIONS) -def py(session): - """Runs py.test for a sample using the specified version of Python.""" - if session.python in TESTED_VERSIONS: - _session_tests(session) - else: - session.skip("SKIPPED: {} tests are disabled for this sample.".format( - session.python - )) - - -# -# Readmegen -# - - -def _get_repo_root(): - """ Returns the root folder of the project. """ - # Get root of this repository. Assume we don't have directories nested deeper than 10 items. - p = Path(os.getcwd()) - for i in range(10): - if p is None: - break - if Path(p / ".git").exists(): - return str(p) - p = p.parent - raise Exception("Unable to detect repository root.") - - -GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) - - -@nox.session -@nox.parametrize("path", GENERATED_READMES) -def readmegen(session, path): - """(Re-)generates the readme for a sample.""" - session.install("jinja2", "pyyaml") - dir_ = os.path.dirname(path) - - if os.path.exists(os.path.join(dir_, "requirements.txt")): - session.install("-r", os.path.join(dir_, "requirements.txt")) - - in_file = os.path.join(dir_, "README.rst.in") - session.run( - "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file - ) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py deleted file mode 100644 index 477b31b9c..000000000 --- a/samples/snippets/publisher.py +++ /dev/null @@ -1,334 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2016 Google LLC. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This application demonstrates how to perform basic operations on topics -with the Cloud Pub/Sub API. - -For more information, see the README.md under /pubsub and the documentation -at https://cloud.google.com/pubsub/docs. -""" - -import argparse - - -def list_topics(project_id): - """Lists all Pub/Sub topics in the given project.""" - # [START pubsub_list_topics] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - - publisher = pubsub_v1.PublisherClient() - project_path = publisher.project_path(project_id) - - for topic in publisher.list_topics(project_path): - print(topic) - # [END pubsub_list_topics] - - -def create_topic(project_id, topic_id): - """Create a new Pub/Sub topic.""" - # [START pubsub_quickstart_create_topic] - # [START pubsub_create_topic] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - topic = publisher.create_topic(topic_path) - - print("Topic created: {}".format(topic)) - # [END pubsub_quickstart_create_topic] - # [END pubsub_create_topic] - - -def delete_topic(project_id, topic_id): - """Deletes an existing Pub/Sub topic.""" - # [START pubsub_delete_topic] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - publisher.delete_topic(topic_path) - - print("Topic deleted: {}".format(topic_path)) - # [END pubsub_delete_topic] - - -def publish_messages(project_id, topic_id): - """Publishes multiple messages to a Pub/Sub topic.""" - # [START pubsub_quickstart_publisher] - # [START pubsub_publish] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - # The `topic_path` method creates a fully qualified identifier - # in the form `projects/{project_id}/topics/{topic_id}` - topic_path = publisher.topic_path(project_id, topic_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - # When you publish a message, the client returns a future. - future = publisher.publish(topic_path, data=data) - print(future.result()) - - print("Published messages.") - # [END pubsub_quickstart_publisher] - # [END pubsub_publish] - - -def publish_messages_with_custom_attributes(project_id, topic_id): - """Publishes multiple messages with custom attributes - to a Pub/Sub topic.""" - # [START pubsub_publish_custom_attributes] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - # Add two attributes, origin and username, to the message - future = publisher.publish( - topic_path, data, origin="python-sample", username="gcp" - ) - print(future.result()) - - print("Published messages with custom attributes.") - # [END pubsub_publish_custom_attributes] - - -def publish_messages_with_error_handler(project_id, topic_id): - # [START pubsub_publish_messages_error_handler] - """Publishes multiple messages to a Pub/Sub topic with an error handler.""" - import time - - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - futures = dict() - - def get_callback(f, data): - def callback(f): - try: - print(f.result()) - futures.pop(data) - except: # noqa - print("Please handle {} for {}.".format(f.exception(), data)) - - return callback - - for i in range(10): - data = str(i) - futures.update({data: None}) - # When you publish a message, the client returns a future. - future = publisher.publish( - topic_path, data=data.encode("utf-8") # data must be a bytestring. - ) - futures[data] = future - # Publish failures shall be handled in the callback function. - future.add_done_callback(get_callback(future, data)) - - # Wait for all the publish futures to resolve before exiting. - while futures: - time.sleep(5) - - print("Published message with error handler.") - # [END pubsub_publish_messages_error_handler] - - -def publish_messages_with_batch_settings(project_id, topic_id): - """Publishes multiple messages to a Pub/Sub topic with batch settings.""" - # [START pubsub_publisher_batch_settings] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - # Configure the batch to publish as soon as there is ten messages, - # one kilobyte of data, or one second has passed. - batch_settings = pubsub_v1.types.BatchSettings( - max_messages=10, # default 100 - max_bytes=1024, # default 1 MB - max_latency=1, # default 10 ms - ) - publisher = pubsub_v1.PublisherClient(batch_settings) - topic_path = publisher.topic_path(project_id, topic_id) - - # Resolve the publish future in a separate thread. - def callback(future): - message_id = future.result() - print(message_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - future = publisher.publish(topic_path, data=data) - # Non-blocking. Allow the publisher client to batch multiple messages. - future.add_done_callback(callback) - - print("Published messages with batch settings.") - # [END pubsub_publisher_batch_settings] - - -def publish_messages_with_retry_settings(project_id, topic_id): - """Publishes messages with custom retry settings.""" - # [START pubsub_publisher_retry_settings] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - # Configure the retry settings. Defaults will be overwritten. - retry_settings = { - "interfaces": { - "google.pubsub.v1.Publisher": { - "retry_codes": { - "publish": [ - "ABORTED", - "CANCELLED", - "DEADLINE_EXCEEDED", - "INTERNAL", - "RESOURCE_EXHAUSTED", - "UNAVAILABLE", - "UNKNOWN", - ] - }, - "retry_params": { - "messaging": { - "initial_retry_delay_millis": 100, # default: 100 - "retry_delay_multiplier": 1.3, # default: 1.3 - "max_retry_delay_millis": 60000, # default: 60000 - "initial_rpc_timeout_millis": 5000, # default: 25000 - "rpc_timeout_multiplier": 1.0, # default: 1.0 - "max_rpc_timeout_millis": 600000, # default: 30000 - "total_timeout_millis": 600000, # default: 600000 - } - }, - "methods": { - "Publish": { - "retry_codes_name": "publish", - "retry_params_name": "messaging", - } - }, - } - } - } - - publisher = pubsub_v1.PublisherClient(client_config=retry_settings) - topic_path = publisher.topic_path(project_id, topic_id) - - for n in range(1, 10): - data = u"Message number {}".format(n) - # Data must be a bytestring - data = data.encode("utf-8") - future = publisher.publish(topic_path, data=data) - print(future.result()) - - print("Published messages with retry settings.") - # [END pubsub_publisher_retry_settings] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Your Google Cloud project ID") - - subparsers = parser.add_subparsers(dest="command") - subparsers.add_parser("list", help=list_topics.__doc__) - - create_parser = subparsers.add_parser("create", help=create_topic.__doc__) - create_parser.add_argument("topic_id") - - delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) - delete_parser.add_argument("topic_id") - - publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) - publish_parser.add_argument("topic_id") - - publish_with_custom_attributes_parser = subparsers.add_parser( - "publish-with-custom-attributes", - help=publish_messages_with_custom_attributes.__doc__, - ) - publish_with_custom_attributes_parser.add_argument("topic_id") - - publish_with_error_handler_parser = subparsers.add_parser( - "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, - ) - publish_with_error_handler_parser.add_argument("topic_id") - - publish_with_batch_settings_parser = subparsers.add_parser( - "publish-with-batch-settings", - help=publish_messages_with_batch_settings.__doc__, - ) - publish_with_batch_settings_parser.add_argument("topic_id") - - publish_with_retry_settings_parser = subparsers.add_parser( - "publish-with-retry-settings", - help=publish_messages_with_retry_settings.__doc__, - ) - publish_with_retry_settings_parser.add_argument("topic_id") - - args = parser.parse_args() - - if args.command == "list": - list_topics(args.project_id) - elif args.command == "create": - create_topic(args.project_id, args.topic_id) - elif args.command == "delete": - delete_topic(args.project_id, args.topic_id) - elif args.command == "publish": - publish_messages(args.project_id, args.topic_id) - elif args.command == "publish-with-custom-attributes": - publish_messages_with_custom_attributes(args.project_id, args.topic_id) - elif args.command == "publish-with-error-handler": - publish_messages_with_error_handler(args.project_id, args.topic_id) - elif args.command == "publish-with-batch-settings": - publish_messages_with_batch_settings(args.project_id, args.topic_id) - elif args.command == "publish-with-retry-settings": - publish_messages_with_retry_settings(args.project_id, args.topic_id) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py deleted file mode 100644 index b5c2ea1ea..000000000 --- a/samples/snippets/publisher_test.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import time -import uuid - -import backoff -from google.cloud import pubsub_v1 -import mock -import pytest - -import publisher - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID -TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID - - -@pytest.fixture -def client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture -def topic_admin(client): - topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) - - try: - topic = client.get_topic(topic_path) - except: # noqa - topic = client.create_topic(topic_path) - - yield topic.name - # Teardown of `topic_admin` is handled in `test_delete()`. - - -@pytest.fixture -def topic_publish(client): - topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) - - try: - topic = client.get_topic(topic_path) - except: # noqa - topic = client.create_topic(topic_path) - - yield topic.name - - client.delete_topic(topic.name) - - -def _make_sleep_patch(): - real_sleep = time.sleep - - def new_sleep(period): - if period == 60: - real_sleep(5) - raise RuntimeError("sigil") - else: - real_sleep(period) - - return mock.patch("time.sleep", new=new_sleep) - - -def test_list(client, topic_admin, capsys): - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - publisher.list_topics(PROJECT) - out, _ = capsys.readouterr() - assert topic_admin in out - - eventually_consistent_test() - - -def test_create(client): - topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) - try: - client.delete_topic(topic_path) - except Exception: - pass - - publisher.create_topic(PROJECT, TOPIC_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - assert client.get_topic(topic_path) - - eventually_consistent_test() - - -def test_delete(client, topic_admin): - publisher.delete_topic(PROJECT, TOPIC_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - with pytest.raises(Exception): - client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) - - eventually_consistent_test() - - -def test_publish(topic_publish, capsys): - publisher.publish_messages(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_custom_attributes(topic_publish, capsys): - publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_batch_settings(topic_publish, capsys): - publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_retry_settings(topic_publish, capsys): - publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out - - -def test_publish_with_error_handler(topic_publish, capsys): - publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH) - - out, _ = capsys.readouterr() - assert "Published" in out diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py deleted file mode 100644 index 16432c0c3..000000000 --- a/samples/snippets/quickstart/pub.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START pubsub_quickstart_pub_all] -import argparse -import time - -# [START pubsub_quickstart_pub_deps] -from google.cloud import pubsub_v1 - -# [END pubsub_quickstart_pub_deps] - - -def get_callback(api_future, data, ref): - """Wrap message data in the context of the callback function.""" - - def callback(api_future): - try: - print( - "Published message {} now has message ID {}".format( - data, api_future.result() - ) - ) - ref["num_messages"] += 1 - except Exception: - print( - "A problem occurred when publishing {}: {}\n".format( - data, api_future.exception() - ) - ) - raise - - return callback - - -def pub(project_id, topic_id): - """Publishes a message to a Pub/Sub topic.""" - # [START pubsub_quickstart_pub_client] - # Initialize a Publisher client. - client = pubsub_v1.PublisherClient() - # [END pubsub_quickstart_pub_client] - # Create a fully qualified identifier in the form of - # `projects/{project_id}/topics/{topic_id}` - topic_path = client.topic_path(project_id, topic_id) - - # Data sent to Cloud Pub/Sub must be a bytestring. - data = b"Hello, World!" - - # Keep track of the number of published messages. - ref = dict({"num_messages": 0}) - - # When you publish a message, the client returns a future. - api_future = client.publish(topic_path, data=data) - api_future.add_done_callback(get_callback(api_future, data, ref)) - - # Keep the main thread from exiting while the message future - # gets resolved in the background. - while api_future.running(): - time.sleep(0.5) - print("Published {} message(s).".format(ref["num_messages"])) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Google Cloud project ID") - parser.add_argument("topic_id", help="Pub/Sub topic ID") - - args = parser.parse_args() - - pub(args.project_id, args.topic_id) -# [END pubsub_quickstart_pub_all] diff --git a/samples/snippets/quickstart/pub_test.py b/samples/snippets/quickstart/pub_test.py deleted file mode 100644 index 6f5cc06c4..000000000 --- a/samples/snippets/quickstart/pub_test.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import uuid - -from google.api_core.exceptions import AlreadyExists -from google.cloud import pubsub_v1 -import pytest - -import pub # noqa - - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "quickstart-pub-test-topic-" + UUID - - -@pytest.fixture(scope="module") -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture(scope="module") -def topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - publisher_client.create_topic(topic_path) - except AlreadyExists: - pass - - yield TOPIC - - publisher_client.delete_topic(topic_path) - - -def test_pub(publisher_client, topic, capsys): - pub.pub(PROJECT, topic) - - out, _ = capsys.readouterr() - - assert "Hello, World!" in out diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py deleted file mode 100644 index efe008915..000000000 --- a/samples/snippets/quickstart/sub.py +++ /dev/null @@ -1,69 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# [START pubsub_quickstart_sub_all] -import argparse - -# [START pubsub_quickstart_sub_deps] -from google.cloud import pubsub_v1 - -# [END pubsub_quickstart_sub_deps] - - -def sub(project_id, subscription_id): - """Receives messages from a Pub/Sub subscription.""" - # [START pubsub_quickstart_sub_client] - # Initialize a Subscriber client - subscriber_client = pubsub_v1.SubscriberClient() - # [END pubsub_quickstart_sub_client] - # Create a fully qualified identifier in the form of - # `projects/{project_id}/subscriptions/{subscription_id}` - subscription_path = subscriber_client.subscription_path(project_id, subscription_id) - - def callback(message): - print( - "Received message {} of message ID {}\n".format(message, message.message_id) - ) - # Acknowledge the message. Unack'ed messages will be redelivered. - message.ack() - print("Acknowledged message {}\n".format(message.message_id)) - - streaming_pull_future = subscriber_client.subscribe( - subscription_path, callback=callback - ) - print("Listening for messages on {}..\n".format(subscription_path)) - - try: - # Calling result() on StreamingPullFuture keeps the main thread from - # exiting while messages get processed in the callbacks. - streaming_pull_future.result() - except: # noqa - streaming_pull_future.cancel() - - subscriber_client.close() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Google Cloud project ID") - parser.add_argument("subscription_id", help="Pub/Sub subscription ID") - - args = parser.parse_args() - - sub(args.project_id, args.subscription_id) -# [END pubsub_quickstart_sub_all] diff --git a/samples/snippets/quickstart/sub_test.py b/samples/snippets/quickstart/sub_test.py deleted file mode 100644 index 38047422a..000000000 --- a/samples/snippets/quickstart/sub_test.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import uuid - -from google.api_core.exceptions import AlreadyExists -from google.cloud import pubsub_v1 -import mock -import pytest - -import sub # noqa - - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "quickstart-sub-test-topic-" + UUID -SUBSCRIPTION = "quickstart-sub-test-topic-sub-" + UUID - -publisher_client = pubsub_v1.PublisherClient() -subscriber_client = pubsub_v1.SubscriberClient() - - -@pytest.fixture(scope="module") -def topic_path(): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - topic = publisher_client.create_topic(topic_path) - yield topic.name - except AlreadyExists: - yield topic_path - - publisher_client.delete_topic(topic_path) - - -@pytest.fixture(scope="module") -def subscription_path(topic_path): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) - - try: - subscription = subscriber_client.create_subscription( - subscription_path, topic_path - ) - yield subscription.name - except AlreadyExists: - yield subscription_path - - subscriber_client.delete_subscription(subscription_path) - subscriber_client.close() - - -def _publish_messages(topic_path): - publish_future = publisher_client.publish(topic_path, data=b"Hello World!") - publish_future.result() - - -def test_sub(monkeypatch, topic_path, subscription_path, capsys): - - real_client = pubsub_v1.SubscriberClient() - mock_client = mock.Mock(spec=pubsub_v1.SubscriberClient, wraps=real_client) - - # Attributes on mock_client_constructor uses the corresponding - # attributes on pubsub_v1.SubscriberClient. - mock_client_constructor = mock.create_autospec(pubsub_v1.SubscriberClient) - mock_client_constructor.return_value = mock_client - - monkeypatch.setattr(pubsub_v1, "SubscriberClient", mock_client_constructor) - - def mock_subscribe(subscription_path, callback=None): - real_future = real_client.subscribe(subscription_path, callback=callback) - mock_future = mock.Mock(spec=real_future, wraps=real_future) - - def mock_result(): - return real_future.result(timeout=10) - - mock_future.result.side_effect = mock_result - return mock_future - - mock_client.subscribe.side_effect = mock_subscribe - - _publish_messages(topic_path) - - sub.sub(PROJECT, SUBSCRIPTION) - - out, _ = capsys.readouterr() - assert "Received message" in out - assert "Acknowledged message" in out - - real_client.close() diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt deleted file mode 100644 index adf26b9f9..000000000 --- a/samples/snippets/requirements-test.txt +++ /dev/null @@ -1,3 +0,0 @@ -backoff==1.10.0 -pytest==5.3.2 -mock==3.0.5 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt deleted file mode 100644 index 9b496510a..000000000 --- a/samples/snippets/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -google-cloud-pubsub==1.6.0 diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py deleted file mode 100644 index f079e7d42..000000000 --- a/samples/snippets/subscriber.py +++ /dev/null @@ -1,783 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This application demonstrates how to perform basic operations on -subscriptions with the Cloud Pub/Sub API. - -For more information, see the README.md under /pubsub and the documentation -at https://cloud.google.com/pubsub/docs. -""" - -import argparse - - -def list_subscriptions_in_topic(project_id, topic_id): - """Lists all subscriptions for a given topic.""" - # [START pubsub_list_topic_subscriptions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - - publisher = pubsub_v1.PublisherClient() - topic_path = publisher.topic_path(project_id, topic_id) - - for subscription in publisher.list_topic_subscriptions(topic_path): - print(subscription) - # [END pubsub_list_topic_subscriptions] - - -def list_subscriptions_in_project(project_id): - """Lists all subscriptions in the current project.""" - # [START pubsub_list_subscriptions] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - - subscriber = pubsub_v1.SubscriberClient() - project_path = subscriber.project_path(project_id) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - for subscription in subscriber.list_subscriptions(project_path): - print(subscription.name) - # [END pubsub_list_subscriptions] - - -def create_subscription(project_id, topic_id, subscription_id): - """Create a new pull subscription on the given topic.""" - # [START pubsub_create_pull_subscription] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - subscription = subscriber.create_subscription(subscription_path, topic_path) - - print("Subscription created: {}".format(subscription)) - # [END pubsub_create_pull_subscription] - - -def create_subscription_with_dead_letter_topic( - project_id, topic_id, subscription_id, dead_letter_topic_id -): - """Create a subscription with dead letter policy.""" - # [START pubsub_dead_letter_create_subscription] - from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import DeadLetterPolicy - - # TODO(developer) - # project_id = "your-project-id" - # endpoint = "https://my-test-project.appspot.com/push" - # TODO(developer): This is an existing topic that the subscription - # with dead letter policy is attached to. - # topic_id = "your-topic-id" - # TODO(developer): This is an existing subscription with a dead letter policy. - # subscription_id = "your-subscription-id" - # TODO(developer): This is an existing dead letter topic that the subscription - # with dead letter policy will forward dead letter messages to. - # dead_letter_topic_id = "your-dead-letter-topic-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) - - dead_letter_policy = DeadLetterPolicy( - dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 - ) - - with subscriber: - subscription = subscriber.create_subscription( - subscription_path, topic_path, dead_letter_policy=dead_letter_policy - ) - - print("Subscription created: {}".format(subscription.name)) - print( - "It will forward dead letter messages to: {}".format( - subscription.dead_letter_policy.dead_letter_topic - ) - ) - print( - "After {} delivery attempts.".format( - subscription.dead_letter_policy.max_delivery_attempts - ) - ) - # [END pubsub_dead_letter_create_subscription] - - -def create_push_subscription(project_id, topic_id, subscription_id, endpoint): - """Create a new push subscription on the given topic.""" - # [START pubsub_create_push_subscription] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - # subscription_id = "your-subscription-id" - # endpoint = "https://my-test-project.appspot.com/push" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - subscription = subscriber.create_subscription( - subscription_path, topic_path, push_config - ) - - print("Push subscription created: {}".format(subscription)) - print("Endpoint for subscription is: {}".format(endpoint)) - # [END pubsub_create_push_subscription] - - -def delete_subscription(project_id, subscription_id): - """Deletes an existing Pub/Sub topic.""" - # [START pubsub_delete_subscription] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - subscriber.delete_subscription(subscription_path) - - print("Subscription deleted: {}".format(subscription_path)) - # [END pubsub_delete_subscription] - - -def update_push_subscription(project_id, topic_id, subscription_id, endpoint): - """ - Updates an existing Pub/Sub subscription's push endpoint URL. - Note that certain properties of a subscription, such as - its topic, are not modifiable. - """ - # [START pubsub_update_push_configuration] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # topic_id = "your-topic-id" - # subscription_id = "your-subscription-id" - # endpoint = "https://my-test-project.appspot.com/push" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) - - subscription = pubsub_v1.types.Subscription( - name=subscription_path, topic=topic_id, push_config=push_config - ) - - update_mask = {"paths": {"push_config"}} - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - result = subscriber.update_subscription(subscription, update_mask) - - print("Subscription updated: {}".format(subscription_path)) - print("New endpoint for subscription is: {}".format(result.push_config)) - # [END pubsub_update_push_configuration] - - -def update_subscription_with_dead_letter_policy( - project_id, topic_id, subscription_id, dead_letter_topic_id -): - """Update a subscription's dead letter policy.""" - # [START pubsub_dead_letter_update_subscription] - from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask - - # TODO(developer) - # project_id = "your-project-id" - # TODO(developer): This is an existing topic that the subscription - # with dead letter policy is attached to. - # topic_id = "your-topic-id" - # TODO(developer): This is an existing subscription with a dead letter policy. - # subscription_id = "your-subscription-id" - # TODO(developer): This is an existing dead letter topic that the subscription - # with dead letter policy will forward dead letter messages to. - # dead_letter_topic_id = "your-dead-letter-topic-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) - - subscription_before_update = subscriber.get_subscription(subscription_path) - print("Before the update: {}".format(subscription_before_update)) - - # Indicates which fields in the provided subscription to update. - update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) - - # Construct a dead letter policy you expect to have after the update. - dead_letter_policy = DeadLetterPolicy( - dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 - ) - - # Construct the subscription with the dead letter policy you expect to have - # after the update. Here, values in the required fields (name, topic) help - # identify the subscription. - subscription = pubsub_v1.types.Subscription( - name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, - ) - - with subscriber: - subscription_after_update = subscriber.update_subscription( - subscription, update_mask - ) - - print("After the update: {}".format(subscription_after_update)) - # [END pubsub_dead_letter_update_subscription] - return subscription_after_update - - -def remove_dead_letter_policy(project_id, topic_id, subscription_id): - """Remove dead letter policy from a subscription.""" - # [START pubsub_dead_letter_remove] - from google.cloud import pubsub_v1 - from google.cloud.pubsub_v1.types import FieldMask - - # TODO(developer) - # project_id = "your-project-id" - # TODO(developer): This is an existing topic that the subscription - # with dead letter policy is attached to. - # topic_id = "your-topic-id" - # TODO(developer): This is an existing subscription with a dead letter policy. - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - topic_path = subscriber.topic_path(project_id, topic_id) - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - subscription_before_update = subscriber.get_subscription(subscription_path) - print("Before removing the policy: {}".format(subscription_before_update)) - - # Indicates which fields in the provided subscription to update. - update_mask = FieldMask( - paths=[ - "dead_letter_policy.dead_letter_topic", - "dead_letter_policy.max_delivery_attempts", - ] - ) - - # Construct the subscription (without any dead letter policy) that you - # expect to have after the update. - subscription = pubsub_v1.types.Subscription( - name=subscription_path, topic=topic_path - ) - - with subscriber: - subscription_after_update = subscriber.update_subscription( - subscription, update_mask - ) - - print("After removing the policy: {}".format(subscription_after_update)) - # [END pubsub_dead_letter_remove] - return subscription_after_update - - -def receive_messages(project_id, subscription_id, timeout=None): - """Receives messages from a pull subscription.""" - # [START pubsub_subscriber_async_pull] - # [START pubsub_quickstart_subscriber] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - # The `subscription_path` method creates a fully qualified identifier - # in the form `projects/{project_id}/subscriptions/{subscription_id}` - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - try: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_subscriber_async_pull] - # [END pubsub_quickstart_subscriber] - - -def receive_messages_with_custom_attributes(project_id, subscription_id, timeout=None): - """Receives messages from a pull subscription.""" - # [START pubsub_subscriber_async_pull_custom_attributes] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message.data)) - if message.attributes: - print("Attributes:") - for key in message.attributes: - value = message.attributes.get(key) - print("{}: {}".format(key, value)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - try: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_subscriber_async_pull_custom_attributes] - - -def receive_messages_with_flow_control(project_id, subscription_id, timeout=None): - """Receives messages from a pull subscription with flow control.""" - # [START pubsub_subscriber_flow_settings] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message.data)) - message.ack() - - # Limit the subscriber to only have ten outstanding messages at a time. - flow_control = pubsub_v1.types.FlowControl(max_messages=10) - - streaming_pull_future = subscriber.subscribe( - subscription_path, callback=callback, flow_control=flow_control - ) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - try: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_subscriber_flow_settings] - - -def synchronous_pull(project_id, subscription_id): - """Pulling messages synchronously.""" - # [START pubsub_subscriber_sync_pull] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - NUM_MESSAGES = 3 - - # Wrap the subscriber in a 'with' block to automatically call close() to - # close the underlying gRPC channel when done. - with subscriber: - # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) - - ack_ids = [] - for received_message in response.received_messages: - print("Received: {}".format(received_message.message.data)) - ack_ids.append(received_message.ack_id) - - # Acknowledges the received messages so they will not be sent again. - subscriber.acknowledge(subscription_path, ack_ids) - - print( - "Received and acknowledged {} messages. Done.".format( - len(response.received_messages) - ) - ) - # [END pubsub_subscriber_sync_pull] - - -def synchronous_pull_with_lease_management(project_id, subscription_id): - """Pulling messages synchronously with lease management""" - # [START pubsub_subscriber_sync_pull_with_lease] - import logging - import multiprocessing - import random - import time - - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - NUM_MESSAGES = 2 - ACK_DEADLINE = 30 - SLEEP_TIME = 10 - - # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) - - multiprocessing.log_to_stderr() - logger = multiprocessing.get_logger() - logger.setLevel(logging.INFO) - - def worker(msg): - """Simulates a long-running process.""" - RUN_TIME = random.randint(1, 60) - logger.info( - "{}: Running {} for {}s".format( - time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME - ) - ) - time.sleep(RUN_TIME) - - # `processes` stores process as key and ack id and message as values. - processes = dict() - for message in response.received_messages: - process = multiprocessing.Process(target=worker, args=(message,)) - processes[process] = (message.ack_id, message.message.data) - process.start() - - while processes: - for process in list(processes): - ack_id, msg_data = processes[process] - # If the process is still running, reset the ack deadline as - # specified by ACK_DEADLINE once every while as specified - # by SLEEP_TIME. - if process.is_alive(): - # `ack_deadline_seconds` must be between 10 to 600. - subscriber.modify_ack_deadline( - subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, - ) - logger.info( - "{}: Reset ack deadline for {} for {}s".format( - time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE, - ) - ) - - # If the processs is finished, acknowledges using `ack_id`. - else: - subscriber.acknowledge(subscription_path, [ack_id]) - logger.info( - "{}: Acknowledged {}".format( - time.strftime("%X", time.gmtime()), msg_data - ) - ) - processes.pop(process) - - # If there are still processes running, sleeps the thread. - if processes: - time.sleep(SLEEP_TIME) - - print( - "Received and acknowledged {} messages. Done.".format( - len(response.received_messages) - ) - ) - - # Close the underlying gPRC channel. Alternatively, wrap subscriber in - # a 'with' block to automatically call close() when done. - subscriber.close() - # [END pubsub_subscriber_sync_pull_with_lease] - - -def listen_for_errors(project_id, subscription_id, timeout=None): - """Receives messages and catches errors from a pull subscription.""" - # [START pubsub_subscriber_error_listener] - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - # Number of seconds the subscriber should listen for messages - # timeout = 5.0 - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except Exception as e: - streaming_pull_future.cancel() - print( - "Listening for messages on {} threw an exception: {}.".format( - subscription_id, e - ) - ) - # [END pubsub_subscriber_error_listener] - - -def receive_messages_with_delivery_attempts(project_id, subscription_id, timeout=None): - # [START pubsub_dead_letter_delivery_attempt] - from concurrent.futures import TimeoutError - from google.cloud import pubsub_v1 - - # TODO(developer) - # project_id = "your-project-id" - # subscription_id = "your-subscription-id" - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_id) - - def callback(message): - print("Received message: {}".format(message)) - print("With delivery attempts: {}".format(message.delivery_attempt)) - message.ack() - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - print("Listening for messages on {}..\n".format(subscription_path)) - - # Wrap subscriber in a 'with' block to automatically call close() when done. - with subscriber: - # When `timeout` is not set, result() will block indefinitely, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except TimeoutError: - streaming_pull_future.cancel() - # [END pubsub_dead_letter_delivery_attempt] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("project_id", help="Your Google Cloud project ID") - - subparsers = parser.add_subparsers(dest="command") - list_in_topic_parser = subparsers.add_parser( - "list-in-topic", help=list_subscriptions_in_topic.__doc__ - ) - list_in_topic_parser.add_argument("topic_id") - - list_in_project_parser = subparsers.add_parser( - "list-in-project", help=list_subscriptions_in_project.__doc__ - ) - - create_parser = subparsers.add_parser("create", help=create_subscription.__doc__) - create_parser.add_argument("topic_id") - create_parser.add_argument("subscription_id") - - create_with_dead_letter_policy_parser = subparsers.add_parser( - "create-with-dead-letter-policy", - help=create_subscription_with_dead_letter_topic.__doc__, - ) - create_with_dead_letter_policy_parser.add_argument("topic_id") - create_with_dead_letter_policy_parser.add_argument("subscription_id") - create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id") - - create_push_parser = subparsers.add_parser( - "create-push", help=create_push_subscription.__doc__ - ) - create_push_parser.add_argument("topic_id") - create_push_parser.add_argument("subscription_id") - create_push_parser.add_argument("endpoint") - - delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) - delete_parser.add_argument("subscription_id") - - update_push_parser = subparsers.add_parser( - "update-push", help=update_push_subscription.__doc__ - ) - update_push_parser.add_argument("topic_id") - update_push_parser.add_argument("subscription_id") - update_push_parser.add_argument("endpoint") - - update_dead_letter_policy_parser = subparsers.add_parser( - "update-dead-letter-policy", - help=update_subscription_with_dead_letter_policy.__doc__, - ) - update_dead_letter_policy_parser.add_argument("topic_id") - update_dead_letter_policy_parser.add_argument("subscription_id") - update_dead_letter_policy_parser.add_argument("dead_letter_topic_id") - - remove_dead_letter_policy_parser = subparsers.add_parser( - "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ - ) - remove_dead_letter_policy_parser.add_argument("topic_id") - remove_dead_letter_policy_parser.add_argument("subscription_id") - - receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) - receive_parser.add_argument("subscription_id") - receive_parser.add_argument("timeout", default=None, type=float, nargs="?") - - receive_with_custom_attributes_parser = subparsers.add_parser( - "receive-custom-attributes", - help=receive_messages_with_custom_attributes.__doc__, - ) - receive_with_custom_attributes_parser.add_argument("subscription_id") - receive_with_custom_attributes_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - receive_with_flow_control_parser = subparsers.add_parser( - "receive-flow-control", help=receive_messages_with_flow_control.__doc__ - ) - receive_with_flow_control_parser.add_argument("subscription_id") - receive_with_flow_control_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - synchronous_pull_parser = subparsers.add_parser( - "receive-synchronously", help=synchronous_pull.__doc__ - ) - synchronous_pull_parser.add_argument("subscription_id") - - synchronous_pull_with_lease_management_parser = subparsers.add_parser( - "receive-synchronously-with-lease", - help=synchronous_pull_with_lease_management.__doc__, - ) - synchronous_pull_with_lease_management_parser.add_argument("subscription_id") - - listen_for_errors_parser = subparsers.add_parser( - "listen-for-errors", help=listen_for_errors.__doc__ - ) - listen_for_errors_parser.add_argument("subscription_id") - listen_for_errors_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - receive_messages_with_delivery_attempts_parser = subparsers.add_parser( - "receive-messages-with-delivery-attempts", - help=receive_messages_with_delivery_attempts.__doc__, - ) - receive_messages_with_delivery_attempts_parser.add_argument("subscription_id") - receive_messages_with_delivery_attempts_parser.add_argument( - "timeout", default=None, type=float, nargs="?" - ) - - args = parser.parse_args() - - if args.command == "list-in-topic": - list_subscriptions_in_topic(args.project_id, args.topic_id) - elif args.command == "list-in-project": - list_subscriptions_in_project(args.project_id) - elif args.command == "create": - create_subscription(args.project_id, args.topic_id, args.subscription_id) - elif args.command == "create-with-dead-letter-policy": - create_subscription_with_dead_letter_topic( - args.project_id, - args.topic_id, - args.subscription_id, - args.dead_letter_topic_id, - ) - elif args.command == "create-push": - create_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, - ) - elif args.command == "delete": - delete_subscription(args.project_id, args.subscription_id) - elif args.command == "update-push": - update_push_subscription( - args.project_id, args.topic_id, args.subscription_id, args.endpoint, - ) - elif args.command == "update-dead-letter-policy": - update_subscription_with_dead_letter_policy( - args.project_id, - args.topic_id, - args.subscription_id, - args.dead_letter_topic_id, - ) - elif args.command == "remove-dead-letter-policy": - remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) - elif args.command == "receive": - receive_messages(args.project_id, args.subscription_id, args.timeout) - elif args.command == "receive-custom-attributes": - receive_messages_with_custom_attributes( - args.project_id, args.subscription_id, args.timeout - ) - elif args.command == "receive-flow-control": - receive_messages_with_flow_control( - args.project_id, args.subscription_id, args.timeout - ) - elif args.command == "receive-synchronously": - synchronous_pull(args.project_id, args.subscription_id) - elif args.command == "receive-synchronously-with-lease": - synchronous_pull_with_lease_management(args.project_id, args.subscription_id) - elif args.command == "listen-for-errors": - listen_for_errors(args.project_id, args.subscription_id, args.timeout) - elif args.command == "receive-messages-with-delivery-attempts": - receive_messages_with_delivery_attempts( - args.project_id, args.subscription_id, args.timeout - ) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py deleted file mode 100644 index a7f7c139c..000000000 --- a/samples/snippets/subscriber_test.py +++ /dev/null @@ -1,341 +0,0 @@ -# Copyright 2016 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import uuid - -import backoff -from google.cloud import pubsub_v1 -import pytest - -import subscriber - -UUID = uuid.uuid4().hex -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -TOPIC = "subscription-test-topic-" + UUID -DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID -SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID -SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID -SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID -SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID -ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) -NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) - - -@pytest.fixture(scope="module") -def publisher_client(): - yield pubsub_v1.PublisherClient() - - -@pytest.fixture(scope="module") -def topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - - try: - topic = publisher_client.get_topic(topic_path) - except: # noqa - topic = publisher_client.create_topic(topic_path) - - yield topic.name - - publisher_client.delete_topic(topic.name) - - -@pytest.fixture(scope="module") -def dead_letter_topic(publisher_client): - topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) - - try: - dead_letter_topic = publisher_client.get_topic(topic_path) - except: # noqa - dead_letter_topic = publisher_client.create_topic(topic_path) - - yield dead_letter_topic.name - - publisher_client.delete_topic(dead_letter_topic.name) - - -@pytest.fixture(scope="module") -def subscriber_client(): - subscriber_client = pubsub_v1.SubscriberClient() - yield subscriber_client - subscriber_client.close() - - -@pytest.fixture(scope="module") -def subscription_admin(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - -@pytest.fixture(scope="module") -def subscription_sync(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - subscriber_client.delete_subscription(subscription.name) - - -@pytest.fixture(scope="module") -def subscription_async(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - subscriber_client.delete_subscription(subscription.name) - - -@pytest.fixture(scope="module") -def subscription_dlq(subscriber_client, topic): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) - - try: - subscription = subscriber_client.get_subscription(subscription_path) - except: # noqa - subscription = subscriber_client.create_subscription( - subscription_path, topic=topic - ) - - yield subscription.name - - subscriber_client.delete_subscription(subscription.name) - - -def test_list_in_topic(subscription_admin, capsys): - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) - out, _ = capsys.readouterr() - assert subscription_admin in out - - eventually_consistent_test() - - -def test_list_in_project(subscription_admin, capsys): - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - subscriber.list_subscriptions_in_project(PROJECT) - out, _ = capsys.readouterr() - assert subscription_admin in out - - eventually_consistent_test() - - -def test_create(subscriber_client): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) - - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - assert subscriber_client.get_subscription(subscription_path) - - eventually_consistent_test() - - -def test_create_subscription_with_dead_letter_policy( - subscriber_client, publisher_client, topic, dead_letter_topic, capsys -): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) - dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) - - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber.create_subscription_with_dead_letter_topic( - PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC - ) - - out, _ = capsys.readouterr() - assert "Subscription created: " + subscription_path in out - assert "It will forward dead letter messages to: " + dead_letter_topic_path in out - assert "After 10 delivery attempts." in out - - -def test_create_push(subscriber_client): - subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) - try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - assert subscriber_client.get_subscription(subscription_path) - - eventually_consistent_test() - - -def test_update(subscriber_client, subscription_admin, capsys): - subscriber.update_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT - ) - - out, _ = capsys.readouterr() - assert "Subscription updated" in out - - -def test_update_dead_letter_policy( - subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys -): - _ = subscriber.update_subscription_with_dead_letter_policy( - PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC - ) - - out, _ = capsys.readouterr() - assert "max_delivery_attempts: 20" in out - - -def test_delete(subscriber_client, subscription_admin): - subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) - - @backoff.on_exception(backoff.expo, AssertionError, max_time=60) - def eventually_consistent_test(): - with pytest.raises(Exception): - subscriber_client.get_subscription(subscription_admin) - - eventually_consistent_test() - - -def _publish_messages(publisher_client, topic): - for n in range(5): - data = u"message {}".format(n).encode("utf-8") - publish_future = publisher_client.publish( - topic, data=data, origin="python-sample" - ) - publish_future.result() - - -def test_receive(publisher_client, topic, subscription_async, capsys): - _publish_messages(publisher_client, topic) - - subscriber.receive_messages(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out - - -def test_receive_with_custom_attributes( - publisher_client, topic, subscription_async, capsys -): - - _publish_messages(publisher_client, topic) - - subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "message" in out - assert "origin" in out - assert "python-sample" in out - - -def test_receive_with_flow_control(publisher_client, topic, subscription_async, capsys): - - _publish_messages(publisher_client, topic) - - subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out - - -def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) - - out, _ = capsys.readouterr() - assert "Done." in out - - -def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_sync, capsys -): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION_SYNC) - - out, _ = capsys.readouterr() - assert "Done." in out - - -def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): - - _publish_messages(publisher_client, topic) - - subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "threw an exception" in out - - -def test_receive_with_delivery_attempts( - publisher_client, topic, subscription_dlq, dead_letter_topic, capsys -): - _publish_messages(publisher_client, topic) - - subscriber.receive_messages_with_delivery_attempts(PROJECT, SUBSCRIPTION_DLQ, 10) - - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_dlq in out - assert "Received message: " in out - assert "message 4" in out - assert "With delivery attempts: " in out - - -def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): - subscription_after_update = subscriber.remove_dead_letter_policy( - PROJECT, TOPIC, SUBSCRIPTION_DLQ - ) - - assert subscription_after_update.dead_letter_policy.dead_letter_topic == "" diff --git a/synth.py b/synth.py index 0e2c96e42..b44cc0acf 100644 --- a/synth.py +++ b/synth.py @@ -18,7 +18,6 @@ import synthtool as s from synthtool import gcp -from synthtool.languages import python gapic = gcp.GAPICBazel() common = gcp.CommonTemplates() @@ -267,16 +266,8 @@ def _merge_dict(d1, d2): # Add templated files # ---------------------------------------------------------------------------- templated_files = gcp.CommonTemplates().py_library( - unit_cov_level=97, - cov_level=99, - system_test_external_dependencies=["psutil"], - samples=True, + unit_cov_level=97, cov_level=99, system_test_external_dependencies=["psutil"], ) s.move(templated_files) -# ---------------------------------------------------------------------------- -# Samples templates -# ---------------------------------------------------------------------------- -python.py_samples() - s.shell.run(["nox", "-s", "blacken"], hide_output=False)