Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(H2): onboard H2 into Undici queueing system #3707

Merged
merged 5 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 56 additions & 19 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ const {
kOnError,
kMaxConcurrentStreams,
kHTTP2Session,
kResume
kResume,
kSize,
kHTTPContext
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')
Expand Down Expand Up @@ -160,11 +162,10 @@ async function connectH2 (client, socket) {
version: 'h2',
defaultPipelining: Infinity,
write (...args) {
// TODO (fix): return
writeH2(client, ...args)
return writeH2(client, ...args)
},
resume () {

resumeH2(client)
},
destroy (err, callback) {
if (closed) {
Expand All @@ -183,6 +184,20 @@ async function connectH2 (client, socket) {
}
}

function resumeH2 (client) {
const socket = client[kSocket]

if (socket?.destroyed === false) {
if (client[kSize] === 0 && client[kMaxConcurrentStreams] === 0) {
socket.unref()
client[kHTTP2Session].unref()
} else {
socket.ref()
client[kHTTP2Session].ref()
}
}
}

function onHttp2SessionError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

Expand Down Expand Up @@ -210,17 +225,32 @@ function onHttp2SessionEnd () {
* along with the socket right away
*/
function onHTTP2GoAway (code) {
const err = new RequestAbortedError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
// We cannot recover, so best to close the session and the socket
const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${code}`, util.getSocketInfo(this))
const client = this[kClient]

// We need to trigger the close cycle right away
// We need to destroy the session and the socket
// Requests should be failed with the error after the current one is handled
this[kSocket][kError] = err
this[kClient][kOnError](err)
client[kSocket] = null
client[kHTTPContext] = null

this.unref()
if (this[kHTTP2Session] != null) {
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}

util.destroy(this[kSocket], err)

// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null
util.errorRequest(client, request, err)

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
Expand All @@ -237,10 +267,6 @@ function writeH2 (client, request) {
return false
}

if (request.aborted) {
return false
}

const headers = {}
for (let n = 0; n < reqHeaders.length; n += 2) {
const key = reqHeaders[n + 0]
Expand Down Expand Up @@ -283,6 +309,8 @@ function writeH2 (client, request) {
// We do not destroy the socket as we can continue using the session
// the stream gets destroyed and the session remains to create new streams
util.destroy(body, err)
client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
}

try {
Expand All @@ -293,6 +321,10 @@ function writeH2 (client, request) {
util.errorRequest(client, request, err)
}

if (request.aborted) {
return false
}

if (method === 'CONNECT') {
session.ref()
// We are already connected, streams are pending, first request
Expand All @@ -304,10 +336,12 @@ function writeH2 (client, request) {
if (stream.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
} else {
stream.once('ready', () => {
request.onUpgrade(null, null, stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
})
}

Expand Down Expand Up @@ -428,17 +462,20 @@ function writeH2 (client, request) {
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
request.onComplete([])
return
}

// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
if (session[kOpenStreams] === 0) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side

session.unref()
}

abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
})

stream.once('close', () => {
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
"test:eventsource": "npm run build:node && borp --expose-gc -p \"test/eventsource/*.js\"",
"test:fuzzing": "node test/fuzzing/fuzzing.test.js",
"test:fetch": "npm run build:node && borp --timeout 180000 --expose-gc --concurrency 1 -p \"test/fetch/*.js\" && npm run test:webidl && npm run test:busboy",
"test:h2": "npm run test:h2:core && npm run test:h2:fetch",
"test:h2:core": "borp -p \"test/http2*.js\"",
"test:h2:fetch": "npm run build:node && borp -p \"test/fetch/http2*.js\"",
"test:interceptors": "borp -p \"test/interceptors/*.js\"",
"test:jest": "cross-env NODE_V8_COVERAGE= jest",
"test:unit": "borp --expose-gc -p \"test/*.js\"",
Expand Down
Loading
Loading