Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Logs warning message when Batcher is not closed on GC #746

Merged
merged 2 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
* resolves.
Expand All @@ -65,11 +71,13 @@
public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
implements Batcher<ElementT, ElementResultT> {

private static final Logger LOG = Logger.getLogger(BatcherImpl.class.getName());
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT>
batchingDescriptor;
private final UnaryCallable<RequestT, ResponseT> unaryCallable;
private final RequestT prototype;
private final BatchingSettings batchingSettings;
private final BatcherReference currentBatcherReference;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
Expand Down Expand Up @@ -110,6 +118,7 @@ public BatcherImpl(
} else {
scheduledFuture = Futures.immediateCancelledFuture();
}
currentBatcherReference = new BatcherReference(this);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -205,6 +214,8 @@ public void close() throws InterruptedException {
flush();
scheduledFuture.cancel(true);
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
}

/**
Expand Down Expand Up @@ -304,4 +315,98 @@ boolean isCancelled() {
return scheduledFuture.isCancelled();
}
}

/**
* On every Batcher allocation this class will check for garbage collected batchers that were
* never closed and emit warning logs.
*/
@VisibleForTesting
static final class BatcherReference extends WeakReference<BatcherImpl> {

private static final ReferenceQueue<BatcherImpl> refQueue = new ReferenceQueue<>();

// Retain the References so they don't get GC'd
private static final ConcurrentMap<BatcherReference, BatcherReference> refs =
new ConcurrentHashMap<>();

private static final String ALLOCATION_SITE_PROPERTY_NAME =
"com.google.api.gax.batching.v2.Batcher.enableAllocationTracking";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the v2 name


private static final boolean ENABLE_ALLOCATION_TRACKING =
Boolean.parseBoolean(System.getProperty(ALLOCATION_SITE_PROPERTY_NAME, "true"));
private static final RuntimeException missingCallSite = missingCallSite();

private final Reference<RuntimeException> allocationSite;
private volatile boolean closed;

BatcherReference(BatcherImpl referent) {
super(referent, refQueue);
// allocationSite is softReference to make it garbage collectible, but delay it as long as
// possible as BatcherReference can only be weakly referred.
allocationSite =
new SoftReference<>(
ENABLE_ALLOCATION_TRACKING
? new RuntimeException("Batcher allocation site")
: missingCallSite);
refs.put(this, this);
cleanQueue();
}

/**
* This clear() is *not* called automatically by the JVM. As this is a weak ref, the reference
* will be cleared automatically by the JVM, but will not be removed from {@link #refs}. We do
* it here to avoid this ending up on the reference queue.
*/
@Override
public void clear() {
clearInternal();
// We run this here to periodically clean up the queue if any Batcher is being
// closed properly.
cleanQueue();
}

private void clearInternal() {
super.clear();
refs.remove(this);
allocationSite.clear();
}

/**
* It performs below tasks:
*
* <ul>
* <li>Check each batcher registered on refQueue while initialization.
* <li>Unregister them from refQueue.
* <li>If close() is not called on the batcher, then emits log with possible allocationSite.
* <li>Keeps track of number of batcher on which close() is not called.
* </ul>
*/
@VisibleForTesting
static int cleanQueue() {
BatcherReference ref;
int orphanedBatchers = 0;
while ((ref = (BatcherReference) refQueue.poll()) != null) {
RuntimeException maybeAllocationSite = ref.allocationSite.get();
ref.clearInternal(); // technically the reference is gone already.
if (!ref.closed) {
orphanedBatchers++;
if (LOG.isLoggable(Level.SEVERE)) {
String message = "Batcher was not closed properly!!! Make sure to call close().";
LOG.log(Level.SEVERE, message, maybeAllocationSite);
}
}
}
return orphanedBatchers;
}

private static RuntimeException missingCallSite() {
RuntimeException e =
new RuntimeException(
"Batcher allocation site not recorded. Set -D"
+ ALLOCATION_SITE_PROPERTY_NAME
+ "=true to enable it");
e.setStackTrace(new StackTraceElement[0]);
return e;
}
}
}
101 changes: 97 additions & 4 deletions gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import static com.google.api.gax.rpc.testing.FakeBatchableApi.SQUARER_BATCHING_DESC_V2;
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatcherImpl.BatcherReference;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -52,6 +55,10 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Filter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
Expand Down Expand Up @@ -341,17 +348,18 @@ public Void call() throws InterruptedException {
/** Validates ongoing runnable is cancelled once Batcher is GCed. */
@Test
public void testPushCurrentBatchRunnable() throws Exception {
long delay = 100L;
long DELAY_TIME = 50L;
BatchingSettings settings =
batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(delay)).build();
batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(DELAY_TIME)).build();
BatcherImpl<Integer, Integer, LabeledIntList, List<Integer>> batcher =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR);

