From ac30216f864b63968988849f5f92b489029d6785 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Tue, 30 Jul 2019 14:29:52 -0700 Subject: [PATCH 1/5] [Event Hubs] Implement PartitionManager methods --- .../event-hubs/review/event-hubs.api.md | 48 +++++++++++++++ sdk/eventhub/event-hubs/src/eventProcessor.ts | 2 +- .../src/inMemoryPartitionManager.ts | 40 +++++++++++++ sdk/eventhub/event-hubs/src/index.ts | 4 +- .../event-hubs/test/eventProcessor.spec.ts | 60 ++++++++++++++++++- 5 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 0423821806b9..46225b23b135 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -28,6 +28,22 @@ export interface BatchOptions { partitionKey?: string; } +// @public +export interface Checkpoint { + // (undocumented) + consumerGroupName: string; + // (undocumented) + eventHubName: string; + // (undocumented) + instanceId: string; + // (undocumented) + offset: number; + // (undocumented) + partitionId: string; + // (undocumented) + sequenceNumber: number; +} + export { DataTransformer } export { DefaultDataTransformer } @@ -156,6 +172,16 @@ export class EventProcessor { stop(): Promise; } +// @public +export class InMemoryPartitionManager implements PartitionManager { + // (undocumented) + claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; + // (undocumented) + listOwnerships(eventHubName: string, consumerGroupName: string): Promise; + // (undocumented) + updateCheckpoint(checkpoint: Checkpoint): Promise; +} + export { MessagingError } // @public @@ -174,6 +200,28 @@ export interface PartitionContext { readonly partitionId: string; } +// @public +export interface PartitionOwnership { + // (undocumented) + consumerGroupName: string; + // (undocumented) + ETag?: string; + // (undocumented) + eventHubName: string; + // (undocumented) + instanceId: string; + // (undocumented) + lastModifiedTime?: number; + // (undocumented) + offset?: number; + // (undocumented) + ownerLevel: number; + // (undocumented) + partitionId: string; + // (undocumented) + sequenceNumber?: number; +} + // @public export interface PartitionProperties { beginningSequenceNumber: number; diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 285fb43b1043..2090b043ce09 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -60,7 +60,7 @@ export interface PartitionProcessorFactory { export interface PartitionManager { listOwnerships(eventHubName: string, consumerGroupName: string): Promise; claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; - createCheckpoint(checkpoint: Checkpoint): Promise; + updateCheckpoint(checkpoint: Checkpoint): Promise; } // Options passed when creating EventProcessor, everything is optional diff --git a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts new file mode 100644 index 000000000000..3bc3ef02046a --- /dev/null +++ b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { PartitionManager, PartitionOwnership } from "./eventProcessor"; +import { Checkpoint } from "./checkpointManager"; +import { generate_uuid } from "rhea-promise"; + +/** + * A simple in-memory implementation of a `PartitionManager` + * @class + */ +export class InMemoryPartitionManager implements PartitionManager { + private _partitionOwnershipMap: Map = new Map(); + + async listOwnerships( + eventHubName: string, + consumerGroupName: string + ): Promise { + return Array.from(this._partitionOwnershipMap.values()); + } + + async claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise { + for (let partitionOwnership of partitionOwnerships) { + if (!this._partitionOwnershipMap.has(partitionOwnership.partitionId)) { + partitionOwnership.ETag = generate_uuid(); + this._partitionOwnershipMap.set(partitionOwnership.partitionId, partitionOwnership); + } + } + return partitionOwnerships; + } + + async updateCheckpoint(checkpoint: Checkpoint): Promise { + let partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId); + if (partitionOwnership) { + partitionOwnership.sequenceNumber = checkpoint.sequenceNumber; + partitionOwnership.offset = checkpoint.offset; + partitionOwnership.ETag = generate_uuid(); + } + } +} diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index e00a632b0fd2..5022b8f5106d 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -21,8 +21,10 @@ export { PartitionProperties, EventHubProperties } from "./managementClient"; export { EventHubProducer } from "./sender"; export { EventHubConsumer, EventIteratorOptions } from "./receiver"; export { EventDataBatch } from "./eventDataBatch"; -export { EventProcessor } from "./eventProcessor"; +export { EventProcessor, PartitionOwnership } from "./eventProcessor"; export { PartitionContext } from "./partitionContext"; +export { InMemoryPartitionManager} from "./inMemoryPartitionManager" +export { Checkpoint } from "./checkpointManager"; export { MessagingError, DataTransformer, diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 2fdb84605c45..4e67b945a595 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -13,9 +13,13 @@ import { EventData, EventProcessor, PartitionContext, - delay + delay, + InMemoryPartitionManager, + PartitionOwnership, + Checkpoint } from "../src"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +import { generate_uuid } from "rhea-promise"; const env = getEnvVars(); describe("Event Processor", function(): void { @@ -96,4 +100,58 @@ describe("Event Processor", function(): void { isCloseCalled.should.equal(true); }); }); + + describe("InMemory Partition Manager", function(): void { + const inMemoryPartitionManager = new InMemoryPartitionManager(); + + it("Claim ownership ", async function(): Promise { + const partitionOwnership1: PartitionOwnership = { + eventHubName: "myEventHub", + consumerGroupName: EventHubClient.defaultConsumerGroupName, + instanceId: generate_uuid(), + partitionId: "0", + ownerLevel: 10 + }; + const partitionOwnership2: PartitionOwnership = { + eventHubName: "myEventHub", + consumerGroupName: EventHubClient.defaultConsumerGroupName, + instanceId: generate_uuid(), + partitionId: "1", + ownerLevel: 10 + }; + const partitionOwnership = await inMemoryPartitionManager.claimOwnerships([ + partitionOwnership1, + partitionOwnership2 + ]); + partitionOwnership.length.should.equals(2); + }); + + it("Get list of ownerships ", async function(): Promise { + const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships( + "myEventHub", + EventHubClient.defaultConsumerGroupName + ); + partitionOwnershipList.length.should.equals(2); + }); + + it("Update Checkpoint ", async function(): Promise { + const checkpoint: Checkpoint = { + eventHubName: "myEventHub", + consumerGroupName: EventHubClient.defaultConsumerGroupName, + instanceId: generate_uuid(), + partitionId: "0", + sequenceNumber: 10, + offset: 50 + }; + + await inMemoryPartitionManager.updateCheckpoint(checkpoint); + const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships( + "myEventHub", + EventHubClient.defaultConsumerGroupName + ); + partitionOwnershipList[0].partitionId.should.equals("0"); + partitionOwnershipList[0].sequenceNumber!.should.equals(10); + partitionOwnershipList[0].offset!.should.equals(50); + }); + }); }).timeout(90000); From 7283f2d6d5c1fb3146efa5f7f672c716362286d5 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Tue, 30 Jul 2019 14:45:48 -0700 Subject: [PATCH 2/5] Update tests --- sdk/eventhub/event-hubs/test/eventProcessor.spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 4e67b945a595..f06e2cd1f803 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -104,7 +104,7 @@ describe("Event Processor", function(): void { describe("InMemory Partition Manager", function(): void { const inMemoryPartitionManager = new InMemoryPartitionManager(); - it("Claim ownership ", async function(): Promise { + it("should claim ownership ", async function(): Promise { const partitionOwnership1: PartitionOwnership = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, @@ -126,7 +126,7 @@ describe("Event Processor", function(): void { partitionOwnership.length.should.equals(2); }); - it("Get list of ownerships ", async function(): Promise { + it("should get a list of ownerships ", async function(): Promise { const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships( "myEventHub", EventHubClient.defaultConsumerGroupName @@ -134,7 +134,7 @@ describe("Event Processor", function(): void { partitionOwnershipList.length.should.equals(2); }); - it("Update Checkpoint ", async function(): Promise { + it("should update checkpoint ", async function(): Promise { const checkpoint: Checkpoint = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, From b75d1d9ee268a8d0491b2c17e45627ed32a82f45 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Tue, 30 Jul 2019 15:37:47 -0700 Subject: [PATCH 3/5] Address comments --- .../event-hubs/review/event-hubs.api.md | 8 +------ .../event-hubs/src/checkpointManager.ts | 18 ++++++++++++++++ sdk/eventhub/event-hubs/src/eventProcessor.ts | 2 +- .../src/inMemoryPartitionManager.ts | 4 ++-- .../event-hubs/test/eventProcessor.spec.ts | 21 +++++++------------ 5 files changed, 30 insertions(+), 23 deletions(-) diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 46225b23b135..3086e8455798 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -30,17 +30,11 @@ export interface BatchOptions { // @public export interface Checkpoint { - // (undocumented) consumerGroupName: string; - // (undocumented) eventHubName: string; - // (undocumented) instanceId: string; - // (undocumented) offset: number; - // (undocumented) partitionId: string; - // (undocumented) sequenceNumber: number; } @@ -205,7 +199,7 @@ export interface PartitionOwnership { // (undocumented) consumerGroupName: string; // (undocumented) - ETag?: string; + eTag?: string; // (undocumented) eventHubName: string; // (undocumented) diff --git a/sdk/eventhub/event-hubs/src/checkpointManager.ts b/sdk/eventhub/event-hubs/src/checkpointManager.ts index 5822392396e9..f97c5cac4029 100644 --- a/sdk/eventhub/event-hubs/src/checkpointManager.ts +++ b/sdk/eventhub/event-hubs/src/checkpointManager.ts @@ -9,11 +9,29 @@ import { EventData } from "./eventData"; * Used by createCheckpoint in PartitionManager **/ export interface Checkpoint { + /** + * @property The event hub name + */ eventHubName: string; + /** + * @property The consumer group name + */ consumerGroupName: string; + /** + * @property The unique instance identifier + */ instanceId: string; + /** + * @property The identifier of the Event Hub partition + */ partitionId: string; + /** + * @property The sequence number of the event. + */ sequenceNumber: number; + /** + * @property The offset of the event. + */ offset: number; } diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 2090b043ce09..0c758fed00e2 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -42,7 +42,7 @@ export interface PartitionOwnership { offset?: number; sequenceNumber?: number; lastModifiedTime?: number; - ETag?: string; + eTag?: string; } /** diff --git a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts index 3bc3ef02046a..82341bc711a1 100644 --- a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts +++ b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts @@ -22,7 +22,7 @@ export class InMemoryPartitionManager implements PartitionManager { async claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise { for (let partitionOwnership of partitionOwnerships) { if (!this._partitionOwnershipMap.has(partitionOwnership.partitionId)) { - partitionOwnership.ETag = generate_uuid(); + partitionOwnership.eTag = generate_uuid(); this._partitionOwnershipMap.set(partitionOwnership.partitionId, partitionOwnership); } } @@ -34,7 +34,7 @@ export class InMemoryPartitionManager implements PartitionManager { if (partitionOwnership) { partitionOwnership.sequenceNumber = checkpoint.sequenceNumber; partitionOwnership.offset = checkpoint.offset; - partitionOwnership.ETag = generate_uuid(); + partitionOwnership.eTag = generate_uuid(); } } } diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index f06e2cd1f803..3215481b6bc0 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -102,9 +102,8 @@ describe("Event Processor", function(): void { }); describe("InMemory Partition Manager", function(): void { - const inMemoryPartitionManager = new InMemoryPartitionManager(); - - it("should claim ownership ", async function(): Promise { + it("should claim ownership, get a list of ownership and update checkpoint", async function(): Promise { + const inMemoryPartitionManager = new InMemoryPartitionManager(); const partitionOwnership1: PartitionOwnership = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, @@ -124,17 +123,13 @@ describe("Event Processor", function(): void { partitionOwnership2 ]); partitionOwnership.length.should.equals(2); - }); - - it("should get a list of ownerships ", async function(): Promise { - const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships( + + const Ownershipslist = await inMemoryPartitionManager.listOwnerships( "myEventHub", EventHubClient.defaultConsumerGroupName ); - partitionOwnershipList.length.should.equals(2); - }); - - it("should update checkpoint ", async function(): Promise { + Ownershipslist.length.should.equals(2); + const checkpoint: Checkpoint = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName, @@ -150,8 +145,8 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName ); partitionOwnershipList[0].partitionId.should.equals("0"); - partitionOwnershipList[0].sequenceNumber!.should.equals(10); - partitionOwnershipList[0].offset!.should.equals(50); + partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber); + partitionOwnershipList[0].offset!.should.equals(checkpoint.offset); }); }); }).timeout(90000); From c7de1940bdcfe0752b14561f076444acab9b3c91 Mon Sep 17 00:00:00 2001 From: ShivangiReja <45216704+ShivangiReja@users.noreply.github.com> Date: Tue, 30 Jul 2019 15:46:53 -0700 Subject: [PATCH 4/5] Update sdk/eventhub/event-hubs/test/eventProcessor.spec.ts Co-Authored-By: Ramya Rao --- sdk/eventhub/event-hubs/test/eventProcessor.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 3215481b6bc0..f6e500bd9262 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -144,7 +144,7 @@ describe("Event Processor", function(): void { "myEventHub", EventHubClient.defaultConsumerGroupName ); - partitionOwnershipList[0].partitionId.should.equals("0"); + partitionOwnershipList[0].partitionId.should.equals(checkpoint.partitionId); partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber); partitionOwnershipList[0].offset!.should.equals(checkpoint.offset); }); From d880b4be25f1bff5976169bbd8478f91dfc99b56 Mon Sep 17 00:00:00 2001 From: Shivangi Reja Date: Tue, 30 Jul 2019 16:17:01 -0700 Subject: [PATCH 5/5] Address comments --- .../event-hubs/review/event-hubs.api.md | 17 +------ sdk/eventhub/event-hubs/src/eventProcessor.ts | 51 ++++++++++++++++++- .../src/inMemoryPartitionManager.ts | 25 ++++++++- .../event-hubs/src/partitionContext.ts | 12 +++++ .../event-hubs/test/eventProcessor.spec.ts | 4 +- 5 files changed, 88 insertions(+), 21 deletions(-) diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 3086e8455798..410a447a9e21 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -168,11 +168,8 @@ export class EventProcessor { // @public export class InMemoryPartitionManager implements PartitionManager { - // (undocumented) claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; - // (undocumented) listOwnerships(eventHubName: string, consumerGroupName: string): Promise; - // (undocumented) updateCheckpoint(checkpoint: Checkpoint): Promise; } @@ -186,33 +183,21 @@ export type OnMessage = (eventData: ReceivedEventData) => void; // @public export interface PartitionContext { - // (undocumented) readonly consumerGroupName: string; - // (undocumented) readonly eventHubName: string; - // (undocumented) readonly partitionId: string; } // @public export interface PartitionOwnership { - // (undocumented) consumerGroupName: string; - // (undocumented) eTag?: string; - // (undocumented) eventHubName: string; - // (undocumented) instanceId: string; - // (undocumented) - lastModifiedTime?: number; - // (undocumented) + lastModifiedTimeInMS?: number; offset?: number; - // (undocumented) ownerLevel: number; - // (undocumented) partitionId: string; - // (undocumented) sequenceNumber?: number; } diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 0c758fed00e2..c2b1ffa98a20 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -34,14 +34,42 @@ export interface PartitionProcessor { * returned by listOwnerships */ export interface PartitionOwnership { + /** + * @property The event hub name + */ eventHubName: string; + /** + * @property The consumer group name + */ consumerGroupName: string; + /** + * @property The unique instance identifier + */ instanceId: string; + /** + * @property The identifier of the Event Hub partition + */ partitionId: string; + /** + * @property + * The owner level + */ ownerLevel: number; + /** + * @property The offset of the event. + */ offset?: number; + /** + * @property The sequence number of the event. + */ sequenceNumber?: number; - lastModifiedTime?: number; + /** + * @property The last modified time. + */ + lastModifiedTimeInMS?: number; + /** + * @property The unique identifier for the operation. + */ eTag?: string; } @@ -58,8 +86,29 @@ export interface PartitionProcessorFactory { * Deals mainly with read/write to the chosen storage service */ export interface PartitionManager { + /** + * Called to get the list of all existing partition ownership from the underlying data store. Could return empty + * results if there are is no existing ownership information. + * + * @param eventHubName The event hub name. + * @param consumerGroupName The consumer group name. + * @return A list of partition ownership details of all the partitions that have/had an owner. + */ listOwnerships(eventHubName: string, consumerGroupName: string): Promise; + /** + * Called to claim ownership of a list of partitions. This will return the list of partitions that were owned + * successfully. + * + * @param partitionOwnerships The list of partition ownerships this instance is claiming to own. + * @return A list of partitions this instance successfully claimed ownership. + */ claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise; + /** + * Updates the checkpoint in the data store for a partition. + * + * @param checkpoint The checkpoint. + * @return The new eTag on successful update. + */ updateCheckpoint(checkpoint: Checkpoint): Promise; } diff --git a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts index 82341bc711a1..07e43beb31f5 100644 --- a/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts +++ b/sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts @@ -12,6 +12,14 @@ import { generate_uuid } from "rhea-promise"; export class InMemoryPartitionManager implements PartitionManager { private _partitionOwnershipMap: Map = new Map(); + /** + * Get the list of all existing partition ownership from the underlying data store. Could return empty + * results if there are is no existing ownership information. + * + * @param eventHubName The event hub name. + * @param consumerGroupName The consumer group name. + * @return Partition ownership details of all the partitions that have/had an owner.. + */ async listOwnerships( eventHubName: string, consumerGroupName: string @@ -19,8 +27,15 @@ export class InMemoryPartitionManager implements PartitionManager { return Array.from(this._partitionOwnershipMap.values()); } + /** + * Claim ownership of a list of partitions. This will return the list of partitions that were owned + * successfully. + * + * @param partitionOwnerships The list of partition ownerships this instance is claiming to own. + * @return A list partitions this instance successfully claimed ownership. + */ async claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise { - for (let partitionOwnership of partitionOwnerships) { + for (const partitionOwnership of partitionOwnerships) { if (!this._partitionOwnershipMap.has(partitionOwnership.partitionId)) { partitionOwnership.eTag = generate_uuid(); this._partitionOwnershipMap.set(partitionOwnership.partitionId, partitionOwnership); @@ -29,8 +44,14 @@ export class InMemoryPartitionManager implements PartitionManager { return partitionOwnerships; } + /** + * Updates the checkpoint in the data store for a partition. + * + * @param checkpoint The checkpoint. + * @return Promise + */ async updateCheckpoint(checkpoint: Checkpoint): Promise { - let partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId); + const partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId); if (partitionOwnership) { partitionOwnership.sequenceNumber = checkpoint.sequenceNumber; partitionOwnership.offset = checkpoint.offset; diff --git a/sdk/eventhub/event-hubs/src/partitionContext.ts b/sdk/eventhub/event-hubs/src/partitionContext.ts index 32876804f204..ac0c67a8d124 100644 --- a/sdk/eventhub/event-hubs/src/partitionContext.ts +++ b/sdk/eventhub/event-hubs/src/partitionContext.ts @@ -6,7 +6,19 @@ * about the partition, the EventProcessor will be processing events from. */ export interface PartitionContext { + /** + * @property The identifier of the Event Hub partition + * @readonly + */ readonly partitionId: string; + /** + * @property The event hub name + * @readonly + */ readonly eventHubName: string; + /** + * @property The consumer group name + * @readonly + */ readonly consumerGroupName: string; } diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index f6e500bd9262..5620bb8a1d70 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -124,11 +124,11 @@ describe("Event Processor", function(): void { ]); partitionOwnership.length.should.equals(2); - const Ownershipslist = await inMemoryPartitionManager.listOwnerships( + const ownershipslist = await inMemoryPartitionManager.listOwnerships( "myEventHub", EventHubClient.defaultConsumerGroupName ); - Ownershipslist.length.should.equals(2); + ownershipslist.length.should.equals(2); const checkpoint: Checkpoint = { eventHubName: "myEventHub",