diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java index fe329253fe969..d41b688ded2ef 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; @@ -13,6 +14,8 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo; import com.azure.cosmos.implementation.directconnectivity.TcpServerMock.RequestResponseType; import com.azure.cosmos.implementation.directconnectivity.TcpServerMock.SslContextUtils; import com.azure.cosmos.implementation.directconnectivity.TcpServerMock.TcpServer; @@ -33,6 +36,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Random; @@ -93,12 +97,27 @@ public void connectionStateListener_OnConnectionEvent( Configs config = Mockito.mock(Configs.class); Mockito.doReturn(sslContext).when(config).getSslContext(false, false); + ClientTelemetry clientTelemetry = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfo = new ClientTelemetryInfo( + "testMachine", + "testClient", + "testProcess", + "testApp", + ConnectionMode.DIRECT, + "test-cdb-account", + "Test Region 1", + "Linux", + false, + Arrays.asList("Test Region 1", "Test Region 2")); + + Mockito.when(clientTelemetry.getClientTelemetryInfo()).thenReturn(clientTelemetryInfo); + RntbdTransportClient client = new RntbdTransportClient( config, connectionPolicy, new UserAgentContainer(), addressResolver, - null, + clientTelemetry, null); RxDocumentServiceRequest req = diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index 03d295a8a662f..7d25deb336625 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -37,6 +37,7 @@ import com.azure.cosmos.implementation.UserAgentContainer; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.clienttelemetry.TagName; import com.azure.cosmos.implementation.directconnectivity.rntbd.AsyncRntbdRequestRecord; import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionRntbdRequestRecord; @@ -58,7 +59,6 @@ import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdResponseDecoder; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUUID; -import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils; import com.azure.cosmos.implementation.guava25.base.Strings; import com.azure.cosmos.implementation.guava25.collect.ImmutableMap; import io.micrometer.core.instrument.Tag; @@ -984,7 +984,7 @@ private static final class FakeEndpoint implements RntbdEndpoint { private final RntbdDurableEndpointMetrics durableEndpointMetrics; private FakeEndpoint( - final Config config, final RntbdRequestTimer timer, final Uri addressUri, + final Config config, final ClientTelemetry clientTelemetry, RntbdRequestTimer timer, final Uri addressUri, final RntbdResponse... expected ) { @@ -1013,7 +1013,7 @@ private FakeEndpoint( ); RntbdRequestManager requestManager = new RntbdRequestManager( - new RntbdClientChannelHealthChecker(config), + new RntbdClientChannelHealthChecker(config, clientTelemetry), config, null, null); @@ -1183,12 +1183,14 @@ public OpenConnectionRntbdRequestRecord openConnection(RntbdRequestArgs openConn static class Provider implements RntbdEndpoint.Provider { final Config config; + final ClientTelemetry clientTelemetry; final RntbdResponse expected; final RntbdRequestTimer timer; final IAddressResolver addressResolver; Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected, IAddressResolver addressResolver) { this.config = new Config(options, sslContext, LogLevel.WARN); + this.clientTelemetry = new ClientTelemetry(mockDiagnosticsClientContext(), false, null, null, null, null, null, null, null, null, null, null); this.timer = new RntbdRequestTimer( config.tcpNetworkRequestTimeoutInNanos(), config.requestTimerResolutionInNanos()); @@ -1218,12 +1220,12 @@ public int evictions() { @Override public RntbdEndpoint createIfAbsent(URI serviceEndpoint, Uri addressUri, ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor, int minRequiredChannelsForEndpoint, AddressSelector addressSelector) { - return new FakeEndpoint(config, timer, addressUri, expected); + return new FakeEndpoint(config, clientTelemetry, timer, addressUri, expected); } @Override public RntbdEndpoint get(URI physicalAddress) { - return new FakeEndpoint(config, timer, new Uri(physicalAddress.toString()), expected); + return new FakeEndpoint(config, clientTelemetry, timer, new Uri(physicalAddress.toString()), expected); } @Override diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java index 973a1a9b6c197..f4795bc8b9328 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java @@ -4,6 +4,8 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; import com.azure.cosmos.implementation.ConnectionPolicy; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo; import com.azure.cosmos.implementation.cpu.CpuLoadHistory; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; @@ -48,7 +50,12 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -73,7 +80,8 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")); + assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")).isTrue(); + assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue(); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -90,7 +98,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -113,8 +126,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to non-responding read")); + + String message = healthyResult.getNow(); + assertThat(message).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + assertThat(message.contains("health check failed due to non-responding read")).isTrue(); + assertThat(message.contains("clientVmId: testClientVmId")).isTrue(); + validateSystemDiagnostics(message); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -131,7 +148,12 @@ public void transitTimeoutTimeLimitTests(boolean withFailureReason) throws Inter sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -174,7 +196,12 @@ public void transitTimeoutHighFrequencyTests(boolean withFailureReason) throws I sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -200,8 +227,12 @@ public void transitTimeoutHighFrequencyTests(boolean withFailureReason) throws I if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to transit timeout high frequency threshold hit")); + + String message = healthyResult.getNow(); + assertThat(message).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + assertThat(message.contains("health check failed due to transit timeout high frequency threshold hit")).isTrue(); + assertThat(message.contains("clientVmId: testClientVmId")).isTrue(); + validateSystemDiagnostics(message); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -218,7 +249,12 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -245,8 +281,12 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow()).contains("health check failed due to transit timeout on write threshold hit"); + + String message = healthyResult.getNow(); + assertThat(message).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + assertThat(message).contains("health check failed due to transit timeout on write threshold hit"); + assertThat(message).contains("clientVmId: testClientVmId"); + validateSystemDiagnostics(message); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -264,7 +304,12 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -303,6 +348,7 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th assertThat(healthyResult.getNow()).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); // Verify under high CPU load, the transitTimeout will be reset Mockito.verify(timestampsMock, Mockito.times(1)).resetTransitTimeout(); + } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -322,7 +368,12 @@ public void cancellationPronenessOfChannel_Test(boolean withFailureReason) throw sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -375,7 +426,12 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -409,7 +465,8 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + String message = healthyResult.getNow(); + assertThat(message).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); // Verify under high CPU load, the cancellationCount will be reset Mockito.verify(timestampsMock, Mockito.times(1)).resetCancellationCount(); } else { @@ -422,4 +479,10 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu } } + private void validateSystemDiagnostics(String string) { + assertThat(string.contains("clientUsedMemory")).isTrue(); + assertThat(string.contains("clientAvailableMemory")).isTrue(); + assertThat(string.contains("clientSystemCpuLoad")).isTrue(); + assertThat(string.contains("clientAvailableProcessors")).isTrue(); + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java index ef4662420145e..64908b8636608 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java @@ -7,6 +7,8 @@ import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo; import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; import com.azure.cosmos.implementation.directconnectivity.Uri; import io.netty.channel.Channel; @@ -40,7 +42,13 @@ public void transitTimeoutTimestampTests() throws URISyntaxException { new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(), sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class); @@ -119,7 +127,13 @@ public void rntbdContextResponseTests() { new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(), sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 76f7e177166c4..735f9ec3967ad 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added client vmId info to Rntbd health check logs - See [43079](https://github.com/Azure/azure-sdk-for-java/pull/43079) * Added support to enable http2 for gateway mode with system property `COSMOS.HTTP2_ENABLED` and system variable `COSMOS_HTTP2_ENABLED`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) * Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MAX_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MAX_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) * Added support to allow changing http2 max connection pool size with system property `COSMOS.HTTP2_MIN_CONNECTION_POOL_SIZE` and system variable `COSMOS_HTTP2_MIN_CONNECTION_POOL_SIZE`. - [PR 42947](https://github.com/Azure/azure-sdk-for-java/pull/42947) @@ -22,7 +23,7 @@ *`cosmos.client.req.rntbd.bulkOpRetriedCountPerEvaluation` *`cosmos.client.req.rntbd.bulkGlobalOpCount` *`cosmos.client.req.rntbd.bulkTargetMaxMicroBatchSize` - + ### 4.65.0 (2024-11-19) #### Features Added diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 6e890534adb1c..e628fa64894bd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -3,7 +3,10 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.CosmosDiagnosticsSystemUsageSnapshot; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint.Config; import com.fasterxml.jackson.annotation.JsonProperty; @@ -33,6 +36,13 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHealthChecker.class); + // We need to keep a reference to client telemetry to reference the VM ID of the SDK + // client instance. We have this property in client diagnostics, but it can only be + // obtained if an operation fails or succeeds. If an operation hangs, the diagnostics + // will not be available. Floating this value to the health check logs will give us access + // to the client VM ID in scenarios where the operation hangs. + private final ClientTelemetry clientTelemetry; + // A channel will be declared healthy if a read succeeded recently as defined by this value. private static final long recentReadWindowInNanos = 1_000_000_000L; @@ -80,7 +90,7 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck // region Constructors - public RntbdClientChannelHealthChecker(final Config config) { + public RntbdClientChannelHealthChecker(final Config config, final ClientTelemetry clientTelemetry) { checkNotNull(config, "expected non-null config"); @@ -104,6 +114,7 @@ public RntbdClientChannelHealthChecker(final Config config) { this.timeoutOnWriteTimeLimitInNanos = config.timeoutDetectionOnWriteTimeLimitInNanos(); this.nonRespondingChannelReadDelayTimeLimitInNanos = config.nonRespondingChannelReadDelayTimeLimitInNanos(); this.cancellationCountSinceLastReadThreshold = config.cancellationCountSinceLastReadThreshold(); + this.clientTelemetry = clientTelemetry; } // endregion @@ -263,17 +274,21 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ final Optional rntbdContext = requestManager.rntbdContext(); final int pendingRequestCount = requestManager.pendingRequestCount(); + CosmosDiagnosticsSystemUsageSnapshot systemInfo = ClientSideRequestStatistics.fetchSystemInformation(); + writeHangMessage = MessageFormat.format( "{0} health check failed due to non-responding write: [lastChannelWriteAttemptTime: {1}, " + "lastChannelWriteTime: {2}, writeDelayInNanos: {3}, writeDelayLimitInNanos: {4}, " + - "rntbdContext: {5}, pendingRequestCount: {6}]", + "rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}, {8}]", channel, timestamps.lastChannelWriteAttemptTime(), timestamps.lastChannelWriteTime(), writeDelayInNanos, this.writeDelayLimitInNanos, rntbdContext, - pendingRequestCount); + pendingRequestCount, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(writeHangMessage); } @@ -298,14 +313,16 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque readHangMessage = MessageFormat.format( "{0} health check failed due to non-responding read: [lastChannelWrite: {1}, lastChannelRead: {2}, " - + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}]", + + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}, {8}]", channel, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), readDelay, this.readDelayLimitInNanos, rntbdContext, - pendingRequestCount); + pendingRequestCount, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(readHangMessage); @@ -337,11 +354,13 @@ private String transitTimeoutValidation(Timestamps timestamps, Instant currentTi if (readDelay >= this.timeoutTimeLimitInNanos) { transitTimeoutValidationMessage = MessageFormat.format( "{0} health check failed due to transit timeout detection time limit: [rntbdContext: {1}," - + "lastChannelRead: {2}, timeoutTimeLimitInNanos: {3}]", + + "lastChannelRead: {2}, timeoutTimeLimitInNanos: {3}, clientVmId: {4}, {5}]", channel, rntbdContext, timestamps.lastReadTime, - this.timeoutTimeLimitInNanos); + this.timeoutTimeLimitInNanos, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(transitTimeoutValidationMessage); return transitTimeoutValidationMessage; @@ -353,13 +372,16 @@ private String transitTimeoutValidation(Timestamps timestamps, Instant currentTi && readDelay >= this.timeoutHighFrequencyTimeLimitInNanos) { transitTimeoutValidationMessage = MessageFormat.format( "{0} health check failed due to transit timeout high frequency threshold hit: [rntbdContext: {1}," - + "lastChannelRead: {2}, transitTimeoutCount: {3}, timeoutHighFrequencyThreshold: {4}, timeoutHighFrequencyTimeLimitInNanos: {5}]", + + "lastChannelRead: {2}, transitTimeoutCount: {3}, timeoutHighFrequencyThreshold: {4}, timeoutHighFrequencyTimeLimitInNanos: {5}" + + "clientVmId: {6}, {7}]", channel, rntbdContext, timestamps.lastReadTime, timestamps.transitTimeoutCount, this.timeoutHighFrequencyThreshold, - this.timeoutHighFrequencyTimeLimitInNanos); + this.timeoutHighFrequencyTimeLimitInNanos, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(transitTimeoutValidationMessage); return transitTimeoutValidationMessage; @@ -371,13 +393,16 @@ private String transitTimeoutValidation(Timestamps timestamps, Instant currentTi && readDelay >= this.timeoutOnWriteTimeLimitInNanos) { transitTimeoutValidationMessage = MessageFormat.format( "{0} health check failed due to transit timeout on write threshold hit: [rntbdContext: {1}," - + "lastChannelRead: {2}, transitTimeoutWriteCount: {3}, timeoutOnWriteThreshold: {4}, timeoutOnWriteTimeLimitInNanos: {5}]", + + "lastChannelRead: {2}, transitTimeoutWriteCount: {3}, timeoutOnWriteThreshold: {4}, timeoutOnWriteTimeLimitInNanos: {5}]" + + "clientVmId: {6}, {7}]", channel, rntbdContext, timestamps.lastReadTime, timestamps.transitTimeoutWriteCount, this.timeoutOnWriteThreshold, - this.timeoutOnWriteTimeLimitInNanos); + this.timeoutOnWriteTimeLimitInNanos, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(transitTimeoutValidationMessage); return transitTimeoutValidationMessage; @@ -394,12 +419,14 @@ private String idleConnectionValidation(Timestamps timestamps, Instant currentTi if (Duration.between(timestamps.lastChannelReadTime(), currentTime).toNanos() > this.idleConnectionTimeoutInNanos) { errorMessage = MessageFormat.format( "{0} health check failed due to idle connection timeout: [lastChannelWrite: {1}, lastChannelRead: {2}, " - + "idleConnectionTimeout: {3}, currentTime: {4}]", + + "idleConnectionTimeout: {3}, currentTime: {4}, clientVmId: {5}, {6}]", channel, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), idleConnectionTimeoutInNanos, - currentTime); + currentTime, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(errorMessage); } @@ -431,13 +458,15 @@ private String isCancellationProneChannel(Timestamps timestamps, Instant current errorMessage = MessageFormat.format( "{0} health check failed due to channel being cancellation prone: [rntbdContext: {1}, lastChannelWrite: {2}, lastChannelRead: {3}," - + "cancellationCountSinceLastSuccessfulRead: {4}, currentTime: {5}]", + + "cancellationCountSinceLastSuccessfulRead: {4}, currentTime: {5}, clientVmId: {6}, {7}]", channel, rntbdContext, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), timestamps.cancellationCount(), - currentTime); + currentTime, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(errorMessage); return errorMessage; @@ -447,6 +476,15 @@ private String isCancellationProneChannel(Timestamps timestamps, Instant current return errorMessage; } + private String getSystemDiagnostics() { + CosmosDiagnosticsSystemUsageSnapshot systemInfo = ClientSideRequestStatistics.fetchSystemInformation(); + return MessageFormat.format("clientUsedMemory: {0}, clientAvailableMemory: {1}, clientSystemCpuLoad: {2}, clientAvailableProcessors: {3}", + systemInfo.getUsedMemory(), + systemInfo.getAvailableMemory(), + systemInfo.getSystemCpuLoad(), + systemInfo.getAvailableProcessors()); + } + @Override public String toString() { return RntbdObjectMapper.toString(this); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index f0bd021b11eed..b6e2976a1afc7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -212,35 +212,15 @@ public final class RntbdClientChannelPool implements ChannelPool { final Config config, final ClientTelemetry clientTelemetry, final RntbdConnectionStateListener connectionStateListener, - final RntbdServerErrorInjector faultInjectionInterceptors, - final RntbdDurableEndpointMetrics durableEndpointMetrics) { - this( - endpoint, - bootstrap, - config, - new RntbdClientChannelHealthChecker(config), - clientTelemetry, - connectionStateListener, - faultInjectionInterceptors, - durableEndpointMetrics); - } - - private RntbdClientChannelPool( - final RntbdServiceEndpoint endpoint, - final Bootstrap bootstrap, - final Config config, - final RntbdClientChannelHealthChecker healthChecker, - final ClientTelemetry clientTelemetry, - final RntbdConnectionStateListener connectionStateListener, final RntbdServerErrorInjector serverErrorInjector, final RntbdDurableEndpointMetrics durableEndpointMetrics) { checkNotNull(endpoint, "expected non-null endpoint"); checkNotNull(bootstrap, "expected non-null bootstrap"); checkNotNull(config, "expected non-null config"); - checkNotNull(healthChecker, "expected non-null healthChecker"); checkNotNull(durableEndpointMetrics, "expected non-null durableEndpointMetrics"); + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetry); this.poolHandler = new RntbdClientChannelHandler(config, healthChecker, connectionStateListener, serverErrorInjector); this.executor = bootstrap.config().group().next(); this.healthChecker = healthChecker; @@ -1204,7 +1184,6 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise