Skip to content

Commit

Permalink
Pub/Sub: add SubscriberClient.close() to examples [(#3118)](GoogleClo…
Browse files Browse the repository at this point in the history
…udPlatform/python-docs-samples#3118)

* Add SubscriberClient.close() to examples.

Co-authored-by: Prad Nelluru <pradn@google.com>
Co-authored-by: Prad Nelluru <prad.nelluru@gmail.com>
  • Loading branch information
3 people authored and plamut committed Jul 10, 2020
1 parent 4571111 commit 6c3f2fc
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 36 deletions.
6 changes: 6 additions & 0 deletions samples/snippets/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def get_subscription_policy(project, subscription_name):
print("Policy for subscription {}:".format(subscription_path))
for binding in policy.bindings:
print("Role: {}, Members: {}".format(binding.role, binding.members))

client.close()
# [END pubsub_get_subscription_policy]


Expand Down Expand Up @@ -101,6 +103,8 @@ def set_subscription_policy(project, subscription_name):
subscription_name, policy
)
)

client.close()
# [END pubsub_set_subscription_policy]


Expand Down Expand Up @@ -144,6 +148,8 @@ def check_subscription_permissions(project, subscription_name):
subscription_path, allowed_permissions
)
)

client.close()
# [END pubsub_test_subscription_permissions]


Expand Down
4 changes: 3 additions & 1 deletion samples/snippets/iam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def topic(publisher_client):

@pytest.fixture(scope="module")
def subscriber_client():
yield pubsub_v1.SubscriberClient()
subscriber_client = pubsub_v1.SubscriberClient()
yield subscriber_client
subscriber_client.close()


@pytest.fixture
Expand Down
14 changes: 9 additions & 5 deletions samples/snippets/quickstart/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ def sub(project_id, subscription_name):
"""Receives messages from a Pub/Sub subscription."""
# [START pubsub_quickstart_sub_client]
# Initialize a Subscriber client
client = pubsub_v1.SubscriberClient()
subscriber_client = pubsub_v1.SubscriberClient()
# [END pubsub_quickstart_sub_client]
# Create a fully qualified identifier in the form of
# `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = client.subscription_path(project_id, subscription_name)
subscription_path = subscriber_client.subscription_path(
project_id, subscription_name
)

def callback(message):
print(
Expand All @@ -43,18 +45,20 @@ def callback(message):
message.ack()
print("Acknowledged message {}\n".format(message.message_id))

streaming_pull_future = client.subscribe(
streaming_pull_future = subscriber_client.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()

subscriber_client.close()


if __name__ == "__main__":
parser = argparse.ArgumentParser(
Expand Down
3 changes: 3 additions & 0 deletions samples/snippets/quickstart/sub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def subscription_path(topic_path):
yield subscription_path

subscriber_client.delete_subscription(subscription_path)
subscriber_client.close()


def _publish_messages(topic_path):
Expand Down Expand Up @@ -102,3 +103,5 @@ def mock_result():
out, _ = capsys.readouterr()
assert "Received message" in out
assert "Acknowledged message" in out

real_client.close()
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-pubsub==1.1.0
google-cloud-pubsub==1.3.0
78 changes: 50 additions & 28 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def list_subscriptions_in_project(project_id):

for subscription in subscriber.list_subscriptions(project_path):
print(subscription.name)

subscriber.close()
# [END pubsub_list_subscriptions]


Expand All @@ -75,6 +77,8 @@ def create_subscription(project_id, topic_name, subscription_name):
)

print("Subscription created: {}".format(subscription))

subscriber.close()
# [END pubsub_create_pull_subscription]


Expand Down Expand Up @@ -104,6 +108,8 @@ def create_push_subscription(

print("Push subscription created: {}".format(subscription))
print("Endpoint for subscription is: {}".format(endpoint))

subscriber.close()
# [END pubsub_create_push_subscription]


Expand All @@ -123,6 +129,8 @@ def delete_subscription(project_id, subscription_name):
subscriber.delete_subscription(subscription_path)

print("Subscription deleted: {}".format(subscription_path))

subscriber.close()
# [END pubsub_delete_subscription]


Expand Down Expand Up @@ -158,6 +166,8 @@ def update_subscription(project_id, subscription_name, endpoint):

print("Subscription updated: {}".format(subscription_path))
print("New endpoint for subscription is: {}".format(result.push_config))

subscriber.close()
# [END pubsub_update_push_configuration]


Expand Down Expand Up @@ -188,12 +198,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]

Expand Down Expand Up @@ -230,12 +242,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull_custom_attributes]
# [END pubsub_subscriber_sync_pull_custom_attributes]

Expand Down Expand Up @@ -269,12 +283,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_flow_settings]


Expand Down Expand Up @@ -309,6 +325,8 @@ def synchronous_pull(project_id, subscription_name):
len(response.received_messages)
)
)

subscriber.close()
# [END pubsub_subscriber_sync_pull]


Expand Down Expand Up @@ -398,6 +416,8 @@ def worker(msg):
len(response.received_messages)
)
)

subscriber.close()
# [END pubsub_subscriber_sync_pull_with_lease]


Expand Down Expand Up @@ -425,17 +445,19 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except Exception as e:
streaming_pull_future.cancel()
print(
"Listening for messages on {} threw an exception: {}.".format(
subscription_name, e
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except Exception as e:
streaming_pull_future.cancel()
print(
"Listening for messages on {} threw an exception: {}.".format(
subscription_name, e
)
)
)
# [END pubsub_subscriber_error_listener]


Expand Down
4 changes: 3 additions & 1 deletion samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def topic(publisher_client):

@pytest.fixture(scope="module")
def subscriber_client():
yield pubsub_v1.SubscriberClient()
subscriber_client = pubsub_v1.SubscriberClient()
yield subscriber_client
subscriber_client.close()


@pytest.fixture(scope="module")
Expand Down

0 comments on commit 6c3f2fc

Please sign in to comment.