From ae614b3daeb167d641d8ede32a92de3ec411c6cc Mon Sep 17 00:00:00 2001 From: Michael Darakananda <pongad@google.com> Date: Mon, 17 Sep 2018 12:28:55 -0700 Subject: [PATCH] pubsub: add Publisher.awaitTermination (#3688) [Newer gRPC versions](https://github.com/grpc/grpc-java/releases/tag/v1.12.0) seem to check that we call this method. Currently shutdown waits for all messages to publish and return before shutting anything down, so awaitTermination likely won't do anything meaningful. In the future, we should make shutdown return promptly and use awaitTermination to wait for messages. I reported this at #3687. Fixes #3648. --- .../google-cloud-pubsub/README.md | 1 + .../com/google/cloud/pubsub/v1/Publisher.java | 16 +++++++++++---- .../google/cloud/pubsub/it/ITPubSubTest.java | 16 +++++++-------- .../cloud/pubsub/v1/PublisherImplTest.java | 11 ++++++++++ .../CreateTopicAndPublishMessages.java | 6 ++++-- .../pubsub/snippets/PublisherSnippets.java | 9 +++++---- .../pubsub/snippets/ITPubSubSnippets.java | 20 +++++++++---------- 7 files changed, 51 insertions(+), 28 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/README.md b/google-cloud-clients/google-cloud-pubsub/README.md index e6791f7af51c..a6956093f8a3 100644 --- a/google-cloud-clients/google-cloud-pubsub/README.md +++ b/google-cloud-clients/google-cloud-pubsub/README.md @@ -119,6 +119,7 @@ try { } finally { if (publisher != null) { publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } ``` diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 2b1fa858010a..53e36d791fd4 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -27,10 +27,7 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; -import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; import com.google.api.gax.rpc.StatusCode; @@ -46,7 +43,6 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; -import io.grpc.Status; import java.io.IOException; import java.util.Collections; import java.util.Iterator; @@ -424,6 +420,16 @@ public void shutdown() throws Exception { publisherStub.shutdown(); } + /** + * Wait for all work has completed execution after a {@link #shutdown()} request, or the timeout + * occurs, or the current thread is interrupted. + * + * <p>Call this method to make sure all resources are freed properly. + */ + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return publisherStub.awaitTermination(duration, unit); + } + private boolean hasBatchingBytes() { return getMaxBatchBytes() > 0; } @@ -443,6 +449,7 @@ private boolean hasBatchingBytes() { * } finally { * // When finished with the publisher, make sure to shutdown to free up resources. * publisher.shutdown(); + * publisher.awaitTermination(1, TimeUnit.MINUTES); * } * }</pre> */ @@ -463,6 +470,7 @@ public static Builder newBuilder(TopicName topicName) { * } finally { * // When finished with the publisher, make sure to shutdown to free up resources. * publisher.shutdown(); + * publisher.awaitTermination(1, TimeUnit.MINUTES); * } * }</pre> */ diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index f02606db4321..de4ca4872459 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub.it; +import static com.google.common.truth.Truth.assertThat; + import com.google.auto.value.AutoValue; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.AckReplyConsumer; @@ -32,20 +34,17 @@ import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - -import static com.google.common.truth.Truth.assertThat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; public class ITPubSubTest { @@ -147,6 +146,7 @@ public void failed(Subscriber.State from, Throwable failure) { .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build()) .get(); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); // Ack the first message. MessageAndConsumer toAck = pollQueue(receiveQueue); diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index bea0ec4e9194..3cded21fac80 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -42,6 +42,7 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -114,6 +115,7 @@ public void testPublishByDuration() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -152,6 +154,7 @@ public void testPublishByNumBatchedMessages() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -186,6 +189,7 @@ public void testSinglePublishByNumBytes() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -228,6 +232,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } private ApiFuture<String> sendTestMessage(Publisher publisher, String data) { @@ -278,6 +283,7 @@ public void testPublishFailureRetries() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test(expected = ExecutionException.class) @@ -302,6 +308,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception { } finally { assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -328,6 +335,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception { assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -353,6 +361,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test(expected = ExecutionException.class) @@ -381,6 +390,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce } finally { assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -403,6 +413,7 @@ public void testPublisherGetters() throws Exception { assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java index 16b1dc7c14b6..65561f3a05a2 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java @@ -25,10 +25,10 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; /** * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously @@ -75,6 +75,7 @@ public static void publishMessages() throws Exception { if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } // [END pubsub_publish] @@ -123,11 +124,12 @@ public void onSuccess(String messageId) { if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } // [END pubsub_publish_error_handler] } - + public static void main(String... args) throws Exception { createTopic(); publishMessages(); diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java index 51638dfe353f..989b7ba96a0f 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java @@ -35,9 +35,9 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; -import org.threeten.bp.Duration; - import java.io.FileInputStream; +import java.util.concurrent.TimeUnit; +import org.threeten.bp.Duration; /** This class contains snippets for the {@link Publisher} interface. */ public class PublisherSnippets { @@ -78,6 +78,7 @@ public static void newBuilder(String projectId, String topicId) throws Exception } finally { // When finished with the publisher, make sure to shutdown to free up resources. publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -108,8 +109,8 @@ public Publisher getPublisherWithCustomRetrySettings(ProjectTopicName topicName) Duration retryDelay = Duration.ofMillis(100); // default : 1 ms double retryDelayMultiplier = 2.0; // back off for repeated failures Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds - Duration totalTimeout = Duration.ofSeconds(1); // default: 0 - Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0 + Duration totalTimeout = Duration.ofSeconds(1); // default: 0 + Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0 Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 0 RetrySettings retrySettings = RetrySettings.newBuilder() diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java index ba173ca717f3..50405fc3b26d 100644 --- a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java +++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java @@ -16,6 +16,9 @@ package com.google.cloud.examples.pubsub.snippets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; @@ -24,27 +27,22 @@ import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.TopicAdminClient; -import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.ReceivedMessage; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; public class ITPubSubSnippets { @@ -98,6 +96,7 @@ public void onFailure(Throwable t) { } finally { if (publisher != null) { publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -144,6 +143,7 @@ public void testPublisherSyncSubscriber() throws Exception { } finally { if (publisher != null) { publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } }