Skip to content

Commit

Permalink
[Event Hubs] Allow users to configure retry options in apis using the…
Browse files Browse the repository at this point in the history
… $management link (#3044)
  • Loading branch information
ShivangiReja authored and ramya-rao-a committed May 22, 2019
1 parent 1eb3e4f commit 4c5f9ed
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/src/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ export class EventHubClient {
*/
async getHubRuntimeInformation(options?: RequestOptions): Promise<EventHubRuntimeInformation> {
try {
return await this._context.managementSession!.getHubRuntimeInformation();
return await this._context.managementSession!.getHubRuntimeInformation(options);
} catch (err) {
log.error("An error occurred while getting the hub runtime information: %O", err);
throw err;
Expand Down Expand Up @@ -391,7 +391,7 @@ export class EventHubClient {
throw new Error("'partitionId' is a required parameter and must be of type: 'string' | 'number'.");
}
try {
return await this._context.managementSession!.getPartitionInformation(partitionId);
return await this._context.managementSession!.getPartitionInformation(partitionId, options);
} catch (err) {
log.error("An error occurred while getting the partition information: %O", err);
throw err;
Expand Down
64 changes: 41 additions & 23 deletions sdk/eventhub/event-hubs/src/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
// Licensed under the MIT License.

import uuid from "uuid/v4";
import { RequestResponseLink, defaultLock, translate, Constants } from "@azure/amqp-common";
import { RequestResponseLink, defaultLock, translate, Constants, SendRequestOptions } from "@azure/amqp-common";
import { Message, EventContext, SenderEvents, ReceiverEvents, SenderOptions, ReceiverOptions } from "rhea-promise";
import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import * as log from "./log";
import { RequestOptions } from "./eventHubClient";
/**
* Describes the runtime information of an EventHub.
* @interface EventHubRuntimeInformation
Expand Down Expand Up @@ -120,8 +121,19 @@ export class ManagementClient extends LinkEntity {
* @param {Connection} connection - The established amqp connection
* @returns {Promise<EventHubRuntimeInformation>}
*/
async getHubRuntimeInformation(): Promise<EventHubRuntimeInformation> {
const info: any = await this._makeManagementRequest(Constants.eventHub);
async getHubRuntimeInformation(options?: RequestOptions): Promise<EventHubRuntimeInformation> {
const request: Message = {
body: Buffer.from(JSON.stringify([])),
message_id: uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.readOperation,
name: this.entityPath as string,
type: `${Constants.vendorString}:${Constants.eventHub}`
}
};

const info: any = await this._makeManagementRequest(request, options);
const runtimeInfo: EventHubRuntimeInformation = {
path: info.name,
createdAt: new Date(info.created_at),
Expand Down Expand Up @@ -150,11 +162,27 @@ export class ManagementClient extends LinkEntity {
* @param {Connection} connection - The established amqp connection
* @param {(string|number)} partitionId Partition ID for which partition information is required.
*/
async getPartitionInformation(partitionId: string | number): Promise<EventHubPartitionRuntimeInformation> {
async getPartitionInformation(
partitionId: string | number,
options?: RequestOptions
): Promise<EventHubPartitionRuntimeInformation> {
if (typeof partitionId !== "string" && typeof partitionId !== "number") {
throw new Error("'partitionId' is a required parameter and must be of " + "type: 'string' | 'number'.");
}
const info: any = await this._makeManagementRequest(Constants.partition, partitionId);

const request: Message = {
body: Buffer.from(JSON.stringify([])),
message_id: uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.readOperation,
name: this.entityPath as string,
type: `${Constants.vendorString}:${Constants.partition}`,
partition: `${partitionId}`
}
};

const info: any = await this._makeManagementRequest(request, options);
const partitionInfo: EventHubPartitionRuntimeInformation = {
beginningSequenceNumber: info.begin_sequence_number,
hubPath: info.name,
Expand Down Expand Up @@ -249,29 +277,19 @@ export class ManagementClient extends LinkEntity {
* @param {string} type - The type of entity requested for. Valid values are "eventhub", "partition"
* @param {string | number} [partitionId] - The partitionId. Required only when type is "partition".
*/
private async _makeManagementRequest(type: "eventhub" | "partition", partitionId?: string | number): Promise<any> {
if (partitionId != undefined && (typeof partitionId !== "string" && typeof partitionId !== "number")) {
throw new Error("'partitionId' is a required parameter and must be of type: 'string' | 'number'.");
}
private async _makeManagementRequest(request: Message, options?: RequestOptions): Promise<any> {
try {
const request: Message = {
body: Buffer.from(JSON.stringify([])),
message_id: uuid(),
reply_to: this.replyTo,
application_properties: {
operation: Constants.readOperation,
name: this.entityPath as string,
type: `${Constants.vendorString}:${type}`
}
};
if (partitionId != undefined && type === Constants.partition) {
request.application_properties!.partition = `${partitionId}`;
}
log.mgmt("[%s] Acquiring lock to get the management req res link.", this._context.connectionId);
await defaultLock.acquire(this.managementLock, () => {
return this._init();
});
return (await this._mgmtReqResLink!.sendRequest(request)).body;

const sendRequestOptions: SendRequestOptions = {
timeoutInSeconds: options && options.idleTimeoutInSeconds,
times: options && options.retryAttempts,
delayInSeconds: options && options.delayBetweenRetriesInSeconds
};
return (await this._mgmtReqResLink!.sendRequest(request, sendRequestOptions)).body;
} catch (err) {
err = translate(err);
log.error("An error occurred while making the request to $management endpoint: %O", err);
Expand Down

0 comments on commit 4c5f9ed

Please sign in to comment.