Skip to content

Commit

Permalink
http2: extract listenHandlers and one bugfix (#3722)
Browse files Browse the repository at this point in the history
* fix: restructure client-h2

* extract onHttp2SessionClose and rename onHTTP2GoAway to onHttp2SesssionGoAway

* fix function header

* extract functions

* extract onSocketClose
  • Loading branch information
Uzlopak authored Oct 13, 2024
1 parent 1bc83ea commit e8c3aba
Showing 1 changed file with 75 additions and 60 deletions.
135 changes: 75 additions & 60 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const {
kHTTP2Session,
kResume,
kSize,
kHTTPContext
kHTTPContext,
kClosed
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')
Expand Down Expand Up @@ -93,82 +94,37 @@ async function connectH2 (client, socket) {
session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session[kHTTP2Session] = null

util.addListener(session, 'error', onHttp2SessionError)
util.addListener(session, 'frameError', onHttp2FrameError)
util.addListener(session, 'end', onHttp2SessionEnd)
util.addListener(session, 'goaway', onHTTP2GoAway)
util.addListener(session, 'close', function () {
const { [kClient]: client } = this
const { [kSocket]: socket } = client

const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))

client[kHTTP2Session] = null

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
}
})
util.addListener(session, 'goaway', onHttp2SessionGoAway)
util.addListener(session, 'close', onHttp2SessionClose)

session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session

util.addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

this[kClient][kOnError](err)
})

util.addListener(socket, 'end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

util.addListener(socket, 'close', function () {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
util.addListener(socket, 'error', onHttp2SocketError)
util.addListener(socket, 'end', onHttp2SocketEnd)
util.addListener(socket, 'close', onHttp2SocketClose)

if (this[kHTTP2Session] != null) {
this[kHTTP2Session].destroy(err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
})
socket[kClosed] = false
socket.on('close', onSocketClose)

return {
version: 'h2',
defaultPipelining: Infinity,
write (...args) {
return writeH2(client, ...args)
write (request) {
return writeH2(client, request)
},
resume () {
resumeH2(client)
},
destroy (err, callback) {
if (closed) {
if (socket[kClosed]) {
queueMicrotask(callback)
} else {
// Destroying the socket will trigger the session close
Expand Down Expand Up @@ -223,16 +179,19 @@ function onHttp2SessionEnd () {
* This is the root cause of #3011
* We need to handle GOAWAY frames properly, and trigger the session close
* along with the socket right away
*
* @this {import('http2').ClientHttp2Session}
* @param {number} errorCode
*/
function onHTTP2GoAway (code) {
function onHttp2SessionGoAway (errorCode) {
// We cannot recover, so best to close the session and the socket
const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${code}`, util.getSocketInfo(this))
const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null

if (this[kHTTP2Session] != null) {
if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}
Expand All @@ -253,6 +212,62 @@ function onHTTP2GoAway (code) {
client[kResume]()
}

function onHttp2SessionClose () {
const { [kClient]: client } = this
const { [kSocket]: socket } = client

const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))

client[kHTTP2Session] = null

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
}
}

function onHttp2SocketClose () {
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

const client = this[kHTTP2Session][kClient]

client[kSocket] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].destroy(err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
}

function onHttp2SocketError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

this[kClient][kOnError](err)
}

function onHttp2SocketEnd () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}

function onSocketClose () {
this[kClosed] = true
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
function shouldSendContentLength (method) {
return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
Expand Down

0 comments on commit e8c3aba

Please sign in to comment.