diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 4639ba21abce..d6d1f5f4a2fb 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -29,6 +29,16 @@ export interface BatchOptions { partitionKey?: string; } +// @public +export interface Checkpoint { + consumerGroupName: string; + eventHubName: string; + instanceId: string; + offset: number; + partitionId: string; + sequenceNumber: number; +} + // @public export class CheckpointManager { // (undocumented) @@ -40,7 +50,8 @@ export class CheckpointManager { // @public export enum CloseReason { OwnershipLost = "OwnershipLost", - Shutdown = "Shutdown" + Shutdown = "Shutdown", + Unknown = "Unknown" } export { DataTransformer } @@ -178,6 +189,13 @@ export interface EventProcessorOptions { maxWaitTimeInSeconds?: number; } +// @public +export class InMemoryPartitionManager implements PartitionManager { + claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; + listOwnerships(eventHubName: string, consumerGroupName: string): Promise; + updateCheckpoint(checkpoint: Checkpoint): Promise; +} + export { MessagingError } // @public @@ -188,45 +206,28 @@ export type OnMessage = (eventData: ReceivedEventData) => void; // @public export interface PartitionContext { - // (undocumented) readonly consumerGroupName: string; - // (undocumented) readonly eventHubName: string; - // (undocumented) readonly partitionId: string; } // @public export interface PartitionManager { - // (undocumented) claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; - // Warning: (ae-forgotten-export) The symbol "Checkpoint" needs to be exported by the entry point index.d.ts - // - // (undocumented) - createCheckpoint(checkpoint: Checkpoint): Promise; - // (undocumented) listOwnerships(eventHubName: string, consumerGroupName: string): Promise; + updateCheckpoint(checkpoint: Checkpoint): Promise; } // @public export interface PartitionOwnership { - // (undocumented) consumerGroupName: string; - // (undocumented) - ETag?: string; - // (undocumented) + eTag?: string; eventHubName: string; - // (undocumented) instanceId: string; - // (undocumented) - lastModifiedTime?: number; - // (undocumented) + lastModifiedTimeInMS?: number; offset?: number; - // (undocumented) ownerLevel: number; - // (undocumented) partitionId: string; - // (undocumented) sequenceNumber?: number; } diff --git a/sdk/eventhub/event-hubs/src/checkpointManager.ts b/sdk/eventhub/event-hubs/src/checkpointManager.ts index 5822392396e9..f97c5cac4029 100644 --- a/sdk/eventhub/event-hubs/src/checkpointManager.ts +++ b/sdk/eventhub/event-hubs/src/checkpointManager.ts @@ -9,11 +9,29 @@ import { EventData } from "./eventData"; * Used by createCheckpoint in PartitionManager **/ export interface Checkpoint { + /** + * @property The event hub name + */ eventHubName: string; + /** + * @property The consumer group name + */ consumerGroupName: string; + /** + * @property The unique instance identifier + */ instanceId: string; + /** + * @property The identifier of the Event Hub partition + */ partitionId: string; + /** + * @property The sequence number of the event. + */ sequenceNumber: number; + /** + * @property The offset of the event. + */ offset: number; } diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index bba3db587fd2..4f1f5d227fcb 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -56,15 +56,43 @@ export interface PartitionProcessor { * returned by listOwnerships */ export interface PartitionOwnership { + /** + * @property The event hub name + */ eventHubName: string; + /** + * @property The consumer group name + */ consumerGroupName: string; + /** + * @property The unique instance identifier + */ instanceId: string; + /** + * @property The identifier of the Event Hub partition + */ partitionId: string; + /** + * @property + * The owner level + */ ownerLevel: number; + /** + * @property The offset of the event. + */ offset?: number; + /** + * @property The sequence number of the event. + */ sequenceNumber?: number; - lastModifiedTime?: number; - ETag?: string; + /** + * @property The last modified time. + */ + lastModifiedTimeInMS?: number; + /** + * @property The unique identifier for the operation. + */ + eTag?: string; } /** @@ -80,9 +108,30 @@ export interface PartitionProcessorFactory { * Deals mainly with read/write to the chosen storage service */ export interface PartitionManager { + /** + * Called to get the list of all existing partition ownership from the underlying data store. Could return empty + * results if there are is no existing ownership information. + * + * @param eventHubName The event hub name. + * @param consumerGroupName The consumer group name. + * @return A list of partition ownership details of all the partitions that have/had an owner. + */ listOwnerships(eventHubName: string, consumerGroupName: string): Promise; + /** + * Called to claim ownership of a list of partitions. This will return the list of partitions that were owned + * successfully. + * + * @param partitionOwnerships The list of partition ownerships this instance is claiming to own. + * @return A list of partitions this instance successfully claimed ownership. + */ claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; - createCheckpoint(checkpoint: Checkpoint): Promise; + /** + * Updates the checkpoint in the data store for a partition. + * + * @param checkpoint The checkpoint. + * @return The new eTag on successful update. + */ + updateCheckpoint(checkpoint: Checkpoint): Promise; } // Options passed when creating EventProcessor, everything is optional diff --git a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts new file mode 100644 index 000000000000..07e43beb31f5 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { PartitionManager, PartitionOwnership } from "./eventProcessor"; +import { Checkpoint } from "./checkpointManager"; +import { generate_uuid } from "rhea-promise"; + +/** + * A simple in-memory implementation of a `PartitionManager` + * @class + */ +export class InMemoryPartitionManager implements PartitionManager { + private _partitionOwnershipMap: Map = new Map(); + + /** + * Get the list of all existing partition ownership from the underlying data store. Could return empty + * results if there are is no existing ownership information. + * + * @param eventHubName The event hub name. + * @param consumerGroupName The consumer group name. + * @return Partition ownership details of all the partitions that have/had an owner.. + */ + async listOwnerships( + eventHubName: string, + consumerGroupName: string + ): Promise { + return Array.from(this._partitionOwnershipMap.values()); + } + + /** + * Claim ownership of a list of partitions. This will return the list of partitions that were owned + * successfully. + * + * @param partitionOwnerships The list of partition ownerships this instance is claiming to own. + * @return A list partitions this instance successfully claimed ownership. + */ + async claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise { + for (const partitionOwnership of partitionOwnerships) { + if (!this._partitionOwnershipMap.has(partitionOwnership.partitionId)) { + partitionOwnership.eTag = generate_uuid(); + this._partitionOwnershipMap.set(partitionOwnership.partitionId, partitionOwnership); + } + } + return partitionOwnerships; + } + + /** + * Updates the checkpoint in the data store for a partition. + * + * @param checkpoint The checkpoint. + * @return Promise + */ + async updateCheckpoint(checkpoint: Checkpoint): Promise { + const partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId); + if (partitionOwnership) { + partitionOwnership.sequenceNumber = checkpoint.sequenceNumber; + partitionOwnership.offset = checkpoint.offset; + partitionOwnership.eTag = generate_uuid(); + } + } +} diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index bda445efac2f..78bddbc5e3b3 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -32,6 +32,8 @@ export { PartitionOwnership } from "./eventProcessor"; export { PartitionContext } from "./partitionContext"; +export { InMemoryPartitionManager} from "./inMemoryPartitionManager" +export { Checkpoint } from "./checkpointManager"; export { MessagingError, DataTransformer, diff --git a/sdk/eventhub/event-hubs/src/partitionContext.ts b/sdk/eventhub/event-hubs/src/partitionContext.ts index 32876804f204..ac0c67a8d124 100644 --- a/sdk/eventhub/event-hubs/src/partitionContext.ts +++ b/sdk/eventhub/event-hubs/src/partitionContext.ts @@ -6,7 +6,19 @@ * about the partition, the EventProcessor will be processing events from. */ export interface PartitionContext { + /** + * @property The identifier of the Event Hub partition + * @readonly + */ readonly partitionId: string; + /** + * @property The event hub name + * @readonly + */ readonly eventHubName: string; + /** + * @property The consumer group name + * @readonly + */ readonly consumerGroupName: string; } diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index abb870103b43..0a779d9ad59b 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -14,10 +14,14 @@ import { EventProcessor, PartitionContext, delay, + InMemoryPartitionManager, + PartitionOwnership, + Checkpoint, PartitionProcessorFactory, CloseReason } from "../src"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +import { generate_uuid } from "rhea-promise"; const env = getEnvVars(); describe("Event Processor", function(): void { @@ -431,4 +435,53 @@ describe("Event Processor", function(): void { isCloseCalled.should.equal(true); }); }); + + describe("InMemory Partition Manager", function(): void { + it("should claim ownership, get a list of ownership and update checkpoint", async function(): Promise { + const inMemoryPartitionManager = new InMemoryPartitionManager(); + const partitionOwnership1: PartitionOwnership = { + eventHubName: "myEventHub", + consumerGroupName: EventHubClient.defaultConsumerGroupName, + instanceId: generate_uuid(), + partitionId: "0", + ownerLevel: 10 + }; + const partitionOwnership2: PartitionOwnership = { + eventHubName: "myEventHub", + consumerGroupName: EventHubClient.defaultConsumerGroupName, + instanceId: generate_uuid(), + partitionId: "1", + ownerLevel: 10 + }; + const partitionOwnership = await inMemoryPartitionManager.claimOwnerships([ + partitionOwnership1, + partitionOwnership2 + ]); + partitionOwnership.length.should.equals(2); + + const ownershipslist = await inMemoryPartitionManager.listOwnerships( + "myEventHub", + EventHubClient.defaultConsumerGroupName + ); + ownershipslist.length.should.equals(2); + + const checkpoint: Checkpoint = { + eventHubName: "myEventHub", + consumerGroupName: EventHubClient.defaultConsumerGroupName, + instanceId: generate_uuid(), + partitionId: "0", + sequenceNumber: 10, + offset: 50 + }; + + await inMemoryPartitionManager.updateCheckpoint(checkpoint); + const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships( + "myEventHub", + EventHubClient.defaultConsumerGroupName + ); + partitionOwnershipList[0].partitionId.should.equals(checkpoint.partitionId); + partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber); + partitionOwnershipList[0].offset!.should.equals(checkpoint.offset); + }); + }); }).timeout(90000);