From 297716e2040c4ea117cebe004ad31baceeb2a0ce Mon Sep 17 00:00:00 2001 From: ramya0820 <45977823+ramya0820@users.noreply.github.com> Date: Mon, 11 Feb 2019 19:28:22 -0800 Subject: [PATCH] [Service Bus] Surface Session Lock Lost error to the user --- .../data-plane/lib/clientEntityContext.ts | 19 +- .../servicebus/data-plane/lib/queueClient.ts | 22 +- .../data-plane/lib/serviceBusMessage.ts | 80 +++--- .../data-plane/lib/session/messageSession.ts | 26 +- .../data-plane/lib/subscriptionClient.ts | 22 +- .../data-plane/test/renewLockSessions.spec.ts | 259 +++++++++++++----- 6 files changed, 292 insertions(+), 136 deletions(-) diff --git a/packages/@azure/servicebus/data-plane/lib/clientEntityContext.ts b/packages/@azure/servicebus/data-plane/lib/clientEntityContext.ts index 08d363a220fa..5f792716846a 100644 --- a/packages/@azure/servicebus/data-plane/lib/clientEntityContext.ts +++ b/packages/@azure/servicebus/data-plane/lib/clientEntityContext.ts @@ -56,6 +56,10 @@ export interface ClientEntityContextBase { * objects associated with this client. */ messageSessions: Dictionary; + /** + * @property {Dictionary} expiredMessageSessions A dictionary that stores expired message sessions IDs. + */ + expiredMessageSessions: Dictionary; /** * @property {MessageSender} [sender] The ServiceBus sender associated with the client entity. */ @@ -77,7 +81,7 @@ export interface ClientEntityContextBase { */ export interface ClientEntityContext extends ClientEntityContextBase { detached(error?: AmqpError | Error): Promise; - getReceiver(name: string, sessionId?: string): MessageReceiver | MessageSession | undefined; + getReceiver(name: string, sessionId?: string): MessageReceiver | MessageSession; } /** @@ -113,7 +117,8 @@ export namespace ClientEntityContext { entityPath: entityPath, requestResponseLockedMessages: new ConcurrentExpiringMap(), isSessionEnabled: !!options.isSessionEnabled, - messageSessions: {} + messageSessions: {}, + expiredMessageSessions: {} }; (entityContext as ClientEntityContext).sessionManager = new SessionManager( @@ -121,6 +126,14 @@ export namespace ClientEntityContext { ); (entityContext as ClientEntityContext).getReceiver = (name: string, sessionId?: string) => { + if (sessionId && entityContext.expiredMessageSessions[sessionId]) { + const error = new Error( + `The session lock has expired on the session with id ${sessionId}.` + ); + error.name = "SessionLockLostError"; + throw error; + } + let receiver: MessageReceiver | MessageSession | undefined = undefined; if ( sessionId != undefined && @@ -132,6 +145,8 @@ export namespace ClientEntityContext { receiver = entityContext.streamingReceiver; } else if (entityContext.batchingReceiver && entityContext.batchingReceiver.name === name) { receiver = entityContext.batchingReceiver; + } else { + throw new Error(`Cannot find the receiver with name '${name}'.`); } return receiver; }; diff --git a/packages/@azure/servicebus/data-plane/lib/queueClient.ts b/packages/@azure/servicebus/data-plane/lib/queueClient.ts index f8e34f270372..42f434898cd4 100644 --- a/packages/@azure/servicebus/data-plane/lib/queueClient.ts +++ b/packages/@azure/servicebus/data-plane/lib/queueClient.ts @@ -166,16 +166,18 @@ export class QueueClient extends Client { */ async getSessionReceiver(options?: SessionReceiverOptions): Promise { if (!options) options = {}; - if ( - options.sessionId && - this._context.messageSessions[options.sessionId] && - this._context.messageSessions[options.sessionId].isOpen() - ) { - throw new Error( - `Close the current session receiver for sessionId ${ - options.sessionId - } before using "getSessionReceiver" to create a new one for the same sessionId` - ); + if (options.sessionId) { + if ( + this._context.messageSessions[options.sessionId] && + this._context.messageSessions[options.sessionId].isOpen() + ) { + throw new Error( + `Close the current session receiver for sessionId ${ + options.sessionId + } before using "getSessionReceiver" to create a new one for the same sessionId` + ); + } + delete this._context.expiredMessageSessions[options.sessionId]; } this._context.isSessionEnabled = true; const messageSession = await MessageSession.create(this._context, options); diff --git a/packages/@azure/servicebus/data-plane/lib/serviceBusMessage.ts b/packages/@azure/servicebus/data-plane/lib/serviceBusMessage.ts index 083301927a12..eda08aefd42a 100644 --- a/packages/@azure/servicebus/data-plane/lib/serviceBusMessage.ts +++ b/packages/@azure/servicebus/data-plane/lib/serviceBusMessage.ts @@ -886,17 +886,14 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - if (receiver) { - if (receiver.receiveMode !== ReceiveMode.peekLock) { - throw new Error("The operation is only supported in 'PeekLock' receive mode."); - } - if (this.delivery.remote_settled) { - throw new Error("This message has been already settled."); - } - return receiver.settleMessage(this, DispositionType.complete); - } else { - throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`); + + if (receiver.receiveMode !== ReceiveMode.peekLock) { + throw new Error("The operation is only supported in 'PeekLock' receive mode."); + } + if (this.delivery.remote_settled) { + throw new Error("This message has been already settled."); } + return receiver.settleMessage(this, DispositionType.complete); } /** * Abandons a message using it's lock token. This will make the message available again in @@ -924,19 +921,16 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - if (receiver) { - if (receiver.receiveMode !== ReceiveMode.peekLock) { - throw new Error("The operation is only supported in 'PeekLock' receive mode."); - } - if (this.delivery.remote_settled) { - throw new Error("This message has been already settled."); - } - return receiver.settleMessage(this, DispositionType.abandon, { - propertiesToModify: propertiesToModify - }); - } else { - throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`); + + if (receiver.receiveMode !== ReceiveMode.peekLock) { + throw new Error("The operation is only supported in 'PeekLock' receive mode."); + } + if (this.delivery.remote_settled) { + throw new Error("This message has been already settled."); } + return receiver.settleMessage(this, DispositionType.abandon, { + propertiesToModify: propertiesToModify + }); } /** @@ -966,19 +960,16 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - if (receiver) { - if (receiver.receiveMode !== ReceiveMode.peekLock) { - throw new Error("The operation is only supported in 'PeekLock' receive mode."); - } - if (this.delivery.remote_settled) { - throw new Error("This message has been already settled."); - } - return receiver.settleMessage(this, DispositionType.defer, { - propertiesToModify: propertiesToModify - }); - } else { - throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`); + + if (receiver.receiveMode !== ReceiveMode.peekLock) { + throw new Error("The operation is only supported in 'PeekLock' receive mode."); + } + if (this.delivery.remote_settled) { + throw new Error("This message has been already settled."); } + return receiver.settleMessage(this, DispositionType.defer, { + propertiesToModify: propertiesToModify + }); } /** @@ -1018,19 +1009,16 @@ export class ServiceBusMessage implements ReceivedMessage { return; } const receiver = this._context.getReceiver(this.delivery.link.name, this.sessionId); - if (receiver) { - if (receiver.receiveMode !== ReceiveMode.peekLock) { - throw new Error("The operation is only supported in 'PeekLock' receive mode."); - } - if (this.delivery.remote_settled) { - throw new Error("This message has been already settled."); - } - return receiver.settleMessage(this, DispositionType.deadletter, { - error: error - }); - } else { - throw new Error(`Cannot find the receiver with name '${this.delivery.link.name}'.`); + + if (receiver.receiveMode !== ReceiveMode.peekLock) { + throw new Error("The operation is only supported in 'PeekLock' receive mode."); + } + if (this.delivery.remote_settled) { + throw new Error("This message has been already settled."); } + return receiver.settleMessage(this, DispositionType.deadletter, { + error: error + }); } /** diff --git a/packages/@azure/servicebus/data-plane/lib/session/messageSession.ts b/packages/@azure/servicebus/data-plane/lib/session/messageSession.ts index dd1d6c4d23b6..6a43583ab910 100644 --- a/packages/@azure/servicebus/data-plane/lib/session/messageSession.ts +++ b/packages/@azure/servicebus/data-plane/lib/session/messageSession.ts @@ -309,6 +309,12 @@ export class MessageSession extends LinkEntity { const receiverError = context.receiver && context.receiver.error; if (receiverError) { const sbError = translate(receiverError); + if (sbError.name === "SessionLockLostError") { + this._context.expiredMessageSessions[this.sessionId!] = true; + sbError.message = `The session lock has expired on the session with id ${ + this.sessionId + }.`; + } log.error( "[%s] An error occurred for Receiver '%s': %O.", connectionId, @@ -338,8 +344,12 @@ export class MessageSession extends LinkEntity { const connectionId = this._context.namespace.connectionId; const receiverError = context.receiver && context.receiver.error; const receiver = this._receiver || context.receiver!; + let clearExpiredSessionFlag = true; if (receiverError) { const sbError = translate(receiverError); + if (sbError.name === "SessionLockLostError") { + clearExpiredSessionFlag = false; + } log.error( "[%s] 'receiver_close' event occurred for receiver '%s' for sessionId '%s'. " + "The associated error is: %O", @@ -379,6 +389,10 @@ export class MessageSession extends LinkEntity { this.sessionId ); } + + if (this.sessionId && clearExpiredSessionFlag) { + delete this._context.expiredMessageSessions[this.sessionId]; + } }; this._onSessionClose = async (context: EventContext) => { @@ -590,6 +604,10 @@ export class MessageSession extends LinkEntity { } } return; + } finally { + if (this._receiver) { + this._receiver!.addCredit(1); + } } // If we've made it this far, then user's message handler completed fine. Let us try @@ -624,7 +642,6 @@ export class MessageSession extends LinkEntity { // setting the "message" event listener. this._receiver.on(ReceiverEvents.message, onSessionMessage); // adding credit - this._receiver!.setCreditWindow(this.maxConcurrentCallsPerSession); this._receiver!.addCredit(this.maxConcurrentCallsPerSession); } else { this._isReceivingMessages = false; @@ -1076,7 +1093,12 @@ export class MessageSession extends LinkEntity { * @ignore */ private _ensureSessionLockRenewal(): void { - if (this.autoRenewLock && Date.now() < this._totalAutoLockRenewDuration && this.isOpen()) { + if ( + this.autoRenewLock && + new Date(this._totalAutoLockRenewDuration) > this.sessionLockedUntilUtc! && + Date.now() < this._totalAutoLockRenewDuration && + this.isOpen() + ) { const connectionId = this._context.namespace.connectionId; const nextRenewalTimeout = calculateRenewAfterDuration(this.sessionLockedUntilUtc!); this._sessionLockRenewalTimer = setTimeout(async () => { diff --git a/packages/@azure/servicebus/data-plane/lib/subscriptionClient.ts b/packages/@azure/servicebus/data-plane/lib/subscriptionClient.ts index df922258ea9d..5d940b8af83c 100644 --- a/packages/@azure/servicebus/data-plane/lib/subscriptionClient.ts +++ b/packages/@azure/servicebus/data-plane/lib/subscriptionClient.ts @@ -208,16 +208,18 @@ export class SubscriptionClient extends Client { */ async getSessionReceiver(options?: SessionReceiverOptions): Promise { if (!options) options = {}; - if ( - options.sessionId && - this._context.messageSessions[options.sessionId] && - this._context.messageSessions[options.sessionId].isOpen() - ) { - throw new Error( - `Close the current session receiver for sessionId ${ - options.sessionId - } before using "getSessionReceiver" to create a new one for the same sessionId` - ); + if (options.sessionId) { + if ( + this._context.messageSessions[options.sessionId] && + this._context.messageSessions[options.sessionId].isOpen() + ) { + throw new Error( + `Close the current session receiver for sessionId ${ + options.sessionId + } before using "getSessionReceiver" to create a new one for the same sessionId` + ); + } + delete this._context.expiredMessageSessions[options.sessionId]; } this._context.isSessionEnabled = true; const messageSession = await MessageSession.create(this._context, options); diff --git a/packages/@azure/servicebus/data-plane/test/renewLockSessions.spec.ts b/packages/@azure/servicebus/data-plane/test/renewLockSessions.spec.ts index 4bc78e1f9be6..f94ebf78f012 100644 --- a/packages/@azure/servicebus/data-plane/test/renewLockSessions.spec.ts +++ b/packages/@azure/servicebus/data-plane/test/renewLockSessions.spec.ts @@ -14,10 +14,20 @@ import { SubscriptionClient, ServiceBusMessage, MessagingError, - OnError + OnError, + delay } from "../lib"; -import { delay } from "rhea-promise"; -import { testSessionId1, purge, testMessagesWithSessions } from "./testUtils"; +import { + testMessagesWithSessions, + testSessionId1, + purge, + getSenderClient, + getReceiverClient, + ClientType +} from "./testUtils"; + +let senderClient: QueueClient | TopicClient; +let receiverClient: QueueClient | SubscriptionClient; describe("Standard", function(): void { const SERVICEBUS_CONNECTION_STRING = check( @@ -30,16 +40,11 @@ describe("Standard", function(): void { await namespace.close(); }); - const STANDARD_QUEUE_SESSION = - process.env.QUEUE_NAME_NO_PARTITION_SESSION || "unpartitioned-queue-sessions"; describe("Unpartitioned Queue", function(): void { - let senderClient: QueueClient; - let receiverClient: QueueClient; - - describe("Tests - Lock Renewal for Sessions- Peeklock Mode", function(): void { + describe("Tests - Lock Renewal for Sessions - Peeklock Mode", function(): void { beforeEach(async () => { - senderClient = namespace.createQueueClient(STANDARD_QUEUE_SESSION); - receiverClient = senderClient; + senderClient = getSenderClient(namespace, ClientType.UnpartitionedQueueWithSessions); + receiverClient = getReceiverClient(namespace, ClientType.UnpartitionedQueueWithSessions); await beforeEachTest(receiverClient); }); @@ -71,23 +76,48 @@ describe("Standard", function(): void { > { await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { maxSessionAutoRenewLockDurationInSeconds: 0, - delayBeforeAttemptingToCompleteMessageInSeconds: 31 + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true + }); + }); + + it("Receive a msg using Streaming Receiver, lock will not expire until configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 38, + delayBeforeAttemptingToCompleteMessageInSeconds: 35, + expectSessionLockLostErrorToBeThrown: false + }); + }); + + it("Receive a msg using Streaming Receiver, lock will expire sometime after the configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 35, + delayBeforeAttemptingToCompleteMessageInSeconds: 80, + expectSessionLockLostErrorToBeThrown: true + }); + }).timeout(95000); + + it("Receive a msg using Streaming Receiver, lock renewal does not take place when config value is less than lock duration", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 15, + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true }); - // Service bus completes the message even when the session lock expires. }); }); }); - const STANDARD_QUEUE_PARTITION_SESSION = - process.env.QUEUE_NAME_SESSION || "partitioned-queue-sessions"; describe("Partitioned Queue", function(): void { - let senderClient: QueueClient; - let receiverClient: QueueClient; - - describe("Tests - Lock Renewal for Sessions- Peeklock Mode", function(): void { + describe("Tests - Lock Renewal for Sessions - Peeklock Mode", function(): void { beforeEach(async () => { - senderClient = namespace.createQueueClient(STANDARD_QUEUE_PARTITION_SESSION); - receiverClient = senderClient; + senderClient = getSenderClient(namespace, ClientType.PartitionedQueueWithSessions); + receiverClient = getReceiverClient(namespace, ClientType.PartitionedQueueWithSessions); await beforeEachTest(receiverClient); }); @@ -119,28 +149,50 @@ describe("Standard", function(): void { > { await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { maxSessionAutoRenewLockDurationInSeconds: 0, - delayBeforeAttemptingToCompleteMessageInSeconds: 31 + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true + }); + }); + + it("Receive a msg using Streaming Receiver, lock will not expire until configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 38, + delayBeforeAttemptingToCompleteMessageInSeconds: 35, + expectSessionLockLostErrorToBeThrown: false + }); + }); + + it("Receive a msg using Streaming Receiver, lock will expire sometime after the configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 35, + delayBeforeAttemptingToCompleteMessageInSeconds: 80, + expectSessionLockLostErrorToBeThrown: true + }); + }).timeout(95000); + + it("Receive a msg using Streaming Receiver, lock renewal does not take place when config value is less than lock duration", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 15, + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true }); - // Service bus completes the message even when the session lock expires. }); }); }); - const STANDARD_TOPIC_SESSION = - process.env.TOPIC_NAME_NO_PARTITION_SESSION || "unpartitioned-topic-sessions"; - const STANDARD_SUBSCRIPTION_SESSION = - process.env.SUBSCRIPTION_NAME_NO_PARTITION_SESSION || - "unpartitioned-topic-sessions-subscription"; describe("Unpartitioned Topic/Subscription", function(): void { - let senderClient: TopicClient; - let receiverClient: SubscriptionClient; - - describe("Tests - Lock Renewal for Sessions- Peeklock Mode", function(): void { + describe("Tests - Lock Renewal for Sessions - Peeklock Mode", function(): void { beforeEach(async () => { - senderClient = namespace.createTopicClient(STANDARD_TOPIC_SESSION); - receiverClient = namespace.createSubscriptionClient( - STANDARD_TOPIC_SESSION, - STANDARD_SUBSCRIPTION_SESSION + senderClient = getSenderClient(namespace, ClientType.UnpartitionedTopicWithSessions); + receiverClient = getReceiverClient( + namespace, + ClientType.UnpartitionedSubscriptionWithSessions ); await beforeEachTest(receiverClient); }); @@ -173,27 +225,50 @@ describe("Standard", function(): void { > { await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { maxSessionAutoRenewLockDurationInSeconds: 0, - delayBeforeAttemptingToCompleteMessageInSeconds: 31 + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true + }); + }); + + it("Receive a msg using Streaming Receiver, lock will not expire until configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 38, + delayBeforeAttemptingToCompleteMessageInSeconds: 35, + expectSessionLockLostErrorToBeThrown: false + }); + }); + + it("Receive a msg using Streaming Receiver, lock will expire sometime after the configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 35, + delayBeforeAttemptingToCompleteMessageInSeconds: 80, + expectSessionLockLostErrorToBeThrown: true + }); + }).timeout(95000); + + it("Receive a msg using Streaming Receiver, lock renewal does not take place when config value is less than lock duration", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 15, + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true }); - // Service bus completes the message even when the session lock expires. }); }); }); - const STANDARD_TOPIC_PARTITION_SESSION = - process.env.TOPIC_NAME_SESSION || "partitioned-topic-sessions"; - const STANDARD_SUBSCRIPTION_PARTITION_SESSION = - process.env.SUBSCRIPTION_NAME_SESSION || "partitioned-topic-sessions-subscription"; describe("Partitioned Topic/Subscription", function(): void { - let senderClient: TopicClient; - let receiverClient: SubscriptionClient; - - describe("Tests - Lock Renewal for Sessions- Peeklock Mode", function(): void { + describe("Tests - Lock Renewal for Sessions - Peeklock Mode", function(): void { beforeEach(async () => { - senderClient = namespace.createTopicClient(STANDARD_TOPIC_PARTITION_SESSION); - receiverClient = namespace.createSubscriptionClient( - STANDARD_TOPIC_PARTITION_SESSION, - STANDARD_SUBSCRIPTION_PARTITION_SESSION + senderClient = getSenderClient(namespace, ClientType.PartitionedTopicWithSessions); + receiverClient = getReceiverClient( + namespace, + ClientType.PartitionedSubscriptionWithSessions ); await beforeEachTest(receiverClient); }); @@ -226,9 +301,39 @@ describe("Standard", function(): void { > { await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { maxSessionAutoRenewLockDurationInSeconds: 0, - delayBeforeAttemptingToCompleteMessageInSeconds: 31 + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true + }); + }); + + it("Receive a msg using Streaming Receiver, lock will not expire until configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 38, + delayBeforeAttemptingToCompleteMessageInSeconds: 35, + expectSessionLockLostErrorToBeThrown: false + }); + }); + + it("Receive a msg using Streaming Receiver, lock will expire sometime after the configured time", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 35, + delayBeforeAttemptingToCompleteMessageInSeconds: 80, + expectSessionLockLostErrorToBeThrown: true + }); + }).timeout(95000); + + it("Receive a msg using Streaming Receiver, lock renewal does not take place when config value is less than lock duration", async function(): Promise< + void + > { + await testAutoLockRenewalConfigBehavior(senderClient, receiverClient, { + maxSessionAutoRenewLockDurationInSeconds: 15, + delayBeforeAttemptingToCompleteMessageInSeconds: 31, + expectSessionLockLostErrorToBeThrown: true }); - // Complete fails as expected }); }); }); @@ -323,9 +428,8 @@ async function testBatchReceiverManualLockRenewalErrorOnLockExpiry( await delay(lockDurationInMilliseconds + 1000); let errorWasThrown: boolean = false; - await msgs[0].complete().catch((err: any) => { - should.equal(err.name, "Error"); - should.equal(!(err.message.search("Cannot find the receiver with name") + 1), false); + await msgs[0].complete().catch((err) => { + should.equal(err.name, "SessionLockLostError"); errorWasThrown = true; }); @@ -405,6 +509,7 @@ async function testStreamingReceiverManualLockRenewalHappyCase( interface AutoLockRenewalTestOptions { maxSessionAutoRenewLockDurationInSeconds: number | undefined; delayBeforeAttemptingToCompleteMessageInSeconds: number; + expectSessionLockLostErrorToBeThrown: boolean; } async function testAutoLockRenewalConfigBehavior( @@ -420,6 +525,9 @@ async function testAutoLockRenewalConfigBehavior( sessionId: testSessionId1, maxSessionAutoRenewLockDurationInSeconds: options.maxSessionAutoRenewLockDurationInSeconds }); + + let sessionLockLostErrorThrown = false; + const messagesReceived: ServiceBusMessage[] = []; await sessionClient.receive( async (brokeredMessage: ServiceBusMessage) => { if (numOfMessagesReceived < 1) { @@ -428,30 +536,49 @@ async function testAutoLockRenewalConfigBehavior( should.equal(brokeredMessage.body, testMessagesWithSessions.body); should.equal(brokeredMessage.messageId, testMessagesWithSessions.messageId); + messagesReceived.push(brokeredMessage); + // Sleeping... await delay(options.delayBeforeAttemptingToCompleteMessageInSeconds * 1000); - - let errorWasThrown: boolean = false; - await brokeredMessage.complete().catch((err) => { - errorWasThrown = true; // Service bus completes the message even when the session lock expires. - }); - - should.equal(errorWasThrown, false, "Error Thrown flag value mismatch"); } }, - onError, + (err: MessagingError | Error) => { + if (err.name === "SessionLockLostError") { + sessionLockLostErrorThrown = true; + } else { + onError(err); + } + }, { autoComplete: false } ); - await delay(options.delayBeforeAttemptingToCompleteMessageInSeconds * 1000 + 4000); + await delay(options.delayBeforeAttemptingToCompleteMessageInSeconds * 1000 + 2000); + should.equal( + sessionLockLostErrorThrown, + options.expectSessionLockLostErrorToBeThrown, + "SessionLockLostErrorThrown flag must match" + ); + + should.equal(messagesReceived.length, 1, "Mismatch in number of messages received"); + + let errorWasThrown: boolean = false; + await messagesReceived[0].complete().catch((err) => { + should.equal(err.name, "SessionLockLostError"); + errorWasThrown = true; + }); + + should.equal( + errorWasThrown, + options.expectSessionLockLostErrorToBeThrown, + "Error Thrown flag value mismatch" + ); + await sessionClient.close(); if (uncaughtErrorFromHandlers) { chai.assert.fail(uncaughtErrorFromHandlers.message); } - - should.equal(numOfMessagesReceived, 1, "Mismatch in number of messages received"); } // Helper functions