diff --git a/eng/pipelines/client.yml b/eng/pipelines/client.yml
index dbd34c05f803..d27b602a0f8d 100644
--- a/eng/pipelines/client.yml
+++ b/eng/pipelines/client.yml
@@ -113,7 +113,7 @@ jobs:
jdkVersionOption: '1.11'
jdkArchitectureOption: 'x64'
publishJUnitResults: false
- goals: 'install site:site site:stage'
+ goals: 'site:site site:stage'
- script: |
git clone https://github.com/JonathanGiles/DependencyChecker.git
@@ -148,7 +148,6 @@ jobs:
- pwsh: |
copy -r target/staging $(Build.ArtifactStagingDirectory)
copy eng/code-quality-reports/src/main/resources/index.html $(Build.ArtifactStagingDirectory)
- copy eng/spotbugs-aggregate-report/target/spotbugs/spotbugsXml.html (Join-Path $(Build.ArtifactStagingDirectory) "staging")
copy output/dependencies.html (Join-Path $(Build.ArtifactStagingDirectory) "staging")
displayName: 'Copy reports to artifact staging'
condition: ne(variables['Build.Reason'], 'PullRequest')
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/README.md b/sdk/eventhubs/azure-messaging-eventhubs/README.md
index e03fd68c0eb1..2f34b5315a7a 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/README.md
+++ b/sdk/eventhubs/azure-messaging-eventhubs/README.md
@@ -319,7 +319,9 @@ managing the underlying consumer operations.
In our example, we will focus on building the [`EventProcessorClient`][EventProcessorClient], use the
[`InMemoryCheckpointStore`][InMemoryCheckpointStore] available in samples, and a callback function that processes events
-received from the Event Hub and writes to console.
+received from the Event Hub and writes to console. For production applications, it's recommended to use a durable
+store like [Checkpoint Store with Azure Storage Blobs][BlobCheckpointStore].
+
```java
@@ -470,5 +472,6 @@ Guidelines](./CONTRIBUTING.md) for more information.
[InMemoryCheckpointStore]: ./src/samples/java/com/azure/messaging/eventhubs/InMemoryCheckpointStore.java
[LogLevels]: ../../core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java
[RetryOptions]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java
+[BlobCheckpointStore]: ../azure-messaging-eventhubs-checkpointstore-blob/README.md

diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java
index 1b1d0637b8df..2e306e58fd37 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java
@@ -32,24 +32,26 @@
* EventProcessorClient}.
*
*
- * To create an instance of {@link EventProcessorClient} that processes events with user-provided callback, configure
- * the following fields:
+ * To create an instance of {@link EventProcessorClient}, the following fields are required:
*
*
* - {@link #consumerGroup(String) Consumer group name}.
* - {@link CheckpointStore} - An implementation of CheckpointStore that stores checkpoint and
- * partition ownership information to enable load balancing.
- * - {@link #processEvent(Consumer)} - A callback that processes events received from the Event Hub.
+ * partition ownership information to enable load balancing and checkpointing processed events.
+ * - {@link #processEvent(Consumer) processEvent} - A callback that processes events received from the Event Hub
+ * .
+ * - {@link #processError(Consumer) processError} - A callback that handles errors that may occur while running the
+ * EventProcessorClient.
* - Credentials -
* Credentials are required to perform operations against Azure Event Hubs. They can be set by using
* one of the following methods:
*
- * - {@link #connectionString(String) connectionString(String)} with a connection string to a specific Event Hub.
+ *
- {@link #connectionString(String)} with a connection string to a specific Event Hub.
*
- * - {@link #connectionString(String, String) connectionString(String, String)} with an Event Hub namespace
- * connection string and the Event Hub name.
- * - {@link #credential(String, String, TokenCredential) credential(String, String, TokenCredential)} with the
- * fully qualified namespace, Event Hub name, and a set of credentials authorized to use the Event Hub.
+ *
- {@link #connectionString(String, String)} with an Event Hub namespace connection string and the Event Hub
+ * name.
+ * - {@link #credential(String, String, TokenCredential)} with the fully qualified namespace, Event Hub name, and a
+ * set of credentials authorized to use the Event Hub.
*
*
*
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventContext.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventContext.java
index e167d0882f3b..ff6672d2bfea 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventContext.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventContext.java
@@ -3,9 +3,6 @@
package com.azure.messaging.eventhubs.models;
-import static com.azure.core.util.FluxUtil.monoError;
-
-import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
@@ -21,7 +18,6 @@
*/
public class EventContext {
- private final ClientLogger logger = new ClientLogger(EventContext.class);
private final PartitionContext partitionContext;
private final EventData eventData;
private final CheckpointStore checkpointStore;
@@ -78,16 +74,13 @@ public LastEnqueuedEventProperties getLastEnqueuedEventProperties() {
}
/**
- * Updates the checkpoint asynchronously for this partition using the event data. This will serve as the last known
- * successfully processed event in this partition if the update is successful.
+ * Updates the checkpoint asynchronously for this partition using the event data in this
+ * {@link EventContext}. This will serve as the last known successfully processed event in this partition if the
+ * update is successful.
*
- * @param eventData The event data to use for updating the checkpoint.
* @return a representation of deferred execution of this call.
*/
- public Mono updateCheckpointAsync(EventData eventData) {
- if (eventData == null) {
- return monoError(logger, new NullPointerException("'eventData' cannot be null"));
- }
+ public Mono updateCheckpointAsync() {
Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace(partitionContext.getFullyQualifiedNamespace())
.setEventHubName(partitionContext.getEventHubName())
@@ -103,6 +96,6 @@ public Mono updateCheckpointAsync(EventData eventData) {
* successfully processed event in this partition if the update is successful.
*/
public void updateCheckpoint() {
- this.updateCheckpointAsync(eventData).block();
+ this.updateCheckpointAsync().block();
}
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/InMemoryCheckpointStore.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/InMemoryCheckpointStore.java
index f564cc58a06c..b07287dc39dc 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/InMemoryCheckpointStore.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/InMemoryCheckpointStore.java
@@ -3,10 +3,12 @@
package com.azure.messaging.eventhubs;
+import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.List;
+import java.util.Locale;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -21,23 +23,39 @@
*/
public class InMemoryCheckpointStore implements CheckpointStore {
+ private static final String OWNERSHIP = "ownership";
+ private static final String SEPARATOR = "/";
+ private static final String CHECKPOINT = "checkpoint";
private final Map partitionOwnershipMap = new ConcurrentHashMap<>();
private final Map checkpointsMap = new ConcurrentHashMap<>();
private final ClientLogger logger = new ClientLogger(InMemoryCheckpointStore.class);
/**
* {@inheritDoc}
- *
- * @param fullyQualifiedNamespace The fully qualified namespace of the Event Hubs instance.
- * @param eventHubName The name of the Event Hub to list ownership of.
- * @param consumerGroup The name of the consumer group to list ownership of.
- * @return A {@link Flux} of partition ownership information.
*/
@Override
public Flux listOwnership(String fullyQualifiedNamespace, String eventHubName,
String consumerGroup) {
logger.info("Listing partition ownership");
- return Flux.fromIterable(partitionOwnershipMap.values());
+
+ String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup, OWNERSHIP);
+ return Flux.fromIterable(partitionOwnershipMap.keySet())
+ .filter(key -> key.startsWith(prefix))
+ .map(key -> partitionOwnershipMap.get(key));
+ }
+
+ private String prefixBuilder(String fullyQualifiedNamespace, String eventHubName, String consumerGroup,
+ String type) {
+ return new StringBuilder()
+ .append(fullyQualifiedNamespace)
+ .append(SEPARATOR)
+ .append(eventHubName)
+ .append(SEPARATOR)
+ .append(consumerGroup)
+ .append(SEPARATOR)
+ .append(type)
+ .toString()
+ .toLowerCase(Locale.ROOT);
}
/**
@@ -50,6 +68,13 @@ public Flux listOwnership(String fullyQualifiedNamespace, St
*/
@Override
public Flux claimOwnership(List requestedPartitionOwnerships) {
+ if (CoreUtils.isNullOrEmpty(requestedPartitionOwnerships)) {
+ return Flux.empty();
+ }
+ PartitionOwnership firstEntry = requestedPartitionOwnerships.get(0);
+ String prefix = prefixBuilder(firstEntry.getFullyQualifiedNamespace(), firstEntry.getEventHubName(),
+ firstEntry.getConsumerGroup(), OWNERSHIP);
+
return Flux.fromIterable(requestedPartitionOwnerships)
.filter(partitionOwnership -> {
return !partitionOwnershipMap.containsKey(partitionOwnership.getPartitionId())
@@ -62,15 +87,21 @@ public Flux claimOwnership(List requeste
.map(partitionOwnership -> {
partitionOwnership.setETag(UUID.randomUUID().toString())
.setLastModifiedTime(System.currentTimeMillis());
- partitionOwnershipMap.put(partitionOwnership.getPartitionId(), partitionOwnership);
+ partitionOwnershipMap.put(prefix + SEPARATOR + partitionOwnership.getPartitionId(), partitionOwnership);
return partitionOwnership;
});
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Flux listCheckpoints(String fullyQualifiedNamespace, String eventHubName,
String consumerGroup) {
- return Flux.fromStream(checkpointsMap.values().stream());
+ String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup, CHECKPOINT);
+ return Flux.fromIterable(checkpointsMap.keySet())
+ .filter(key -> key.startsWith(prefix))
+ .map(key -> checkpointsMap.get(key));
}
/**
@@ -81,7 +112,13 @@ public Flux listCheckpoints(String fullyQualifiedNamespace, String e
*/
@Override
public Mono updateCheckpoint(Checkpoint checkpoint) {
- checkpointsMap.put(checkpoint.getPartitionId(), checkpoint);
+ if (checkpoint == null) {
+ return Mono.error(logger.logExceptionAsError(new NullPointerException("checkpoint cannot be null")));
+ }
+
+ String prefix = prefixBuilder(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(),
+ checkpoint.getEventHubName(), CHECKPOINT);
+ checkpointsMap.put(prefix + SEPARATOR + checkpoint.getPartitionId(), checkpoint);
logger.info("Updated checkpoint for partition {} with sequence number {}", checkpoint.getPartitionId(),
checkpoint.getSequenceNumber());
return Mono.empty();
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java
index 1cc88baed6ed..99f1c76abe66 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorClientTest.java
@@ -148,10 +148,10 @@ public void testWithSimplePartitionProcessor() throws Exception {
// Assert
assertNotNull(eventProcessorClient.getIdentifier());
- StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
+ StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.expectNextCount(1).verifyComplete();
- StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
+ StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.assertNext(partitionOwnership -> {
assertEquals("1", partitionOwnership.getPartitionId(), "Partition");
assertEquals("test-consumer", partitionOwnership.getConsumerGroup(), "Consumer");
@@ -287,7 +287,7 @@ public void testWithMultiplePartitions() throws Exception {
// Assert
Assertions.assertTrue(completed);
- StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
+ StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.expectNextCount(1).verifyComplete();
verify(eventHubAsyncClient, atLeast(1)).getPartitionIds();
@@ -297,7 +297,7 @@ public void testWithMultiplePartitions() throws Exception {
// We expected one to be removed.
Assertions.assertEquals(2, identifiers.size());
- StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
+ StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.assertNext(po -> {
String partitionId = po.getPartitionId();
verify(consumer1, atLeastOnce()).receiveFromPartition(eq(partitionId), any(EventPosition.class), any());
@@ -305,7 +305,7 @@ public void testWithMultiplePartitions() throws Exception {
}
private PartitionEvent getEvent(EventData event) {
- PartitionContext context = new PartitionContext("ns", "foo", "bar", "baz");
+ PartitionContext context = new PartitionContext("test-ns", "foo", "bar", "baz");
return new PartitionEvent(context, event, null);
}