Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EPH] adds support for processing multiple partitions #4535

Merged
merged 4 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 73 additions & 4 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ export interface BatchOptions {
partitionKey?: string;
}

// @public
export class CheckpointManager {
// (undocumented)
updateCheckpoint(eventData: EventData): Promise<void>;
// (undocumented)
updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
}

// @public
export enum CloseReason {
OwnershipLost = "OwnershipLost",
Shutdown = "Shutdown"
}

export { DataTransformer }

export { DefaultDataTransformer }
Expand Down Expand Up @@ -148,14 +162,21 @@ export class EventPosition {

// @public
export class EventProcessor {
// Warning: (ae-forgotten-export) The symbol "PartitionProcessorFactory" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "PartitionManager" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "EventProcessorOptions" needs to be exported by the entry point index.d.ts
constructor(consumerGroupName: string, eventHubClient: EventHubClient, partitionProcessorFactory: PartitionProcessorFactory, partitionManager: PartitionManager, options?: EventProcessorOptions);
start(): Promise<void>;
start(): void;
stop(): Promise<void>;
}

// @public (undocumented)
export interface EventProcessorOptions {
// (undocumented)
initialEventPosition?: EventPosition;
// (undocumented)
maxBatchSize?: number;
// (undocumented)
maxWaitTimeInSeconds?: number;
}

export { MessagingError }

// @public
Expand All @@ -174,6 +195,54 @@ export interface PartitionContext {
readonly partitionId: string;
}

// @public
export interface PartitionManager {
// (undocumented)
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
// Warning: (ae-forgotten-export) The symbol "Checkpoint" needs to be exported by the entry point index.d.ts
//
// (undocumented)
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
// (undocumented)
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
}

// @public
export interface PartitionOwnership {
// (undocumented)
consumerGroupName: string;
// (undocumented)
ETag?: string;
// (undocumented)
eventHubName: string;
// (undocumented)
instanceId: string;
// (undocumented)
lastModifiedTime?: number;
// (undocumented)
offset?: number;
// (undocumented)
ownerLevel: number;
// (undocumented)
partitionId: string;
// (undocumented)
sequenceNumber?: number;
}

// @public (undocumented)
export interface PartitionProcessor {
close?(reason: CloseReason): Promise<void>;
initialize?(): Promise<void>;
processError(error: Error): Promise<void>;
processEvents(events: EventData[]): Promise<void>;
}

// @public
export interface PartitionProcessorFactory {
// (undocumented)
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
}

// @public
export interface PartitionProperties {
beginningSequenceNumber: number;
Expand Down
175 changes: 140 additions & 35 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import uuid from "uuid/v4";
import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { PartitionContext } from "./partitionContext";
import { CheckpointManager, Checkpoint } from "./checkpointManager";
import { EventData } from "./eventData";
import { PartitionPump } from "./partitionPump";
import { PumpManager } from "./pumpManager";
import { AbortSignalLike, AbortController } from "@azure/abort-controller";
import * as log from "./log";
import { cancellableDelay } from "./util/cancellableDelay";

/**
* Reason for closing a PartitionProcessor.
*/
export enum CloseReason {
/**
* Ownership of the partition was lost or transitioned to a new processor instance.
*/
OwnershipLost = "OwnershipLost",
/**
* The EventProcessor was shutdown.
*/
Shutdown = "Shutdown",
/**
* The PartitionProcessor was shutdown for an unknown reason.
*/
Unknown = "Unknown"
}

export interface PartitionProcessor {
/**
Expand All @@ -18,7 +40,7 @@ export interface PartitionProcessor {
* This may occur when control of the partition switches to another EPH or when user stops EPH
* TODO: update string -> CloseReason
*/
close?(reason: string): Promise<void>;
close?(reason: CloseReason): Promise<void>;
/**
* Called when a batch of events have been received.
*/
Expand Down Expand Up @@ -79,7 +101,11 @@ export class EventProcessor {
private _eventHubClient: EventHubClient;
private _partitionProcessorFactory: PartitionProcessorFactory;
private _processorOptions: EventProcessorOptions;
private _partitionPump?: PartitionPump;
private _pumpManager: PumpManager;
private _id: string = uuid();
private _isRunning: boolean = false;
private _loopTask?: PromiseLike<void>;
private _abortController?: AbortController;

constructor(
consumerGroupName: string,
Expand All @@ -94,55 +120,134 @@ export class EventProcessor {
this._eventHubClient = eventHubClient;
this._partitionProcessorFactory = partitionProcessorFactory;
this._processorOptions = options;
this._pumpManager = new PumpManager(this._id, options);
}

private async _getInactivePartitions(): Promise<string[]> {
try {
// get all partition ids on the event hub
const partitionIds = await this._eventHubClient.getPartitionIds();
// get partitions this EventProcessor is actively processing
const activePartitionIds = this._pumpManager.receivingFromPartitions();

// get a list of partition ids that are not being processed by this EventProcessor
const inactivePartitionIds: string[] = partitionIds.filter(
(id) => activePartitionIds.indexOf(id) === -1
);
return inactivePartitionIds;
} catch (err) {
log.error(`[${this._id}] An error occured when retrieving partition ids: ${err}`);
throw err;
}
}

/**
* Starts the EventProcessor loop.
* Load-balancing and partition ownership should be checked inside the loop.
* @ignore
*/
private async _runLoop(abortSignal: AbortSignalLike): Promise<void> {
// periodically check if there is any partition not being processed and process it
const waitIntervalInMs = 30000;
while (!abortSignal.aborted) {
try {
// get a list of partition ids that are not being processed by this EventProcessor
const partitionsToAdd = await this._getInactivePartitions();
// check if the loop has been cancelled
if (abortSignal.aborted) {
return;
}

const tasks: PromiseLike<void>[] = [];
// create partition pumps to process any partitions we should be processing
for (const partitionId of partitionsToAdd) {
const partitionContext: PartitionContext = {
consumerGroupName: this._consumerGroupName,
eventHubName: this._eventHubClient.eventHubName,
partitionId: partitionId
};

const checkpointManager = new CheckpointManager();

log.eventProcessor(
`[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.`
);
const partitionProcessor = this._partitionProcessorFactory(
partitionContext,
checkpointManager
);

// eventually this will 1st check if the existing PartitionOwnership has a position
const eventPosition =
this._processorOptions.initialEventPosition || EventPosition.earliest();

tasks.push(
this._pumpManager.createPump(
this._eventHubClient,
partitionContext,
eventPosition,
partitionProcessor
)
);
}

// wait for all the new pumps to be created
await Promise.all(tasks);
log.eventProcessor(`[${this._id}] PartitionPumps created within EventProcessor.`);

// sleep
log.eventProcessor(
`[${this._id}] Pausing the EventProcessor loop for ${waitIntervalInMs} ms.`
);
await cancellableDelay(waitIntervalInMs, abortSignal);
} catch (err) {
log.error(`[${this._id}] An error occured within the EventProcessor loop: ${err}`);
}
}

// loop has completed, remove all existing pumps
return this._pumpManager.removeAllPumps(CloseReason.Shutdown);
}

/**
* Starts the event processor, fetching the list of partitions, and attempting to grab leases
* For each successful lease, it will get the details from the blob and start a receiver at the
* point where it left off previously.
*
* @return {Promise<void>}
* @return {void}
*/
async start(): Promise<void> {
const partitionIds = await this._eventHubClient.getPartitionIds();
const partitionContext: PartitionContext = {
partitionId: partitionIds[0],
consumerGroupName: this._consumerGroupName,
eventHubName: this._eventHubClient.eventHubName
};
const partitionProcessor = this._partitionProcessorFactory(
partitionContext,
new CheckpointManager()
);
if (partitionProcessor.initialize && typeof partitionProcessor.initialize !== "function") {
throw new TypeError("'initialize' must be of type 'function'.");
}
if (typeof partitionProcessor.processEvents !== "function") {
throw new TypeError("'processEvents' is required and must be of type 'function'.");
}
if (typeof partitionProcessor.processError !== "function") {
throw new TypeError("'processError' is required and must be of type 'function'.");
}
if (partitionProcessor.close && typeof partitionProcessor.close !== "function") {
throw new TypeError("'close' must be of type 'function'.");
start(): void {
if (this._isRunning) {
log.eventProcessor(`[${this._id}] Attempted to start an already running EventProcessor.`);
return;
}

this._partitionPump = new PartitionPump(
this._eventHubClient,
partitionContext,
partitionProcessor,
this._processorOptions
);
await this._partitionPump.start(partitionIds[0]);
this._isRunning = true;
this._abortController = new AbortController();
log.eventProcessor(`[${this._id}] Starting an EventProcessor.`);
this._loopTask = this._runLoop(this._abortController.signal);
}

/**
* Stops the EventProcessor from processing messages.
* @return {Promise<void>}
*/
async stop(): Promise<void> {
if (this._partitionPump) {
await this._partitionPump.stop("Stopped processing");
log.eventProcessor(`[${this._id}] Stopping an EventProcessor.`);
if (this._abortController) {
// cancel the event processor loop
this._abortController.abort();
}

this._isRunning = false;
try {
// waits for the event processor loop to complete
// will complete immediately if _loopTask is undefined
await this._loopTask;
} catch (err) {
log.error(`[${this._id}] An error occured while stopping the EventProcessor: ${err}`);
} finally {
log.eventProcessor(`[${this._id}] EventProcessor stopped.`);
}
}
}
11 changes: 10 additions & 1 deletion sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ export { PartitionProperties, EventHubProperties } from "./managementClient";
export { EventHubProducer } from "./sender";
export { EventHubConsumer, EventIteratorOptions } from "./receiver";
export { EventDataBatch } from "./eventDataBatch";
export { EventProcessor } from "./eventProcessor";
export { CheckpointManager } from "./checkpointManager";
export {
EventProcessor,
CloseReason,
EventProcessorOptions,
PartitionProcessor,
PartitionManager,
PartitionProcessorFactory,
PartitionOwnership
} from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export {
MessagingError,
Expand Down
10 changes: 10 additions & 0 deletions sdk/eventhub/event-hubs/src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,13 @@ export const iotClient = debugModule("azure:event-hubs:iothubClient");
* log statements for partitionManager
*/
export const partitionPump = debugModule("azure:event-hubs:partitionPump");
/**
* @ignore
* log statements for pumpManager
*/
export const pumpManager = debugModule("azure:event-hubs:pumpManager");
/**
* @ignore
* log statements for eventProcessor
*/
export const eventProcessor = debugModule("azure:event-hubs:eventProcessor");
Loading