Skip to content

Commit

Permalink
Ensure one-shot wrappers release their delegates
Browse files Browse the repository at this point in the history
We recently adjusted `RunOnce`, `Releasables#releaseOnce` and
`ActionListener#notifyOnce` so that once they have fired they drop the
now-unnecessary reference to the delegate. This commit introduces tests
to verify that this reference does genuinely become unreachable (i.e.
available for garbage collection) as expected.

It also fixes a bug in `ActionListener#notifyOnce` which caused us to
unexpectedly retain a reference to the delegate 🤦

Relates elastic#92452
Relates elastic#92507
Relates elastic#92537
  • Loading branch information
DaveCTurner committed Jan 15, 2023
1 parent 2cdaabe commit a75dd6c
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 5 deletions.
16 changes: 12 additions & 4 deletions libs/core/src/main/java/org/elasticsearch/core/Releasables.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,18 @@ public static Releasable wrap(final Releasable... releasables) {
*/
public static Releasable releaseOnce(final Releasable releasable) {
final var ref = new AtomicReference<>(releasable);
return () -> {
final var acquired = ref.getAndSet(null);
if (acquired != null) {
acquired.close();
return new Releasable() {
@Override
public void close() {
final var acquired = ref.getAndSet(null);
if (acquired != null) {
acquired.close();
}
}

@Override
public String toString() {
return "releaseOnce[" + ref.get() + "]";
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void onFailure(Exception e) {

@Override
public String toString() {
return "notifyOnce[" + delegate + "]";
return "notifyOnce[" + delegateRef.get() + "]";
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public void run() {
public boolean hasRun() {
return delegateRef.get() == null;
}

@Override
public String toString() {
return "RunOnce[" + delegateRef.get() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -307,6 +308,15 @@ public void onFailure(Exception e) {
}
}

public void testNotifyOnceReleasesDelegate() {
final var reachabilityChecker = new ReachabilityChecker();
final var listener = ActionListener.notifyOnce(reachabilityChecker.register(ActionListener.wrap(() -> {})));
reachabilityChecker.checkReachable();
listener.onResponse(null);
reachabilityChecker.ensureUnreachable();
assertEquals("notifyOnce[null]", listener.toString());
}

public void testConcurrentNotifyOnce() throws InterruptedException {
final var completed = new AtomicBoolean();
final var listener = ActionListener.notifyOnce(new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;

import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -24,4 +25,13 @@ public void testReleaseOnce() {
releasable.close();
assertEquals(1, count.get());
}

public void testReleaseOnceReleasesDelegate() {
final var reachabilityChecker = new ReachabilityChecker();
final var releaseOnce = Releasables.releaseOnce(reachabilityChecker.register(() -> logger.info("test")));
reachabilityChecker.checkReachable();
releaseOnce.close();
reachabilityChecker.ensureUnreachable();
assertEquals("releaseOnce[null]", releaseOnce.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -88,4 +89,13 @@ public void onAfter() {
assertTrue(runOnce.hasRun());
}
}

public void testReleasesDelegate() {
final var reachabilityChecker = new ReachabilityChecker();
final var runOnce = new RunOnce(reachabilityChecker.register(() -> logger.info("test")));
reachabilityChecker.checkReachable();
runOnce.run();
reachabilityChecker.ensureUnreachable();
assertEquals("RunOnce[null]", runOnce.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
* Utility class for checking that objects become unreachable when expected.
*/
public class ReachabilityChecker {

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Queue<Registered> references = ConcurrentCollections.newQueue();

/**
* Register the given target object for reachability checks.
*
* @return the given target object.
*/
public <T> T register(T target) {
var referenceQueue = new ReferenceQueue<>();
references.add(
new Registered(target.toString(), new PhantomReference<>(Objects.requireNonNull(target), referenceQueue), referenceQueue)
);
return target;
}

/**
* Ensure that all registered objects have become unreachable.
*/
public void ensureUnreachable() {
ensureUnreachable(TimeUnit.SECONDS.toMillis(10));
}

void ensureUnreachable(long timeoutMillis) {
Registered registered;
while ((registered = references.poll()) != null) {
registered.assertReferenceEnqueuedForCollection(memoryMXBean, timeoutMillis);
}
}

/**
* From the objects registered since the most recent call to {@link #ensureUnreachable()} (or since the construction of this {@link
* ReachabilityChecker} if {@link #ensureUnreachable()} has not been called) this method chooses one at random and verifies that it has
* not yet become unreachable.
*/
public void checkReachable() {
if (references.peek() == null) {
throw new AssertionError("no references registered");
}

var target = Randomness.get().nextInt(references.size());
var iterator = references.iterator();
for (int i = 0; i < target; i++) {
assertTrue(iterator.hasNext());
assertNotNull(iterator.next());
}

assertTrue(iterator.hasNext());
iterator.next().assertReferenceNotEnqueuedForCollection(memoryMXBean);
}

private static final class Registered {
private final String description;
private final PhantomReference<?> phantomReference;
private final ReferenceQueue<?> referenceQueue;

Registered(String description, PhantomReference<?> phantomReference, ReferenceQueue<?> referenceQueue) {
this.description = description;
this.phantomReference = phantomReference;
this.referenceQueue = referenceQueue;
}

/**
* Attempts to trigger the GC repeatedly until the {@link ReferenceQueue} yields a reference.
*/
public void assertReferenceEnqueuedForCollection(MemoryMXBean memoryMXBean, long timeoutMillis) {
try {
final var timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
while (true) {
memoryMXBean.gc();
final var ref = referenceQueue.remove(500);
if (ref != null) {
ref.clear();
return;
}
assertTrue("still reachable: " + description, System.nanoTime() < timeoutAt);
assertNull(phantomReference.get()); // always succeeds, we're just doing this to use the phantomReference for something
}
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
}

/**
* Attempts to trigger the GC and verifies that the {@link ReferenceQueue} does not yield a reference.
*/
public void assertReferenceNotEnqueuedForCollection(MemoryMXBean memoryMXBean) {
try {
memoryMXBean.gc();
assertNull("became unreachable: " + description, referenceQueue.remove(100));
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test;

import org.hamcrest.Matchers;

public class ReachabilityCheckerTests extends ESTestCase {

public void testSuccess() {
final var reachabilityChecker = new ReachabilityChecker();
var target = reachabilityChecker.register(createTarget());
reachabilityChecker.checkReachable();
target = null;
reachabilityChecker.ensureUnreachable();
assertNull(target);
}

public void testBecomesUnreachable() {
final var reachabilityChecker = new ReachabilityChecker();
var target = reachabilityChecker.register(createTarget());
reachabilityChecker.checkReachable();
target = null;
assertThat(
expectThrows(AssertionError.class, reachabilityChecker::checkReachable).getMessage(),
Matchers.startsWith("became unreachable: test object")
);
assertNull(target);
}

public void testStaysReachable() {
final var reachabilityChecker = new ReachabilityChecker();
var target = reachabilityChecker.register(createTarget());
reachabilityChecker.checkReachable();
assertThat(
expectThrows(AssertionError.class, () -> reachabilityChecker.ensureUnreachable(500)).getMessage(),
Matchers.startsWith("still reachable: test object")
);
assertNotNull(target);
}

private static Object createTarget() {
return new Object() {
@Override
public String toString() {
return "test object";
}
};
}
}

0 comments on commit a75dd6c

Please sign in to comment.