Skip to content

Commit

Permalink
feat: extract listeners from client-h1 (#3725)
Browse files Browse the repository at this point in the history
  • Loading branch information
Uzlopak authored Oct 12, 2024
1 parent d6c44f3 commit 1bc83ea
Showing 1 changed file with 93 additions and 83 deletions.
176 changes: 93 additions & 83 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ const {
kMaxResponseSize,
kOnError,
kResume,
kHTTPContext
kHTTPContext,
kClosed
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
const EMPTY_BUF = Buffer.alloc(0)
const FastBuffer = Buffer[Symbol.species]
const addListener = util.addListener
const removeAllListeners = util.removeAllListeners

let extractBody
Expand Down Expand Up @@ -779,87 +779,13 @@ async function connectH1 (client, socket) {
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)

addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
util.addListener(socket, 'error', onHttpSocketError)
util.addListener(socket, 'readable', onHttpSocketReadable)
util.addListener(socket, 'end', onHttpSocketEnd)
util.addListener(socket, 'close', onHttpSocketClose)

const parser = this[kParser]

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

this[kClient][kOnError](err)
})
addListener(socket, 'readable', function () {
this[kParser]?.readMore()
})
addListener(socket, 'end', function () {
const parser = this[kParser]

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
addListener(socket, 'close', function () {
const parser = this[kParser]

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
})
socket[kClosed] = false
socket.on('close', onSocketClose)

return {
version: 'h1',
Expand All @@ -875,7 +801,7 @@ async function connectH1 (client, socket) {
* @param {() => void} callback
*/
destroy (err, callback) {
if (closed) {
if (socket[kClosed]) {
queueMicrotask(callback)
} else {
socket.on('close', callback)
Expand Down Expand Up @@ -931,6 +857,90 @@ async function connectH1 (client, socket) {
}
}

function onHttpSocketError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

const parser = this[kParser]

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

this[kClient][kOnError](err)
}

function onHttpSocketReadable () {
this[kParser]?.readMore()
}

function onHttpSocketEnd () {
const parser = this[kParser]

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}

function onHttpSocketClose () {
const parser = this[kParser]

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
}

function onSocketClose () {
this[kClosed] = true
}

/**
* @param {import('./client.js')} client
*/
Expand Down

0 comments on commit 1bc83ea

Please sign in to comment.