Skip to content

Commit

Permalink
fix: fix error emit error
Browse files Browse the repository at this point in the history
* issue with a looping connect code

Closes #56
  • Loading branch information
Bugs5382 committed Dec 28, 2023
1 parent 65f890f commit 3daa8e8
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 21 deletions.
39 changes: 39 additions & 0 deletions __tests__/hl7.end2end.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,45 @@ describe('node hl7 end to end - client', () => {
})
})

describe('server/client failure checks', () => {
test('...host does not exist, timeout', async () => {

const client = new Client({ host: '192.0.2.1' })

// forced to lower connection timeout so unit testing is not slow
const ob = client.createOutbound({ port: 1234, connectionTimeout: 100 }, async () => {})

await expectEvent(ob, 'timeout')

})

test('...host exist, but not listening on the port, timeout', async () => {

const server = new Server({ bindAddress: '0.0.0.0' })
const listener = server.createInbound({ port: 3000 }, async () => {})

await expectEvent(listener, 'listen')

const client = new Client({ host: '0.0.0.0' })

// forced to lower connection timeout so unit testing is not slow
const ob = client.createOutbound({ port: 1234, connectionTimeout: 10 }, async () => {})

// we couldn't connect
await expectEvent(ob, 'error')

// we are attempting to reconnect
await expectEvent(ob, 'connecting')

// close ob now. were done
await ob.close()

// close the server connection
await listener.close()

})
})

