Skip to content

Commit

Permalink
Rntbd health check improvement 2 (#33464)
Browse files Browse the repository at this point in the history
* rntbd healthcheck timeout detection improvement

---------

Co-authored-by: Fabian Meiswinkel <fabian@meiswinkel.com>
Co-authored-by: annie-mac <annie-mac@WCSMTOPVT44P022.redmond.corp.microsoft.com>
Co-authored-by: annie-mac <annie-mac@annie-macs-MacBook-Pro.local>
  • Loading branch information
4 people authored Feb 16, 2023
1 parent 01d23dd commit cd4e903
Show file tree
Hide file tree
Showing 25 changed files with 1,010 additions and 298 deletions.
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
#### Breaking Changes

#### Bugs Fixed
- Change feed pull API is suing an incorrect key value for collection lookup, which can result in using the old collection in collection recreate scenarios. - See [PR 33178](https://github.com/Azure/azure-sdk-for-java/pull/33178)
* Change feed pull API is using an incorrect key value for collection lookup, which can result in using the old collection in collection recreate scenarios. - See [PR 33178](https://github.com/Azure/azure-sdk-for-java/pull/33178)
* Improved transit timeout detection in `RntbdClientChannelHealthChecker`. - See [PR 33464](https://github.com/Azure/azure-sdk-for-java/pull/33464)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,29 +278,6 @@ public static <E extends CosmosException> RntbdChannelAcquisitionTimeline getCha
return e.getChannelAcquisitionTimeline();
}

/**
 * Internal-use bridge that forwards {@code value} to
 * {@code e.setRntbdChannelTaskQueueSize(int)}.
 *
 * @param e the exception to update
 * @param value the RNTBD channel task queue size to record on {@code e}
 * @param <E> the concrete {@link CosmosException} subtype
 * @return the same instance {@code e}, after the setter has been called
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setChannelTaskQueueSize(E e, int value) {
e.setRntbdChannelTaskQueueSize(value);
return e;
}

/**
 * Internal-use bridge that returns whatever
 * {@code e.getRntbdPendingRequestQueueSize()} reports.
 *
 * @param e the exception to read from
 * @param <E> the concrete {@link CosmosException} subtype
 * @return the value returned by the exception's pending request queue size getter
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> int getRntbdPendingRequestQueueSize(E e) {
return e.getRntbdPendingRequestQueueSize();
}

/**
 * Internal-use bridge that forwards {@code value} to
 * {@code e.setRntbdPendingRequestQueueSize(int)}.
 *
 * @param e the exception to update
 * @param value the RNTBD pending request queue size to record on {@code e}
 * @param <E> the concrete {@link CosmosException} subtype
 * @return the same instance {@code e}, after the setter has been called
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setRntbdPendingRequestQueueSize(E e, int value) {
e.setRntbdPendingRequestQueueSize(value);
return e;
}

/**
 * Internal-use bridge that returns whatever
 * {@code e.getRntbdChannelTaskQueueSize()} reports.
 *
 * @param e the exception to read from
 * @param <E> the concrete {@link CosmosException} subtype
 * @return the value returned by the exception's channel task queue size getter
 */
@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> int getChannelTaskQueueSize(E e) {
return e.getRntbdChannelTaskQueueSize();
}


@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <E extends CosmosException> E setRntbdRequestLength(E e, int requestLen) {
e.setRntbdRequestLength(requestLen);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.cosmos.implementation.batch.BatchExecUtils;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpointStatistics;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -83,14 +84,14 @@ public class CosmosException extends AzureException {
private CosmosError cosmosError;

/**
* RNTBD channel task queue size
* RNTBD endpoint statistics
*/
private int rntbdChannelTaskQueueSize;
private RntbdEndpointStatistics rntbdEndpointStatistics;

/**
* RNTBD endpoint statistics
*/
private RntbdEndpointStatistics rntbdEndpointStatistics;
private RntbdChannelStatistics rntbdChannelStatistics;

/**
* LSN
Expand Down Expand Up @@ -122,11 +123,6 @@ public class CosmosException extends AzureException {
*/
private int requestPayloadLength;

/**
* RNTBD pending request queue size
*/
private int rntbdPendingRequestQueueSize;

/**
* RNTBD request length
*/
Expand Down Expand Up @@ -500,6 +496,14 @@ RntbdEndpointStatistics getRntbdServiceEndpointStatistics() {
return this.rntbdEndpointStatistics;
}

/**
 * Returns the RNTBD channel statistics held by this exception (package-private).
 *
 * @return the current value of the {@code rntbdChannelStatistics} field
 */
RntbdChannelStatistics getRntbdChannelStatistics() {
return this.rntbdChannelStatistics;
}

/**
 * Stores the given RNTBD channel statistics on this exception (package-private).
 *
 * @param rntbdChannelStatistics the statistics to assign to the {@code rntbdChannelStatistics} field
 */
void setRntbdChannelStatistics(RntbdChannelStatistics rntbdChannelStatistics) {
this.rntbdChannelStatistics = rntbdChannelStatistics;
}

void setRntbdRequestLength(int rntbdRequestLength) {
this.rntbdRequestLength = rntbdRequestLength;
}
Expand Down Expand Up @@ -532,22 +536,6 @@ void setSendingRequestHasStarted(boolean hasSendingRequestStarted) {
this.sendingRequestHasStarted = hasSendingRequestStarted;
}

/**
 * Returns the RNTBD channel task queue size held by this exception (package-private).
 *
 * @return the current value of the {@code rntbdChannelTaskQueueSize} field
 */
int getRntbdChannelTaskQueueSize() {
return this.rntbdChannelTaskQueueSize;
}

/**
 * Stores the RNTBD channel task queue size on this exception (package-private).
 *
 * @param rntbdChannelTaskQueueSize the value to assign to the {@code rntbdChannelTaskQueueSize} field
 */
void setRntbdChannelTaskQueueSize(int rntbdChannelTaskQueueSize) {
this.rntbdChannelTaskQueueSize = rntbdChannelTaskQueueSize;
}

/**
 * Returns the RNTBD pending request queue size held by this exception (package-private).
 *
 * @return the current value of the {@code rntbdPendingRequestQueueSize} field, i.e. the value
 *         stored by {@code setRntbdPendingRequestQueueSize(int)}
 */
int getRntbdPendingRequestQueueSize() {
    // Read the field this getter is named after. It previously returned
    // rntbdChannelTaskQueueSize, so a value stored through the matching setter
    // could never be read back.
    return this.rntbdPendingRequestQueueSize;
}

/**
 * Stores the RNTBD pending request queue size on this exception (package-private).
 *
 * @param rntbdPendingRequestQueueSize the value to assign to the {@code rntbdPendingRequestQueueSize} field
 */
void setRntbdPendingRequestQueueSize(int rntbdPendingRequestQueueSize) {
this.rntbdPendingRequestQueueSize = rntbdPendingRequestQueueSize;
}

List<String> getReplicaStatusList() {
return this.replicaStatusList;
}
Expand All @@ -568,6 +556,20 @@ public List<String> getReplicaStatusList(CosmosException cosmosException) {
return cosmosException.getReplicaStatusList();
}

/**
 * Accessor implementation: forwards the statistics to the exception's own
 * {@code setRntbdChannelStatistics} method.
 *
 * @param cosmosException the exception to update
 * @param rntbdChannelStatistics the channel statistics to attach
 * @return the same {@code cosmosException} instance that was passed in
 */
@Override
public CosmosException setRntbdChannelStatistics(
CosmosException cosmosException,
RntbdChannelStatistics rntbdChannelStatistics) {

cosmosException.setRntbdChannelStatistics(rntbdChannelStatistics);
return cosmosException;
}

/**
 * Accessor implementation: reads the channel statistics through the exception's own
 * {@code getRntbdChannelStatistics} method.
 *
 * @param cosmosException the exception to read from
 * @return whatever {@code cosmosException.getRntbdChannelStatistics()} returns
 */
@Override
public RntbdChannelStatistics getRntbdChannelStatistics(CosmosException cosmosException) {
return cosmosException.getRntbdChannelStatistics();
}

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import io.netty.channel.ChannelOption;

Expand All @@ -23,7 +24,7 @@ public final class DirectConnectionConfig {
private static final Duration DEFAULT_IDLE_ENDPOINT_TIMEOUT = Duration.ofHours(1l);
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration DEFAULT_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration MIN_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration MIN_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(1L);
private static final Duration MAX_NETWORK_REQUEST_TIMEOUT = Duration.ofSeconds(10L);
private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 130;
private static final int DEFAULT_MAX_REQUESTS_PER_CONNECTION = 30;
Expand All @@ -39,6 +40,7 @@ public final class DirectConnectionConfig {
private int maxRequestsPerConnection;
private int ioThreadCountPerCoreFactor;
private int ioThreadPriority;
private boolean healthCheckTimeoutDetectionEnabled;

/**
* Constructor
Expand All @@ -53,6 +55,7 @@ public DirectConnectionConfig() {
this.networkRequestTimeout = DEFAULT_NETWORK_REQUEST_TIMEOUT;
this.ioThreadCountPerCoreFactor = DEFAULT_IO_THREAD_COUNT_PER_CORE_FACTOR;
this.ioThreadPriority = DEFAULT_IO_THREAD_PRIORITY;
this.healthCheckTimeoutDetectionEnabled = Configs.isTcpHealthCheckTimeoutDetectionEnabled();
}

/**
Expand Down Expand Up @@ -258,7 +261,7 @@ public Duration getNetworkRequestTimeout() {
* Sets the network request timeout interval (time to wait for response from network peer).
*
* Default value is 5 seconds.
* It only allows values &ge;5s and &le;10s. (backend allows requests to take up-to 5 seconds processing time - 5 seconds
* It only allows values &ge;1s and &le;10s. (backend allows requests to take up to 5 seconds processing time - 5 seconds
* buffer so 10 seconds in total for transport is more than sufficient).
*
* Attention! Please adjust this value with caution.
Expand Down Expand Up @@ -298,6 +301,15 @@ DirectConnectionConfig setIoThreadPriority(int ioThreadPriority) {
return this;
}

/**
 * Sets the health check timeout detection flag on this config (package-private).
 *
 * @param timeoutDetectionEnabled the value to assign to {@code healthCheckTimeoutDetectionEnabled}
 * @return this {@code DirectConnectionConfig}, to allow call chaining
 */
DirectConnectionConfig setHealthCheckTimeoutDetectionEnabled(boolean timeoutDetectionEnabled) {
this.healthCheckTimeoutDetectionEnabled = timeoutDetectionEnabled;
return this;
}

/**
 * Returns the health check timeout detection flag of this config (package-private).
 *
 * @return the current value of {@code healthCheckTimeoutDetectionEnabled}
 */
boolean isHealthCheckTimeoutDetectionEnabled() {
return this.healthCheckTimeoutDetectionEnabled;
}

@Override
public String toString() {
return "DirectConnectionConfig{" +
Expand All @@ -309,6 +321,7 @@ public String toString() {
", networkRequestTimeout=" + networkRequestTimeout +
", ioThreadCountPerCoreFactor=" + ioThreadCountPerCoreFactor +
", ioThreadPriority=" + ioThreadPriority +
", tcpHealthCheckTimeoutDetectionEnabled=" + healthCheckTimeoutDetectionEnabled +
'}';
}

Expand Down Expand Up @@ -339,6 +352,19 @@ public DirectConnectionConfig setIoThreadPriority(DirectConnectionConfig config,
int ioThreadPriority) {
return config.setIoThreadPriority(ioThreadPriority);
}

/**
 * Accessor implementation: forwards the flag to the config's own
 * {@code setHealthCheckTimeoutDetectionEnabled} method.
 *
 * @param directConnectionConfig the config to update
 * @param timeoutDetectionEnabled the flag value to apply
 * @return the same {@code directConnectionConfig} instance that was passed in
 */
@Override
public DirectConnectionConfig setHealthCheckTimeoutDetectionEnabled(
DirectConnectionConfig directConnectionConfig, boolean timeoutDetectionEnabled) {

directConnectionConfig.setHealthCheckTimeoutDetectionEnabled(timeoutDetectionEnabled);
return directConnectionConfig;
}

/**
 * Accessor implementation: reads the flag through the config's own
 * {@code isHealthCheckTimeoutDetectionEnabled} method.
 *
 * @param directConnectionConfig the config to read from
 * @return whatever {@code directConnectionConfig.isHealthCheckTimeoutDetectionEnabled()} returns
 */
@Override
public boolean isHealthCheckTimeoutDetectionEnabled(DirectConnectionConfig directConnectionConfig) {
return directConnectionConfig.isHealthCheckTimeoutDetectionEnabled();
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public class Configs {
private static final String REPLICA_ADDRESS_VALIDATION_ENABLED = "COSMOS.REPLICA_ADDRESS_VALIDATION_ENABLED";
private static final boolean DEFAULT_REPLICA_ADDRESS_VALIDATION_ENABLED = true;

// Rntbd health check related config
private static final String TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED = "COSMOS.TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED";
private static final boolean DEFAULT_TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED = true;

/**
 * Creates a {@code Configs} instance and assigns {@code sslContext} from
 * the result of {@code sslContextInit()}.
 */
public Configs() {
this.sslContext = sslContextInit();
}
Expand Down Expand Up @@ -314,4 +318,10 @@ public static boolean isReplicaAddressValidationEnabled() {
REPLICA_ADDRESS_VALIDATION_ENABLED,
DEFAULT_REPLICA_ADDRESS_VALIDATION_ENABLED);
}

/**
 * Reports whether timeout detection in the TCP health check is enabled.
 *
 * Looks up the {@code COSMOS.TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED} setting via
 * {@code getJVMConfigAsBoolean}, passing {@code true} as the default.
 * NOTE(review): how {@code getJVMConfigAsBoolean} resolves the key is not visible here - confirm.
 *
 * @return the boolean produced by the config lookup
 */
public static boolean isTcpHealthCheckTimeoutDetectionEnabled() {
return getJVMConfigAsBoolean(
TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED,
DEFAULT_TCP_HEALTH_CHECK_TIMEOUT_DETECTION_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class ConnectionPolicy {
private boolean tcpConnectionEndpointRediscoveryEnabled;
private int ioThreadCountPerCoreFactor;
private int ioThreadPriority;
private boolean tcpHealthCheckTimeoutDetectionEnabled;

/**
* Constructor.
Expand All @@ -62,7 +63,10 @@ public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
this(ConnectionMode.GATEWAY, DirectConnectionConfig.getDefaultConfig(), gatewayConnectionConfig);
}

private ConnectionPolicy(ConnectionMode connectionMode, DirectConnectionConfig directConnectionConfig, GatewayConnectionConfig gatewayConnectionConfig) {
private ConnectionPolicy(
ConnectionMode connectionMode,
DirectConnectionConfig directConnectionConfig,
GatewayConnectionConfig gatewayConnectionConfig) {
this();
this.connectionMode = connectionMode;
this.connectTimeout = directConnectionConfig.getConnectTimeout();
Expand All @@ -84,6 +88,11 @@ private ConnectionPolicy(ConnectionMode connectionMode, DirectConnectionConfig d
this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize();
this.httpNetworkRequestTimeout = BridgeInternal.getNetworkRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig);
this.proxy = gatewayConnectionConfig.getProxy();
this.tcpHealthCheckTimeoutDetectionEnabled =
ImplementationBridgeHelpers
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.isHealthCheckTimeoutDetectionEnabled(directConnectionConfig);
}

private ConnectionPolicy() {
Expand All @@ -94,6 +103,7 @@ private ConnectionPolicy() {
this.throttlingRetryOptions = new ThrottlingRetryOptions();
this.userAgentSuffix = "";
this.ioThreadPriority = Thread.NORM_PRIORITY;
this.tcpHealthCheckTimeoutDetectionEnabled = true;
}

/**
Expand Down Expand Up @@ -550,6 +560,10 @@ public ConnectionPolicy setMaxRequestsPerConnection(int maxRequestsPerConnection

/**
 * Gets the IO thread priority held by this connection policy.
 *
 * @return the current value of the {@code ioThreadPriority} field
 */
public int getIoThreadPriority() {
    return this.ioThreadPriority;
}

/**
 * Gets the TCP health check timeout detection flag held by this connection policy.
 *
 * @return the current value of the {@code tcpHealthCheckTimeoutDetectionEnabled} field
 */
public boolean isTcpHealthCheckTimeoutDetectionEnabled() {
return this.tcpHealthCheckTimeoutDetectionEnabled;
}

public ConnectionPolicy setIoThreadCountPerCoreFactor(int ioThreadCountPerCoreFactor) {
this.ioThreadCountPerCoreFactor = ioThreadCountPerCoreFactor;
return this;
Expand Down Expand Up @@ -582,6 +596,9 @@ public String toString() {
", maxConnectionsPerEndpoint=" + maxConnectionsPerEndpoint +
", maxRequestsPerConnection=" + maxRequestsPerConnection +
", tcpConnectionEndpointRediscoveryEnabled=" + tcpConnectionEndpointRediscoveryEnabled +
", ioThreadPriority=" + ioThreadPriority +
", ioThreadCountPerCoreFactor=" + ioThreadCountPerCoreFactor +
", tcpHealthCheckTimeoutDetectionEnabled=" + tcpHealthCheckTimeoutDetectionEnabled +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.azure.cosmos.implementation.batch.ItemBatchOperation;
import com.azure.cosmos.implementation.batch.PartitionScopeThresholds;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple;
Expand Down Expand Up @@ -192,6 +193,9 @@ DirectConnectionConfig setIoThreadCountPerCoreFactor(
int getIoThreadPriority(DirectConnectionConfig config);
DirectConnectionConfig setIoThreadPriority(
DirectConnectionConfig config, int ioThreadPriority);
DirectConnectionConfig setHealthCheckTimeoutDetectionEnabled(
DirectConnectionConfig directConnectionConfig, boolean timeoutDetectionEnabled);
boolean isHealthCheckTimeoutDetectionEnabled(DirectConnectionConfig directConnectionConfig);
}
}

Expand Down Expand Up @@ -1081,6 +1085,8 @@ public static void setCosmosExceptionAccessor(final CosmosExceptionAccessor newA
/**
 * Accessor contract exposing selected {@link CosmosException} operations.
 * The descriptions below follow the method signatures; implementations are supplied elsewhere.
 */
public interface CosmosExceptionAccessor {
// Produces a CosmosException from a status code and an inner exception.
CosmosException createCosmosException(int statusCode, Exception innerException);
// Returns the replica status list associated with the given exception.
List<String> getReplicaStatusList(CosmosException cosmosException);
// Attaches RNTBD channel statistics to the given exception and returns a CosmosException.
CosmosException setRntbdChannelStatistics(CosmosException cosmosException, RntbdChannelStatistics rntbdChannelStatistics);
// Returns the RNTBD channel statistics associated with the given exception.
RntbdChannelStatistics getRntbdChannelStatistics(CosmosException cosmosException);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public CpuLoadHistory(List<CpuLoad> cpuLoad, Duration monitoringInterval) {

public boolean isCpuOverloaded() {
if (cpuOverload.get() == null) {
cpuOverload.set(isCpuOverloadInternal());
cpuOverload.set(isCpuOverloadedInternal());
}

return cpuOverload.get();
Expand All @@ -55,13 +55,25 @@ Instant getLastTimestamp() {
return this.cpuLoad.get(this.cpuLoad.size() - 1).timestamp;
}

private boolean isCpuOverloadInternal() {
private boolean isCpuOverloadedInternal() {
if (isCpuOverThreshold(90.0)) {
return true;
}

return delayInThreadScheduling();
}

public boolean isCpuOverThreshold(double cpuThreshold) {
for (int index = 0; index < this.cpuLoad.size(); ++index) {
if ((double) this.cpuLoad.get(index).value > 90.0) {
if ((double) this.cpuLoad.get(index).value > cpuThreshold) {
return true;
}
}

return false;
}

private boolean delayInThreadScheduling() {
// This signal is fragile, because the timestamps come from
// a non-monotonic clock that might have gotten adjusted by
// e.g. NTP.
Expand Down
Loading

0 comments on commit cd4e903

Please sign in to comment.