Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vmId to the Rntbd Health Check Error Message #43079

Merged
merged 13 commits into from
Jan 10, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.rntbd.AsyncRntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionRntbdRequestRecord;
Expand All @@ -58,7 +59,6 @@
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdResponseDecoder;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUUID;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils;
import com.azure.cosmos.implementation.guava25.base.Strings;
import com.azure.cosmos.implementation.guava25.collect.ImmutableMap;
import io.micrometer.core.instrument.Tag;
Expand Down Expand Up @@ -984,7 +984,7 @@ private static final class FakeEndpoint implements RntbdEndpoint {
private final RntbdDurableEndpointMetrics durableEndpointMetrics;

private FakeEndpoint(
final Config config, final RntbdRequestTimer timer, final Uri addressUri,
final Config config, final ClientTelemetry clientTelemetry, RntbdRequestTimer timer, final Uri addressUri,
final RntbdResponse... expected
) {

Expand Down Expand Up @@ -1013,7 +1013,7 @@ private FakeEndpoint(
);

RntbdRequestManager requestManager = new RntbdRequestManager(
new RntbdClientChannelHealthChecker(config),
new RntbdClientChannelHealthChecker(config, clientTelemetry),
config,
null,
null);
Expand Down Expand Up @@ -1183,12 +1183,14 @@ public OpenConnectionRntbdRequestRecord openConnection(RntbdRequestArgs openConn
static class Provider implements RntbdEndpoint.Provider {

final Config config;
final ClientTelemetry clientTelemetry;
final RntbdResponse expected;
final RntbdRequestTimer timer;
final IAddressResolver addressResolver;

Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected, IAddressResolver addressResolver) {
this.config = new Config(options, sslContext, LogLevel.WARN);
this.clientTelemetry = new ClientTelemetry(mockDiagnosticsClientContext(), false, null, null, null, null, null, null, null, null, null, null);
this.timer = new RntbdRequestTimer(
config.tcpNetworkRequestTimeoutInNanos(),
config.requestTimerResolutionInNanos());
Expand Down Expand Up @@ -1218,12 +1220,12 @@ public int evictions() {

@Override
public RntbdEndpoint createIfAbsent(URI serviceEndpoint, Uri addressUri, ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor, int minRequiredChannelsForEndpoint, AddressSelector addressSelector) {
return new FakeEndpoint(config, timer, addressUri, expected);
return new FakeEndpoint(config, clientTelemetry, timer, addressUri, expected);
}

@Override
public RntbdEndpoint get(URI physicalAddress) {
return new FakeEndpoint(config, timer, new Uri(physicalAddress.toString()), expected);
return new FakeEndpoint(config, clientTelemetry, timer, new Uri(physicalAddress.toString()), expected);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo;
import com.azure.cosmos.implementation.cpu.CpuLoadHistory;
import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
Expand Down Expand Up @@ -48,7 +50,12 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand All @@ -73,7 +80,12 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru
Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
assertThat(healthyResult.getNow().contains("health check failed due to non-responding write"));
assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")).isTrue();
assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue();
assertThat(healthyResult.getNow().contains("clientUsedMemory")).isTrue();
assertThat(healthyResult.getNow().contains("clientAvailableMemory")).isTrue();
assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")).isTrue();
assertThat(healthyResult.getNow().contains("clientAvailableProcessors")).isTrue();
} else {
Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
Expand All @@ -90,7 +102,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand All @@ -114,7 +131,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup
Future<String> healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
assertThat(healthyResult.getNow().contains("health check failed due to non-responding read"));
assertThat(healthyResult.getNow().contains("health check failed due to non-responding read")).isTrue();
assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue();
assertThat(healthyResult.getNow().contains("clientUsedMemory")).isTrue();
assertThat(healthyResult.getNow().contains("clientAvailableMemory")).isTrue();
assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")).isTrue();
assertThat(healthyResult.getNow().contains("clientAvailableProcessors")).isTrue();
} else {
Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
Expand All @@ -131,7 +153,12 @@ public void transitTimeoutTimeLimitTests(boolean withFailureReason) throws Inter
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -174,7 +201,12 @@ public void transitTimeoutHighFrequencyTests(boolean withFailureReason) throws I
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -218,7 +250,12 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -264,7 +301,12 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -322,7 +364,12 @@ public void cancellationPronenessOfChannel_Test(boolean withFailureReason) throw
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -375,7 +422,12 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -40,7 +42,13 @@ public void transitTimeoutTimestampTests() throws URISyntaxException {
new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
sslContextMock,
LogLevel.INFO);
RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);

ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);

RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class);

Expand Down Expand Up @@ -119,7 +127,13 @@ public void rntbdContextResponseTests() {
new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
sslContextMock,
LogLevel.INFO);
RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);

ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);

RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class);

Expand Down
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
* Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MAX_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MAX_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
* Added support to allow changing http2 min connection pool size with system property `COSMOS.HTTP2_MIN_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MIN_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947)
* Added options to fine-tune settings for bulk operations. - [PR 43509](https://github.com/Azure/azure-sdk-for-java/pull/43509)

* Added client vmId info to Rntbd health check logs. - See [PR 43079](https://github.com/Azure/azure-sdk-for-java/pull/43079)

### 4.65.0 (2024-11-19)

#### Features Added
Expand Down
Loading
Loading