From a9da621a863039b9a79eb77b366a7de2081f5048 Mon Sep 17 00:00:00 2001 From: chradek Date: Tue, 30 Jul 2019 12:20:11 -0700 Subject: [PATCH 1/4] [EPH] adds support for processing multiple partitions --- sdk/eventhub/event-hubs/src/eventProcessor.ts | 171 +++++++-- sdk/eventhub/event-hubs/src/index.ts | 11 +- sdk/eventhub/event-hubs/src/log.ts | 10 + sdk/eventhub/event-hubs/src/partitionPump.ts | 23 +- sdk/eventhub/event-hubs/src/pumpManager.ts | 142 ++++++++ .../event-hubs/src/util/cancellableDelay.ts | 17 + .../event-hubs/test/eventProcessor.spec.ts | 337 +++++++++++++++++- 7 files changed, 668 insertions(+), 43 deletions(-) create mode 100644 sdk/eventhub/event-hubs/src/pumpManager.ts create mode 100644 sdk/eventhub/event-hubs/src/util/cancellableDelay.ts diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 285fb43b1043..e618f5983e37 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -1,12 +1,30 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +import uuid from "uuid/v4"; import { EventHubClient } from "./eventHubClient"; import { EventPosition } from "./eventPosition"; import { PartitionContext } from "./partitionContext"; import { CheckpointManager, Checkpoint } from "./checkpointManager"; import { EventData } from "./eventData"; -import { PartitionPump } from "./partitionPump"; +import { PumpManager } from "./pumpManager"; +import { AbortSignalLike, AbortController } from "@azure/abort-controller"; +import * as log from "./log"; +import { cancellableDelay } from "./util/cancellableDelay"; + +/** + * Reason for closing an EventProcessor. + */ +export enum CloseReason { + /** + * Ownership of the partition was lost or transitioned to a new processor instance. + */ + OwnershipLost = "OwnershipLost", + /** + * The EventProcessor was shutdown. + */ + Shutdown = "Shutdown" +} export interface PartitionProcessor { /** @@ -18,7 +36,7 @@ export interface PartitionProcessor { * This may occur when control of the partition switches to another EPH or when user stops EPH * TODO: update string -> CloseReason */ - close?(reason: string): Promise; + close?(reason: CloseReason): Promise; /** * Called when a batch of events have been received. */ @@ -79,7 +97,11 @@ export class EventProcessor { private _eventHubClient: EventHubClient; private _partitionProcessorFactory: PartitionProcessorFactory; private _processorOptions: EventProcessorOptions; - private _partitionPump?: PartitionPump; + private _pumpManager: PumpManager; + private _id: string = uuid(); + private _isRunning: boolean = false; + private _loopTask?: PromiseLike; + private _abortController?: AbortController; constructor( consumerGroupName: string, @@ -94,6 +116,93 @@ export class EventProcessor { this._eventHubClient = eventHubClient; this._partitionProcessorFactory = partitionProcessorFactory; this._processorOptions = options; + this._pumpManager = new PumpManager(this._id, options); + } + + private async _getInactivePartitions(): Promise { + try { + // get all partition ids on the event hub + const partitionIds = await this._eventHubClient.getPartitionIds(); + // get partitions this EventProcessor is actively processing + const activePartitionIds = this._pumpManager.receivingFromPartitions(); + + // get a list of partition ids that are not being processed by this EventProcessor + const inactivePartitionIds: string[] = partitionIds.filter( + (id) => activePartitionIds.indexOf(id) === -1 + ); + return inactivePartitionIds; + } catch (err) { + log.error(`[${this._id}] An error occured when retrieving partition ids: ${err}`); + throw err; + } + } + + /** + * Starts the EventProcessor loop. + * Load-balancing and partition ownership should be checked inside the loop. + * @ignore + */ + private async _runLoop(abortSignal: AbortSignalLike): Promise { + // periodically check if there is any partition not being processed and process it + const waitIntervalInMs = 30000; + while (!abortSignal.aborted) { + try { + // get a list of partition ids that are not being processed by this EventProcessor + const partitionsToAdd = await this._getInactivePartitions(); + // check if the loop has been cancelled + if (abortSignal.aborted) { + return; + } + + const tasks: PromiseLike[] = []; + // create partition pumps to process any partitions we should be processing + for (const partitionId of partitionsToAdd) { + const partitionContext: PartitionContext = { + consumerGroupName: this._consumerGroupName, + eventHubName: this._eventHubClient.eventHubName, + partitionId: partitionId + }; + + const checkpointManager = new CheckpointManager(); + + log.eventProcessor( + `[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.` + ); + const partitionProcessor = this._partitionProcessorFactory( + partitionContext, + checkpointManager + ); + + // eventually this will 1st check if the existing PartitionOwnership has a position + const eventPosition = + this._processorOptions.initialEventPosition || EventPosition.earliest(); + + tasks.push( + this._pumpManager.createPump( + this._eventHubClient, + partitionContext, + eventPosition, + partitionProcessor + ) + ); + } + + // wait for all the new pumps to be created + await Promise.all(tasks); + log.eventProcessor(`[${this._id}] PartitionPumps created within EventProcessor.`); + + // sleep + log.eventProcessor( + `[${this._id}] Pausing the EventProcessor loop for ${waitIntervalInMs} ms.` + ); + await cancellableDelay(waitIntervalInMs, abortSignal); + } catch (err) { + log.error(`[${this._id}] An error occured within the EventProcessor loop: ${err}`); + } + } + + // loop has completed, remove all existing pumps + return this._pumpManager.removeAllPumps(CloseReason.Shutdown); } /** @@ -101,39 +210,18 @@ export class EventProcessor { * For each successful lease, it will get the details from the blob and start a receiver at the * point where it left off previously. * - * @return {Promise} + * @return {void} */ - async start(): Promise { - const partitionIds = await this._eventHubClient.getPartitionIds(); - const partitionContext: PartitionContext = { - partitionId: partitionIds[0], - consumerGroupName: this._consumerGroupName, - eventHubName: this._eventHubClient.eventHubName - }; - const partitionProcessor = this._partitionProcessorFactory( - partitionContext, - new CheckpointManager() - ); - if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") { - throw new TypeError("'initialize' must be of type 'function'."); - } - if (typeof partitionProcessor.processEvents !== "function") { - throw new TypeError("'processEvents' is required and must be of type 'function'."); - } - if (typeof partitionProcessor.processError !== "function") { - throw new TypeError("'processError' is required and must be of type 'function'."); - } - if (partitionProcessor.close && typeof partitionProcessor.close !== "function") { - throw new TypeError("'close' must be of type 'function'."); + start(): void { + if (this._isRunning) { + log.eventProcessor(`[${this._id}] Attempted to start an already running EventProcessor.`); + return; } - this._partitionPump = new PartitionPump( - this._eventHubClient, - partitionContext, - partitionProcessor, - this._processorOptions - ); - await this._partitionPump.start(partitionIds[0]); + this._isRunning = true; + this._abortController = new AbortController(); + log.eventProcessor(`[${this._id}] Starting an EventProcessor.`); + this._loopTask = this._runLoop(this._abortController.signal); } /** @@ -141,8 +229,21 @@ export class EventProcessor { * @return {Promise} */ async stop(): Promise { - if (this._partitionPump) { - await this._partitionPump.stop("Stopped processing"); + log.eventProcessor(`[${this._id}] Stopping an EventProcessor.`); + if (this._abortController) { + // cancel the event processor loop + this._abortController.abort(); + } + + this._isRunning = false; + try { + // waits for the event processor loop to complete + // will complete immediately if _loopTask is undefined + await this._loopTask; + } catch (err) { + log.error(`[${this._id}] An error occured while stopping the EventProcessor: ${err}`); + } finally { + log.eventProcessor(`[${this._id}] EventProcessor stopped.`); } } } diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index e00a632b0fd2..bda445efac2f 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -21,7 +21,16 @@ export { PartitionProperties, EventHubProperties } from "./managementClient"; export { EventHubProducer } from "./sender"; export { EventHubConsumer, EventIteratorOptions } from "./receiver"; export { EventDataBatch } from "./eventDataBatch"; -export { EventProcessor } from "./eventProcessor"; +export { CheckpointManager } from "./checkpointManager"; +export { + EventProcessor, + CloseReason, + EventProcessorOptions, + PartitionProcessor, + PartitionManager, + PartitionProcessorFactory, + PartitionOwnership +} from "./eventProcessor"; export { PartitionContext } from "./partitionContext"; export { MessagingError, diff --git a/sdk/eventhub/event-hubs/src/log.ts b/sdk/eventhub/event-hubs/src/log.ts index 6f8610aeceb0..b4ae70c53b05 100644 --- a/sdk/eventhub/event-hubs/src/log.ts +++ b/sdk/eventhub/event-hubs/src/log.ts @@ -58,3 +58,13 @@ export const iotClient = debugModule("azure:event-hubs:iothubClient"); * log statements for partitionManager */ export const partitionPump = debugModule("azure:event-hubs:partitionPump"); +/** + * @ignore + * log statements for pumpManager + */ +export const pumpManager = debugModule("azure:event-hubs:pumpManager"); +/** + * @ignore + * log statements for eventProcessor + */ +export const eventProcessor = debugModule("azure:event-hubs:eventProcessor"); diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index cd55e47949e6..c2db5423b0e6 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -2,7 +2,7 @@ // Licensed under the MIT License. import * as log from "./log"; -import { EventProcessorOptions, PartitionProcessor } from "./eventProcessor"; +import { EventProcessorOptions, PartitionProcessor, CloseReason } from "./eventProcessor"; import { PartitionContext } from "./partitionContext"; import { EventHubClient } from "./eventHubClient"; import { EventPosition } from "./eventPosition"; @@ -32,16 +32,24 @@ export class PartitionPump { this._abortController = new AbortController(); } - async start(partitionId: string): Promise { + public get isReceiving(): boolean { + return this._isReceiving; + } + + async start(): Promise { + this._isReceiving = true; if (this._partitionProcessor.initialize) { - await this._partitionProcessor.initialize(); + try { + await this._partitionProcessor.initialize(); + } catch { + this._isReceiving = false; + } } - this._receiveEvents(partitionId); + this._receiveEvents(this._partitionContext.partitionId); log.partitionPump("Successfully started the receiver."); } private async _receiveEvents(partitionId: string): Promise { - this._isReceiving = true; try { this._receiver = await this._eventHubClient.createConsumer( this._partitionContext.consumerGroupName, @@ -55,6 +63,9 @@ export class PartitionPump { this._processorOptions.maxWaitTimeInSeconds, this._abortController.signal ); + if (!this._isReceiving) { + return; + } await this._partitionProcessor.processEvents(receivedEvents); } } catch (err) { @@ -71,7 +82,7 @@ export class PartitionPump { } } - async stop(reason: string): Promise { + async stop(reason: CloseReason): Promise { this._isReceiving = false; try { if (this._receiver) { diff --git a/sdk/eventhub/event-hubs/src/pumpManager.ts b/sdk/eventhub/event-hubs/src/pumpManager.ts new file mode 100644 index 000000000000..f8e38654dc70 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/pumpManager.ts @@ -0,0 +1,142 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { EventHubClient } from "./eventHubClient"; +import { PartitionContext } from "./partitionContext"; +import { EventPosition } from "./eventPosition"; +import { PartitionProcessor, EventProcessorOptions, CloseReason } from "./eventProcessor"; +import { PartitionPump } from "./partitionPump"; +import * as log from "./log"; + +/** + * The PumpManager handles the creation and removal of PartitionPumps. + * It also starts a PartitionPump when it is created, and stops a + * PartitionPump when it is removed. + * @ignore + */ +export class PumpManager { + private readonly _eventProcessorName: string; + private readonly _options: EventProcessorOptions; + private _partitionIdToPumps: { + [partitionId: string]: PartitionPump | undefined; + } = {}; + + /** + * @ignore + */ + constructor(eventProcessorName: string, eventProcessorOptions: EventProcessorOptions = {}) { + this._eventProcessorName = eventProcessorName; + this._options = eventProcessorOptions; + } + + /** + * Returns a list of partitionIds that are actively receiving messages. + * @ignore + */ + public receivingFromPartitions(): string[] { + return Object.keys(this._partitionIdToPumps).filter((id) => { + const pump = this._partitionIdToPumps[id]; + return Boolean(pump && pump.isReceiving); + }); + } + + /** + * Creates and starts a PartitionPump. + * @param eventHubClient The EventHubClient to forward to the PartitionPump. + * @param partitionContext The PartitionContext to forward to the PartitionPump. + * @param initialEventPosition The EventPosition to forward to the PartitionPump. + * @param partitionProcessor The PartitionProcessor to forward to the PartitionPump. + * @param abortSignal Used to cancel pump creation. + * @ignore + */ + public async createPump( + eventHubClient: EventHubClient, + partitionContext: PartitionContext, + initialEventPosition: EventPosition, + partitionProcessor: PartitionProcessor + ): Promise { + const partitionId = partitionContext.partitionId; + // attempt to get an existing pump + const existingPump = this._partitionIdToPumps[partitionId]; + if (existingPump) { + if (existingPump.isReceiving) { + log.pumpManager( + `[${this._eventProcessorName}] [${partitionId}] The existing pump is running.` + ); + return; + } + log.pumpManager( + `[${this._eventProcessorName}] [${partitionId}] The existing pump is not running.` + ); + await this.removePump(partitionId, CloseReason.OwnershipLost); + } + + log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`); + + const pump = new PartitionPump(eventHubClient, partitionContext, partitionProcessor, { + ...this._options, + initialEventPosition + }); + + try { + await pump.start(); + this._partitionIdToPumps[partitionId] = pump; + } catch (err) { + log.error( + `[${this._eventProcessorName}] [${partitionId}] An error occured while adding/updating a pump: ${err}` + ); + } + } + + /** + * Stop a PartitionPump and removes it from the internal map. + * @param partitionId The partitionId to remove the associated PartitionPump from. + * @param reason The reason for removing the pump. + * @ignore + */ + public async removePump(partitionId: string, reason: CloseReason): Promise { + try { + const pump = this._partitionIdToPumps[partitionId]; + if (pump) { + delete this._partitionIdToPumps[partitionId]; + log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Stopping the pump.`); + await pump.stop(reason); + } else { + log.pumpManager( + `[${this._eventProcessorName}] [${partitionId}] No pump was found to remove.` + ); + } + } catch (err) { + log.error( + `[${this._eventProcessorName}] [${partitionId}] An error occured while removing a pump: ${err}` + ); + } + } + + /** + * Stops all PartitionPumps and removes them from the internal map. + * @param reason The reason for removing the pump. + * @ignore + */ + public async removeAllPumps(reason: CloseReason): Promise { + const partitionIds = Object.keys(this._partitionIdToPumps); + + log.pumpManager(`[${this._eventProcessorName}] Removing all pumps due to reason ${reason}.`); + + const tasks: PromiseLike[] = []; + for (const partitionId of partitionIds) { + const pump = this._partitionIdToPumps[partitionId]; + if (pump) { + tasks.push(pump.stop(reason)); + } + } + + try { + await Promise.all(tasks); + } catch (err) { + log.error(`[${this._eventProcessorName}] An error occured while removing all pumps: ${err}`); + } finally { + this._partitionIdToPumps = {}; + } + } +} diff --git a/sdk/eventhub/event-hubs/src/util/cancellableDelay.ts b/sdk/eventhub/event-hubs/src/util/cancellableDelay.ts new file mode 100644 index 000000000000..45a037e9f36a --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/cancellableDelay.ts @@ -0,0 +1,17 @@ +import { AbortSignalLike, AbortError } from "@azure/abort-controller"; + +export function cancellableDelay(delayInMs: number, abortSignal?: AbortSignalLike): Promise { + return new Promise((resolve, reject) => { + if (abortSignal && abortSignal.aborted) { + return reject(new AbortError(`The delay was cancelled by the user.`)); + } + + const timer = setTimeout(resolve, delayInMs); + if (abortSignal) { + abortSignal.addEventListener("abort", () => { + clearTimeout(timer); + reject(new AbortError(`The delay was cancelled by the user.`)); + }); + } + }); +} diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 2fdb84605c45..abb870103b43 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -13,7 +13,9 @@ import { EventData, EventProcessor, PartitionContext, - delay + delay, + PartitionProcessorFactory, + CloseReason } from "../src"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; const env = getEnvVars(); @@ -39,7 +41,340 @@ describe("Event Processor", function(): void { await client.close(); }); + it("should treat consecutive start invocations as idempotent", async function(): Promise { + const partitionIds = await client.getPartitionIds(); + + // ensure we have at least 2 partitions + partitionIds.length.should.gte(2); + + const partitionResultsMap = new Map< + string, + { events: string[]; initialized: boolean; closeReason?: CloseReason } + >(); + partitionIds.forEach((id) => partitionResultsMap.set(id, { events: [], initialized: false })); + let didError = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + const factory: PartitionProcessorFactory = (context) => { + return { + async initialize() { + partitionResultsMap.get(context.partitionId)!.initialized = true; + }, + async close(reason) { + partitionResultsMap.get(context.partitionId)!.closeReason = reason; + }, + async processEvents(events) { + const existingEvents = partitionResultsMap.get(context.partitionId)!.events; + events.forEach((event) => existingEvents.push(event.body)); + }, + async processError() { + didError = true; + } + }; + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + undefined as any, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + processor.start(); + processor.start(); + processor.start(); + + // create messages + const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; + for (const partitionId of partitionIds) { + const producer = client.createProducer({ partitionId }); + await producer.send({ body: expectedMessagePrefix + partitionId }); + await producer.close(); + } + + // shutdown the processor + await processor.stop(); + + didError.should.be.false; + // validate correct events captured for each partition + for (const partitionId of partitionIds) { + const results = partitionResultsMap.get(partitionId)!; + const events = results.events; + events.length.should.equal(1); + events[0].should.equal(expectedMessagePrefix + partitionId); + results.initialized.should.be.true; + (results.closeReason === CloseReason.Shutdown).should.be.true; + } + }); + + it("should not throw if stop is called without start", async function(): Promise { + let didPartitionProcessorStart = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + const factory: PartitionProcessorFactory = (context) => { + return { + async initialize() { + didPartitionProcessorStart = true; + }, + async close() { + didPartitionProcessorStart = true; + }, + async processEvents(events) { + didPartitionProcessorStart = true; + }, + async processError() { + didPartitionProcessorStart = true; + } + }; + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + undefined as any, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + // shutdown the processor + await processor.stop(); + + didPartitionProcessorStart.should.be.false; + }); + + it("should support start after stopping", async function(): Promise { + const partitionIds = await client.getPartitionIds(); + + // ensure we have at least 2 partitions + partitionIds.length.should.gte(2); + + const partitionResultsMap = new Map< + string, + { events: string[]; initialized: boolean; closeReason?: CloseReason } + >(); + partitionIds.forEach((id) => partitionResultsMap.set(id, { events: [], initialized: false })); + let didError = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + const factory: PartitionProcessorFactory = (context) => { + return { + async initialize() { + partitionResultsMap.get(context.partitionId)!.initialized = true; + }, + async close(reason) { + partitionResultsMap.get(context.partitionId)!.closeReason = reason; + }, + async processEvents(events) { + const existingEvents = partitionResultsMap.get(context.partitionId)!.events; + events.forEach((event) => existingEvents.push(event.body)); + }, + async processError() { + didError = true; + } + }; + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + undefined as any, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + processor.start(); + + // create messages + const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; + for (const partitionId of partitionIds) { + const producer = client.createProducer({ partitionId }); + await producer.send({ body: expectedMessagePrefix + partitionId }); + await producer.close(); + } + + // set a delay to give a consumers a chance to receive a message + await delay(1000); + + // shutdown the processor + await processor.stop(); + + didError.should.be.false; + // validate correct events captured for each partition + for (const partitionId of partitionIds) { + const results = partitionResultsMap.get(partitionId)!; + const events = results.events; + events.length.should.equal(1); + events[0].should.equal(expectedMessagePrefix + partitionId); + results.initialized.should.be.true; + (results.closeReason === CloseReason.Shutdown).should.be.true; + // reset fields + results.initialized = false; + results.closeReason = undefined; + results.events = []; + } + + // start it again + // note: since checkpointing isn't implemented yet, + // EventProcessor will retrieve events from the initialEventPosition. + processor.start(); + + // set a delay to give a consumers a chance to receive a message + await delay(1000); + + await processor.stop(); + + didError.should.be.false; + // validate correct events captured for each partition + for (const partitionId of partitionIds) { + const results = partitionResultsMap.get(partitionId)!; + const events = results.events; + events.length.should.equal(1); + events[0].should.equal(expectedMessagePrefix + partitionId); + results.initialized.should.be.true; + (results.closeReason === CloseReason.Shutdown).should.be.true; + } + }); + describe("Partition processor", function(): void { + it("should support processing events across multiple partitions", async function(): Promise< + void + > { + const partitionIds = await client.getPartitionIds(); + + // ensure we have at least 2 partitions + partitionIds.length.should.gte(2); + + const partitionResultsMap = new Map< + string, + { events: string[]; initialized: boolean; closeReason?: CloseReason } + >(); + partitionIds.forEach((id) => partitionResultsMap.set(id, { events: [], initialized: false })); + let didError = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + const factory: PartitionProcessorFactory = (context) => { + return { + async initialize() { + partitionResultsMap.get(context.partitionId)!.initialized = true; + }, + async close(reason) { + partitionResultsMap.get(context.partitionId)!.closeReason = reason; + }, + async processEvents(events) { + const existingEvents = partitionResultsMap.get(context.partitionId)!.events; + events.forEach((event) => existingEvents.push(event.body)); + }, + async processError() { + didError = true; + } + }; + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + undefined as any, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + processor.start(); + + // create messages + const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; + for (const partitionId of partitionIds) { + const producer = client.createProducer({ partitionId }); + await producer.send({ body: expectedMessagePrefix + partitionId }); + await producer.close(); + } + + // set a delay to give a consumers a chance to receive a message + await delay(1000); + + // shutdown the processor + await processor.stop(); + + didError.should.be.false; + // validate correct events captured for each partition + for (const partitionId of partitionIds) { + const results = partitionResultsMap.get(partitionId)!; + const events = results.events; + events.length.should.equal(1); + events[0].should.equal(expectedMessagePrefix + partitionId); + results.initialized.should.be.true; + (results.closeReason === CloseReason.Shutdown).should.be.true; + } + }); + + it("should support processing events across multiple partitions without initialize or close", async function(): Promise< + void + > { + const partitionIds = await client.getPartitionIds(); + + // ensure we have at least 2 partitions + partitionIds.length.should.gte(2); + + const partitionResultsMap = new Map(); + partitionIds.forEach((id) => partitionResultsMap.set(id, [])); + let didError = false; + + // The partitionProcess will need to add events to the partitionResultsMap as they are received + const factory: PartitionProcessorFactory = (context) => { + return { + async processEvents(events) { + const existingEvents = partitionResultsMap.get(context.partitionId)!; + events.forEach((event) => existingEvents.push(event.body)); + }, + async processError() { + didError = true; + } + }; + }; + + const processor = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + undefined as any, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + processor.start(); + + // create messages + const expectedMessagePrefix = "EventProcessor test - multiple partitions - "; + for (const partitionId of partitionIds) { + const producer = client.createProducer({ partitionId }); + await producer.send({ body: expectedMessagePrefix + partitionId }); + await producer.close(); + } + + // set a delay to give a consumers a chance to receive a message + await delay(1000); + + // shutdown the processor + await processor.stop(); + + didError.should.be.false; + // validate correct events captured for each partition + for (const partitionId of partitionIds) { + const events = partitionResultsMap.get(partitionId)!; + events.length.should.equal(1); + events[0].should.equal(expectedMessagePrefix + partitionId); + } + }); + it("should call methods on a PartitionProcessor ", async function(): Promise { const receivedEvents: EventData[] = []; let isinitializeCalled = false; From bcd0246cadc62ace2bbbf9f1eb350ba28d07ddcb Mon Sep 17 00:00:00 2001 From: chradek Date: Tue, 30 Jul 2019 12:27:15 -0700 Subject: [PATCH 2/4] adds type check for initialize/close in partitionPump --- sdk/eventhub/event-hubs/src/partitionPump.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index c2db5423b0e6..e87bc3423907 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -38,7 +38,7 @@ export class PartitionPump { async start(): Promise { this._isReceiving = true; - if (this._partitionProcessor.initialize) { + if (typeof this._partitionProcessor.initialize === "function") { try { await this._partitionProcessor.initialize(); } catch { @@ -89,7 +89,7 @@ export class PartitionPump { await this._receiver.close(); } this._abortController.abort(); - if (this._partitionProcessor.close) { + if (typeof this._partitionProcessor.close === "function") { await this._partitionProcessor.close(reason); } } catch (err) { From c5b9aa00761dca31b3c55b67c542080b0f00e41d Mon Sep 17 00:00:00 2001 From: chradek Date: Tue, 30 Jul 2019 12:31:18 -0700 Subject: [PATCH 3/4] updates API review --- .../event-hubs/review/event-hubs.api.md | 77 ++++++++++++++++++- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 0423821806b9..5b2f3306d2c4 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -28,6 +28,20 @@ export interface BatchOptions { partitionKey?: string; } +// @public +export class CheckpointManager { + // (undocumented) + updateCheckpoint(eventData: EventData): Promise; + // (undocumented) + updateCheckpoint(offset: string, sequenceNumber: number): Promise; +} + +// @public +export enum CloseReason { + OwnershipLost = "OwnershipLost", + Shutdown = "Shutdown" +} + export { DataTransformer } export { DefaultDataTransformer } @@ -148,14 +162,21 @@ export class EventPosition { // @public export class EventProcessor { - // Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts - // Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts - // Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions); - start(): Promise; + start(): void; stop(): Promise; } +// @public (undocumented) +export interface EventProcessorOptions { + // (undocumented) + initialEventPosition?: EventPosition; + // (undocumented) + maxBatchSize?: number; + // (undocumented) + maxWaitTimeInSeconds?: number; +} + export { MessagingError } // @public @@ -174,6 +195,54 @@ export interface PartitionContext { readonly partitionId: string; } +// @public +export interface PartitionManager { + // (undocumented) + claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; + // Warning: (ae-forgotten-export) The symbol "Checkpoint" needs to be exported by the entry point index.d.ts + // + // (undocumented) + createCheckpoint(checkpoint: Checkpoint): Promise; + // (undocumented) + listOwnerships(eventHubName: string, consumerGroupName: string): Promise; +} + +// @public +export interface PartitionOwnership { + // (undocumented) + consumerGroupName: string; + // (undocumented) + ETag?: string; + // (undocumented) + eventHubName: string; + // (undocumented) + instanceId: string; + // (undocumented) + lastModifiedTime?: number; + // (undocumented) + offset?: number; + // (undocumented) + ownerLevel: number; + // (undocumented) + partitionId: string; + // (undocumented) + sequenceNumber?: number; +} + +// @public (undocumented) +export interface PartitionProcessor { + close?(reason: CloseReason): Promise; + initialize?(): Promise; + processError(error: Error): Promise; + processEvents(events: EventData[]): Promise; +} + +// @public +export interface PartitionProcessorFactory { + // (undocumented) + (context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor; +} + // @public export interface PartitionProperties { beginningSequenceNumber: number; From 956a7f52a369b5834428394b1e7242e0f8c6d01d Mon Sep 17 00:00:00 2001 From: chradek Date: Tue, 30 Jul 2019 15:28:50 -0700 Subject: [PATCH 4/4] addresses comments --- sdk/eventhub/event-hubs/src/eventProcessor.ts | 8 ++++++-- sdk/eventhub/event-hubs/src/partitionPump.ts | 2 +- sdk/eventhub/event-hubs/src/pumpManager.ts | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index e618f5983e37..bba3db587fd2 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -13,7 +13,7 @@ import * as log from "./log"; import { cancellableDelay } from "./util/cancellableDelay"; /** - * Reason for closing an EventProcessor. + * Reason for closing a PartitionProcessor. */ export enum CloseReason { /** @@ -23,7 +23,11 @@ export enum CloseReason { /** * The EventProcessor was shutdown. */ - Shutdown = "Shutdown" + Shutdown = "Shutdown", + /** + * The PartitionProcessor was shutdown for an unknown reason. + */ + Unknown = "Unknown" } export interface PartitionProcessor { diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index e87bc3423907..759875a957a4 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -42,7 +42,7 @@ export class PartitionPump { try { await this._partitionProcessor.initialize(); } catch { - this._isReceiving = false; + // swallow the error from the user-defined code } } this._receiveEvents(this._partitionContext.partitionId); diff --git a/sdk/eventhub/event-hubs/src/pumpManager.ts b/sdk/eventhub/event-hubs/src/pumpManager.ts index f8e38654dc70..8b430fa45acc 100644 --- a/sdk/eventhub/event-hubs/src/pumpManager.ts +++ b/sdk/eventhub/event-hubs/src/pumpManager.ts @@ -68,7 +68,7 @@ export class PumpManager { log.pumpManager( `[${this._eventProcessorName}] [${partitionId}] The existing pump is not running.` ); - await this.removePump(partitionId, CloseReason.OwnershipLost); + await this.removePump(partitionId, CloseReason.Unknown); } log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`);