Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventHubs metrics #31024

Merged
merged 5 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ class OpenTelemetryAttributes implements TelemetryAttributes {
private static Map<String, String> getMappings() {
Map<String, String> mappings = new HashMap<>();
// messaging mapping, attributes are defined in com.azure.core.amqp.implementation.ClientConstants
mappings.put("status", "otel.status_code");
mappings.put("entityName", "messaging.destination");
mappings.put("entityPath", "messaging.az.entity_path");
mappings.put("hostName", "net.peer.name");
mappings.put("errorCondition", "amqp.error_condition");
mappings.put("amqpStatusCode", "amqp.status_code");
mappings.put("amqpOperation", "amqp.operation");
mappings.put("deliveryState", "amqp.delivery_state");
mappings.put("partitionId", "messaging.eventhubs.partition_id");
mappings.put("consumerGroup", "messaging.eventhubs.consumer_group");

return Collections.unmodifiableMap(mappings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ public void attributeMappings() {
put("deliveryState", "rejected");
put("amqpStatusCode", "no_content");
put("amqpOperation", "peek");
put("partitionId", 42);
put("status", "error");
put("consumerGroup", "$Default");
}});

assertEquals(OpenTelemetryAttributes.class, attributeCollection.getClass());
Attributes attributes = ((OpenTelemetryAttributes) attributeCollection).get();

assertEquals(8, attributes.size());
assertEquals(11, attributes.size());
assertEquals("value", attributes.get(AttributeKey.stringKey("foobar")));
assertEquals("host", attributes.get(AttributeKey.stringKey("net.peer.name")));
assertEquals("entity", attributes.get(AttributeKey.stringKey("messaging.destination")));
Expand All @@ -83,6 +86,9 @@ public void attributeMappings() {
assertEquals("rejected", attributes.get(AttributeKey.stringKey("amqp.delivery_state")));
assertEquals("peek", attributes.get(AttributeKey.stringKey("amqp.operation")));
assertEquals("no_content", attributes.get(AttributeKey.stringKey("amqp.status_code")));
assertEquals(42, attributes.get(AttributeKey.longKey("messaging.eventhubs.partition_id")));
assertEquals("error", attributes.get(AttributeKey.stringKey("otel.status_code")));
assertEquals("$Default", attributes.get(AttributeKey.stringKey("messaging.eventhubs.consumer_group")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-test</artifactId>
<version>1.12.0</version> <!-- {x-version-update;com.azure:azure-core-test;dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,32 @@
package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.http.rest.Response;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.ListBlobsOptions;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -66,6 +68,7 @@ public class BlobCheckpointStore implements CheckpointStore {
private static final ClientLogger LOGGER = new ClientLogger(BlobCheckpointStore.class);

private final BlobContainerAsyncClient blobContainerAsyncClient;
private final MetricsHelper metricsHelper;
private final Map<String, BlobAsyncClient> blobClients = new ConcurrentHashMap<>();

/**
Expand All @@ -75,7 +78,20 @@ public class BlobCheckpointStore implements CheckpointStore {
* blobs in the storage container.
*/
public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) {
this(blobContainerAsyncClient, null);
}


/**
* Creates an instance of BlobCheckpointStore.
*
* @param blobContainerAsyncClient The {@link BlobContainerAsyncClient} this instance will use to read and update
* @param options The {@link ClientOptions} to configure this instance.
* blobs in the storage container.
*/
public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient, ClientOptions options) {
this.blobContainerAsyncClient = blobContainerAsyncClient;
this.metricsHelper = new MetricsHelper(options == null ? null : options.getMetricsOptions(), MeterProvider.getDefaultProvider());
}

/**
Expand Down Expand Up @@ -256,6 +272,11 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0, null,
metadata, null, null, null).then();
}
})
.doOnEach(signal -> {
if (signal.isOnComplete() || signal.isOnError()) {
metricsHelper.reportCheckpoint(checkpoint, blobName, !signal.hasError());
}
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.MetricsOptions;
import com.azure.core.util.TelemetryAttributes;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.metrics.LongCounter;
import com.azure.core.util.metrics.LongGauge;
import com.azure.core.util.metrics.Meter;
import com.azure.core.util.metrics.MeterProvider;
import com.azure.messaging.eventhubs.models.Checkpoint;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

final class MetricsHelper {
private static final ClientLogger LOGGER = new ClientLogger(MetricsHelper.class);

// Make sure attribute names are consistent across AMQP Core, EventHubs, ServiceBus when applicable
// and mapped correctly in OTel Metrics https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-metrics-opentelemetry/src/main/java/com/azure/core/metrics/opentelemetry/OpenTelemetryAttributes.java
private static final String ENTITY_NAME_KEY = "entityName";
private static final String HOSTNAME_KEY = "hostName";
private static final String PARTITION_ID_KEY = "partitionId";
private static final String CONSUMER_GROUP_KEY = "consumerGroup";
private static final String STATUS_KEY = "status";

// since checkpoint store is stateless it might be used for endless number of eventhubs.
// we'll have as many subscriptions as there are combinations of fqdn, eventhub name, partitionId and consumer group.
// In the unlikely case it's shared across a lot of EH client instances, metrics would be too costly
// and unhelpful. So, let's just set a hard limit on number of subscriptions.
private static final int MAX_ATTRIBUTES_SETS = 100;

private static final String PROPERTIES_FILE = "azure-messaging-eventhubs-checkpointstore-blob.properties";
private static final String NAME_KEY = "name";
private static final String VERSION_KEY = "version";
private static final String LIBRARY_NAME;
private static final String LIBRARY_VERSION;
private static final String UNKNOWN = "UNKNOWN";

static {
final Map<String, String> properties = CoreUtils.getProperties(PROPERTIES_FILE);
LIBRARY_NAME = properties.getOrDefault(NAME_KEY, UNKNOWN);
LIBRARY_VERSION = properties.getOrDefault(VERSION_KEY, UNKNOWN);
}

private final ConcurrentHashMap<String, TelemetryAttributes> common = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TelemetryAttributes> checkpointFailure = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, TelemetryAttributes> checkpointSuccess = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CurrentValue> seqNoSubscriptions = new ConcurrentHashMap<>();

private volatile boolean maxCapacityReached = false;

private final Meter meter;
private final LongGauge lastSequenceNumber;
private final LongCounter checkpointCounter;
private final boolean isEnabled;

MetricsHelper(MetricsOptions metricsOptions, MeterProvider meterProvider) {
if (areMetricsEnabled(metricsOptions)) {
this.meter = meterProvider.createMeter(LIBRARY_NAME, LIBRARY_VERSION, metricsOptions);
this.isEnabled = this.meter.isEnabled();
} else {
this.meter = null;
this.isEnabled = false;
}

if (isEnabled) {
this.lastSequenceNumber = this.meter.createLongGauge("messaging.eventhubs.checkpoint.sequence_number", "Last successfully checkpointed sequence number.", "seqNo");
this.checkpointCounter = this.meter.createLongCounter("messaging.eventhubs.checkpoints", "Number of checkpoints.", null);
} else {
this.lastSequenceNumber = null;
this.checkpointCounter = null;
}
}

void reportCheckpoint(Checkpoint checkpoint, String attributesId, boolean success) {
if (!isEnabled || !(lastSequenceNumber.isEnabled() && checkpointCounter.isEnabled())) {
return;
}

if (!maxCapacityReached && (seqNoSubscriptions.size() >= MAX_ATTRIBUTES_SETS || common.size() >= MAX_ATTRIBUTES_SETS)) {
LOGGER.error("Too many attribute combinations are reported for checkpoint metrics, ignoring any new dimensions.");
maxCapacityReached = true;
}

if (lastSequenceNumber.isEnabled() && success) {
updateCurrentValue(attributesId, checkpoint);
}

if (checkpointCounter.isEnabled()) {
TelemetryAttributes attributes = null;
if (success) {
attributes = getOrCreate(checkpointSuccess, attributesId, checkpoint, "ok");
} else {
attributes = getOrCreate(checkpointFailure, attributesId, checkpoint, "error");
}
if (attributes != null) {
checkpointCounter.add(1, attributes, Context.NONE);
}
}
}

private TelemetryAttributes getOrCreate(ConcurrentHashMap<String, TelemetryAttributes> source, String attributesId, Checkpoint checkpoint, String status) {
if (maxCapacityReached) {
return source.get(attributesId);
}

return source.computeIfAbsent(attributesId, i -> meter.createAttributes(createAttributes(checkpoint, status)));
}

private Map<String, Object> createAttributes(Checkpoint checkpoint, String status) {
Map<String, Object> attributesMap = new HashMap<>(5);
attributesMap.put(HOSTNAME_KEY, checkpoint.getFullyQualifiedNamespace());
attributesMap.put(ENTITY_NAME_KEY, checkpoint.getEventHubName());
attributesMap.put(PARTITION_ID_KEY, checkpoint.getPartitionId());
attributesMap.put(CONSUMER_GROUP_KEY, checkpoint.getConsumerGroup());
if (status != null) {
attributesMap.put(STATUS_KEY, status);
}

return attributesMap;
}

private void updateCurrentValue(String attributesId, Checkpoint checkpoint) {
if (checkpoint.getSequenceNumber() == null) {
return;
}

final CurrentValue valueSupplier;
if (maxCapacityReached) {
valueSupplier = seqNoSubscriptions.get(attributesId);
if (valueSupplier == null) {
return;
}
} else {
TelemetryAttributes attributes = getOrCreate(common, attributesId, checkpoint, null);
if (attributes == null) {
return;
}

valueSupplier = seqNoSubscriptions.computeIfAbsent(attributesId, a -> {
AtomicReference<Long> lastSeqNo = new AtomicReference<>();
return new CurrentValue(lastSequenceNumber.registerCallback(() -> lastSeqNo.get(), attributes), lastSeqNo);
});
}

valueSupplier.set(checkpoint.getSequenceNumber());
}

private static boolean areMetricsEnabled(MetricsOptions options) {
if (options == null || options.isEnabled()) {
return true;
}

return false;
}

private static class CurrentValue {
private final AtomicReference<Long> lastSeqNo;
private final AutoCloseable subscription;

CurrentValue(AutoCloseable subscription, AtomicReference<Long> lastSeqNo) {
this.subscription = subscription;
this.lastSeqNo = lastSeqNo;
}

void set(long value) {
lastSeqNo.set(value);
}

void close() {
if (subscription != null) {
try {
subscription.close();
} catch (Exception e) {
// should never happen
throw LOGGER.logThrowableAsWarning(new RuntimeException(e));
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name=${project.artifactId}
version=${project.version}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.ListBlobsOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -56,9 +58,20 @@ public class BlobEventProcessorClientStoreTest {
@Mock
private BlobAsyncClient blobAsyncClient;

private AutoCloseable autoCloseable;

@BeforeEach
public void setup() {
MockitoAnnotations.initMocks(this);
public void beforeEach() {
this.autoCloseable = MockitoAnnotations.openMocks(this);
}

@AfterEach
public void afterEach() throws Exception {
if (autoCloseable != null) {
autoCloseable.close();
}

Mockito.framework().clearInlineMock(this);
}

@Test
Expand Down
Loading