Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EPH] implement CheckpointManager methods #4583

Merged
merged 7 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface BatchOptions {
// @public
export interface Checkpoint {
consumerGroupName: string;
eTag: string;
eventHubName: string;
instanceId: string;
offset: number;
Expand All @@ -41,10 +42,9 @@ export interface Checkpoint {

// @public
export class CheckpointManager {
// (undocumented)
updateCheckpoint(eventData: EventData): Promise<void>;
// (undocumented)
updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string);
updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;
}

// @public
Expand Down Expand Up @@ -164,11 +164,11 @@ export class EventPosition {
static earliest(): EventPosition;
enqueuedTime?: Date | number;
static fromEnqueuedTime(enqueuedTime: Date | number): EventPosition;
static fromOffset(offset: string, isInclusive?: boolean): EventPosition;
static fromOffset(offset: number, isInclusive?: boolean): EventPosition;
static fromSequenceNumber(sequenceNumber: number, isInclusive?: boolean): EventPosition;
isInclusive: boolean;
static latest(): EventPosition;
offset?: string;
offset?: number | "@latest";
sequenceNumber?: number;
}

Expand All @@ -191,9 +191,9 @@ export interface EventProcessorOptions {

// @public
export class InMemoryPartitionManager implements PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

export { MessagingError }
Expand All @@ -213,9 +213,9 @@ export interface PartitionContext {

// @public
export interface PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

// @public
Expand Down Expand Up @@ -249,7 +249,7 @@ export interface PartitionProcessorFactory {
export interface PartitionProperties {
beginningSequenceNumber: number;
eventHubPath: string;
lastEnqueuedOffset: string;
lastEnqueuedOffset: number;
lastEnqueuedSequenceNumber: number;
lastEnqueuedTimeUtc: Date;
partitionId: string;
Expand All @@ -259,7 +259,7 @@ export interface PartitionProperties {
export interface ReceivedEventData {
body: any;
enqueuedTimeUtc: Date;
offset: string;
offset: number;
partitionKey: string | null;
properties?: {
[key: string]: any;
Expand Down
79 changes: 57 additions & 22 deletions sdk/eventhub/event-hubs/src/checkpointManager.ts
Original file line number Diff line number Diff line change
@@ -1,58 +1,93 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

// import { PartitionContext } from "./partitionContext";
import { EventData } from "./eventData";
// import { PartitionManager } from "./eventProcessor";
import { PartitionContext } from "./partitionContext";
import { ReceivedEventData } from "./eventData";
import { PartitionManager } from "./eventProcessor";

/**
* Used by createCheckpoint in PartitionManager
**/
export interface Checkpoint {
/**
/**
* @property The event hub name
*/
eventHubName: string;
/**
/**
* @property The consumer group name
*/
consumerGroupName: string;
/**
/**
* @property The unique instance identifier
*/
instanceId: string;
/**
/**
* @property The identifier of the Event Hub partition
*/
partitionId: string;
/**
/**
* @property The sequence number of the event.
*/
sequenceNumber: number;
/**
/**
* @property The offset of the event.
*/
offset: number;
/**
* @property The unique identifier for the operation.
*/
eTag: string;
}

/**
* CheckPointManager is created by the library & passed to user's code to let them create a checkpoint
*/
export class CheckpointManager {
// private _partitionContext: PartitionContext; // for internal use by createCheckpoint
// private _partitionManager: PartitionManager; // for internal use by createCheckpoint

// constructor(partitionContext: PartitionContext, partitionManager: PartitionManager) {
// this._partitionContext = partitionContext;
// this._partitionManager = partitionManager;
// }

public async updateCheckpoint(eventData: EventData): Promise<void>;
private _partitionContext: PartitionContext;
private _partitionManager: PartitionManager;
private _instanceId: string;
private _eTag: string;

public async updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
constructor(partitionContext: PartitionContext, partitionManager: PartitionManager, instanceId: string) {
this._partitionContext = partitionContext;
this._partitionManager = partitionManager;
this._instanceId = instanceId;
this._eTag = "";
}
/**
* Updates a checkpoint using the event data.
*
* @param eventData The event data to use for updating the checkpoint.
* @return Promise<void>
*/
public async updateCheckpoint(eventData: ReceivedEventData): Promise<void>;
/**
* Updates a checkpoint using the given offset and sequence number.
*
* @param sequenceNumber The sequence number to update the checkpoint.
* @param offset The offset to update the checkpoint.
* @return Promise<void>.
*/
public async updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>;

public async updateCheckpoint(
eventDataOrOffset: EventData | string,
sequenceNumber?: number
): Promise<void> {}
eventDataOrSequenceNumber: ReceivedEventData | number,
offset?: number
): Promise<void> {
const checkpoint: Checkpoint = {
eventHubName: this._partitionContext.eventHubName,
consumerGroupName: this._partitionContext.consumerGroupName,
instanceId: this._instanceId,
partitionId: this._partitionContext.partitionId,
sequenceNumber:
typeof eventDataOrSequenceNumber === "number"
? eventDataOrSequenceNumber
: eventDataOrSequenceNumber.sequenceNumber,
offset:
typeof eventDataOrSequenceNumber === "number" ? offset! : eventDataOrSequenceNumber.offset,
eTag: this._eTag
};

this._eTag = await this._partitionManager.updateCheckpoint(checkpoint);
}
}
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export interface EventDataInternal {
/**
* @property [offset] The offset of the event.
*/
offset?: string;
offset?: number;
/**
* @property [sequenceNumber] The sequence number of the event.
*/
Expand Down Expand Up @@ -216,7 +216,7 @@ export interface ReceivedEventData {
/**
* @property The offset of the event.
*/
offset: string;
offset: number;
/**
* @property The sequence number of the event.
*/
Expand Down
16 changes: 8 additions & 8 deletions sdk/eventhub/event-hubs/src/eventPosition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface EventPositionOptions {
* @property The offset of the event at the position. It can be undefined
* if the position is just created from a sequence number or an enqueued time.
*/
offset?: string;
offset?: number | "@latest";
/**
* @property Indicates if the current event at the specified offset is
* included or not. It is only applicable if offset is set. Default value: false.
Expand All @@ -40,20 +40,20 @@ export interface EventPositionOptions {
*/
export class EventPosition {
/**
* @property The token that represents the beginning event in the stream of a partition: `"-1"`.
* @property The token that represents the beginning event in the stream of a partition: `-1`.
* @static
* @readonly
* @ignore
*/
private static readonly startOfStream: string = "-1";
private static readonly startOfStream: number = -1;

/**
* @property The token that represents the last event in the stream of a partition: `"@latest"`.
* @static
* @readonly
* @ignore
*/
private static readonly endOfStream: string = "@latest";
private static readonly endOfStream = "@latest";
/**
* @property The offset of the event identified by this position.
* Expected to be undefined if the position is just created from a sequence number or an enqueued time.
Expand All @@ -63,7 +63,7 @@ export class EventPosition {
* The same offset may refer to a different event as events reach the age limit for
* retention and are no longer visible within the partition.
*/
offset?: string;
offset?: number | "@latest";
/**
* @property Indicates if the specified offset is inclusive of the event which it identifies.
* This information is only relevent if the event position was identified by an offset or sequence number.
Expand Down Expand Up @@ -106,11 +106,11 @@ export class EventPosition {
* Default: `false`.
* @returns EventPosition
*/
static fromOffset(offset: string, isInclusive?: boolean): EventPosition {
static fromOffset(offset: number, isInclusive?: boolean): EventPosition {
if (offset == undefined) {
throw new Error('Missing parameter "offset"');
}
return new EventPosition({ offset: String(offset), isInclusive: isInclusive });
return new EventPosition({ offset: offset, isInclusive: isInclusive });
}

/**
Expand Down Expand Up @@ -165,7 +165,7 @@ export class EventPosition {
*/

static latest(): EventPosition {
return EventPosition.fromOffset(EventPosition.endOfStream);
return new EventPosition({ offset: EventPosition.endOfStream });
}
}

Expand Down
28 changes: 22 additions & 6 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PumpManager } from "./pumpManager";
import { AbortSignalLike, AbortController } from "@azure/abort-controller";
import * as log from "./log";
import { cancellableDelay } from "./util/cancellableDelay";
import { generate_uuid } from "rhea-promise";

/**
* Reason for closing a PartitionProcessor.
Expand Down Expand Up @@ -53,7 +54,7 @@ export interface PartitionProcessor {

/**
* used by PartitionManager to claim ownership.
* returned by listOwnerships
* returned by listOwnership
*/
export interface PartitionOwnership {
/**
Expand Down Expand Up @@ -116,22 +117,22 @@ export interface PartitionManager {
* @param consumerGroupName The consumer group name.
* @return A list of partition ownership details of all the partitions that have/had an owner.
*/
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
listOwnership(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
/**
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
* successfully.
*
* @param partitionOwnerships The list of partition ownerships this instance is claiming to own.
* @param partitionOwnership The list of partition ownership this instance is claiming to own.
* @return A list of partitions this instance successfully claimed ownership.
*/
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint The checkpoint.
* @return The new eTag on successful update.
*/
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
updateCheckpoint(checkpoint: Checkpoint): Promise<string>;
}

// Options passed when creating EventProcessor, everything is optional
Expand All @@ -155,6 +156,7 @@ export class EventProcessor {
private _isRunning: boolean = false;
private _loopTask?: PromiseLike<void>;
private _abortController?: AbortController;
private _partitionManager: PartitionManager;

constructor(
consumerGroupName: string,
Expand All @@ -168,6 +170,7 @@ export class EventProcessor {
this._consumerGroupName = consumerGroupName;
this._eventHubClient = eventHubClient;
this._partitionProcessorFactory = partitionProcessorFactory;
this._partitionManager = partitionManager;
this._processorOptions = options;
this._pumpManager = new PumpManager(this._id, options);
}
Expand Down Expand Up @@ -216,7 +219,20 @@ export class EventProcessor {
partitionId: partitionId
};

const checkpointManager = new CheckpointManager();
const partitionOwnership: PartitionOwnership = {
eventHubName: this._eventHubClient.eventHubName,
consumerGroupName: this._consumerGroupName,
instanceId: generate_uuid(),
partitionId: partitionId,
ownerLevel: 0
};
await this._partitionManager.claimOwnership([partitionOwnership]);

const checkpointManager = new CheckpointManager(
partitionContext,
this._partitionManager,
this._id
);

log.eventProcessor(
`[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.`
Expand Down
Loading