From 5a1bb2628796d259cbbf575b13f8e1d8d95323b5 Mon Sep 17 00:00:00 2001 From: Shane Froebel Date: Tue, 12 Dec 2023 22:06:54 -0500 Subject: [PATCH] feat: message queue - if the server happens to go offline while sending, it will attempt to send the message, or it will timeout - future build in #21, that the messages could be saved to a RabbitMQ or REDDIS or a FILE for rehydrating #15 [ci skip] --- README.md | 4 +-- src/client/hl7Outbound.ts | 66 ++++++++++++++++++++++++++++++++--- src/utils/normalizedClient.ts | 18 +++++++++- 3 files changed, 81 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 255951a..7410a03 100644 --- a/README.md +++ b/README.md @@ -13,8 +13,8 @@ Benefits: - No dependencies, making this ultra-fast. - Automatically re-connect or retry sending - Written in typescript and published with heavily commented type definitions -- Peer `node-hl7-server' npm package that in conjunction with this one could create a powerfull Hl7 system. -- Works in Windows or Linux based systems +- Peer `node-hl7-server' npm package that in conjunction with this one could create a powerful HL7 system. +- Works in Windows or Linux-based systems ## Table of Contents diff --git a/src/client/hl7Outbound.ts b/src/client/hl7Outbound.ts index 7bdf2bc..f2358b5 100644 --- a/src/client/hl7Outbound.ts +++ b/src/client/hl7Outbound.ts @@ -36,6 +36,8 @@ export class HL7Outbound extends EventEmitter { private readonly _sockets: Map /** @internal */ protected _readyState: READY_STATE + /** @internal */ + _pendingSetup: Promise | boolean constructor (client: Client, props: ClientListenerOptions, handler: OutboundHandler) { super() @@ -45,6 +47,7 @@ export class HL7Outbound extends EventEmitter { this._main = client this._nodeId = randomString(5) this._opt = normalizeClientListenerOptions(props) + this._pendingSetup = true this._sockets = new Map() this._retryCount = 1 this._retryTimer = undefined @@ -66,17 +69,70 @@ export class HL7Outbound extends EventEmitter { * @since 1.0.0 */ async sendMessage (message: Message | Batch): Promise { - // if we are waiting for an ack before we can send something else, and we are in that process. - if (this._opt.waitAck && this._awaitingResponse) { - throw new HL7FatalError(500, 'Can\'t send message while we are waiting for a response.') + let attempts = 0 + const maxAttempts = typeof this._opt.maxAttempts === 'undefined' ? this._main._opt.maxAttempts : this._opt.maxAttempts + + const checkConnection = async (): Promise => { + return this._readyState === READY_STATE.CONNECTED + } + + const checkAck = async (): Promise => { + return this._awaitingResponse } + const checkSend = async (_message: string): Promise => { + // noinspection InfiniteLoopJS + while (true) { + try { + // first, if we are closed, sorry, no more sending messages + if ((this._readyState === READY_STATE.CLOSED) || (this._readyState === READY_STATE.CLOSING)) { + // noinspection ExceptionCaughtLocallyJS + throw new HL7FatalError(500, 'In an invalid state to be able to send message.') + } + if (this._readyState !== READY_STATE.CONNECTED) { + // if we are not connected, + // check to see if we are now connected. + if (this._pendingSetup === false) { + // @todo in the future, add here to store the messages in a file or a + this._pendingSetup = checkConnection().finally(() => { this._pendingSetup = false }) + } + } else if (this._readyState === READY_STATE.CONNECTED && this._opt.waitAck && this._awaitingResponse) { + // Ok, we ar now conformed connected. + // However, since we are checking + // to make sure we wait for an ACKNOWLEDGEMENT from the server, + // that the message was gotten correctly from the last one we sent. + // We are still waiting, we need to recheck again + // if we are not connected, + // check to see if we are now connected. + if (this._pendingSetup === false) { + this._pendingSetup = checkAck().finally(() => { this._pendingSetup = false }) + } + } + return await this._pendingSetup + } catch (err: any) { + Error.captureStackTrace(err) + if (++attempts >= maxAttempts) { + throw err + } else { + emitter.emit('retry', err) + } + } + } + } + + const emitter = new EventEmitter() + + const theMessage = message.toString() + + // check to see if we should be sending + await checkSend(theMessage) + // ok, if our options are to wait for an acknowledgement, set the var to "true" if (this._opt.waitAck) { this._awaitingResponse = true } - const messageToSend = Buffer.from(message.toString()) + const messageToSend = Buffer.from(theMessage) const header = Buffer.alloc(6) header.writeInt32BE(messageToSend.length + 6, 1) @@ -164,6 +220,8 @@ export class HL7Outbound extends EventEmitter { async close (): Promise { // mark that we set our internal that we are closing, so we do not try to re-connect this._readyState = READY_STATE.CLOSING + // @todo Remove all pending messages that might be pending sending. + // @todo Do we dare save them as a file so if the kube process fails and restarts up, it can send them again?? this._sockets.forEach((socket) => { if (typeof socket.destroyed !== 'undefined') { socket.end() diff --git a/src/utils/normalizedClient.ts b/src/utils/normalizedClient.ts index 15aa7e6..678d424 100644 --- a/src/utils/normalizedClient.ts +++ b/src/utils/normalizedClient.ts @@ -13,6 +13,7 @@ const DEFAULT_CLIENT_OPTS = { const DEFAULT_LISTEN_CLIENT_OPTS = { connectionTimeout: 10000, encoding: 'utf-8', + maxAttempts: 10, maxConnections: 10, retryHigh: 30000, retryLow: 1000, @@ -39,6 +40,10 @@ export interface ClientOptions { /** Keep the connection alive after sending data and getting a response. * @default true */ keepAlive?: boolean + /** Max Connections this connection makes. + * Has to be greater than 1. You cannot exceed 50. + * @default 10 */ + maxAttempts?: number /** Max delay, in milliseconds, for exponential-backoff when reconnecting * @default 30_000 */ retryHigh?: number @@ -57,6 +62,10 @@ export interface ClientListenerOptions { * @default "utf-8" */ encoding?: BufferEncoding + /** Max Connections this connection makes. + * Has to be greater than 1. You cannot exceed 50. + * @default 10 */ + maxAttempts?: number /** Max Connections this connection makes. * Has to be greater than 1. * @default 10 */ @@ -76,12 +85,16 @@ export interface ClientListenerOptions { type ValidatedClientKeys = | 'connectionTimeout' | 'host' + | 'maxAttempts' type ValidatedClientListenerKeys = | 'port' + | 'maxAttempts' + | 'maxConnections' interface ValidatedClientOptions extends Pick, ValidatedClientKeys> { host: string + maxAttempts: number retryHigh: number retryLow: number socket?: TcpSocketConnectOpts @@ -91,6 +104,7 @@ interface ValidatedClientOptions extends Pick, Validated interface ValidatedClientListenerOptions extends Pick, ValidatedClientListenerKeys> { encoding: BufferEncoding port: number + maxAttempts: number maxConnections: number retryHigh: number retryLow: number @@ -122,7 +136,7 @@ export function normalizeClientOptions (raw?: ClientOptions): ValidatedClientOpt } assertNumber(props, 'connectionTimeout', 0) - assertNumber(props, 'maxConnections', 1) + assertNumber(props, 'maxConnections', 1, 50) if (props.tls === true) { props.tls = {} @@ -144,6 +158,8 @@ export function normalizeClientListenerOptions (raw?: ClientListenerOptions): Va } assertNumber(props, 'connectionTimeout', 0) + assertNumber(props, 'maxAttempts', 1, 50) + assertNumber(props, 'maxConnections', 1, 50) assertNumber(props, 'port', 0, 65353) return props