From f358fbe3bc67bf83f350ca119c38c4c81b348bde Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 23 Mar 2020 15:17:49 -0700 Subject: [PATCH] Pub/Sub: add SubscriberClient.close() to examples (#3118) * Add SubscriberClient.close() to examples. Co-authored-by: Prad Nelluru Co-authored-by: Prad Nelluru --- pubsub/cloud-client/iam.py | 6 ++ pubsub/cloud-client/iam_test.py | 4 +- pubsub/cloud-client/quickstart/sub.py | 14 ++-- pubsub/cloud-client/quickstart/sub_test.py | 3 + pubsub/cloud-client/requirements.txt | 2 +- pubsub/cloud-client/subscriber.py | 78 ++++++++++++++-------- pubsub/cloud-client/subscriber_test.py | 4 +- 7 files changed, 75 insertions(+), 36 deletions(-) diff --git a/pubsub/cloud-client/iam.py b/pubsub/cloud-client/iam.py index f014ce749022..eb0c8246307b 100644 --- a/pubsub/cloud-client/iam.py +++ b/pubsub/cloud-client/iam.py @@ -51,6 +51,8 @@ def get_subscription_policy(project, subscription_name): print("Policy for subscription {}:".format(subscription_path)) for binding in policy.bindings: print("Role: {}, Members: {}".format(binding.role, binding.members)) + + client.close() # [END pubsub_get_subscription_policy] @@ -101,6 +103,8 @@ def set_subscription_policy(project, subscription_name): subscription_name, policy ) ) + + client.close() # [END pubsub_set_subscription_policy] @@ -144,6 +148,8 @@ def check_subscription_permissions(project, subscription_name): subscription_path, allowed_permissions ) ) + + client.close() # [END pubsub_test_subscription_permissions] diff --git a/pubsub/cloud-client/iam_test.py b/pubsub/cloud-client/iam_test.py index 2b019f9ea16f..f88cde851e7d 100644 --- a/pubsub/cloud-client/iam_test.py +++ b/pubsub/cloud-client/iam_test.py @@ -49,7 +49,9 @@ def topic(publisher_client): @pytest.fixture(scope="module") def subscriber_client(): - yield pubsub_v1.SubscriberClient() + subscriber_client = pubsub_v1.SubscriberClient() + yield subscriber_client + subscriber_client.close() @pytest.fixture diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py index 5791af14d799..1d90726f5b04 100644 --- a/pubsub/cloud-client/quickstart/sub.py +++ b/pubsub/cloud-client/quickstart/sub.py @@ -27,11 +27,13 @@ def sub(project_id, subscription_name): """Receives messages from a Pub/Sub subscription.""" # [START pubsub_quickstart_sub_client] # Initialize a Subscriber client - client = pubsub_v1.SubscriberClient() + subscriber_client = pubsub_v1.SubscriberClient() # [END pubsub_quickstart_sub_client] # Create a fully qualified identifier in the form of # `projects/{project_id}/subscriptions/{subscription_name}` - subscription_path = client.subscription_path(project_id, subscription_name) + subscription_path = subscriber_client.subscription_path( + project_id, subscription_name + ) def callback(message): print( @@ -43,18 +45,20 @@ def callback(message): message.ack() print("Acknowledged message {}\n".format(message.message_id)) - streaming_pull_future = client.subscribe( + streaming_pull_future = subscriber_client.subscribe( subscription_path, callback=callback ) print("Listening for messages on {}..\n".format(subscription_path)) - # Calling result() on StreamingPullFuture keeps the main thread from - # exiting while messages get processed in the callbacks. try: + # Calling result() on StreamingPullFuture keeps the main thread from + # exiting while messages get processed in the callbacks. streaming_pull_future.result() except: # noqa streaming_pull_future.cancel() + subscriber_client.close() + if __name__ == "__main__": parser = argparse.ArgumentParser( diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py index 07edfad7c4d2..1b59a3d043ac 100644 --- a/pubsub/cloud-client/quickstart/sub_test.py +++ b/pubsub/cloud-client/quickstart/sub_test.py @@ -62,6 +62,7 @@ def subscription_path(topic_path): yield subscription_path subscriber_client.delete_subscription(subscription_path) + subscriber_client.close() def _publish_messages(topic_path): @@ -102,3 +103,5 @@ def mock_result(): out, _ = capsys.readouterr() assert "Received message" in out assert "Acknowledged message" in out + + real_client.close() diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt index a5a8b2bb921a..cc192f6f7c58 100644 --- a/pubsub/cloud-client/requirements.txt +++ b/pubsub/cloud-client/requirements.txt @@ -1 +1 @@ -google-cloud-pubsub==1.1.0 +google-cloud-pubsub==1.3.0 diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 79c9bc4a0aaa..e22efc7b16f3 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -52,6 +52,8 @@ def list_subscriptions_in_project(project_id): for subscription in subscriber.list_subscriptions(project_path): print(subscription.name) + + subscriber.close() # [END pubsub_list_subscriptions] @@ -75,6 +77,8 @@ def create_subscription(project_id, topic_name, subscription_name): ) print("Subscription created: {}".format(subscription)) + + subscriber.close() # [END pubsub_create_pull_subscription] @@ -104,6 +108,8 @@ def create_push_subscription( print("Push subscription created: {}".format(subscription)) print("Endpoint for subscription is: {}".format(endpoint)) + + subscriber.close() # [END pubsub_create_push_subscription] @@ -123,6 +129,8 @@ def delete_subscription(project_id, subscription_name): subscriber.delete_subscription(subscription_path) print("Subscription deleted: {}".format(subscription_path)) + + subscriber.close() # [END pubsub_delete_subscription] @@ -158,6 +166,8 @@ def update_subscription(project_id, subscription_name, endpoint): print("Subscription updated: {}".format(subscription_path)) print("New endpoint for subscription is: {}".format(result.push_config)) + + subscriber.close() # [END pubsub_update_push_configuration] @@ -188,12 +198,14 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() in a future will block indefinitely if `timeout` is not set, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except: # noqa - streaming_pull_future.cancel() + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] @@ -230,12 +242,14 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() in a future will block indefinitely if `timeout` is not set, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except: # noqa - streaming_pull_future.cancel() + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull_custom_attributes] # [END pubsub_subscriber_sync_pull_custom_attributes] @@ -269,12 +283,14 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() in a future will block indefinitely if `timeout` is not set, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except: # noqa - streaming_pull_future.cancel() + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_flow_settings] @@ -309,6 +325,8 @@ def synchronous_pull(project_id, subscription_name): len(response.received_messages) ) ) + + subscriber.close() # [END pubsub_subscriber_sync_pull] @@ -398,6 +416,8 @@ def worker(msg): len(response.received_messages) ) ) + + subscriber.close() # [END pubsub_subscriber_sync_pull_with_lease] @@ -425,17 +445,19 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() in a future will block indefinitely if `timeout` is not set, - # unless an exception is encountered first. - try: - streaming_pull_future.result(timeout=timeout) - except Exception as e: - streaming_pull_future.cancel() - print( - "Listening for messages on {} threw an exception: {}.".format( - subscription_name, e + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except Exception as e: + streaming_pull_future.cancel() + print( + "Listening for messages on {} threw an exception: {}.".format( + subscription_name, e + ) ) - ) # [END pubsub_subscriber_error_listener] diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 50353c1c6e42..94905d63525d 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -52,7 +52,9 @@ def topic(publisher_client): @pytest.fixture(scope="module") def subscriber_client(): - yield pubsub_v1.SubscriberClient() + subscriber_client = pubsub_v1.SubscriberClient() + yield subscriber_client + subscriber_client.close() @pytest.fixture(scope="module")