From e204b86e34346b6d456771ef4d4cfe5e15e53238 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 26 Jun 2020 19:01:00 +0200 Subject: [PATCH] samples: add samples from pubsub/cloud-client (#134) * Add pubsub publisher and subscriber samples Change-Id: I38b90c10aef72c37188c4520897302933b9d2ea7 * Update readme Change-Id: Ie95e2e1556a8d97b5321dc86bf8de431aa36a2d5 * Add pubsub iam samples Change-Id: I12c407d3cdf4a3f9736dfaeca6f20b31df6d310a * Fix lint issue Change-Id: Ifebdab0b974cc3d3fe8900a23ca7416fed9e026a * Auto-update dependencies. [(#540)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/540) * Auto-update dependencies. [(#542)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/542) * Move to google-cloud [(#544)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/544) * Add new "quickstart" samples [(#547)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/547) * Quickstart tests [(#569)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/569) * Add tests for quickstarts * Update secrets * Generate readmes for most service samples [(#599)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/599) * Update samples to support latest Google Cloud Python [(#656)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/656) * Auto-update dependencies. [(#715)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/715) * Fix pubusb tests Change-Id: I7dfe60b0f1240dc58a664968fd97ca5a8fa1109d * Auto-update dependencies. [(#825)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/825) * Auto-update dependencies. [(#876)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/876) * Fix reference to our testing tools * Re-generate all readmes * Auto-update dependencies. [(#922)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/922) * Auto-update dependencies. * Fix pubsub iam samples * Fix README rst links [(#962)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/962) * Fix README rst links * Update all READMEs * Auto-update dependencies. [(#1004)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1004) * Auto-update dependencies. * Fix natural language samples * Fix pubsub iam samples * Fix language samples * Fix bigquery samples * Auto-update dependencies. [(#1055)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1055) * Auto-update dependencies. * Explicitly use latest bigtable client Change-Id: Id71e9e768f020730e4ca9514a0d7ebaa794e7d9e * Revert language update for now Change-Id: I8867f154e9a5aae00d0047c9caf880e5e8f50c53 * Remove pdb. smh Change-Id: I5ff905fadc026eebbcd45512d4e76e003e3b2b43 * Update pubsub samples [(#1092)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1092) * Fix argpraser for pubsub subscriber Change-Id: I776863091846ee8ff8a70078c8b8d5498cf81ed6 * Add comment about result blocking in pubsub samples Change-Id: I149fc1242ceb6b2cff8eae7ef18b364dd5c26566 * Auto-update dependencies. [(#1097)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1097) * Update all generated readme auth instructions [(#1121)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1121) Change-Id: I03b5eaef8b17ac3dc3c0339fd2c7447bd3e11bd2 * Added Link to Python Setup Guide [(#1158)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1158) * Update Readme.rst to add Python setup guide As requested in b/64770713. This sample is linked in documentation https://cloud.google.com/bigtable/docs/scaling, and it would make more sense to update the guide here than in the documentation. * Update README.rst * Update README.rst * Update README.rst * Update README.rst * Update README.rst * Update install_deps.tmpl.rst * Updated readmegen scripts and re-generated related README files * Fixed the lint error * Auto-update dependencies. [(#1138)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1138) * Fix a few more lint issues Change-Id: I0d420f3053f391fa225e4b8179e45fd1138f5c65 * Add Snippet for Listing All Subscriptions in a Project [(#1169)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1169) * Auto-update dependencies. [(#1186)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1186) * Auto-update dependencies. [(#1234)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1234) * Auto-update dependencies. * Drop pytest-logcapture as it's no longer needed Change-Id: Ia8b9e8aaf248e9770db6bc4842a4532df8383893 * Auto-update dependencies. [(#1239)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1239) * Added "Open in Cloud Shell" buttons to README files [(#1254)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1254) * Auto-update dependencies. [(#1263)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1263) * Auto-update dependencies. [(#1272)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1272) * Auto-update dependencies. * Update requirements.txt * Auto-update dependencies. [(#1282)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1282) * Auto-update dependencies. * Fix storage acl sample Change-Id: I413bea899fdde4c4859e4070a9da25845b81f7cf * Add listen for errors sample. [(#1306)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1306) * Add listen for errors sample. * Update subscriber.py * Update subscriber.py * Fix subscription.open get called twice in the client libraries [(#1321)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1321) * Add tests for creating push subscription. [(#1332)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1332) This is a separate PR from actually adding the sample, which is in https://github.com/GoogleCloudPlatform/python-docs-samples/pull/1331. * Add create push subscription sample. [(#1331)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1331) * Update API version and body. [(#1326)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1326) The API version should be v1, not v1beta1. Also remove the unnecessary 'data' field from the body and just use 'binary_data'. * Add sample for updating a subscription. [(#1335)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1335) * Change update_subscription to change endpoint URL. [(#1344)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1344) The documentation specifies that the update subscription commands show how to update an endpoint URL: https://cloud.google.com/pubsub/docs/admin#update_a_subscription. * Auto-update dependencies. [(#1359)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1359) * Auto-update dependencies. [(#1389)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1389) * Added sample for publishing/receiving messages with custom attributes [(#1409)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1409) * Auto-update dependencies. * Regenerate the README files and fix the Open in Cloud Shell link for some samples [(#1441)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1441) * Update READMEs to fix numbering and add git clone [(#1464)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1464) * PubSub: adds region tags and updates existing to standard [(#1491)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1491) * Pubsub: Add missing region tag [(#1498)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1498) * Add the Pub/Sub handle_publisher_error sample [(#1440)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1440) * Add the Pub/Sub handle_publisher_error sample * Update requirements.txt * Update publisher.py * Update publisher.py * Added region tag * Modified publisher with error handling [(#1568)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1568) * Updated google-cloud-pubsub to version 0.35 [(#1624)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1624) * Updated library version * Rewrote test for publish with error handler * Custom _publish function in test prints no 'Attributes' * Added timeout in error handling [(#1636)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1636) * Auto-update dependencies. [(#1658)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1658) * Auto-update dependencies. * Rollback appengine/standard/bigquery/. * Rollback appengine/standard/iap/. * Rollback bigtable/metricscaler. * Rolledback appengine/flexible/datastore. * Rollback dataproc/ * Rollback jobs/api_client * Rollback vision/cloud-client. * Rollback functions/ocr/app. * Rollback iot/api-client/end_to_end_example. * Rollback storage/cloud-client. * Rollback kms/api-client. * Rollback dlp/ * Rollback bigquery/cloud-client. * Rollback iot/api-client/manager. * Rollback appengine/flexible/cloudsql_postgresql. * Added sample for Pub/Sub synchronous pull subscriber [(#1673)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1673) * Added sample for synchronous pull * Updated variable name [(#1680)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1680) * Fixed return object from `subscriber.subscribe()` [(#1685)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1685) * Pub/Sub: synchronous pull with lease management [(#1701)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1701) * Synchronous pull with lease management * Updated library version * Pub/Sub: moved import statements inside region tags [(#1753)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1753) * Moved import stataments inside region tags * Explained topic and subscription path methods * Pub/Sub end-to-end sample [(#1800)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1800) * Created new end-to-end sample, moved old sample * Add space around operator * Add test for updating a subscription. [(#1336)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1336) Tests for https://github.com/GoogleCloudPlatform/python-docs-samples/pull/1335. Using ack_deadline_seconds as the example. * Fix update test to use new endpoint [(#1925)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1925) * Fix update test to use new endpoint * Handle subscription already exists Previous deletions don't always succeed * Use a new endpoint for update * Auto-update dependencies. [(#1980)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/1980) * Auto-update dependencies. * Update requirements.txt * Update requirements.txt * Cloud Pub/Sub Quickstart V2 [(#2004)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2004) * Quickstart V2 * Adopts Kir's suggestions * Adopted Tim's suggestions * proper resource deletion during teardown * Pub/Sub: publish with error-handling comments [(#2222)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2222) * Resolve all futures [(#2231)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2231) * Pub/Sub: add publish retry sample [(#2273)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2273) * Publish retry sample * double to single quotes * double to single quotes * license year * Fix a TODO comment on pubsub/cloud-client/subscriber.py [(#2302)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2302) * Print actual number of messages pulled [(#2078)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2078) * Print actual number of messages pulled * Pub/Sub: fix subscriber async region tag mistake [(#2334)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2334) * Pub/Sub: update retry settings in sample [(#2395)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2395) * Pub/Sub: improve pub.py [(#2403)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2403) * print number of messages published * two nit's * Adds updates for samples profiler ... vision [(#2439)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2439) * Pub/Sub: update how subscriber client listens to StreamingPullFuture [(#2475)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2475) * update sub.py & requirements.txt * fix flaky subscriber test with separate subscriptions * Pub/Sub: update how to test with mock [(#2555)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2555) * Update test with mock * Clean up resources after tests * Use unique resource names avoid test failures * Delete subscriptions in cleanup phase * Ensure unique topic name * Update assert to remove bytestring notation * Rewrite PubSubToGCS test using dataflow testing module * Pub/Sub: remove infinite while loops in subscriber examples [(#2604)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2604) * use result() on streaming pull futures instead of infinite while * remove unused imports * Pub/Sub: add timeout in argparse [(#2637)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2637) * Auto-update dependencies. [(#2005)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2005) * Auto-update dependencies. * Revert update of appengine/flexible/datastore. * revert update of appengine/flexible/scipy * revert update of bigquery/bqml * revert update of bigquery/cloud-client * revert update of bigquery/datalab-migration * revert update of bigtable/quickstart * revert update of compute/api * revert update of container_registry/container_analysis * revert update of dataflow/run_template * revert update of datastore/cloud-ndb * revert update of dialogflow/cloud-client * revert update of dlp * revert update of functions/imagemagick * revert update of functions/ocr/app * revert update of healthcare/api-client/fhir * revert update of iam/api-client * revert update of iot/api-client/gcs_file_to_device * revert update of iot/api-client/mqtt_example * revert update of language/automl * revert update of run/image-processing * revert update of vision/automl * revert update testing/requirements.txt * revert update of vision/cloud-client/detect * revert update of vision/cloud-client/product_search * revert update of jobs/v2/api_client * revert update of jobs/v3/api_client * revert update of opencensus * revert update of translate/cloud-client * revert update to speech/cloud-client Co-authored-by: Kurtis Van Gent <31518063+kurtisvg@users.noreply.github.com> Co-authored-by: Doug Mahugh * remove publish concurrency control sample [(#2960)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2960) * Pub/Sub: remove unreferenced samples [(#2986)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2986) * remove qs samples * update README * Pub/Sub: add SubscriberClient.close() to examples [(#3118)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3118) * Add SubscriberClient.close() to examples. Co-authored-by: Prad Nelluru Co-authored-by: Prad Nelluru * Pub/Sub: update publish with batch settings sample [(#3137)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3137) * non-blocking publish * remove unused lib * lint * add defaults * Simplify noxfile setup. [(#2806)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/2806) * chore(deps): update dependency requests to v2.23.0 * Simplify noxfile and add version control. * Configure appengine/standard to only test Python 2.7. * Update Kokokro configs to match noxfile. * Add requirements-test to each folder. * Remove Py2 versions from everything execept appengine/standard. * Remove conftest.py. * Remove appengine/standard/conftest.py * Remove 'no-sucess-flaky-report' from pytest.ini. * Add GAE SDK back to appengine/standard tests. * Fix typo. * Roll pytest to python 2 version. * Add a bunch of testing requirements. * Remove typo. * Add appengine lib directory back in. * Add some additional requirements. * Fix issue with flake8 args. * Even more requirements. * Readd appengine conftest.py. * Add a few more requirements. * Even more Appengine requirements. * Add webtest for appengine/standard/mailgun. * Add some additional requirements. * Add workaround for issue with mailjet-rest. * Add responses for appengine/standard/mailjet. Co-authored-by: Renovate Bot * chore: remove gcp-devrel-py-tools from iot and pubsub [(#3470)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3470) * [iot] chore: remove unused dependency * [pubsub] chore: remove gcp-devrel-py-tools * Update dependency google-cloud-pubsub to v1.4.2 in Storage and Pub/Sub [(#3343)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3343) * chore: some lint fixes [(#3748)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3748) * chore(deps): update dependency google-cloud-pubsub to v1.4.3 [(#3725)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3725) Co-authored-by: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> Co-authored-by: Takashi Matsuo * chore(deps): update dependency google-cloud-pubsub to v1.5.0 [(#3781)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3781) Co-authored-by: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> * samples: add Pub/Sub dead letter queue samples [(#3904)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3904) * fix: make timeout an optional positional arg [(#3938)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3938) * fix: make timeout an optional positional arg * place `none` back in function signature Co-authored-by: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> * fix: replace name with id in samples [(#3953)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/3953) * Replace GCLOUD_PROJECT with GOOGLE_CLOUD_PROJECT. [(#4022)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4022) * nit: remove redundant/wrong Pub/Sub region tag [(#4027)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4027) * Pub/Sub: wrap subscriber in a with block and add comments [(#4070)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4070) Use a `with` block to wrap subscriber and describe its purpose. Internal bug: b/157401623 * Update dependency google-cloud-pubsub to v1.6.0 [(#4039)](https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4039) This PR contains the following updates: | Package | Update | Change | |---|---|---| | [google-cloud-pubsub](https://togithub.com/googleapis/python-pubsub) | minor | `==1.5.0` -> `==1.6.0` | --- ### Release Notes
googleapis/python-pubsub ### [`v1.6.0`](https://togithub.com/googleapis/python-pubsub/blob/master/CHANGELOG.md#​160-httpswwwgithubcomgoogleapispython-pubsubcomparev150v160-2020-06-09) [Compare Source](https://togithub.com/googleapis/python-pubsub/compare/v1.5.0...v1.6.0) ##### Features - Add flow control for message publishing ([#​96](https://www.github.com/googleapis/python-pubsub/issues/96)) ([06085c4](https://www.github.com/googleapis/python-pubsub/commit/06085c4083b9dccdd50383257799904510bbf3a0)) ##### Bug Fixes - Fix PubSub incompatibility with api-core 1.17.0+ ([#​103](https://www.github.com/googleapis/python-pubsub/issues/103)) ([c02060f](https://www.github.com/googleapis/python-pubsub/commit/c02060fbbe6e2ca4664bee08d2de10665d41dc0b)) ##### Documentation - Clarify that Schedulers shouldn't be used with multiple SubscriberClients ([#​100](https://togithub.com/googleapis/python-pubsub/pull/100)) ([cf9e87c](https://togithub.com/googleapis/python-pubsub/commit/cf9e87c80c0771f3fa6ef784a8d76cb760ad37ef)) - Fix update subscription/snapshot/topic samples ([#​113](https://togithub.com/googleapis/python-pubsub/pull/113)) ([e62c38b](https://togithub.com/googleapis/python-pubsub/commit/e62c38bb33de2434e32f866979de769382dea34a)) ##### Internal / Testing Changes - Re-generated service implementaton using synth: removed experimental notes from the RetryPolicy and filtering features in anticipation of GA, added DetachSubscription (experimental) ([#​114](https://togithub.com/googleapis/python-pubsub/pull/114)) ([0132a46](https://togithub.com/googleapis/python-pubsub/commit/0132a4680e0727ce45d5e27d98ffc9f3541a0962)) - Incorporate will_accept() checks into publish() ([#​108](https://togithub.com/googleapis/python-pubsub/pull/108)) ([6c7677e](https://togithub.com/googleapis/python-pubsub/commit/6c7677ecb259672bbb9b6f7646919e602c698570))
--- ### Renovate configuration :date: **Schedule**: At any time (no schedule defined). :vertical_traffic_light: **Automerge**: Disabled by config. Please merge this manually once you are satisfied. :recycle: **Rebasing**: Never, or you tick the rebase/retry checkbox. :no_bell: **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#GoogleCloudPlatform/python-docs-samples). * chore: update templates Co-authored-by: Jon Wayne Parrott Co-authored-by: DPE bot Co-authored-by: Jason Dobry Co-authored-by: Bill Prin Co-authored-by: michaelawyu Co-authored-by: noerog <32459203+noerog@users.noreply.github.com> Co-authored-by: L J Co-authored-by: Frank Natividad Co-authored-by: Alix Hamilton Co-authored-by: michaelawyu Co-authored-by: Tianzi Cai Co-authored-by: Charles Engelke Co-authored-by: Tianzi Cai Co-authored-by: Keiji Yoshida Co-authored-by: oli Co-authored-by: Gus Class Co-authored-by: Kurtis Van Gent <31518063+kurtisvg@users.noreply.github.com> Co-authored-by: Doug Mahugh Co-authored-by: Prad Nelluru Co-authored-by: Prad Nelluru Co-authored-by: Renovate Bot Co-authored-by: Takashi Matsuo Co-authored-by: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> --- .github/CODEOWNERS | 11 + samples/AUTHORING_GUIDE.md | 1 + samples/CONTRIBUTING.md | 1 + samples/snippets/README.rst | 282 +++++++++ samples/snippets/README.rst.in | 30 + samples/snippets/iam.py | 231 +++++++ samples/snippets/iam_test.py | 118 ++++ samples/snippets/noxfile.py | 224 +++++++ samples/snippets/publisher.py | 334 ++++++++++ samples/snippets/publisher_test.py | 146 +++++ samples/snippets/quickstart/pub.py | 86 +++ samples/snippets/quickstart/pub_test.py | 56 ++ samples/snippets/quickstart/sub.py | 69 +++ samples/snippets/quickstart/sub_test.py | 102 +++ samples/snippets/requirements-test.txt | 3 + samples/snippets/requirements.txt | 1 + samples/snippets/subscriber.py | 783 ++++++++++++++++++++++++ samples/snippets/subscriber_test.py | 341 +++++++++++ synth.py | 11 +- 19 files changed, 2829 insertions(+), 1 deletion(-) create mode 100644 .github/CODEOWNERS create mode 100644 samples/AUTHORING_GUIDE.md create mode 100644 samples/CONTRIBUTING.md create mode 100644 samples/snippets/README.rst create mode 100644 samples/snippets/README.rst.in create mode 100644 samples/snippets/iam.py create mode 100644 samples/snippets/iam_test.py create mode 100644 samples/snippets/noxfile.py create mode 100644 samples/snippets/publisher.py create mode 100644 samples/snippets/publisher_test.py create mode 100644 samples/snippets/quickstart/pub.py create mode 100644 samples/snippets/quickstart/pub_test.py create mode 100644 samples/snippets/quickstart/sub.py create mode 100644 samples/snippets/quickstart/sub_test.py create mode 100644 samples/snippets/requirements-test.txt create mode 100644 samples/snippets/requirements.txt create mode 100644 samples/snippets/subscriber.py create mode 100644 samples/snippets/subscriber_test.py diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000000000..cf01548a9 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,11 @@ +# Code owners file. +# This file controls who is tagged for review for any given pull request. +# +# For syntax help see: +# https://help.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners#codeowners-syntax + + +# The python-samples-owners team is the default owner for anything not +# explicitly taken by someone else. + + /samples/ @anguillanneuf @hongalex @googleapis/python-samples-owners diff --git a/samples/AUTHORING_GUIDE.md b/samples/AUTHORING_GUIDE.md new file mode 100644 index 000000000..55c97b32f --- /dev/null +++ b/samples/AUTHORING_GUIDE.md @@ -0,0 +1 @@ +See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md \ No newline at end of file diff --git a/samples/CONTRIBUTING.md b/samples/CONTRIBUTING.md new file mode 100644 index 000000000..34c882b6f --- /dev/null +++ b/samples/CONTRIBUTING.md @@ -0,0 +1 @@ +See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/CONTRIBUTING.md \ No newline at end of file diff --git a/samples/snippets/README.rst b/samples/snippets/README.rst new file mode 100644 index 000000000..2676680af --- /dev/null +++ b/samples/snippets/README.rst @@ -0,0 +1,282 @@ + +.. This file is automatically generated. Do not edit this file directly. + +Google Cloud Pub/Sub Python Samples +=============================================================================== + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/README.rst + + +This directory contains samples for Google Cloud Pub/Sub. `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. + + + + +.. _Google Cloud Pub/Sub: https://cloud.google.com/pubsub/docs + + +Setup +------------------------------------------------------------------------------- + + + +Authentication +++++++++++++++ + +This sample requires you to have authentication setup. Refer to the +`Authentication Getting Started Guide`_ for instructions on setting up +credentials for applications. + +.. _Authentication Getting Started Guide: + https://cloud.google.com/docs/authentication/getting-started + + + + +Install Dependencies +++++++++++++++++++++ + +#. Clone python-docs-samples and change directory to the sample directory you want to use. + + .. code-block:: bash + + $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git + +#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions. + + .. _Python Development Environment Setup Guide: + https://cloud.google.com/python/setup + +#. Create a virtualenv. Samples are compatible with Python 3.6+. + + .. code-block:: bash + + $ virtualenv env + $ source env/bin/activate + +#. Install the dependencies needed to run the samples. + + .. code-block:: bash + + $ pip install -r requirements.txt + +.. _pip: https://pip.pypa.io/ +.. _virtualenv: https://virtualenv.pypa.io/ + + + + + + +Samples +------------------------------------------------------------------------------- + + +Quickstart ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/quickstart.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python quickstart.py + + + + +Publisher ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/publisher.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python publisher.py + + + usage: publisher.py [-h] + project_id + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + ... + + This application demonstrates how to perform basic operations on topics + with the Cloud Pub/Sub API. + + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + + positional arguments: + project_id Your Google Cloud project ID + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + list Lists all Pub/Sub topics in the given project. + create Create a new Pub/Sub topic. + delete Deletes an existing Pub/Sub topic. + publish Publishes multiple messages to a Pub/Sub topic. + publish-with-custom-attributes + Publishes multiple messages with custom attributes to + a Pub/Sub topic. + publish-with-error-handler + Publishes multiple messages to a Pub/Sub topic with an + error handler. + publish-with-batch-settings + Publishes multiple messages to a Pub/Sub topic with + batch settings. + publish-with-retry-settings + Publishes messages with custom retry settings. + + optional arguments: + -h, --help show this help message and exit + + + + + +Subscribers ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/subscriber.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python subscriber.py + + + usage: subscriber.py [-h] + project_id + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + ... + + This application demonstrates how to perform basic operations on + subscriptions with the Cloud Pub/Sub API. + + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + + positional arguments: + project_id Your Google Cloud project ID + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + list-in-topic Lists all subscriptions for a given topic. + list-in-project Lists all subscriptions in the current project. + create Create a new pull subscription on the given topic. + create-with-dead-letter-policy + Create a subscription with dead letter policy. + create-push Create a new push subscription on the given topic. + delete Deletes an existing Pub/Sub topic. + update-push Updates an existing Pub/Sub subscription's push + endpoint URL. Note that certain properties of a + subscription, such as its topic, are not modifiable. + update-dead-letter-policy + Update a subscription's dead letter policy. + remove-dead-letter-policy + Remove dead letter policy from a subscription. + receive Receives messages from a pull subscription. + receive-custom-attributes + Receives messages from a pull subscription. + receive-flow-control + Receives messages from a pull subscription with flow + control. + receive-synchronously + Pulling messages synchronously. + receive-synchronously-with-lease + Pulling messages synchronously with lease management + listen-for-errors Receives messages and catches errors from a pull + subscription. + receive-messages-with-delivery-attempts + + optional arguments: + -h, --help show this help message and exit + + + + + +Identity and Access Management ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/iam.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python iam.py + + + usage: iam.py [-h] + project + {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} + ... + + This application demonstrates how to perform basic operations on IAM + policies with the Cloud Pub/Sub API. + + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + + positional arguments: + project Your Google Cloud project ID + {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} + get-topic-policy Prints the IAM policy for the given topic. + get-subscription-policy + Prints the IAM policy for the given subscription. + set-topic-policy Sets the IAM policy for a topic. + set-subscription-policy + Sets the IAM policy for a topic. + check-topic-permissions + Checks to which permissions are available on the given + topic. + check-subscription-permissions + Checks to which permissions are available on the given + subscription. + + optional arguments: + -h, --help show this help message and exit + + + + + + + + + +The client library +------------------------------------------------------------------------------- + +This sample uses the `Google Cloud Client Library for Python`_. +You can read the documentation for more details on API usage and use GitHub +to `browse the source`_ and `report issues`_. + +.. _Google Cloud Client Library for Python: + https://googlecloudplatform.github.io/google-cloud-python/ +.. _browse the source: + https://github.com/GoogleCloudPlatform/google-cloud-python +.. _report issues: + https://github.com/GoogleCloudPlatform/google-cloud-python/issues + + + +.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/samples/snippets/README.rst.in b/samples/snippets/README.rst.in new file mode 100644 index 000000000..ddbc64712 --- /dev/null +++ b/samples/snippets/README.rst.in @@ -0,0 +1,30 @@ +# This file is used to generate README.rst + +product: + name: Google Cloud Pub/Sub + short_name: Cloud Pub/Sub + url: https://cloud.google.com/pubsub/docs + description: > + `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that + allows you to send and receive messages between independent applications. + +setup: +- auth +- install_deps + +samples: +- name: Quickstart + file: quickstart.py +- name: Publisher + file: publisher.py + show_help: true +- name: Subscribers + file: subscriber.py + show_help: true +- name: Identity and Access Management + file: iam.py + show_help: true + +cloud_client_library: true + +folder: pubsub/cloud-client \ No newline at end of file diff --git a/samples/snippets/iam.py b/samples/snippets/iam.py new file mode 100644 index 000000000..71c55d764 --- /dev/null +++ b/samples/snippets/iam.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python + +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to perform basic operations on IAM +policies with the Cloud Pub/Sub API. + +For more information, see the README.md under /pubsub and the documentation +at https://cloud.google.com/pubsub/docs. +""" + +import argparse + + +def get_topic_policy(project, topic_id): + """Prints the IAM policy for the given topic.""" + # [START pubsub_get_topic_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_id) + + policy = client.get_iam_policy(topic_path) + + print("Policy for topic {}:".format(topic_path)) + for binding in policy.bindings: + print("Role: {}, Members: {}".format(binding.role, binding.members)) + # [END pubsub_get_topic_policy] + + +def get_subscription_policy(project, subscription_id): + """Prints the IAM policy for the given subscription.""" + # [START pubsub_get_subscription_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_id) + + policy = client.get_iam_policy(subscription_path) + + print("Policy for subscription {}:".format(subscription_path)) + for binding in policy.bindings: + print("Role: {}, Members: {}".format(binding.role, binding.members)) + + client.close() + # [END pubsub_get_subscription_policy] + + +def set_topic_policy(project, topic_id): + """Sets the IAM policy for a topic.""" + # [START pubsub_set_topic_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_id) + + policy = client.get_iam_policy(topic_path) + + # Add all users as viewers. + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) + + # Add a group as a publisher. + policy.bindings.add( + role="roles/pubsub.publisher", members=["group:cloud-logs@google.com"] + ) + + # Set the policy + policy = client.set_iam_policy(topic_path, policy) + + print("IAM policy for topic {} set: {}".format(topic_id, policy)) + # [END pubsub_set_topic_policy] + + +def set_subscription_policy(project, subscription_id): + """Sets the IAM policy for a topic.""" + # [START pubsub_set_subscription_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_id) + + policy = client.get_iam_policy(subscription_path) + + # Add all users as viewers. + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) + + # Add a group as an editor. + policy.bindings.add(role="roles/editor", members=["group:cloud-logs@google.com"]) + + # Set the policy + policy = client.set_iam_policy(subscription_path, policy) + + print("IAM policy for subscription {} set: {}".format(subscription_id, policy)) + + client.close() + # [END pubsub_set_subscription_policy] + + +def check_topic_permissions(project, topic_id): + """Checks to which permissions are available on the given topic.""" + # [START pubsub_test_topic_permissions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_id) + + permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"] + + allowed_permissions = client.test_iam_permissions(topic_path, permissions_to_check) + + print( + "Allowed permissions for topic {}: {}".format(topic_path, allowed_permissions) + ) + # [END pubsub_test_topic_permissions] + + +def check_subscription_permissions(project, subscription_id): + """Checks to which permissions are available on the given subscription.""" + # [START pubsub_test_subscription_permissions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_id) + + permissions_to_check = [ + "pubsub.subscriptions.consume", + "pubsub.subscriptions.update", + ] + + allowed_permissions = client.test_iam_permissions( + subscription_path, permissions_to_check + ) + + print( + "Allowed permissions for subscription {}: {}".format( + subscription_path, allowed_permissions + ) + ) + + client.close() + # [END pubsub_test_subscription_permissions] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project", help="Your Google Cloud project ID") + + subparsers = parser.add_subparsers(dest="command") + + get_topic_policy_parser = subparsers.add_parser( + "get-topic-policy", help=get_topic_policy.__doc__ + ) + get_topic_policy_parser.add_argument("topic_id") + + get_subscription_policy_parser = subparsers.add_parser( + "get-subscription-policy", help=get_subscription_policy.__doc__ + ) + get_subscription_policy_parser.add_argument("subscription_id") + + set_topic_policy_parser = subparsers.add_parser( + "set-topic-policy", help=set_topic_policy.__doc__ + ) + set_topic_policy_parser.add_argument("topic_id") + + set_subscription_policy_parser = subparsers.add_parser( + "set-subscription-policy", help=set_subscription_policy.__doc__ + ) + set_subscription_policy_parser.add_argument("subscription_id") + + check_topic_permissions_parser = subparsers.add_parser( + "check-topic-permissions", help=check_topic_permissions.__doc__ + ) + check_topic_permissions_parser.add_argument("topic_id") + + check_subscription_permissions_parser = subparsers.add_parser( + "check-subscription-permissions", help=check_subscription_permissions.__doc__, + ) + check_subscription_permissions_parser.add_argument("subscription_id") + + args = parser.parse_args() + + if args.command == "get-topic-policy": + get_topic_policy(args.project, args.topic_id) + elif args.command == "get-subscription-policy": + get_subscription_policy(args.project, args.subscription_id) + elif args.command == "set-topic-policy": + set_topic_policy(args.project, args.topic_id) + elif args.command == "set-subscription-policy": + set_subscription_policy(args.project, args.subscription_id) + elif args.command == "check-topic-permissions": + check_topic_permissions(args.project, args.topic_id) + elif args.command == "check-subscription-permissions": + check_subscription_permissions(args.project, args.subscription_id) diff --git a/samples/snippets/iam_test.py b/samples/snippets/iam_test.py new file mode 100644 index 000000000..d196953f6 --- /dev/null +++ b/samples/snippets/iam_test.py @@ -0,0 +1,118 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from google.cloud import pubsub_v1 +import pytest + +import iam + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "iam-test-topic-" + UUID +SUBSCRIPTION = "iam-test-subscription-" + UUID + + +@pytest.fixture(scope="module") +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope="module") +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.delete_topic(topic_path) + except Exception: + pass + + publisher_client.create_topic(topic_path) + + yield topic_path + + publisher_client.delete_topic(topic_path) + + +@pytest.fixture(scope="module") +def subscriber_client(): + subscriber_client = pubsub_v1.SubscriberClient() + yield subscriber_client + subscriber_client.close() + + +@pytest.fixture +def subscription(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber_client.create_subscription(subscription_path, topic=topic) + + yield subscription_path + + subscriber_client.delete_subscription(subscription_path) + + +def test_get_topic_policy(topic, capsys): + iam.get_topic_policy(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + assert topic in out + + +def test_get_subscription_policy(subscription, capsys): + iam.get_subscription_policy(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert subscription in out + + +def test_set_topic_policy(publisher_client, topic): + iam.set_topic_policy(PROJECT, TOPIC) + + policy = publisher_client.get_iam_policy(topic) + assert "roles/pubsub.publisher" in str(policy) + assert "allUsers" in str(policy) + + +def test_set_subscription_policy(subscriber_client, subscription): + iam.set_subscription_policy(PROJECT, SUBSCRIPTION) + + policy = subscriber_client.get_iam_policy(subscription) + assert "roles/pubsub.viewer" in str(policy) + assert "allUsers" in str(policy) + + +def test_check_topic_permissions(topic, capsys): + iam.check_topic_permissions(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + + assert topic in out + assert "pubsub.topics.publish" in out + + +def test_check_subscription_permissions(subscription, capsys): + iam.check_subscription_permissions(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + + assert subscription in out + assert "pubsub.subscriptions.consume" in out diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py new file mode 100644 index 000000000..ba55d7ce5 --- /dev/null +++ b/samples/snippets/noxfile.py @@ -0,0 +1,224 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +from pathlib import Path +import sys + +import nox + + +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING +# DO NOT EDIT THIS FILE EVER! +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING + +# Copy `noxfile_config.py` to your directory and modify it instead. + + +# `TEST_CONFIG` dict is a configuration hook that allows users to +# modify the test configurations. The values here should be in sync +# with `noxfile_config.py`. Users will copy `noxfile_config.py` into +# their directory and modify it. + +TEST_CONFIG = { + # You can opt out from the test for specific Python versions. + 'ignored_versions': ["2.7"], + + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + 'envs': {}, +} + + +try: + # Ensure we can import noxfile_config in the project's directory. + sys.path.append('.') + from noxfile_config import TEST_CONFIG_OVERRIDE +except ImportError as e: + print("No user noxfile_config found: detail: {}".format(e)) + TEST_CONFIG_OVERRIDE = {} + +# Update the TEST_CONFIG with the user supplied values. +TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) + + +def get_pytest_env_vars(): + """Returns a dict for pytest invocation.""" + ret = {} + + # Override the GCLOUD_PROJECT and the alias. + env_key = TEST_CONFIG['gcloud_project_env'] + # This should error out if not set. + ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key] + + # Apply user supplied envs. + ret.update(TEST_CONFIG['envs']) + return ret + + +# DO NOT EDIT - automatically generated. +# All versions used to tested samples. +ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"] + +# Any default versions that should be ignored. +IGNORED_VERSIONS = TEST_CONFIG['ignored_versions'] + +TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) + +INSTALL_LIBRARY_FROM_SOURCE = bool(os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False)) +# +# Style Checks +# + + +def _determine_local_import_names(start_dir): + """Determines all import names that should be considered "local". + + This is used when running the linter to insure that import order is + properly checked. + """ + file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)] + return [ + basename + for basename, extension in file_ext_pairs + if extension == ".py" + or os.path.isdir(os.path.join(start_dir, basename)) + and basename not in ("__pycache__") + ] + + +# Linting with flake8. +# +# We ignore the following rules: +# E203: whitespace before ‘:’ +# E266: too many leading ‘#’ for block comment +# E501: line too long +# I202: Additional newline in a section of imports +# +# We also need to specify the rules which are ignored by default: +# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] +FLAKE8_COMMON_ARGS = [ + "--show-source", + "--builtin=gettext", + "--max-complexity=20", + "--import-order-style=google", + "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", + "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", + "--max-line-length=88", +] + + +@nox.session +def lint(session): + session.install("flake8", "flake8-import-order") + + local_names = _determine_local_import_names(".") + args = FLAKE8_COMMON_ARGS + [ + "--application-import-names", + ",".join(local_names), + "." + ] + session.run("flake8", *args) + + +# +# Sample Tests +# + + +PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] + + +def _session_tests(session, post_install=None): + """Runs py.test for a particular project.""" + if os.path.exists("requirements.txt"): + session.install("-r", "requirements.txt") + + if os.path.exists("requirements-test.txt"): + session.install("-r", "requirements-test.txt") + + if INSTALL_LIBRARY_FROM_SOURCE: + session.install("-e", _get_repo_root()) + + if post_install: + post_install(session) + + session.run( + "pytest", + *(PYTEST_COMMON_ARGS + session.posargs), + # Pytest will return 5 when no tests are collected. This can happen + # on travis where slow and flaky tests are excluded. + # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html + success_codes=[0, 5], + env=get_pytest_env_vars() + ) + + +@nox.session(python=ALL_VERSIONS) +def py(session): + """Runs py.test for a sample using the specified version of Python.""" + if session.python in TESTED_VERSIONS: + _session_tests(session) + else: + session.skip("SKIPPED: {} tests are disabled for this sample.".format( + session.python + )) + + +# +# Readmegen +# + + +def _get_repo_root(): + """ Returns the root folder of the project. """ + # Get root of this repository. Assume we don't have directories nested deeper than 10 items. + p = Path(os.getcwd()) + for i in range(10): + if p is None: + break + if Path(p / ".git").exists(): + return str(p) + p = p.parent + raise Exception("Unable to detect repository root.") + + +GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) + + +@nox.session +@nox.parametrize("path", GENERATED_READMES) +def readmegen(session, path): + """(Re-)generates the readme for a sample.""" + session.install("jinja2", "pyyaml") + dir_ = os.path.dirname(path) + + if os.path.exists(os.path.join(dir_, "requirements.txt")): + session.install("-r", os.path.join(dir_, "requirements.txt")) + + in_file = os.path.join(dir_, "README.rst.in") + session.run( + "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file + ) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py new file mode 100644 index 000000000..477b31b9c --- /dev/null +++ b/samples/snippets/publisher.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python + +# Copyright 2016 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to perform basic operations on topics +with the Cloud Pub/Sub API. + +For more information, see the README.md under /pubsub and the documentation +at https://cloud.google.com/pubsub/docs. +""" + +import argparse + + +def list_topics(project_id): + """Lists all Pub/Sub topics in the given project.""" + # [START pubsub_list_topics] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + + publisher = pubsub_v1.PublisherClient() + project_path = publisher.project_path(project_id) + + for topic in publisher.list_topics(project_path): + print(topic) + # [END pubsub_list_topics] + + +def create_topic(project_id, topic_id): + """Create a new Pub/Sub topic.""" + # [START pubsub_quickstart_create_topic] + # [START pubsub_create_topic] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + topic = publisher.create_topic(topic_path) + + print("Topic created: {}".format(topic)) + # [END pubsub_quickstart_create_topic] + # [END pubsub_create_topic] + + +def delete_topic(project_id, topic_id): + """Deletes an existing Pub/Sub topic.""" + # [START pubsub_delete_topic] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + publisher.delete_topic(topic_path) + + print("Topic deleted: {}".format(topic_path)) + # [END pubsub_delete_topic] + + +def publish_messages(project_id, topic_id): + """Publishes multiple messages to a Pub/Sub topic.""" + # [START pubsub_quickstart_publisher] + # [START pubsub_publish] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(project_id, topic_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + # When you publish a message, the client returns a future. + future = publisher.publish(topic_path, data=data) + print(future.result()) + + print("Published messages.") + # [END pubsub_quickstart_publisher] + # [END pubsub_publish] + + +def publish_messages_with_custom_attributes(project_id, topic_id): + """Publishes multiple messages with custom attributes + to a Pub/Sub topic.""" + # [START pubsub_publish_custom_attributes] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + # Add two attributes, origin and username, to the message + future = publisher.publish( + topic_path, data, origin="python-sample", username="gcp" + ) + print(future.result()) + + print("Published messages with custom attributes.") + # [END pubsub_publish_custom_attributes] + + +def publish_messages_with_error_handler(project_id, topic_id): + # [START pubsub_publish_messages_error_handler] + """Publishes multiple messages to a Pub/Sub topic with an error handler.""" + import time + + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + futures = dict() + + def get_callback(f, data): + def callback(f): + try: + print(f.result()) + futures.pop(data) + except: # noqa + print("Please handle {} for {}.".format(f.exception(), data)) + + return callback + + for i in range(10): + data = str(i) + futures.update({data: None}) + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, data=data.encode("utf-8") # data must be a bytestring. + ) + futures[data] = future + # Publish failures shall be handled in the callback function. + future.add_done_callback(get_callback(future, data)) + + # Wait for all the publish futures to resolve before exiting. + while futures: + time.sleep(5) + + print("Published message with error handler.") + # [END pubsub_publish_messages_error_handler] + + +def publish_messages_with_batch_settings(project_id, topic_id): + """Publishes multiple messages to a Pub/Sub topic with batch settings.""" + # [START pubsub_publisher_batch_settings] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure the batch to publish as soon as there is ten messages, + # one kilobyte of data, or one second has passed. + batch_settings = pubsub_v1.types.BatchSettings( + max_messages=10, # default 100 + max_bytes=1024, # default 1 MB + max_latency=1, # default 10 ms + ) + publisher = pubsub_v1.PublisherClient(batch_settings) + topic_path = publisher.topic_path(project_id, topic_id) + + # Resolve the publish future in a separate thread. + def callback(future): + message_id = future.result() + print(message_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + future = publisher.publish(topic_path, data=data) + # Non-blocking. Allow the publisher client to batch multiple messages. + future.add_done_callback(callback) + + print("Published messages with batch settings.") + # [END pubsub_publisher_batch_settings] + + +def publish_messages_with_retry_settings(project_id, topic_id): + """Publishes messages with custom retry settings.""" + # [START pubsub_publisher_retry_settings] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure the retry settings. Defaults will be overwritten. + retry_settings = { + "interfaces": { + "google.pubsub.v1.Publisher": { + "retry_codes": { + "publish": [ + "ABORTED", + "CANCELLED", + "DEADLINE_EXCEEDED", + "INTERNAL", + "RESOURCE_EXHAUSTED", + "UNAVAILABLE", + "UNKNOWN", + ] + }, + "retry_params": { + "messaging": { + "initial_retry_delay_millis": 100, # default: 100 + "retry_delay_multiplier": 1.3, # default: 1.3 + "max_retry_delay_millis": 60000, # default: 60000 + "initial_rpc_timeout_millis": 5000, # default: 25000 + "rpc_timeout_multiplier": 1.0, # default: 1.0 + "max_rpc_timeout_millis": 600000, # default: 30000 + "total_timeout_millis": 600000, # default: 600000 + } + }, + "methods": { + "Publish": { + "retry_codes_name": "publish", + "retry_params_name": "messaging", + } + }, + } + } + } + + publisher = pubsub_v1.PublisherClient(client_config=retry_settings) + topic_path = publisher.topic_path(project_id, topic_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + future = publisher.publish(topic_path, data=data) + print(future.result()) + + print("Published messages with retry settings.") + # [END pubsub_publisher_retry_settings] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Your Google Cloud project ID") + + subparsers = parser.add_subparsers(dest="command") + subparsers.add_parser("list", help=list_topics.__doc__) + + create_parser = subparsers.add_parser("create", help=create_topic.__doc__) + create_parser.add_argument("topic_id") + + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) + delete_parser.add_argument("topic_id") + + publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) + publish_parser.add_argument("topic_id") + + publish_with_custom_attributes_parser = subparsers.add_parser( + "publish-with-custom-attributes", + help=publish_messages_with_custom_attributes.__doc__, + ) + publish_with_custom_attributes_parser.add_argument("topic_id") + + publish_with_error_handler_parser = subparsers.add_parser( + "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, + ) + publish_with_error_handler_parser.add_argument("topic_id") + + publish_with_batch_settings_parser = subparsers.add_parser( + "publish-with-batch-settings", + help=publish_messages_with_batch_settings.__doc__, + ) + publish_with_batch_settings_parser.add_argument("topic_id") + + publish_with_retry_settings_parser = subparsers.add_parser( + "publish-with-retry-settings", + help=publish_messages_with_retry_settings.__doc__, + ) + publish_with_retry_settings_parser.add_argument("topic_id") + + args = parser.parse_args() + + if args.command == "list": + list_topics(args.project_id) + elif args.command == "create": + create_topic(args.project_id, args.topic_id) + elif args.command == "delete": + delete_topic(args.project_id, args.topic_id) + elif args.command == "publish": + publish_messages(args.project_id, args.topic_id) + elif args.command == "publish-with-custom-attributes": + publish_messages_with_custom_attributes(args.project_id, args.topic_id) + elif args.command == "publish-with-error-handler": + publish_messages_with_error_handler(args.project_id, args.topic_id) + elif args.command == "publish-with-batch-settings": + publish_messages_with_batch_settings(args.project_id, args.topic_id) + elif args.command == "publish-with-retry-settings": + publish_messages_with_retry_settings(args.project_id, args.topic_id) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py new file mode 100644 index 000000000..b5c2ea1ea --- /dev/null +++ b/samples/snippets/publisher_test.py @@ -0,0 +1,146 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +import uuid + +import backoff +from google.cloud import pubsub_v1 +import mock +import pytest + +import publisher + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID +TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID + + +@pytest.fixture +def client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture +def topic_admin(client): + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) + + try: + topic = client.get_topic(topic_path) + except: # noqa + topic = client.create_topic(topic_path) + + yield topic.name + # Teardown of `topic_admin` is handled in `test_delete()`. + + +@pytest.fixture +def topic_publish(client): + topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) + + try: + topic = client.get_topic(topic_path) + except: # noqa + topic = client.create_topic(topic_path) + + yield topic.name + + client.delete_topic(topic.name) + + +def _make_sleep_patch(): + real_sleep = time.sleep + + def new_sleep(period): + if period == 60: + real_sleep(5) + raise RuntimeError("sigil") + else: + real_sleep(period) + + return mock.patch("time.sleep", new=new_sleep) + + +def test_list(client, topic_admin, capsys): + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + publisher.list_topics(PROJECT) + out, _ = capsys.readouterr() + assert topic_admin in out + + eventually_consistent_test() + + +def test_create(client): + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) + try: + client.delete_topic(topic_path) + except Exception: + pass + + publisher.create_topic(PROJECT, TOPIC_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + assert client.get_topic(topic_path) + + eventually_consistent_test() + + +def test_delete(client, topic_admin): + publisher.delete_topic(PROJECT, TOPIC_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + with pytest.raises(Exception): + client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) + + eventually_consistent_test() + + +def test_publish(topic_publish, capsys): + publisher.publish_messages(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_custom_attributes(topic_publish, capsys): + publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_batch_settings(topic_publish, capsys): + publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_retry_settings(topic_publish, capsys): + publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_error_handler(topic_publish, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py new file mode 100644 index 000000000..16432c0c3 --- /dev/null +++ b/samples/snippets/quickstart/pub.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START pubsub_quickstart_pub_all] +import argparse +import time + +# [START pubsub_quickstart_pub_deps] +from google.cloud import pubsub_v1 + +# [END pubsub_quickstart_pub_deps] + + +def get_callback(api_future, data, ref): + """Wrap message data in the context of the callback function.""" + + def callback(api_future): + try: + print( + "Published message {} now has message ID {}".format( + data, api_future.result() + ) + ) + ref["num_messages"] += 1 + except Exception: + print( + "A problem occurred when publishing {}: {}\n".format( + data, api_future.exception() + ) + ) + raise + + return callback + + +def pub(project_id, topic_id): + """Publishes a message to a Pub/Sub topic.""" + # [START pubsub_quickstart_pub_client] + # Initialize a Publisher client. + client = pubsub_v1.PublisherClient() + # [END pubsub_quickstart_pub_client] + # Create a fully qualified identifier in the form of + # `projects/{project_id}/topics/{topic_id}` + topic_path = client.topic_path(project_id, topic_id) + + # Data sent to Cloud Pub/Sub must be a bytestring. + data = b"Hello, World!" + + # Keep track of the number of published messages. + ref = dict({"num_messages": 0}) + + # When you publish a message, the client returns a future. + api_future = client.publish(topic_path, data=data) + api_future.add_done_callback(get_callback(api_future, data, ref)) + + # Keep the main thread from exiting while the message future + # gets resolved in the background. + while api_future.running(): + time.sleep(0.5) + print("Published {} message(s).".format(ref["num_messages"])) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("topic_id", help="Pub/Sub topic ID") + + args = parser.parse_args() + + pub(args.project_id, args.topic_id) +# [END pubsub_quickstart_pub_all] diff --git a/samples/snippets/quickstart/pub_test.py b/samples/snippets/quickstart/pub_test.py new file mode 100644 index 000000000..6f5cc06c4 --- /dev/null +++ b/samples/snippets/quickstart/pub_test.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 +import pytest + +import pub # noqa + + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "quickstart-pub-test-topic-" + UUID + + +@pytest.fixture(scope="module") +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope="module") +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.create_topic(topic_path) + except AlreadyExists: + pass + + yield TOPIC + + publisher_client.delete_topic(topic_path) + + +def test_pub(publisher_client, topic, capsys): + pub.pub(PROJECT, topic) + + out, _ = capsys.readouterr() + + assert "Hello, World!" in out diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py new file mode 100644 index 000000000..efe008915 --- /dev/null +++ b/samples/snippets/quickstart/sub.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START pubsub_quickstart_sub_all] +import argparse + +# [START pubsub_quickstart_sub_deps] +from google.cloud import pubsub_v1 + +# [END pubsub_quickstart_sub_deps] + + +def sub(project_id, subscription_id): + """Receives messages from a Pub/Sub subscription.""" + # [START pubsub_quickstart_sub_client] + # Initialize a Subscriber client + subscriber_client = pubsub_v1.SubscriberClient() + # [END pubsub_quickstart_sub_client] + # Create a fully qualified identifier in the form of + # `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber_client.subscription_path(project_id, subscription_id) + + def callback(message): + print( + "Received message {} of message ID {}\n".format(message, message.message_id) + ) + # Acknowledge the message. Unack'ed messages will be redelivered. + message.ack() + print("Acknowledged message {}\n".format(message.message_id)) + + streaming_pull_future = subscriber_client.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) + + try: + # Calling result() on StreamingPullFuture keeps the main thread from + # exiting while messages get processed in the callbacks. + streaming_pull_future.result() + except: # noqa + streaming_pull_future.cancel() + + subscriber_client.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("subscription_id", help="Pub/Sub subscription ID") + + args = parser.parse_args() + + sub(args.project_id, args.subscription_id) +# [END pubsub_quickstart_sub_all] diff --git a/samples/snippets/quickstart/sub_test.py b/samples/snippets/quickstart/sub_test.py new file mode 100644 index 000000000..38047422a --- /dev/null +++ b/samples/snippets/quickstart/sub_test.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import uuid + +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 +import mock +import pytest + +import sub # noqa + + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "quickstart-sub-test-topic-" + UUID +SUBSCRIPTION = "quickstart-sub-test-topic-sub-" + UUID + +publisher_client = pubsub_v1.PublisherClient() +subscriber_client = pubsub_v1.SubscriberClient() + + +@pytest.fixture(scope="module") +def topic_path(): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + topic = publisher_client.create_topic(topic_path) + yield topic.name + except AlreadyExists: + yield topic_path + + publisher_client.delete_topic(topic_path) + + +@pytest.fixture(scope="module") +def subscription_path(topic_path): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) + + try: + subscription = subscriber_client.create_subscription( + subscription_path, topic_path + ) + yield subscription.name + except AlreadyExists: + yield subscription_path + + subscriber_client.delete_subscription(subscription_path) + subscriber_client.close() + + +def _publish_messages(topic_path): + publish_future = publisher_client.publish(topic_path, data=b"Hello World!") + publish_future.result() + + +def test_sub(monkeypatch, topic_path, subscription_path, capsys): + + real_client = pubsub_v1.SubscriberClient() + mock_client = mock.Mock(spec=pubsub_v1.SubscriberClient, wraps=real_client) + + # Attributes on mock_client_constructor uses the corresponding + # attributes on pubsub_v1.SubscriberClient. + mock_client_constructor = mock.create_autospec(pubsub_v1.SubscriberClient) + mock_client_constructor.return_value = mock_client + + monkeypatch.setattr(pubsub_v1, "SubscriberClient", mock_client_constructor) + + def mock_subscribe(subscription_path, callback=None): + real_future = real_client.subscribe(subscription_path, callback=callback) + mock_future = mock.Mock(spec=real_future, wraps=real_future) + + def mock_result(): + return real_future.result(timeout=10) + + mock_future.result.side_effect = mock_result + return mock_future + + mock_client.subscribe.side_effect = mock_subscribe + + _publish_messages(topic_path) + + sub.sub(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert "Received message" in out + assert "Acknowledged message" in out + + real_client.close() diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt new file mode 100644 index 000000000..adf26b9f9 --- /dev/null +++ b/samples/snippets/requirements-test.txt @@ -0,0 +1,3 @@ +backoff==1.10.0 +pytest==5.3.2 +mock==3.0.5 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt new file mode 100644 index 000000000..9b496510a --- /dev/null +++ b/samples/snippets/requirements.txt @@ -0,0 +1 @@ +google-cloud-pubsub==1.6.0 diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py new file mode 100644 index 000000000..f079e7d42 --- /dev/null +++ b/samples/snippets/subscriber.py @@ -0,0 +1,783 @@ +#!/usr/bin/env python + +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to perform basic operations on +subscriptions with the Cloud Pub/Sub API. + +For more information, see the README.md under /pubsub and the documentation +at https://cloud.google.com/pubsub/docs. +""" + +import argparse + + +def list_subscriptions_in_topic(project_id, topic_id): + """Lists all subscriptions for a given topic.""" + # [START pubsub_list_topic_subscriptions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + for subscription in publisher.list_topic_subscriptions(topic_path): + print(subscription) + # [END pubsub_list_topic_subscriptions] + + +def list_subscriptions_in_project(project_id): + """Lists all subscriptions in the current project.""" + # [START pubsub_list_subscriptions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + + subscriber = pubsub_v1.SubscriberClient() + project_path = subscriber.project_path(project_id) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + for subscription in subscriber.list_subscriptions(project_path): + print(subscription.name) + # [END pubsub_list_subscriptions] + + +def create_subscription(project_id, topic_id, subscription_id): + """Create a new pull subscription on the given topic.""" + # [START pubsub_create_pull_subscription] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscription = subscriber.create_subscription(subscription_path, topic_path) + + print("Subscription created: {}".format(subscription)) + # [END pubsub_create_pull_subscription] + + +def create_subscription_with_dead_letter_topic( + project_id, topic_id, subscription_id, dead_letter_topic_id +): + """Create a subscription with dead letter policy.""" + # [START pubsub_dead_letter_create_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + # TODO(developer) + # project_id = "your-project-id" + # endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_id = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_id = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_id = "your-dead-letter-topic-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) + + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 + ) + + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, dead_letter_policy=dead_letter_policy + ) + + print("Subscription created: {}".format(subscription.name)) + print( + "It will forward dead letter messages to: {}".format( + subscription.dead_letter_policy.dead_letter_topic + ) + ) + print( + "After {} delivery attempts.".format( + subscription.dead_letter_policy.max_delivery_attempts + ) + ) + # [END pubsub_dead_letter_create_subscription] + + +def create_push_subscription(project_id, topic_id, subscription_id, endpoint): + """Create a new push subscription on the given topic.""" + # [START pubsub_create_push_subscription] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, push_config + ) + + print("Push subscription created: {}".format(subscription)) + print("Endpoint for subscription is: {}".format(endpoint)) + # [END pubsub_create_push_subscription] + + +def delete_subscription(project_id, subscription_id): + """Deletes an existing Pub/Sub topic.""" + # [START pubsub_delete_subscription] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscriber.delete_subscription(subscription_path) + + print("Subscription deleted: {}".format(subscription_path)) + # [END pubsub_delete_subscription] + + +def update_push_subscription(project_id, topic_id, subscription_id, endpoint): + """ + Updates an existing Pub/Sub subscription's push endpoint URL. + Note that certain properties of a subscription, such as + its topic, are not modifiable. + """ + # [START pubsub_update_push_configuration] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) + + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_id, push_config=push_config + ) + + update_mask = {"paths": {"push_config"}} + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + result = subscriber.update_subscription(subscription, update_mask) + + print("Subscription updated: {}".format(subscription_path)) + print("New endpoint for subscription is: {}".format(result.push_config)) + # [END pubsub_update_push_configuration] + + +def update_subscription_with_dead_letter_policy( + project_id, topic_id, subscription_id, dead_letter_topic_id +): + """Update a subscription's dead letter policy.""" + # [START pubsub_dead_letter_update_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_id = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_id = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_id = "your-dead-letter-topic-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before the update: {}".format(subscription_before_update)) + + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) + + # Construct a dead letter policy you expect to have after the update. + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 + ) + + # Construct the subscription with the dead letter policy you expect to have + # after the update. Here, values in the required fields (name, topic) help + # identify the subscription. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, + ) + + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After the update: {}".format(subscription_after_update)) + # [END pubsub_dead_letter_update_subscription] + return subscription_after_update + + +def remove_dead_letter_policy(project_id, topic_id, subscription_id): + """Remove dead letter policy from a subscription.""" + # [START pubsub_dead_letter_remove] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_id = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before removing the policy: {}".format(subscription_before_update)) + + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask( + paths=[ + "dead_letter_policy.dead_letter_topic", + "dead_letter_policy.max_delivery_attempts", + ] + ) + + # Construct the subscription (without any dead letter policy) that you + # expect to have after the update. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path + ) + + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After removing the policy: {}".format(subscription_after_update)) + # [END pubsub_dead_letter_remove] + return subscription_after_update + + +def receive_messages(project_id, subscription_id, timeout=None): + """Receives messages from a pull subscription.""" + # [START pubsub_subscriber_async_pull] + # [START pubsub_quickstart_subscriber] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_subscriber_async_pull] + # [END pubsub_quickstart_subscriber] + + +def receive_messages_with_custom_attributes(project_id, subscription_id, timeout=None): + """Receives messages from a pull subscription.""" + # [START pubsub_subscriber_async_pull_custom_attributes] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message.data)) + if message.attributes: + print("Attributes:") + for key in message.attributes: + value = message.attributes.get(key) + print("{}: {}".format(key, value)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_subscriber_async_pull_custom_attributes] + + +def receive_messages_with_flow_control(project_id, subscription_id, timeout=None): + """Receives messages from a pull subscription with flow control.""" + # [START pubsub_subscriber_flow_settings] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message.data)) + message.ack() + + # Limit the subscriber to only have ten outstanding messages at a time. + flow_control = pubsub_v1.types.FlowControl(max_messages=10) + + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback, flow_control=flow_control + ) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_subscriber_flow_settings] + + +def synchronous_pull(project_id, subscription_id): + """Pulling messages synchronously.""" + # [START pubsub_subscriber_sync_pull] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + NUM_MESSAGES = 3 + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + + ack_ids = [] + for received_message in response.received_messages: + print("Received: {}".format(received_message.message.data)) + ack_ids.append(received_message.ack_id) + + # Acknowledges the received messages so they will not be sent again. + subscriber.acknowledge(subscription_path, ack_ids) + + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) + # [END pubsub_subscriber_sync_pull] + + +def synchronous_pull_with_lease_management(project_id, subscription_id): + """Pulling messages synchronously with lease management""" + # [START pubsub_subscriber_sync_pull_with_lease] + import logging + import multiprocessing + import random + import time + + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + NUM_MESSAGES = 2 + ACK_DEADLINE = 30 + SLEEP_TIME = 10 + + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + + multiprocessing.log_to_stderr() + logger = multiprocessing.get_logger() + logger.setLevel(logging.INFO) + + def worker(msg): + """Simulates a long-running process.""" + RUN_TIME = random.randint(1, 60) + logger.info( + "{}: Running {} for {}s".format( + time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME + ) + ) + time.sleep(RUN_TIME) + + # `processes` stores process as key and ack id and message as values. + processes = dict() + for message in response.received_messages: + process = multiprocessing.Process(target=worker, args=(message,)) + processes[process] = (message.ack_id, message.message.data) + process.start() + + while processes: + for process in list(processes): + ack_id, msg_data = processes[process] + # If the process is still running, reset the ack deadline as + # specified by ACK_DEADLINE once every while as specified + # by SLEEP_TIME. + if process.is_alive(): + # `ack_deadline_seconds` must be between 10 to 600. + subscriber.modify_ack_deadline( + subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, + ) + logger.info( + "{}: Reset ack deadline for {} for {}s".format( + time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE, + ) + ) + + # If the processs is finished, acknowledges using `ack_id`. + else: + subscriber.acknowledge(subscription_path, [ack_id]) + logger.info( + "{}: Acknowledged {}".format( + time.strftime("%X", time.gmtime()), msg_data + ) + ) + processes.pop(process) + + # If there are still processes running, sleeps the thread. + if processes: + time.sleep(SLEEP_TIME) + + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) + + # Close the underlying gPRC channel. Alternatively, wrap subscriber in + # a 'with' block to automatically call close() when done. + subscriber.close() + # [END pubsub_subscriber_sync_pull_with_lease] + + +def listen_for_errors(project_id, subscription_id, timeout=None): + """Receives messages and catches errors from a pull subscription.""" + # [START pubsub_subscriber_error_listener] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except Exception as e: + streaming_pull_future.cancel() + print( + "Listening for messages on {} threw an exception: {}.".format( + subscription_id, e + ) + ) + # [END pubsub_subscriber_error_listener] + + +def receive_messages_with_delivery_attempts(project_id, subscription_id, timeout=None): + # [START pubsub_dead_letter_delivery_attempt] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message)) + print("With delivery attempts: {}".format(message.delivery_attempt)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_dead_letter_delivery_attempt] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Your Google Cloud project ID") + + subparsers = parser.add_subparsers(dest="command") + list_in_topic_parser = subparsers.add_parser( + "list-in-topic", help=list_subscriptions_in_topic.__doc__ + ) + list_in_topic_parser.add_argument("topic_id") + + list_in_project_parser = subparsers.add_parser( + "list-in-project", help=list_subscriptions_in_project.__doc__ + ) + + create_parser = subparsers.add_parser("create", help=create_subscription.__doc__) + create_parser.add_argument("topic_id") + create_parser.add_argument("subscription_id") + + create_with_dead_letter_policy_parser = subparsers.add_parser( + "create-with-dead-letter-policy", + help=create_subscription_with_dead_letter_topic.__doc__, + ) + create_with_dead_letter_policy_parser.add_argument("topic_id") + create_with_dead_letter_policy_parser.add_argument("subscription_id") + create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id") + + create_push_parser = subparsers.add_parser( + "create-push", help=create_push_subscription.__doc__ + ) + create_push_parser.add_argument("topic_id") + create_push_parser.add_argument("subscription_id") + create_push_parser.add_argument("endpoint") + + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) + delete_parser.add_argument("subscription_id") + + update_push_parser = subparsers.add_parser( + "update-push", help=update_push_subscription.__doc__ + ) + update_push_parser.add_argument("topic_id") + update_push_parser.add_argument("subscription_id") + update_push_parser.add_argument("endpoint") + + update_dead_letter_policy_parser = subparsers.add_parser( + "update-dead-letter-policy", + help=update_subscription_with_dead_letter_policy.__doc__, + ) + update_dead_letter_policy_parser.add_argument("topic_id") + update_dead_letter_policy_parser.add_argument("subscription_id") + update_dead_letter_policy_parser.add_argument("dead_letter_topic_id") + + remove_dead_letter_policy_parser = subparsers.add_parser( + "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ + ) + remove_dead_letter_policy_parser.add_argument("topic_id") + remove_dead_letter_policy_parser.add_argument("subscription_id") + + receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) + receive_parser.add_argument("subscription_id") + receive_parser.add_argument("timeout", default=None, type=float, nargs="?") + + receive_with_custom_attributes_parser = subparsers.add_parser( + "receive-custom-attributes", + help=receive_messages_with_custom_attributes.__doc__, + ) + receive_with_custom_attributes_parser.add_argument("subscription_id") + receive_with_custom_attributes_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + receive_with_flow_control_parser = subparsers.add_parser( + "receive-flow-control", help=receive_messages_with_flow_control.__doc__ + ) + receive_with_flow_control_parser.add_argument("subscription_id") + receive_with_flow_control_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + synchronous_pull_parser = subparsers.add_parser( + "receive-synchronously", help=synchronous_pull.__doc__ + ) + synchronous_pull_parser.add_argument("subscription_id") + + synchronous_pull_with_lease_management_parser = subparsers.add_parser( + "receive-synchronously-with-lease", + help=synchronous_pull_with_lease_management.__doc__, + ) + synchronous_pull_with_lease_management_parser.add_argument("subscription_id") + + listen_for_errors_parser = subparsers.add_parser( + "listen-for-errors", help=listen_for_errors.__doc__ + ) + listen_for_errors_parser.add_argument("subscription_id") + listen_for_errors_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + receive_messages_with_delivery_attempts_parser = subparsers.add_parser( + "receive-messages-with-delivery-attempts", + help=receive_messages_with_delivery_attempts.__doc__, + ) + receive_messages_with_delivery_attempts_parser.add_argument("subscription_id") + receive_messages_with_delivery_attempts_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + args = parser.parse_args() + + if args.command == "list-in-topic": + list_subscriptions_in_topic(args.project_id, args.topic_id) + elif args.command == "list-in-project": + list_subscriptions_in_project(args.project_id) + elif args.command == "create": + create_subscription(args.project_id, args.topic_id, args.subscription_id) + elif args.command == "create-with-dead-letter-policy": + create_subscription_with_dead_letter_topic( + args.project_id, + args.topic_id, + args.subscription_id, + args.dead_letter_topic_id, + ) + elif args.command == "create-push": + create_push_subscription( + args.project_id, args.topic_id, args.subscription_id, args.endpoint, + ) + elif args.command == "delete": + delete_subscription(args.project_id, args.subscription_id) + elif args.command == "update-push": + update_push_subscription( + args.project_id, args.topic_id, args.subscription_id, args.endpoint, + ) + elif args.command == "update-dead-letter-policy": + update_subscription_with_dead_letter_policy( + args.project_id, + args.topic_id, + args.subscription_id, + args.dead_letter_topic_id, + ) + elif args.command == "remove-dead-letter-policy": + remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) + elif args.command == "receive": + receive_messages(args.project_id, args.subscription_id, args.timeout) + elif args.command == "receive-custom-attributes": + receive_messages_with_custom_attributes( + args.project_id, args.subscription_id, args.timeout + ) + elif args.command == "receive-flow-control": + receive_messages_with_flow_control( + args.project_id, args.subscription_id, args.timeout + ) + elif args.command == "receive-synchronously": + synchronous_pull(args.project_id, args.subscription_id) + elif args.command == "receive-synchronously-with-lease": + synchronous_pull_with_lease_management(args.project_id, args.subscription_id) + elif args.command == "listen-for-errors": + listen_for_errors(args.project_id, args.subscription_id, args.timeout) + elif args.command == "receive-messages-with-delivery-attempts": + receive_messages_with_delivery_attempts( + args.project_id, args.subscription_id, args.timeout + ) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py new file mode 100644 index 000000000..a7f7c139c --- /dev/null +++ b/samples/snippets/subscriber_test.py @@ -0,0 +1,341 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +import backoff +from google.cloud import pubsub_v1 +import pytest + +import subscriber + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "subscription-test-topic-" + UUID +DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID +SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID +SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID +SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID +SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID +ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) +NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) + + +@pytest.fixture(scope="module") +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope="module") +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + topic = publisher_client.get_topic(topic_path) + except: # noqa + topic = publisher_client.create_topic(topic_path) + + yield topic.name + + publisher_client.delete_topic(topic.name) + + +@pytest.fixture(scope="module") +def dead_letter_topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + + try: + dead_letter_topic = publisher_client.get_topic(topic_path) + except: # noqa + dead_letter_topic = publisher_client.create_topic(topic_path) + + yield dead_letter_topic.name + + publisher_client.delete_topic(dead_letter_topic.name) + + +@pytest.fixture(scope="module") +def subscriber_client(): + subscriber_client = pubsub_v1.SubscriberClient() + yield subscriber_client + subscriber_client.close() + + +@pytest.fixture(scope="module") +def subscription_admin(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + +@pytest.fixture(scope="module") +def subscription_sync(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +@pytest.fixture(scope="module") +def subscription_async(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +@pytest.fixture(scope="module") +def subscription_dlq(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +def test_list_in_topic(subscription_admin, capsys): + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) + out, _ = capsys.readouterr() + assert subscription_admin in out + + eventually_consistent_test() + + +def test_list_in_project(subscription_admin, capsys): + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + subscriber.list_subscriptions_in_project(PROJECT) + out, _ = capsys.readouterr() + assert subscription_admin in out + + eventually_consistent_test() + + +def test_create(subscriber_client): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + assert subscriber_client.get_subscription(subscription_path) + + eventually_consistent_test() + + +def test_create_subscription_with_dead_letter_policy( + subscriber_client, publisher_client, topic, dead_letter_topic, capsys +): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) + dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_subscription_with_dead_letter_topic( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + out, _ = capsys.readouterr() + assert "Subscription created: " + subscription_path in out + assert "It will forward dead letter messages to: " + dead_letter_topic_path in out + assert "After 10 delivery attempts." in out + + +def test_create_push(subscriber_client): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + assert subscriber_client.get_subscription(subscription_path) + + eventually_consistent_test() + + +def test_update(subscriber_client, subscription_admin, capsys): + subscriber.update_push_subscription( + PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT + ) + + out, _ = capsys.readouterr() + assert "Subscription updated" in out + + +def test_update_dead_letter_policy( + subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _ = subscriber.update_subscription_with_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + out, _ = capsys.readouterr() + assert "max_delivery_attempts: 20" in out + + +def test_delete(subscriber_client, subscription_admin): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + with pytest.raises(Exception): + subscriber_client.get_subscription(subscription_admin) + + eventually_consistent_test() + + +def _publish_messages(publisher_client, topic): + for n in range(5): + data = u"message {}".format(n).encode("utf-8") + publish_future = publisher_client.publish( + topic, data=data, origin="python-sample" + ) + publish_future.result() + + +def test_receive(publisher_client, topic, subscription_async, capsys): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "message" in out + + +def test_receive_with_custom_attributes( + publisher_client, topic, subscription_async, capsys +): + + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "message" in out + assert "origin" in out + assert "python-sample" in out + + +def test_receive_with_flow_control(publisher_client, topic, subscription_async, capsys): + + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "message" in out + + +def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) + + out, _ = capsys.readouterr() + assert "Done." in out + + +def test_receive_synchronously_with_lease( + publisher_client, topic, subscription_sync, capsys +): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION_SYNC) + + out, _ = capsys.readouterr() + assert "Done." in out + + +def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): + + _publish_messages(publisher_client, topic) + + subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "threw an exception" in out + + +def test_receive_with_delivery_attempts( + publisher_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_delivery_attempts(PROJECT, SUBSCRIPTION_DLQ, 10) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_dlq in out + assert "Received message: " in out + assert "message 4" in out + assert "With delivery attempts: " in out + + +def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): + subscription_after_update = subscriber.remove_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ + ) + + assert subscription_after_update.dead_letter_policy.dead_letter_topic == "" diff --git a/synth.py b/synth.py index b44cc0acf..0e2c96e42 100644 --- a/synth.py +++ b/synth.py @@ -18,6 +18,7 @@ import synthtool as s from synthtool import gcp +from synthtool.languages import python gapic = gcp.GAPICBazel() common = gcp.CommonTemplates() @@ -266,8 +267,16 @@ def _merge_dict(d1, d2): # Add templated files # ---------------------------------------------------------------------------- templated_files = gcp.CommonTemplates().py_library( - unit_cov_level=97, cov_level=99, system_test_external_dependencies=["psutil"], + unit_cov_level=97, + cov_level=99, + system_test_external_dependencies=["psutil"], + samples=True, ) s.move(templated_files) +# ---------------------------------------------------------------------------- +# Samples templates +# ---------------------------------------------------------------------------- +python.py_samples() + s.shell.run(["nox", "-s", "blacken"], hide_output=False)