Skip to content

Commit

Permalink
Add configuration for HTTP/2 PING scheduler interval and retry threshold
Browse files Browse the repository at this point in the history
- Added a method to configure the execution interval of the scheduler
  that sends HTTP/2 PING frames and periodically checks for ACK responses
- Introduced a retry threshold setting to limit the number of PING transmission attempts
  before considering the connection as unresponsive
- Default values:
  - Scheduler interval must be explicitly set
  - Retry threshold defaults to 0 (no retries, only one PING attempt)

Signed-off-by: raccoonback <kosb15@naver.com>
  • Loading branch information
raccoonback committed Feb 7, 2025
1 parent 4f53922 commit 18196bd
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 94 deletions.
5 changes: 4 additions & 1 deletion docs/modules/ROOT/pages/http-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ include::{examples-dir}/http2/H2Application.java[lines=18..42]
----
<1> Configures the client to support only `HTTP/2`
<2> Configures `SSL`
<3> You can configure the interval for checking `Ping` frames
<3> Sets the interval for sending `HTTP/2` `PING` frames and receiving `ACK` responses
<4> Sets the execution interval for the scheduler that sends `HTTP/2` `PING frames and periodically checks for `ACK` responses
<5> Sets the threshold for retrying `HTTP/2` `PING` frame transmissions.


The following listing presents a simple `H2C` example:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public interface NettyPipeline {
String OnChannelReadIdle = LEFT + "onChannelReadIdle";
String OnChannelWriteIdle = LEFT + "onChannelWriteIdle";
String ProxyHandler = LEFT + "proxyHandler";
String H2LivenessHandler = LEFT + "h2LivenessHandler";
String H2LivenessHandler = LEFT + "h2LivenessHandler";
/**
* Use to register a special handler which ensures that any {@link io.netty.channel.VoidChannelPromise}
* will be converted to "unvoided" promises.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public static void main(String[] args) {
HttpClient.create()
.protocol(HttpProtocol.H2) //<1>
.secure() //<2>
.http2Settings(builder -> builder.pingInterval(Duration.ofMillis(100))); // <3>
.http2Settings(
builder -> builder.pingAckTimeout(Duration.ofMillis(600)) // <3>
.pingScheduleInterval(Duration.ofMillis(300)) // <4>
.pingAckDropThreshold(2) // <5>
);

Tuple2<String, HttpHeaders> response =
client.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,75 @@ public interface Builder {
//Builder pushEnabled(boolean pushEnabled);

/**
* Sets the interval for checking ping frames.
* If a ping ACK frame is not received within the configured interval, the connection will be closed.
* Sets the interval for sending HTTP/2 PING frames and receiving ACK responses.
*
* <p>Be cautious when setting a very short interval, as it may cause the connection to be closed,
* even if the keep-alive setting is enabled.</p>
* <p>
* This method configures the time interval at which PING frames are sent to the peer.
* The interval should be chosen carefully to balance between detecting connection issues
* and minimizing unnecessary network traffic.
* </p>
*
* <p>If no interval is specified, no ping frame checking will be performed by default.</p>
* <p>
* If the interval is set too short, it may cause excessive network overhead.
* If set too long, connection failures may not be detected in a timely manner.
* </p>
*
* @param pingInterval the duration between sending ping frames. If not specified, ping frame checking is disabled.
* @return {@code this}
* @since 1.2.3
* @param pingAckTimeout the interval in between consecutive PING frames
* and ACK responses. Must be a positive value.
*/
default Builder pingAckTimeout(Duration pingAckTimeout) {
return this;
}

/**
* Sets the execution interval for the scheduler that sends HTTP/2 PING frames
* and periodically checks for ACK responses.
*
* <p>
* This method configures the time interval at which the scheduler runs
* to send PING frames and verify if ACK responses are received within
* the expected timeframe.
* Proper tuning of this interval helps in detecting connection issues
* while avoiding unnecessary network overhead.
* </p>
*
* <p>
* If the interval is too short, it may increase network and CPU usage.
* Conversely, setting it too long may delay the detection of connection failures.
* </p>
*
* @param pingScheduleInterval the interval in at which the scheduler executes.
* Must be a positive value.
*/
default Builder pingInterval(Duration pingInterval) {
default Builder pingScheduleInterval(Duration pingScheduleInterval) {
return this;
}

/**
* Sets the threshold for retrying HTTP/2 PING frame transmissions.
*
* <p>
* This method defines the maximum number of attempts to send a PING frame
* before considering the connection as unresponsive.
* If the threshold is exceeded without receiving an ACK response,
* the connection may be closed or marked as unhealthy.
* </p>
*
* <p>
* A lower threshold can detect connection failures more quickly but may lead
* to premature disconnections. Conversely, a higher threshold allows more retries
* but may delay failure detection.
* </p>
*
* <p>
* If this value is not specified, it defaults to 0, meaning only one attempt to send a PING frame is made without retries.
* </p>
*
* @param pingAckDropThreshold the maximum number of PING transmission attempts.
* Must be a positive integer.
* The default value is 0, meaning no retries will occur and only one PING frame will be sent.
*/
default Builder pingAckDropThreshold(Integer pingAckDropThreshold) {
return this;
}
}
Expand Down Expand Up @@ -195,13 +251,33 @@ public Boolean pushEnabled() {
}

/**
* Returns the configured {@code pingInterval} value or null.
* Returns the configured {@code pingAckTimeout} value or null.
*
* @return the configured {@code pingAckTimeout} value or null
*/
@Nullable
public Duration pingAckTimeout() {
return pingAckTimeout;
}

/**
* Returns the configured {@code pingScheduleInterval} value or null.
*
* @return the configured {@code pingInterval} value or null
* @return the configured {@code pingScheduleInterval} value or null
*/
@Nullable
public Duration pingInterval() {
return pingInterval;
public Duration pingScheduleInterval() {
return pingScheduleInterval;
}

/**
* Returns the configured {@code pingAckDropThreshold} value or null.
*
* @return the configured {@code pingAckDropThreshold} value or null
*/
@Nullable
public Integer pingAckDropThreshold() {
return pingAckDropThreshold;
}

@Override
Expand All @@ -220,7 +296,9 @@ public boolean equals(Object o) {
maxHeaderListSize.equals(that.maxHeaderListSize) &&
Objects.equals(maxStreams, that.maxStreams) &&
Objects.equals(pushEnabled, that.pushEnabled) &&
Objects.equals(pingInterval, that.pingInterval);
Objects.equals(pingAckTimeout, that.pingAckTimeout) &&
Objects.equals(pingScheduleInterval, that.pingScheduleInterval) &&
Objects.equals(pingAckDropThreshold, that.pingAckDropThreshold);
}

@Override
Expand All @@ -233,7 +311,9 @@ public int hashCode() {
result = 31 * result + Long.hashCode(maxHeaderListSize);
result = 31 * result + Long.hashCode(maxStreams);
result = 31 * result + Boolean.hashCode(pushEnabled);
result = 31 * result + Objects.hashCode(pingInterval);
result = 31 * result + Objects.hashCode(pingAckTimeout);
result = 31 * result + Objects.hashCode(pingScheduleInterval);
result = 31 * result + Objects.hashCode(pingAckDropThreshold);
return result;
}

Expand All @@ -244,7 +324,9 @@ public int hashCode() {
final Long maxHeaderListSize;
final Long maxStreams;
final Boolean pushEnabled;
final Duration pingInterval;
final Duration pingAckTimeout;
final Duration pingScheduleInterval;
final Integer pingAckDropThreshold;

Http2SettingsSpec(Build build) {
Http2Settings settings = build.http2Settings;
Expand All @@ -261,12 +343,16 @@ public int hashCode() {
maxHeaderListSize = settings.maxHeaderListSize();
maxStreams = build.maxStreams;
pushEnabled = settings.pushEnabled();
pingInterval = build.pingInterval;
pingAckTimeout = build.pingAckTimeout;
pingScheduleInterval = build.pingScheduleInterval;
pingAckDropThreshold = build.pingAckDropThreshold;
}

static final class Build implements Builder {
Long maxStreams;
Duration pingInterval;
Duration pingAckTimeout;
Duration pingScheduleInterval;
Integer pingAckDropThreshold;
final Http2Settings http2Settings = Http2Settings.defaultSettings();

@Override
Expand Down Expand Up @@ -314,8 +400,20 @@ public Builder maxStreams(long maxStreams) {
}

@Override
public Builder pingInterval(Duration pingInterval) {
this.pingInterval = pingInterval;
public Builder pingAckTimeout(Duration pingAckTimeout) {
this.pingAckTimeout = pingAckTimeout;
return this;
}

@Override
public Builder pingScheduleInterval(Duration pingScheduleInterval) {
this.pingScheduleInterval = pingScheduleInterval;
return this;
}

@Override
public Builder pingAckDropThreshold(Integer pingAckDropThreshold) {
this.pingAckDropThreshold = pingAckDropThreshold;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -55,33 +56,57 @@ final class Http2ConnectionLivenessHandler extends ChannelDuplexHandler {
private static final Logger log = Loggers.getLogger(Http2ConnectionLivenessHandler.class);

private ScheduledFuture<?> pingScheduler;

private final ChannelFutureListener pingWriteListener = new PingWriteListener();
private final Http2ConnectionEncoder encoder;
private final long pingIntervalNanos;
private final long pingAckTimeoutNanos;
private final long pingScheduleIntervalNanos;
private final int pingAckDropThreshold;

private int pingAckDropCount;
private long lastSentPingData;
private long lastReceivedPingTime;
private long lastSendingPingTime;
private long lastIoTime;
private boolean isPingAckPending;

public Http2ConnectionLivenessHandler(Http2ConnectionEncoder encoder, @Nullable Duration pingInterval) {
public Http2ConnectionLivenessHandler(Http2ConnectionEncoder encoder, @Nullable Duration pingAckTimeout,
@Nullable Duration pintScheduleInterval, @Nullable Integer pingAckDropThreshold) {
Objects.requireNonNull(encoder, "encoder");

this.encoder = encoder;

if (pingInterval != null) {
this.pingIntervalNanos = pingInterval.toNanos();
if (pingAckTimeout != null) {
this.pingAckTimeoutNanos = pingAckTimeout.toNanos();
}
else {
this.pingAckTimeoutNanos = 0L;
}

if (pintScheduleInterval != null) {
this.pingScheduleIntervalNanos = pintScheduleInterval.toNanos();
}
else {
this.pingIntervalNanos = 0L;
this.pingScheduleIntervalNanos = 0L;
}

if (pingAckDropThreshold != null) {
this.pingAckDropThreshold = pingAckDropThreshold;
}
else {
this.pingAckDropThreshold = 0;
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (isPingIntervalConfigured()) {
isPingAckPending = false;
pingAckDropCount = 0;
pingScheduler = ctx.executor()
.schedule(
new PingChecker(ctx),
pingIntervalNanos,
pingAckTimeoutNanos,
NANOSECONDS
);
}
Expand Down Expand Up @@ -118,14 +143,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cancel();
ctx.fireExceptionCaught(cause);
}

private boolean isPingIntervalConfigured() {
return pingIntervalNanos > 0;
return pingAckTimeoutNanos > 0
&& pingScheduleIntervalNanos > 0;
}

private void cancel() {
Expand Down Expand Up @@ -155,6 +175,7 @@ public void run() {
}

isPingAckPending = false;
pingAckDropCount = 0;
pingScheduler = invokeNextSchedule();
return;
}
Expand All @@ -170,15 +191,28 @@ public void run() {
}

if (isOutOfTimeRange()) {
countPingDrop();

if (isExceedAckDropThreshold()) {
if (log.isInfoEnabled()) {
log.info("Closing {} channel due to delayed ping frame response (timeout: {} ns). lastReceivedPingTime: {}, current: {}", channel, pingAckTimeoutNanos, lastReceivedPingTime, System.nanoTime());
}

close(channel);
return;
}

if (log.isInfoEnabled()) {
log.info("Closing {} channel due to delayed ping frame response (timeout: {} ns).", channel, pingIntervalNanos);
log.info("Drop ping ack frame in {} channel. (ping: {})", channel, lastSentPingData);
}

close(channel);
writePing(ctx);
pingScheduler = invokeNextSchedule();
return;
}

isPingAckPending = false;
pingAckDropCount = 0;
pingScheduler = invokeNextSchedule();
}

Expand All @@ -192,18 +226,26 @@ private void writePing(ChannelHandlerContext ctx) {
}

private boolean isIoInProgress() {
return pingIntervalNanos > (System.nanoTime() - lastIoTime);
return pingAckTimeoutNanos >= (System.nanoTime() - lastIoTime);
}

private boolean isOutOfTimeRange() {
return pingIntervalNanos < (System.nanoTime() - lastReceivedPingTime);
return pingAckTimeoutNanos < Math.abs(lastReceivedPingTime - lastSendingPingTime);
}

private void countPingDrop() {
pingAckDropCount++;
}

private boolean isExceedAckDropThreshold() {
return pingAckDropCount > pingAckDropThreshold;
}

private ScheduledFuture<?> invokeNextSchedule() {
return ctx.executor()
.schedule(
new PingChecker(ctx),
pingIntervalNanos,
pingScheduleIntervalNanos,
NANOSECONDS
);
}
Expand Down Expand Up @@ -233,6 +275,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
}

isPingAckPending = true;
lastSendingPingTime = System.nanoTime();
}
else if (log.isDebugEnabled()) {
log.debug("Failed to wrote PING frame to {} channel.", future.channel());
Expand Down
Loading

0 comments on commit 18196bd

Please sign in to comment.