diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index c200722da..eaf8194ff 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -41,14 +41,20 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - +import java.util.logging.Level; +import java.util.logging.Logger; /** * Queues up the elements until {@link #flush()} is called; once batching is over, returned future * resolves. @@ -65,11 +71,13 @@ public class BatcherImpl implements Batcher { + private static final Logger LOG = Logger.getLogger(BatcherImpl.class.getName()); private final BatchingDescriptor batchingDescriptor; private final UnaryCallable unaryCallable; private final RequestT prototype; private final BatchingSettings batchingSettings; + private final BatcherReference currentBatcherReference; private Batch currentOpenBatch; private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); @@ -110,6 +118,7 @@ public BatcherImpl( } else { scheduledFuture = Futures.immediateCancelledFuture(); } + currentBatcherReference = new BatcherReference(this); } /** {@inheritDoc} */ @@ -205,6 +214,8 @@ public void close() throws InterruptedException { flush(); scheduledFuture.cancel(true); isClosed = true; + currentBatcherReference.closed = true; + currentBatcherReference.clear(); } /** @@ -304,4 +315,98 @@ boolean isCancelled() { return scheduledFuture.isCancelled(); } } + + /** + * On every Batcher allocation this class will check for garbage collected batchers that were + * never closed and emit warning logs. + */ + @VisibleForTesting + static final class BatcherReference extends WeakReference { + + private static final ReferenceQueue refQueue = new ReferenceQueue<>(); + + // Retain the References so they don't get GC'd + private static final ConcurrentMap refs = + new ConcurrentHashMap<>(); + + private static final String ALLOCATION_SITE_PROPERTY_NAME = + "com.google.api.gax.batching.Batcher.enableAllocationTracking"; + + private static final boolean ENABLE_ALLOCATION_TRACKING = + Boolean.parseBoolean(System.getProperty(ALLOCATION_SITE_PROPERTY_NAME, "true")); + private static final RuntimeException missingCallSite = missingCallSite(); + + private final Reference allocationSite; + private volatile boolean closed; + + BatcherReference(BatcherImpl referent) { + super(referent, refQueue); + // allocationSite is softReference to make it garbage collectible, but delay it as long as + // possible as BatcherReference can only be weakly referred. + allocationSite = + new SoftReference<>( + ENABLE_ALLOCATION_TRACKING + ? new RuntimeException("Batcher allocation site") + : missingCallSite); + refs.put(this, this); + cleanQueue(); + } + + /** + * This clear() is *not* called automatically by the JVM. As this is a weak ref, the reference + * will be cleared automatically by the JVM, but will not be removed from {@link #refs}. We do + * it here to avoid this ending up on the reference queue. + */ + @Override + public void clear() { + clearInternal(); + // We run this here to periodically clean up the queue if any Batcher is being + // closed properly. + cleanQueue(); + } + + private void clearInternal() { + super.clear(); + refs.remove(this); + allocationSite.clear(); + } + + /** + * It performs below tasks: + * + *
    + *
  • Check each batcher registered on refQueue while initialization. + *
  • Unregister them from refQueue. + *
  • If close() is not called on the batcher, then emits log with possible allocationSite. + *
  • Keeps track of number of batcher on which close() is not called. + *
+ */ + @VisibleForTesting + static int cleanQueue() { + BatcherReference ref; + int orphanedBatchers = 0; + while ((ref = (BatcherReference) refQueue.poll()) != null) { + RuntimeException maybeAllocationSite = ref.allocationSite.get(); + ref.clearInternal(); // technically the reference is gone already. + if (!ref.closed) { + orphanedBatchers++; + if (LOG.isLoggable(Level.SEVERE)) { + String message = "Batcher was not closed properly!!! Make sure to call close()."; + LOG.log(Level.SEVERE, message, maybeAllocationSite); + } + } + } + return orphanedBatchers; + } + + private static RuntimeException missingCallSite() { + RuntimeException e = + new RuntimeException( + "Batcher allocation site not recorded. Set -D" + + ALLOCATION_SITE_PROPERTY_NAME + + "=true to enable it"); + e.setStackTrace(new StackTraceElement[0]); + return e; + } + } } diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 739399b51..3959f6b87 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -32,14 +32,17 @@ import static com.google.api.gax.rpc.testing.FakeBatchableApi.SQUARER_BATCHING_DESC_V2; import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.BatcherImpl.BatcherReference; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -52,6 +55,10 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Filter; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; import org.junit.After; import org.junit.AfterClass; import org.junit.Test; @@ -341,9 +348,9 @@ public Void call() throws InterruptedException { /** Validates ongoing runnable is cancelled once Batcher is GCed. */ @Test public void testPushCurrentBatchRunnable() throws Exception { - long delay = 100L; + long DELAY_TIME = 50L; BatchingSettings settings = - batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(delay)).build(); + batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(DELAY_TIME)).build(); BatcherImpl> batcher = new BatcherImpl<>( SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR); @@ -351,7 +358,8 @@ public void testPushCurrentBatchRunnable() throws Exception { BatcherImpl.PushCurrentBatchRunnable> pushBatchRunnable = new BatcherImpl.PushCurrentBatchRunnable<>(batcher); ScheduledFuture onGoingRunnable = - EXECUTOR.scheduleWithFixedDelay(pushBatchRunnable, delay, delay, TimeUnit.MILLISECONDS); + EXECUTOR.scheduleWithFixedDelay( + pushBatchRunnable, DELAY_TIME, DELAY_TIME, TimeUnit.MILLISECONDS); pushBatchRunnable.setScheduledFuture(onGoingRunnable); boolean isExecutorCancelled = pushBatchRunnable.isCancelled(); @@ -369,7 +377,7 @@ public void testPushCurrentBatchRunnable() throws Exception { if (isExecutorCancelled) { break; } - Thread.sleep(100L * (1L << retry)); + Thread.sleep(DELAY_TIME * (1L << retry)); } // ScheduledFuture should be isCancelled now. assertThat(pushBatchRunnable.isCancelled()).isTrue(); @@ -391,6 +399,91 @@ public ApiFuture> futureCall( underTest.flush(); } + /** + * Validates the presence of warning in case {@link BatcherImpl} is garbage collected without + * being closed first. + * + *

Note:This test cannot run concurrently with other tests that use Batchers. + */ + @Test + public void testUnclosedBatchersAreLogged() throws Exception { + final long DELAY_TIME = 30L; + int actualRemaining = 0; + for (int retry = 0; retry < 3; retry++) { + System.gc(); + System.runFinalization(); + actualRemaining = BatcherReference.cleanQueue(); + if (actualRemaining == 0) { + break; + } + Thread.sleep(DELAY_TIME * (1L << retry)); + } + assertThat(actualRemaining).isAtMost(0); + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, + callLabeledIntSquarer, + labeledIntList, + batchingSettings, + EXECUTOR); + Batcher extraBatcher = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, + callLabeledIntSquarer, + labeledIntList, + batchingSettings, + EXECUTOR); + + // Try to capture the log output but without causing terminal noise. Adding the filter must + // be done before clearing the ref or else it might be missed. + final List records = new ArrayList<>(1); + Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName()); + Filter oldFilter = batcherLogger.getFilter(); + batcherLogger.setFilter( + new Filter() { + @Override + public boolean isLoggable(LogRecord record) { + synchronized (records) { + records.add(record); + } + return false; + } + }); + + try { + // extraBatcher should not create any noise in the console as we called close() on it. + extraBatcher.close(); + extraBatcher = null; + + underTest = null; + // That *should* have been the last reference. Try to reclaim it. + boolean success = false; + for (int retry = 0; retry < 3; retry++) { + System.gc(); + System.runFinalization(); + int orphans = BatcherReference.cleanQueue(); + if (orphans == 1) { + success = true; + break; + } + // Validates that there are no other batcher instance present while GC cleanup. + assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0); + Thread.sleep(DELAY_TIME * (1L << retry)); + } + assertWithMessage("Batcher was not garbage collected").that(success).isTrue(); + + LogRecord lr; + synchronized (records) { + assertThat(records.size()).isEqualTo(1); + lr = records.get(0); + } + assertThat(lr.getMessage()).contains("not closed properly"); + assertThat(lr.getLevel()).isEqualTo(Level.SEVERE); + } finally { + batcherLogger.setFilter(oldFilter); + } + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = new BatcherImpl<>(