Skip to content

Commit

Permalink
[Event Hubs] Implement PartitionManager methods (#4538)
Browse files Browse the repository at this point in the history
 [Event Hubs] Implement PartitionManager methods
  • Loading branch information
ShivangiReja authored Jul 30, 2019
1 parent 6bc44fa commit e9d98bc
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 24 deletions.
43 changes: 22 additions & 21 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ export interface BatchOptions {
partitionKey?: string;
}

// @public
export interface Checkpoint {
consumerGroupName: string;
eventHubName: string;
instanceId: string;
offset: number;
partitionId: string;
sequenceNumber: number;
}

// @public
export class CheckpointManager {
// (undocumented)
Expand All @@ -40,7 +50,8 @@ export class CheckpointManager {
// @public
export enum CloseReason {
OwnershipLost = "OwnershipLost",
Shutdown = "Shutdown"
Shutdown = "Shutdown",
Unknown = "Unknown"
}

export { DataTransformer }
Expand Down Expand Up @@ -178,6 +189,13 @@ export interface EventProcessorOptions {
maxWaitTimeInSeconds?: number;
}

// @public
export class InMemoryPartitionManager implements PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

export { MessagingError }

// @public
Expand All @@ -188,45 +206,28 @@ export type OnMessage = (eventData: ReceivedEventData) => void;

// @public
export interface PartitionContext {
// (undocumented)
readonly consumerGroupName: string;
// (undocumented)
readonly eventHubName: string;
// (undocumented)
readonly partitionId: string;
}

// @public
export interface PartitionManager {
// (undocumented)
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
// Warning: (ae-forgotten-export) The symbol "Checkpoint" needs to be exported by the entry point index.d.ts
//
// (undocumented)
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
// (undocumented)
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

// @public
export interface PartitionOwnership {
// (undocumented)
consumerGroupName: string;
// (undocumented)
ETag?: string;
// (undocumented)
eTag?: string;
eventHubName: string;
// (undocumented)
instanceId: string;
// (undocumented)
lastModifiedTime?: number;
// (undocumented)
lastModifiedTimeInMS?: number;
offset?: number;
// (undocumented)
ownerLevel: number;
// (undocumented)
partitionId: string;
// (undocumented)
sequenceNumber?: number;
}

Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,29 @@ import { EventData } from "./eventData";
* Used by createCheckpoint in PartitionManager
**/
export interface Checkpoint {
/**
* @property The event hub name
*/
eventHubName: string;
/**
* @property The consumer group name
*/
consumerGroupName: string;
/**
* @property The unique instance identifier
*/
instanceId: string;
/**
* @property The identifier of the Event Hub partition
*/
partitionId: string;
/**
* @property The sequence number of the event.
*/
sequenceNumber: number;
/**
* @property The offset of the event.
*/
offset: number;
}

Expand Down
55 changes: 52 additions & 3 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,43 @@ export interface PartitionProcessor {
* returned by listOwnerships
*/
export interface PartitionOwnership {
/**
* @property The event hub name
*/
eventHubName: string;
/**
* @property The consumer group name
*/
consumerGroupName: string;
/**
* @property The unique instance identifier
*/
instanceId: string;
/**
* @property The identifier of the Event Hub partition
*/
partitionId: string;
/**
* @property
* The owner level
*/
ownerLevel: number;
/**
* @property The offset of the event.
*/
offset?: number;
/**
* @property The sequence number of the event.
*/
sequenceNumber?: number;
lastModifiedTime?: number;
ETag?: string;
/**
* @property The last modified time.
*/
lastModifiedTimeInMS?: number;
/**
* @property The unique identifier for the operation.
*/
eTag?: string;
}

/**
Expand All @@ -80,9 +108,30 @@ export interface PartitionProcessorFactory {
* Deals mainly with read/write to the chosen storage service
*/
export interface PartitionManager {
/**
* Called to get the list of all existing partition ownership from the underlying data store. Could return empty
* results if there are is no existing ownership information.
*
* @param eventHubName The event hub name.
* @param consumerGroupName The consumer group name.
* @return A list of partition ownership details of all the partitions that have/had an owner.
*/
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
/**
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnerships The list of partition ownerships this instance is claiming to own.
* @return A list of partitions this instance successfully claimed ownership.
*/
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
createCheckpoint(checkpoint: Checkpoint): Promise<void>;
/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint The checkpoint.
* @return The new eTag on successful update.
*/
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

// Options passed when creating EventProcessor, everything is optional
Expand Down
61 changes: 61 additions & 0 deletions sdk/eventhub/event-hubs/src/inMemoryPartitionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { PartitionManager, PartitionOwnership } from "./eventProcessor";
import { Checkpoint } from "./checkpointManager";
import { generate_uuid } from "rhea-promise";

/**
* A simple in-memory implementation of a `PartitionManager`
* @class
*/
export class InMemoryPartitionManager implements PartitionManager {
private _partitionOwnershipMap: Map<string, PartitionOwnership> = new Map();

/**
* Get the list of all existing partition ownership from the underlying data store. Could return empty
* results if there are is no existing ownership information.
*
* @param eventHubName The event hub name.
* @param consumerGroupName The consumer group name.
* @return Partition ownership details of all the partitions that have/had an owner..
*/
async listOwnerships(
eventHubName: string,
consumerGroupName: string
): Promise<PartitionOwnership[]> {
return Array.from(this._partitionOwnershipMap.values());
}

/**
* Claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnerships The list of partition ownerships this instance is claiming to own.
* @return A list partitions this instance successfully claimed ownership.
*/
async claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]> {
for (const partitionOwnership of partitionOwnerships) {
if (!this._partitionOwnershipMap.has(partitionOwnership.partitionId)) {
partitionOwnership.eTag = generate_uuid();
this._partitionOwnershipMap.set(partitionOwnership.partitionId, partitionOwnership);
}
}
return partitionOwnerships;
}

/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint The checkpoint.
* @return Promise<void>
*/
async updateCheckpoint(checkpoint: Checkpoint): Promise<void> {
const partitionOwnership = this._partitionOwnershipMap.get(checkpoint.partitionId);
if (partitionOwnership) {
partitionOwnership.sequenceNumber = checkpoint.sequenceNumber;
partitionOwnership.offset = checkpoint.offset;
partitionOwnership.eTag = generate_uuid();
}
}
}
2 changes: 2 additions & 0 deletions sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export {
PartitionOwnership
} from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export { InMemoryPartitionManager} from "./inMemoryPartitionManager"
export { Checkpoint } from "./checkpointManager";
export {
MessagingError,
DataTransformer,
Expand Down
12 changes: 12 additions & 0 deletions sdk/eventhub/event-hubs/src/partitionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@
* about the partition, the EventProcessor will be processing events from.
*/
export interface PartitionContext {
/**
* @property The identifier of the Event Hub partition
* @readonly
*/
readonly partitionId: string;
/**
* @property The event hub name
* @readonly
*/
readonly eventHubName: string;
/**
* @property The consumer group name
* @readonly
*/
readonly consumerGroupName: string;
}
53 changes: 53 additions & 0 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ import {
EventProcessor,
PartitionContext,
delay,
InMemoryPartitionManager,
PartitionOwnership,
Checkpoint,
PartitionProcessorFactory,
CloseReason
} from "../src";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { generate_uuid } from "rhea-promise";
const env = getEnvVars();

describe("Event Processor", function(): void {
Expand Down Expand Up @@ -431,4 +435,53 @@ describe("Event Processor", function(): void {
isCloseCalled.should.equal(true);
});
});

describe("InMemory Partition Manager", function(): void {
it("should claim ownership, get a list of ownership and update checkpoint", async function(): Promise<void> {
const inMemoryPartitionManager = new InMemoryPartitionManager();
const partitionOwnership1: PartitionOwnership = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
partitionId: "0",
ownerLevel: 10
};
const partitionOwnership2: PartitionOwnership = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
partitionId: "1",
ownerLevel: 10
};
const partitionOwnership = await inMemoryPartitionManager.claimOwnerships([
partitionOwnership1,
partitionOwnership2
]);
partitionOwnership.length.should.equals(2);

const ownershipslist = await inMemoryPartitionManager.listOwnerships(
"myEventHub",
EventHubClient.defaultConsumerGroupName
);
ownershipslist.length.should.equals(2);

const checkpoint: Checkpoint = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
instanceId: generate_uuid(),
partitionId: "0",
sequenceNumber: 10,
offset: 50
};

await inMemoryPartitionManager.updateCheckpoint(checkpoint);
const partitionOwnershipList = await inMemoryPartitionManager.listOwnerships(
"myEventHub",
EventHubClient.defaultConsumerGroupName
);
partitionOwnershipList[0].partitionId.should.equals(checkpoint.partitionId);
partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber);
partitionOwnershipList[0].offset!.should.equals(checkpoint.offset);
});
});
}).timeout(90000);

0 comments on commit e9d98bc

Please sign in to comment.