Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up Node#close. #39317

Merged
merged 9 commits into from
Apr 17, 2019
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* Internal startup code.
Expand Down Expand Up @@ -183,8 +184,15 @@ public void run() {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. " +
"Any outstanding requests or tasks might get killed.");
}
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we revert the order here (i.e. first log the message, then restore interrupt status)?

Thread.currentThread().interrupt();
}
}
});
Expand Down Expand Up @@ -267,6 +275,12 @@ private void start() throws NodeValidationException {
static void stop() throws IOException {
try {
IOUtils.close(INSTANCE.node, INSTANCE.spawner);
if (INSTANCE.node != null && INSTANCE.node.awaitClose(10, TimeUnit.SECONDS) == false) {
throw new IllegalStateException("Node didn't stop within 10 seconds. Any outstanding requests or tasks might get killed.");
}
} catch (InterruptedException e) {
LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here I suggest to revert the order.

Thread.currentThread().interrupt();
} finally {
INSTANCE.keepAliveLatch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@

package org.elasticsearch.common.component;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public abstract class AbstractLifecycleComponent implements LifecycleComponent {
private static final Logger logger = LogManager.getLogger(AbstractLifecycleComponent.class);

protected final Lifecycle lifecycle = new Lifecycle();

Expand All @@ -52,59 +49,64 @@ public void removeLifecycleListener(LifecycleListener listener) {

@Override
public void start() {
if (!lifecycle.canMoveToStarted()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStart();
}
doStart();
lifecycle.moveToStarted();
for (LifecycleListener listener : listeners) {
listener.afterStart();
synchronized (lifecycle) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can become tricky to synchronize on an object that can also be directly accessed by subclasses. The state of Lifecycle has only one field that is declared volatile and as far as I could see all usages in subclasses only query the state and only the base class modifies it. From that perspective we are safe but it is easy to introduce subtle bugs. I wonder whether in the future it would make sense to think about encapsulating Lifecycle in AbstractLifecycleComponent to make this a bit more robust.

if (!lifecycle.canMoveToStarted()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStart();
}
doStart();
lifecycle.moveToStarted();
for (LifecycleListener listener : listeners) {
listener.afterStart();
}
}
}

protected abstract void doStart();

@Override
public void stop() {
if (!lifecycle.canMoveToStopped()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStop();
}
lifecycle.moveToStopped();
doStop();
for (LifecycleListener listener : listeners) {
listener.afterStop();
synchronized (lifecycle) {
if (!lifecycle.canMoveToStopped()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeStop();
}
lifecycle.moveToStopped();
doStop();
for (LifecycleListener listener : listeners) {
listener.afterStop();
}
}
}

protected abstract void doStop();

@Override
public void close() {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.canMoveToClosed()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeClose();
}
lifecycle.moveToClosed();
try {
doClose();
} catch (IOException e) {
// TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient
// structures. Shutting down services should use IOUtils.close
logger.warn("failed to close " + getClass().getName(), e);
}
for (LifecycleListener listener : listeners) {
listener.afterClose();
synchronized (lifecycle) {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.canMoveToClosed()) {
return;
}
for (LifecycleListener listener : listeners) {
listener.beforeClose();
}
lifecycle.moveToClosed();
try {
doClose();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
for (LifecycleListener listener : listeners) {
listener.afterClose();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,22 @@
* }
* </pre>
* <p>
* NOTE: The Lifecycle class is thread-safe. It is also possible to prevent concurrent state transitions
* by locking on the Lifecycle object itself. This is typically useful when chaining multiple transitions.
* <p>
* Note, closed is only allowed to be called when stopped, so make sure to stop the component first.
* Here is how the logic can be applied:
* Here is how the logic can be applied. A lock of the {@code lifecycleState} object is taken so that
* another thread cannot move the state from {@code STOPPED} to {@code STARTED} before it has moved to
* {@code CLOSED}.
* <pre>
* public void close() {
* if (lifecycleState.started()) {
* stop();
* }
* if (!lifecycleState.moveToClosed()) {
* return;
* synchronized (lifecycleState) {
* if (lifecycleState.started()) {
* stop();
* }
* if (!lifecycleState.moveToClosed()) {
* return;
* }
* }
* // perform close logic here
* }
Expand Down Expand Up @@ -116,7 +123,7 @@ public boolean canMoveToStarted() throws IllegalStateException {
}


public boolean moveToStarted() throws IllegalStateException {
public synchronized boolean moveToStarted() throws IllegalStateException {
State localState = this.state;
if (localState == State.INITIALIZED || localState == State.STOPPED) {
state = State.STARTED;
Expand Down Expand Up @@ -145,7 +152,7 @@ public boolean canMoveToStopped() throws IllegalStateException {
throw new IllegalStateException("Can't move to stopped with unknown state");
}

public boolean moveToStopped() throws IllegalStateException {
public synchronized boolean moveToStopped() throws IllegalStateException {
State localState = state;
if (localState == State.STARTED) {
state = State.STOPPED;
Expand All @@ -171,7 +178,7 @@ public boolean canMoveToClosed() throws IllegalStateException {
return true;
}

public boolean moveToClosed() throws IllegalStateException {
public synchronized boolean moveToClosed() throws IllegalStateException {
State localState = state;
if (localState == State.CLOSED) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,18 @@ public void close() {
public StoredContext stashContext() {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(null);
return () -> threadLocal.set(context);
return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
// uncaught exception
try {
threadLocal.set(context);
} catch (IllegalStateException e) {
if (isClosed() == false) {
throw e;
}
}
};
}

/**
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader.CacheHelper;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
Expand Down Expand Up @@ -200,6 +201,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);

@Override
protected void doStart() {
Expand Down Expand Up @@ -273,6 +275,8 @@ protected void closeInternal() {
indicesQueryCache);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeLatch.countDown();
}
}
};
Expand Down Expand Up @@ -311,6 +315,18 @@ protected void doClose() throws IOException {
indicesRefCount.decRef();
}

/**
* Wait for this {@link IndicesService} to be effectively closed. When this returns {@code true}, all shards and shard stores
* are closed and all shard {@link CacheHelper#addClosedListener(org.apache.lucene.index.IndexReader.ClosedListener) closed
* listeners} have run. However some {@link IndexEventListener#onStoreClosed(ShardId) shard closed listeners} might not have
* run.
* @returns true if all shards closed within the given timeout, false otherwise
* @throws InterruptedException if the current thread got interrupted while waiting for shards to close
*/
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
return closeLatch.await(timeout, timeUnit);
}

/**
* Returns the node stats indices stats. The {@code includePrevious} flag controls
* if old shards stats will be aggregated as well (only for relevant stats, such as
Expand Down
51 changes: 34 additions & 17 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,11 +783,13 @@ private Node stop() {
// In this case the process will be terminated even if the first call to close() has not finished yet.
@Override
public synchronized void close() throws IOException {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
synchronized (lifecycle) {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
}
}

logger.info("closing ...");
Expand Down Expand Up @@ -835,21 +837,12 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(ScriptService.class));

toClose.add(() -> stopWatch.stop().start("thread_pool"));
// TODO this should really use ThreadPool.terminate()
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
toClose.add(() -> {
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
}
});

toClose.add(() -> stopWatch.stop().start("thread_pool_force_shutdown"));
toClose.add(() -> injector.getInstance(ThreadPool.class).shutdownNow());
// Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
// See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
// awaitClose if the node doesn't finish closing within the specified time.
toClose.add(() -> stopWatch.stop());


toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(PageCacheRecycler.class));

Expand All @@ -860,6 +853,30 @@ public synchronized void close() throws IOException {
logger.info("closed");
}

/**
* Wait for this node to be effectively closed.
*/
// synchronized to prevent running concurrently with close()
public synchronized boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
if (lifecycle.closed() == false) {
// We don't want to shutdown the threadpool or interrupt threads on a node that is not
// closed yet.
throw new IllegalStateException("Call close() first");
}


ThreadPool threadPool = injector.getInstance(ThreadPool.class);
final boolean terminated = ThreadPool.terminate(threadPool, timeout, timeUnit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it a bit odd to call a mutating method in an await-style method but I do see the reason why it is needed here.

if (terminated) {
// All threads terminated successfully. Because search, recovery and all other operations
// that run on shards run in the threadpool, indices should be effectively closed by now.
if (nodeService.awaitClose(0, TimeUnit.MILLISECONDS) == false) {
throw new IllegalStateException("Some shards are still open after the threadpool terminated. " +
"Something is leaking index readers or store references.");
}
}
return terminated;
}

/**
* Returns {@code true} if the node is closed.
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/elasticsearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class NodeService implements Closeable {
private final Settings settings;
Expand Down Expand Up @@ -135,4 +136,12 @@ public void close() throws IOException {
IOUtils.close(indicesService);
}

/**
* Wait for the node to be effectively closed.
* @see IndicesService#awaitClose(long, TimeUnit)
*/
public boolean awaitClose(long timeout, TimeUnit timeUnit) throws InterruptedException {
return indicesService.awaitClose(timeout, timeUnit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ private InetSocketAddress bindToPort(final String name, final InetAddress hostAd
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
closeLock.writeLock().lock();
try {
// No need for locking here since Lifecycle objects can't move from STARTED to INITIALIZED
if (lifecycle.initialized() == false && lifecycle.started() == false) {
throw new IllegalStateException("transport has been stopped");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ public void onFailure(Exception e) {

@Override
public void close() {
lifecycle.moveToStopped();
lifecycle.moveToClosed();
synchronized (lifecycle) {
lifecycle.moveToStopped();
lifecycle.moveToClosed();
}
}

private class ScheduledPing extends AbstractLifecycleRunnable {
Expand Down
Loading