From 0443605b6dda736a19c7abec23298628cf280f94 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 12 Dec 2019 17:30:57 -0800 Subject: [PATCH 1/9] Add initial migration. --- .../migration-guide.md | 374 ++++++++++++++++++ 1 file changed, 374 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md new file mode 100644 index 000000000000..93e466a9a867 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -0,0 +1,374 @@ +# Migration Guide (Event Hubs v3 to v5) + +This document is intended for users that are familiar with v3 of the Java SDK for Event Hubs library +([`azure-eventhubs`][azure-eventhubs] and [`azure-eventhubs-eph`][azure-eventhubs-eph]) and wish to migrate their +application to V5 of the same library. + +For users new to the Java SDK for Event Hubs, please see the [README for azure-messaging-eventhubs][README]. + +## Table of contents + +- [Prerequisites](#prerequisites) +- [Updated Maven dependencies](#updated-maven-dependencies) +- [General changes](#general-changes) + - [Converting core classes](#converting-core-classes) + - [Sending events](#sending-events) + - [Receiving events](#receiving-events) + - [Minor renames](#minor-renames) +- [Migration samples](#migration-samples) + - [Migrating code from `EventHubClient` to `EventHubProducerClient` for sending events](#migrating-code-from--eventhubclient--to--eventhubproducerclient--for-sending-events) + - [Migrating code from `EventHubClient` to `EventHubConsumerClient` for receiving events](#migrating-code-from--eventhubclient--to--eventhubconsumerclient--for-receiving-events) + - [Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events](#migrating-code-from--eventprocessorhost--to--eventhubconsumerclient--for-receiving-events) + +## Prerequisites +Java Development Kit (JDK) with version 8 or above + +## Updated Maven dependencies + +Dependencies for Event Hubs has been updated to: +```xml + + + com.azure + azure-messaging-eventhubs + 5.0.0-beta.6 + + + + + com.azure + azure-messaging-eventhubs-checkpointstore-blob + 1.0.0-beta.4 + + +``` + +## General changes + +In the interest of simplifying the API surface, we've made three clients, each with an asynchronous and synchronous +variant. One client for sending events, `EventHubProducerAsyncClient`, and two for receiving events. +`EventProcessorClient` is the production level consumer and `EventHubConsumerAsyncClient` for exploration and +lower-level control of `EventData` consumption. + +[EventProcessorClient][EventProcessorClient] supports checkpointing and load balancing through a plugin model. +Currently, only Azure Blob storage is supported through +[azure-messaging-eventhubs-checkpointstore-blob][azure-messaging-eventhubs-checkpointstore-blob], but support for other +durable storage (i.e. Cosmos DB, Redis) can be added in the future. + +| Operation | Asynchronous client | Synchronous client | +|---|---|---| +| Producing events | [EventHubProducerAsyncClient][EventHubProducerAsyncClient] | [EventHubProducerClient][EventHubProducerClient] | +| Consuming events (supports checkpointing and load balancing) | [EventProcessorClient][EventProcessorClient] | | +| Consuming events | [EventHubConsumerAsyncClient][EventHubConsumerAsyncClient] | [EventHubConsumerClient][EventHubConsumerClient] | + +### Converting core classes + +Creation of producers or consumers is done through either [EventHubClientBuilder][EventHubClientBuilder] or +[EventProcessorClientBuilder][EventProcessorClientBuilder]. Asynchronous clients are created by invoking +`builder.build*AsyncClient()`. Synchronous clients are created by invoking `builder.build*Client()`. + +| In v3 | Equivalent in v5 | Sample | +|---|---|---| +| `EventHubClient.createFromConnectionString()` | `var builder = new EventHubClientBuilder().connectionString();`
then either `builder.buildProducerAsyncClient();` or
`builder.consumerGroup().buildConsumerAsyncClient();` | [Publishing events][PublishEventsWithCustomMetadata], [Consuming events][ConsumeEvents] | +| `EventHubClient.createWithAzureActiveDirectory()` | `var builder = new EventHubClientBuilder().tokenCredential();`
then either `builder.buildProducerAsyncClient();` or
`builder.consumerGroup().buildConsumerAsyncClient();` | [Publishing events with Azure AD][PublishEventsWithAzureIdentity] | +| `EventProcessorHost.EventProcessorHostBuilder.newBuilder()` | `new EventProcessorClientBuilder().buildEventProcessorClient()` | [EventProcessorClient with Blob storage][EventProcessorClientInstantiation] | + +### Sending events + +`EventHubProducerAsyncClient` and `EventHubProducerClient` can publish events to a single partition or allow the service +to load balance events between all the partitions. The behaviour is determined when using +[`CreateBatchOptions`][CreateBatchOptions] in `producer.createBatch(CreateBatchOptions)`. + +| In v3 | Equivalent in v5 | Sample | +|---|---|---| +| `PartitionSender.send(...)` | `EventHubProducerAsyncClient.send()` | [Publishing events to a specific partition][PublishEventsToSpecificPartition] | +| `EventHubClient.send(...)` | `EventHubProducerAsyncClient.send()` | [Publishing events][PublishEventsWithCustomMetadata] | + +### Receiving events + +| In v3 | Equivalent in v5 | Sample | +|---|---|---| +| `PartitionReceiver.receive()` | `EventHubConsumerAsyncClient.receiveFromPartition()` | [Consuming events][ConsumeEvents] | +| `PartitionReceiver.setReceiveHandler()` | `EventHubConsumerAsyncClient.receiveFromPartition()` | [Consuming events][ConsumeEvents] | + +### Minor renames + +| In v3 | Equivalent in v5 | +|---|---| +| `EventPosition.fromStartOfStream()` | `EventPosition.earliest()` | +| `EventPosition.fromEndOfStream()` | `EventPosition.latest()` | + +## Migration samples + +* [Sending events](#migrating-code-from-eventhubclient-to-eventhubproducerclient-for-sending-events) +* [Receiving events](#migrating-code-from-eventhubclient-to-eventhubconsumerclient-for-receiving-events) +* [Receiving events with checkpointing](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) + +### Migrating from `PartitionSender` or `EventHubClient` to `EventHubProducerClient` for sending events +In v3, there were multiple options on how to publish events to an Event Hub. + +In v5, this has been consolidated into a more efficient `send(EventDataBatch)` method. Batching merges information from +multiple events into a single sent message, reducing the amount of network communication needed vs sending events one at +a time. + +So in v3: +```java +EventHubClient client = EventHubClient.createFromConnectionString("connection-string", + Executors.newScheduledThreadPool(4)).get(); + +List events = Arrays.asList(EventData.create("foo".getBytes()), EventData.create("bar".getBytes())); + +CompletableFuture sendFuture = client.createPartitionSender("my-partition-id") + .thenCompose(sender -> { + EventDataBatch batch = sender.createBatch(); + for (EventData event : events) { + try { + batch.tryAdd(event); + } catch (PayloadSizeExceededException e) { + System.err.println("Event is too large for batch. Exception: " + e); + } + } + + return sender.send(batch); + }); +sendFuture.get(); +``` + +In v5: +```java +const producer = new EventHubProducerClient(connectionString); + +const eventsToSend = [ + // events go here +]; + +let batch = await producer.createBatch(); +let i = 0; + +while (i < eventsToSend.length) { + // messages can fail to be added to the batch if they exceed the maximum size configured for + // the EventHub. + const isAdded = batch.tryAdd(eventsToSend[i]); + + if (isAdded) { + console.log(`Added event number ${i} to the batch`); + ++i; + continue; + } + + if (batch.count === 0) { + // If we can't add it and the batch is empty that means the message we're trying to send + // is too large, even when it would be the _only_ message in the batch. + // + // At this point you'll need to decide if you're okay with skipping this message entirely + // or find some way to shrink it. + console.log(`Message was too large and can't be sent until it's made smaller. Skipping...`); + ++i; + continue; + } + + // otherwise this just signals a good spot to send our batch + console.log(`Batch is full - sending ${batch.count} messages as a single batch.`); + await producer.sendBatch(batch); + + // and create a new one to house the next set of messages + batch = await producer.createBatch(); +} + +// send any remaining messages, if any. +if (batch.count > 0) { + console.log(`Sending remaining ${batch.count} messages as a single batch.`) + await producer.sendBatch(batch); +} +``` + +### Migrating code from `EventHubClient` to `EventHubConsumerClient` for receiving events + +In V2, event handlers were passed as positional arguments to `receive`. + +In V5, event handlers are passed as part of a `SubscriptionEventHandlers` shaped object. + +For example, this code which receives from a partition in V2: + +```typescript +const client = EventHubClient.createFromConnectionString(connectionString); +const rcvHandler = client.receive(partitionId, onMessageHandler, onErrorHandler, { + eventPosition: EventPosition.fromStart(), + consumerGroup: consumerGroupName +}); +await rcvHandler.stop(); +``` + +Becomes this in V5: + +```typescript +const eventHubConsumerClient = new EventHubConsumerClient(consumerGroupName, connectionString); + +const subscription = eventHubConsumerClient.subscribe( + partitionId, { + processInitialize: (initContext) => { + initContext.setStartingPosition(EventPosition.fromStart()); + }, + processEvents: onMessageHandler, + processError: onErrorHandler +}); + +await subscription.close(); +``` + +See [`receiveEvents.ts`](https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/eventhub/event-hubs/samples/receiveEvents.ts) +for a sample program demonstrating this. + +In V5, this has been consolidated into a more efficient `sendBatch` method. +Batching merges information from multiple messages into a single send, reducing +the amount of network communication needed vs sending messages one at a time. + +So in V2: +```java +const eventsToSend = [ + // events go here +]; + +const client = EventHubClient.createFromConnectionString(connectionString); + +// Would fail if the total size of events exceed the max size supported by the library. +await client.sendBatch(eventsToSend, partitionId); +``` + +In V5: +```typescript +const producer = new EventHubProducerClient(connectionString); + +const eventsToSend = [ + // events go here +]; + +let batch = await producer.createBatch(); +let i = 0; + +while (i < eventsToSend.length) { + // messages can fail to be added to the batch if they exceed the maximum size configured for + // the EventHub. + const isAdded = batch.tryAdd(eventsToSend[i]); + + if (isAdded) { + console.log(`Added event number ${i} to the batch`); + ++i; + continue; + } + + if (batch.count === 0) { + // If we can't add it and the batch is empty that means the message we're trying to send + // is too large, even when it would be the _only_ message in the batch. + // + // At this point you'll need to decide if you're okay with skipping this message entirely + // or find some way to shrink it. + console.log(`Message was too large and can't be sent until it's made smaller. Skipping...`); + ++i; + continue; + } + + // otherwise this just signals a good spot to send our batch + console.log(`Batch is full - sending ${batch.count} messages as a single batch.`); + await producer.sendBatch(batch); + + // and create a new one to house the next set of messages + batch = await producer.createBatch(); +} + +// send any remaining messages, if any. +if (batch.count > 0) { + console.log(`Sending remaining ${batch.count} messages as a single batch.`) + await producer.sendBatch(batch); +} +``` + +### Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events + +In V2, `EventProcessorHost` allowed you to balance the load between multiple instances of +your program when receiving events. + +In V5, `EventHubConsumerClient` allows you to do the same with the `subscribe()` method if you +pass a `CheckpointStore` to the constructor. + +So in V2: +```typescript +const eph = EventProcessorHost.createFromConnectionString( + EventProcessorHost.createHostName(ephName), + storageConnectionString, + storageContainerName, + ehConnectionString, + { + eventHubPath: eventHubName, + onEphError: (error) => { + // This is your error handler for errors occuring during load balancing. + console.log("Error when running EPH: %O", error); + } + } +); + +// In V2, you get a single event passed to your callback. If you had asynchronous code running in your callback, +// it is not awaited before the callback is called for the next event. +const onMessage = (context, event) => { /** your code here **/ } + +// This is your error handler for errors occurring when receiving events. +const onError = (error) => { + console.log("Received Error: %O", error); +}; + +await eph.start(onMessage, onError); +``` + +And in V5: +```typescript +import { EventHubConsumerClient, CheckpointStore } from "@azure/event-hubs"; +import { ContainerClient } from "@azure/storage-blob"; +import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"; + +const containerClient = new ContainerClient(storageConnectionString, storageContainerName); +const checkpointStore : CheckpointStore = new BlobCheckpointStore(containerClient); +const eventHubConsumerClient = new EventHubConsumerClient(consumerGroupName, ehConnectionString, eventHubName); + +const subscription = eventHubConsumerClient.subscribe( + partitionId, { + // In V5 we deliver events in batches, rather than a single message at a time. + // You can control the batch size via the options passed to the client. + // + // If your callback is an async function or returns a promise, it will be awaited before the + // callback is called for the next batch of events. + processEvents: (events, context) => { /** your code here **/ }, + + // Prior to V5 errors were handled by separate callbacks depending + // on where they were thrown i.e when managing different partitions vs receiving from each partition. + // + // In V5 you only need a single error handler for all of those cases. + processError: (error, context) => { + if (context.partitionId) { + console.log("Error when receiving events from partition %s: %O", context.partitionId, error) + } else { + console.log("Error from the consumer client: %O", error); + } + } +}); + +await subscription.close(); +``` + + +[azure-eventhubs-eph]: https://search.maven.org/artifact/com.microsoft.azure/azure-eventhubs-eph +[azure-eventhubs]: https://search.maven.org/artifact/com.microsoft.azure/azure-eventhubs +[azure-messaging-eventhubs-checkpointstore-blob]: https://search.maven.org/artifact/com.azure/azure-messaging-eventhubs-checkpointstore-blob +[ConsumeEvents]: src/samples/java/com/azure/messaging/eventhubs/ConsumeEvents.java +[CreateBatchOptions]: src/main/java/com/azure/messaging/eventhubs/models/CreateBatchOptions.java +[EventHubClientBuilder]: src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +[EventHubConsumerAsyncClient]: src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java +[EventHubConsumerClient]: src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java +[EventHubProducerAsyncClient]: src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +[EventHubProducerClient]: src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java +[EventProcessorClient]: src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java +[EventProcessorClientBuilder]: src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java +[EventProcessorClientInstantiation]: ../azure-messaging-eventhubs-checkpointstore-blob/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreSample.java +[PublishEventsToSpecificPartition]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsToSpecificPartition.java +[PublishEventsWithAzureIdentity]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithAzureIdentity.java +[PublishEventsWithCustomMetadata]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java +[README]: README.md From 4d8d199076b55f17d48cd27a1f1d37edd1052451 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 13 Dec 2019 04:14:31 -0800 Subject: [PATCH 2/9] Adding .vscode and .factorypath to gitignore. --- .gitignore | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 0b117a131da2..946f4e315f83 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,8 @@ Thumbs.db # reduced pom files should not be included dependency-reduced-pom.xml -package-lock.json \ No newline at end of file +package-lock.json + +# VS Code +.vscode/ +.factorypath From 84e45895329f12002b88a6651d94bb193f307c9d Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 13 Dec 2019 04:14:46 -0800 Subject: [PATCH 3/9] Fixing typo in Azure storage migration guide. --- sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md b/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md index c1f4b3cbd570..3a6d2bf8427e 100644 --- a/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md +++ b/sdk/storage/azure-storage-blob/migrationGuides/V8_V12.md @@ -8,7 +8,7 @@ For more info of the motivation behind this major change, please refer to [this Java Development Kit (JDK) with version 8 or above ## Converting Core Classes -Our core synchronous classes have been replaced, as well as new a~~~~synchronous counterparts added. +Our core synchronous classes have been replaced, as well as new asynchronous counterparts added. | Core V8 classes | Equivalent V12 Class | NEW Asynchronous clients | |---:|---:|---:| From 4187ad680f93376a4684a61dbc978282719842f2 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 13 Dec 2019 06:44:54 -0800 Subject: [PATCH 4/9] Update sending migration sample code. --- .../migration-guide.md | 77 +++++++------------ 1 file changed, 28 insertions(+), 49 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md index 93e466a9a867..c189702c910d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -16,10 +16,9 @@ For users new to the Java SDK for Event Hubs, please see the [README for azure-m - [Receiving events](#receiving-events) - [Minor renames](#minor-renames) - [Migration samples](#migration-samples) - - [Migrating code from `EventHubClient` to `EventHubProducerClient` for sending events](#migrating-code-from--eventhubclient--to--eventhubproducerclient--for-sending-events) - - [Migrating code from `EventHubClient` to `EventHubConsumerClient` for receiving events](#migrating-code-from--eventhubclient--to--eventhubconsumerclient--for-receiving-events) - - [Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events](#migrating-code-from--eventprocessorhost--to--eventhubconsumerclient--for-receiving-events) - + - [Migrating from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events](#migrating-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) + - [Migrating code from `PartitionReceiver` to `EventHubConsumerClient` for receiving events](#migrating-code-from-partitionreceiver-to-eventhubconsumerclient-for-receiving-events) + - [Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) ## Prerequisites Java Development Kit (JDK) with version 8 or above @@ -100,20 +99,23 @@ to load balance events between all the partitions. The behaviour is determined w ## Migration samples -* [Sending events](#migrating-code-from-eventhubclient-to-eventhubproducerclient-for-sending-events) -* [Receiving events](#migrating-code-from-eventhubclient-to-eventhubconsumerclient-for-receiving-events) +* [Sending events](#migrating-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) +* [Receiving events](#migrating-code-from-partitionreceiver-to-eventhubconsumerclient-for-receiving-events) * [Receiving events with checkpointing](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) -### Migrating from `PartitionSender` or `EventHubClient` to `EventHubProducerClient` for sending events +### Migrating from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events In v3, there were multiple options on how to publish events to an Event Hub. In v5, this has been consolidated into a more efficient `send(EventDataBatch)` method. Batching merges information from multiple events into a single sent message, reducing the amount of network communication needed vs sending events one at a time. +The code below assumes all events fit into a single batch. For a more complete example, see sample: [Publishing events +to specific partition][PublishEventsToSpecificPartition]. + So in v3: ```java -EventHubClient client = EventHubClient.createFromConnectionString("connection-string", +EventHubClient client = EventHubClient.createFromConnectionString("connection-string-for-an-event-hub", Executors.newScheduledThreadPool(4)).get(); List events = Arrays.asList(EventData.create("foo".getBytes()), EventData.create("bar".getBytes())); @@ -123,6 +125,8 @@ CompletableFuture sendFuture = client.createPartitionSender("my-partition- EventDataBatch batch = sender.createBatch(); for (EventData event : events) { try { + // Assuming all events fit into a single batch. This returns false if it does not. + // If that is the case, we'll send the full batch then create another one to continue adding events to. batch.tryAdd(event); } catch (PayloadSizeExceededException e) { System.err.println("Event is too large for batch. Exception: " + e); @@ -136,53 +140,28 @@ sendFuture.get(); In v5: ```java -const producer = new EventHubProducerClient(connectionString); - -const eventsToSend = [ - // events go here -]; - -let batch = await producer.createBatch(); -let i = 0; - -while (i < eventsToSend.length) { - // messages can fail to be added to the batch if they exceed the maximum size configured for - // the EventHub. - const isAdded = batch.tryAdd(eventsToSend[i]); - - if (isAdded) { - console.log(`Added event number ${i} to the batch`); - ++i; - continue; - } +List events = Arrays.asList(EventData.create("foo".getBytes()), EventData.create("bar".getBytes())); - if (batch.count === 0) { - // If we can't add it and the batch is empty that means the message we're trying to send - // is too large, even when it would be the _only_ message in the batch. - // - // At this point you'll need to decide if you're okay with skipping this message entirely - // or find some way to shrink it. - console.log(`Message was too large and can't be sent until it's made smaller. Skipping...`); - ++i; - continue; - } +EventHubProducerAsyncClient producer = new EventHubClientBuilder() + .connectionString("connection-string-for-an-event-hub") + .buildAsyncProducerClient(); - // otherwise this just signals a good spot to send our batch - console.log(`Batch is full - sending ${batch.count} messages as a single batch.`); - await producer.sendBatch(batch); +CreateBatchOptions options = new CreateBatchOptions() + .setPartitionId("my-partition-id"); - // and create a new one to house the next set of messages - batch = await producer.createBatch(); -} +Mono sendOperation = producer.createBatch(options).flatMap(batch -> { + for (EventData event : data) { + // Assuming all events fit into a single batch. This returns false if it does not. + // If that is the case, we'll send the full batch then create another one to continue adding events to. + batch.tryAdd(event); + } + return producer.send(batch); +}); -// send any remaining messages, if any. -if (batch.count > 0) { - console.log(`Sending remaining ${batch.count} messages as a single batch.`) - await producer.sendBatch(batch); -} +sendOperation.block(); ``` -### Migrating code from `EventHubClient` to `EventHubConsumerClient` for receiving events +### Migrating code from `PartitionReceiver` to `EventHubConsumerClient` for receiving events In V2, event handlers were passed as positional arguments to `receive`. From 00deca82452b74fc0321f8a95b145e619c6d2cf1 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 13 Dec 2019 15:33:02 -0800 Subject: [PATCH 5/9] Add other content for EPH. --- .../migration-guide.md | 261 +++++++----------- 1 file changed, 101 insertions(+), 160 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md index c189702c910d..da0eee21bb5f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -16,9 +16,10 @@ For users new to the Java SDK for Event Hubs, please see the [README for azure-m - [Receiving events](#receiving-events) - [Minor renames](#minor-renames) - [Migration samples](#migration-samples) - - [Migrating from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events](#migrating-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) - - [Migrating code from `PartitionReceiver` to `EventHubConsumerClient` for receiving events](#migrating-code-from-partitionreceiver-to-eventhubconsumerclient-for-receiving-events) - - [Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) + - [Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events](#migrating-code-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) + - [Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches](#migrating-code-from-partitionreceiver-to-eventhubconsumerasyncclient-for-receiving-events-in-batches) + - [Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventprocessorclient-for-receiving-events) + ## Prerequisites Java Development Kit (JDK) with version 8 or above @@ -103,7 +104,7 @@ to load balance events between all the partitions. The behaviour is determined w * [Receiving events](#migrating-code-from-partitionreceiver-to-eventhubconsumerclient-for-receiving-events) * [Receiving events with checkpointing](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) -### Migrating from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events +### Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events In v3, there were multiple options on how to publish events to an Event Hub. In v5, this has been consolidated into a more efficient `send(EventDataBatch)` method. Batching merges information from @@ -115,14 +116,14 @@ to specific partition][PublishEventsToSpecificPartition]. So in v3: ```java -EventHubClient client = EventHubClient.createFromConnectionString("connection-string-for-an-event-hub", - Executors.newScheduledThreadPool(4)).get(); +EventHubClient client = EventHubClient.createFromConnectionStringSync("connection-string-for-an-event-hub", + Executors.newScheduledThreadPool(4)); List events = Arrays.asList(EventData.create("foo".getBytes()), EventData.create("bar".getBytes())); CompletableFuture sendFuture = client.createPartitionSender("my-partition-id") - .thenCompose(sender -> { - EventDataBatch batch = sender.createBatch(); + .thenCompose(producer -> { + EventDataBatch batch = producer.createBatch(); for (EventData event : events) { try { // Assuming all events fit into a single batch. This returns false if it does not. @@ -133,8 +134,9 @@ CompletableFuture sendFuture = client.createPartitionSender("my-partition- } } - return sender.send(batch); + return producer.send(batch); }); + sendFuture.get(); ``` @@ -161,176 +163,114 @@ Mono sendOperation = producer.createBatch(options).flatMap(batch -> { sendOperation.block(); ``` -### Migrating code from `PartitionReceiver` to `EventHubConsumerClient` for receiving events +### Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches -In V2, event handlers were passed as positional arguments to `receive`. +In v3, events were received by creating a `PartitionReceiver` and invoking `receive(int)` multiple times to receive +events up to a certain number. -In V5, event handlers are passed as part of a `SubscriptionEventHandlers` shaped object. +In v5, [project Reactor][project-reactor] is used, so events can be streamed as they come in without having to use a +batched receive approach. -For example, this code which receives from a partition in V2: +This code which receives from a partition in v3: -```typescript -const client = EventHubClient.createFromConnectionString(connectionString); -const rcvHandler = client.receive(partitionId, onMessageHandler, onErrorHandler, { - eventPosition: EventPosition.fromStart(), - consumerGroup: consumerGroupName -}); -await rcvHandler.stop(); -``` +```java +EventHubClient client = EventHubClient.createFromConnectionStringSync("connection-string-for-an-event-hub", + Executors.newScheduledThreadPool(5)); +PartitionReceiver consumer = client.createReceiverSync("my-consumer-group", "my-partition-id", + EventPosition.fromStartOfStream()); + +// Gets 100 events or until the receive timeout elapses. +consumer.receive(100).thenAccept(events -> { + for (EventData event : events) { + System.out.println("Sequence number: " + event.getSystemProperties().getSequenceNumber()); + System.out.println("Contents: " + new String(event.getBytes(), StandardCharsets.UTF_8)); + } +}).get(); -Becomes this in V5: +// Gets the next 50 events or until the receive timeout elapses. +consumer.receive(50).thenAccept(events -> { + for (EventData event : events) { + System.out.println("Sequence number: " + event.getSystemProperties().getSequenceNumber()); + System.out.println("Contents: " + new String(event.getBytes(), StandardCharsets.UTF_8)); + } +}).get(); +``` -```typescript -const eventHubConsumerClient = new EventHubConsumerClient(consumerGroupName, connectionString); +Becomes this in v5: +```java +EventHubConsumerAsyncClient consumer = new EventHubClientBuilder() + .connectionString("connection-string-for-an-event-hub") + .consumerGroup("my-consumer-group") + .buildAsyncConsumerClient(); + +// This is a non-blocking call. It'll subscribe and return a Disposable. This will stream events as they come +// in, starting from the beginning of the partition. +Disposable subscription = consumer.receiveFromPartition("my-partition-id", EventPosition.earliest()) + .subscribe(partitionEvent -> { + EventData event = partitionEvent.getData(); + System.out.println("Sequence number: " + event.getSequenceNumber()); + System.out.println("Contents: " + new String(event.getBody(), StandardCharsets.UTF_8)); + }); + +// Keep fetching events +// When you are finished, dispose of the subscription. +subscription.dispose(); +``` -const subscription = eventHubConsumerClient.subscribe( - partitionId, { - processInitialize: (initContext) => { - initContext.setStartingPosition(EventPosition.fromStart()); - }, - processEvents: onMessageHandler, - processError: onErrorHandler -}); +See [`ConsumeEvents.java`][ConsumeEvents] for a sample program demonstrating this. -await subscription.close(); -``` +### Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events -See [`receiveEvents.ts`](https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/eventhub/event-hubs/samples/receiveEvents.ts) -for a sample program demonstrating this. +In v3, `EventProcessorHost` allowed you to balance the load between multiple instances of your program and checkpoint +events when receiving. -In V5, this has been consolidated into a more efficient `sendBatch` method. -Batching merges information from multiple messages into a single send, reducing -the amount of network communication needed vs sending messages one at a time. +In v5, `EventProcessorClient` allows you to do the same and includes a plugin model, so other durable stores can be used +if desired. -So in V2: +The following code in v3: ```java -const eventsToSend = [ - // events go here -]; - -const client = EventHubClient.createFromConnectionString(connectionString); +private static void main(String[] args) throws Exception { + EventProcessorHost processor = EventProcessorHost.EventProcessorHostBuilder.newBuilder("a-processor-name", "my-consumer-group") + .useAzureStorageCheckpointLeaseManager("storage-connection-string", "storage-container-name", "prefix") + .useEventHubConnectionString("connection-string-for-an-event-hub") + .build(); -// Would fail if the total size of events exceed the max size supported by the library. -await client.sendBatch(eventsToSend, partitionId); -``` + processor.registerEventProcessor(MyEventProcessor.class).get(); -In V5: -```typescript -const producer = new EventHubProducerClient(connectionString); - -const eventsToSend = [ - // events go here -]; - -let batch = await producer.createBatch(); -let i = 0; - -while (i < eventsToSend.length) { - // messages can fail to be added to the batch if they exceed the maximum size configured for - // the EventHub. - const isAdded = batch.tryAdd(eventsToSend[i]); - - if (isAdded) { - console.log(`Added event number ${i} to the batch`); - ++i; - continue; - } - - if (batch.count === 0) { - // If we can't add it and the batch is empty that means the message we're trying to send - // is too large, even when it would be the _only_ message in the batch. - // - // At this point you'll need to decide if you're okay with skipping this message entirely - // or find some way to shrink it. - console.log(`Message was too large and can't be sent until it's made smaller. Skipping...`); - ++i; - continue; - } - - // otherwise this just signals a good spot to send our batch - console.log(`Batch is full - sending ${batch.count} messages as a single batch.`); - await producer.sendBatch(batch); - - // and create a new one to house the next set of messages - batch = await producer.createBatch(); + // When you are finished processing events. + processor.unregisterEventProcessor(); } -// send any remaining messages, if any. -if (batch.count > 0) { - console.log(`Sending remaining ${batch.count} messages as a single batch.`) - await producer.sendBatch(batch); -} -``` +class MyEventProcessor implements IEventProcessor { + @Override + public void onOpen(PartitionContext context) { + System.out.println("Started receiving on partition: " + context.getPartitionId()); + } -### Migrating code from `EventProcessorHost` to `EventHubConsumerClient` for receiving events - -In V2, `EventProcessorHost` allowed you to balance the load between multiple instances of -your program when receiving events. - -In V5, `EventHubConsumerClient` allows you to do the same with the `subscribe()` method if you -pass a `CheckpointStore` to the constructor. - -So in V2: -```typescript -const eph = EventProcessorHost.createFromConnectionString( - EventProcessorHost.createHostName(ephName), - storageConnectionString, - storageContainerName, - ehConnectionString, - { - eventHubPath: eventHubName, - onEphError: (error) => { - // This is your error handler for errors occuring during load balancing. - console.log("Error when running EPH: %O", error); - } - } -); - -// In V2, you get a single event passed to your callback. If you had asynchronous code running in your callback, -// it is not awaited before the callback is called for the next event. -const onMessage = (context, event) => { /** your code here **/ } - -// This is your error handler for errors occurring when receiving events. -const onError = (error) => { - console.log("Received Error: %O", error); -}; - -await eph.start(onMessage, onError); -``` + @Override + public void onClose(PartitionContext context, CloseReason reason) { + System.out.printf("Stopped receiving on partition: %s. Reason: %s%n", context.getPartitionId(), reason); + } -And in V5: -```typescript -import { EventHubConsumerClient, CheckpointStore } from "@azure/event-hubs"; -import { ContainerClient } from "@azure/storage-blob"; -import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"; - -const containerClient = new ContainerClient(storageConnectionString, storageContainerName); -const checkpointStore : CheckpointStore = new BlobCheckpointStore(containerClient); -const eventHubConsumerClient = new EventHubConsumerClient(consumerGroupName, ehConnectionString, eventHubName); - -const subscription = eventHubConsumerClient.subscribe( - partitionId, { - // In V5 we deliver events in batches, rather than a single message at a time. - // You can control the batch size via the options passed to the client. - // - // If your callback is an async function or returns a promise, it will be awaited before the - // callback is called for the next batch of events. - processEvents: (events, context) => { /** your code here **/ }, - - // Prior to V5 errors were handled by separate callbacks depending - // on where they were thrown i.e when managing different partitions vs receiving from each partition. - // - // In V5 you only need a single error handler for all of those cases. - processError: (error, context) => { - if (context.partitionId) { - console.log("Error when receiving events from partition %s: %O", context.partitionId, error) - } else { - console.log("Error from the consumer client: %O", error); - } - } -}); + @Override + public void onEvents(PartitionContext context, Iterable events) { + System.out.println("Received events from partition: %s." + context.getPartitionId()); + for (EventData event : events) { + System.out.println("Sequence number: " + event.getSystemProperties().getSequenceNumber()); + System.out.println("Contents: " + new String(event.getBytes(), StandardCharsets.UTF_8)); + } + } + + @Override + public void onError(PartitionContext context, Throwable error) { + System.err.printf("Error occurred on partition: %s. Error: %s%n", context.getPartitionId(), error); + } + } +``` -await subscription.close(); +And in v5: +```java +// TODO ``` @@ -351,3 +291,4 @@ await subscription.close(); [PublishEventsWithAzureIdentity]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithAzureIdentity.java [PublishEventsWithCustomMetadata]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java [README]: README.md +[project-reactor]: https://projectreactor.io/ From 11969003bd4b4d49240b4598aca98003ff777ef1 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 13 Dec 2019 16:00:49 -0800 Subject: [PATCH 6/9] Adding exception handling and event processor client sample. --- .../migration-guide.md | 123 +++++++++++++----- 1 file changed, 87 insertions(+), 36 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md index da0eee21bb5f..bc9d5b0609b9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -100,9 +100,9 @@ to load balance events between all the partitions. The behaviour is determined w ## Migration samples -* [Sending events](#migrating-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) -* [Receiving events](#migrating-code-from-partitionreceiver-to-eventhubconsumerclient-for-receiving-events) -* [Receiving events with checkpointing](#migrating-code-from-eventprocessorhost-to-eventhubconsumerclient-for-receiving-events) +- [Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events](#migrating-code-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) +- [Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches](#migrating-code-from-partitionreceiver-to-eventhubconsumerasyncclient-for-receiving-events-in-batches) +- [Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventprocessorclient-for-receiving-events) ### Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events In v3, there were multiple options on how to publish events to an Event Hub. @@ -124,13 +124,14 @@ List events = Arrays.asList(EventData.create("foo".getBytes()), Event CompletableFuture sendFuture = client.createPartitionSender("my-partition-id") .thenCompose(producer -> { EventDataBatch batch = producer.createBatch(); + // Assuming all events fit into a single batch. This returns false if it does not. + // If it returns false, we send the batch, create another one, and continue to + // add events to it. for (EventData event : events) { try { - // Assuming all events fit into a single batch. This returns false if it does not. - // If that is the case, we'll send the full batch then create another one to continue adding events to. batch.tryAdd(event); } catch (PayloadSizeExceededException e) { - System.err.println("Event is too large for batch. Exception: " + e); + System.err.println("Event is larger than maximum allowed size. Exception: " + e); } } @@ -154,8 +155,13 @@ CreateBatchOptions options = new CreateBatchOptions() Mono sendOperation = producer.createBatch(options).flatMap(batch -> { for (EventData event : data) { // Assuming all events fit into a single batch. This returns false if it does not. - // If that is the case, we'll send the full batch then create another one to continue adding events to. - batch.tryAdd(event); + // If it returns false, we send the batch, create another one, and continue to + // add events to it. + try { + batch.tryAdd(event); + } catch (AmqpException e) { + System.err.println("Event is larger than maximum allowed size. Exception: " + e); + } } return producer.send(batch); }); @@ -168,8 +174,8 @@ sendOperation.block(); In v3, events were received by creating a `PartitionReceiver` and invoking `receive(int)` multiple times to receive events up to a certain number. -In v5, [project Reactor][project-reactor] is used, so events can be streamed as they come in without having to use a -batched receive approach. +In v5, [project Reactor][project-reactor] is used, so events can be streamed as they come in without having to use a +batched receive approach. This code which receives from a partition in v3: @@ -222,15 +228,19 @@ See [`ConsumeEvents.java`][ConsumeEvents] for a sample program demonstrating thi ### Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events In v3, `EventProcessorHost` allowed you to balance the load between multiple instances of your program and checkpoint -events when receiving. +events when receiving. Developers would have to create and register a concrete implementation of `IEventProcessor` to +begin consuming events. In v5, `EventProcessorClient` allows you to do the same and includes a plugin model, so other durable stores can be used -if desired. +if desired. The development model is made simpler by registering functions that would be invoked for each event. To use +Azure Blob storage for checkpointing, include +[azure-messaging-eventhubs-checkpointstore-blob][azure-messaging-eventhubs-checkpointstore-blob] as a dependency. The following code in v3: ```java private static void main(String[] args) throws Exception { - EventProcessorHost processor = EventProcessorHost.EventProcessorHostBuilder.newBuilder("a-processor-name", "my-consumer-group") + EventProcessorHost processor = EventProcessorHost.EventProcessorHostBuilder + .newBuilder("a-processor-name", "my-consumer-group") .useAzureStorageCheckpointLeaseManager("storage-connection-string", "storage-container-name", "prefix") .useEventHubConnectionString("connection-string-for-an-event-hub") .build(); @@ -241,36 +251,77 @@ private static void main(String[] args) throws Exception { processor.unregisterEventProcessor(); } -class MyEventProcessor implements IEventProcessor { - @Override - public void onOpen(PartitionContext context) { - System.out.println("Started receiving on partition: " + context.getPartitionId()); - } +public class MyEventProcessor implements IEventProcessor { + @Override + public void onOpen(PartitionContext context) { + System.out.println("Started receiving on partition: " + context.getPartitionId()); + } - @Override - public void onClose(PartitionContext context, CloseReason reason) { - System.out.printf("Stopped receiving on partition: %s. Reason: %s%n", context.getPartitionId(), reason); - } + @Override + public void onClose(PartitionContext context, CloseReason reason) { + System.out.printf("Stopped receiving on partition: %s. Reason: %s%n", context.getPartitionId(), reason); + } - @Override - public void onEvents(PartitionContext context, Iterable events) { - System.out.println("Received events from partition: %s." + context.getPartitionId()); - for (EventData event : events) { - System.out.println("Sequence number: " + event.getSystemProperties().getSequenceNumber()); - System.out.println("Contents: " + new String(event.getBytes(), StandardCharsets.UTF_8)); - } + @Override + public void onEvents(PartitionContext context, Iterable events) { + System.out.println("Received events from partition: " + context.getPartitionId()); + for (EventData event : events) { + System.out.println("Sequence number: " + event.getSystemProperties().getSequenceNumber()); + System.out.println("Contents: " + new String(event.getBytes(), StandardCharsets.UTF_8)); } + } - @Override - public void onError(PartitionContext context, Throwable error) { - System.err.printf("Error occurred on partition: %s. Error: %s%n", context.getPartitionId(), error); - } + @Override + public void onError(PartitionContext context, Throwable error) { + System.err.printf("Error occurred on partition: %s. Error: %s%n", context.getPartitionId(), error); } +} ``` -And in v5: +And in v5, implementing `MyEventProcessor` is not necessary. The callbacks are invoked for each respective event that +occurs on an owned partition. + ```java -// TODO +private static void main(String[] args) { + BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder() + .connectionString("storage-connection-string") + .containerName("storage-container-name") + .buildAsyncClient(); + + EventProcessorClient processor = new EventProcessorClientBuilder() + .connectionString("connection-string-for-an-event-hub") + .consumerGroup("my-consumer-group") + .checkpointStore(new BlobCheckpointStore(blobClient)) + .processEvent(eventContext -> onEvent(eventContext)) + .processError(context -> { + System.err.printf("Error occurred on partition: %s. Error: %s%n", + context.getPartitionContext().getPartitionId(), context.getThrowable()); + }) + .processPartitionInitialization(initializationContext -> { + System.out.printf("Started receiving on partition: %s%n", + initializationContext.getPartitionContext().getPartitionId()); + }) + .processPartitionClose(closeContext -> { + System.out.printf("Stopped receiving on partition: %s. Reason: %s%n", + closeContext.getPartitionContext().getPartitionId(), + closeContext.getCloseReason()); + }) + .buildEventProcessorClient(); + + processor.start(); + + // When you are finished processing events. + processor.stop(); +} + +private static void onEvent(EventContext eventContext) { + PartitionContext partition = eventContext.getPartitionContext(); + System.out.println("Received events from partition: " + partition.getPartitionId()); + + EventData event = eventContext.getEventData(); + System.out.println("Sequence number: " + event.getSequenceNumber()); + System.out.println("Contents: " + new String(event.getBody(), StandardCharsets.UTF_8)); +} ``` @@ -287,8 +338,8 @@ And in v5: [EventProcessorClient]: src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java [EventProcessorClientBuilder]: src/main/java/com/azure/messaging/eventhubs/EventProcessorClientBuilder.java [EventProcessorClientInstantiation]: ../azure-messaging-eventhubs-checkpointstore-blob/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStoreSample.java +[project-reactor]: https://projectreactor.io/ [PublishEventsToSpecificPartition]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsToSpecificPartition.java [PublishEventsWithAzureIdentity]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithAzureIdentity.java [PublishEventsWithCustomMetadata]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java [README]: README.md -[project-reactor]: https://projectreactor.io/ From 5ee2da812d6c1476cacc26f56a9e1aedc9879c20 Mon Sep 17 00:00:00 2001 From: Connie Date: Fri, 13 Dec 2019 16:29:31 -0800 Subject: [PATCH 7/9] Add more samples. --- .../migration-guide.md | 124 +++++++++++++++--- 1 file changed, 108 insertions(+), 16 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md index bc9d5b0609b9..16c32d7aa376 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -8,6 +8,7 @@ For users new to the Java SDK for Event Hubs, please see the [README for azure-m ## Table of contents +- [Table of contents](#table-of-contents) - [Prerequisites](#prerequisites) - [Updated Maven dependencies](#updated-maven-dependencies) - [General changes](#general-changes) @@ -16,7 +17,9 @@ For users new to the Java SDK for Event Hubs, please see the [README for azure-m - [Receiving events](#receiving-events) - [Minor renames](#minor-renames) - [Migration samples](#migration-samples) - - [Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events](#migrating-code-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) + - [Migrating code from `PartitionSender` to `EventHubProducerAsyncClient` for sending events to a partition](#migrating-code-from-partitionsender-to-eventhubproducerasyncclient-for-sending-events-to-a-partition) + - [Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events using automatic routing](#migrating-code-from-eventhubclient-to-eventhubproducerasyncclient-for-sending-events-using-automatic-routing) + - [Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events with partition key](#migrating-code-from-eventhubclient-to-eventhubproducerasyncclient-for-sending-events-with-partition-key) - [Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches](#migrating-code-from-partitionreceiver-to-eventhubconsumerasyncclient-for-receiving-events-in-batches) - [Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventprocessorclient-for-receiving-events) @@ -71,7 +74,7 @@ Creation of producers or consumers is done through either [EventHubClientBuilder |---|---|---| | `EventHubClient.createFromConnectionString()` | `var builder = new EventHubClientBuilder().connectionString();`
then either `builder.buildProducerAsyncClient();` or
`builder.consumerGroup().buildConsumerAsyncClient();` | [Publishing events][PublishEventsWithCustomMetadata], [Consuming events][ConsumeEvents] | | `EventHubClient.createWithAzureActiveDirectory()` | `var builder = new EventHubClientBuilder().tokenCredential();`
then either `builder.buildProducerAsyncClient();` or
`builder.consumerGroup().buildConsumerAsyncClient();` | [Publishing events with Azure AD][PublishEventsWithAzureIdentity] | -| `EventProcessorHost.EventProcessorHostBuilder.newBuilder()` | `new EventProcessorClientBuilder().buildEventProcessorClient()` | [EventProcessorClient with Blob storage][EventProcessorClientInstantiation] | +| `EventProcessorHost.EventProcessorHostBuilder`
`.newBuilder()` | `new EventProcessorClientBuilder().buildEventProcessorClient()` | [EventProcessorClient with Blob storage][EventProcessorClientInstantiation] | ### Sending events @@ -100,33 +103,35 @@ to load balance events between all the partitions. The behaviour is determined w ## Migration samples -- [Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events](#migrating-code-from-partitionsender-or-eventhubclient-to-eventhubproducerasyncclient-for-sending-events) +- [Migrating code from `PartitionSender` to `EventHubProducerAsyncClient` for sending events to a partition](#migrating-code-from-partitionsender-to-eventhubproducerasyncclient-for-sending-events-to-a-partition) +- [Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events using automatic routing](#migrating-code-from-eventhubclient-to-eventhubproducerasyncclient-for-sending-events-using-automatic-routing) +- [Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events with partition key](#migrating-code-from-eventhubclient-to-eventhubproducerasyncclient-for-sending-events-with-partition-key) - [Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches](#migrating-code-from-partitionreceiver-to-eventhubconsumerasyncclient-for-receiving-events-in-batches) - [Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventprocessorclient-for-receiving-events) -### Migrating code from `PartitionSender` or `EventHubClient` to `EventHubProducerAsyncClient` for sending events -In v3, there were multiple options on how to publish events to an Event Hub. +### Migrating code from `PartitionSender` to `EventHubProducerAsyncClient` for sending events to a partition + +In v3, events could be published to a single partition using `PartitionSender`. In v5, this has been consolidated into a more efficient `send(EventDataBatch)` method. Batching merges information from multiple events into a single sent message, reducing the amount of network communication needed vs sending events one at -a time. +a time. Events are published to a specific partition [`CreateBatchOptions.setPartitionId()`][CreateBatchOptions] is set +before calling `createBatch(CreateBatchOptions)`. The code below assumes all events fit into a single batch. For a more complete example, see sample: [Publishing events to specific partition][PublishEventsToSpecificPartition]. So in v3: ```java -EventHubClient client = EventHubClient.createFromConnectionStringSync("connection-string-for-an-event-hub", - Executors.newScheduledThreadPool(4)); - -List events = Arrays.asList(EventData.create("foo".getBytes()), EventData.create("bar".getBytes())); +EventHubClient client = EventHubClient.createFromConnectionStringSync( + "connection-string-for-an-event-hub", Executors.newScheduledThreadPool(4)); +List events = Arrays.asList(EventData.create("foo".getBytes())); CompletableFuture sendFuture = client.createPartitionSender("my-partition-id") .thenCompose(producer -> { EventDataBatch batch = producer.createBatch(); // Assuming all events fit into a single batch. This returns false if it does not. - // If it returns false, we send the batch, create another one, and continue to - // add events to it. + // If it returns false, we send the batch, create another, and continue to add events. for (EventData event : events) { try { batch.tryAdd(event); @@ -143,7 +148,7 @@ sendFuture.get(); In v5: ```java -List events = Arrays.asList(EventData.create("foo".getBytes()), EventData.create("bar".getBytes())); +List events = Arrays.asList(EventData.create("foo".getBytes())); EventHubProducerAsyncClient producer = new EventHubClientBuilder() .connectionString("connection-string-for-an-event-hub") @@ -153,10 +158,9 @@ CreateBatchOptions options = new CreateBatchOptions() .setPartitionId("my-partition-id"); Mono sendOperation = producer.createBatch(options).flatMap(batch -> { - for (EventData event : data) { + for (EventData event : events) { // Assuming all events fit into a single batch. This returns false if it does not. - // If it returns false, we send the batch, create another one, and continue to - // add events to it. + // If it returns false, we send the batch, create another, and continue to add events. try { batch.tryAdd(event); } catch (AmqpException e) { @@ -169,6 +173,94 @@ Mono sendOperation = producer.createBatch(options).flatMap(batch -> { sendOperation.block(); ``` +### Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events using automatic routing + +In v3, events could be published to an Event Hub that allowed the service to automatically route events to an available partition. + +In v5, this has been consolidated into a more efficient `send(EventDataBatch)` method. Batching merges information from +multiple events into a single sent message, reducing the amount of network communication needed vs sending events one at +a time. Automatic routing occurs when an `EventDataBatch` is created using `createBatch()`. + +So in v3: +```java +EventHubClient client = EventHubClient.createFromConnectionStringSync( + "connection-string-for-an-event-hub", Executors.newScheduledThreadPool(4)); +List events = Arrays.asList(EventData.create("foo".getBytes())); + +EventDataBatch batch = client.createBatch(); +for (EventData event : events) { + // Assuming all events fit into a single batch. This returns false if it does not. + // If it returns false, we send the batch, create another, and continue to add events. + try { + batch.tryAdd(event); + } catch (PayloadSizeExceededException e) { + System.err.println("Event is larger than maximum allowed size. Exception: " + e); + } +} + +client.send(batch).get(); +``` + +In v5: +```java +List events = Arrays.asList(EventData.create("foo".getBytes())); + +EventHubProducerAsyncClient producer = new EventHubClientBuilder() + .connectionString("connection-string-for-an-event-hub") + .buildAsyncProducerClient(); + +Mono sendOperation = producer.createBatch().flatMap(batch -> { + for (EventData event : events) { + // Assuming all events fit into a single batch. This returns false if it does not. + // If it returns false, we send the batch, create another, and continue to add events. + try { + batch.tryAdd(event); + } catch (AmqpException e) { + System.err.println("Event is larger than maximum allowed size. Exception: " + e); + } + } + return producer.send(batch); +}); + +sendOperation.block(); +``` + +### Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events with partition key + +In v3, events could be published with a partition key. + +In v5, this has been consolidated into a more efficient `send(EventDataBatch)` method. Batching merges information from +multiple events into a single sent message, reducing the amount of network communication needed vs sending events one at +a time. Events are published with a partition key when [`CreateBatchOptions.setPartitionKey()`][CreateBatchOptions] is +set before calling `createBatch(CreateBatchOptions)`. + +So in v3: +```java +EventHubClient client = EventHubClient.createFromConnectionStringSync( + "connection-string-for-an-event-hub", + Executors.newScheduledThreadPool(5)); + +BatchOptions batchOptions = new BatchOptions().with(options -> options.partitionKey = "a-key"); +EventDataBatch batch = client.createBatch(batchOptions); + +// Fill batch with events then send it. +client.send(batch).get(); +``` + +In v5: +```java +EventHubProducerAsyncClient producer = new EventHubClientBuilder() + .connectionString("connection-string-for-an-event-hub") + .buildAsyncProducerClient(); + +CreateBatchOptions options = new CreateBatchOptions() + .setPartitionKey("a-key"); +EventDataBatch batch = producer.createBatch(options).block(); + +// Fill batch with events then send it. +producer.send(batch).block(); +``` + ### Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches In v3, events were received by creating a `PartitionReceiver` and invoking `receive(int)` multiple times to receive From cf0cf97cf891db4ab45287ecd994f3d740350207 Mon Sep 17 00:00:00 2001 From: Connie Date: Sun, 15 Dec 2019 16:56:15 -0800 Subject: [PATCH 8/9] Adding link to additional samples. --- .../azure-messaging-eventhubs/migration-guide.md | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md index 16c32d7aa376..5a8c7470531b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -22,6 +22,7 @@ For users new to the Java SDK for Event Hubs, please see the [README for azure-m - [Migrating code from `EventHubClient` to `EventHubProducerAsyncClient` for sending events with partition key](#migrating-code-from-eventhubclient-to-eventhubproducerasyncclient-for-sending-events-with-partition-key) - [Migrating code from `PartitionReceiver` to `EventHubConsumerAsyncClient` for receiving events in batches](#migrating-code-from-partitionreceiver-to-eventhubconsumerasyncclient-for-receiving-events-in-batches) - [Migrating code from `EventProcessorHost` to `EventProcessorClient` for receiving events](#migrating-code-from-eventprocessorhost-to-eventprocessorclient-for-receiving-events) +- [Additional samples](#additional-samples) ## Prerequisites Java Development Kit (JDK) with version 8 or above @@ -237,7 +238,7 @@ set before calling `createBatch(CreateBatchOptions)`. So in v3: ```java EventHubClient client = EventHubClient.createFromConnectionStringSync( - "connection-string-for-an-event-hub", + "connection-string-for-an-event-hub", Executors.newScheduledThreadPool(5)); BatchOptions batchOptions = new BatchOptions().with(options -> options.partitionKey = "a-key"); @@ -416,6 +417,12 @@ private static void onEvent(EventContext eventContext) { } ``` +## Additional samples + +More examples can be found at: +- [Event Hubs samples](../azure-messaging-eventhubs/src/samples/README.md) +- [Event Hubs Azure Storage checkpoint store samples](../azure-messaging-eventhubs-checkpointstore-blob/src/samples/README.md) + [azure-eventhubs-eph]: https://search.maven.org/artifact/com.microsoft.azure/azure-eventhubs-eph [azure-eventhubs]: https://search.maven.org/artifact/com.microsoft.azure/azure-eventhubs @@ -434,4 +441,4 @@ private static void onEvent(EventContext eventContext) { [PublishEventsToSpecificPartition]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsToSpecificPartition.java [PublishEventsWithAzureIdentity]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithAzureIdentity.java [PublishEventsWithCustomMetadata]: src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java -[README]: README.md +[README]: README.md \ No newline at end of file From 4a5760635c1ea41a8360bc25f4d45b2fffe90f40 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 16 Dec 2019 09:48:17 -0800 Subject: [PATCH 9/9] Update issues in migration-guide. --- .../azure-messaging-eventhubs/migration-guide.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md index 5a8c7470531b..be050b1876ff 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md @@ -38,7 +38,7 @@ Dependencies for Event Hubs has been updated to: 5.0.0-beta.6 - + com.azure azure-messaging-eventhubs-checkpointstore-blob @@ -50,14 +50,14 @@ Dependencies for Event Hubs has been updated to: ## General changes In the interest of simplifying the API surface, we've made three clients, each with an asynchronous and synchronous -variant. One client for sending events, `EventHubProducerAsyncClient`, and two for receiving events. -`EventProcessorClient` is the production level consumer and `EventHubConsumerAsyncClient` for exploration and +variant. One client is for producing events, `EventHubProducerAsyncClient`, while two are intended for reading events. +`EventProcessorClient` is the production-level consumer and `EventHubConsumerAsyncClient` for exploration and lower-level control of `EventData` consumption. -[EventProcessorClient][EventProcessorClient] supports checkpointing and load balancing through a plugin model. -Currently, only Azure Blob storage is supported through +[EventProcessorClient][EventProcessorClient] supports checkpointing and load balancing using a plugin model. +Currently, only Azure Storage Blobs is supported through [azure-messaging-eventhubs-checkpointstore-blob][azure-messaging-eventhubs-checkpointstore-blob], but support for other -durable storage (i.e. Cosmos DB, Redis) can be added in the future. +durable storage (i.e. Cosmos DB, Redis) may be added in the future. | Operation | Asynchronous client | Synchronous client | |---|---|---| @@ -75,7 +75,7 @@ Creation of producers or consumers is done through either [EventHubClientBuilder |---|---|---| | `EventHubClient.createFromConnectionString()` | `var builder = new EventHubClientBuilder().connectionString();`
then either `builder.buildProducerAsyncClient();` or
`builder.consumerGroup().buildConsumerAsyncClient();` | [Publishing events][PublishEventsWithCustomMetadata], [Consuming events][ConsumeEvents] | | `EventHubClient.createWithAzureActiveDirectory()` | `var builder = new EventHubClientBuilder().tokenCredential();`
then either `builder.buildProducerAsyncClient();` or
`builder.consumerGroup().buildConsumerAsyncClient();` | [Publishing events with Azure AD][PublishEventsWithAzureIdentity] | -| `EventProcessorHost.EventProcessorHostBuilder`
`.newBuilder()` | `new EventProcessorClientBuilder().buildEventProcessorClient()` | [EventProcessorClient with Blob storage][EventProcessorClientInstantiation] | +| `EventProcessorHost.EventProcessorHostBuilder`
`.newBuilder()` | `new EventProcessorClientBuilder().buildEventProcessorClient()` | [EventProcessorClient with Azure Storage Blobs][EventProcessorClientInstantiation] | ### Sending events @@ -326,7 +326,7 @@ begin consuming events. In v5, `EventProcessorClient` allows you to do the same and includes a plugin model, so other durable stores can be used if desired. The development model is made simpler by registering functions that would be invoked for each event. To use -Azure Blob storage for checkpointing, include +Azure Storage Blobs for checkpointing, include [azure-messaging-eventhubs-checkpointstore-blob][azure-messaging-eventhubs-checkpointstore-blob] as a dependency. The following code in v3: