diff --git a/lib/session.ts b/lib/session.ts index a6c5500..499cbdf 100644 --- a/lib/session.ts +++ b/lib/session.ts @@ -143,9 +143,9 @@ export class Session extends Entity { * - **Rejects** the promise with an AmqpError when rhea emits the "session_error" event while trying * to close an amqp session. */ - close(): Promise { - this.removeAllListeners(); - return new Promise((resolve, reject) => { + async close(): Promise { + + const closePromise = new Promise((resolve, reject) => { log.error("[%s] The amqp session '%s' is open ? -> %s", this.connection.id, this.id, this.isOpen()); if (this.isOpen()) { let onError: Func; @@ -203,6 +203,13 @@ export class Session extends Entity { return resolve(); } }); + + try { + await closePromise; + } finally { + this.removeAllListeners(); + } + } /** diff --git a/test/session.spec.ts b/test/session.spec.ts index 5e95bbe..98c13be 100644 --- a/test/session.spec.ts +++ b/test/session.spec.ts @@ -77,5 +77,61 @@ describe("Session", () => { // Open the session. session.begin(); }); + + it("sessionClose", (done: Function) => { + const session = new Session( + connection, + connection["_connection"].create_session() + ); + + session.on(SessionEvents.sessionOpen, async () => { + await session.close(); + assert.strictEqual(session.listeners(SessionEvents.sessionOpen).length, 0); + assert.strictEqual(session.listeners(SessionEvents.sessionClose).length, 0); + }); + + session.on(SessionEvents.sessionClose, (event) => { + assert.exists(event, "Expected an AMQP event."); + + done(); + }); + + // Open the session. + session.begin(); + + assert.strictEqual(session.listeners(SessionEvents.sessionOpen).length, 1); + assert.strictEqual(session.listeners(SessionEvents.sessionClose).length, 1); + }); + + it("sessionError", (done: Function) => { + const errorCondition = "amqp:connection:forced"; + const errorDescription = "testing error on close"; + mockService.on( + rhea.SessionEvents.sessionOpen, + (context: rhea.EventContext) => { + context.session?.close({ + condition: errorCondition, + description: errorDescription, + }); + } + ); + + const session = new Session( + connection, + connection["_connection"].create_session() + ); + + session.on(SessionEvents.sessionError, async (event) => { + assert.exists(event, "Expected an AMQP event."); + const error = event.session?.error as rhea.ConnectionError; + assert.exists(error, "Expected an AMQP error."); + assert.strictEqual(error.condition, errorCondition); + assert.strictEqual(error.description, errorDescription); + await session.close(); + done(); + }); + + session.begin(); + }); }); });