Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Logs warning message when Batcher is not closed on GC
Browse files Browse the repository at this point in the history
Displays a warning in the console when Batchers are garbage collected without calling to Batcher#close().

Derived from [ManagedChannelImpl](https://github.com/grpc/grpc-java/blob/48ca4527c14a95914f9cb7f58ec72997cb96899a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java#L1262).

 - Addressing feedback for logging messages
 - Rebase with current master
 - Minimize the sleep time in the test case
  • Loading branch information
rahulKQL committed Aug 14, 2019
1 parent 0de16d5 commit f870cd4
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 5 deletions.
107 changes: 106 additions & 1 deletion gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
* resolves.
Expand All @@ -65,11 +71,13 @@
public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
implements Batcher<ElementT, ElementResultT> {

private static final Logger LOG = Logger.getLogger(BatcherImpl.class.getName());
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT>
batchingDescriptor;
private final UnaryCallable<RequestT, ResponseT> unaryCallable;
private final RequestT prototype;
private final BatchingSettings batchingSettings;
private final BatcherReference currentBatcherReference;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
Expand Down Expand Up @@ -110,6 +118,7 @@ public BatcherImpl(
} else {
scheduledFuture = Futures.immediateCancelledFuture();
}
currentBatcherReference = new BatcherReference(this);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -205,6 +214,8 @@ public void close() throws InterruptedException {
flush();
scheduledFuture.cancel(true);
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
}

/**
Expand Down Expand Up @@ -304,4 +315,98 @@ boolean isCancelled() {
return scheduledFuture.isCancelled();
}
}

/**
* On every Batcher allocation this class will check for garbage collected batchers that were
* never closed and emit warning logs.
*/
@VisibleForTesting
static final class BatcherReference extends WeakReference<BatcherImpl> {

private static final ReferenceQueue<BatcherImpl> refQueue = new ReferenceQueue<>();

// Retain the References so they don't get GC'd
private static final ConcurrentMap<BatcherReference, BatcherReference> refs =
new ConcurrentHashMap<>();

private static final String ALLOCATION_SITE_PROPERTY_NAME =
"com.google.api.gax.batching.v2.Batcher.enableAllocationTracking";

private static final boolean ENABLE_ALLOCATION_TRACKING =
Boolean.parseBoolean(System.getProperty(ALLOCATION_SITE_PROPERTY_NAME, "true"));
private static final RuntimeException missingCallSite = missingCallSite();

private final Reference<RuntimeException> allocationSite;
private volatile boolean closed;

BatcherReference(BatcherImpl referent) {
super(referent, refQueue);
// allocationSite is softReference to make it garbage collectible, but delay it as long as
// possible as BatcherReference can only be weakly referred.
allocationSite =
new SoftReference<>(
ENABLE_ALLOCATION_TRACKING
? new RuntimeException("Batcher allocation site")
: missingCallSite);
refs.put(this, this);
cleanQueue();
}

/**
* This clear() is *not* called automatically by the JVM. As this is a weak ref, the reference
* will be cleared automatically by the JVM, but will not be removed from {@link #refs}. We do
* it here to avoid this ending up on the reference queue.
*/
@Override
public void clear() {
clearInternal();
// We run this here to periodically clean up the queue if any Batcher is being
// closed properly.
cleanQueue();
}

private void clearInternal() {
super.clear();
refs.remove(this);
allocationSite.clear();
}

/**
* It performs below tasks:
*
* <ul>
* <li>Check each batcher registered on refQueue while initialization.
* <li>Unregister them from refQueue.
* <li>If close() is not called on the batcher, then emits log with possible allocationSite.
* <li>Keeps track of number of batcher on which close() is not called.
* </ul>
*/
@VisibleForTesting
static int cleanQueue() {
BatcherReference ref;
int orphanedBatchers = 0;
while ((ref = (BatcherReference) refQueue.poll()) != null) {
RuntimeException maybeAllocationSite = ref.allocationSite.get();
ref.clearInternal(); // technically the reference is gone already.
if (!ref.closed) {
orphanedBatchers++;
if (LOG.isLoggable(Level.SEVERE)) {
String message = "Batcher was not closed properly!!! Make sure to call close().";
LOG.log(Level.SEVERE, message, maybeAllocationSite);
}
}
}
return orphanedBatchers;
}

private static RuntimeException missingCallSite() {
RuntimeException e =
new RuntimeException(
"Batcher allocation site not recorded. Set -D"
+ ALLOCATION_SITE_PROPERTY_NAME
+ "=true to enable it");
e.setStackTrace(new StackTraceElement[0]);
return e;
}
}
}
101 changes: 97 additions & 4 deletions gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import static com.google.api.gax.rpc.testing.FakeBatchableApi.SQUARER_BATCHING_DESC_V2;
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatcherImpl.BatcherReference;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -52,6 +55,10 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Filter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test;
Expand Down Expand Up @@ -341,17 +348,18 @@ public Void call() throws InterruptedException {
/** Validates ongoing runnable is cancelled once Batcher is GCed. */
@Test
public void testPushCurrentBatchRunnable() throws Exception {
long delay = 100L;
long DELAY_TIME = 50L;
BatchingSettings settings =
batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(delay)).build();
batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(DELAY_TIME)).build();
BatcherImpl<Integer, Integer, LabeledIntList, List<Integer>> batcher =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR);

