Skip to content

Commit

Permalink
[Service Bus] Surface Session Lock Lost error to the user
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya0820 authored and ramya-rao-a committed Feb 12, 2019
1 parent de1cbe9 commit 297716e
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 136 deletions.
19 changes: 17 additions & 2 deletions packages/@azure/servicebus/data-plane/lib/clientEntityContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ export interface ClientEntityContextBase {
* objects associated with this client.
*/
messageSessions: Dictionary<MessageSession>;
/**
* @property {Dictionary<MessageSession>} expiredMessageSessions A dictionary that stores expired message sessions IDs.
*/
expiredMessageSessions: Dictionary<Boolean>;
/**
* @property {MessageSender} [sender] The ServiceBus sender associated with the client entity.
*/
Expand All @@ -77,7 +81,7 @@ export interface ClientEntityContextBase {
*/
export interface ClientEntityContext extends ClientEntityContextBase {
detached(error?: AmqpError | Error): Promise<void>;
getReceiver(name: string, sessionId?: string): MessageReceiver | MessageSession | undefined;
getReceiver(name: string, sessionId?: string): MessageReceiver | MessageSession;
}

/**
Expand Down Expand Up @@ -113,14 +117,23 @@ export namespace ClientEntityContext {
entityPath: entityPath,
requestResponseLockedMessages: new ConcurrentExpiringMap<string>(),
isSessionEnabled: !!options.isSessionEnabled,
messageSessions: {}
messageSessions: {},
expiredMessageSessions: {}
};

(entityContext as ClientEntityContext).sessionManager = new SessionManager(
entityContext as ClientEntityContext
);

(entityContext as ClientEntityContext).getReceiver = (name: string, sessionId?: string) => {
if (sessionId && entityContext.expiredMessageSessions[sessionId]) {
const error = new Error(
`The session lock has expired on the session with id ${sessionId}.`
);
error.name = "SessionLockLostError";
throw error;
}

let receiver: MessageReceiver | MessageSession | undefined = undefined;
if (
sessionId != undefined &&
Expand All @@ -132,6 +145,8 @@ export namespace ClientEntityContext {
receiver = entityContext.streamingReceiver;
} else if (entityContext.batchingReceiver && entityContext.batchingReceiver.name === name) {
receiver = entityContext.batchingReceiver;
} else {
throw new Error(`Cannot find the receiver with name '${name}'.`);
}
return receiver;
};
Expand Down
22 changes: 12 additions & 10 deletions packages/@azure/servicebus/data-plane/lib/queueClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,18 @@ export class QueueClient extends Client {
*/
async getSessionReceiver(options?: SessionReceiverOptions): Promise<SessionReceiver> {
if (!options) options = {};
if (
options.sessionId &&
this._context.messageSessions[options.sessionId] &&
this._context.messageSessions[options.sessionId].isOpen()
) {
throw new Error(
`Close the current session receiver for sessionId ${
options.sessionId
} before using "getSessionReceiver" to create a new one for the same sessionId`
);
if (options.sessionId) {
if (
this._context.messageSessions[options.sessionId] &&
this._context.messageSessions[options.sessionId].isOpen()
) {
throw new Error(
`Close the current session receiver for sessionId ${
options.sessionId
} before using "getSessionReceiver" to create a new one for the same sessionId`
);
}
delete this._context.expiredMessageSessions[options.sessionId];
}
this._context.isSessionEnabled = true;
const messageSession = await MessageSession.create(this._context, options);
Expand Down
80 changes: 34 additions & 46 deletions packages/@azure/servicebus/data-plane/lib/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,17 +886,14 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
if (receiver) {
if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.complete);
} else {
throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`);

if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.complete);
}
/**
* Abandons a message using it's lock token. This will make the message available again in
Expand Down Expand Up @@ -924,19 +921,16 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
if (receiver) {
if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.abandon, {
propertiesToModify: propertiesToModify
});
} else {
throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`);

if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.abandon, {
propertiesToModify: propertiesToModify
});
}

/**
Expand Down Expand Up @@ -966,19 +960,16 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
if (receiver) {
if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.defer, {
propertiesToModify: propertiesToModify
});
} else {
throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`);

if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.defer, {
propertiesToModify: propertiesToModify
});
}

/**
Expand Down Expand Up @@ -1018,19 +1009,16 @@ export class ServiceBusMessage implements ReceivedMessage {
return;
}
const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId);
if (receiver) {
if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.deadletter, {
error: error
});
} else {
throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`);

if (receiver.receiveMode !== ReceiveMode.peekLock) {
throw new Error("The operation is only supported in 'PeekLock' receive mode.");
}
if (this.delivery.remote_settled) {
throw new Error("This message has been already settled.");
}
return receiver.settleMessage(this, DispositionType.deadletter, {
error: error
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,12 @@ export class MessageSession extends LinkEntity {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
const sbError = translate(receiverError);
if (sbError.name === "SessionLockLostError") {
this._context.expiredMessageSessions[this.sessionId!] = true;
sbError.message = `The session lock has expired on the session with id ${
this.sessionId
}.`;
}
log.error(
"[%s] An error occurred for Receiver '%s': %O.",
connectionId,
Expand Down Expand Up @@ -338,8 +344,12 @@ export class MessageSession extends LinkEntity {
const connectionId = this._context.namespace.connectionId;
const receiverError = context.receiver && context.receiver.error;
const receiver = this._receiver || context.receiver!;
let clearExpiredSessionFlag = true;
if (receiverError) {
const sbError = translate(receiverError);
if (sbError.name === "SessionLockLostError") {
clearExpiredSessionFlag = false;
}
log.error(
"[%s] 'receiver_close' event occurred for receiver '%s' for sessionId '%s'. " +
"The associated error is: %O",
Expand Down Expand Up @@ -379,6 +389,10 @@ export class MessageSession extends LinkEntity {
this.sessionId
);
}

if (this.sessionId && clearExpiredSessionFlag) {
delete this._context.expiredMessageSessions[this.sessionId];
}
};

this._onSessionClose = async (context: EventContext) => {
Expand Down Expand Up @@ -590,6 +604,10 @@ export class MessageSession extends LinkEntity {
}
}
return;
} finally {
if (this._receiver) {
this._receiver!.addCredit(1);
}
}

// If we've made it this far, then user's message handler completed fine. Let us try
Expand Down Expand Up @@ -624,7 +642,6 @@ export class MessageSession extends LinkEntity {
// setting the "message" event listener.
this._receiver.on(ReceiverEvents.message, onSessionMessage);
// adding credit
this._receiver!.setCreditWindow(this.maxConcurrentCallsPerSession);
this._receiver!.addCredit(this.maxConcurrentCallsPerSession);
} else {
this._isReceivingMessages = false;
Expand Down Expand Up @@ -1076,7 +1093,12 @@ export class MessageSession extends LinkEntity {
* @ignore
*/
private _ensureSessionLockRenewal(): void {
if (this.autoRenewLock && Date.now() < this._totalAutoLockRenewDuration && this.isOpen()) {
if (
this.autoRenewLock &&
new Date(this._totalAutoLockRenewDuration) > this.sessionLockedUntilUtc! &&
Date.now() < this._totalAutoLockRenewDuration &&
this.isOpen()
) {
const connectionId = this._context.namespace.connectionId;
const nextRenewalTimeout = calculateRenewAfterDuration(this.sessionLockedUntilUtc!);
this._sessionLockRenewalTimer = setTimeout(async () => {
Expand Down
22 changes: 12 additions & 10 deletions packages/@azure/servicebus/data-plane/lib/subscriptionClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,18 @@ export class SubscriptionClient extends Client {
*/
async getSessionReceiver(options?: SessionReceiverOptions): Promise<SessionReceiver> {
if (!options) options = {};
if (
options.sessionId &&
this._context.messageSessions[options.sessionId] &&
this._context.messageSessions[options.sessionId].isOpen()
) {
throw new Error(
`Close the current session receiver for sessionId ${
options.sessionId
} before using "getSessionReceiver" to create a new one for the same sessionId`
);
if (options.sessionId) {
if (
this._context.messageSessions[options.sessionId] &&
this._context.messageSessions[options.sessionId].isOpen()
) {
throw new Error(
`Close the current session receiver for sessionId ${
options.sessionId
} before using "getSessionReceiver" to create a new one for the same sessionId`
);
}
delete this._context.expiredMessageSessions[options.sessionId];
}
this._context.isSessionEnabled = true;
const messageSession = await MessageSession.create(this._context, options);
Expand Down
Loading

0 comments on commit 297716e

Please sign in to comment.