Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure one-shot wrappers release their delegates #92928

5 changes: 5 additions & 0 deletions docs/changelog/92928.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 92928
summary: Ensure one-shot wrappers release their delegates
area: Infra/Core
type: bug
issues: []
16 changes: 12 additions & 4 deletions libs/core/src/main/java/org/elasticsearch/core/Releasables.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,18 @@ public static Releasable wrap(final Releasable... releasables) {
*/
public static Releasable releaseOnce(final Releasable releasable) {
final var ref = new AtomicReference<>(releasable);
return () -> {
final var acquired = ref.getAndSet(null);
if (acquired != null) {
acquired.close();
return new Releasable() {
@Override
public void close() {
final var acquired = ref.getAndSet(null);
if (acquired != null) {
acquired.close();
}
}

@Override
public String toString() {
return "releaseOnce[" + ref.get() + "]";
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void onFailure(Exception e) {

@Override
public String toString() {
return "notifyOnce[" + delegate + "]";
return "notifyOnce[" + delegateRef.get() + "]";
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,9 @@ public void run() {
public boolean hasRun() {
return delegateRef.get() == null;
}

@Override
public String toString() {
return "RunOnce[" + delegateRef.get() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -307,6 +308,15 @@ public void onFailure(Exception e) {
}
}

public void testNotifyOnceReleasesDelegate() {
final var reachabilityChecker = new ReachabilityChecker();
final var listener = ActionListener.notifyOnce(reachabilityChecker.register(ActionListener.wrap(() -> {})));
reachabilityChecker.checkReachable();
listener.onResponse(null);
reachabilityChecker.ensureUnreachable();
assertEquals("notifyOnce[null]", listener.toString());
}

public void testConcurrentNotifyOnce() throws InterruptedException {
final var completed = new AtomicBoolean();
final var listener = ActionListener.notifyOnce(new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
Expand Down Expand Up @@ -164,4 +165,23 @@ public void testThenCombine() throws Exception {
assertThat(exception.getMessage(), containsString("simulated"));
}
}

public void testAddedListenersReleasedOnCompletion() {
final StepListener<Void> step = new StepListener<>();
final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();

for (int i = between(1, 3); i > 0; i--) {
step.addListener(reachabilityChecker.register(ActionListener.wrap(() -> {})));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a possibility to extract the allocations of the ActionListeners into an array before we register them? That way we'll minimize the allocations between register calls and ensure that when we call GC we'll get our references cleaned up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it needs that much ceremony to be stable then this is not going to work in general. I wonder, would it work to do the same thing that the G1OverLimitStrategy does?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think if we can somehow reuse that, we are definitely more certain the references will be properly checked for liveness. I also think the 3 action listeners are not a lot of garbage, it won't be a problem if we called System.gc() on creation of the ReachabilityChecker. It's fine as-is with the GC-up-front change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I added a short comment to the class indicating that there is some possible flakiness here. We can revisit this if it turns out to be bad.

}
reachabilityChecker.checkReachable();
if (randomBoolean()) {
step.onResponse(null);
} else {
step.onFailure(new ElasticsearchException("simulated"));
}
reachabilityChecker.ensureUnreachable();

step.addListener(reachabilityChecker.register(ActionListener.wrap(() -> {})));
reachabilityChecker.ensureUnreachable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
*/
package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
Expand Down Expand Up @@ -159,4 +161,23 @@ private static void awaitSafe(CyclicBarrier barrier) {
throw new AssertionError("unexpected", e);
}
}

public void testAddedListenersReleasedOnCompletion() {
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();

for (int i = between(1, 3); i > 0; i--) {
future.addListener(reachabilityChecker.register(ActionListener.wrap(() -> {})));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above, moving as much allocation as we can ahead of time to make sure we don't hit a GC. Perhaps the most prudent thing to do in all these tests is call System.gc() before we go into any of the test code. With that we ensure that at least the new space in generational collectors (e.g. G1) will be collected.

}
reachabilityChecker.checkReachable();
if (randomBoolean()) {
future.onResponse(null);
} else {
future.onFailure(new ElasticsearchException("simulated"));
}
reachabilityChecker.ensureUnreachable();

future.addListener(reachabilityChecker.register(ActionListener.wrap(() -> {})));
reachabilityChecker.ensureUnreachable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;

import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -24,4 +25,15 @@ public void testReleaseOnce() {
releasable.close();
assertEquals(1, count.get());
}

public void testReleaseOnceReleasesDelegate() {
final var reachabilityChecker = new ReachabilityChecker();
final var releaseOnce = Releasables.releaseOnce(reachabilityChecker.register(this::noop));
reachabilityChecker.checkReachable();
releaseOnce.close();
reachabilityChecker.ensureUnreachable();
assertEquals("releaseOnce[null]", releaseOnce.toString());
}

private void noop() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;
import org.junit.After;

import java.util.concurrent.BrokenBarrierException;
Expand Down Expand Up @@ -124,4 +126,23 @@ public void testConcurrentListenerRegistrationAndCompletion() throws BrokenBarri
assertEquals(numberOfThreads - 1, numResponses.get());
assertEquals(0, numExceptions.get());
}

public void testAddedListenersReleasedOnCompletion() {
final ListenableFuture<Void> future = new ListenableFuture<>();
final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();

for (int i = between(1, 3); i > 0; i--) {
future.addListener(reachabilityChecker.register(ActionListener.wrap(() -> {})));
}
reachabilityChecker.checkReachable();
if (randomBoolean()) {
future.onResponse(null);
} else {
future.onFailure(new ElasticsearchException("simulated"));
}
reachabilityChecker.ensureUnreachable();

future.addListener(reachabilityChecker.register(ActionListener.wrap(() -> {})));
reachabilityChecker.ensureUnreachable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ReachabilityChecker;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -88,4 +89,15 @@ public void onAfter() {
assertTrue(runOnce.hasRun());
}
}

public void testReleasesDelegate() {
final var reachabilityChecker = new ReachabilityChecker();
final var runOnce = new RunOnce(reachabilityChecker.register(this::noop));
reachabilityChecker.checkReachable();
runOnce.run();
reachabilityChecker.ensureUnreachable();
assertEquals("RunOnce[null]", runOnce.toString());
}

private void noop() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
* Utility class for checking that objects become unreachable when expected.
*/
public class ReachabilityChecker {

private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
private final Queue<Registered> references = ConcurrentCollections.newQueue();

public ReachabilityChecker() {
memoryMXBean.gc();
}

/**
* Register the given target object for reachability checks.
*
* @return the given target object.
*/
public <T> T register(T target) {
var referenceQueue = new ReferenceQueue<>();
references.add(
new Registered(target.toString(), new PhantomReference<>(Objects.requireNonNull(target), referenceQueue), referenceQueue)
);
return target;
}

/**
* Ensure that all registered objects have become unreachable.
*/
public void ensureUnreachable() {
ensureUnreachable(TimeUnit.SECONDS.toMillis(10));
}

void ensureUnreachable(long timeoutMillis) {
Registered registered;
while ((registered = references.poll()) != null) {
registered.assertReferenceEnqueuedForCollection(memoryMXBean, timeoutMillis);
}
}

/**
* From the objects registered since the most recent call to {@link #ensureUnreachable()} (or since the construction of this {@link
* ReachabilityChecker} if {@link #ensureUnreachable()} has not been called) this method chooses one at random and verifies that it has
* not yet become unreachable.
*/
public void checkReachable() {
if (references.peek() == null) {
throw new AssertionError("no references registered");
}

var target = Randomness.get().nextInt(references.size());
var iterator = references.iterator();
for (int i = 0; i < target; i++) {
assertTrue(iterator.hasNext());
assertNotNull(iterator.next());
}

assertTrue(iterator.hasNext());
iterator.next().assertReferenceNotEnqueuedForCollection(memoryMXBean);
}

private static final class Registered {
private final String description;
private final PhantomReference<?> phantomReference;
private final ReferenceQueue<?> referenceQueue;

Registered(String description, PhantomReference<?> phantomReference, ReferenceQueue<?> referenceQueue) {
this.description = description;
this.phantomReference = phantomReference;
this.referenceQueue = referenceQueue;
}

/**
* Attempts to trigger the GC repeatedly until the {@link ReferenceQueue} yields a reference.
*/
public void assertReferenceEnqueuedForCollection(MemoryMXBean memoryMXBean, long timeoutMillis) {
try {
final var timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
while (true) {
memoryMXBean.gc();
final var ref = referenceQueue.remove(500);
if (ref != null) {
ref.clear();
return;
}
assertTrue("still reachable: " + description, System.nanoTime() < timeoutAt);
assertNull(phantomReference.get()); // always succeeds, we're just doing this to use the phantomReference for something
}
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
}

/**
* Attempts to trigger the GC and verifies that the {@link ReferenceQueue} does not yield a reference.
*/
public void assertReferenceNotEnqueuedForCollection(MemoryMXBean memoryMXBean) {
try {
memoryMXBean.gc();
assertNull("became unreachable: " + description, referenceQueue.remove(100));
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
}
}
}
Loading