Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Implement PartitionManager methods #4538

Merged
merged 6 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ export interface Checkpoint {
sequenceNumber: number;
}

// @public
export class CheckpointManager {
// (undocumented)
updateCheckpoint(eventData: EventData): Promise<void>;
// (undocumented)
updateCheckpoint(offset: string, sequenceNumber: number): Promise<void>;
}

// @public
export enum CloseReason {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something all languages are exposing? I don't remember seeing it in our API design, that close() takes a reason. 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposed API for javascript, here it takes CloseReason.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@conniey We brought that up on Monday too, because we were curious how Java would know why the subscription was closed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I also wonder what the user could do differently with that reason (ie. user shutdown vs stolen)? I mean, we log it in the EventConsumer but it is not propagated out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory if it was a shutdown they could checkpoint one last time, whereas if it were stolen they wouldn’t be able to update the checkpoint.

OwnershipLost = "OwnershipLost",
Shutdown = "Shutdown",
Unknown = "Unknown"
}

export { DataTransformer }

export { DefaultDataTransformer }
Expand Down Expand Up @@ -164,6 +179,16 @@ export class EventProcessor {
stop(): Promise<void>;
}

// @public (undocumented)
export interface EventProcessorOptions {
// (undocumented)
initialEventPosition?: EventPosition;
// (undocumented)
maxBatchSize?: number;
// (undocumented)
maxWaitTimeInSeconds?: number;
}

// @public
export class InMemoryPartitionManager implements PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
Expand All @@ -186,6 +211,13 @@ export interface PartitionContext {
readonly partitionId: string;
}

// @public
export interface PartitionManager {
claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
}

// @public
export interface PartitionOwnership {
consumerGroupName: string;
Expand All @@ -199,6 +231,20 @@ export interface PartitionOwnership {
sequenceNumber?: number;
}

// @public (undocumented)
export interface PartitionProcessor {
close?(reason: CloseReason): Promise<void>;
initialize?(): Promise<void>;
processError(error: Error): Promise<void>;
processEvents(events: EventData[]): Promise<void>;
}

// @public
export interface PartitionProcessorFactory {
// (undocumented)
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
}

// @public
export interface PartitionProperties {
beginningSequenceNumber: number;
Expand Down
11 changes: 10 additions & 1 deletion sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ export { PartitionProperties, EventHubProperties } from "./managementClient";
export { EventHubProducer } from "./sender";
export { EventHubConsumer, EventIteratorOptions } from "./receiver";
export { EventDataBatch } from "./eventDataBatch";
export { EventProcessor, PartitionOwnership } from "./eventProcessor";
export { CheckpointManager } from "./checkpointManager";
export {
EventProcessor,
CloseReason,
EventProcessorOptions,
PartitionProcessor,
PartitionManager,
PartitionProcessorFactory,
PartitionOwnership
} from "./eventProcessor";
export { PartitionContext } from "./partitionContext";
export { InMemoryPartitionManager} from "./inMemoryPartitionManager"
export { Checkpoint } from "./checkpointManager";
Expand Down
4 changes: 3 additions & 1 deletion sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import {
delay,
InMemoryPartitionManager,
PartitionOwnership,
Checkpoint
Checkpoint,
PartitionProcessorFactory,
CloseReason
} from "../src";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { generate_uuid } from "rhea-promise";
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.