Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core-amqp] Refactor time units to milliseconds #4401

Merged
merged 14 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions sdk/core/core-amqp/src/ConnectionContextBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ export interface CreateConnectionContextBaseParameters {
*/
isEntityPathRequired?: boolean;
/**
* @property {number} [operationTimeoutInSeconds] - The duration in which the promise should
* @property {number} [operationTimeoutInMs] - The duration in which the promise should
* complete (resolve/reject). If it is not completed, then the Promise will be rejected after
* timeout occurs. Default: `60 seconds`.
* timeout occurs. Default: `60000 milliseconds`.
*/
operationTimeoutInSeconds?: number;
operationTimeoutInMs?: number;
}

export module ConnectionContextBase {
Expand Down Expand Up @@ -156,7 +156,9 @@ export module ConnectionContextBase {
platform: `(${os.arch()}-${os.type()}-${os.release()})`,
framework: `Node/${process.version}`
},
operationTimeoutInSeconds: parameters.operationTimeoutInSeconds
operationTimeoutInSeconds: parameters.operationTimeoutInMs
? parameters.operationTimeoutInMs / 1000
: undefined
};

if (
Expand Down
22 changes: 10 additions & 12 deletions sdk/core/core-amqp/src/requestResponseLink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ export interface SendRequestOptions {
*/
abortSignal?: AbortSignalLike;
/**
* @property {number} [timeoutInSeconds] Max time to wait for the operation to complete.
* Default: `60 seconds`.
* @property {number} [timeoutInMs] Max time to wait for the operation to complete.
* Default: `60000 milliseconds`.
*/
timeoutInSeconds?: number;
timeoutInMs?: number;
/**
* @property {string} [requestName] Name of the request being performed.
*/
Expand Down Expand Up @@ -74,20 +74,18 @@ export class RequestResponseLink implements ReqResLink {

/**
* Sends the given request message and returns the received response. If the operation is not
* completed in the provided timeout in seconds `default: 60`, then `OperationTimeoutError` is thrown.
* completed in the provided timeout in milliseconds `default: 60000`, then `OperationTimeoutError` is thrown.
*
* @param {Message} request The AMQP (request) message.
* @param {SendRequestOptions} [options] Options that can be provided while sending a request.
* @returns {Promise<Message>} Promise<Message> The AMQP (response) message.
*/
sendRequest(request: AmqpMessage, options?: SendRequestOptions): Promise<AmqpMessage> {
if (!options) options = {};

if (!options.timeoutInSeconds) {
options.timeoutInSeconds = Constants.defaultOperationTimeoutInSeconds;
sendRequest(request: AmqpMessage, options: SendRequestOptions = {}): Promise<AmqpMessage> {
if (!options.timeoutInMs) {
options.timeoutInMs = Constants.defaultOperationTimeoutInMs;
}

const aborter: AbortSignalLike | undefined = options && options.abortSignal;
const aborter: AbortSignalLike | undefined = options.abortSignal;

return new Promise<AmqpMessage>((resolve: any, reject: any) => {
let waitTimer: any;
Expand All @@ -100,7 +98,7 @@ export class RequestResponseLink implements ReqResLink {

const rejectOnAbort = () => {
const address = this.receiver.address || "address";
const requestName = options!.requestName;
const requestName = options.requestName;
const desc: string =
`[${this.connection.id}] The request "${requestName}" ` +
`to "${address}" has been cancelled by the user.`;
Expand Down Expand Up @@ -212,7 +210,7 @@ export class RequestResponseLink implements ReqResLink {
return reject(translate(e));
};

waitTimer = setTimeout(actionAfterTimeout, options!.timeoutInSeconds! * 1000);
waitTimer = setTimeout(actionAfterTimeout, options.timeoutInMs);
this.receiver.on(ReceiverEvents.message, messageCallback);

log.reqres(
Expand Down
22 changes: 11 additions & 11 deletions sdk/core/core-amqp/src/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { delay, isNode } from "./util/utils";
import * as log from "./log";
import {
defaultMaxRetries,
defaultDelayBetweenOperationRetriesInSeconds,
defaultDelayBetweenOperationRetriesInMs,
defaultMaxDelayForExponentialRetryInMs,
defaultMinDelayForExponentialRetryInMs
} from "./util/constants";
Expand Down Expand Up @@ -79,12 +79,12 @@ export interface RetryConfig<T> {
*/
maxRetries?: number;
/**
* @property {number} [delayInSeconds] Amount of time to wait in seconds before making the
* next attempt. Default: 30.
* @property {number} [delayInMs] Amount of time to wait in milliseconds before making the
* next attempt. Default: `30000 milliseconds`.
* When `retryPolicy` option is set to `ExponentialRetryPolicy`, \
* this is used to compute the exponentially increasing delays between retries.
*/
delayInSeconds?: number;
delayInMs?: number;
/**
* @property {string} connectionHost The host "<yournamespace>.servicebus.windows.net".
* Used to check network connectivity.
Expand Down Expand Up @@ -160,8 +160,8 @@ export async function retry<T>(config: RetryConfig<T>): Promise<T> {
if (config.maxRetries == undefined || config.maxRetries < 0) {
config.maxRetries = defaultMaxRetries;
}
if (config.delayInSeconds == undefined || config.delayInSeconds < 0) {
config.delayInSeconds = defaultDelayBetweenOperationRetriesInSeconds;
if (config.delayInMs == undefined || config.delayInMs < 0) {
config.delayInMs = defaultDelayBetweenOperationRetriesInMs;
}
if (config.maxExponentialRetryDelayInMs == undefined || config.maxExponentialRetryDelayInMs < 0) {
config.maxExponentialRetryDelayInMs = defaultMaxDelayForExponentialRetryInMs;
Expand Down Expand Up @@ -216,12 +216,12 @@ export async function retry<T>(config: RetryConfig<T>): Promise<T> {
i,
err
);
let targetDelayInMs = config.delayInSeconds;
let targetDelayInMs = config.delayInMs;
if (config.retryPolicy === RetryPolicy.ExponentialRetryPolicy) {
let incrementDelta = Math.pow(2, i) - 1;
const boundedRandDelta =
config.delayInSeconds * 0.8 +
Math.floor(Math.random() * (config.delayInSeconds * 1.2 - config.delayInSeconds * 0.8));
config.delayInMs * 0.8 +
Math.floor(Math.random() * (config.delayInMs * 1.2 - config.delayInMs * 0.8));
incrementDelta *= boundedRandDelta;

targetDelayInMs = Math.min(
Expand All @@ -232,9 +232,9 @@ export async function retry<T>(config: RetryConfig<T>): Promise<T> {

if (lastError && lastError.retryable) {
log.error(
"[%s] Sleeping for %d seconds for '%s'.",
"[%s] Sleeping for %d milliseconds for '%s'.",
config.connectionId,
targetDelayInMs / 1000,
targetDelayInMs,
config.operationType
);
await delay(targetDelayInMs);
Expand Down
10 changes: 5 additions & 5 deletions sdk/core/core-amqp/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const receiverError = "receiver_error";
export const senderError = "sender_error";
export const sessionError = "session_error";
export const connectionError = "connection_error";
export const defaultOperationTimeoutInSeconds = 60;
export const defaultOperationTimeoutInMs = 60000;
export const managementRequestKey = "managementRequest";
export const negotiateCbsKey = "negotiateCbs";
export const negotiateClaim = "negotiateClaim";
Expand All @@ -71,13 +71,13 @@ export const maxDurationValue = 922337203685477;
export const minDurationValue = -922337203685477;
// https://github.com/Azure/azure-amqp/blob/master/Microsoft.Azure.Amqp/Amqp/AmqpConstants.cs#L47
export const maxAbsoluteExpiryTime = new Date("9999-12-31T07:59:59.000Z").getTime();
export const aadTokenValidityMarginSeconds = 5;
export const aadTokenValidityMarginInMs = 5000;
export const connectionReconnectDelay = 300;
export const defaultMaxRetries = 3;
export const defaultMaxRetriesForConnection = 150;
export const defaultDelayBetweenOperationRetriesInSeconds = 30;
export const defaultMaxDelayForExponentialRetryInMs = 1000 * 90;
export const defaultMinDelayForExponentialRetryInMs = 1000 * 3;
export const defaultDelayBetweenOperationRetriesInMs = 30000;
export const defaultMaxDelayForExponentialRetryInMs = 90000;
export const defaultMinDelayForExponentialRetryInMs = 3000;
export const receiverSettleMode = "receiver-settle-mode";
export const dispositionStatus = "disposition-status";
export const fromSequenceNumber = "from-sequence-number";
Expand Down
8 changes: 4 additions & 4 deletions sdk/core/core-amqp/test/requestResponse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ describe("RequestResponseLink", function() {
}
}
});
}, 500);
}, 200);
setTimeout(() => {
rcvr.emit("message", {
message: {
Expand All @@ -114,11 +114,11 @@ describe("RequestResponseLink", function() {
body: "Hello World!!"
}
});
}, 1000);
}, 2000);

const sendRequestPromise = async (): Promise<Message> => {
return await link.sendRequest(request, {
timeoutInSeconds: 5
timeoutInMs: 5000
});
};

Expand All @@ -127,7 +127,7 @@ describe("RequestResponseLink", function() {
connectionId: "connection-1",
operationType: RetryOperationType.management,
maxRetries: 3,
delayInSeconds: 1
delayInMs: 1000
};

const message = await retry<Message>(config);
Expand Down
22 changes: 11 additions & 11 deletions sdk/core/core-amqp/test/retry.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dotenv.config();
},
connectionId: "connection-1",
operationType: RetryOperationType.cbsAuth,
delayInSeconds: 15,
delayInMs: 15000,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -66,7 +66,7 @@ dotenv.config();
},
connectionId: "connection-1",
operationType: RetryOperationType.management,
delayInSeconds: 15,
delayInMs: 15000,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -101,7 +101,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.receiverLink,
maxRetries: 2,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -140,7 +140,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.senderLink,
maxRetries: 2,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -180,7 +180,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.sendMessage,
maxRetries: 2,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand All @@ -207,7 +207,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.session,
maxRetries: 4,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -236,7 +236,7 @@ dotenv.config();
},
connectionId: "connection-1",
operationType: RetryOperationType.cbsAuth,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -265,7 +265,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.management,
maxRetries: Infinity,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -300,7 +300,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.receiverLink,
maxRetries: Infinity,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -339,7 +339,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.senderLink,
maxRetries: Infinity,
delayInSeconds: 0.5,
delayInMs: 500,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down Expand Up @@ -379,7 +379,7 @@ dotenv.config();
connectionId: "connection-1",
operationType: RetryOperationType.sendMessage,
maxRetries: Constants.defaultMaxRetriesForConnection,
delayInSeconds: 0.001,
delayInMs: 1,
minExponentialRetryDelayInMs: 0,
retryPolicy: retryPolicy
};
Expand Down
6 changes: 3 additions & 3 deletions sdk/eventhub/event-hubs/src/eventHubClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export interface RetryOptions {
retryInterval?: number;
/**
* Number of milliseconds to wait before declaring that current attempt has timed out which will trigger a retry
* A minimum value of 60 seconds will be used if a value not greater than this is provided.
* A minimum value of `60000` milliseconds will be used if a value not greater than this is provided.
*/
timeoutInMs?: number;
/**
Expand All @@ -62,8 +62,8 @@ export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefine
retryOptions == undefined ||
typeof retryOptions.timeoutInMs !== "number" ||
!isFinite(retryOptions.timeoutInMs) ||
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInSeconds * 1000
? Constants.defaultOperationTimeoutInSeconds * 1000
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs
? Constants.defaultOperationTimeoutInMs
: retryOptions.timeoutInMs;
return timeoutInMs;
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ export class EventHubReceiver extends LinkEntity {
const linkCreationConfig: RetryConfig<void> = {
connectionId: this._context.connectionId,
connectionHost: this._context.config.host,
delayInSeconds: 15,
delayInMs: 15000,
operation: () => this.initialize(initOptions),
operationType: RetryOperationType.receiverLink,
maxRetries: Constants.defaultMaxRetriesForConnection
Expand Down
12 changes: 5 additions & 7 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export class EventHubSender extends LinkEntity {
operationType: RetryOperationType.senderLink,
maxRetries: Constants.defaultMaxRetriesForConnection,
connectionHost: this._context.config.host,
delayInSeconds: 15
delayInMs: 15000
};
return retry<void>(config);
});
Expand Down Expand Up @@ -397,9 +397,9 @@ export class EventHubSender extends LinkEntity {
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
maxRetries: retryOptions.maxRetries,
delayInSeconds:
delayInMs:
typeof retryOptions.retryInterval === "number"
? retryOptions.retryInterval / 1000
? retryOptions.retryInterval
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesnt need any checks anymore. We should be able to say delayInMs: retryOptions.retryInterval and expect core-amqp to do all the checks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for other places that have this pattern

: undefined,
retryPolicy: retryOptions.retryPolicy,
minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs,
Expand Down Expand Up @@ -748,10 +748,8 @@ export class EventHubSender extends LinkEntity {
connectionId: this._context.connectionId,
operationType: RetryOperationType.sendMessage,
maxRetries: retryOptions.maxRetries,
delayInSeconds:
typeof retryOptions.retryInterval === "number"
? retryOptions.retryInterval / 1000
: undefined,
delayInMs:
typeof retryOptions.retryInterval === "number" ? retryOptions.retryInterval : undefined,
retryPolicy: retryOptions.retryPolicy,
minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs,
maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs
Expand Down
Loading