diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForCircuitBreakerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForCircuitBreakerTests.java index 15b3a4cfea6ac..88329c855a40b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForCircuitBreakerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForCircuitBreakerTests.java @@ -4,7 +4,6 @@ package com.azure.cosmos; import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.MetadataDiagnosticsContext; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionKeyRange; import com.azure.cosmos.implementation.PointOperationContextForCircuitBreaker; @@ -168,12 +167,12 @@ public void recordHealthyToHealthyWithFailuresStatusTransition(String partitionL String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request = constructRxDocumentServiceRequestInstance( @@ -186,11 +185,11 @@ public void recordHealthyToHealthyWithFailuresStatusTransition(String partitionL maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getReadEndpoints()).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getWriteEndpoints()).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getReadEndpoints()).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getWriteEndpoints()).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); Class[] enclosedClasses = GlobalPartitionEndpointManagerForCircuitBreaker.class.getDeclaredClasses(); Class partitionLevelUnavailabilityInfoClass @@ -236,12 +235,12 @@ public void recordHealthyWithFailuresToUnavailableStatusTransition(String partit String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request = constructRxDocumentServiceRequestInstance( @@ -254,15 +253,15 @@ public void recordHealthyWithFailuresToUnavailableStatusTransition(String partit maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); int exceptionCountToHandle = globalPartitionEndpointManagerForCircuitBreaker.getConsecutiveExceptionBasedCircuitBreaker().getAllowedExceptionCountToMaintainStatus(LocationHealthStatus.HealthyWithFailures, readOperationTrue); for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); } Class[] enclosedClasses = GlobalPartitionEndpointManagerForCircuitBreaker.class.getDeclaredClasses(); @@ -311,12 +310,12 @@ public void recordUnavailableToHealthyTentativeStatusTransition(String partition String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request = constructRxDocumentServiceRequestInstance( @@ -329,15 +328,15 @@ public void recordUnavailableToHealthyTentativeStatusTransition(String partition maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); int exceptionCountToHandle = globalPartitionEndpointManagerForCircuitBreaker.getConsecutiveExceptionBasedCircuitBreaker().getAllowedExceptionCountToMaintainStatus(LocationHealthStatus.HealthyWithFailures, readOperationTrue); for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); } Class[] enclosedClasses = GlobalPartitionEndpointManagerForCircuitBreaker.class.getDeclaredClasses(); @@ -397,12 +396,12 @@ public void recordHealthyTentativeToHealthyStatusTransition(String partitionLeve String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request = constructRxDocumentServiceRequestInstance( @@ -415,15 +414,15 @@ public void recordHealthyTentativeToHealthyStatusTransition(String partitionLeve maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); int exceptionCountToHandle = globalPartitionEndpointManagerForCircuitBreaker.getConsecutiveExceptionBasedCircuitBreaker().getAllowedExceptionCountToMaintainStatus(LocationHealthStatus.HealthyWithFailures, readOperationTrue); for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); } Class[] enclosedClasses = GlobalPartitionEndpointManagerForCircuitBreaker.class.getDeclaredClasses(); @@ -490,12 +489,12 @@ public void recordHealthyTentativeToUnavailableTransition(String partitionLevelC String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request = constructRxDocumentServiceRequestInstance( @@ -508,15 +507,15 @@ public void recordHealthyTentativeToUnavailableTransition(String partitionLevelC maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); int exceptionCountToHandle = globalPartitionEndpointManagerForCircuitBreaker.getConsecutiveExceptionBasedCircuitBreaker().getAllowedExceptionCountToMaintainStatus(LocationHealthStatus.HealthyWithFailures, readOperationTrue); for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); } Class[] enclosedClasses = GlobalPartitionEndpointManagerForCircuitBreaker.class.getDeclaredClasses(); @@ -557,7 +556,7 @@ public void recordHealthyTentativeToUnavailableTransition(String partitionLevelC for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); } locationSpecificHealthContext = locationEndpointToLocationSpecificContextForPartition.get(LocationEastUs2EndpointToLocationPair.getKey()); @@ -582,12 +581,12 @@ public void allRegionsUnavailableHandling(String partitionLevelCircuitBreakerCon String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request = constructRxDocumentServiceRequestInstance( @@ -600,8 +599,8 @@ public void allRegionsUnavailableHandling(String partitionLevelCircuitBreakerCon maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); int exceptionCountToHandle = globalPartitionEndpointManagerForCircuitBreaker @@ -610,11 +609,11 @@ public void allRegionsUnavailableHandling(String partitionLevelCircuitBreakerCon for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUsEndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getKey(), null)); globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(LocationCentralUsEndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getKey(), null)); } Class[] enclosedClasses = GlobalPartitionEndpointManagerForCircuitBreaker.class.getDeclaredClasses(); @@ -654,12 +653,12 @@ public void multiContainerBothWithSinglePartitionHealthyToUnavailableHandling(St String collectionResourceId1 = "dbs/db1/colls/coll1"; String collectionResourceId2 = "dbs/db1/colls/coll2"; - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); RxDocumentServiceRequest request1 = constructRxDocumentServiceRequestInstance( @@ -682,15 +681,15 @@ public void multiContainerBothWithSinglePartitionHealthyToUnavailableHandling(St maxExclusive, LocationEastUs2EndpointToLocationPair.getKey()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); int exceptionCountToHandle = globalPartitionEndpointManagerForCircuitBreaker.getConsecutiveExceptionBasedCircuitBreaker().getAllowedExceptionCountToMaintainStatus(LocationHealthStatus.HealthyWithFailures, readOperationTrue); for (int i = 1; i <= exceptionCountToHandle; i++) { globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(request1, new LocationCache.ConsolidatedLocationEndpoints(LocationEastUs2EndpointToLocationPair.getKey(), null)); + .handleLocationExceptionForPartitionKeyRange(request1, new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getKey(), null)); } globalPartitionEndpointManagerForCircuitBreaker.handleLocationSuccessForPartitionKeyRange(request2); @@ -767,16 +766,16 @@ public void allRegionsUnavailableHandlingWithMultiThreading(String partitionLeve String collectionResourceId = "dbs/db1/colls/coll1"; PartitionKeyRange partitionKeyRange = new PartitionKeyRange(pkRangeId, minInclusive, maxExclusive); - List applicableReadWriteEndpoints = ImmutableList.of( + List applicableReadWriteEndpoints = ImmutableList.of( LocationEastUs2EndpointToLocationPair, LocationEastUsEndpointToLocationPair, LocationCentralUsEndpointToLocationPair) .stream() - .map(uriStringPair -> new LocationCache.ConsolidatedLocationEndpoints(uriStringPair.getLeft(), null)) + .map(uriStringPair -> new LocationCache.ConsolidatedRegionalEndpoint(uriStringPair.getLeft(), null)) .collect(Collectors.toList()); - Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); - Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); + Mockito.when(this.globalEndpointManagerMock.getApplicableReadEndpoints(Mockito.anyList())).thenReturn((UnmodifiableList) UnmodifiableList.unmodifiableList(applicableReadWriteEndpoints)); RxDocumentServiceRequest requestCentralUs = constructRxDocumentServiceRequestInstance( readOperationTrue ? OperationType.Read : OperationType.Create, @@ -884,10 +883,10 @@ private static void validateAllRegionsAreNotUnavailableAfterExceptionInLocation( URI locationWithFailure, String collectionResourceId, PartitionKeyRange partitionKeyRange, - List applicableReadWriteLocations) { + List applicableReadWriteLocations) { logger.warn("Handling exception for {}", locationWithFailure.getPath()); - globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedLocationEndpoints(locationWithFailure, null)); + globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(request, new LocationCache.ConsolidatedRegionalEndpoint(locationWithFailure, null)); List unavailableRegions = globalPartitionEndpointManagerForCircuitBreaker.getUnavailableRegionsForPartitionKeyRange(collectionResourceId, partitionKeyRange, request.getOperationType()); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java index 5dc05b38428b1..75ec2b6d40136 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java @@ -181,13 +181,13 @@ public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagem cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block(); - UnmodifiableList readEndpoints = + UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); List proactiveConnectionEndpoints = readEndpoints.subList( 0, Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount())) - .stream().map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint).collect(Collectors.toList()); + .stream().map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint).collect(Collectors.toList()); Mono asyncContainerMono = Mono.just(cosmosAsyncContainer); @@ -345,13 +345,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect ConcurrentHashMap routingMap = getRoutingMap(rxDocumentClient); ConcurrentHashMap collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient); Set endpoints = ConcurrentHashMap.newKeySet(); - UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); + UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); List proactiveConnectionEndpoints = readEndpoints.subList( 0, Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount())) .stream() - .map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint) + .map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint) .collect(Collectors.toList()); Flux asyncContainerFlux = Flux.fromIterable(asyncContainers); @@ -495,12 +495,12 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect ConcurrentHashMap routingMap = getRoutingMap(rxDocumentClient); ConcurrentHashMap collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient); Set endpoints = ConcurrentHashMap.newKeySet(); - UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); + UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); List proactiveConnectionEndpoints = readEndpoints.subList( 0, Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount())) .stream() - .map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint) + .map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint) .collect(Collectors.toList());; Flux asyncContainerFlux = Flux.fromIterable(asyncContainers); @@ -666,12 +666,12 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect ConcurrentHashMap routingMap = getRoutingMap(rxDocumentClient); ConcurrentHashMap collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient); Set endpoints = ConcurrentHashMap.newKeySet(); - UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); + UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); List proactiveConnectionEndpoints = readEndpoints.subList( 0, Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount())) .stream() - .map(LocationCache.ConsolidatedLocationEndpoints::getGatewayLocationEndpoint) + .map(LocationCache.ConsolidatedRegionalEndpoint::getGatewayLocationEndpoint) .collect(Collectors.toList());; Flux asyncContainerFlux = Flux.fromIterable(asyncContainers); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RegionScopedSessionContainerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RegionScopedSessionContainerTest.java index 22bee3a048606..b050f7a0f5196 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RegionScopedSessionContainerTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RegionScopedSessionContainerTest.java @@ -8,6 +8,7 @@ import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import com.azure.cosmos.implementation.guava25.collect.ImmutableMap; +import com.azure.cosmos.implementation.routing.LocationCache; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.ModelBridgeUtils; import com.azure.cosmos.models.PartitionKey; @@ -374,7 +375,11 @@ public void sessionContainer() throws Exception { int numPartitionKeyRangeIds = 5; String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(LocationEastUsEndpointToLocationPair.getLeft()), Mockito.any())).thenReturn(regionContacted); @@ -434,7 +439,11 @@ public void setSessionToken_NoSessionTokenForPartitionKeyRangeId() throws Except GlobalEndpointManager globalEndpointManagerMock = Mockito.mock(GlobalEndpointManager.class); ISessionContainer sessionContainer = new RegionScopedSessionContainer("127.0.0.1", false, globalEndpointManagerMock); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(endpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -497,7 +506,11 @@ public void setSessionToken_MergeOldWithNew() throws Exception { GlobalEndpointManager globalEndpointManagerMock = Mockito.mock(GlobalEndpointManager.class); RegionScopedSessionContainer sessionContainer = new RegionScopedSessionContainer("127.0.0.1", false, globalEndpointManagerMock); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -550,12 +563,15 @@ public void resolveGlobalSessionTokenReturnsEmptyStringOnCacheMiss() { String initialSessionToken = "1#100#1=20#2=5#3=30"; String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - String resultantSessionToken = partitionKeyRangeId + ":" + initialSessionToken; GlobalEndpointManager globalEndpointManagerMock = Mockito.mock(GlobalEndpointManager.class); RegionScopedSessionContainer sessionContainer = new RegionScopedSessionContainer("127.0.0.1", false, globalEndpointManagerMock); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -583,7 +599,11 @@ public void resolveGlobalSessionTokenReturnsTokenMapUsingName() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -625,7 +645,11 @@ public void resolveGlobalSessionTokenReturnsTokenMapUsingResourceId() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -669,7 +693,11 @@ public void resolveLocalSessionTokenReturnsTokenMapUsingName() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -709,7 +737,11 @@ public void resolveLocalSessionTokenReturnsTokenMapUsingResourceId() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -754,7 +786,11 @@ public void resolveLocalSessionTokenReturnsNullOnPartitionMiss() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -795,7 +831,11 @@ public void resolveLocalSessionTokenReturnsNullOnCollectionMiss() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -834,7 +874,11 @@ public void resolvePartitionLocalSessionTokenReturnsTokenOnParentMatch() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -876,7 +920,11 @@ public void clearTokenByCollectionFullNameRemovesToken() { URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); String unparsedSessionToken = "range_0:1#100#1=20#2=5#3=30"; - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -935,7 +983,11 @@ public void clearTokenByResourceIdRemovesToken() { String unparsedSessionToken = "range_0:1#100#1=20#2=5#3=30"; - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -995,7 +1047,11 @@ public void clearTokenKeepsUnmatchedCollection() { URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); String unparsedSessionToken = "range_0:1#100#1=20#2=5#3=30"; - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1050,7 +1106,11 @@ public void setSessionTokenSetsTokenWhenRequestIsntNameBased() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1088,7 +1148,11 @@ public void setSessionTokenGivesPriorityToOwnerFullNameOverResourceAddress() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1131,7 +1195,11 @@ public void setSessionTokenIgnoresOwnerIdWhenRequestIsntNameBased() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1181,7 +1249,11 @@ public void setSessionTokenGivesPriorityToOwnerIdOverResourceIdWhenRequestIsName String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(LocationEastUsEndpointToLocationPair.getLeft()), Mockito.any())).thenReturn(regionContacted); @@ -1245,7 +1317,11 @@ public void setSessionTokenDoesntOverwriteHigherLSN() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1289,7 +1365,11 @@ public void setSessionTokenOverwriteLowerLSN() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1332,7 +1412,11 @@ public void setSessionTokenDoesNothingOnEmptySessionTokenHeader() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1424,7 +1508,11 @@ public void useParentSessionTokenAfterSplit() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1472,7 +1560,11 @@ public void useParentSessionTokenAfterMerge() { String regionContacted = LocationEastUsEndpointToLocationPair.getRight(); URI locationEndpointContacted = LocationEastUsEndpointToLocationPair.getLeft(); - UnmodifiableList endpoints = new UnmodifiableList<>(ImmutableList.of(LocationEastUsEndpointToLocationPair.getLeft(), LocationEastUs2EndpointToLocationPair.getLeft(), LocationCentralUsEndpointToLocationPair.getLeft())); + UnmodifiableList endpoints = new UnmodifiableList<>( + ImmutableList.of( + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUsEndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationEastUs2EndpointToLocationPair.getLeft(), null), + new LocationCache.ConsolidatedRegionalEndpoint(LocationCentralUsEndpointToLocationPair.getLeft(), null))); Mockito.when(globalEndpointManagerMock.getReadEndpoints()).thenReturn(endpoints); Mockito.when(globalEndpointManagerMock.getRegionName(Mockito.eq(locationEndpointContacted), Mockito.any())).thenReturn(regionContacted); @@ -1567,9 +1659,14 @@ public void resolvePartitionLocalSessionToken( GlobalEndpointManager globalEndpointManagerMock = null; RegionScopedSessionContainer sessionContainer = null; - List writableURIs = writableURIToLocationMappings + List consolidatedWriteRegionalEndpoints = writableURIToLocationMappings .stream() - .map(uriToLocationMappings -> uriToLocationMappings.getLeft()) + .map(uriToLocationMappings -> new LocationCache.ConsolidatedRegionalEndpoint(uriToLocationMappings.getLeft(), null)) + .collect(Collectors.toList()); + + List consolidatedReadRegionalEndpoints = readEndpoints + .stream() + .map(readEndpoint -> new LocationCache.ConsolidatedRegionalEndpoint(readEndpoint, null)) .collect(Collectors.toList()); DatabaseAccount databaseAccount = ModelBridgeUtils.createDatabaseAccount( @@ -1584,7 +1681,7 @@ public void resolvePartitionLocalSessionToken( .when(globalEndpointManagerMock.getLatestDatabaseAccount()) .thenReturn(databaseAccount); - UnmodifiableList readEndpointsInUnmodifiableList = new UnmodifiableList<>(readEndpoints); + UnmodifiableList readEndpointsInUnmodifiableList = new UnmodifiableList<>(consolidatedReadRegionalEndpoints); Mockito .when(globalEndpointManagerMock.getReadEndpoints()) @@ -1592,7 +1689,7 @@ public void resolvePartitionLocalSessionToken( Mockito .when(globalEndpointManagerMock.getApplicableWriteEndpoints(Mockito.anyList())) - .thenReturn(new UnmodifiableList<>(writableURIs)); + .thenReturn(new UnmodifiableList<>(consolidatedWriteRegionalEndpoints)); Mockito .when(globalEndpointManagerMock.canUseMultipleWriteLocations(Mockito.any())) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java index 73c4110cea1c3..66ab13f838bed 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolverTest.java @@ -25,6 +25,7 @@ import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; import com.azure.cosmos.implementation.http.HttpClient; +import com.azure.cosmos.implementation.routing.LocationCache; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import com.azure.cosmos.models.CosmosContainerIdentity; @@ -78,17 +79,17 @@ public void before_GlobalAddressResolverTest() throws Exception { httpClient = Mockito.mock(HttpClient.class); endpointManager = Mockito.mock(GlobalEndpointManager.class); - List readEndPointList = new ArrayList<>(); - readEndPointList.add(urlforRead1); - readEndPointList.add(urlforRead2); - readEndPointList.add(urlforRead3); - UnmodifiableList readList = new UnmodifiableList<>(readEndPointList); + List readEndPointList = new ArrayList<>(); + readEndPointList.add(new LocationCache.ConsolidatedRegionalEndpoint(urlforRead1, null)); + readEndPointList.add(new LocationCache.ConsolidatedRegionalEndpoint(urlforRead2, null)); + readEndPointList.add(new LocationCache.ConsolidatedRegionalEndpoint(urlforRead3, null)); + UnmodifiableList readList = new UnmodifiableList<>(readEndPointList); - List writeEndPointList = new ArrayList<>(); - writeEndPointList.add(urlforWrite1); - writeEndPointList.add(urlforWrite2); - writeEndPointList.add(urlforWrite3); - UnmodifiableList writeList = new UnmodifiableList<>(writeEndPointList); + List writeEndPointList = new ArrayList<>(); + writeEndPointList.add(new LocationCache.ConsolidatedRegionalEndpoint(urlforWrite1, null)); + writeEndPointList.add(new LocationCache.ConsolidatedRegionalEndpoint(urlforWrite2, null)); + writeEndPointList.add(new LocationCache.ConsolidatedRegionalEndpoint(urlforWrite3, null)); + UnmodifiableList writeList = new UnmodifiableList<>(writeEndPointList); Mockito.when(endpointManager.getReadEndpoints()).thenReturn(readList); Mockito.when(endpointManager.getWriteEndpoints()).thenReturn(writeList); @@ -122,7 +123,7 @@ public void resolveAsync() throws Exception { assertThat(urlsBeforeResolve.contains(urlforRead3)).isFalse();//Last read will be removed from addressCacheByEndpoint after 5 endpoints assertThat(urlsBeforeResolve.contains(urlforRead2)).isTrue(); - URI testUrl = new URI("http://Test.com/"); + LocationCache.ConsolidatedRegionalEndpoint testUrl = new LocationCache.ConsolidatedRegionalEndpoint(new URI("http://Test.com/"), null); Mockito.when(endpointManager.resolveServiceEndpoint(ArgumentMatchers.any())).thenReturn(testUrl); globalAddressResolver.resolveAsync(request, true); Set urlsAfterResolve = globalAddressResolver.addressCacheByEndpoint.keySet(); @@ -156,8 +157,9 @@ public void submitOpenConnectionTasksAndInitCaches() { AddressInformation addressInformation = new AddressInformation(true, true, "https://be1.west-us.com:8080", Protocol.TCP); Mockito - .when(endpointManager.getReadEndpoints()) - .thenReturn(new UnmodifiableList(Arrays.asList(urlforRead1, urlforRead2))); + .when(endpointManager.getReadEndpoints()) + .thenReturn(new UnmodifiableList<>( + Arrays.asList(new LocationCache.ConsolidatedRegionalEndpoint(urlforRead1, null), new LocationCache.ConsolidatedRegionalEndpoint(urlforRead2, null)))); DocumentCollection documentCollection = new DocumentCollection(); documentCollection.setId("TestColl"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java index f747de6ffa547..63b2f92911cd1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/MetadataRequestRetryPolicyTests.java @@ -32,6 +32,7 @@ import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpClientConfig; import com.azure.cosmos.implementation.http.HttpTimeoutPolicyControlPlaneHotPath; +import com.azure.cosmos.implementation.routing.LocationCache; import com.azure.cosmos.implementation.throughputControl.TestItem; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosPatchOperations; @@ -234,14 +235,14 @@ public void forceBackgroundAddressRefresh_onConnectionTimeoutAndRequestCancellat GlobalAddressResolver globalAddressResolver = ReflectionUtils.getGlobalAddressResolver(asyncDocumentClient); GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(asyncDocumentClient); - List readEndpoints = globalEndpointManager.getReadEndpoints(); + List readEndpoints = globalEndpointManager.getReadEndpoints(); Map endpointCacheByURIMap = globalAddressResolver.addressCacheByEndpoint; Map httpClientWrapperByRegionMap = new ConcurrentHashMap<>(); for (int i = 0; i < preferredRegions.size(); i++) { - URI readEndpoint = readEndpoints.get(i); + URI readEndpoint = readEndpoints.get(i).getGatewayLocationEndpoint(); GlobalAddressResolver.EndpointCache endpointCache = endpointCacheByURIMap.get(readEndpoint); GatewayAddressCache gatewayAddressCache = endpointCache.addressCache; HttpClientUnderTestWrapper httpClientUnderTestWrapper = getHttpClientUnderTestWrapper(configs); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java index 098c2ed248259..f54e0747fdbe5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/LocationCacheTest.java @@ -512,11 +512,11 @@ public void validateExcludedRegions( request.requestContext.setExcludeRegions(excludedRegionsOnRequest); if (request.isReadOnlyRequest()) { - List applicableReadEndpoints = cache.getApplicableReadEndpoints(request); + List applicableReadEndpoints = cache.getApplicableReadEndpoints(request); assertThat(applicableReadEndpoints.size()).isEqualTo(expectedApplicableEndpoints.size()); expectedApplicableEndpoints.forEach(endpoint -> assertThat(expectedApplicableEndpoints.contains(endpoint)).isTrue()); } else { - List applicableWriteEndpoints = cache.getApplicableWriteEndpoints(request); + List applicableWriteEndpoints = cache.getApplicableWriteEndpoints(request); assertThat(applicableWriteEndpoints.size()).isEqualTo(expectedApplicableEndpoints.size()); expectedApplicableEndpoints.forEach(endpoint -> assertThat(expectedApplicableEndpoints.contains(endpoint)).isTrue()); } @@ -532,8 +532,8 @@ public void validateEffectivePreferredRegions( boolean isDefaultEndpointAlsoRegionalEndpoint) { this.initialize(true, true, isPreferredLocationsListEmpty, isDefaultEndpointAlsoRegionalEndpoint); - List applicableReadEndpoints = cache.getApplicableReadEndpoints(request); - List applicableWriteEndpoints = cache.getApplicableWriteEndpoints(request); + List applicableReadEndpoints = cache.getApplicableReadEndpoints(request); + List applicableWriteEndpoints = cache.getApplicableWriteEndpoints(request); if (request.isReadOnlyRequest()) { assertThat(applicableReadEndpoints.size()).isEqualTo(expectedApplicableReadEndpoints.size()); @@ -667,8 +667,8 @@ private void validateLocationCacheAsync( endpointDiscoveryEnabled, isPreferredListEmpty); - UnmodifiableList currentWriteEndpoints = this.cache.getWriteEndpoints(); - UnmodifiableList currentReadEndpoints = this.cache.getReadEndpoints(); + UnmodifiableList currentWriteEndpoints = this.cache.getWriteEndpoints(); + UnmodifiableList currentReadEndpoints = this.cache.getReadEndpoints(); for (int i = 0; i < readLocationIndex; i++) { this.cache.markEndpointUnavailableForRead(createUrl(Iterables.get(this.databaseAccount.getReadableLocations(), i).getEndpoint())); this.endpointManager.markEndpointUnavailableForRead(createUrl(Iterables.get(this.databaseAccount.getReadableLocations(), i).getEndpoint()));; @@ -898,7 +898,7 @@ private void validateRequestEndpointResolution( // If current write endpoint is unavailable, write endpoints order doesn't change // ALL write requests flip-flop between current write and alternate write endpoint - UnmodifiableList writeEndpoints = this.cache.getWriteEndpoints(); + UnmodifiableList writeEndpoints = this.cache.getWriteEndpoints(); assertThat(firstAvailableWriteEndpoint).isEqualTo(writeEndpoints.get(0)); assertThat(secondAvailableWriteEndpoint).isEqualTo(this.resolveEndpointForWriteRequest(ResourceType.Document, true)); @@ -913,13 +913,13 @@ private void validateRequestEndpointResolution( assertThat(firstAvailableReadEndpoint).isEqualTo(this.resolveEndpointForReadRequest(false)); } - private URI resolveEndpointForReadRequest(boolean masterResourceType) { + private LocationCache.ConsolidatedRegionalEndpoint resolveEndpointForReadRequest(boolean masterResourceType) { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Read, masterResourceType ? ResourceType.Database : ResourceType.Document); return this.cache.resolveServiceEndpoint(request); } - private URI resolveEndpointForWriteRequest(ResourceType resourceType, boolean useAlternateWriteEndpoint) { + private LocationCache.ConsolidatedRegionalEndpoint resolveEndpointForWriteRequest(ResourceType resourceType, boolean useAlternateWriteEndpoint) { RxDocumentServiceRequest request = RxDocumentServiceRequest.create(mockDiagnosticsClientContext(), OperationType.Create, resourceType); request.requestContext.routeToLocation(useAlternateWriteEndpoint ? 1 : 0, resourceType.isCollectionChild()); return this.cache.resolveServiceEndpoint(request); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java index 445132bb02c1b..0c565330ecf51 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosDiagnostics.java @@ -331,7 +331,7 @@ String getFirstContactedRegion() { return this.clientSideRequestStatistics.getFirstContactedRegion(); } - LocationCache.ConsolidatedLocationEndpoints getFirstContactedLocationEndpoint() { + LocationCache.ConsolidatedRegionalEndpoint getFirstContactedLocationEndpoint() { return this.clientSideRequestStatistics.getFirstContactedLocationEndpoint(); } @@ -479,7 +479,7 @@ public void setDiagnosticsContext(CosmosDiagnostics cosmosDiagnostics, CosmosDia } @Override - public LocationCache.ConsolidatedLocationEndpoints getFirstContactedLocationEndpoint(CosmosDiagnostics cosmosDiagnostics) { + public LocationCache.ConsolidatedRegionalEndpoint getFirstContactedLocationEndpoint(CosmosDiagnostics cosmosDiagnostics) { if (cosmosDiagnostics == null) { return null; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index 446f691cb61e8..a281c8933f7aa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -48,7 +48,7 @@ public class ClientRetryPolicy extends DocumentClientRetryPolicy { private boolean isReadRequest; private boolean canUseMultipleWriteLocations; private URI locationEndpoint; - private LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpoints; + private LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint; private RetryContext retryContext; private CosmosDiagnostics cosmosDiagnostics; private AtomicInteger cnt = new AtomicInteger(0); @@ -89,7 +89,7 @@ public Mono shouldRetry(Exception e) { isReadRequest, canUseMultipleWriteLocations, e); - if (this.locationEndpoint == null || this.consolidatedLocationEndpoints == null) { + if (this.locationEndpoint == null || this.consolidatedRegionalEndpoint == null) { // on before request is not invoked because Document Service Request creation failed. logger.error("locationEndpoint is null because ClientRetryPolicy::onBeforeRequest(.) is not invoked, " + "probably request creation failed due to invalid options, serialization setting, etc."); @@ -231,7 +231,7 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ return ShouldRetryResult.noRetry(); } else { if (this.canUseMultipleWriteLocations) { - UnmodifiableList endpoints = + UnmodifiableList endpoints = this.isReadRequest ? this.globalEndpointManager.getApplicableReadEndpoints(request) : this.globalEndpointManager.getApplicableWriteEndpoints(request); @@ -307,7 +307,7 @@ private Mono shouldRetryOnGatewayTimeout() { boolean canFailoverOnTimeout = canGatewayRequestFailoverOnTimeout(this.request); if (this.globalPartitionEndpointManagerForCircuitBreaker.isPartitionLevelCircuitBreakingApplicable(this.request)) { - this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.consolidatedLocationEndpointsToRoute); + this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.consolidatedRegionalEndpointToRoute); } //if operation is data plane read, metadata read, or query plan it can be retried on a different endpoint. @@ -341,10 +341,10 @@ private Mono refreshLocation(boolean isReadRequest, boolean forceRefresh, // Mark the current read endpoint as unavailable if (isReadRequest) { logger.warn("marking the endpoint {} as unavailable for read",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForRead(this.consolidatedLocationEndpoints.getGatewayLocationEndpoint()); + this.globalEndpointManager.markEndpointUnavailableForRead(this.consolidatedRegionalEndpoint.getGatewayLocationEndpoint()); } else { logger.warn("marking the endpoint {} as unavailable for write",this.locationEndpoint); - this.globalEndpointManager.markEndpointUnavailableForWrite(this.consolidatedLocationEndpoints.getGatewayLocationEndpoint()); + this.globalEndpointManager.markEndpointUnavailableForWrite(this.consolidatedRegionalEndpoint.getGatewayLocationEndpoint()); } this.retryContext = new RetryContext(this.failoverRetryCount, usePreferredLocations); @@ -359,7 +359,7 @@ private Mono shouldRetryOnBackendServiceUnavailableAsync( if (this.globalPartitionEndpointManagerForCircuitBreaker.isPartitionLevelCircuitBreakingApplicable(this.request)) { this.globalPartitionEndpointManagerForCircuitBreaker - .handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.consolidatedLocationEndpointsToRoute); + .handleLocationExceptionForPartitionKeyRange(this.request, this.request.requestContext.consolidatedRegionalEndpointToRoute); } // The request has failed with 503, SDK need to decide whether it is safe to retry for write operations @@ -421,7 +421,7 @@ private Mono shouldRetryOnRequestTimeout( this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange( this.request, - this.request.requestContext.consolidatedLocationEndpointsToRoute); + this.request.requestContext.consolidatedRegionalEndpointToRoute); } } @@ -434,7 +434,7 @@ private Mono shouldRetryOnInternalServerError() { this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange( this.request, - this.request.requestContext.consolidatedLocationEndpointsToRoute); + this.request.requestContext.consolidatedRegionalEndpointToRoute); } return Mono.just(ShouldRetryResult.NO_RETRY); @@ -463,17 +463,17 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { // Resolve the endpoint for the request and pin the resolution to the resolved endpoint // This enables marking the endpoint unavailability on endpoint failover/unreachability - this.consolidatedLocationEndpoints = this.globalEndpointManager.resolveServiceEndpoint(request); - this.locationEndpoint = request.useThinProxy && this.consolidatedLocationEndpoints.getThinClientLocationEndpoint() != null ? - this.consolidatedLocationEndpoints.getThinClientLocationEndpoint() : - this.consolidatedLocationEndpoints.getGatewayLocationEndpoint(); + this.consolidatedRegionalEndpoint = this.globalEndpointManager.resolveServiceEndpoint(request); + this.locationEndpoint = request.useThinProxy && this.consolidatedRegionalEndpoint.getThinClientLocationEndpoint() != null ? + this.consolidatedRegionalEndpoint.getThinClientLocationEndpoint() : + this.consolidatedRegionalEndpoint.getGatewayLocationEndpoint(); - if (this.consolidatedLocationEndpoints.getThinClientLocationEndpoint() == null) { + if (this.consolidatedRegionalEndpoint.getThinClientLocationEndpoint() == null) { request.useThinProxy = false; } if (request.requestContext != null) { - request.requestContext.routeToLocation(this.locationEndpoint, this.consolidatedLocationEndpoints); + request.requestContext.routeToLocation(this.locationEndpoint, this.consolidatedRegionalEndpoint); } } @@ -524,8 +524,8 @@ private URI getGatewayLocationEndpoint(RxDocumentServiceRequest request) { Objects.requireNonNull(request, "Argument 'request' must not be null'"); Objects.requireNonNull(request.requestContext, "Argument 'request.requestContext' must not be null'"); - Objects.requireNonNull(request.requestContext.consolidatedLocationEndpointsToRoute, "Argument 'request.requestContext.consolidatedLocationEndpointsToRoute' must not be null'"); + Objects.requireNonNull(request.requestContext.consolidatedRegionalEndpointToRoute, "Argument 'request.requestContext.consolidatedRegionalEndpointToRoute' must not be null'"); - return request.requestContext.consolidatedLocationEndpointsToRoute.getGatewayLocationEndpoint(); + return request.requestContext.consolidatedRegionalEndpointToRoute.getGatewayLocationEndpoint(); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java index f2e1209f648f8..72c05373f21a2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java @@ -164,7 +164,7 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnost this.requestPayloadSizeInBytes = 0; } - LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpoints = null; + LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint = null; URI locationEndPoint = null; if (request.requestContext != null) { @@ -179,7 +179,7 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnost } locationEndPoint = request.requestContext.locationEndpointToRoute; - consolidatedLocationEndpoints = request.requestContext.consolidatedLocationEndpointsToRoute; + consolidatedRegionalEndpoint = request.requestContext.consolidatedRegionalEndpointToRoute; List excludedRegions = request.requestContext.getExcludeRegions(); if (excludedRegions != null && !excludedRegions.isEmpty()) { @@ -193,12 +193,12 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnost this.requestEndTimeUTC = responseTime; } - if (consolidatedLocationEndpoints != null) { + if (consolidatedRegionalEndpoint != null) { storeResponseStatistics.regionName = - globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayLocationEndpoint(), request.getOperationType()); + globalEndpointManager.getRegionName(consolidatedRegionalEndpoint.getGatewayLocationEndpoint(), request.getOperationType()); this.regionsContacted.add(storeResponseStatistics.regionName); this.locationEndpointsContacted.add(locationEndPoint); - this.regionsContactedWithContext.add(new RegionWithContext(storeResponseStatistics.regionName, consolidatedLocationEndpoints)); + this.regionsContactedWithContext.add(new RegionWithContext(storeResponseStatistics.regionName, consolidatedRegionalEndpoint)); } if (storeResponseStatistics.requestOperationType == OperationType.Head @@ -222,11 +222,11 @@ public void recordGatewayResponse( this.requestEndTimeUTC = responseTime; } - LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpoints = null; + LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint = null; URI locationEndPoint = null; if (rxDocumentServiceRequest != null && rxDocumentServiceRequest.requestContext != null) { - consolidatedLocationEndpoints = rxDocumentServiceRequest.requestContext.consolidatedLocationEndpointsToRoute; + consolidatedRegionalEndpoint = rxDocumentServiceRequest.requestContext.consolidatedRegionalEndpointToRoute; locationEndPoint = rxDocumentServiceRequest.requestContext.locationEndpointToRoute; this.approximateInsertionCountInBloomFilter = rxDocumentServiceRequest.requestContext.getApproximateBloomFilterInsertionCount(); @@ -241,7 +241,7 @@ public void recordGatewayResponse( this.regionsContacted.add(regionName); this.locationEndpointsContacted.add(locationEndPoint); - this.regionsContactedWithContext.add(new RegionWithContext(regionName, consolidatedLocationEndpoints)); + this.regionsContactedWithContext.add(new RegionWithContext(regionName, consolidatedRegionalEndpoint)); } GatewayStatistics gatewayStatistics = new GatewayStatistics(); @@ -659,7 +659,7 @@ public String getFirstContactedRegion() { return this.regionsContactedWithContext.first().regionContacted; } - public LocationCache.ConsolidatedLocationEndpoints getFirstContactedLocationEndpoint() { + public LocationCache.ConsolidatedRegionalEndpoint getFirstContactedLocationEndpoint() { if (this.regionsContactedWithContext == null || this.regionsContactedWithContext.isEmpty()) { return null; } @@ -1056,10 +1056,10 @@ public static CosmosDiagnosticsSystemUsageSnapshot fetchSystemInformation() { static class RegionWithContext implements Comparable { private final String regionContacted; - private final LocationCache.ConsolidatedLocationEndpoints locationEndpointsContacted; + private final LocationCache.ConsolidatedRegionalEndpoint locationEndpointsContacted; private final long recordedTimestamp; - RegionWithContext(String regionContacted, LocationCache.ConsolidatedLocationEndpoints locationEndpointsContacted) { + RegionWithContext(String regionContacted, LocationCache.ConsolidatedRegionalEndpoint locationEndpointsContacted) { this.regionContacted = regionContacted; this.locationEndpointsContacted = locationEndpointsContacted; this.recordedTimestamp = System.currentTimeMillis(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 13b9a36865d74..5e4c80972c55e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -40,7 +40,7 @@ public class DocumentServiceRequestContext implements Cloneable { public volatile Boolean usePreferredLocations; public volatile Integer locationIndexToRoute; public volatile URI locationEndpointToRoute; - public volatile LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpointsToRoute; + public volatile LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpointToRoute; public volatile boolean performedBackgroundAddressRefresh; public volatile boolean performLocalRefreshOnGoneException; public volatile List storeResponses; @@ -82,7 +82,7 @@ public void routeToLocation(int locationIndex, boolean usePreferredLocations) { this.locationIndexToRoute = locationIndex; this.usePreferredLocations = usePreferredLocations; this.locationEndpointToRoute = null; - this.consolidatedLocationEndpointsToRoute = null; + this.consolidatedRegionalEndpointToRoute = null; } /** @@ -91,9 +91,9 @@ public void routeToLocation(int locationIndex, boolean usePreferredLocations) { * * @param locationEndpoint Location endpoint to which the request should be routed. */ - public void routeToLocation(URI locationEndpoint, LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpointsToRoute) { + public void routeToLocation(URI locationEndpoint, LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpointToRoute) { this.locationEndpointToRoute = locationEndpoint; - this.consolidatedLocationEndpointsToRoute = consolidatedLocationEndpointsToRoute; + this.consolidatedRegionalEndpointToRoute = consolidatedRegionalEndpointToRoute; this.locationIndexToRoute = null; this.usePreferredLocations = null; } @@ -104,7 +104,7 @@ public void routeToLocation(URI locationEndpoint, LocationCache.ConsolidatedLoca public void clearRouteToLocation() { this.locationIndexToRoute = null; this.locationEndpointToRoute = null; - this.consolidatedLocationEndpointsToRoute = null; + this.consolidatedRegionalEndpointToRoute = null; this.usePreferredLocations = null; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index 5b6b0785b6a2c..b0cd09f37c864 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -88,32 +88,32 @@ public void init() { startRefreshLocationTimerAsync(true).block(maxInitializationTime); } - public UnmodifiableList getReadEndpoints() { + public UnmodifiableList getReadEndpoints() { // readonly return this.locationCache.getReadEndpoints(); } - public UnmodifiableList getWriteEndpoints() { + public UnmodifiableList getWriteEndpoints() { //readonly return this.locationCache.getWriteEndpoints(); } - public UnmodifiableList getApplicableReadEndpoints(RxDocumentServiceRequest request) { + public UnmodifiableList getApplicableReadEndpoints(RxDocumentServiceRequest request) { // readonly return this.locationCache.getApplicableReadEndpoints(request); } - public UnmodifiableList getApplicableWriteEndpoints(RxDocumentServiceRequest request) { + public UnmodifiableList getApplicableWriteEndpoints(RxDocumentServiceRequest request) { //readonly return this.locationCache.getApplicableWriteEndpoints(request); } - public UnmodifiableList getApplicableReadEndpoints(List excludedRegions) { + public UnmodifiableList getApplicableReadEndpoints(List excludedRegions) { // readonly return this.locationCache.getApplicableReadEndpoints(excludedRegions, Collections.emptyList()); } - public UnmodifiableList getApplicableWriteEndpoints(List excludedRegions) { + public UnmodifiableList getApplicableWriteEndpoints(List excludedRegions) { //readonly return this.locationCache.getApplicableWriteEndpoints(excludedRegions, Collections.emptyList()); } @@ -146,8 +146,8 @@ public static Mono getDatabaseAccountFromAnyLocationsAsync( }); } - public LocationCache.ConsolidatedLocationEndpoints resolveServiceEndpoint(RxDocumentServiceRequest request) { - LocationCache.ConsolidatedLocationEndpoints serviceEndpoints = this.locationCache.resolveServiceEndpoint(request); + public LocationCache.ConsolidatedRegionalEndpoint resolveServiceEndpoint(RxDocumentServiceRequest request) { + LocationCache.ConsolidatedRegionalEndpoint serviceEndpoints = this.locationCache.resolveServiceEndpoint(request); if (request.faultInjectionRequestContext != null) { // TODO: integrate thin client into fault injection request.faultInjectionRequestContext.setLocationEndpointToRoute(serviceEndpoints.getGatewayLocationEndpoint()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index f72bf21de460d..1afa8c2a5bbcf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -83,7 +83,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.net.URI; import java.time.Duration; import java.util.Collection; import java.util.EnumSet; @@ -838,7 +837,7 @@ void recordAddressResolutionEnd( void setDiagnosticsContext(CosmosDiagnostics cosmosDiagnostics, CosmosDiagnosticsContext ctx); - LocationCache.ConsolidatedLocationEndpoints getFirstContactedLocationEndpoint(CosmosDiagnostics cosmosDiagnostics); + LocationCache.ConsolidatedRegionalEndpoint getFirstContactedLocationEndpoint(CosmosDiagnostics cosmosDiagnostics); void mergeMetadataDiagnosticContext(CosmosDiagnostics cosmosDiagnostics, MetadataDiagnosticsContext otherMetadataDiagnosticsContext); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/MetadataRequestRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/MetadataRequestRetryPolicy.java index 2da007c630cb3..06b549096daa6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/MetadataRequestRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/MetadataRequestRetryPolicy.java @@ -72,8 +72,8 @@ public Mono shouldRetry(Exception e) { if (request.requestContext != null && request.requestContext.locationEndpointToRoute != null) { - LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpoints = request.requestContext.consolidatedLocationEndpointsToRoute; - URI locationEndpointToRoute = consolidatedLocationEndpoints.getGatewayLocationEndpoint(); + LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint = request.requestContext.consolidatedRegionalEndpointToRoute; + URI locationEndpointToRoute = consolidatedRegionalEndpoint.getGatewayLocationEndpoint(); if (request.isReadOnlyRequest()) { logger.warn("Marking the endpoint : {} as unavailable for read.", locationEndpointToRoute); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 5bbb4f1e5ba8f..ca8061a30b65e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -147,7 +147,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private final static List EMPTY_REGION_LIST = Collections.emptyList(); - private final static List EMPTY_ENDPOINT_LIST = Collections.emptyList(); + private final static List EMPTY_ENDPOINT_LIST = Collections.emptyList(); private final static ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor diagnosticsAccessor = @@ -6487,7 +6487,7 @@ private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientCont * @param operationType - the operationT * @return the applicable endpoints ordered by preference list if any */ - private List getApplicableEndPoints(OperationType operationType, List excludedRegions) { + private List getApplicableEndPoints(OperationType operationType, List excludedRegions) { if (operationType.isReadOnlyOperation()) { return withoutNulls(this.globalEndpointManager.getApplicableReadEndpoints(excludedRegions)); } else if (operationType.isWriteOperation()) { @@ -6497,7 +6497,7 @@ private List getApplicableEndPoints return EMPTY_ENDPOINT_LIST; } - private static List withoutNulls(List orderedEffectiveEndpointsList) { + private static List withoutNulls(List orderedEffectiveEndpointsList) { if (orderedEffectiveEndpointsList == null) { return EMPTY_ENDPOINT_LIST; } @@ -6556,7 +6556,7 @@ private List getApplicableRegionsForSpeculation( return EMPTY_REGION_LIST; } - List consolidatedLocationEndpointsList = getApplicableEndPoints(operationType, excludedRegions); + List consolidatedRegionalEndpointList = getApplicableEndPoints(operationType, excludedRegions); HashSet normalizedExcludedRegions = new HashSet<>(); if (excludedRegions != null) { @@ -6564,7 +6564,7 @@ private List getApplicableRegionsForSpeculation( } List orderedRegionsForSpeculation = new ArrayList<>(); - consolidatedLocationEndpointsList.forEach(consolidatedLocationEndpoints -> { + consolidatedRegionalEndpointList.forEach(consolidatedLocationEndpoints -> { String regionName = this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayLocationEndpoint(), operationType); if (!normalizedExcludedRegions.contains(regionName.toLowerCase(Locale.ROOT))) { orderedRegionsForSpeculation.add(regionName); @@ -6769,7 +6769,7 @@ private Mono executeFeedOperationWithAvailabilityStrategy( private void handleLocationCancellationExceptionForPartitionKeyRange(RxDocumentServiceRequest failedRequest) { - LocationCache.ConsolidatedLocationEndpoints firstContactedLocationEndpoint = diagnosticsAccessor + LocationCache.ConsolidatedRegionalEndpoint firstContactedLocationEndpoint = diagnosticsAccessor .getFirstContactedLocationEndpoint(failedRequest.requestContext.cosmosDiagnostics); if (firstContactedLocationEndpoint != null) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/circuitBreaker/GlobalPartitionEndpointManagerForCircuitBreaker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/circuitBreaker/GlobalPartitionEndpointManagerForCircuitBreaker.java index 7f1b6146b938f..e4a74005508b5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/circuitBreaker/GlobalPartitionEndpointManagerForCircuitBreaker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/circuitBreaker/GlobalPartitionEndpointManagerForCircuitBreaker.java @@ -50,7 +50,7 @@ public class GlobalPartitionEndpointManagerForCircuitBreaker implements AutoClos private final LocationSpecificHealthContextTransitionHandler locationSpecificHealthContextTransitionHandler; private final ConsecutiveExceptionBasedCircuitBreaker consecutiveExceptionBasedCircuitBreaker; private final AtomicReference globalAddressResolverSnapshot; - private final ConcurrentHashMap locationToRegion; + private final ConcurrentHashMap locationToRegion; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Scheduler partitionRecoveryScheduler = Schedulers.newSingle("partition-availability-staleness-check"); @@ -73,7 +73,7 @@ public void init() { } } - public void handleLocationExceptionForPartitionKeyRange(RxDocumentServiceRequest request, LocationCache.ConsolidatedLocationEndpoints failedLocation) { + public void handleLocationExceptionForPartitionKeyRange(RxDocumentServiceRequest request, LocationCache.ConsolidatedRegionalEndpoint failedLocation) { checkNotNull(request, "Argument 'request' cannot be null!"); checkNotNull(request.requestContext, "Argument 'request.requestContext' cannot be null!"); @@ -110,7 +110,7 @@ public void handleLocationExceptionForPartitionKeyRange(RxDocumentServiceRequest if (isFailureThresholdBreached.get()) { - UnmodifiableList applicableEndpoints = request.isReadOnlyRequest() ? + UnmodifiableList applicableEndpoints = request.isReadOnlyRequest() ? this.globalEndpointManager.getApplicableReadEndpoints(request.requestContext.getExcludeRegions()) : this.globalEndpointManager.getApplicableWriteEndpoints(request.requestContext.getExcludeRegions()); @@ -162,7 +162,7 @@ public void handleLocationSuccessForPartitionKeyRange(RxDocumentServiceRequest r String resourceId = request.getResourceId(); PartitionKeyRangeWrapper partitionKeyRangeWrapper = new PartitionKeyRangeWrapper(resolvedPartitionKeyRangeForCircuitBreaker, resourceId); - LocationCache.ConsolidatedLocationEndpoints succeededLocation = request.requestContext.consolidatedLocationEndpointsToRoute; + LocationCache.ConsolidatedRegionalEndpoint succeededLocation = request.requestContext.consolidatedRegionalEndpointToRoute; String collectionLink = getCollectionLink(request); @@ -195,13 +195,13 @@ public List getUnavailableRegionsForPartitionKeyRange(String collectionR List unavailableRegions = new ArrayList<>(); if (partitionLevelLocationUnavailabilityInfoSnapshot != null) { - Map locationEndpointToFailureMetricsForPartition = + Map locationEndpointToFailureMetricsForPartition = partitionLevelLocationUnavailabilityInfoSnapshot.locationEndpointToLocationSpecificContextForPartition; - for (Map.Entry pair : locationEndpointToFailureMetricsForPartition.entrySet()) { - LocationCache.ConsolidatedLocationEndpoints consolidatedLocationEndpoints = pair.getKey(); + for (Map.Entry pair : locationEndpointToFailureMetricsForPartition.entrySet()) { + LocationCache.ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint = pair.getKey(); - URI gatewayLocationEndpoint = consolidatedLocationEndpoints.getGatewayLocationEndpoint(); + URI gatewayLocationEndpoint = consolidatedRegionalEndpoint.getGatewayLocationEndpoint(); LocationSpecificHealthContext locationSpecificHealthContext = pair.getValue(); @@ -229,11 +229,11 @@ private Flux updateStaleLocationInfo() { if (partitionLevelLocationUnavailabilityInfo != null) { - List>> locationToLocationSpecificHealthContextList = new ArrayList<>(); + List>> locationToLocationSpecificHealthContextList = new ArrayList<>(); - for (Map.Entry locationToLocationLevelMetrics : partitionLevelLocationUnavailabilityInfo.locationEndpointToLocationSpecificContextForPartition.entrySet()) { + for (Map.Entry locationToLocationLevelMetrics : partitionLevelLocationUnavailabilityInfo.locationEndpointToLocationSpecificContextForPartition.entrySet()) { - LocationCache.ConsolidatedLocationEndpoints locationWithStaleUnavailabilityInfo = locationToLocationLevelMetrics.getKey(); + LocationCache.ConsolidatedRegionalEndpoint locationWithStaleUnavailabilityInfo = locationToLocationLevelMetrics.getKey(); LocationSpecificHealthContext locationSpecificHealthContext = locationToLocationLevelMetrics.getValue(); if (!locationSpecificHealthContext.isRegionAvailableToProcessRequests()) { @@ -260,7 +260,7 @@ private Flux updateStaleLocationInfo() { .flatMap(locationToLocationSpecificHealthContextPair -> { PartitionKeyRangeWrapper partitionKeyRangeWrapper = locationToLocationSpecificHealthContextPair.getLeft(); - LocationCache.ConsolidatedLocationEndpoints locationWithStaleUnavailabilityInfo = locationToLocationSpecificHealthContextPair.getRight().getLeft(); + LocationCache.ConsolidatedRegionalEndpoint locationWithStaleUnavailabilityInfo = locationToLocationSpecificHealthContextPair.getRight().getLeft(); PartitionLevelLocationUnavailabilityInfo partitionLevelLocationUnavailabilityInfo = this.partitionKeyRangeToLocationSpecificUnavailabilityInfo.get(partitionKeyRangeWrapper); @@ -357,7 +357,7 @@ public boolean isPartitionLevelCircuitBreakingApplicable(RxDocumentServiceReques return false; } - UnmodifiableList applicableWriteEndpoints = globalEndpointManager.getApplicableWriteEndpoints(Collections.emptyList()); + UnmodifiableList applicableWriteEndpoints = globalEndpointManager.getApplicableWriteEndpoints(Collections.emptyList()); return applicableWriteEndpoints != null && applicableWriteEndpoints.size() > 1; } @@ -374,7 +374,7 @@ public void close() { private class PartitionLevelLocationUnavailabilityInfo { - private final ConcurrentHashMap locationEndpointToLocationSpecificContextForPartition; + private final ConcurrentHashMap locationEndpointToLocationSpecificContextForPartition; private final ConcurrentHashMap regionToLocationSpecificHealthContext; private final LocationSpecificHealthContextTransitionHandler locationSpecificHealthContextTransitionHandler; @@ -386,7 +386,7 @@ private PartitionLevelLocationUnavailabilityInfo() { private boolean handleException( PartitionKeyRangeWrapper partitionKeyRangeWrapper, - LocationCache.ConsolidatedLocationEndpoints locationWithException, + LocationCache.ConsolidatedRegionalEndpoint locationWithException, boolean isReadOnlyRequest) { AtomicBoolean isExceptionThresholdBreached = new AtomicBoolean(false); @@ -435,7 +435,7 @@ private boolean handleException( private void handleSuccess( PartitionKeyRangeWrapper partitionKeyRangeWrapper, - LocationCache.ConsolidatedLocationEndpoints succeededLocation, + LocationCache.ConsolidatedRegionalEndpoint succeededLocation, boolean isReadOnlyRequest) { this.locationEndpointToLocationSpecificContextForPartition.compute(succeededLocation, (locationAsKey, locationSpecificContextAsVal) -> { @@ -479,9 +479,9 @@ private void handleSuccess( }); } - public boolean areLocationsAvailableForPartitionKeyRange(List availableLocationsAtAccountLevel) { + public boolean areLocationsAvailableForPartitionKeyRange(List availableLocationsAtAccountLevel) { - for (LocationCache.ConsolidatedLocationEndpoints availableLocation : availableLocationsAtAccountLevel) { + for (LocationCache.ConsolidatedRegionalEndpoint availableLocation : availableLocationsAtAccountLevel) { if (!this.locationEndpointToLocationSpecificContextForPartition.containsKey(availableLocation)) { return true; } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index e3eb844604611..99e6f626d86e6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -92,10 +92,10 @@ public GlobalAddressResolver( this.addressCacheByEndpoint = new ConcurrentHashMap<>(); this.apiType = apiType; - for (LocationCache.ConsolidatedLocationEndpoints endpoint : endpointManager.getWriteEndpoints()) { + for (LocationCache.ConsolidatedRegionalEndpoint endpoint : endpointManager.getWriteEndpoints()) { this.getOrAddEndpoint(endpoint.getGatewayLocationEndpoint()); } - for (LocationCache.ConsolidatedLocationEndpoints endpoint : endpointManager.getReadEndpoints()) { + for (LocationCache.ConsolidatedRegionalEndpoint endpoint : endpointManager.getReadEndpoints()) { this.getOrAddEndpoint(endpoint.getGatewayLocationEndpoint()); } } @@ -273,7 +273,7 @@ public void configureFaultInjectorProvider(IFaultInjectorProvider faultInjectorP } private IAddressResolver getAddressResolver(RxDocumentServiceRequest rxDocumentServiceRequest) { - LocationCache.ConsolidatedLocationEndpoints endpoint = this.endpointManager.resolveServiceEndpoint(rxDocumentServiceRequest); + LocationCache.ConsolidatedRegionalEndpoint endpoint = this.endpointManager.resolveServiceEndpoint(rxDocumentServiceRequest); return this.getOrAddEndpoint(endpoint.getGatewayLocationEndpoint()).addressResolver; } @@ -300,13 +300,13 @@ private EndpointCache getOrAddEndpoint(URI endpoint) { }); if (this.addressCacheByEndpoint.size() > this.maxEndpoints) { - List allConsolidatedEndpoints = new ArrayList<>(this.endpointManager.getWriteEndpoints()); + List allConsolidatedEndpoints = new ArrayList<>(this.endpointManager.getWriteEndpoints()); allConsolidatedEndpoints.addAll(this.endpointManager.getReadEndpoints()); Collections.reverse(allConsolidatedEndpoints); - LinkedList endpoints = new LinkedList<>(allConsolidatedEndpoints); + LinkedList endpoints = new LinkedList<>(allConsolidatedEndpoints); while (this.addressCacheByEndpoint.size() > this.maxEndpoints) { if (!endpoints.isEmpty()) { - LocationCache.ConsolidatedLocationEndpoints dequeueEndpoint = endpoints.pop(); + LocationCache.ConsolidatedRegionalEndpoint dequeueEndpoint = endpoints.pop(); if (this.addressCacheByEndpoint.get(dequeueEndpoint.getGatewayLocationEndpoint()) != null) { this.addressCacheByEndpoint.remove(dequeueEndpoint.getGatewayLocationEndpoint()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java index 0b06fd71ecc04..d3a422bad013f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java @@ -19,7 +19,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; -import java.net.URI; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -229,7 +228,7 @@ private Mono> nextPage(RxDocumentServiceRequest request) { } private void handleCancellationExceptionForPartitionKeyRange(RxDocumentServiceRequest failedRequest) { - LocationCache.ConsolidatedLocationEndpoints firstContactedLocationEndpoint = diagnosticsAccessor.getFirstContactedLocationEndpoint(failedRequest.requestContext.cosmosDiagnostics); + LocationCache.ConsolidatedRegionalEndpoint firstContactedLocationEndpoint = diagnosticsAccessor.getFirstContactedLocationEndpoint(failedRequest.requestContext.cosmosDiagnostics); if (firstContactedLocationEndpoint != null) { this.globalPartitionEndpointManagerForCircuitBreaker.handleLocationExceptionForPartitionKeyRange(failedRequest, firstContactedLocationEndpoint); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java index 814e8c5ea4c08..6dbeab86e8873 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java @@ -46,7 +46,7 @@ public class LocationCache { private final boolean useMultipleWriteLocations; private final Object lockObject; private final Duration unavailableLocationsExpirationTime; - private final ConcurrentHashMap locationUnavailabilityInfoByEndpoint; + private final ConcurrentHashMap locationUnavailabilityInfoByEndpoint; private final ConnectionPolicy connectionPolicy; private DatabaseAccountLocationsInfo locationInfo; @@ -86,7 +86,7 @@ public LocationCache( * 2. Endpoint availability * @return */ - public UnmodifiableList getReadEndpoints() { + public UnmodifiableList getReadEndpoints() { if (this.locationUnavailabilityInfoByEndpoint.size() > 0 && unavailableLocationsExpirationTimePassed()) { this.updateLocationCache(); @@ -101,7 +101,7 @@ && unavailableLocationsExpirationTimePassed()) { * 2. Endpoint availability * @return */ - public UnmodifiableList getWriteEndpoints() { + public UnmodifiableList getWriteEndpoints() { if (this.locationUnavailabilityInfoByEndpoint.size() > 0 && unavailableLocationsExpirationTimePassed()) { this.updateLocationCache(); @@ -193,15 +193,15 @@ void onLocationPreferenceChanged(UnmodifiableList preferredLocations) { * @param request Request for which getEndpoint is to be resolved * @return Resolved getEndpoint */ - public ConsolidatedLocationEndpoints resolveServiceEndpoint(RxDocumentServiceRequest request) { + public ConsolidatedRegionalEndpoint resolveServiceEndpoint(RxDocumentServiceRequest request) { Objects.requireNonNull(request.requestContext, "RxDocumentServiceRequest.requestContext is required and cannot be null."); if (request.requestContext.locationEndpointToRoute != null) { - Objects.requireNonNull(request.requestContext.consolidatedLocationEndpointsToRoute); + Objects.requireNonNull(request.requestContext.consolidatedRegionalEndpointToRoute); - return request.requestContext.consolidatedLocationEndpointsToRoute; + return request.requestContext.consolidatedRegionalEndpointToRoute; } int locationIndex = Utils.getValueOrDefault(request.requestContext.locationIndexToRoute, 0); @@ -218,22 +218,22 @@ public ConsolidatedLocationEndpoints resolveServiceEndpoint(RxDocumentServiceReq String writeLocation = currentLocationInfo.availableWriteLocations.get(locationIndex); return currentLocationInfo.availableWriteEndpointsByLocation.get(writeLocation); } else { - return new ConsolidatedLocationEndpoints(this.defaultEndpoint, null); + return new ConsolidatedRegionalEndpoint(this.defaultEndpoint, null); } } else { - UnmodifiableList endpoints = + UnmodifiableList endpoints = request.getOperationType().isWriteOperation()? this.getApplicableWriteEndpoints(request) : this.getApplicableReadEndpoints(request); return endpoints.get(locationIndex % endpoints.size()); } } - public UnmodifiableList getApplicableWriteEndpoints(RxDocumentServiceRequest request) { + public UnmodifiableList getApplicableWriteEndpoints(RxDocumentServiceRequest request) { return this.getApplicableWriteEndpoints(request.requestContext.getExcludeRegions(), request.requestContext.getUnavailableRegionsForPartition()); } - public UnmodifiableList getApplicableWriteEndpoints(List excludedRegionsOnRequest, List unavailableRegionsForPartition) { + public UnmodifiableList getApplicableWriteEndpoints(List excludedRegionsOnRequest, List unavailableRegionsForPartition) { - UnmodifiableList writeEndpoints = this.getWriteEndpoints(); + UnmodifiableList writeEndpoints = this.getWriteEndpoints(); Supplier excludedRegionsSupplier = this.connectionPolicy.getExcludedRegionsSupplier(); List effectiveExcludedRegions = isExcludedRegionsSupplierConfigured(excludedRegionsSupplier) ? @@ -261,12 +261,12 @@ public UnmodifiableList getApplicableWriteEndpoin effectiveExcludedRegionsWithPartitionUnavailableRegions); } - public UnmodifiableList getApplicableReadEndpoints(RxDocumentServiceRequest request) { + public UnmodifiableList getApplicableReadEndpoints(RxDocumentServiceRequest request) { return this.getApplicableReadEndpoints(request.requestContext.getExcludeRegions(), request.requestContext.getUnavailableRegionsForPartition()); } - public UnmodifiableList getApplicableReadEndpoints(List excludedRegionsOnRequest, List unavailableRegionsForPartition) { - UnmodifiableList readEndpoints = this.getReadEndpoints(); + public UnmodifiableList getApplicableReadEndpoints(List excludedRegionsOnRequest, List unavailableRegionsForPartition) { + UnmodifiableList readEndpoints = this.getReadEndpoints(); Supplier excludedRegionsSupplier = this.connectionPolicy.getExcludedRegionsSupplier(); List effectiveExcludedRegions = isExcludedRegionsSupplierConfigured(excludedRegionsSupplier) ? @@ -294,14 +294,14 @@ public UnmodifiableList getApplicableReadEndpoint effectiveExcludedRegionsWithPartitionUnavailableRegions); } - private UnmodifiableList getApplicableEndpoints( - UnmodifiableList endpoints, - UnmodifiableMap regionNameByEndpoint, + private UnmodifiableList getApplicableEndpoints( + UnmodifiableList endpoints, + UnmodifiableMap regionNameByEndpoint, URI fallbackEndpoint, List excludeRegionList) { - List applicableEndpoints = new ArrayList<>(); - for (ConsolidatedLocationEndpoints endpoint : endpoints) { + List applicableEndpoints = new ArrayList<>(); + for (ConsolidatedRegionalEndpoint endpoint : endpoints) { Utils.ValueHolder regionName = new Utils.ValueHolder<>(); if (Utils.tryGetValue(regionNameByEndpoint, endpoint, regionName)) { if (!excludeRegionList.stream().anyMatch(regionName.v::equalsIgnoreCase)) { @@ -311,7 +311,7 @@ private UnmodifiableList getApplicableEndpoints( } if (applicableEndpoints.isEmpty()) { - applicableEndpoints.add(new ConsolidatedLocationEndpoints(fallbackEndpoint, null)); + applicableEndpoints.add(new ConsolidatedRegionalEndpoint(fallbackEndpoint, null)); } return new UnmodifiableList<>(applicableEndpoints); @@ -325,7 +325,7 @@ private boolean isExcludeRegionsConfigured(List excludedRegionsOnRequest } public URI resolveFaultInjectionEndpoint(String region, boolean writeOnly) { - Utils.ValueHolder endpointValueHolder = new Utils.ValueHolder<>(); + Utils.ValueHolder endpointValueHolder = new Utils.ValueHolder<>(); if (writeOnly) { Utils.tryGetValue(this.locationInfo.availableWriteEndpointsByLocation, region, endpointValueHolder); } else { @@ -357,7 +357,7 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBac if (this.enableEndpointDiscovery) { boolean shouldRefresh = this.useMultipleWriteLocations && !this.enableMultipleWriteLocations; - List readLocationEndpoints = currentLocationInfo.readEndpoints; + List readLocationEndpoints = currentLocationInfo.readEndpoints; if (this.isEndpointUnavailable(readLocationEndpoints.get(0), OperationType.Read)) { // Since most preferred read endpoint is unavailable, we can only refresh in background if // we have an alternate read endpoint @@ -370,7 +370,7 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBac } if (!Strings.isNullOrEmpty(mostPreferredLocation)) { - Utils.ValueHolder mostPreferredReadEndpointHolder = new Utils.ValueHolder<>(); + Utils.ValueHolder mostPreferredReadEndpointHolder = new Utils.ValueHolder<>(); logger.debug("getReadEndpoints [{}]", readLocationEndpoints); if (Utils.tryGetValue(currentLocationInfo.availableReadEndpointsByLocation, mostPreferredLocation, mostPreferredReadEndpointHolder)) { @@ -394,8 +394,8 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBac } } - Utils.ValueHolder mostPreferredWriteEndpointHolder = new Utils.ValueHolder<>(); - List writeLocationEndpoints = currentLocationInfo.writeEndpoints; + Utils.ValueHolder mostPreferredWriteEndpointHolder = new Utils.ValueHolder<>(); + List writeLocationEndpoints = currentLocationInfo.writeEndpoints; logger.debug("getWriteEndpoints [{}]", writeLocationEndpoints); if (!this.canUseMultipleWriteLocations()) { @@ -445,11 +445,11 @@ public boolean shouldRefreshEndpoints(Utils.ValueHolder canRefreshInBac public String getRegionName(URI locationEndpoint, com.azure.cosmos.implementation.OperationType operationType) { Utils.ValueHolder regionName = new Utils.ValueHolder<>(); if (operationType.isWriteOperation()) { - if (Utils.tryGetValue(this.locationInfo.regionNameByWriteEndpoint, new ConsolidatedLocationEndpoints(locationEndpoint, locationEndpoint), regionName)) { + if (Utils.tryGetValue(this.locationInfo.regionNameByWriteEndpoint, new ConsolidatedRegionalEndpoint(locationEndpoint, locationEndpoint), regionName)) { return regionName.v; } } else { - if (Utils.tryGetValue(this.locationInfo.regionNameByReadEndpoint, new ConsolidatedLocationEndpoints(locationEndpoint, locationEndpoint), regionName)) { + if (Utils.tryGetValue(this.locationInfo.regionNameByReadEndpoint, new ConsolidatedRegionalEndpoint(locationEndpoint, locationEndpoint), regionName)) { return regionName.v; } } @@ -458,15 +458,15 @@ public String getRegionName(URI locationEndpoint, com.azure.cosmos.implementatio return this.locationInfo.availableWriteLocations.get(0).toLowerCase(Locale.ROOT); } - private boolean areEqual(ConsolidatedLocationEndpoints url1, ConsolidatedLocationEndpoints url2) { + private boolean areEqual(ConsolidatedRegionalEndpoint url1, ConsolidatedRegionalEndpoint url2) { return url1.equals(url2); } private void clearStaleEndpointUnavailabilityInfo() { if (!this.locationUnavailabilityInfoByEndpoint.isEmpty()) { - List unavailableEndpoints = new ArrayList<>(this.locationUnavailabilityInfoByEndpoint.keySet()); + List unavailableEndpoints = new ArrayList<>(this.locationUnavailabilityInfoByEndpoint.keySet()); - for (ConsolidatedLocationEndpoints unavailableEndpoint: unavailableEndpoints) { + for (ConsolidatedRegionalEndpoint unavailableEndpoint: unavailableEndpoints) { Utils.ValueHolder unavailabilityInfoHolder = new Utils.ValueHolder<>(); Utils.ValueHolder removedHolder = new Utils.ValueHolder<>(); @@ -485,7 +485,7 @@ private void clearStaleEndpointUnavailabilityInfo() { } } - private boolean isEndpointUnavailable(ConsolidatedLocationEndpoints endpoint, OperationType expectedAvailableOperations) { + private boolean isEndpointUnavailable(ConsolidatedRegionalEndpoint endpoint, OperationType expectedAvailableOperations) { Utils.ValueHolder unavailabilityInfoHolder = new Utils.ValueHolder<>(); if (expectedAvailableOperations == OperationType.None @@ -506,10 +506,10 @@ private boolean isEndpointUnavailable(ConsolidatedLocationEndpoints endpoint, Op } } - private boolean anyEndpointsAvailable(List endpoints, OperationType expectedAvailableOperations) { + private boolean anyEndpointsAvailable(List endpoints, OperationType expectedAvailableOperations) { Utils.ValueHolder unavailabilityInfoHolder = new Utils.ValueHolder<>(); boolean anyEndpointsAvailable = false; - for (ConsolidatedLocationEndpoints endpoint : endpoints) { + for (ConsolidatedRegionalEndpoint endpoint : endpoints) { if (!isEndpointUnavailable(endpoint, expectedAvailableOperations)) { anyEndpointsAvailable = true; break; @@ -523,10 +523,10 @@ private void markEndpointUnavailable( OperationType unavailableOperationType) { Instant currentTime = Instant.now(); LocationUnavailabilityInfo updatedInfo = this.locationUnavailabilityInfoByEndpoint.compute( - new ConsolidatedLocationEndpoints(unavailableEndpoint, null), - new BiFunction() { + new ConsolidatedRegionalEndpoint(unavailableEndpoint, null), + new BiFunction() { @Override - public LocationUnavailabilityInfo apply(ConsolidatedLocationEndpoints url, LocationUnavailabilityInfo info) { + public LocationUnavailabilityInfo apply(ConsolidatedRegionalEndpoint url, LocationUnavailabilityInfo info) { if (info == null) { // not already present, add return new LocationUnavailabilityInfo(currentTime, unavailableOperationType); @@ -575,13 +575,13 @@ private void updateLocationCache( this.clearStaleEndpointUnavailabilityInfo(); Utils.ValueHolder> readValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.availableReadLocations); - Utils.ValueHolder> readRegionMapValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.regionNameByReadEndpoint); + Utils.ValueHolder> readRegionMapValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.regionNameByReadEndpoint); nextLocationInfo.availableReadEndpointsByLocation = this.getEndpointsByLocation(gatewayReadLocations, thinclientReadLocations, readValueHolder, readRegionMapValueHolder); nextLocationInfo.availableReadLocations = readValueHolder.v; nextLocationInfo.regionNameByReadEndpoint = readRegionMapValueHolder.v; Utils.ValueHolder> writeValueHolder = Utils.ValueHolder.initialize(nextLocationInfo.availableWriteLocations); - Utils.ValueHolder> outWriteRegionMap = Utils.ValueHolder.initialize(nextLocationInfo.regionNameByWriteEndpoint); + Utils.ValueHolder> outWriteRegionMap = Utils.ValueHolder.initialize(nextLocationInfo.regionNameByWriteEndpoint); nextLocationInfo.availableWriteEndpointsByLocation = this.getEndpointsByLocation(gatewayWriteLocations, thinclientWriteLocations, writeValueHolder, outWriteRegionMap); nextLocationInfo.availableWriteLocations = writeValueHolder.v; nextLocationInfo.regionNameByWriteEndpoint = outWriteRegionMap.v; @@ -594,7 +594,7 @@ private void updateLocationCache( Utils.ValueHolder regionForDefaultEndpoint = new Utils.ValueHolder<>(); // only set effective preferred locations when default endpoint doesn't map to a regional endpoint - if (!Utils.tryGetValue(nextLocationInfo.regionNameByReadEndpoint, new ConsolidatedLocationEndpoints(this.defaultEndpoint, null), regionForDefaultEndpoint)) { + if (!Utils.tryGetValue(nextLocationInfo.regionNameByReadEndpoint, new ConsolidatedRegionalEndpoint(this.defaultEndpoint, null), regionForDefaultEndpoint)) { nextLocationInfo.effectivePreferredLocations = nextLocationInfo.availableReadLocations; } } @@ -607,16 +607,16 @@ private void updateLocationCache( } } - private UnmodifiableList getPreferredAvailableEndpoints(UnmodifiableMap endpointsByLocation, - UnmodifiableList orderedLocations, - OperationType expectedAvailableOperation, - URI fallbackEndpoint) { - List endpoints = new ArrayList<>(); + private UnmodifiableList getPreferredAvailableEndpoints(UnmodifiableMap endpointsByLocation, + UnmodifiableList orderedLocations, + OperationType expectedAvailableOperation, + URI fallbackEndpoint) { + List endpoints = new ArrayList<>(); DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; // if enableEndpointDiscovery is false, we always use the defaultEndpoint that user passed in during documentClient init if (this.enableEndpointDiscovery) { if (this.canUseMultipleWriteLocations() || expectedAvailableOperation.supports(OperationType.Read)) { - List unavailableEndpoints = new ArrayList<>(); + List unavailableEndpoints = new ArrayList<>(); // When client can not use multiple write locations, preferred locations list should only be used // determining read endpoints order. @@ -625,7 +625,7 @@ private UnmodifiableList getPreferredAvailableEnd if (currentLocationInfo.preferredLocations != null && !currentLocationInfo.preferredLocations.isEmpty()) { for (String location: currentLocationInfo.preferredLocations) { - Utils.ValueHolder endpoint = new Utils.ValueHolder<>(); + Utils.ValueHolder endpoint = new Utils.ValueHolder<>(); if (Utils.tryGetValue(endpointsByLocation, location, endpoint)) { if (this.isEndpointUnavailable(endpoint.v, expectedAvailableOperation)) { unavailableEndpoints.add(endpoint.v); @@ -636,7 +636,7 @@ private UnmodifiableList getPreferredAvailableEnd } } else { for (String location : orderedLocations) { - Utils.ValueHolder endpoint = Utils.ValueHolder.initialize(null); + Utils.ValueHolder endpoint = Utils.ValueHolder.initialize(null); if (Utils.tryGetValue(endpointsByLocation, location, endpoint)) { // if defaultEndpoint equals a regional endpoint then use @@ -656,14 +656,14 @@ private UnmodifiableList getPreferredAvailableEnd } if (endpoints.isEmpty()) { - endpoints.add(new ConsolidatedLocationEndpoints(fallbackEndpoint, null)); + endpoints.add(new ConsolidatedRegionalEndpoint(fallbackEndpoint, null)); } endpoints.addAll(unavailableEndpoints); } else { for (String location : orderedLocations) { - Utils.ValueHolder endpoint = Utils.ValueHolder.initialize(null); + Utils.ValueHolder endpoint = Utils.ValueHolder.initialize(null); if (!Strings.isNullOrEmpty(location) && // location is empty during manual failover Utils.tryGetValue(endpointsByLocation, location, endpoint)) { endpoints.add(endpoint.v); @@ -673,7 +673,7 @@ private UnmodifiableList getPreferredAvailableEnd } if (endpoints.isEmpty()) { - endpoints.add(new ConsolidatedLocationEndpoints(fallbackEndpoint, null)); + endpoints.add(new ConsolidatedRegionalEndpoint(fallbackEndpoint, null)); } return new UnmodifiableList<>(endpoints); @@ -682,8 +682,8 @@ private UnmodifiableList getPreferredAvailableEnd private void addEndpoints( Iterable gatewayDbAccountLocations, Iterable thinClientDbAccountLocations, - Map endpointsByLocation, - Map regionByEndpoint, + Map endpointsByLocation, + Map regionByEndpoint, List parsedLocations) { for (DatabaseAccountLocation gatewayDbAccountLocation : gatewayDbAccountLocations) { @@ -693,14 +693,14 @@ private void addEndpoints( String location = gatewayDbAccountLocation.getName().toLowerCase(Locale.ROOT); URI endpoint = new URI(gatewayDbAccountLocation.getEndpoint().toLowerCase(Locale.ROOT)); - ConsolidatedLocationEndpoints consolidatedLocationEndpoints = new ConsolidatedLocationEndpoints(endpoint, null); + ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint = new ConsolidatedRegionalEndpoint(endpoint, null); if (!endpointsByLocation.containsKey(location)) { - endpointsByLocation.put(location, consolidatedLocationEndpoints); + endpointsByLocation.put(location, consolidatedRegionalEndpoint); } - if (!regionByEndpoint.containsKey(consolidatedLocationEndpoints)) { - regionByEndpoint.put(consolidatedLocationEndpoints, location); + if (!regionByEndpoint.containsKey(consolidatedRegionalEndpoint)) { + regionByEndpoint.put(consolidatedRegionalEndpoint, location); } parsedLocations.add(gatewayDbAccountLocation.getName()); @@ -718,13 +718,13 @@ private void addEndpoints( String location = thinClientDbAccountLocation.getName().toLowerCase(Locale.ROOT); URI endpoint = new URI(thinClientDbAccountLocation.getEndpoint().toLowerCase(Locale.ROOT)); - ConsolidatedLocationEndpoints consolidatedLocationEndpoints = endpointsByLocation.get(location); + ConsolidatedRegionalEndpoint consolidatedRegionalEndpoint = endpointsByLocation.get(location); - if (consolidatedLocationEndpoints == null) { + if (consolidatedRegionalEndpoint == null) { throw new IllegalStateException(String.format("Gateway location endpoint doesn't exist while thin client location endpoint exists for location %s", location)); } - consolidatedLocationEndpoints.thinClientLocationEndpoint = endpoint; + consolidatedRegionalEndpoint.thinClientLocationEndpoint = endpoint; } catch (Exception e) { logger.warn("GetAvailableEndpointsByLocation() - skipping add for location = [{}] as its location name is either empty or endpoint is malformed [{}]", @@ -735,20 +735,20 @@ private void addEndpoints( } } - private UnmodifiableMap getEndpointsByLocation(Iterable gatewayLocations, - Iterable thinClientLocations, - Utils.ValueHolder> orderedLocations, - Utils.ValueHolder> regionMap) { - Map endpointsByLocation = new CaseInsensitiveMap<>(); - Map regionByEndpoint = new CaseInsensitiveMap<>(); + private UnmodifiableMap getEndpointsByLocation(Iterable gatewayLocations, + Iterable thinClientLocations, + Utils.ValueHolder> orderedLocations, + Utils.ValueHolder> regionMap) { + Map endpointsByLocation = new CaseInsensitiveMap<>(); + Map regionByEndpoint = new CaseInsensitiveMap<>(); List parsedLocations = new ArrayList<>(); addEndpoints(gatewayLocations, thinClientLocations, endpointsByLocation, regionByEndpoint, parsedLocations); orderedLocations.v = new UnmodifiableList<>(parsedLocations); - regionMap.v = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(regionByEndpoint); + regionMap.v = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(regionByEndpoint); - return (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(endpointsByLocation); + return (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(endpointsByLocation); } public boolean canUseMultipleWriteLocations() { @@ -823,11 +823,11 @@ private static boolean isExcludedRegionsSupplierConfigured(Supplier writeEndpoints; - private UnmodifiableList readEndpoints; + private UnmodifiableList writeEndpoints; + private UnmodifiableList readEndpoints; private UnmodifiableList preferredLocations; private UnmodifiableList effectivePreferredLocations; // lower-case region private UnmodifiableList availableWriteLocations; // lower-case region private UnmodifiableList availableReadLocations; - private UnmodifiableMap availableWriteEndpointsByLocation; - private UnmodifiableMap availableReadEndpointsByLocation; - private UnmodifiableMap regionNameByWriteEndpoint; - private UnmodifiableMap regionNameByReadEndpoint; + private UnmodifiableMap availableWriteEndpointsByLocation; + private UnmodifiableMap availableReadEndpointsByLocation; + private UnmodifiableMap regionNameByWriteEndpoint; + private UnmodifiableMap regionNameByReadEndpoint; public DatabaseAccountLocationsInfo(List preferredLocations, URI defaultEndpoint) { this.preferredLocations = new UnmodifiableList<>(preferredLocations.stream().map(loc -> loc.toLowerCase(Locale.ROOT)).collect(Collectors.toList())); this.effectivePreferredLocations = new UnmodifiableList<>(Collections.emptyList()); this.availableWriteEndpointsByLocation - = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); + = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); this.availableReadEndpointsByLocation - = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); + = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); this.regionNameByWriteEndpoint - = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); + = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); this.regionNameByReadEndpoint - = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); + = (UnmodifiableMap) UnmodifiableMap.unmodifiableMap(new CaseInsensitiveMap<>()); this.availableReadLocations = new UnmodifiableList<>(Collections.emptyList()); this.availableWriteLocations = new UnmodifiableList<>(Collections.emptyList()); - this.readEndpoints = new UnmodifiableList<>(Collections.singletonList(new ConsolidatedLocationEndpoints(defaultEndpoint, null))); - this.writeEndpoints = new UnmodifiableList<>(Collections.singletonList(new ConsolidatedLocationEndpoints(defaultEndpoint, null))); + this.readEndpoints = new UnmodifiableList<>(Collections.singletonList(new ConsolidatedRegionalEndpoint(defaultEndpoint, null))); + this.writeEndpoints = new UnmodifiableList<>(Collections.singletonList(new ConsolidatedRegionalEndpoint(defaultEndpoint, null))); } public DatabaseAccountLocationsInfo(DatabaseAccountLocationsInfo other) {