diff --git a/google-cloud-clients/google-cloud-pubsub/README.md b/google-cloud-clients/google-cloud-pubsub/README.md index e6791f7af51c..a6956093f8a3 100644 --- a/google-cloud-clients/google-cloud-pubsub/README.md +++ b/google-cloud-clients/google-cloud-pubsub/README.md @@ -119,6 +119,7 @@ try { } finally { if (publisher != null) { publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } ``` diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 2b1fa858010a..53e36d791fd4 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -27,10 +27,7 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; -import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; import com.google.api.gax.rpc.StatusCode; @@ -46,7 +43,6 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; -import io.grpc.Status; import java.io.IOException; import java.util.Collections; import java.util.Iterator; @@ -424,6 +420,16 @@ public void shutdown() throws Exception { publisherStub.shutdown(); } + /** + * Wait for all work has completed execution after a {@link #shutdown()} request, or the timeout + * occurs, or the current thread is interrupted. + * + *

Call this method to make sure all resources are freed properly. + */ + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return publisherStub.awaitTermination(duration, unit); + } + private boolean hasBatchingBytes() { return getMaxBatchBytes() > 0; } @@ -443,6 +449,7 @@ private boolean hasBatchingBytes() { * } finally { * // When finished with the publisher, make sure to shutdown to free up resources. * publisher.shutdown(); + * publisher.awaitTermination(1, TimeUnit.MINUTES); * } * } */ @@ -463,6 +470,7 @@ public static Builder newBuilder(TopicName topicName) { * } finally { * // When finished with the publisher, make sure to shutdown to free up resources. * publisher.shutdown(); + * publisher.awaitTermination(1, TimeUnit.MINUTES); * } * } */ diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index f02606db4321..de4ca4872459 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -16,6 +16,8 @@ package com.google.cloud.pubsub.it; +import static com.google.common.truth.Truth.assertThat; + import com.google.auto.value.AutoValue; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.AckReplyConsumer; @@ -32,20 +34,17 @@ import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; - -import static com.google.common.truth.Truth.assertThat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; public class ITPubSubTest { @@ -147,6 +146,7 @@ public void failed(Subscriber.State from, Throwable failure) { .publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("msg2")).build()) .get(); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); // Ack the first message. MessageAndConsumer toAck = pollQueue(receiveQueue); diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index bea0ec4e9194..3cded21fac80 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -42,6 +42,7 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -114,6 +115,7 @@ public void testPublishByDuration() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -152,6 +154,7 @@ public void testPublishByNumBatchedMessages() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -186,6 +189,7 @@ public void testSinglePublishByNumBytes() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -228,6 +232,7 @@ public void testPublishMixedSizeAndDuration() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().get(0).getMessagesCount()); assertEquals(1, testPublisherServiceImpl.getCapturedRequests().get(1).getMessagesCount()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } private ApiFuture sendTestMessage(Publisher publisher, String data) { @@ -278,6 +283,7 @@ public void testPublishFailureRetries() throws Exception { assertEquals(2, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test(expected = ExecutionException.class) @@ -302,6 +308,7 @@ public void testPublishFailureRetries_retriesDisabled() throws Exception { } finally { assertSame(testPublisherServiceImpl.getCapturedRequests().size(), 1); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -328,6 +335,7 @@ public void testPublishFailureRetries_maxRetriesSetup() throws Exception { assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test @@ -353,6 +361,7 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception assertEquals(3, testPublisherServiceImpl.getCapturedRequests().size()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test(expected = ExecutionException.class) @@ -381,6 +390,7 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce } finally { assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 1); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -403,6 +413,7 @@ public void testPublisherGetters() throws Exception { assertEquals(Duration.ofMillis(11), publisher.getBatchingSettings().getDelayThreshold()); assertEquals(12, (long) publisher.getBatchingSettings().getElementCountThreshold()); publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } @Test diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java index 16b1dc7c14b6..65561f3a05a2 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/CreateTopicAndPublishMessages.java @@ -25,10 +25,10 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; /** * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub topic and asynchronously @@ -75,6 +75,7 @@ public static void publishMessages() throws Exception { if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } // [END pubsub_publish] @@ -123,11 +124,12 @@ public void onSuccess(String messageId) { if (publisher != null) { // When finished with the publisher, shutdown to free up resources. publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } // [END pubsub_publish_error_handler] } - + public static void main(String... args) throws Exception { createTopic(); publishMessages(); diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java index 51638dfe353f..989b7ba96a0f 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/PublisherSnippets.java @@ -35,9 +35,9 @@ import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; -import org.threeten.bp.Duration; - import java.io.FileInputStream; +import java.util.concurrent.TimeUnit; +import org.threeten.bp.Duration; /** This class contains snippets for the {@link Publisher} interface. */ public class PublisherSnippets { @@ -78,6 +78,7 @@ public static void newBuilder(String projectId, String topicId) throws Exception } finally { // When finished with the publisher, make sure to shutdown to free up resources. publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -108,8 +109,8 @@ public Publisher getPublisherWithCustomRetrySettings(ProjectTopicName topicName) Duration retryDelay = Duration.ofMillis(100); // default : 1 ms double retryDelayMultiplier = 2.0; // back off for repeated failures Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds - Duration totalTimeout = Duration.ofSeconds(1); // default: 0 - Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0 + Duration totalTimeout = Duration.ofSeconds(1); // default: 0 + Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0 Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 0 RetrySettings retrySettings = RetrySettings.newBuilder() diff --git a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java index ba173ca717f3..50405fc3b26d 100644 --- a/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java +++ b/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java @@ -16,6 +16,9 @@ package com.google.cloud.examples.pubsub.snippets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; @@ -24,27 +27,22 @@ import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.TopicAdminClient; -import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.ReceivedMessage; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; public class ITPubSubSnippets { @@ -98,6 +96,7 @@ public void onFailure(Throwable t) { } finally { if (publisher != null) { publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } } @@ -144,6 +143,7 @@ public void testPublisherSyncSubscriber() throws Exception { } finally { if (publisher != null) { publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); } }