Skip to content

Commit

Permalink
[EPH] adds support for processing multiple partitions (#4535)
Browse files Browse the repository at this point in the history
  • Loading branch information
chradek authored Jul 30, 2019
1 parent 3447e89 commit 6bc44fa
Show file tree
Hide file tree
Showing 8 changed files with 747 additions and 49 deletions.
77 changes: 73 additions & 4 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ export interface BatchOptions {
partitionKey?: string;
}

// @public
export class CheckpointManager {
// (undocumented)
updateCheckpoint(eventData: EventData): Promise<void>;
// (undocumented)
updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
}

// @public
export enum CloseReason {
OwnershipLost = "OwnershipLost",
Shutdown = "Shutdown"
}

export { DataTransformer }

export { DefaultDataTransformer }
Expand Down Expand Up @@ -149,14 +163,21 @@ export class EventPosition {

// @public
export class EventProcessor {
// Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
start(): Promise<void>;
start(): void;
stop(): Promise<void>;
}

// @public (undocumented)
export interface EventProcessorOptions {
// (undocumented)
initialEventPosition?: EventPosition;
// (undocumented)
maxBatchSize?: number;
// (undocumented)
maxWaitTimeInSeconds?: number;
}

export { MessagingError }

// @public
Expand All @@ -175,6 +196,54 @@ export interface PartitionContext {
readonly partitionId: string;
}

// @public
export interface PartitionManager {
// (undocumented)
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
// Warning: (ae-forgotten-export) The symbol "Checkpoint" needs to be exported by the entry point index.d.ts
//
// (undocumented)
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
// (undocumented)
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
}

// @public
export interface PartitionOwnership {
// (undocumented)
consumerGroupName: string;
// (undocumented)
ETag?: string;
// (undocumented)
eventHubName: string;
// (undocumented)
instanceId: string;
// (undocumented)
lastModifiedTime?: number;
// (undocumented)
offset?: number;
// (undocumented)
ownerLevel: number;
// (undocumented)
partitionId: string;
// (undocumented)
sequenceNumber?: number;
}

// @public (undocumented)
export interface PartitionProcessor {
close?(reason: CloseReason): Promise<void>;
initialize?(): Promise<void>;
processError(error: Error): Promise<void>;
processEvents(events: EventData[]): Promise<void>;
}

// @public
export interface PartitionProcessorFactory {
// (undocumented)
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
}

// @public
export interface PartitionProperties {
beginningSequenceNumber: number;
Expand Down
175 changes: 140 additions & 35 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import uuid from "uuid/v4";
import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { PartitionContext } from "./partitionContext";
import { CheckpointManager, Checkpoint } from "./checkpointManager";
import { EventData } from "./eventData";
import { PartitionPump } from "./partitionPump";
import { PumpManager } from "./pumpManager";
import { AbortSignalLike, AbortController } from "@azure/abort-controller";
import * as log from "./log";
import { cancellableDelay } from "./util/cancellableDelay";

/**
* Reason for closing a PartitionProcessor.
* A value of this enum is passed to the optional `close` method of a PartitionProcessor.
*/
export enum CloseReason {
/**
* Ownership of the partition was lost or transitioned to a new processor instance.
*/
OwnershipLost = "OwnershipLost",
/**
* The EventProcessor was shut down.
*/
Shutdown = "Shutdown",
/**
* The PartitionProcessor was shut down for an unknown reason.
*/
Unknown = "Unknown"
}

export interface PartitionProcessor {
/**
Expand All @@ -18,7 +40,7 @@ export interface PartitionProcessor {
* This may occur when control of the partition switches to another EPH or when user stops EPH
* The reason is reported as a CloseReason value.
*/
close?(reason: string): Promise<void>;
close?(reason: CloseReason): Promise<void>;
/**
* Called when a batch of events have been received.
*/
Expand Down Expand Up @@ -79,7 +101,11 @@ export class EventProcessor {
private _eventHubClient: EventHubClient;
private _partitionProcessorFactory: PartitionProcessorFactory;
private _processorOptions: EventProcessorOptions;
private _partitionPump?: PartitionPump;
private _pumpManager: PumpManager;
private _id: string = uuid();
private _isRunning: boolean = false;
private _loopTask?: PromiseLike<void>;
private _abortController?: AbortController;

constructor(
consumerGroupName: string,
Expand All @@ -94,55 +120,134 @@ export class EventProcessor {
this._eventHubClient = eventHubClient;
this._partitionProcessorFactory = partitionProcessorFactory;
this._processorOptions = options;
this._pumpManager = new PumpManager(this._id, options);
}

/**
 * Resolves with the ids of the Event Hub partitions that this EventProcessor
 * is not currently receiving from.
 *
 * Any failure is logged and then rethrown to the caller.
 * @ignore
 */
private async _getInactivePartitions(): Promise<string[]> {
  try {
    // every partition id that exists on the event hub
    const allPartitionIds = await this._eventHubClient.getPartitionIds();
    // partition ids the pump manager reports as already being received from
    const pumpedPartitionIds = this._pumpManager.receivingFromPartitions();

    const isNotPumped = (candidateId: string): boolean =>
      pumpedPartitionIds.indexOf(candidateId) < 0;

    return allPartitionIds.filter(isNotPumped);
  } catch (err) {
    log.error(`[${this._id}] An error occured when retrieving partition ids: ${err}`);
    throw err;
  }
}

/**
 * Runs the EventProcessor loop until the given abort signal is aborted.
 *
 * Each iteration looks up the partitions this EventProcessor is not yet
 * receiving from, asks the user-provided PartitionProcessorFactory for a
 * PartitionProcessor per partition, and has the PumpManager create a pump for it.
 * The loop then pauses for a fixed interval before checking again.
 * Load-balancing and partition ownership should be checked inside the loop.
 *
 * When the loop ends, all existing pumps are removed with `CloseReason.Shutdown`.
 *
 * @param abortSignal Signal used to cancel the loop and the pause between iterations.
 * @returns The result of removing all pumps once the loop has completed.
 * @ignore
 */
private async _runLoop(abortSignal: AbortSignalLike): Promise<void> {
  // periodically check if there is any partition not being processed and process it
  const waitIntervalInMs = 30000;
  while (!abortSignal.aborted) {
    try {
      // get a list of partition ids that are not being processed by this EventProcessor
      const partitionsToAdd = await this._getInactivePartitions();
      // The loop may have been cancelled while the partition ids were being fetched.
      // Break instead of returning so the pumps that already exist are still removed below.
      if (abortSignal.aborted) {
        break;
      }

      const tasks: PromiseLike<void>[] = [];
      // create partition pumps to process any partitions we should be processing
      for (const partitionId of partitionsToAdd) {
        const partitionContext: PartitionContext = {
          consumerGroupName: this._consumerGroupName,
          eventHubName: this._eventHubClient.eventHubName,
          partitionId: partitionId
        };

        const checkpointManager = new CheckpointManager();

        log.eventProcessor(
          `[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.`
        );
        const partitionProcessor = this._partitionProcessorFactory(
          partitionContext,
          checkpointManager
        );

        // eventually this will 1st check if the existing PartitionOwnership has a position
        const eventPosition =
          this._processorOptions.initialEventPosition || EventPosition.earliest();

        tasks.push(
          this._pumpManager.createPump(
            this._eventHubClient,
            partitionContext,
            eventPosition,
            partitionProcessor
          )
        );
      }

      // wait for all the new pumps to be created
      await Promise.all(tasks);
      log.eventProcessor(`[${this._id}] PartitionPumps created within EventProcessor.`);
    } catch (err) {
      log.error(`[${this._id}] An error occured within the EventProcessor loop: ${err}`);
    }

    // no point in sleeping if the loop was cancelled during this iteration
    if (abortSignal.aborted) {
      break;
    }

    // Sleep after every iteration, including failed ones, so that a persistent
    // failure does not turn the loop into a tight retry loop.
    try {
      log.eventProcessor(
        `[${this._id}] Pausing the EventProcessor loop for ${waitIntervalInMs} ms.`
      );
      await cancellableDelay(waitIntervalInMs, abortSignal);
    } catch (err) {
      // NOTE(review): cancellableDelay presumably rejects when the signal is aborted - confirm.
      // A cancelled delay is part of a normal shutdown, so only unexpected failures are logged.
      if (!abortSignal.aborted) {
        log.error(`[${this._id}] An error occured within the EventProcessor loop: ${err}`);
      }
    }
  }

  // loop has completed, remove all existing pumps
  return this._pumpManager.removeAllPumps(CloseReason.Shutdown);
}

/**
* Starts the event processor, fetching the list of partitions, and attempting to grab leases
* For each successful lease, it will get the details from the blob and start a receiver at the
* point where it left off previously.
*
* @return {Promise<void>}
* @return {void}
*/
async start(): Promise<void> {
const partitionIds = await this._eventHubClient.getPartitionIds();
const partitionContext: PartitionContext = {
partitionId: partitionIds[0],
consumerGroupName: this._consumerGroupName,
eventHubName: this._eventHubClient.eventHubName
};
const partitionProcessor = this._partitionProcessorFactory(
partitionContext,
new CheckpointManager()
);
if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") {
throw new TypeError("'initialize' must be of type 'function'.");
}
if (typeof partitionProcessor.processEvents !== "function") {
throw new TypeError("'processEvents' is required and must be of type 'function'.");
}
if (typeof partitionProcessor.processError !== "function") {
throw new TypeError("'processError' is required and must be of type 'function'.");
}
if (partitionProcessor.close && typeof partitionProcessor.close !== "function") {
throw new TypeError("'close' must be of type 'function'.");
start(): void {
if (this._isRunning) {
log.eventProcessor(`[${this._id}] Attempted to start an already running EventProcessor.`);
return;
}

this._partitionPump = new PartitionPump(
this._eventHubClient,
partitionContext,
partitionProcessor,
this._processorOptions
);
await this._partitionPump.start(partitionIds[0]);
this._isRunning = true;
this._abortController = new AbortController();
log.eventProcessor(`[${this._id}] Starting an EventProcessor.`);
this._loopTask = this._runLoop(this._abortController.signal);
}

/**
* Stops the EventProcessor from processing messages.
* @return {Promise<void>}
*/
async stop(): Promise<void> {
if (this._partitionPump) {
await this._partitionPump.stop("Stopped processing");
log.eventProcessor(`[${this._id}] Stopping an EventProcessor.`);
if (this._abortController) {
// cancel the event processor loop
this._abortController.abort();
}

this._isRunning = false;
try {
// waits for the event processor loop to complete
// will complete immediately if _loopTask is undefined
await this._loopTask;
} catch (err) {
log.error(`[${this._id}] An error occured while stopping the EventProcessor: ${err}`);
} finally {
log.eventProcessor(`[${this._id}] EventProcessor stopped.`);
}
}
}
11 changes: 10 additions & 1 deletion sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ export { PartitionProperties, EventHubProperties } from "./managementClient";
export { EventHubProducer } from "./sender";
export { EventHubConsumer, EventIteratorOptions } from "./receiver";
export { EventDataBatch } from "./eventDataBatch";
export { EventProcessor } from "./eventProcessor";
export { CheckpointManager } from "./checkpointManager";
export {
EventProcessor,
CloseReason,
EventProcessorOptions,
PartitionProcessor,
PartitionManager,
PartitionProcessorFactory,
PartitionOwnership
} from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export {
MessagingError,
Expand Down
10 changes: 10 additions & 0 deletions sdk/eventhub/event-hubs/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,13 @@ export const iotClient = debugModule("azure:event-hubs:iothubClient");
* log statements for partitionPump
*/
export const partitionPump = debugModule("azure:event-hubs:partitionPump");
/**
* @ignore
* log statements for pumpManager,
* created with the "azure:event-hubs:pumpManager" namespace
*/
export const pumpManager = debugModule("azure:event-hubs:pumpManager");
/**
* @ignore
* log statements for eventProcessor,
* created with the "azure:event-hubs:eventProcessor" namespace
*/
export const eventProcessor = debugModule("azure:event-hubs:eventProcessor");
Loading

0 comments on commit 6bc44fa

Please sign in to comment.