BatcherImpl.PushCurrentBatchRunnable<Integer, Integer, LabeledIntList, List<Integer>>
pushBatchRunnable = new BatcherImpl.PushCurrentBatchRunnable<>(batcher);
ScheduledFuture<?> onGoingRunnable =
EXECUTOR.scheduleWithFixedDelay(pushBatchRunnable, delay, delay, TimeUnit.MILLISECONDS);
EXECUTOR.scheduleWithFixedDelay(
pushBatchRunnable, DELAY_TIME, DELAY_TIME, TimeUnit.MILLISECONDS);
pushBatchRunnable.setScheduledFuture(onGoingRunnable);

boolean isExecutorCancelled = pushBatchRunnable.isCancelled();
Expand All @@ -369,7 +377,7 @@ public void testPushCurrentBatchRunnable() throws Exception {
if (isExecutorCancelled) {
break;
}
Thread.sleep(100L * (1L << retry));
Thread.sleep(DELAY_TIME * (1L << retry));
}
// ScheduledFuture should be isCancelled now.
assertThat(pushBatchRunnable.isCancelled()).isTrue();
Expand All @@ -391,6 +399,91 @@ public ApiFuture<List<Integer>> futureCall(
underTest.flush();
}

/**
* Validates the presence of warning in case {@link BatcherImpl} is garbage collected without
* being closed first.
*
* <p>Note:This test cannot run concurrently with other tests that use Batchers.
*/
@Test
public void testUnclosedBatchersAreLogged() throws Exception {
final long DELAY_TIME = 30L;
int actualRemaining = 0;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
actualRemaining = BatcherReference.cleanQueue();
if (actualRemaining == 0) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertThat(actualRemaining).isAtMost(0);
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
batchingSettings,
EXECUTOR);
Batcher<Integer, Integer> extraBatcher =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
batchingSettings,
EXECUTOR);

// Try to capture the log output but without causing terminal noise. Adding the filter must
// be done before clearing the ref or else it might be missed.
final List<LogRecord> records = new ArrayList<>(1);
Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName());
Filter oldFilter = batcherLogger.getFilter();
batcherLogger.setFilter(
new Filter() {
@Override
public boolean isLoggable(LogRecord record) {
synchronized (records) {
records.add(record);
}
return false;
}
});

try {
// extraBatcher should not create any noise in the console as we called close() on it.
extraBatcher.close();
extraBatcher = null;

underTest = null;
// That *should* have been the last reference. Try to reclaim it.
boolean success = false;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
int orphans = BatcherReference.cleanQueue();
if (orphans == 1) {
success = true;
break;
}
// Validates that there are no other batcher instance present while GC cleanup.
assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0);
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertWithMessage("Batcher was not garbage collected").that(success).isTrue();

LogRecord lr;
synchronized (records) {
assertThat(records.size()).isEqualTo(1);
lr = records.get(0);
}
assertThat(lr.getMessage()).contains("not closed properly");
assertThat(lr.getLevel()).isEqualTo(Level.SEVERE);
} finally {
batcherLogger.setFilter(oldFilter);
}
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest =
new BatcherImpl<>(
Expand Down

0 comments on commit f870cd4

Please sign in to comment.