diff --git a/sdk/eventhub/mock-hub/package.json b/sdk/eventhub/mock-hub/package.json index 83e936df7d9d..b3d247ddcff5 100644 --- a/sdk/eventhub/mock-hub/package.json +++ b/sdk/eventhub/mock-hub/package.json @@ -11,6 +11,7 @@ ], "main": "dist/index.js", "types": "types/index.d.ts", + "sideEffects": false, "scripts": { "audit": "node ../../../common/scripts/rush-audit.js && rimraf node_modules package-lock.json && npm i --package-lock-only 2>&1 && npm audit", "build": "npm run clean && tsc -p .", @@ -24,8 +25,8 @@ "integration-test:browser": "echo skipped", "integration-test:node": "echo skipped", "integration-test": "npm run integration-test:node && npm run integration-test:browser", - "lint:fix": "echo skipped", - "lint": "eslint package.json src --ext .ts -f html -o mock-hub-lintReport.html || exit 0", + "lint:fix": "eslint --no-eslintrc -c ../../eslintrc.internal.json package.json src --ext .ts --fix --fix-type [problem,suggestion]", + "lint": "eslint --no-eslintrc -c ../../.eslintrc.internal.json package.json src --ext .ts", "pack": "npm pack 2>&1", "prepare": "npm run build", "test": "echo \"No tests implemented\"", diff --git a/sdk/eventhub/mock-hub/src/messages/event-hubs/partitionInfo.ts b/sdk/eventhub/mock-hub/src/messages/event-hubs/partitionInfo.ts index 2a25966b2808..386626a8041c 100644 --- a/sdk/eventhub/mock-hub/src/messages/event-hubs/partitionInfo.ts +++ b/sdk/eventhub/mock-hub/src/messages/event-hubs/partitionInfo.ts @@ -5,9 +5,9 @@ import { Message, types } from "rhea"; /** * Checks whether the provided message is requesting the partition info from the Event Hub. - * @param entityPath The path the client sent the request to. + * @param entityPath - The path the client sent the request to. * Expected to be `$management` if the message is requesting runtime info. - * @param message The message sent by the client. + * @param message - The message sent by the client. */ export function isPartitionInfo(entityPath: string, message: Message): boolean { if (entityPath !== "$management") { diff --git a/sdk/eventhub/mock-hub/src/messages/event-hubs/runtimeInfo.ts b/sdk/eventhub/mock-hub/src/messages/event-hubs/runtimeInfo.ts index 42998bf85b21..12c01afd83ef 100644 --- a/sdk/eventhub/mock-hub/src/messages/event-hubs/runtimeInfo.ts +++ b/sdk/eventhub/mock-hub/src/messages/event-hubs/runtimeInfo.ts @@ -5,9 +5,9 @@ import { Message, types } from "rhea"; /** * Checks whether the provided message is requesting the EventHub's runtime info. - * @param entityPath The path the client sent the request to. + * @param entityPath - The path the client sent the request to. * Expected to be `$management` if the message is requesting runtime info. - * @param message The message sent by the client. + * @param message - The message sent by the client. */ export function isHubRuntimeInfo(entityPath: string, message: Message): boolean { if (entityPath !== "$management") { diff --git a/sdk/eventhub/mock-hub/src/sender/streamingPartitionSender.ts b/sdk/eventhub/mock-hub/src/sender/streamingPartitionSender.ts index bd09fc975fab..58e7aa1fc513 100644 --- a/sdk/eventhub/mock-hub/src/sender/streamingPartitionSender.ts +++ b/sdk/eventhub/mock-hub/src/sender/streamingPartitionSender.ts @@ -1,10 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { Sender, SenderEvents, DeliveryAnnotations, types } from "rhea"; -import { AbortController, AbortSignalLike, AbortError } from "@azure/abort-controller"; - -import { MessageStore, MessageRecord } from "../storage/messageStore"; +import { AbortController, AbortError, AbortSignalLike } from "@azure/abort-controller"; +import { DeliveryAnnotations, Sender, SenderEvents, types } from "rhea"; +import { MessageRecord, MessageStore } from "../storage/messageStore"; import { EventPosition } from "../utils/eventPosition"; import { Message } from "rhea"; @@ -23,11 +22,11 @@ export class StreamingPartitionSender { /** * Instantiates a `StreamingPartitionSender`. - * @param messageStore The `MessageStore` that contains all of the messages sent to the service. - * @param sender The sender link that should be used to send messages to. - * @param partitionId Specifies which partition to send messages from. - * @param startPosition Specifies which message to start iterating from. - * @param enableRuntimeMetric Indicates whether partition info should be sent on each event. + * @param messageStore - The `MessageStore` that contains all of the messages sent to the service. + * @param sender - The sender link that should be used to send messages to. + * @param partitionId - Specifies which partition to send messages from. + * @param startPosition - Specifies which message to start iterating from. + * @param enableRuntimeMetric - Indicates whether partition info should be sent on each event. */ constructor( messageStore: MessageStore, @@ -46,7 +45,7 @@ export class StreamingPartitionSender { /** * Starts sending messages. */ - start() { + start(): void { this._sendMessages().catch((err) => { console.error(`Unexpected error while sending messages`, err); }); @@ -55,11 +54,11 @@ export class StreamingPartitionSender { /** * Stops sending messages. */ - stop() { + stop(): void { this._abortController.abort(); } - private async _sendMessages() { + private async _sendMessages(): Promise { const abortSignal = this._abortController.signal; const iterator = this._messageIterator; const sender = this._sender; @@ -112,7 +111,7 @@ export class StreamingPartitionSender { // And away it goes! sender.send(outgoingMessage); } catch (err) { - if ((err as any)?.name !== "AbortError") { + if (err?.name !== "AbortError") { console.error(`Unexpected error while streaming events: `, err); } } @@ -121,15 +120,17 @@ export class StreamingPartitionSender { private _waitForSendable(sender: Sender, abortSignal: AbortSignalLike): Promise { return new Promise((resolve, reject) => { - const onAbort = () => { + const onAbort = (): void => { sender.removeListener(SenderEvents.sendable, onSendable); abortSignal.removeEventListener("abort", onAbort); reject(new AbortError("Cancelled operation.")); }; - const onSendable = () => { + + const onSendable = (): void => { abortSignal.removeEventListener("abort", onAbort); resolve(); }; + sender.once(SenderEvents.sendable, onSendable); abortSignal.addEventListener("abort", onAbort); diff --git a/sdk/eventhub/mock-hub/src/server/mockServer.ts b/sdk/eventhub/mock-hub/src/server/mockServer.ts index a83437ab5b02..0fed2b768841 100644 --- a/sdk/eventhub/mock-hub/src/server/mockServer.ts +++ b/sdk/eventhub/mock-hub/src/server/mockServer.ts @@ -1,21 +1,21 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { ListenOptions } from "net"; -import { EventEmitter } from "events"; import { + ConnectionError, + ConnectionEvents, ConnectionOptions, Container, EventContext, Message, + Receiver, ReceiverEvents, Sender, - create_container, SenderEvents, - Receiver, - ConnectionEvents, - ConnectionError + create_container } from "rhea"; +import { EventEmitter } from "events"; +import { ListenOptions } from "net"; import { convertBufferToMessages } from "../utils/convertBufferToMessage"; export interface MockServerOptions { @@ -148,6 +148,8 @@ export class MockServer extends EventEmitter { return new Promise((resolve, reject) => { const options = this._options; const ONE_MB = 1024 * 1024; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any const listenOptions: ListenOptions & ConnectionOptions & any = { port: options.port ?? 0, max_frame_size: 65536, @@ -183,6 +185,7 @@ export class MockServer extends EventEmitter { emit(type: "receiverClose", event: ReceiverCloseEvent): boolean; emit(type: "senderClose", event: SenderCloseEvent): boolean; emit(type: "connectionClose", event: ConnectionCloseEvent): boolean; + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/explicit-module-boundary-types emit(type: string, event: any): boolean { return super.emit(type, event); } @@ -191,56 +194,57 @@ export class MockServer extends EventEmitter { * Add new "receiverOpen" event listener. * This event indicates when the remote peer has created a `Sender` * and the server creates a `Receiver` link in response. - * @param type "receiverOpen" - * @param listener + * @param type - "receiverOpen" + * @param listener - */ public on(type: "receiverOpen", listener: (event: ReceiverOpenEvent) => void): this; /** * Add new "receiverClose" event listener. * This event indicates when the remote peer has closed a `Sender` * and the server closes a `Receiver` link in response. - * @param type "receiverClose" - * @param listener + * @param type - "receiverClose" + * @param listener - */ public on(type: "receiverClose", listener: (event: ReceiverCloseEvent) => void): this; /** * Add new "connectionOpen" event listener. * This event indicates when the remote peer has created a connection to the server. - * @param type "connectionOpen" - * @param listener + * @param type - "connectionOpen" + * @param listener - */ public on(type: "connectionOpen", listener: (event: ConnectionOpenEvent) => void): this; /** * Add new "senderOpen" event listener. * This event indicates when the remote peer has created a `Receiver` * and the server creates a `Sender` link in response. - * @param type "senderOpen" - * @param listener + * @param type - "senderOpen" + * @param listener - */ public on(type: "senderOpen", listener: (event: SenderOpenEvent) => void): this; /** * Add new "senderClose" event listener. * This event indicates when the remote peer has closed a `Receiver` * and the server closes a `Sender` link in response. - * @param type "senderClose" - * @param listener + * @param type - "senderClose" + * @param listener - */ public on(type: "senderClose", listener: (event: SenderCloseEvent) => void): this; /** * Add new "connectionClose" event listener. * This event indicates when the remote peer has closed a connection to the server. - * @param type "connectionClose" - * @param listener + * @param type - "connectionClose" + * @param listener - */ public on(type: "connectionClose", listener: (event: ConnectionCloseEvent) => void): this; /** * Add new "onMessage" event listener. * This event indicates when the server has received a message from a remote peer. * Messages are received over a `Receiver` link. - * @param type "connectionClose" - * @param listener + * @param type - "connectionClose" + * @param listener - */ public on(type: "onMessages", listener: (event: OnMessagesEvent) => void): this; + // eslint-disable-next-line @typescript-eslint/no-explicit-any public on(type: string, listener: (event: any) => void): this { return super.on(type, listener); } @@ -267,14 +271,18 @@ export class MockServer extends EventEmitter { }); } - private _setupDefaultListeners() { + private _setupDefaultListeners(): void { this._container.sasl_server_mechanisms.enable_anonymous(); this._container.sasl.server_add_external(this._container.sasl_server_mechanisms); this._container.sasl_server_mechanisms["MSSBCBS"] = this._container.sasl_server_mechanisms[ "EXTERNAL" ]; - this._container.on(ConnectionEvents.connectionError, () => {}); - this._container.on(ConnectionEvents.protocolError, () => {}); + this._container.on(ConnectionEvents.connectionError, () => { + /* do nothing */ + }); + this._container.on(ConnectionEvents.protocolError, () => { + /* do nothing */ + }); this._container.on(ConnectionEvents.connectionOpen, (context: EventContext) => { context.connection.on("error", function(this: typeof context.connection, err: Error) { console.log(`Error occurred on connection:`, err?.message); @@ -358,7 +366,7 @@ export class MockServer extends EventEmitter { return incomingMessages; } - private _handleMessage = (context: EventContext) => { + private _handleMessage = (context: EventContext): void => { if (!context.message || !context.receiver) { return; } @@ -375,7 +383,11 @@ export class MockServer extends EventEmitter { }); }; - private _sendMessage = (context: EventContext, outgoingMessage: Message, toLinkName?: string) => { + private _sendMessage = ( + context: EventContext, + outgoingMessage: Message, + toLinkName?: string + ): void => { const sender = context.connection.find_sender( (s: Sender) => s.name === toLinkName || s.target.address === toLinkName ); diff --git a/sdk/eventhub/mock-hub/src/services/eventHubs.ts b/sdk/eventhub/mock-hub/src/services/eventHubs.ts index 1eabdad64210..ed81475eb834 100644 --- a/sdk/eventhub/mock-hub/src/services/eventHubs.ts +++ b/sdk/eventhub/mock-hub/src/services/eventHubs.ts @@ -1,38 +1,38 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { URL } from "url"; import { Connection, ConnectionEvents, EventContext, - Sender, Message, - SenderEvents, - ReceiverEvents + ReceiverEvents, + Sender, + SenderEvents } from "rhea"; import { + ConnectionCloseEvent, MockServer, MockServerOptions, - SenderOpenEvent, - ReceiverOpenEvent, OnMessagesEvent, + ReceiverOpenEvent, SenderCloseEvent, - ConnectionCloseEvent + SenderOpenEvent } from "../server/mockServer"; -import { MessageStore } from "../storage/messageStore"; -import { createCbsAccepted } from "../messages/cbs/cbsAccepted"; import { - isHubRuntimeInfo, - generateHubRuntimeInfoResponse + generateBadPartitionInfoResponse, + generatePartitionInfoResponse, + isPartitionInfo +} from "../messages/event-hubs/partitionInfo"; +import { + generateHubRuntimeInfoResponse, + isHubRuntimeInfo } from "../messages/event-hubs/runtimeInfo"; +import { MessageStore } from "../storage/messageStore"; import { StreamingPartitionSender } from "../sender/streamingPartitionSender"; +import { URL } from "url"; +import { createCbsAccepted } from "../messages/cbs/cbsAccepted"; import { getEventPosition } from "../utils/eventPosition"; -import { - isPartitionInfo, - generatePartitionInfoResponse, - generateBadPartitionInfoResponse -} from "../messages/event-hubs/partitionInfo"; export interface IMockEventHub { readonly partitionIds: string[]; @@ -154,7 +154,7 @@ export class MockEventHub implements IMockEventHub { /** * Instantiates a `MockEventHub` using the provided options. - * @param options + * @param options - The options to instantiate the MockEventHub with. */ constructor(options: MockEventHubOptions) { this._partitionCount = options.partitionCount ?? 2; @@ -177,12 +177,12 @@ export class MockEventHub implements IMockEventHub { }); } - private _handleConnectionInactivity = (connection: Connection) => { + private _handleConnectionInactivity = (connection: Connection): void => { if (!this._connectionInactivityTimeoutInMs) { return; } - const forceCloseConnection = () => { + const forceCloseConnection = (): void => { connection.close({ condition: "amqp:connection:forced", description: `The connection was inactive for more than the allowed ${this._connectionInactivityTimeoutInMs} milliseconds and is closed by the service.` @@ -192,7 +192,7 @@ export class MockEventHub implements IMockEventHub { let tid = setTimeout(forceCloseConnection, this._connectionInactivityTimeoutInMs); this._clearableTimeouts.add(tid); - const bounceTimeout = () => { + const bounceTimeout = (): void => { clearTimeout(tid); this._clearableTimeouts.delete(tid); tid = setTimeout(forceCloseConnection, this._connectionInactivityTimeoutInMs); @@ -210,9 +210,9 @@ export class MockEventHub implements IMockEventHub { * The event handler for when the service creates a `Receiver` link. * * This is done in response to the client opening a `Sender` link. - * @param event + * @param event - */ - private _handleReceiverOpen = (event: ReceiverOpenEvent) => { + private _handleReceiverOpen = (event: ReceiverOpenEvent): void => { event.receiver.set_source(event.receiver.source); event.receiver.set_target(event.receiver.target); if (this._isReceiverPartitionEntityPath(event.entityPath)) { @@ -238,9 +238,9 @@ export class MockEventHub implements IMockEventHub { * The event handler for when the service creates a `Sender` link. * * This is done in response to the client opening a `Receiver` link. - * @param event + * @param event - */ - private _handleSenderOpen = (event: SenderOpenEvent) => { + private _handleSenderOpen = (event: SenderOpenEvent): void => { event.sender.set_source(event.sender.source); event.sender.set_target(event.sender.target); if (event.entityPath === "$cbs") { @@ -306,7 +306,7 @@ export class MockEventHub implements IMockEventHub { // Probably should close the sender at this point. event.sender.close({ condition: "amqp:internal-error", - description: (err as any)?.message ?? "" + description: err?.message ?? "" }); } } @@ -317,9 +317,9 @@ export class MockEventHub implements IMockEventHub { * * This is done in response to the client closing a `Receiver` link, * or the service closing the `Sender` link. - * @param event + * @param event - */ - private _handleSenderClose = (event: SenderCloseEvent) => { + private _handleSenderClose = (event: SenderCloseEvent): void => { if (this._isSenderPartitionEntityPath(event.entityPath)) { // Handles partition-specific senders. const entityComponents = this._parseSenderPartitionEntityPath(event.entityPath); @@ -342,13 +342,13 @@ export class MockEventHub implements IMockEventHub { * The event handler for when the service closes a connection. * * This is done when a client explicitly closes or is disconnected. - * @param event + * @param event - */ - private _handleConnectionClose = (event: ConnectionCloseEvent) => { + private _handleConnectionClose = (event: ConnectionCloseEvent): void => { // Cleanup the partition senders we might have for this connection. // We'll just do brute force for now and optimize later. - for (const [consumerGroup, partitionMap] of this._consumerGroupPartitionSenderMap) { - for (const [partitionId, senders] of partitionMap) { + for (const [, partitionMap] of this._consumerGroupPartitionSenderMap) { + for (const [, senders] of partitionMap) { for (const sender of senders) { if (sender.connection === event.context.connection) { senders.delete(sender); @@ -372,9 +372,9 @@ export class MockEventHub implements IMockEventHub { * The event handler for when the service receives a message. * * Messages are not automatically accepted/rejected. - * @param event + * @param event - */ - private _handleOnMessages = (event: OnMessagesEvent) => { + private _handleOnMessages = (event: OnMessagesEvent): void => { // Handle batched messages first. if (event.entityPath === this._name) { // received a message without a partition id @@ -414,13 +414,13 @@ export class MockEventHub implements IMockEventHub { /** * Handles responding to CBS messages. - * @param event + * @param event - */ - private _handleCbsMessage(event: OnMessagesEvent, message: Message) { + private _handleCbsMessage(event: OnMessagesEvent, message: Message): void { let outgoingMessage: Message; if (!this.isValidCbsAuth(message)) { outgoingMessage = { - correlation_id: message.message_id!.toString(), + correlation_id: message.message_id?.toString(), to: message.reply_to, application_properties: { "status-code": 404, @@ -436,14 +436,14 @@ export class MockEventHub implements IMockEventHub { }); } event.context.delivery?.accept(); - return event.sendMessage(outgoingMessage); + event.sendMessage(outgoingMessage); } /** * Handles responding to Management READ EventHubs messages. - * @param event + * @param event - */ - private _handleHubRuntimeInfoMessage(event: OnMessagesEvent, message: Message) { + private _handleHubRuntimeInfoMessage(event: OnMessagesEvent, message: Message): void { const outgoingMessage = generateHubRuntimeInfoResponse({ correlationId: message.message_id?.toString(), partitions: this.partitionIds, @@ -452,42 +452,47 @@ export class MockEventHub implements IMockEventHub { eventHubName: this._name }); event.context.delivery?.accept(); - return event.sendMessage(outgoingMessage); + event.sendMessage(outgoingMessage); } /** * Handles responding to Management READ Partition messages. - * @param event + * @param event - */ - private _handlePartitionInfoMessage(event: OnMessagesEvent, message: Message) { + private _handlePartitionInfoMessage(event: OnMessagesEvent, message: Message): void { const partitionId = message.application_properties?.partition; let outgoingMessage: Message; if (!this.partitionIds.includes(partitionId)) { outgoingMessage = generateBadPartitionInfoResponse({ - correlationId: message.message_id!.toString(), + correlationId: message.message_id?.toString(), targetLinkName: message.reply_to }); } else { const partitionInfo = this._messageStore.getPartitionInfo(partitionId); outgoingMessage = generatePartitionInfoResponse({ ...partitionInfo, - correlationId: message.message_id!.toString(), + correlationId: message.message_id?.toString(), targetLinkName: message.reply_to, eventHubName: this._name }); } event.context.delivery?.accept(); - return event.sendMessage(outgoingMessage); + event.sendMessage(outgoingMessage); } /** * Handles storing and accepting/rejecting messages sent from a client to a partition. - * @param event - * @param partitionId + * @param event - + * @param partitionId - */ - private _handleReceivedMessage(event: OnMessagesEvent, partitionId?: string) { - const delivery = event.context.delivery!; - const deliverySize = (delivery as any)["data"]?.length ?? 0; + private _handleReceivedMessage(event: OnMessagesEvent, partitionId?: string): void { + const delivery = event.context.delivery; + + if (!delivery) { + throw new Error("event.context.delivery must be defined"); + } + + const deliverySize = (delivery as { data?: unknown[] })["data"]?.length ?? 0; const maxMessageSize = event.context.receiver?.get_option("max_message_size", 1024 * 1024) ?? 1024 * 1024; if (deliverySize >= maxMessageSize) { @@ -506,7 +511,7 @@ export class MockEventHub implements IMockEventHub { /** * Gets the Sender's `ownerLevel`, if it has one. - * @param sender + * @param sender - */ private _getSenderOwnerLevel(sender: Sender): number | undefined { const ownerLevel: number | undefined = sender.properties?.["com.microsoft:epoch"]; @@ -518,11 +523,11 @@ export class MockEventHub implements IMockEventHub { * * Note: Partition senders are used to send messages to a client receiver that * is listening on a consumerGroup/partitionId combination. - * @param consumerGroup - * @param partitionId - * @param sender + * @param consumerGroup - + * @param partitionId - + * @param sender - */ - private _storePartitionSender(consumerGroup: string, partitionId: string, sender: Sender) { + private _storePartitionSender(consumerGroup: string, partitionId: string, sender: Sender): void { // Ensure we have an entry for the consumer group. const consumerGroupPartitionMap = this._consumerGroupPartitionSenderMap.get(consumerGroup) ?? new Map>(); @@ -538,11 +543,11 @@ export class MockEventHub implements IMockEventHub { /** * Removes the partition sender based on its consumerGroup and partitionId. * - * @param consumerGroup - * @param partitionId - * @param sender + * @param consumerGroup - + * @param partitionId - + * @param sender - */ - private _deletePartitionSender(consumerGroup: string, partitionId: string, sender: Sender) { + private _deletePartitionSender(consumerGroup: string, partitionId: string, sender: Sender): void { const partitionSenders = this._consumerGroupPartitionSenderMap .get(consumerGroup) ?.get(partitionId); @@ -559,9 +564,9 @@ export class MockEventHub implements IMockEventHub { * * If the `Sender` is allowed to be created and does have an `ownerLevel`, * any existing `Sender`s with the same consumerGroup/partitionId will be closed. - * @param consumerGroup - * @param partitionId - * @param sender + * @param consumerGroup - + * @param partitionId - + * @param sender - */ private _handleSenderOwnerLevel( consumerGroup: string, @@ -634,10 +639,10 @@ export class MockEventHub implements IMockEventHub { * * If a `partitionId` is not provided, a partition will be assigned * either based on the `partitionKey` if it is available, or at random. - * @param message - * @param partitionId + * @param message - + * @param partitionId - */ - private _storeMessage(messages: Message[], partitionId?: string) { + private _storeMessage(messages: Message[], partitionId?: string): void { if (!messages.length) { return; } @@ -662,7 +667,7 @@ export class MockEventHub implements IMockEventHub { /** * A very hacky 'hash' function to calculate a `partitionId` from a `partitionKey`. - * @param partitionKey + * @param partitionKey - */ private _partitionIdFromKey(partitionKey: string): string { let hash = 0; @@ -675,9 +680,9 @@ export class MockEventHub implements IMockEventHub { /** * Validates whether the partition sender can be created. * - * @param entityComponents - * @param sender - * @param context + * @param entityComponents - + * @param sender - + * @param context - */ private _handlePartitionSenderOpenValidation( entityComponents: PartionSenderEntityComponents, @@ -707,14 +712,14 @@ export class MockEventHub implements IMockEventHub { /** * Starts the service. */ - start() { + start(): Promise { return this._mockServer.start(); } /** * Stops the service. */ - stop() { + stop(): Promise { for (const tid of this._clearableTimeouts.values()) { clearTimeout(tid); } @@ -730,7 +735,7 @@ export class MockEventHub implements IMockEventHub { return; } - const [eventHubName, _1, partitionId] = parts; + const [eventHubName, , partitionId] = parts; return { eventHubName, partitionId @@ -745,7 +750,7 @@ export class MockEventHub implements IMockEventHub { return; } - const [eventHubName, _1, consumerGroup, _2, partitionId] = parts; + const [eventHubName, , consumerGroup, , partitionId] = parts; return { eventHubName, consumerGroup, @@ -753,7 +758,7 @@ export class MockEventHub implements IMockEventHub { }; } - private isValidCbsAuth(message: Message) { + private isValidCbsAuth(message: Message): boolean | undefined { const name = message.application_properties?.name as string | undefined; if (!name) { return; diff --git a/sdk/eventhub/mock-hub/src/storage/messageStore.ts b/sdk/eventhub/mock-hub/src/storage/messageStore.ts index b21ee7307257..2c9751ca90b8 100644 --- a/sdk/eventhub/mock-hub/src/storage/messageStore.ts +++ b/sdk/eventhub/mock-hub/src/storage/messageStore.ts @@ -3,9 +3,9 @@ /// -import { Message } from "rhea"; import "@azure/core-asynciterator-polyfill"; import { EventPosition } from "../utils/eventPosition"; +import { Message } from "rhea"; import { Queue } from "./queue"; export interface MessageRecord { @@ -16,6 +16,15 @@ export interface MessageRecord { message: Message; } +export interface PartitionInfo { + beginningSequenceNumber: number; + lastEnqueuedOffset: string; + lastEnqueuedTimeUtc: Date; + lastEnqueuedSequenceNumber: number; + partitionId: string; + isPartitionEmpty: boolean; +} + /** * The `MessageStore` stores events sent to the service. * It provides a method of pulling events from a partition via `getMessageIterator`, @@ -43,7 +52,7 @@ export class MessageStore { /** * Gets the list of `MessageRecord` associated with the specified partition id. - * @param partitionId + * @param partitionId - The partition id to find message records for. */ private _getPartitionStore(partitionId: string): MessageRecord[] { const partitionStore = this._partitionRecordMap.get(partitionId) ?? []; @@ -53,7 +62,7 @@ export class MessageStore { /** * Gets the full Set of 'QueueViews' associated with the specified partition id. - * @param partitionId + * @param partitionId - */ private _getPartitionViews(partitionId: string): Set> { const queueViews = this._partitionQueueViews.get(partitionId) ?? new Set(); @@ -63,10 +72,10 @@ export class MessageStore { /** * Returns the list of `MessageRecord` that appears on or after the specified `startPosition`. - * @param fullList List of `MessageRecord`. - * @param startPosition The `EventPosition` used to find which `MessageRecord` to start reading from. + * @param fullList - List of `MessageRecord`. + * @param startPosition - The `EventPosition` used to find which `MessageRecord` to start reading from. */ - private _getSubList(fullList: MessageRecord[], startPosition: EventPosition) { + private _getSubList(fullList: MessageRecord[], startPosition: EventPosition): MessageRecord[] { if (startPosition.type === "offset" && startPosition.value === "@latest") { return []; } @@ -96,9 +105,9 @@ export class MessageStore { /** * Provides information about the state of the specified partition. - * @param partitionId + * @param partitionId - The partition ID to find information about. */ - public getPartitionInfo(partitionId: string) { + public getPartitionInfo(partitionId: string): PartitionInfo { const partitionStore = this._getPartitionStore(partitionId); const isEmpty = !partitionStore.length; @@ -130,11 +139,11 @@ export class MessageStore { * Associates the provided `Message` with a `partitionId` and stores it. * * This will also update any `MessageIterator`s that are waiting on this partitionId. - * @param partitionId - * @param message - * @param partitionKey + * @param partitionId - The partition id to associate the message with. + * @param message - The message to store. + * @param partitionKey - Optional partition key. */ - public storeMessage(partitionId: string, message: Message, partitionKey?: string) { + public storeMessage(partitionId: string, message: Message, partitionKey?: string): void { const partitionStore = this._getPartitionStore(partitionId); const record: MessageRecord = { enqueuedTime: new Date(), @@ -153,13 +162,13 @@ export class MessageStore { /** * Returns an AsyncIterableIterator that yields `MessageRecord`. * - * @param partitionId - * @param startPosition Specifies which `MessageRecord` to start iterating from. + * @param partitionId - The partition ID + * @param startPosition - Specifies which `MessageRecord` to start iterating from. */ public async *getMessageIterator( partitionId: string, startPosition: EventPosition - ): AsyncIterator { + ): AsyncIterator { const partitionStore = this._getPartitionStore(partitionId); const partitionViews = this._getPartitionViews(partitionId); const partitionStoreSubset = this._getSubList(partitionStore, startPosition); diff --git a/sdk/eventhub/mock-hub/src/storage/queue.ts b/sdk/eventhub/mock-hub/src/storage/queue.ts index 59fc662040a8..5d08a9329b99 100644 --- a/sdk/eventhub/mock-hub/src/storage/queue.ts +++ b/sdk/eventhub/mock-hub/src/storage/queue.ts @@ -41,7 +41,7 @@ export class Queue { /** * Appends new item to the queue. - * @param item + * @param item - the item to append */ public push(item: T): void { if (!this._resolveNextItem(item)) { @@ -49,7 +49,7 @@ export class Queue { } } - private _resolveNextItem(item: T) { + private _resolveNextItem(item: T): boolean { if (!this._nextItemResolve) { return false; }