diff --git a/sdk/eventhub/event-hubs/src/eventHubClient.ts b/sdk/eventhub/event-hubs/src/eventHubClient.ts index edf2210c2361..1fcf14fb7921 100644 --- a/sdk/eventhub/event-hubs/src/eventHubClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubClient.ts @@ -357,7 +357,7 @@ export class EventHubClient { */ async getHubRuntimeInformation(options?: RequestOptions): Promise { try { - return await this._context.managementSession!.getHubRuntimeInformation(); + return await this._context.managementSession!.getHubRuntimeInformation(options); } catch (err) { log.error("An error occurred while getting the hub runtime information: %O", err); throw err; @@ -391,7 +391,7 @@ export class EventHubClient { throw new Error("'partitionId' is a required parameter and must be of type: 'string' | 'number'."); } try { - return await this._context.managementSession!.getPartitionInformation(partitionId); + return await this._context.managementSession!.getPartitionInformation(partitionId, options); } catch (err) { log.error("An error occurred while getting the partition information: %O", err); throw err; diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 0609d829baaf..c51f307d592c 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -2,11 +2,12 @@ // Licensed under the MIT License. import uuid from "uuid/v4"; -import { RequestResponseLink, defaultLock, translate, Constants } from "@azure/amqp-common"; +import { RequestResponseLink, defaultLock, translate, Constants, SendRequestOptions } from "@azure/amqp-common"; import { Message, EventContext, SenderEvents, ReceiverEvents, SenderOptions, ReceiverOptions } from "rhea-promise"; import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; import * as log from "./log"; +import { RequestOptions } from "./eventHubClient"; /** * Describes the runtime information of an EventHub. * @interface EventHubRuntimeInformation @@ -120,8 +121,19 @@ export class ManagementClient extends LinkEntity { * @param {Connection} connection - The established amqp connection * @returns {Promise} */ - async getHubRuntimeInformation(): Promise { - const info: any = await this._makeManagementRequest(Constants.eventHub); + async getHubRuntimeInformation(options?: RequestOptions): Promise { + const request: Message = { + body: Buffer.from(JSON.stringify([])), + message_id: uuid(), + reply_to: this.replyTo, + application_properties: { + operation: Constants.readOperation, + name: this.entityPath as string, + type: `${Constants.vendorString}:${Constants.eventHub}` + } + }; + + const info: any = await this._makeManagementRequest(request, options); const runtimeInfo: EventHubRuntimeInformation = { path: info.name, createdAt: new Date(info.created_at), @@ -150,11 +162,27 @@ export class ManagementClient extends LinkEntity { * @param {Connection} connection - The established amqp connection * @param {(string|number)} partitionId Partition ID for which partition information is required. */ - async getPartitionInformation(partitionId: string | number): Promise { + async getPartitionInformation( + partitionId: string | number, + options?: RequestOptions + ): Promise { if (typeof partitionId !== "string" && typeof partitionId !== "number") { throw new Error("'partitionId' is a required parameter and must be of " + "type: 'string' | 'number'."); } - const info: any = await this._makeManagementRequest(Constants.partition, partitionId); + + const request: Message = { + body: Buffer.from(JSON.stringify([])), + message_id: uuid(), + reply_to: this.replyTo, + application_properties: { + operation: Constants.readOperation, + name: this.entityPath as string, + type: `${Constants.vendorString}:${Constants.partition}`, + partition: `${partitionId}` + } + }; + + const info: any = await this._makeManagementRequest(request, options); const partitionInfo: EventHubPartitionRuntimeInformation = { beginningSequenceNumber: info.begin_sequence_number, hubPath: info.name, @@ -249,29 +277,19 @@ export class ManagementClient extends LinkEntity { * @param {string} type - The type of entity requested for. Valid values are "eventhub", "partition" * @param {string | number} [partitionId] - The partitionId. Required only when type is "partition". */ - private async _makeManagementRequest(type: "eventhub" | "partition", partitionId?: string | number): Promise { - if (partitionId != undefined && (typeof partitionId !== "string" && typeof partitionId !== "number")) { - throw new Error("'partitionId' is a required parameter and must be of type: 'string' | 'number'."); - } + private async _makeManagementRequest(request: Message, options?: RequestOptions): Promise { try { - const request: Message = { - body: Buffer.from(JSON.stringify([])), - message_id: uuid(), - reply_to: this.replyTo, - application_properties: { - operation: Constants.readOperation, - name: this.entityPath as string, - type: `${Constants.vendorString}:${type}` - } - }; - if (partitionId != undefined && type === Constants.partition) { - request.application_properties!.partition = `${partitionId}`; - } log.mgmt("[%s] Acquiring lock to get the management req res link.", this._context.connectionId); await defaultLock.acquire(this.managementLock, () => { return this._init(); }); - return (await this._mgmtReqResLink!.sendRequest(request)).body; + + const sendRequestOptions: SendRequestOptions = { + timeoutInSeconds: options && options.idleTimeoutInSeconds, + times: options && options.retryAttempts, + delayInSeconds: options && options.delayBetweenRetriesInSeconds + }; + return (await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions)).body; } catch (err) { err = translate(err); log.error("An error occurred while making the request to $management endpoint: %O", err);