Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issue where CosmosDiagnosticsContext is null when diagnostics are sampled out. #39352

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
Expand Down Expand Up @@ -132,6 +133,7 @@ public void onlyDefaultLogger() {
CosmosContainer container = this.getContainer(builder);
executeTestCase(container);


// no assertions here - invocations for diagnostics handler are validated above
// log4j event logging isn't validated in general in unit tests because it is too brittle to do so
// with custom appender
Expand All @@ -152,8 +154,33 @@ public void onlyLoggerWithCustomConfig() {
.diagnosticsHandler(CosmosDiagnosticsHandler.DEFAULT_LOGGING_HANDLER)
);
CosmosContainer container = this.getContainer(builder);
executeTestCase(container);
CosmosItemResponse<ObjectNode> response = executeTestCase(container);
assertThat(response.getDiagnostics()).isNotNull();
assertThat(response.getDiagnostics().getDiagnosticsContext()).isNotNull();
// no assertions on logged output here - invocations for diagnostics handler are validated above
// log4j event logging isn't validated in general in unit tests because it is too brittle to do so
// with custom appender
}

@Test(groups = { "fast", "emulator" }, timeOut = TIMEOUT)
public void onlyLoggerAlwaysSampledOut() {
CosmosClientBuilder builder = this
.getClientBuilder()
.clientTelemetryConfig(
new CosmosClientTelemetryConfig()
.diagnosticsThresholds(
new CosmosDiagnosticsThresholds()
.setPointOperationLatencyThreshold(Duration.ofMillis(100))
.setNonPointOperationLatencyThreshold(Duration.ofMillis(2000))
.setRequestChargeThreshold(100)
)
.diagnosticsHandler(CosmosDiagnosticsHandler.DEFAULT_LOGGING_HANDLER)
.sampleDiagnostics(0)
);
CosmosContainer container = this.getContainer(builder);
CosmosItemResponse<ObjectNode> response = executeTestCase(container);
assertThat(response.getDiagnostics()).isNotNull();
assertThat(response.getDiagnostics().getDiagnosticsContext()).isNotNull();
// no assertions on logged output here - invocations for diagnostics handler are validated above
// log4j event logging isn't validated in general in unit tests because it is too brittle to do so
// with custom appender
Expand All @@ -177,10 +204,46 @@ public void onlyCustomLoggerWithCustomConfig() {
.diagnosticsHandler(capturingLogger)
);
CosmosContainer container = this.getContainer(builder);
executeTestCase(container);
String id = UUID.randomUUID().toString();
ObjectNode doc = getDocumentDefinition(id);

CosmosDiagnostics diagnostics = executeDocumentOperation(container, OperationType.Create, id, doc);
assertThat(diagnostics).isNotNull();
assertThat(diagnostics.getDiagnosticsContext()).isNotNull();

diagnostics = executeDocumentOperation(container, OperationType.Query, id, doc);
assertThat(diagnostics).isNotNull();
assertThat(diagnostics.getDiagnosticsContext()).isNotNull();

assertThat(capturingLogger.getLoggedMessages()).isNotNull();
assertThat(capturingLogger.getLoggedMessages()).hasSize(1);
assertThat(capturingLogger.getLoggedMessages()).hasSize(2);
}

@Test(groups = { "fast", "emulator" }, timeOut = TIMEOUT)
public void onlyCustomLoggerAlwaysSampledOut() {

CapturingLogger capturingLogger = new CapturingLogger();

CosmosClientBuilder builder = this
.getClientBuilder()
.clientTelemetryConfig(
new CosmosClientTelemetryConfig()
.diagnosticsThresholds(
new CosmosDiagnosticsThresholds()
.setPointOperationLatencyThreshold(Duration.ofMillis(100))
.setNonPointOperationLatencyThreshold(Duration.ofMillis(2000))
.setRequestChargeThreshold(100)
)
.diagnosticsHandler(capturingLogger)
.sampleDiagnostics(0)
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
);
CosmosContainer container = this.getContainer(builder);
CosmosItemResponse<ObjectNode> response = executeTestCase(container);

assertThat(response.getDiagnostics()).isNotNull();
assertThat(response.getDiagnostics().getDiagnosticsContext()).isNotNull();
assertThat(capturingLogger.getLoggedMessages()).isNotNull();
assertThat(capturingLogger.getLoggedMessages()).hasSize(0);
}

@Test(groups = { "fast", "emulator" }, timeOut = TIMEOUT)
Expand Down Expand Up @@ -368,14 +431,16 @@ public void run() {
assertThat(cosmosDiagnosticsNode.get("jvmFatalErrorMapperExecutionCount").asLong()).isGreaterThan(0);
}

private void executeTestCase(CosmosContainer container) {
private CosmosItemResponse<ObjectNode> executeTestCase(CosmosContainer container) {
String id = UUID.randomUUID().toString();
CosmosItemResponse<ObjectNode> response = container.createItem(
getDocumentDefinition(id),
new PartitionKey(id),
null);
new CosmosItemRequestOptions());
assertThat(response).isNotNull();
assertThat(response.getStatusCode()).isEqualTo(201);

return response;
}

