Skip to content

Commit

Permalink
Streaming Receiver Tests - Part 3 (Azure#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru authored and ramya-rao-a committed Jan 12, 2019
1 parent daac149 commit f2aa4d4
Showing 1 changed file with 197 additions and 1 deletion.
198 changes: 197 additions & 1 deletion test/streamingReceiver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import {
ServiceBusMessage,
TopicClient,
SubscriptionClient,
delay
delay,
ReceiveHandler
} from "../lib";

const testMessages: SendableMessageInfo[] = [
Expand Down Expand Up @@ -402,6 +403,55 @@ describe("Streaming Receiver from Queue/Subscription", function(): void {
await testPeekMsgsLength(subscriptionClient, 0);
});

it("With auto-complete enabled, manual abandon in the Queue by the user should not result in errors", async function(): Promise<
void
> {
await queueClient.send(testMessages[0]);
const receiveListener: ReceiveHandler = await queueClient.receive(
(msg: ServiceBusMessage) => {
return msg.abandon().then(() => {
return receiveListener.stop();
});
},
(err: Error) => {
should.not.exist(err);
},
{ maxAutoRenewDurationInSeconds: 0 }
);
await delay(4000);

const receivedMsgs = await queueClient.receiveBatch(1);
should.equal(receivedMsgs.length, 1);
should.equal(receivedMsgs[0].messageId, testMessages[0].messageId);
await receivedMsgs[0].complete();
await testPeekMsgsLength(queueClient, 0);
});

it("With auto-complete enabled, manual abandon in the Subscription by the user should not result in errors", async function(): Promise<
void
> {
await topicClient.send(testMessages[0]);
const receiveListener: ReceiveHandler = await subscriptionClient.receive(
(msg: ServiceBusMessage) => {
return msg.abandon().then(() => {
return receiveListener.stop();
});
},
(err: Error) => {
should.not.exist(err);
},
{ maxAutoRenewDurationInSeconds: 0 }
);

await delay(4000);

const receivedMsgs = await subscriptionClient.receiveBatch(1);
should.equal(receivedMsgs.length, 1);
should.equal(receivedMsgs[0].messageId, testMessages[0].messageId);
await receivedMsgs[0].complete();
await testPeekMsgsLength(subscriptionClient, 0);
});

it("With auto-complete enabled, manual deadletter in the Queue by the user should not result in errors", async function(): Promise<
void
> {
Expand Down Expand Up @@ -479,6 +529,93 @@ describe("Streaming Receiver from Queue/Subscription", function(): void {
await testPeekMsgsLength(deadletterSubscriptionClient, 0);
});

it("With auto-complete enabled, manual defer in the Queue by the user should not result in errors", async function(): Promise<
void
> {
await queueClient.sendBatch(testMessages);

let seq0: any = 0;
let seq1: any = 0;
const receiveListener = await queueClient.receive(
(msg: ServiceBusMessage) => {
if (msg.messageId === testMessages[0].messageId) {
seq0 = msg.sequenceNumber;
} else if (msg.messageId === testMessages[1].messageId) {
seq1 = msg.sequenceNumber;
}
return msg.defer();
},
(err: Error) => {
should.not.exist(err);
}
);

await delay(4000);

await receiveListener.stop();
const deferredMsg0 = await queueClient.receiveDeferredMessage(seq0);
const deferredMsg1 = await queueClient.receiveDeferredMessage(seq1);
if (!deferredMsg0) {
throw "No message received for sequence number";
}
if (!deferredMsg1) {
throw "No message received for sequence number";
}
should.equal(deferredMsg0.body, testMessages[0].body);
should.equal(deferredMsg0.messageId, testMessages[0].messageId);

should.equal(deferredMsg1.body, testMessages[1].body);
should.equal(deferredMsg1.messageId, testMessages[1].messageId);
await deferredMsg0.complete();
await deferredMsg1.complete();

await testPeekMsgsLength(queueClient, 0);
});

it("With auto-complete enabled, manual defer in the Subscription by the user should not result in errors", async function(): Promise<
void
> {
await topicClient.sendBatch(testMessages);

let seq0: any = 0;
let seq1: any = 0;
const receiveListener = await subscriptionClient.receive(
(msg: ServiceBusMessage) => {
if (msg.messageId === testMessages[0].messageId) {
seq0 = msg.sequenceNumber;
} else if (msg.messageId === testMessages[1].messageId) {
seq1 = msg.sequenceNumber;
}
return msg.defer();
},
(err: Error) => {
should.not.exist(err);
}
);

await delay(4000);

await receiveListener.stop();

const deferredMsg0 = await subscriptionClient.receiveDeferredMessage(seq0);
const deferredMsg1 = await subscriptionClient.receiveDeferredMessage(seq1);
if (!deferredMsg0) {
throw "No message received for sequence number";
}
if (!deferredMsg1) {
throw "No message received for sequence number";
}
should.equal(deferredMsg0.body, testMessages[0].body);
should.equal(deferredMsg0.messageId, testMessages[0].messageId);

should.equal(deferredMsg1.body, testMessages[1].body);
should.equal(deferredMsg1.messageId, testMessages[1].messageId);
await deferredMsg0.complete();
await deferredMsg1.complete();

await testPeekMsgsLength(subscriptionClient, 0);
});

it("With auto-complete disabled, deferring a message results in not getting the same message again from queue. The message is then gotten using receiveDefferedMessages", async function(): Promise<
void
> {
Expand Down Expand Up @@ -646,4 +783,63 @@ describe("Streaming Receiver from Queue/Subscription", function(): void {

await testPeekMsgsLength(deadletterSubscriptionClient, 0);
});

it("Second Streaming Receiver call should fail if the first one is not stopped for Queues", async function(): Promise<
void
> {
const receiveListener: ReceiveHandler = await queueClient.receive(
(msg: ServiceBusMessage) => {
return msg.complete();
},
(err: Error) => {
should.not.exist(err);
}
);
await delay(5000);
try {
const receiveListener2 = await queueClient.receive(
(msg: ServiceBusMessage) => {
return Promise.resolve();
},
(err: Error) => {
should.exist(err);
}
);
await receiveListener2.stop();
} catch (err) {
should.equal(!err.message.search("has already been created for the Subscription"), false);
}

await receiveListener.stop();
});

it("Second Streaming Receiver call should fail if the first one is not stopped for Subscriptions", async function(): Promise<
void
> {
const receiveListener: ReceiveHandler = await subscriptionClient.receive(
(msg: ServiceBusMessage) => {
return msg.complete();
},
(err: Error) => {
should.not.exist(err);
}
);
await delay(5000);

try {
const receiveListener2 = await subscriptionClient.receive(
(msg: ServiceBusMessage) => {
return Promise.resolve();
},
(err: Error) => {
should.exist(err);
}
);
await receiveListener2.stop();
} catch (err) {
should.equal(!err.message.search("has already been created for the Subscription"), false);
}

await receiveListener.stop();
});
});

0 comments on commit f2aa4d4

Please sign in to comment.