BatcherImpl.PushCurrentBatchRunnable<Integer, Integer, LabeledIntList, List<Integer>>
pushBatchRunnable = new BatcherImpl.PushCurrentBatchRunnable<>(batcher);
ScheduledFuture<?> onGoingRunnable =
EXECUTOR.scheduleWithFixedDelay(pushBatchRunnable, delay, delay, TimeUnit.MILLISECONDS);
EXECUTOR.scheduleWithFixedDelay(
pushBatchRunnable, DELAY_TIME, DELAY_TIME, TimeUnit.MILLISECONDS);
pushBatchRunnable.setScheduledFuture(onGoingRunnable);

boolean isExecutorCancelled = pushBatchRunnable.isCancelled();
Expand All @@ -369,7 +377,7 @@ public void testPushCurrentBatchRunnable() throws Exception {
if (isExecutorCancelled) {
break;
}
Thread.sleep(100L * (1L << retry));
Thread.sleep(DELAY_TIME * (1L << retry));
}
// ScheduledFuture should be isCancelled now.
assertThat(pushBatchRunnable.isCancelled()).isTrue();
Expand All @@ -391,6 +399,91 @@ public ApiFuture<List<Integer>> futureCall(
underTest.flush();
}

/**
* Validates the presence of warning in case {@link BatcherImpl} is garbage collected without
* being closed first.
*
* <p>Note:This test cannot run concurrently with other tests that use Batchers.
*/
@Test
public void testUnclosedBatchersAreLogged() throws Exception {
final long DELAY_TIME = 30L;
int actualRemaining = 0;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
actualRemaining = BatcherReference.cleanQueue();
if (actualRemaining == 0) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertThat(actualRemaining).isAtMost(0);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
batchingSettings,
EXECUTOR);
Batcher<Integer, Integer> extraBatcher =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
batchingSettings,
EXECUTOR);

// Try to capture the log output but without causing terminal noise. Adding the filter must
// be done before clearing the ref or else it might be missed.
final List<LogRecord> records = new ArrayList<>(1);
Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName());
Filter oldFilter = batcherLogger.getFilter();
batcherLogger.setFilter(
new Filter() {
@Override
public boolean isLoggable(LogRecord record) {
synchronized (records) {
records.add(record);
}
return false;
}
});

try {
// extraBatcher should not create any noise in the console as we called close() on it.
extraBatcher.close();
extraBatcher = null;

underTest = null;
// That *should* have been the last reference. Try to reclaim it.
boolean success = false;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
int orphans = BatcherReference.cleanQueue();
if (orphans == 1) {
success = true;
break;
}
// Validates that there are no other batcher instance present while GC cleanup.
assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0);
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertWithMessage("Batcher was not garbage collected").that(success).isTrue();

LogRecord lr;
synchronized (records) {
assertThat(records.size()).isEqualTo(1);
lr = records.get(0);
}
assertThat(lr.getMessage()).contains("not closed properly");
assertThat(lr.getLevel()).isEqualTo(Level.SEVERE);
} finally {
batcherLogger.setFilter(oldFilter);
}
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest =
new BatcherImpl<>(
Expand Down