Gracefully shutdown elasticsearch (#96363)
Early in shutdown, stop listening for HTTP requests and gracefully close all HTTP connections.

Adds the `http.shutdown_grace_period` setting, the maximum amount of time to wait for in-flight HTTP requests to finish. After that time, all HTTP channels are closed.
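
The setting reads like any other node-scoped time setting. A minimal sketch of configuring and reading it programmatically; the `30s` value and the wrapper class are illustrative only, and in practice the value would be set in `elasticsearch.yml`:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpTransportSettings;

public class ShutdownGracePeriodExample {
    public static void main(String[] args) {
        // Equivalent to `http.shutdown_grace_period: 30s` in elasticsearch.yml; the default of 0
        // means open HTTP connections are closed immediately when the node stops.
        Settings settings = Settings.builder().put("http.shutdown_grace_period", "30s").build();

        long graceMillis = HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD.get(settings).getMillis();
        System.out.println("shutdown grace period: " + graceMillis + "ms"); // prints 30000ms
    }
}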

Graceful shutdown procedure:
1) Stop listening for new HTTP connections
2) Add a `Connection: close` response header to every new request on existing connections and close the channel once the request completes
3) Wait up to the grace period for all open connections to close
4) If the grace period expires, close all remaining connections

Fixes: #96147
stu-elastic authored Jun 6, 2023
1 parent 2e81bcc commit 18e0fea
Showing 6 changed files with 434 additions and 50 deletions.
docs/changelog/96363.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 96363
summary: Gracefully shutdown elasticsearch
area: Infra/Node Lifecycle
type: feature
issues: []
@@ -310,6 +310,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT,
HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE,
HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH,
HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD,
HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT,
HttpTransportSettings.SETTING_HTTP_RESET_COOKIES,
HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY,
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
@@ -28,6 +29,7 @@
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.AbstractRefCounted;
@@ -54,6 +56,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

@@ -63,6 +66,7 @@
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD;

public abstract class AbstractHttpServerTransport extends AbstractLifecycleComponent implements HttpServerTransport {
private static final Logger logger = LogManager.getLogger(AbstractHttpServerTransport.class);
@@ -87,6 +91,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleComponent
private final PlainActionFuture<Void> allClientsClosedListener = PlainActionFuture.newFuture();
private final RefCounted refCounted = AbstractRefCounted.of(() -> allClientsClosedListener.onResponse(null));
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final long shutdownGracePeriodMillis;
private final HttpClientStatsTracker httpClientStatsTracker;

private final HttpTracer httpLogger;
@@ -136,6 +141,7 @@ protected AbstractHttpServerTransport(
);
slowLogThresholdMs = TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings).getMillis();
httpClientStatsTracker = new HttpClientStatsTracker(settings, clusterSettings, threadPool);
shutdownGracePeriodMillis = SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD.get(settings).getMillis();
}

public Recycler<BytesRef> recycler() {
@@ -216,6 +222,18 @@ private TransportAddress bindAddress(final InetAddress hostAddress) {

protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception;

/**
* Gracefully shut down. If {@link HttpTransportSettings#SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD} is zero, the default, then
* forcefully close all open connections immediately.
* Serially run through the following steps:
* 1) Stop listening for new HTTP connections, which means no new HttpChannel are added to the {@link #httpChannels} list
* 2) Add the {@code Connection: close} response header to all new requests on existing {@link #httpChannels} and close the HttpChannel
* after the new request completes
* 3) If grace period is set, wait for all {@link #httpChannels} to close via 2 for up to the configured grace period,
* {@link #shutdownGracePeriodMillis}.
* If all connections are closed before the expiration of the grace period, stop waiting early.
* 4) Close all open httpChannels even if requests are in flight.
*/
@Override
protected void doStop() {
synchronized (httpServerChannels) {
@@ -229,22 +247,41 @@ protected void doStop() {
}
}
}
- try {
- refCounted.decRef();
- CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
- } catch (Exception e) {
- logger.warn("unexpected exception while closing http channels", e);
+ gracefullyCloseConnections();
+ refCounted.decRef();
+ boolean closed = false;
+ if (shutdownGracePeriodMillis > 0) {
+ try {
+ FutureUtils.get(allClientsClosedListener, shutdownGracePeriodMillis, TimeUnit.MILLISECONDS);
+ closed = true;
+ } catch (ElasticsearchTimeoutException t) {
+ logger.warn(format("timed out while waiting [%d]ms for clients to close connections", shutdownGracePeriodMillis));
+ }
+ }
+ if (closed == false) {
+ try {
+ CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
+ } catch (Exception e) {
+ logger.warn("unexpected exception while closing http channels", e);
+ }

- try {
- allClientsClosedListener.get();
- } catch (Exception e) {
- assert false : e;
- logger.warn("unexpected exception while waiting for http channels to close", e);
+ try {
+ allClientsClosedListener.get();
+ } catch (Exception e) {
+ assert false : e;
+ logger.warn("unexpected exception while waiting for http channels to close", e);
+ }
+ }
stopInternal();
}

+ /**
+ * Close the client channel after a new request.
+ */
+ void gracefullyCloseConnections() {
+ gracefullyCloseConnections = true;
+ }

@Override
protected void doClose() {}

@@ -513,8 +550,4 @@ private static ActionListener<Void> earlyResponseListener(HttpRequest request, H
public ThreadPool getThreadPool() {
return threadPool;
}

- public void gracefullyCloseConnections() {
- gracefullyCloseConnections = true;
- }
}
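
The core of the new `doStop` logic above is a bounded wait: block on a completion future for up to the grace period, then fall back to force-closing whatever is still open. A standalone sketch of that pattern using plain `java.util.concurrent` types rather than Elasticsearch's `PlainActionFuture`/`FutureUtils`; the class name and the one-second value are illustrative:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GracePeriodWaitSketch {
    public static void main(String[] args) {
        // Completed when the last client connection closes (plays the role of allClientsClosedListener).
        CompletableFuture<Void> allClientsClosed = new CompletableFuture<>();
        long shutdownGracePeriodMillis = 1_000; // illustrative; zero would skip the wait entirely

        boolean closed = false;
        if (shutdownGracePeriodMillis > 0) {
            try {
                allClientsClosed.get(shutdownGracePeriodMillis, TimeUnit.MILLISECONDS);
                closed = true;
            } catch (TimeoutException e) {
                System.err.println("timed out waiting " + shutdownGracePeriodMillis + "ms for clients to close connections");
            } catch (Exception e) {
                throw new AssertionError(e);
            }
        }
        if (closed == false) {
            // In the real code this is where CloseableChannel.closeChannels force-closes the
            // remaining connections; here we simply complete the future to simulate that.
            allClientsClosed.complete(null);
        }
    }
}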
@@ -125,6 +125,13 @@ public final class HttpTransportSettings {
new ByteSizeValue(4, ByteSizeUnit.KB),
Property.NodeScope
);

public static final Setting<TimeValue> SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD = Setting.positiveTimeSetting(
"http.shutdown_grace_period",
TimeValue.timeValueMillis(0),
Setting.Property.NodeScope
);

// don't reset cookies by default, since I don't think we really need to
// note, parsing cookies was fixed in netty 3.5.1 regarding stack allocation, but still, currently, we don't need cookies
public static final Setting<Boolean> SETTING_HTTP_RESET_COOKIES = Setting.boolSetting("http.reset_cookies", false, Property.NodeScope);
server/src/main/java/org/elasticsearch/node/Node.java (14 additions, 0 deletions)
@@ -240,6 +240,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -1699,7 +1700,20 @@ public synchronized void close() throws IOException {
* logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
*/
public void prepareForClose() {
HttpServerTransport httpServerTransport = injector.getInstance(HttpServerTransport.class);
FutureTask<Void> stopper = new FutureTask<>(() -> {
httpServerTransport.stop();
return null;
});
new Thread(stopper, "http-server-transport-stop").start();

Optional.ofNullable(terminationHandler.get()).ifPresent(TerminationHandler::handleTermination);

try {
stopper.get();
} catch (Exception e) {
logger.warn("unexpected exception while waiting for http server to close", e);
}
}
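
`prepareForClose` above stops the HTTP transport on a dedicated thread so that any registered `TerminationHandler` can run while connections drain, then joins before returning. A standalone sketch of that pattern; the task bodies are placeholders, not Elasticsearch APIs:

import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class ConcurrentPrepareForCloseSketch {
    public static void main(String[] args) {
        // Stand-in for httpServerTransport.stop(), which may block for up to the grace period.
        FutureTask<Void> stopper = new FutureTask<>(() -> {
            TimeUnit.MILLISECONDS.sleep(100);
            return null;
        });
        new Thread(stopper, "http-server-transport-stop").start();

        // Stand-in for TerminationHandler::handleTermination, running while the HTTP transport drains.
        System.out.println("running termination handler");

        try {
            stopper.get(); // wait for the HTTP transport stop to finish before returning
        } catch (Exception e) {
            System.err.println("unexpected exception while waiting for http server to close: " + e);
        }
    }
}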


