Skip to content

Commit

Permalink
FixForEmptyStoreResponseStatisticsList (#18182)
Browse files Browse the repository at this point in the history
* fix for empty store response statistics

Co-authored-by: Annie Liang <xinlian@microsoft.com>
Co-authored-by: annie-mac <annie-mac@s-pkges.redmond.corp.microsoft.com>
  • Loading branch information
3 people authored Jan 11, 2021
1 parent 0263490 commit 7f57596
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,7 @@ Mono<StoreResponse> writePrivateAsync(
try {
Throwable unwrappedException = Exceptions.unwrap(t);
CosmosException ex = Utils.as(unwrappedException, CosmosException.class);
try {
BridgeInternal.recordResponse(request.requestContext.cosmosDiagnostics, request,
storeReader.createStoreResult(null, ex, false, false, primaryUri));
} catch (Exception e) {
logger.error("Error occurred while recording response", e);
}
storeReader.createAndRecordStoreResult(request, null, ex, false, false, primaryUri);
String value = ex.getResponseHeaders().get(HttpConstants.HttpHeaders.WRITE_REQUEST_TRIGGER_ADDRESS_REFRESH);
if (!Strings.isNullOrWhiteSpace(value)) {
Integer result = Integers.tryParse(value);
Expand All @@ -200,12 +195,7 @@ Mono<StoreResponse> writePrivateAsync(
);

}).flatMap(response -> {
try {
BridgeInternal.recordResponse(request.requestContext.cosmosDiagnostics, request,
storeReader.createStoreResult(response, null, false, false, primaryURI.get()));
} catch (Exception e) {
logger.error("Error occurred while recording response", e);
}
storeReader.createAndRecordStoreResult(request, response, null, false, false, primaryURI.get());
return barrierForGlobalStrong(request, response);
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ private Flux<StoreResult> toStoreResult(RxDocumentServiceRequest request,
return storeRespAndURI.getLeft()
.flatMap(storeResponse -> {
try {
StoreResult storeResult = this.createStoreResult(
StoreResult storeResult = this.createAndRecordStoreResult(
request,
storeResponse,
null, requiresValidLsn,
readMode != ReadMode.Strong,
Expand All @@ -167,11 +168,12 @@ private Flux<StoreResult> toStoreResult(RxDocumentServiceRequest request,
}

// Exception storeException = readTask.Exception != null ? readTask.Exception.InnerException : null;
StoreResult storeResult = this.createStoreResult(
StoreResult storeResult = this.createAndRecordStoreResult(
request,
null,
storeException, requiresValidLsn,
readMode != ReadMode.Strong,
null);
storeRespAndURI.getRight());
if (storeException instanceof TransportException) {
BridgeInternal.getFailedReplicas(request.requestContext.cosmosDiagnostics).add(storeRespAndURI.getRight().getURI());
}
Expand Down Expand Up @@ -245,13 +247,6 @@ private Flux<List<StoreResult>> readFromReplicas(List<StoreResult> resultCollect
return Mono.error(e);
}).map(newStoreResults -> {
for (StoreResult srr : newStoreResults) {

entity.requestContext.requestChargeTracker.addCharge(srr.requestCharge);
try {
BridgeInternal.recordResponse(entity.requestContext.cosmosDiagnostics, entity, srr);
} catch (Exception e) {
logger.error("Unexpected failure while recording response", e);
}
if (srr.isValid) {

try {
Expand Down Expand Up @@ -522,11 +517,14 @@ private Mono<ReadReplicaResult> readPrimaryInternalAsync(
storeResponse -> {

try {
StoreResult storeResult = this.createStoreResult(
storeResponse != null ? storeResponse : null,
null, requiresValidLsn,
true,
storeResponse != null ? storeResponseObsAndUri.getRight() : null);
StoreResult storeResult = this.createAndRecordStoreResult(
entity,
storeResponse != null ? storeResponse : null,
null,
requiresValidLsn,
true,
storeResponse != null ? storeResponseObsAndUri.getRight() : null);

return Mono.just(storeResult);
} catch (CosmosException e) {
return Mono.error(e);
Expand All @@ -551,9 +549,11 @@ private Mono<ReadReplicaResult> readPrimaryInternalAsync(
}

try {
StoreResult storeResult = this.createStoreResult(
StoreResult storeResult = this.createAndRecordStoreResult(
entity,
null,
storeTaskException, requiresValidLsn,
storeTaskException,
requiresValidLsn,
true,
null);
return Mono.just(storeResult);
Expand All @@ -564,13 +564,6 @@ private Mono<ReadReplicaResult> readPrimaryInternalAsync(
});

return storeResultObs.map(storeResult -> {
try {
BridgeInternal.recordResponse(entity.requestContext.cosmosDiagnostics, entity, storeResult);
} catch (Exception e) {
logger.error("Unexpected failure while recording response", e);
}
entity.requestContext.requestChargeTracker.addCharge(storeResult.requestCharge);

if (storeResult.isGoneException && !storeResult.isInvalidPartitionException) {
return new ReadReplicaResult(true, Collections.emptyList());
}
Expand Down Expand Up @@ -656,6 +649,32 @@ private static Mono<StoreResponse> completeActivity(Mono<StoreResponse> task, Ob
return task;
}

StoreResult createAndRecordStoreResult(
RxDocumentServiceRequest request,
StoreResponse storeResponse,
Exception responseException,
boolean requiresValidLsn,
boolean useLocalLSNBasedHeaders,
Uri storePhysicalAddress) {

StoreResult storeResult = this.createStoreResult(storeResponse, responseException, requiresValidLsn, useLocalLSNBasedHeaders, storePhysicalAddress);

try {
BridgeInternal.recordResponse(request.requestContext.cosmosDiagnostics, request, storeResult);
if (request.requestContext.requestChargeTracker != null) {
request.requestContext.requestChargeTracker.addCharge(storeResult.requestCharge);
}
} catch (Exception e){
logger.error("Unexpected failure while recording response", e);
}

if (responseException !=null) {
verifyCanContinueOnException(storeResult.getException());
}

return storeResult;
}

StoreResult createStoreResult(StoreResponse storeResponse,
Exception responseException,
boolean requiresValidLsn,
Expand Down Expand Up @@ -736,7 +755,6 @@ StoreResult createStoreResult(StoreResponse storeResponse,
Throwable unwrappedResponseExceptions = Exceptions.unwrap(responseException);
CosmosException cosmosException = Utils.as(unwrappedResponseExceptions, CosmosException.class);
if (cosmosException != null) {
StoreReader.verifyCanContinueOnException(cosmosException);
long quorumAckedLSN = -1;
int currentReplicaSetSize = -1;
int currentWriteQuorum = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
Expand Down Expand Up @@ -60,6 +61,17 @@ public Object[][] exceptionArgProvider() {
};
}

@DataProvider(name = "storeResponseArgProvider")
public Object[][] storeResponseArgProvider() {
return new Object[][]{
{ new PartitionKeyRangeGoneException(), null, },
{ new PartitionKeyRangeIsSplittingException() , null, },
{ new PartitionIsMigratingException(), null, },
{ new GoneException(), null, },
{ null, Mockito.mock(StoreResponse.class), }
};
}

@Test(groups = "unit", dataProvider = "exceptionArgProvider")
public void exception(Exception ex, Class<Exception> klass, int expectedStatusCode, Integer expectedSubStatusCode) {
TransportClientWrapper transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
Expand Down Expand Up @@ -193,6 +205,53 @@ public void timeout2() throws Exception {
subscriber.assertError(RequestTimeoutException.class);
}

@Test(groups = "unit", dataProvider = "storeResponseArgProvider")
public void storeResponseRecordedOnException(Exception ex, StoreResponse storeResponse) {
DiagnosticsClientContext clientContext = mockDiagnosticsClientContext();
TransportClientWrapper transportClientWrapper;

if (ex != null) {
transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
.SequentialBuilder()
.then(ex)
.build();
} else {
transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
.SequentialBuilder()
.then(storeResponse)
.build();
}

Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");

AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
.withSecondary(ImmutableList.of(secondaryUri1, secondaryUri2, secondaryUri3))
.build();
sessionContainer = Mockito.mock(ISessionContainer.class);
IAuthorizationTokenProvider authorizationTokenProvider = Mockito.mock(IAuthorizationTokenProvider.class);
serviceConfigReader = Mockito.mock(GatewayServiceConfigurationReader.class);

consistencyWriter = new ConsistencyWriter(clientContext,
addressSelectorWrapper.addressSelector,
sessionContainer,
transportClientWrapper.transportClient,
authorizationTokenProvider,
serviceConfigReader,
false);

TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
RxDocumentServiceRequest dsr = mockDocumentServiceRequest(clientContext);

consistencyWriter.writeAsync(dsr, timeoutHelper, false).subscribe();

String cosmosDiagnostics = dsr.requestContext.cosmosDiagnostics.toString();
assertThat(cosmosDiagnostics).containsOnlyOnce("storeResult");
}

@DataProvider(name = "globalStrongArgProvider")
public Object[][] globalStrongArgProvider() {
return new Object[][]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.DocumentServiceRequestContext;
import com.azure.cosmos.implementation.FailureValidator;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.ISessionToken;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PartitionIsMigratingException;
import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.RequestRateTooLargeException;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.StoreResponseBuilder;
Expand All @@ -38,13 +38,16 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.azure.cosmos.implementation.HttpConstants.StatusCodes.GONE;
import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes.COMPLETING_PARTITION_MIGRATION;
import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes.COMPLETING_SPLIT;
import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
import static com.azure.cosmos.implementation.TestUtils.mockDocumentServiceRequest;
import static org.assertj.core.api.Assertions.assertThat;
import static com.azure.cosmos.implementation.TestUtils.*;

public class StoreReaderTest {
private final static DiagnosticsClientContext clientContext = mockDiagnosticsClientContext();
Expand Down Expand Up @@ -132,6 +135,17 @@ public Object[][] exceptionArgProvider() {
};
}

@DataProvider(name = "storeResponseArgProvider")
public Object[][] storeResponseArgProvider() {
return new Object[][]{
{ new PartitionKeyRangeGoneException(), null, },
{ new PartitionKeyRangeIsSplittingException() , null, },
{ new PartitionIsMigratingException(), null, },
{ new GoneException(), null, },
{ null, Mockito.mock(StoreResponse.class), }
};
}

@Test(groups = "unit", dataProvider = "exceptionArgProvider")
public void exception(Exception ex, Class<Exception> klass, int expectedStatusCode, Integer expectedSubStatusCode) {
TransportClientWrapper transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
Expand Down Expand Up @@ -749,6 +763,57 @@ public void readMultipleReplicasAsync(boolean includePrimary, int replicaCountTo
.verifyTotalInvocations(1);
}

@Test(groups = "unit", dataProvider = "storeResponseArgProvider")
public void storeResponseRecordedOnException(Exception ex, StoreResponse storeResponse) {
TransportClientWrapper transportClientWrapper;

if (ex != null) {
transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
.SequentialBuilder()
.then(ex)
.then(ex)
.then(ex)
.then(ex)
.build();
} else {
transportClientWrapper = new TransportClientWrapper.Builder.ReplicaResponseBuilder
.SequentialBuilder()
.then(storeResponse)
.then(storeResponse)
.then(storeResponse)
.then(storeResponse)
.build();
}

Uri primaryUri = Uri.create("primary");
Uri secondaryUri1 = Uri.create("secondary1");
Uri secondaryUri2 = Uri.create("secondary2");
Uri secondaryUri3 = Uri.create("secondary3");

AddressSelectorWrapper addressSelectorWrapper = AddressSelectorWrapper.Builder.Simple.create()
.withPrimary(primaryUri)
.withSecondary(ImmutableList.of(secondaryUri1, secondaryUri2, secondaryUri3))
.build();
ISessionContainer sessionContainer = Mockito.mock(ISessionContainer.class);
StoreReader storeReader = new StoreReader(transportClientWrapper.transportClient, addressSelectorWrapper.addressSelector, sessionContainer);

TimeoutHelper timeoutHelper = Mockito.mock(TimeoutHelper.class);
RxDocumentServiceRequest dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.Read, "/dbs/db/colls/col/docs/docId", ResourceType.Document);
dsr.requestContext = new DocumentServiceRequestContext();
dsr.requestContext.timeoutHelper = timeoutHelper;
dsr.requestContext.resolvedPartitionKeyRange = partitionKeyRangeWithId("1");

try {
storeReader.readMultipleReplicaAsync(dsr, true, 3, true, true, ReadMode.Strong).subscribe();
} catch (Exception e) {
// catch any exceptions here
}

String cosmosDiagnostics = dsr.requestContext.cosmosDiagnostics.toString();
assertThat(this.getMatchingElementCount(cosmosDiagnostics, "storeResult") >= 1).isTrue();
}

public static void validateSuccess(Mono<List<StoreResult>> single,
MultiStoreResultValidator validator) {
validateSuccess(single, validator, 10000);
Expand Down Expand Up @@ -800,6 +865,18 @@ public static <T> void validateException(Mono<T> single,
validateException(single, validator, TIMEOUT);
}

private int getMatchingElementCount(String cosmosDiagnostics, String regex) {
Pattern storeResultPattern = Pattern.compile(regex);
Matcher matcher = storeResultPattern.matcher(cosmosDiagnostics);

int count = 0;
while (matcher.find()) {
count++;
}

return count;
}

private PartitionKeyRange partitionKeyRangeWithId(String id) {
PartitionKeyRange partitionKeyRange = Mockito.mock(PartitionKeyRange.class);
Mockito.doReturn(id).when(partitionKeyRange).getId();
Expand Down

0 comments on commit 7f57596

Please sign in to comment.