describe('...send message, get proper ACK', () => {
let LISTEN_PORT: number

Expand Down
68 changes: 59 additions & 9 deletions src/client/hl7Outbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export class HL7Outbound extends EventEmitter {
_pendingSetup: Promise<boolean> | boolean
/** @internal */
private _responseBuffer: string
/** @internal */
private _initialConnection: boolean;

/**
* @since 1.0.0
Expand All @@ -58,11 +60,14 @@ export class HL7Outbound extends EventEmitter {
constructor (client: Client, props: ClientListenerOptions, handler: OutboundHandler) {
super()
this._awaitingResponse = false
this._initialConnection = false
this._connectionTimer = undefined
this._handler = handler
this._main = client
this._nodeId = randomString(5)

this._opt = normalizeClientListenerOptions(props)

this._pendingSetup = true
this._sockets = new Map()
this._retryCount = 1
Expand All @@ -81,7 +86,7 @@ export class HL7Outbound extends EventEmitter {
* @since 1.0.0
* @example
* ```ts
* OB.close()
* OB.close()
* ```
*/
async close (): Promise<boolean> {
Expand Down Expand Up @@ -128,7 +133,7 @@ export class HL7Outbound extends EventEmitter {
* @example
* ```ts
*
* // OB was set from the orginial 'createOutbound' method.
* // the OB was set from the orginial 'createOutbound' method.
*
* let message = new Message({
* messageHeader: {
Expand Down Expand Up @@ -225,38 +230,73 @@ export class HL7Outbound extends EventEmitter {

/** @internal */
private _connect (): Socket {
this._retryTimer = undefined

let socket: Socket
const host = this._main._opt.host
const port = this._opt.port

if (typeof this._main._opt.tls !== 'undefined') {
socket = tls.connect({ host, port, ...this._main._opt.socket, ...this._main._opt.tls }, () => this._listener(socket))
socket = tls.connect({ host, port, timeout: this._opt.connectionTimeout, ...this._main._opt.socket, ...this._main._opt.tls }, () => this._listener(socket))
} else {
socket = net.createConnection({ host, port }, () => this._listener(socket))
socket = net.connect({ host, port, timeout: this._opt.connectionTimeout }, () => this._listener(socket))
}

socket.on('error', err => {
socket.setNoDelay(true)

// send this to tell we are pending connecting to the host/port
this.emit('connecting')

let connectionError: Error | undefined

if (this._opt.connectionTimeout > 0) {
this._connectionTimer = setTimeout(() => {
socket.destroy(new HL7FatalError(500, 'Connection timed out.'))
this._removeSocket(this._nodeId)
}, this._opt.connectionTimeout)
}

socket.on('timeout', async () => {
this._readyState = ReadyState.CLOSING
this._removeSocket(this._nodeId)
this.emit('client.error', err, this._nodeId)
this.emit('timeout')
})

socket.on('ready', () => {
this.emit('ready')
// fired right after successful connection,
// tell the user ready to be able to send messages to server/broker
})

socket.on('error', err => {
connectionError = connectionError || err
})

socket.on('close', () => {
if (this._readyState === ReadyState.CLOSING) {
this._readyState = ReadyState.CLOSED
this._reset()
} else {
connectionError = connectionError || new HL7FatalError(500, 'Socket closed unexpectedly by server.')
const retryHigh = typeof this._opt.retryHigh === 'undefined' ? this._main._opt.retryHigh : this._opt.retryLow
const retryLow = typeof this._opt.retryLow === 'undefined' ? this._main._opt.retryLow : this._opt.retryLow
const retryCount = this._retryCount++
const delay = expBackoff(retryLow, retryHigh, retryCount)
this._readyState = ReadyState.OPEN
this._reset()
this._retryTimer = setTimeout(this._connect, delay)
if (retryCount <= 1) {
this.emit('error')
if ((retryCount <= 1) && (retryCount < this._opt.maxConnectionAttempts)) {
this.emit('error', connectionError)
} else if ((retryCount > this._opt.maxConnectionAttempts) && !this._initialConnection) {
// this._removeSocket(this._nodeId)
this.emit('timeout')
}
}
})

socket.on('connect', () => {
// we have connected. we should now follow trying to reconnect until total failure
this._initialConnection = true
this._readyState = ReadyState.CONNECTED
this.emit('connect', true, this._socket)
})
Expand Down Expand Up @@ -285,11 +325,13 @@ export class HL7Outbound extends EventEmitter {

socket.on('end', () => {
this._removeSocket(this._nodeId)
this.emit('client.end')
this.emit('end')
})

socket.unref()

this._addSocket(this._nodeId, socket, true)

return socket
}

Expand Down Expand Up @@ -334,4 +376,12 @@ export class HL7Outbound extends EventEmitter {
}
this._sockets.delete(nodeId)
}

/** @internal */
private _reset() {
if (typeof this._connectionTimer !== 'undefined') {
clearTimeout(this._connectionTimer)
}
this._connectionTimer = undefined
}
}
29 changes: 17 additions & 12 deletions src/utils/normalizedClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const DEFAULT_LISTEN_CLIENT_OPTS = {
connectionTimeout: 10000,
encoding: 'utf-8',
maxAttempts: 10,
maxConnectionAttempts: 30,
maxConnections: 10,
retryHigh: 30000,
retryLow: 1000,
Expand All @@ -44,10 +45,18 @@ export interface ClientOptions {
/** Keep the connection alive after sending data and getting a response.
* @default true */
keepAlive?: boolean
/** Max Connections this connection makes.
/** Max attempts
* to send the message before an error is thrown if we are in the process of re-attempting to connect to the server.
* Has to be greater than 1. You cannot exceed 50.
* @default 10 */
maxAttempts?: number
/** If we are trying to establish an initial connection to the server, let's end it after this many attempts.
* The time between re-connects is determined by {@see connectionTimeout}.
* You cannot exceed 50.
* @since 1.1.0
* @default 30
*/
maxConnectionAttempts?: number
/** Max delay, in milliseconds, for exponential-backoff when reconnecting
* @default 30_000 */
retryHigh?: number
Expand All @@ -61,25 +70,15 @@ export interface ClientOptions {
tls?: boolean | TLSOptions
}

export interface ClientListenerOptions {
export interface ClientListenerOptions extends ClientOptions {
/** Encoding of the messages we expect from the HL7 message.
* @default "utf-8"
*/
encoding?: BufferEncoding
/** Max Connections this connection makes.
* Has to be greater than 1. You cannot exceed 50.
* @default 10 */
maxAttempts?: number
/** Max Connections this connection makes.
* Has to be greater than 1.
* @default 10 */
maxConnections?: number
/** Max delay, in milliseconds, for exponential-backoff when reconnecting
* @default 30_000 */
retryHigh?: number
/** Step size, in milliseconds, for exponential-backoff when reconnecting
* @default 1000 */
retryLow?: number
/** The port we should connect on the server. */
port: number
/** Wait for ACK **/
Expand All @@ -92,11 +91,14 @@ type ValidatedClientKeys =
| 'maxAttempts'

type ValidatedClientListenerKeys =
| 'connectionTimeout'
| 'port'
| 'maxAttempts'
| 'maxConnectionAttempts'
| 'maxConnections'

interface ValidatedClientOptions extends Pick<Required<ClientOptions>, ValidatedClientKeys> {
connectionTimeout: number
host: string
maxAttempts: number
retryHigh: number
Expand All @@ -106,9 +108,11 @@ interface ValidatedClientOptions extends Pick<Required<ClientOptions>, Validated
}

interface ValidatedClientListenerOptions extends Pick<Required<ClientListenerOptions>, ValidatedClientListenerKeys> {
connectionTimeout: number
encoding: BufferEncoding
port: number
maxAttempts: number
maxConnectionAttempts: number
maxConnections: number
retryHigh: number
retryLow: number
Expand Down Expand Up @@ -163,6 +167,7 @@ export function normalizeClientListenerOptions (raw?: ClientListenerOptions): Va

assertNumber(props, 'connectionTimeout', 0)
assertNumber(props, 'maxAttempts', 1, 50)
assertNumber(props, 'maxConnectionAttempts', 1, 50)
assertNumber(props, 'maxConnections', 1, 50)
assertNumber(props, 'port', 0, 65353)

Expand Down

0 comments on commit 3daa8e8

Please sign in to comment.