Skip to content

Commit

Permalink
[Event Hubs] Update send operation to include initialization (#4319)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya0820 authored Jul 22, 2019
1 parent ba21be6 commit 703f599
Showing 1 changed file with 138 additions and 125 deletions.
263 changes: 138 additions & 125 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import {
ErrorNameConditionMapper,
RetryConfig,
RetryOperationType,
Constants,
randomNumberFromInterval
Constants
} from "@azure/core-amqp";
import { EventData, toAmqpMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
Expand Down Expand Up @@ -469,16 +468,6 @@ export class EventHubSender extends LinkEntity {
throw error;
}

if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
log.sender(
"[%s] Sender '%s', trying to send EventData[].",
this._context.connectionId,
Expand Down Expand Up @@ -567,146 +556,171 @@ export class EventHubSender extends LinkEntity {
): Promise<void> {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const sendEventPromise = () =>
new Promise<void>((resolve, reject) => {
new Promise<void>(async (resolve, reject) => {
let waitTimer: any;

let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;

const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
log.error(desc);
reject(new AbortError("The send operation has been cancelled by the user."));
return reject(new AbortError("The send operation has been cancelled by the user."));
};

if (abortSignal && abortSignal.aborted) {
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}

let waitTimer: any;
log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
);
if (this._sender!.sendable()) {
onAborted = () => {
removeListeners();
rejectOnAbort();
};

onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', sending message with id '%s'.",
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
);
let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;
resolve();
};

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};
onRejected = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
};

onAborted = () => {
removeListeners();
rejectOnAbort();
};
onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
onReleased = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
resolve();
};
onRejected = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event rejected.",
this._context.connectionId,
this.name
}
log.error(err);
reject(err);
};

onModified = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
}
log.error(err);
reject(err);
};

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
onReleased = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event released.",
this._context.connectionId,
this.name
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
return reject(translate(e));
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}

waitTimer = setTimeout(
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);

if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);

try {
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
} catch (err) {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
log.error(err);
reject(err);
};
onModified = (context: EventContext) => {
removeListeners();
clearTimeout(waitTimer);
err = translate(err);
log.error(
"[%s] Sender '%s', got event modified.",
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name
this.name,
err
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};
return reject(err);
}
}

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
};
log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
);
if (this._sender!.sendable()) {
log.sender(
"[%s] Sender '%s', sending message with id '%s'.",
this._context.connectionId,
this.name
);

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}
this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);
waitTimer = setTimeout(
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);

const delivery = this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
Expand All @@ -728,7 +742,6 @@ export class EventHubSender extends LinkEntity {
}
});

const jitterInSeconds = randomNumberFromInterval(1, 4);
const maxRetries = options.retryOptions && options.retryOptions.maxRetries;
const delayInSeconds =
options.retryOptions &&
Expand All @@ -741,7 +754,7 @@ export class EventHubSender extends LinkEntity {
connectionId: this._context.connectionId,
operationType: RetryOperationType.sendMessage,
maxRetries: maxRetries,
delayInSeconds: delayInSeconds + jitterInSeconds
delayInSeconds: delayInSeconds
};
return retry<void>(config);
}
Expand Down

0 comments on commit 703f599

Please sign in to comment.