Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update documentation and in-memory checkpoint store sample #7389

Merged
merged 2 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions eng/pipelines/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ jobs:
jdkVersionOption: '1.11'
jdkArchitectureOption: 'x64'
publishJUnitResults: false
goals: 'install site:site site:stage'
goals: 'site:site site:stage'

- script: |
git clone https://github.com/JonathanGiles/DependencyChecker.git
Expand Down Expand Up @@ -148,7 +148,6 @@ jobs:
- pwsh: |
copy -r target/staging $(Build.ArtifactStagingDirectory)
copy eng/code-quality-reports/src/main/resources/index.html $(Build.ArtifactStagingDirectory)
copy eng/spotbugs-aggregate-report/target/spotbugs/spotbugsXml.html (Join-Path $(Build.ArtifactStagingDirectory) "staging")
copy output/dependencies.html (Join-Path $(Build.ArtifactStagingDirectory) "staging")
displayName: 'Copy reports to artifact staging'
condition: ne(variables['Build.Reason'], 'PullRequest')
Expand Down
5 changes: 4 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ managing the underlying consumer operations.

In our example, we will focus on building the [`EventProcessorClient`][EventProcessorClient], use the
[`InMemoryCheckpointStore`][InMemoryCheckpointStore] available in samples, and a callback function that processes events
received from the Event Hub and writes to console.
received from the Event Hub and writes to console. For production applications, it's recommended to use a durable
store like [Checkpoint Store with Azure Storage Blobs][BlobCheckpointStore].


