Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Use AwaitableSender in lieu of Sender #4446

Merged
merged 25 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
260b0e6
[Event Hubs] Introduce timeoutInMs on RetryOptions (#4239)
ramya0820 Jul 11, 2019
c74f5e4
Merge branch 'master' of https://github.com/ramya0820/azure-sdk-for-js
Jul 13, 2019
00016cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 15, 2019
5c12c13
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 16, 2019
2081256
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 17, 2019
f3511e0
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 19, 2019
7f00436
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 20, 2019
e2dc78b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 23, 2019
232390c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 23, 2019
05046df
Replace Sender -> AwaitableSender
Jul 26, 2019
c58e6eb
Address comments
Jul 26, 2019
298990b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 26, 2019
5956ffa
Remove comment
Jul 30, 2019
b2633be
Pass sendTimeout to AwaitableSender
Jul 30, 2019
264794c
Refactor translate()
Jul 30, 2019
aa664ff
Prepend rejects with return
Jul 31, 2019
25e3f63
Refactor translate()
Jul 31, 2019
edf1949
Add tests
Jul 31, 2019
ebd7dde
Refactor and fix timeout to init()
Jul 31, 2019
11dbe10
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 31, 2019
cb14ba3
Address comments
Aug 1, 2019
f5f0428
Address comments
Aug 1, 2019
abc9689
Refactor translate()
Aug 2, 2019
ffca3c9
Address comments
Aug 2, 2019
dee862c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Aug 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions sdk/core/core-amqp/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -481,14 +481,24 @@ export const retryableErrors: string[] = [
"ServerBusyError",
"ServiceUnavailableError",
"OperationCancelledError",

// OperationTimeoutError occurs when the service fails to respond within a given timeframe.
// Since reasons for such failures can be transient, this is treated as a retryable error.
"OperationTimeoutError",

"SenderBusyError",
"MessagingError",
"DetachForcedError",
"ConnectionForcedError",
"TransferLimitExceededError"
"TransferLimitExceededError",

// InsufficientCreditError occurs when the number of credits available on Rhea link is insufficient.
// Since reasons for such shortage can be transient such as for pending delivery of messages, this is treated as a retryable error.
"InsufficientCreditError"
];

export const nonRetryableErrors: string[] = ["AbortError"];

/**
* Maps some SytemErrors to amqp error conditions
* @enum SystemErrorConditionMapper
Expand Down Expand Up @@ -539,6 +549,29 @@ function isBrowserWebsocketError(err: any): boolean {
return result;
}

/**
* @internal
* Checks if given object maps to a valid custom error. If yes, configures and returns the appropriate error instance, else returns `undefined`.
* @param err
*/
function getCustomError(err: AmqpError | Error): MessagingError | undefined {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to cover any of the amqp errors here because amqp errors should undergo the appropriate processing as covered in the translate(). Therefore, make an early exit from here if isAmqpError(err) returns true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we have updated the if/else in translate() to use the result of this method only when it is not an amqp error. That is good.
But here since the input supports both AmqpError and simple Error, it is still confusing to anyone reading this code independently

My first suggestion would have been to update this method to take in only Error, but this might not work out due to typing issues. Because as far as the typescript compiler is concerned, the err object you pass in could be either of the 2 types.

Please consider not having this as a separate method and moving the code here directly into translate()

const error: MessagingError = err as MessagingError;
if (
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error.
// Must do a name check until the custom error is updated, and that doesn't break compatibility
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
retryableErrors.indexOf((err as Error).name) > 0
) {
error.retryable = true;
return error;
} else if (nonRetryableErrors.indexOf((err as Error).name) > 0) {
error.retryable = false;
return error;
} else {
return undefined;
}
}

/**
* Translates the AQMP error received at the protocol layer or a generic Error into a MessagingError.
*
Expand All @@ -551,30 +584,16 @@ export function translate(err: AmqpError | Error): MessagingError {
return err as MessagingError;
}

let error: MessagingError = err as MessagingError;

// OperationTimeoutError occurs when the service fails to respond within a given timeframe.
// Since reasons for such failures can be transient, this is treated as a retryable error.
if (
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error.
// Must do a name check until OperationTimeoutError is updated, and that doesn't break compatibility
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
(err as Error).name === "OperationTimeoutError"
) {
error.retryable = true;
return error;
const customError = getCustomError(err);
if (customError) {
return customError;
}

let error: MessagingError = err as MessagingError;

// Built-in errors like TypeError and RangeError should not be retryable as these indicate issues
// with user input and not an issue with the Messaging process.
if (
err instanceof TypeError ||
err instanceof RangeError ||
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error.
// Must do a name check until AbortError is updated, and that doesn't break compatibility
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
(err as Error).name === "AbortError"
) {
if (err instanceof TypeError || err instanceof RangeError) {
error.retryable = false;
return error;
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { AbortSignalLike } from '@azure/abort-controller';
import { AmqpError } from 'rhea-promise';
import { AwaitableSender } from 'rhea-promise';
import { ConnectionContextBase } from '@azure/core-amqp';
import { DataTransformer } from '@azure/core-amqp';
import { DefaultDataTransformer } from '@azure/core-amqp';
Expand All @@ -15,7 +16,6 @@ import { EventHubConnectionConfig } from '@azure/core-amqp';
import { MessagingError } from '@azure/core-amqp';
import { Receiver } from 'rhea-promise';
import { ReceiverOptions } from 'rhea-promise';
import { Sender } from 'rhea-promise';
import { SharedKeyCredential } from '@azure/core-amqp';
import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
Expand Down
161 changes: 50 additions & 111 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@
import uuid from "uuid/v4";
import * as log from "./log";
import {
Sender,
AwaitableSender,
EventContext,
OnAmqpEvent,
SenderOptions as RheaSenderOptions,
SenderEvents,
AwaitableSenderOptions,
message,
AmqpError
} from "rhea-promise";
import {
defaultLock,
Func,
retry,
translate,
AmqpMessage,
Expand All @@ -36,6 +34,7 @@ import { getRetryAttemptTimeoutInMs, RetryOptions } from "./eventHubClient";
*/
interface CreateSenderOptions {
newName?: boolean;
sendTimeoutInSeconds?: number;
}

/**
Expand Down Expand Up @@ -79,7 +78,7 @@ export class EventHubSender extends LinkEntity {
* @property [_sender] The AMQP sender link.
* @private
*/
private _sender?: Sender;
private _sender?: AwaitableSender;

/**
* Creates a new EventHubSender instance.
Expand Down Expand Up @@ -278,7 +277,7 @@ export class EventHubSender extends LinkEntity {
}
if (shouldReopen) {
await defaultLock.acquire(this.senderLock, () => {
const options: RheaSenderOptions = this._createSenderOptions({
const options: AwaitableSenderOptions = this._createSenderOptions({
newName: true
});
// shall retry forever at an interval of 15 seconds if the error is a retryable error
Expand Down Expand Up @@ -524,17 +523,18 @@ export class EventHubSender extends LinkEntity {
);
}

private _createSenderOptions(options: CreateSenderOptions): RheaSenderOptions {
private _createSenderOptions(options: CreateSenderOptions): AwaitableSenderOptions {
if (options.newName) this.name = `${uuid()}`;
const srOptions: RheaSenderOptions = {
const srOptions: AwaitableSenderOptions = {
name: this.name,
target: {
address: this.address
},
onError: this._onAmqpError,
onClose: this._onAmqpClose,
onSessionError: this._onSessionError,
onSessionClose: this._onSessionClose
onSessionClose: this._onSessionClose,
sendTimeoutInSeconds: options.sendTimeoutInSeconds
};
log.sender("Creating sender with options: %O", srOptions);
return srOptions;
Expand All @@ -557,101 +557,35 @@ export class EventHubSender extends LinkEntity {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
let waitTimer: any;

let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;

const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
log.error(desc);
return reject(new AbortError("The send operation has been cancelled by the user."));
};

if (abortSignal && abortSignal.aborted) {
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}

onAborted = () => {
removeListeners();
rejectOnAbort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll want to add a return here or else the function will continue running.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rejectOnAbort has a return reject(..) in it, would we still need another return here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. rejectOnAbort is a function. The return in it only returns from that function and not the outer code set of {}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated..

};

onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
);
resolve();
};

onRejected = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
};

onReleased = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};

onModified = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};
}

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

const onAborted = () => {
removeListeners();
rejectOnAbort();
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
Expand All @@ -666,11 +600,7 @@ export class EventHubSender extends LinkEntity {
return reject(translate(e));
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}

waitTimer = setTimeout(
const waitTimer = setTimeout(
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);
Expand All @@ -684,13 +614,14 @@ export class EventHubSender extends LinkEntity {

try {
await defaultLock.acquire(this.senderLock, () => {
return this._init();
return this._init(
this._createSenderOptions({
sendTimeoutInSeconds: getRetryAttemptTimeoutInMs(options.retryOptions)
})
);
});
} catch (err) {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
clearTimeout(waitTimer);
removeListeners();
err = translate(err);
log.error(
"[%s] An error occurred while creating the sender %s",
Expand All @@ -716,18 +647,26 @@ export class EventHubSender extends LinkEntity {
this.name
);

this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);

const delivery = this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
delivery.id
);
try {
const delivery = await this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
delivery.id
);
return resolve();
} catch (err) {
err = translate(err);
log.error(
"[%s] An error occurred while sending the message",
this._context.connectionId,
err
);
return reject(err);
} finally {
removeListeners();
}
} else {
// let us retry to send the message after some time.
const msg =
Expand Down Expand Up @@ -764,7 +703,7 @@ export class EventHubSender extends LinkEntity {
* @ignore
* @returns
*/
private async _init(options?: RheaSenderOptions): Promise<void> {
private async _init(options?: AwaitableSenderOptions): Promise<void> {
try {
// isOpen isConnecting Should establish
// true false No
Expand All @@ -785,7 +724,7 @@ export class EventHubSender extends LinkEntity {
if (!options) {
options = this._createSenderOptions({});
}
this._sender = await this._context.connection.createSender(options);
this._sender = await this._context.connection.createAwaitableSender(options);
this.isConnecting = false;
log.error(
"[%s] Sender '%s' with address '%s' has established itself.",
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/src/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
TokenType
} from "@azure/core-amqp";
import { ConnectionContext } from "./connectionContext";
import { Sender, Receiver } from "rhea-promise";
import { AwaitableSender, Receiver } from "rhea-promise";
import * as log from "./log";

/**
Expand Down Expand Up @@ -230,7 +230,7 @@ export class LinkEntity {
* @param [link] The Sender or Receiver link that needs to be closed and
* removed.
*/
protected async _closeLink(link?: Sender | Receiver): Promise<void> {
protected async _closeLink(link?: AwaitableSender | Receiver): Promise<void> {
clearTimeout(this._tokenRenewalTimer as NodeJS.Timer);
if (link) {
try {
Expand Down