From daac149ea06c604cebaa8b7a05e7dffd8df4785f Mon Sep 17 00:00:00 2001 From: Ramya Rao Date: Fri, 11 Jan 2019 16:55:12 -0800 Subject: [PATCH] Refactor tests to reduce duplication of code (#163) --- test/batchReceiver.spec.ts | 1203 +++++++++++++++--------------------- 1 file changed, 500 insertions(+), 703 deletions(-) diff --git a/test/batchReceiver.spec.ts b/test/batchReceiver.spec.ts index b3e33632e335..6bb058336a48 100644 --- a/test/batchReceiver.spec.ts +++ b/test/batchReceiver.spec.ts @@ -15,7 +15,8 @@ import { generateUuid, TopicClient, SubscriptionClient, - delay + delay, + ServiceBusMessage } from "../lib"; const testMessages: SendableMessageInfo[] = [ @@ -38,767 +39,472 @@ async function testPeekMsgsLength( } const maxDeliveryCount = 10; - -describe("ReceiveBatch from Queue/Subscription", function(): void { - let namespace: Namespace; - let queueClient: QueueClient; - let topicClient: TopicClient; - let subscriptionClient: SubscriptionClient; - let errorWasThrown: boolean; - - beforeEach(async () => { - // The tests in this file expect the env variables to contain the connection string and - // the names of empty queue/topic/subscription that are to be tested - - if (!process.env.SERVICEBUS_CONNECTION_STRING) { - throw new Error( - "Define SERVICEBUS_CONNECTION_STRING in your environment before running integration tests." - ); - } - if (!process.env.TOPIC_NAME) { - throw new Error("Define TOPIC_NAME in your environment before running integration tests."); - } - if (!process.env.QUEUE_NAME) { - throw new Error("Define QUEUE_NAME in your environment before running integration tests."); - } - if (!process.env.SUBSCRIPTION_NAME) { - throw new Error( - "Define SUBSCRIPTION_NAME in your environment before running integration tests." - ); - } - - namespace = Namespace.createFromConnectionString(process.env.SERVICEBUS_CONNECTION_STRING); - queueClient = namespace.createQueueClient(process.env.QUEUE_NAME); - topicClient = namespace.createTopicClient(process.env.TOPIC_NAME); - subscriptionClient = namespace.createSubscriptionClient( - process.env.TOPIC_NAME, - process.env.SUBSCRIPTION_NAME +let namespace: Namespace; +let queueClient: QueueClient; +let topicClient: TopicClient; +let subscriptionClient: SubscriptionClient; +let deadletterQueueClient: QueueClient; +let deadletterSubscriptionClient: SubscriptionClient; +let errorWasThrown: boolean; + +async function beforeEachTest(): Promise { + // The tests in this file expect the env variables to contain the connection string and + // the names of empty queue/topic/subscription that are to be tested + + if (!process.env.SERVICEBUS_CONNECTION_STRING) { + throw new Error( + "Define SERVICEBUS_CONNECTION_STRING in your environment before running integration tests." + ); + } + if (!process.env.TOPIC_NAME) { + throw new Error("Define TOPIC_NAME in your environment before running integration tests."); + } + if (!process.env.QUEUE_NAME) { + throw new Error("Define QUEUE_NAME in your environment before running integration tests."); + } + if (!process.env.SUBSCRIPTION_NAME) { + throw new Error( + "Define SUBSCRIPTION_NAME in your environment before running integration tests." ); - - const peekedQueueMsg = await queueClient.peek(); - if (peekedQueueMsg.length) { - throw new Error("Please use an empty queue for integration testing"); - } - - const peekedSubscriptionMsg = await subscriptionClient.peek(); - if (peekedSubscriptionMsg.length) { - throw new Error("Please use an empty Subscription for integration testing"); - } - errorWasThrown = false; - }); - - afterEach(async () => { - return namespace.close(); - }); - - async function getdeadletterQueueClient(): Promise { - const deadLetterQueuePath = Namespace.getDeadLetterQueuePathForQueue(queueClient.name); - const deadletterQueueClient = namespace.createQueueClient(deadLetterQueuePath); - return deadletterQueueClient; } - async function getdeadletterSubscriptionClient(): Promise { - const deadLetterSubscriptionPath = Namespace.getDeadLetterSubcriptionPathForSubcription( + namespace = Namespace.createFromConnectionString(process.env.SERVICEBUS_CONNECTION_STRING); + queueClient = namespace.createQueueClient(process.env.QUEUE_NAME); + topicClient = namespace.createTopicClient(process.env.TOPIC_NAME); + subscriptionClient = namespace.createSubscriptionClient( + process.env.TOPIC_NAME, + process.env.SUBSCRIPTION_NAME + ); + deadletterQueueClient = namespace.createQueueClient( + Namespace.getDeadLetterQueuePathForQueue(queueClient.name) + ); + deadletterSubscriptionClient = namespace.createSubscriptionClient( + Namespace.getDeadLetterSubcriptionPathForSubcription( topicClient.name, subscriptionClient.subscriptionName - ); + ), + subscriptionClient.subscriptionName + ); - const deadletterSubscriptionClient = namespace.createSubscriptionClient( - deadLetterSubscriptionPath ? deadLetterSubscriptionPath : "", - subscriptionClient.subscriptionName - ); - return deadletterSubscriptionClient; + const peekedQueueMsg = await queueClient.peek(); + if (peekedQueueMsg.length) { + throw new Error("Please use an empty queue for integration testing"); } - it("PeekLock: complete() removes msg from Queue", async function(): Promise { - await queueClient.send(testMessages[0]); - const msgs = await queueClient.receiveBatch(1); - - should.equal(Array.isArray(msgs), true); - should.equal(msgs.length, 1); - should.equal(msgs[0].body, testMessages[0].body); - should.equal(msgs[0].messageId, testMessages[0].messageId); - - await msgs[0].complete(); - - await testPeekMsgsLength(queueClient, 0); - }); - - it("PeekLock: complete() removes msg from Subscription", async function(): Promise { - await topicClient.send(testMessages[0]); - const msgs = await subscriptionClient.receiveBatch(1); - - should.equal(Array.isArray(msgs), true); - should.equal(msgs.length, 1); - should.equal(msgs[0].body, testMessages[0].body); - should.equal(msgs[0].messageId, testMessages[0].messageId); - - await msgs[0].complete(); - - await testPeekMsgsLength(subscriptionClient, 0); - }); - - // We test for mutilple receiveBatch specifically to ensure that batchingRecevier on a client is reused - // See https://github.com/Azure/azure-service-bus-node/issues/31 - it("Multiple receiveBatch using Queues", async function(): Promise { - await queueClient.sendBatch(testMessages); - const msgs1 = await queueClient.receiveBatch(1); - const msgs2 = await queueClient.receiveBatch(1); - - // Results are checked after both receiveBatches are done to ensure that the second call doesnt - // affect the result from the first one. - should.equal(Array.isArray(msgs1), true); - should.equal(msgs1.length, 1); - should.equal(msgs1[0].body, testMessages[0].body); - should.equal(msgs1[0].messageId, testMessages[0].messageId); - - should.equal(Array.isArray(msgs2), true); - should.equal(msgs2.length, 1); - should.equal(msgs2[0].body, testMessages[1].body); - should.equal(msgs2[0].messageId, testMessages[1].messageId); - - await msgs1[0].complete(); - await msgs2[0].complete(); - - await testPeekMsgsLength(queueClient, 0); - }); - - // We test for mutilple receiveBatch specifically to ensure that batchingRecevier on a client is reused - // See https://github.com/Azure/azure-service-bus-node/issues/31 - it("Multiple receiveBatch using Topics and Subscriptions", async function(): Promise { - await topicClient.sendBatch(testMessages); - const msgs1 = await subscriptionClient.receiveBatch(1); - const msgs2 = await subscriptionClient.receiveBatch(1); - - // Results are checked after both receiveBatches are done to ensure that the second call doesnt - // affect the result from the first one. - should.equal(Array.isArray(msgs1), true); - should.equal(msgs1.length, 1); - should.equal(msgs1[0].body, testMessages[0].body); - should.equal(msgs1[0].messageId, testMessages[0].messageId); - - should.equal(Array.isArray(msgs2), true); - should.equal(msgs2.length, 1); - should.equal(msgs2[0].body, testMessages[1].body); - should.equal(msgs2[0].messageId, testMessages[1].messageId); - - await msgs1[0].complete(); - await msgs2[0].complete(); - - await testPeekMsgsLength(subscriptionClient, 0); - }); - - it("Abandoned message is retained in the Queue with incremented deliveryCount", async function(): Promise< - void - > { - await queueClient.send(testMessages[0]); - - let receivedMsgs = await queueClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 0); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - // TODO: This is taking 20 seconds. Why? - await receivedMsgs[0].abandon(); - - await testPeekMsgsLength(queueClient, 1); - - receivedMsgs = await queueClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 1); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await receivedMsgs[0].complete(); - }); - - it("Abandoned message is retained in the Subscription with incremented deliveryCount", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - - let receivedMsgs = await subscriptionClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 0); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - // TODO: This is taking 20 seconds. Why? - await receivedMsgs[0].abandon(); - - await testPeekMsgsLength(subscriptionClient, 1); - - receivedMsgs = await subscriptionClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 1); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await receivedMsgs[0].complete(); - }); - - it("Message abandoned more than maxDeliveryCount goes to dead letter queue", async function(): Promise< - void - > { - await queueClient.send(testMessages[0]); - let abandonMsgCount = 0; - - while (abandonMsgCount < maxDeliveryCount) { - const receivedMsgs = await queueClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - should.equal(receivedMsgs[0].deliveryCount, abandonMsgCount); - abandonMsgCount++; - - await receivedMsgs[0].abandon(); - } - - await testPeekMsgsLength(queueClient, 0); - - const deadletterQueueClient = await getdeadletterQueueClient(); - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); - - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - - await deadLetterMsgs[0].complete(); - - await testPeekMsgsLength(deadletterQueueClient, 0); - }); - - it("Message abandoned more than maxDeliveryCount goes to dead letter subscriptions", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - let abandonMsgCount = 0; - - while (abandonMsgCount < maxDeliveryCount) { - const receivedMsgs = await subscriptionClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - should.equal(receivedMsgs[0].deliveryCount, abandonMsgCount); - abandonMsgCount++; - - await receivedMsgs[0].abandon(); - } - - await testPeekMsgsLength(subscriptionClient, 0); - - const deadletterSubscriptionClient = await getdeadletterSubscriptionClient(); - - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); - - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + const peekedSubscriptionMsg = await subscriptionClient.peek(); + if (peekedSubscriptionMsg.length) { + throw new Error("Please use an empty Subscription for integration testing"); + } + errorWasThrown = false; +} - await deadLetterMsgs[0].complete(); +async function afterEachTest(): Promise { + await namespace.close(); +} - await testPeekMsgsLength(deadletterSubscriptionClient, 0); +describe("Complete/Abandon/Defer/Deadletter normal message", () => { + beforeEach(async () => { + await beforeEachTest(); }); - it("Receive deferred message from queue", async function(): Promise { - await queueClient.sendBatch(testMessages); - const msgs = await queueClient.receiveBatch(1); - - should.equal(Array.isArray(msgs), true); - should.equal(msgs.length, 1); - should.equal(msgs[0].body, testMessages[0].body); - should.equal(msgs[0].messageId, testMessages[0].messageId); - - if (!msgs[0].sequenceNumber) { - throw "Sequence Number can not be null"; - } - const sequenceNumber = msgs[0].sequenceNumber; - await msgs[0].defer(); - - const receivedMsgs = await queueClient.receiveBatch(1); - should.equal(Array.isArray(receivedMsgs), true); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body === testMessages[0].body, false); - should.equal(receivedMsgs[0].messageId === testMessages[0].messageId, false); - await receivedMsgs[0].complete(); - - const deferredMsgs = await queueClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); - - await deferredMsgs.complete(); - - await testPeekMsgsLength(queueClient, 0); + afterEach(async () => { + await afterEachTest(); }); - it("Receive deferred message from subscription", async function(): Promise { - await topicClient.sendBatch(testMessages); - const msgs = await subscriptionClient.receiveBatch(1); + async function sendReceiveMsg( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.send(testMessages[0]); + const msgs = await receiverClient.receiveBatch(1); should.equal(Array.isArray(msgs), true); should.equal(msgs.length, 1); should.equal(msgs[0].body, testMessages[0].body); should.equal(msgs[0].messageId, testMessages[0].messageId); + should.equal(msgs[0].deliveryCount, 0); - if (!msgs[0].sequenceNumber) { - throw "Sequence Number can not be null"; - } - const sequenceNumber = msgs[0].sequenceNumber; - await msgs[0].defer(); - - const receivedMsgs = await subscriptionClient.receiveBatch(1); - should.equal(Array.isArray(receivedMsgs), true); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body === testMessages[0].body, false); - should.equal(receivedMsgs[0].messageId === testMessages[0].messageId, false); - await receivedMsgs[0].complete(); - - const deferredMsgs = await subscriptionClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); - - await deferredMsgs.complete(); - - await testPeekMsgsLength(subscriptionClient, 0); - }); - - it("Receive dead letter message from queue", async function(): Promise { - await queueClient.send(testMessages[0]); - - const receivedMsgs = await queueClient.receiveBatch(1); - - should.equal(Array.isArray(receivedMsgs), true); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await receivedMsgs[0].deadLetter(); - - await testPeekMsgsLength(queueClient, 0); - - const deadletterQueueClient = await getdeadletterQueueClient(); - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); - - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - - await deadLetterMsgs[0].complete(); - - await testPeekMsgsLength(deadletterQueueClient, 0); - }); - - it("Receive dead letter message from subscription", async function(): Promise { - await topicClient.send(testMessages[0]); - - const receivedMsgs = await subscriptionClient.receiveBatch(1); - - should.equal(Array.isArray(receivedMsgs), true); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await receivedMsgs[0].deadLetter(); - - await testPeekMsgsLength(subscriptionClient, 0); - - const deadletterSubscriptionClient = await getdeadletterSubscriptionClient(); - - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); - - should.equal(Array.isArray(deadLetterMsgs), true); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - - await deadLetterMsgs[0].complete(); - - await testPeekMsgsLength(deadletterSubscriptionClient, 0); - }); - - it("No settlement of the message is retained in the Queue with incremented deliveryCount", async function(): Promise< - void - > { - await queueClient.send(testMessages[0]); - - let receivedMsgs = await queueClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 0); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await testPeekMsgsLength(queueClient, 1); - - receivedMsgs = await queueClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 1); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await receivedMsgs[0].complete(); - }); - - it("No settlement of the message is retained in the Subscriptions with incremented deliveryCount", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - let receivedMsgs = await subscriptionClient.receiveBatch(1); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].deliveryCount, 0); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await testPeekMsgsLength(subscriptionClient, 1); + return msgs[0]; + } - receivedMsgs = await subscriptionClient.receiveBatch(1); + async function completeMessages( + receiverClient: QueueClient | SubscriptionClient, + expectedDeliverCount: number + ): Promise { + const receivedMsgs = await receiverClient.receiveBatch(1); should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].deliveryCount, 1); + should.equal(receivedMsgs[0].deliveryCount, expectedDeliverCount); should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); await receivedMsgs[0].complete(); - }); - it("Receive n messages but queue only has m messages, where m < n", async function(): Promise< - void - > { - await queueClient.send(testMessages[0]); - const receivedMsgs = await queueClient.receiveBatch(2); + await testPeekMsgsLength(receiverClient, 0); + } - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + async function testComplete( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const msg = await sendReceiveMsg(senderClient, receiverClient); + await msg.complete(); - await receivedMsgs[0].complete(); + await testPeekMsgsLength(receiverClient, 0); + } - await testPeekMsgsLength(queueClient, 0); + it("Queue: complete() removes message", async function(): Promise { + await testComplete(queueClient, queueClient); }); - it("Receive n messages but subscription only has m messages, where m < n", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - const receivedMsgs = await subscriptionClient.receiveBatch(2); - - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - - await receivedMsgs[0].complete(); - - await testPeekMsgsLength(subscriptionClient, 0); + it("Queue: complete() removes message", async function(): Promise { + await testComplete(topicClient, subscriptionClient); }); - async function deferMsgAndGetSequenceNum( - client: QueueClient | SubscriptionClient - ): Promise { - const receivedMsgs = await client.receiveBatch(1); + async function testAbandon( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const msg = await sendReceiveMsg(senderClient, receiverClient); + await msg.abandon(); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].deliveryCount, 0); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + await testPeekMsgsLength(receiverClient, 1); - if (!receivedMsgs[0].sequenceNumber) { - throw "Sequence Number can not be null"; - } - const sequenceNumber = receivedMsgs[0].sequenceNumber; - await receivedMsgs[0].defer(); - return sequenceNumber; + await completeMessages(receiverClient, 1); } - it("Abandoning a deferred message returns it to deferred queue.", async function(): Promise< + it("Queue: Abandoned message is retained with incremented deliveryCount", async function(): Promise< void > { - await queueClient.send(testMessages[0]); - const sequenceNumber = await deferMsgAndGetSequenceNum(queueClient); - - const deferredMsgs = await queueClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.deliveryCount, 1); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); - - await deferredMsgs.abandon(); - - await testPeekMsgsLength(queueClient, 1); - - const abandonedMsgs = await queueClient.receiveDeferredMessage(sequenceNumber); - if (!abandonedMsgs) { - throw "No message received for sequence number"; - } - should.equal(abandonedMsgs.body, testMessages[0].body); - should.equal(abandonedMsgs.deliveryCount, 2); - should.equal(abandonedMsgs.messageId, testMessages[0].messageId); - - await abandonedMsgs.complete(); - - await testPeekMsgsLength(queueClient, 0); + await testAbandon(queueClient, queueClient); }); - it("Abandoning a deferred message returns it to deferred subscription.", async function(): Promise< + it("Subscription: Abandoned message is retained with incremented deliveryCount", async function(): Promise< void > { - await topicClient.send(testMessages[0]); - const sequenceNumber = await deferMsgAndGetSequenceNum(subscriptionClient); - - const deferredMsgs = await subscriptionClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.deliveryCount, 1); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); - - await deferredMsgs.abandon(); - - await testPeekMsgsLength(subscriptionClient, 1); - - const abandonedMsgs = await subscriptionClient.receiveDeferredMessage(sequenceNumber); - if (!abandonedMsgs) { - throw "No message received for sequence number"; - } - should.equal(abandonedMsgs.body, testMessages[0].body); - should.equal(abandonedMsgs.deliveryCount, 2); - should.equal(abandonedMsgs.messageId, testMessages[0].messageId); - - await abandonedMsgs.complete(); - - await testPeekMsgsLength(subscriptionClient, 0); + await testAbandon(topicClient, subscriptionClient); }); - it("Deadlettering a deferred message moves it to dead letter queue.", async function(): Promise< - void - > { - await queueClient.send(testMessages[0]); - const sequenceNumber = await deferMsgAndGetSequenceNum(queueClient); - - const deferredMsgs = await queueClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); - - await deferredMsgs.deadLetter(); - - await testPeekMsgsLength(queueClient, 0); - - const deadletterQueueClient = await getdeadletterQueueClient(); - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); - - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - - await deadLetterMsgs[0].complete(); - - await testPeekMsgsLength(deadletterQueueClient, 0); - }); + async function testDefer( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const msg = await sendReceiveMsg(senderClient, receiverClient); - it("Deadlettering a deferred message moves it to dead letter subscription.", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - const sequenceNumber = await deferMsgAndGetSequenceNum(subscriptionClient); + if (!msg.sequenceNumber) { + throw "Sequence Number can not be null"; + } + const sequenceNumber = msg.sequenceNumber; + await msg.defer(); - const deferredMsgs = await subscriptionClient.receiveDeferredMessage(sequenceNumber); + const deferredMsgs = await receiverClient.receiveDeferredMessage(sequenceNumber); if (!deferredMsgs) { throw "No message received for sequence number"; } should.equal(deferredMsgs.body, testMessages[0].body); should.equal(deferredMsgs.messageId, testMessages[0].messageId); + should.equal(deferredMsgs.deliveryCount, 1); + + await deferredMsgs.complete(); - await deferredMsgs.deadLetter(); + await testPeekMsgsLength(receiverClient, 0); + } - await testPeekMsgsLength(subscriptionClient, 0); + it("Queue: Receive deferred message from queue/subscription", async function(): Promise { + await testDefer(queueClient, queueClient); + }); - const deadletterSubscriptionClient = await getdeadletterSubscriptionClient(); + it("Subscription: Receive deferred message from queue/subscription", async function(): Promise< + void + > { + await testDefer(topicClient, subscriptionClient); + }); - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); + async function testDeadletter( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadLetterClient: QueueClient | SubscriptionClient + ): Promise { + const msg = await sendReceiveMsg(senderClient, receiverClient); + await msg.deadLetter(); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + await testPeekMsgsLength(receiverClient, 0); - await deadLetterMsgs[0].complete(); - await testPeekMsgsLength(deadletterSubscriptionClient, 0); + await completeMessages(deadLetterClient, 0); + } + + it("Queue: Receive dead letter message from queue/subscription", async function(): Promise { + await testDeadletter(queueClient, queueClient, deadletterQueueClient); }); - it("Deferring a deferred message puts it back to the deferred queue.", async function(): Promise< + it("Subscription: Receive dead letter message from queue/subscription", async function(): Promise< void > { - await queueClient.send(testMessages[0]); - const sequenceNumber = await deferMsgAndGetSequenceNum(queueClient); + await testDeadletter(topicClient, subscriptionClient, deadletterSubscriptionClient); + }); +}); - let deferredMsgs = await queueClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } +describe("Abandon/Defer/Deadletter deferred message", () => { + beforeEach(async () => { + await beforeEachTest(); + }); - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.deliveryCount, 1); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); + afterEach(async () => { + await afterEachTest(); + }); - await deferredMsgs.defer(); + async function deferMessage( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.send(testMessages[0]); + const receivedMsgs = await receiverClient.receiveBatch(1); + + should.equal(receivedMsgs.length, 1); + should.equal(receivedMsgs[0].body, testMessages[0].body); + should.equal(receivedMsgs[0].deliveryCount, 0); + should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - await testPeekMsgsLength(queueClient, 1); + if (!receivedMsgs[0].sequenceNumber) { + throw "Sequence Number can not be null"; + } + const sequenceNumber = receivedMsgs[0].sequenceNumber; + await receivedMsgs[0].defer(); - deferredMsgs = await queueClient.receiveDeferredMessage(sequenceNumber); + const deferredMsgs = await receiverClient.receiveDeferredMessage(sequenceNumber); if (!deferredMsgs) { throw "No message received for sequence number"; } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.deliveryCount, 2); should.equal(deferredMsgs.messageId, testMessages[0].messageId); + should.equal(deferredMsgs.deliveryCount, 1); - await deferredMsgs.complete(); - - await testPeekMsgsLength(queueClient, 0); - }); + return deferredMsgs; + } - it("Deferring a deferred message puts it back to the deferred subscription.", async function(): Promise< - void - > { - await topicClient.send(testMessages[0]); - const sequenceNumber = await deferMsgAndGetSequenceNum(subscriptionClient); + async function completeDeferredMessage( + receiverClient: QueueClient | SubscriptionClient, + sequenceNumber: Long, + expectedDeliverCount: number + ): Promise { + await testPeekMsgsLength(receiverClient, 1); - let deferredMsgs = await subscriptionClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { + const deferredMsg = await receiverClient.receiveDeferredMessage(sequenceNumber); + if (!deferredMsg) { throw "No message received for sequence number"; } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.deliveryCount, 1); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); - await deferredMsgs.defer(); + should.equal(deferredMsg.body, testMessages[0].body); + should.equal(deferredMsg.deliveryCount, expectedDeliverCount); + should.equal(deferredMsg.messageId, testMessages[0].messageId); - await testPeekMsgsLength(subscriptionClient, 1); + await deferredMsg.complete(); - deferredMsgs = await subscriptionClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.deliveryCount, 2); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); + await testPeekMsgsLength(receiverClient, 0); + } - await deferredMsgs.complete(); + async function testDefer( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const deferredMsg = await deferMessage(senderClient, receiverClient); + const sequenceNumber = deferredMsg.sequenceNumber; + if (!sequenceNumber) { + throw "Sequence Number can not be null"; + } + await deferredMsg.defer(); + await completeDeferredMessage(receiverClient, sequenceNumber, 2); + } - await testPeekMsgsLength(subscriptionClient, 0); + it("Queue: Deferring a deferred message puts it back to the deferred queue.", async function(): Promise< + void + > { + await testDefer(queueClient, queueClient); }); - it("Abandon a message received from dead letter queue", async function(): Promise { - await queueClient.send(testMessages[0]); - const receivedMsgs = await queueClient.receiveBatch(1); + it("Subscription: Deferring a deferred message puts it back to the deferred queue.", async function(): Promise< + void + > { + await testDefer(topicClient, subscriptionClient); + }); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 0); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + async function testDeadletter( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient + ): Promise { + const deferredMsg = await deferMessage(senderClient, receiverClient); - await receivedMsgs[0].deadLetter(); + await deferredMsg.deadLetter(); - await testPeekMsgsLength(subscriptionClient, 0); + await testPeekMsgsLength(receiverClient, 0); - const deadletterQueueClient = await getdeadletterQueueClient(); - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); + const deadLetterMsgs = await deadletterClient.receiveBatch(1); should.equal(deadLetterMsgs.length, 1); should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].deliveryCount, 0); + should.equal(deadLetterMsgs[0].deliveryCount, 1); should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); - await deadLetterMsgs[0].abandon(); + await deadLetterMsgs[0].complete(); + + await testPeekMsgsLength(deadletterClient, 0); + } + + it("Queue: Deadlettering a deferred message moves it to dead letter queue.", async function(): Promise< + void + > { + await testDeadletter(queueClient, queueClient, deadletterQueueClient); + }); + + it("Subscription: Deadlettering a deferred message moves it to dead letter queue.", async function(): Promise< + void + > { + await testDeadletter(topicClient, subscriptionClient, deadletterSubscriptionClient); + }); + + async function testAbandon( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const deferredMsg = await deferMessage(senderClient, receiverClient); + const sequenceNumber = deferredMsg.sequenceNumber; + if (!sequenceNumber) { + throw "Sequence Number can not be null"; + } + await deferredMsg.abandon(); + await completeDeferredMessage(receiverClient, sequenceNumber, 2); + } - const abandonedMsgs = await deadletterQueueClient.receiveBatch(1); + it("Queue: Abandoning a deferred message puts it back to the deferred queue.", async function(): Promise< + void + > { + await testAbandon(queueClient, queueClient); + }); - should.equal(abandonedMsgs.length, 1); - should.equal(abandonedMsgs[0].body, testMessages[0].body); - should.equal(abandonedMsgs[0].deliveryCount, 0); - should.equal(abandonedMsgs[0].messageId, testMessages[0].messageId); + it("Subscription: Abandoning a deferred message puts it back to the deferred queue.", async function(): Promise< + void + > { + await testAbandon(topicClient, subscriptionClient); + }); +}); - await abandonedMsgs[0].complete(); +describe("Abandon/Defer/Deadletter deadlettered message", () => { + beforeEach(async () => { + await beforeEachTest(); + }); - await testPeekMsgsLength(deadletterQueueClient, 0); + afterEach(async () => { + await afterEachTest(); }); - it("Abandon a message received from dead letter subscription", async function(): Promise { - await topicClient.send(testMessages[0]); - const receivedMsgs = await subscriptionClient.receiveBatch(1); + async function deadLetterMessage( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.send(testMessages[0]); + const receivedMsgs = await receiverClient.receiveBatch(1); should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].deliveryCount, 0); should.equal(receivedMsgs[0].body, testMessages[0].body); should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + should.equal(receivedMsgs[0].deliveryCount, 0); await receivedMsgs[0].deadLetter(); - await testPeekMsgsLength(subscriptionClient, 0); + await testPeekMsgsLength(receiverClient, 0); - const deadletterSubscriptionClient = await getdeadletterSubscriptionClient(); - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); + const deadLetterMsgs = await deadletterClient.receiveBatch(1); should.equal(deadLetterMsgs.length, 1); should.equal(deadLetterMsgs[0].body, testMessages[0].body); + should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); should.equal(deadLetterMsgs[0].deliveryCount, 0); + + return deadLetterMsgs[0]; + } + + async function completeDeadLetteredMessage( + deadletterClient: QueueClient | SubscriptionClient, + expectedDeliverCount: number + ): Promise { + const deadLetterMsgs = await deadletterClient.receiveBatch(1); + + should.equal(deadLetterMsgs.length, 1); + should.equal(deadLetterMsgs[0].body, testMessages[0].body); should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + should.equal(deadLetterMsgs[0].deliveryCount, expectedDeliverCount); + + await deadLetterMsgs[0].complete(); + await testPeekMsgsLength(deadletterClient, 0); + } - await deadLetterMsgs[0].abandon(); + async function testDeadLetter( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient + ): Promise { + const deadLetterMsg = await deadLetterMessage(senderClient, receiverClient, deadletterClient); - const abandonedMsgs = await deadletterSubscriptionClient.receiveBatch(1); + await deadLetterMsg.deadLetter().catch((err) => { + should.equal(err.name, "InvalidOperationError"); + errorWasThrown = true; + }); - should.equal(abandonedMsgs.length, 1); - should.equal(abandonedMsgs[0].body, testMessages[0].body); - should.equal(abandonedMsgs[0].deliveryCount, 0); - should.equal(abandonedMsgs[0].messageId, testMessages[0].messageId); + should.equal(errorWasThrown, true); - await abandonedMsgs[0].complete(); + await completeDeadLetteredMessage(deadletterClient, 0); + } - await testPeekMsgsLength(deadletterSubscriptionClient, 0); + it("Queue: Throws error when dead lettering a dead lettered message", async function(): Promise< + void + > { + await testDeadLetter(queueClient, queueClient, deadletterQueueClient); }); - it("Defer a message received from dead letter queue", async function(): Promise { - await queueClient.send(testMessages[0]); - const receivedMsgs = await queueClient.receiveBatch(1); + it("Subscription: Throws error when dead lettering a dead lettered message", async function(): Promise< + void + > { + await testDeadLetter(topicClient, subscriptionClient, deadletterSubscriptionClient); + }); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + async function testAbandon( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient + ): Promise { + const deadLetterMsg = await deadLetterMessage(senderClient, receiverClient, deadletterClient); - await receivedMsgs[0].deadLetter(); + await deadLetterMsg.abandon(); - await testPeekMsgsLength(queueClient, 0); + await completeDeadLetteredMessage(deadletterClient, 0); + } - const deadletterQueueClient = await getdeadletterQueueClient(); - const deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); + it("Queue: Abandon a message received from dead letter queue", async function(): Promise { + await testAbandon(queueClient, queueClient, deadletterQueueClient); + }); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + it("Subscription: Abandon a message received from dead letter queue", async function(): Promise< + void + > { + await testAbandon(topicClient, subscriptionClient, deadletterSubscriptionClient); + }); + + async function testDefer( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadletterClient: QueueClient | SubscriptionClient + ): Promise { + const deadLetterMsg = await deadLetterMessage(senderClient, receiverClient, deadletterClient); - if (!deadLetterMsgs[0].sequenceNumber) { + if (!deadLetterMsg.sequenceNumber) { throw "Sequence Number can not be null"; } - const sequenceNumber = deadLetterMsgs[0].sequenceNumber; - await deadLetterMsgs[0].defer(); - const deferredMsgs = await deadletterQueueClient.receiveDeferredMessage(sequenceNumber); + const sequenceNumber = deadLetterMsg.sequenceNumber; + await deadLetterMsg.defer(); + + const deferredMsgs = await deadletterClient.receiveDeferredMessage(sequenceNumber); if (!deferredMsgs) { throw "No message received for sequence number"; } @@ -807,122 +513,213 @@ describe("ReceiveBatch from Queue/Subscription", function(): void { await deferredMsgs.complete(); - await testPeekMsgsLength(queueClient, 0); + await testPeekMsgsLength(receiverClient, 0); - await testPeekMsgsLength(deadletterQueueClient, 0); - }); + await testPeekMsgsLength(deadletterClient, 0); + } - it("Defer a message received from dead letter subscription", async function(): Promise { - await topicClient.send(testMessages[0]); - const receivedMsgs = await subscriptionClient.receiveBatch(1); + it("Queue: Defer a message received from dead letter queue", async function(): Promise { + await testDefer(queueClient, queueClient, deadletterQueueClient); + }); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + it("Subscription: Defer a message received from dead letter queue", async function(): Promise< + void + > { + await testDefer(topicClient, subscriptionClient, deadletterSubscriptionClient); + }); +}); - await receivedMsgs[0].deadLetter(); +describe("Multiple ReceiveBatch calls", () => { + beforeEach(async () => { + await beforeEachTest(); + }); - await testPeekMsgsLength(subscriptionClient, 0); + afterEach(async () => { + await afterEachTest(); + }); - const deadletterSubscriptionClient = await getdeadletterSubscriptionClient(); + // We use an empty queue/topic here so that the first receiveBatch call takes time to return + async function testParallelReceiveBatchCalls( + receiverClient: QueueClient | SubscriptionClient + ): Promise { + const firstBatchPromise = receiverClient.receiveBatch(1, 10); + await delay(5000); + const secondBatchPromise = receiverClient.receiveBatch(1, 10).catch((err) => { + should.equal(err.name, "Error"); + errorWasThrown = true; + }); + await Promise.all([firstBatchPromise, secondBatchPromise]); + should.equal(errorWasThrown, true); + } - const deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); + it("Queue: Throws error when ReceiveBatch is called while the previous call is not done", async function(): Promise< + void + > { + await testParallelReceiveBatchCalls(queueClient); + }); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + it("Subscription: Throws error when ReceiveBatch is called while the previous call is not done", async function(): Promise< + void + > { + await testParallelReceiveBatchCalls(subscriptionClient); + }); - if (!deadLetterMsgs[0].sequenceNumber) { - throw "Sequence Number can not be null"; - } - const sequenceNumber = deadLetterMsgs[0].sequenceNumber; - await deadLetterMsgs[0].defer(); + // We test for mutilple receiveBatch specifically to ensure that batchingRecevier on a client is reused + // See https://github.com/Azure/azure-service-bus-node/issues/31 + async function testSequentialReceiveBatchCalls( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.sendBatch(testMessages); + const msgs1 = await receiverClient.receiveBatch(1); + const msgs2 = await receiverClient.receiveBatch(1); - const deferredMsgs = await deadletterSubscriptionClient.receiveDeferredMessage(sequenceNumber); - if (!deferredMsgs) { - throw "No message received for sequence number"; - } - should.equal(deferredMsgs.body, testMessages[0].body); - should.equal(deferredMsgs.messageId, testMessages[0].messageId); + // Results are checked after both receiveBatches are done to ensure that the second call doesnt + // affect the result from the first one. + should.equal(Array.isArray(msgs1), true); + should.equal(msgs1.length, 1); + should.equal(msgs1[0].body, testMessages[0].body); + should.equal(msgs1[0].messageId, testMessages[0].messageId); - await deferredMsgs.complete(); + should.equal(Array.isArray(msgs2), true); + should.equal(msgs2.length, 1); + should.equal(msgs2[0].body, testMessages[1].body); + should.equal(msgs2[0].messageId, testMessages[1].messageId); - await testPeekMsgsLength(subscriptionClient, 0); + await msgs1[0].complete(); + await msgs2[0].complete(); + } - await testPeekMsgsLength(deadletterSubscriptionClient, 0); + it("Queue: Multiple receiveBatch using Queues/Subscriptions", async function(): Promise { + await testSequentialReceiveBatchCalls(queueClient, queueClient); }); - const testError = (err: Error) => { - should.equal(err.name, "InvalidOperationError"); - errorWasThrown = true; - }; - - it("Dead letter a message received from dead letter queue", async function(): Promise { - await queueClient.send(testMessages[0]); - const receivedMsgs = await queueClient.receiveBatch(1); + it("Subscription: Multiple receiveBatch using Queues/Subscriptions", async function(): Promise< + void + > { + await testSequentialReceiveBatchCalls(topicClient, subscriptionClient); + }); +}); - should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); - should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); +describe("Other ReceiveBatch Tests", function(): void { + beforeEach(async () => { + await beforeEachTest(); + }); - await receivedMsgs[0].deadLetter(); + afterEach(async () => { + await afterEachTest(); + }); - await testPeekMsgsLength(queueClient, 0); + async function testAbandonMsgsTillMaxDeliveryCount( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient, + deadLetterClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.send(testMessages[0]); + let abandonMsgCount = 0; - const deadletterQueueClient = await getdeadletterQueueClient(); - let deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); + while (abandonMsgCount < maxDeliveryCount) { + const receivedMsgs = await receiverClient.receiveBatch(1); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + should.equal(receivedMsgs.length, 1); + should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); + should.equal(receivedMsgs[0].deliveryCount, abandonMsgCount); + abandonMsgCount++; - await deadLetterMsgs[0].deadLetter().catch((err) => testError(err)); + await receivedMsgs[0].abandon(); + } - should.equal(errorWasThrown, true); + await testPeekMsgsLength(receiverClient, 0); - deadLetterMsgs = await deadletterQueueClient.receiveBatch(1); + const deadLetterMsgs = await deadLetterClient.receiveBatch(1); + should.equal(Array.isArray(deadLetterMsgs), true); should.equal(deadLetterMsgs.length, 1); should.equal(deadLetterMsgs[0].body, testMessages[0].body); should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); await deadLetterMsgs[0].complete(); - await testPeekMsgsLength(deadletterQueueClient, 0); + + await testPeekMsgsLength(deadLetterClient, 0); + } + + it("Queue: Message abandoned more than maxDeliveryCount goes to dead letter queue", async function(): Promise< + void + > { + await testAbandonMsgsTillMaxDeliveryCount(queueClient, queueClient, deadletterQueueClient); }); - it("Dead letter a message received from dead letter subscription", async function(): Promise< + it("Subscription: Message abandoned more than maxDeliveryCount goes to dead letter queue", async function(): Promise< void > { - await topicClient.send(testMessages[0]); - const receivedMsgs = await subscriptionClient.receiveBatch(1); + await testAbandonMsgsTillMaxDeliveryCount( + topicClient, + subscriptionClient, + deadletterSubscriptionClient + ); + }); + + async function testNoSettlement( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.send(testMessages[0]); + + let receivedMsgs = await receiverClient.receiveBatch(1); should.equal(receivedMsgs.length, 1); - should.equal(receivedMsgs[0].body, testMessages[0].body); + should.equal(receivedMsgs[0].deliveryCount, 0); should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - await receivedMsgs[0].deadLetter(); + await testPeekMsgsLength(receiverClient, 1); - await testPeekMsgsLength(subscriptionClient, 0); + receivedMsgs = await receiverClient.receiveBatch(1); - const deadletterSubscriptionClient = await getdeadletterSubscriptionClient(); + should.equal(receivedMsgs.length, 1); + should.equal(receivedMsgs[0].deliveryCount, 1); + should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - let deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); + await receivedMsgs[0].complete(); + } - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + it("Queue: No settlement of the message is retained with incremented deliveryCount", async function(): Promise< + void + > { + await testNoSettlement(queueClient, queueClient); + }); - await deadLetterMsgs[0].deadLetter().catch((err) => testError(err)); + it("Subscription: No settlement of the message is retained with incremented deliveryCount", async function(): Promise< + void + > { + await testNoSettlement(topicClient, subscriptionClient); + }); - should.equal(errorWasThrown, true); + async function testAskForMore( + senderClient: QueueClient | TopicClient, + receiverClient: QueueClient | SubscriptionClient + ): Promise { + await senderClient.send(testMessages[0]); + const receivedMsgs = await receiverClient.receiveBatch(2); - deadLetterMsgs = await deadletterSubscriptionClient.receiveBatch(1); + should.equal(receivedMsgs.length, 1); + should.equal(receivedMsgs[0].body, testMessages[0].body); + should.equal(receivedMsgs[0].messageId, testMessages[0].messageId); - should.equal(deadLetterMsgs.length, 1); - should.equal(deadLetterMsgs[0].body, testMessages[0].body); - should.equal(deadLetterMsgs[0].messageId, testMessages[0].messageId); + await receivedMsgs[0].complete(); - await deadLetterMsgs[0].complete(); + await testPeekMsgsLength(receiverClient, 0); + } + + it("Queue: Receive n messages but queue only has m messages, where m < n", async function(): Promise< + void + > { + await testAskForMore(queueClient, queueClient); + }); + + it("Subscription: Receive n messages but subscription only has m messages, where m < n", async function(): Promise< + void + > { + await testAskForMore(topicClient, subscriptionClient); }); it("Throws error when call the second ReceiveBatch while the first one is not done", async function(): Promise<