private ObjectNode getDocumentDefinition(String documentId) {
Expand Down Expand Up @@ -407,47 +472,58 @@ private CosmosContainer getContainer(CosmosClientBuilder builder) {
return this.client.getDatabase(asyncContainer.getDatabase().getId()).getContainer(asyncContainer.getId());
}

private void executeDocumentOperation(
private CosmosDiagnostics executeDocumentOperation(
CosmosContainer cosmosContainer,
OperationType operationType,
String createdItemId,
ObjectNode createdItem) {

final AtomicReference<CosmosDiagnostics> diagnostics = new AtomicReference<>(null);

switch (operationType) {
case Query:
String query = String.format("SELECT * from c");
CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
queryRequestOptions.setFeedRange(FeedRange.forLogicalPartition(new PartitionKey(createdItemId)));
Iterable<FeedResponse<JsonNode>> results = cosmosContainer.queryItems(query, queryRequestOptions, JsonNode.class).iterableByPage();
results.forEach(t -> {});
Iterable<FeedResponse<JsonNode>> queryResults = cosmosContainer.queryItems(query, queryRequestOptions, JsonNode.class).iterableByPage();
queryResults.forEach(t -> diagnostics.set(t.getCosmosDiagnostics()));
break;
case ReadFeed:
CosmosChangeFeedRequestOptions changeFeedRequestOptions = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
cosmosContainer.queryChangeFeed(changeFeedRequestOptions, JsonNode.class).iterableByPage();
Iterable<FeedResponse<JsonNode>> changeFeedResults =
cosmosContainer.queryChangeFeed(changeFeedRequestOptions, JsonNode.class).iterableByPage();
changeFeedResults.forEach(t -> diagnostics.set(t.getCosmosDiagnostics()));
break;
case Read:
cosmosContainer
CosmosItemResponse<JsonNode> readResponse = cosmosContainer
.readItem(createdItemId, new PartitionKey(createdItemId), JsonNode.class);
diagnostics.set(readResponse.getDiagnostics());
break;
case Replace:
cosmosContainer
CosmosItemResponse<JsonNode> replaceResponse = cosmosContainer
.replaceItem(createdItem, createdItemId, new PartitionKey(createdItemId), new CosmosItemRequestOptions());
diagnostics.set(replaceResponse.getDiagnostics());
break;
case Delete:
try {
cosmosContainer.deleteItem(getDocumentDefinition(UUID.randomUUID().toString()), new CosmosItemRequestOptions());
CosmosItemResponse<Object> deleteResponse = cosmosContainer.deleteItem(getDocumentDefinition(UUID.randomUUID().toString()), new CosmosItemRequestOptions());
diagnostics.set(deleteResponse.getDiagnostics());
} catch (CosmosException e) {
if (!Exceptions.isNotFound(e)) {
throw e;
}
}
break;
case Create:
cosmosContainer.createItem(getDocumentDefinition(UUID.randomUUID().toString()));
CosmosItemResponse<JsonNode> createResponse = cosmosContainer.createItem(getDocumentDefinition(UUID.randomUUID().toString()));
diagnostics.set(createResponse.getDiagnostics());
break;
default:
throw new IllegalArgumentException("The operation type is not supported");
}

return diagnostics.get();
}

private static class CapturingDiagnosticsHandler implements CosmosDiagnosticsHandler {
Expand Down
5 changes: 3 additions & 2 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@

#### Bugs Fixed
* Suppress exceptions when calling diagnostics handlers. - See [PR 39077](https://github.com/Azure/azure-sdk-for-java/pull/39077)
* Fixed an issue where no cross region retry for write operations due to channel acquisition timeout - See [PR 39255](https://github.com/Azure/azure-sdk-for-java/pull/39255)

* Fixed an issue where there was no cross region retry for write operations due to channel acquisition timeout. - See [PR 39255](https://github.com/Azure/azure-sdk-for-java/pull/39255)
* Fixed incorrect container tag value in metrics. - See [PR 39322](https://github.com/Azure/azure-sdk-for-java/pull/39322)
* Fixed issue where CosmosDiagnosticsContext is null when diagnostics are sampled out. - See [PR 39352](https://github.com/Azure/azure-sdk-for-java/pull/39352)
#### Other Changes
* Only call System.exit in `DiagnosticsProvider` for `Error` scenario. Also add `System.err` for `Error` cases. - See [PR 39077](https://github.com/Azure/azure-sdk-for-java/pull/39077)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,12 @@ private void recordFeedResponseConsumerLatencyCore(
}

private void handleDiagnostics(Context context, CosmosDiagnosticsContext cosmosCtx) {

final double samplingRateSnapshot = clientTelemetryConfigAccessor.getSamplingRate(this.telemetryConfig);
if (this.shouldSampleOutOperation(samplingRateSnapshot)) {
return;
}

// @TODO - investigate whether we should push the handling of diagnostics out of the hot path
// currently diagnostics are handled by the same thread on the hot path - which is intentional
// because any async queueing/throttling/sampling can best be done by diagnostic handlers
Expand Down Expand Up @@ -674,10 +680,6 @@ private <T> Mono<T> diagnosticsEnabledPublisher(
ctxAccessor.setSamplingRateSnapshot(cosmosCtx, samplingRateSnapshot);
}

if (shouldSampleOutOperation(samplingRateSnapshot)) {
return resultPublisher;
}

Optional<Object> callDepth = context.getData(COSMOS_CALL_DEPTH);
final boolean isNestedCall = callDepth.isPresent();
if (isNestedCall) {
Expand Down Expand Up @@ -786,7 +788,7 @@ private void end(

checkNotNull(cosmosCtx, "Argument 'cosmosCtx' must not be null.");

// endOperation can be called form two places in Reactor - making sure we process completion only once
// endOperation can be called from two places in Reactor - making sure we process completion only once
if (ctxAccessor.endOperation(
cosmosCtx,
statusCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
FeedOperationState state = pagedFluxOptions.getFeedOperationState();
DiagnosticsProvider tracerProvider = state != null ? state.getDiagnosticsProvider() : null;
Object lockHolder = new Object();
if (tracerProvider == null ||
!tracerProvider.isEnabled()
|| tracerProvider.shouldSampleOutOperation(pagedFluxOptions)) {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
if (tracerProvider == null || !tracerProvider.isEnabled()) {

return publisher
.doOnEach(signal -> {
Expand Down
Loading