<!-- embedme ./src/samples/java/com/azure/messaging/eventhubs/ReadmeSamples.java#L155-L176 -->
```java
Expand Down Expand Up @@ -470,5 +472,6 @@ Guidelines](./CONTRIBUTING.md) for more information.
[InMemoryCheckpointStore]: ./src/samples/java/com/azure/messaging/eventhubs/InMemoryCheckpointStore.java
[LogLevels]: ../../core/azure-core/src/main/java/com/azure/core/util/logging/ClientLogger.java
[RetryOptions]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java
[BlobCheckpointStore]: ../azure-messaging-eventhubs-checkpointstore-blob/README.md

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Feventhubs%2Fazure-messaging-eventhubs%2FREADME.png)
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,26 @@
* EventProcessorClient}.
*
* <p>
* To create an instance of {@link EventProcessorClient} that processes events with user-provided callback, configure
* the following fields:
* To create an instance of {@link EventProcessorClient}, the <b>following fields are required</b>:
*
* <ul>
* <li>{@link #consumerGroup(String) Consumer group name}.</li>
* <li>{@link CheckpointStore} - An implementation of CheckpointStore that stores checkpoint and
* partition ownership information to enable load balancing.</li>
* <li>{@link #processEvent(Consumer)} - A callback that processes events received from the Event Hub.</li>
* partition ownership information to enable load balancing and checkpointing processed events.</li>
* <li>{@link #processEvent(Consumer) processEvent} - A callback that processes events received from the Event Hub
* .</li>
* <li>{@link #processError(Consumer) processError} - A callback that handles errors that may occur while running the
* EventProcessorClient.</li>
* <li>Credentials -
* <strong>Credentials are required</strong> to perform operations against Azure Event Hubs. They can be set by using
* one of the following methods:
* <ul>
* <li>{@link #connectionString(String) connectionString(String)} with a connection string to a specific Event Hub.
* <li>{@link #connectionString(String)} with a connection string to a specific Event Hub.
* </li>
* <li>{@link #connectionString(String, String) connectionString(String, String)} with an Event Hub <i>namespace</i>
* connection string and the Event Hub name.</li>
* <li>{@link #credential(String, String, TokenCredential) credential(String, String, TokenCredential)} with the
* fully qualified namespace, Event Hub name, and a set of credentials authorized to use the Event Hub.
* <li>{@link #connectionString(String, String)} with an Event Hub <i>namespace</i> connection string and the Event Hub
* name.</li>
* <li>{@link #credential(String, String, TokenCredential)} with the fully qualified namespace, Event Hub name, and a
* set of credentials authorized to use the Event Hub.
* </li>
* </ul>
* </li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

package com.azure.messaging.eventhubs.models;

import static com.azure.core.util.FluxUtil.monoError;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
Expand All @@ -21,7 +18,6 @@
*/
public class EventContext {

private final ClientLogger logger = new ClientLogger(EventContext.class);
private final PartitionContext partitionContext;
private final EventData eventData;
private final CheckpointStore checkpointStore;
Expand Down Expand Up @@ -78,16 +74,13 @@ public LastEnqueuedEventProperties getLastEnqueuedEventProperties() {
}

/**
* Updates the checkpoint asynchronously for this partition using the event data. This will serve as the last known
* successfully processed event in this partition if the update is successful.
* Updates the checkpoint asynchronously for this partition using the event data in this
* {@link EventContext}. This will serve as the last known successfully processed event in this partition if the
* update is successful.
*
* @param eventData The event data to use for updating the checkpoint.
* @return a representation of deferred execution of this call.
*/
public Mono<Void> updateCheckpointAsync(EventData eventData) {
if (eventData == null) {
return monoError(logger, new NullPointerException("'eventData' cannot be null"));
}
public Mono<Void> updateCheckpointAsync() {
Checkpoint checkpoint = new Checkpoint()
.setFullyQualifiedNamespace(partitionContext.getFullyQualifiedNamespace())
.setEventHubName(partitionContext.getEventHubName())
Expand All @@ -103,6 +96,6 @@ public Mono<Void> updateCheckpointAsync(EventData eventData) {
* successfully processed event in this partition if the update is successful.
*/
public void updateCheckpoint() {
this.updateCheckpointAsync(eventData).block();
this.updateCheckpointAsync().block();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the .block(), should we pass in the "retryOptions.operationTimeout" so it doesn't block forever (or I think default is 5 mins)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is just a delegate method to the checkpoint store's updateCheckpoint(), I think the checkpoint store (blob client or any other store) should be configured with the necessary timeouts and retry options. If the store they use is super-slow, they might want a different timeout setting than the one they use for Event Hubs (in retryOptions).

Maybe we can defer adding a timeout to a later release if that's required.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

package com.azure.messaging.eventhubs;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.util.List;
import java.util.Locale;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -21,23 +23,39 @@
*/
public class InMemoryCheckpointStore implements CheckpointStore {

private static final String OWNERSHIP = "ownership";
private static final String SEPARATOR = "/";
private static final String CHECKPOINT = "checkpoint";
private final Map<String, PartitionOwnership> partitionOwnershipMap = new ConcurrentHashMap<>();
private final Map<String, Checkpoint> checkpointsMap = new ConcurrentHashMap<>();
private final ClientLogger logger = new ClientLogger(InMemoryCheckpointStore.class);

/**
* {@inheritDoc}
*
* @param fullyQualifiedNamespace The fully qualified namespace of the Event Hubs instance.
* @param eventHubName The name of the Event Hub to list ownership of.
* @param consumerGroup The name of the consumer group to list ownership of.
* @return A {@link Flux} of partition ownership information.
*/
@Override
public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName,
String consumerGroup) {
logger.info("Listing partition ownership");
return Flux.fromIterable(partitionOwnershipMap.values());

String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup, OWNERSHIP);
return Flux.fromIterable(partitionOwnershipMap.keySet())
.filter(key -> key.startsWith(prefix))
.map(key -> partitionOwnershipMap.get(key));
}

private String prefixBuilder(String fullyQualifiedNamespace, String eventHubName, String consumerGroup,
String type) {
return new StringBuilder()
.append(fullyQualifiedNamespace)
.append(SEPARATOR)
.append(eventHubName)
.append(SEPARATOR)
.append(consumerGroup)
.append(SEPARATOR)
.append(type)
.toString()
.toLowerCase(Locale.ROOT);
}

/**
Expand All @@ -50,6 +68,13 @@ public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, St
*/
@Override
public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
if (CoreUtils.isNullOrEmpty(requestedPartitionOwnerships)) {
return Flux.empty();
}
PartitionOwnership firstEntry = requestedPartitionOwnerships.get(0);
String prefix = prefixBuilder(firstEntry.getFullyQualifiedNamespace(), firstEntry.getEventHubName(),
firstEntry.getConsumerGroup(), OWNERSHIP);

return Flux.fromIterable(requestedPartitionOwnerships)
.filter(partitionOwnership -> {
return !partitionOwnershipMap.containsKey(partitionOwnership.getPartitionId())
Expand All @@ -62,15 +87,21 @@ public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requeste
.map(partitionOwnership -> {
partitionOwnership.setETag(UUID.randomUUID().toString())
.setLastModifiedTime(System.currentTimeMillis());
partitionOwnershipMap.put(partitionOwnership.getPartitionId(), partitionOwnership);
partitionOwnershipMap.put(prefix + SEPARATOR + partitionOwnership.getPartitionId(), partitionOwnership);
return partitionOwnership;
});
}

/**
* {@inheritDoc}
*/
@Override
public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName,
String consumerGroup) {
return Flux.fromStream(checkpointsMap.values().stream());
String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup, CHECKPOINT);
return Flux.fromIterable(checkpointsMap.keySet())
.filter(key -> key.startsWith(prefix))
.map(key -> checkpointsMap.get(key));
}

/**
Expand All @@ -81,7 +112,13 @@ public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String e
*/
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
checkpointsMap.put(checkpoint.getPartitionId(), checkpoint);
if (checkpoint == null) {
return Mono.error(logger.logExceptionAsError(new NullPointerException("checkpoint cannot be null")));
}

String prefix = prefixBuilder(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(),
checkpoint.getEventHubName(), CHECKPOINT);
checkpointsMap.put(prefix + SEPARATOR + checkpoint.getPartitionId(), checkpoint);
logger.info("Updated checkpoint for partition {} with sequence number {}", checkpoint.getPartitionId(),
checkpoint.getSequenceNumber());
return Mono.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ public void testWithSimplePartitionProcessor() throws Exception {
// Assert
assertNotNull(eventProcessorClient.getIdentifier());

StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.expectNextCount(1).verifyComplete();

StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.assertNext(partitionOwnership -> {
assertEquals("1", partitionOwnership.getPartitionId(), "Partition");
assertEquals("test-consumer", partitionOwnership.getConsumerGroup(), "Consumer");
Expand Down Expand Up @@ -287,7 +287,7 @@ public void testWithMultiplePartitions() throws Exception {

// Assert
Assertions.assertTrue(completed);
StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.expectNextCount(1).verifyComplete();

verify(eventHubAsyncClient, atLeast(1)).getPartitionIds();
Expand All @@ -297,15 +297,15 @@ public void testWithMultiplePartitions() throws Exception {
// We expected one to be removed.
Assertions.assertEquals(2, identifiers.size());

StepVerifier.create(checkpointStore.listOwnership("ns", "test-eh", "test-consumer"))
StepVerifier.create(checkpointStore.listOwnership("test-ns", "test-eh", "test-consumer"))
.assertNext(po -> {
String partitionId = po.getPartitionId();
verify(consumer1, atLeastOnce()).receiveFromPartition(eq(partitionId), any(EventPosition.class), any());
}).verifyComplete();
}

private PartitionEvent getEvent(EventData event) {
PartitionContext context = new PartitionContext("ns", "foo", "bar", "baz");
PartitionContext context = new PartitionContext("test-ns", "foo", "bar", "baz");
return new PartitionEvent(context, event, null);
}

Expand Down