Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Use AwaitableSender in lieu of Sender #4446

Merged
merged 25 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
260b0e6
[Event Hubs] Introduce timeoutInMs on RetryOptions (#4239)
ramya0820 Jul 11, 2019
c74f5e4
Merge branch 'master' of https://github.com/ramya0820/azure-sdk-for-js
Jul 13, 2019
00016cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 15, 2019
5c12c13
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 16, 2019
2081256
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 17, 2019
f3511e0
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 19, 2019
7f00436
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 20, 2019
e2dc78b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 23, 2019
232390c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
Jul 23, 2019
05046df
Replace Sender -> AwaitableSender
Jul 26, 2019
c58e6eb
Address comments
Jul 26, 2019
298990b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 26, 2019
5956ffa
Remove comment
Jul 30, 2019
b2633be
Pass sendTimeout to AwaitableSender
Jul 30, 2019
264794c
Refactor translate()
Jul 30, 2019
aa664ff
Prepend rejects with return
Jul 31, 2019
25e3f63
Refactor translate()
Jul 31, 2019
edf1949
Add tests
Jul 31, 2019
ebd7dde
Refactor and fix timeout to init()
Jul 31, 2019
11dbe10
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Jul 31, 2019
cb14ba3
Address comments
Aug 1, 2019
f5f0428
Address comments
Aug 1, 2019
abc9689
Refactor translate()
Aug 2, 2019
ffca3c9
Address comments
Aug 2, 2019
dee862c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
Aug 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sdk/core/core-amqp/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,18 @@ export function translate(err: AmqpError | Error): MessagingError {

let error: MessagingError = err as MessagingError;

// InsufficientCreditError occurs when the number of credits available on Rhea link is insufficient.
// Since reasons for such shortage can be transient such as for pending delivery of messages, this is treated as a retryable error.
if (
// instanceof checks on custom Errors doesn't work without manually setting the prototype within the error.
// Must do a name check until OperationTimeoutError is updated, and that doesn't break compatibility
// https://github.com/Microsoft/TypeScript/wiki/Breaking-Changes#extending-built-ins-like-error-array-and-map-may-no-longer-work
(err as Error).name === "InsufficientCreditError"
) {
error.retryable = true;
return error;
}

// OperationTimeoutError occurs when the service fails to respond within a given timeframe.
// Since reasons for such failures can be transient, this is treated as a retryable error.
if (
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { AbortSignalLike } from '@azure/abort-controller';
import { AmqpError } from 'rhea-promise';
import { AwaitableSender } from 'rhea-promise';
import { ConnectionContextBase } from '@azure/core-amqp';
import { DataTransformer } from '@azure/core-amqp';
import { DefaultDataTransformer } from '@azure/core-amqp';
Expand All @@ -15,7 +16,6 @@ import { EventHubConnectionConfig } from '@azure/core-amqp';
import { MessagingError } from '@azure/core-amqp';
import { Receiver } from 'rhea-promise';
import { ReceiverOptions } from 'rhea-promise';
import { Sender } from 'rhea-promise';
import { SharedKeyCredential } from '@azure/core-amqp';
import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
Expand Down
149 changes: 44 additions & 105 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@
import uuid from "uuid/v4";
import * as log from "./log";
import {
Sender,
AwaitableSender,
EventContext,
OnAmqpEvent,
SenderOptions as RheaSenderOptions,
SenderEvents,
AwaitableSenderOptions,
message,
AmqpError
} from "rhea-promise";
import {
defaultLock,
Func,
retry,
translate,
AmqpMessage,
Expand All @@ -36,6 +34,7 @@ import { getRetryAttemptTimeoutInMs, RetryOptions } from "./eventHubClient";
*/
interface CreateSenderOptions {
newName?: boolean;
sendTimeoutInSeconds?: number;
}

/**
Expand Down Expand Up @@ -79,7 +78,7 @@ export class EventHubSender extends LinkEntity {
* @property [_sender] The AMQP sender link.
* @private
*/
private _sender?: Sender;
private _sender?: AwaitableSender;

/**
* Creates a new EventHubSender instance.
Expand Down Expand Up @@ -278,7 +277,7 @@ export class EventHubSender extends LinkEntity {
}
if (shouldReopen) {
await defaultLock.acquire(this.senderLock, () => {
const options: RheaSenderOptions = this._createSenderOptions({
const options: AwaitableSenderOptions = this._createSenderOptions({
newName: true
});
// shall retry forever at an interval of 15 seconds if the error is a retryable error
Expand Down Expand Up @@ -524,9 +523,9 @@ export class EventHubSender extends LinkEntity {
);
}

private _createSenderOptions(options: CreateSenderOptions): RheaSenderOptions {
private _createSenderOptions(options: CreateSenderOptions): AwaitableSenderOptions {
if (options.newName) this.name = `${uuid()}`;
const srOptions: RheaSenderOptions = {
const srOptions: AwaitableSenderOptions = {
name: this.name,
target: {
address: this.address
Expand Down Expand Up @@ -557,85 +556,18 @@ export class EventHubSender extends LinkEntity {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
let waitTimer: any;

let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;

const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
log.error(desc);
return reject(new AbortError("The send operation has been cancelled by the user."));
};

if (abortSignal && abortSignal.aborted) {
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}

onAborted = () => {
removeListeners();
rejectOnAbort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll want to add a return here or else the function will continue running.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rejectOnAbort has a return reject(..) in it, would we still need another return here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. rejectOnAbort is a function. The return in it only returns from that function and not the outer code set of {}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated..

};

onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
);
resolve();
};

onRejected = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
};

onReleased = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};

onModified = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};
}

const removeListeners = (): void => {
clearTimeout(waitTimer);
Expand All @@ -644,14 +576,17 @@ export class EventHubSender extends LinkEntity {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

const onAborted = () => {
removeListeners();
rejectOnAbort();
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
Expand All @@ -666,11 +601,7 @@ export class EventHubSender extends LinkEntity {
return reject(translate(e));
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}

waitTimer = setTimeout(
const waitTimer = setTimeout(
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);
Expand All @@ -684,13 +615,13 @@ export class EventHubSender extends LinkEntity {

try {
await defaultLock.acquire(this.senderLock, () => {
return this._init();
return this._init(this._createSenderOptions({}));
});
} catch (err) {
clearTimeout(waitTimer);
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
clearTimeout(waitTimer);
err = translate(err);
log.error(
"[%s] An error occurred while creating the sender %s",
Expand All @@ -716,18 +647,26 @@ export class EventHubSender extends LinkEntity {
this.name
);

this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);

const delivery = this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
delivery.id
);
try {
const delivery = await this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
this._context.connectionId,
this.name,
delivery.id
);
} catch (err) {
err = translate(err);
log.error(
"[%s] An error occurred while sending the message",
this._context.connectionId,
err
);
return reject(err);
} finally {
clearTimeout(waitTimer);
}
return resolve();
} else {
// let us retry to send the message after some time.
const msg =
Expand Down Expand Up @@ -764,7 +703,7 @@ export class EventHubSender extends LinkEntity {
* @ignore
* @returns
*/
private async _init(options?: RheaSenderOptions): Promise<void> {
private async _init(options?: AwaitableSenderOptions): Promise<void> {
try {
// isOpen isConnecting Should establish
// true false No
Expand All @@ -785,7 +724,7 @@ export class EventHubSender extends LinkEntity {
if (!options) {
options = this._createSenderOptions({});
}
this._sender = await this._context.connection.createSender(options);
this._sender = await this._context.connection.createAwaitableSender(options);
this.isConnecting = false;
log.error(
"[%s] Sender '%s' with address '%s' has established itself.",
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-hubs/src/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
TokenType
} from "@azure/core-amqp";
import { ConnectionContext } from "./connectionContext";
import { Sender, Receiver } from "rhea-promise";
import { AwaitableSender, Receiver } from "rhea-promise";
import * as log from "./log";

/**
Expand Down Expand Up @@ -230,7 +230,7 @@ export class LinkEntity {
* @param [link] The Sender or Receiver link that needs to be closed and
* removed.
*/
protected async _closeLink(link?: Sender | Receiver): Promise<void> {
protected async _closeLink(link?: AwaitableSender | Receiver): Promise<void> {
clearTimeout(this._tokenRenewalTimer as NodeJS.Timer);
if (link) {
try {
Expand Down