diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index ca8ce650af2..02aefd418a0 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -78,42 +78,86 @@ async function lazyllhttp () { return await WebAssembly.instantiate(mod, { env: { - + /** + * @param {number} p + * @param {number} at + * @param {number} len + * @returns {number} + */ wasm_on_url: (p, at, len) => { /* istanbul ignore next */ return 0 }, + /** + * @param {number} p + * @param {number} at + * @param {number} len + * @returns {number} + */ wasm_on_status: (p, at, len) => { assert(currentParser.ptr === p) const start = at - currentBufferPtr + currentBufferRef.byteOffset - return currentParser.onStatus(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 + return currentParser.onStatus(new FastBuffer(currentBufferRef.buffer, start, len)) }, + /** + * @param {number} p + * @returns {number} + */ wasm_on_message_begin: (p) => { assert(currentParser.ptr === p) - return currentParser.onMessageBegin() || 0 + return currentParser.onMessageBegin() }, + /** + * @param {number} p + * @param {number} at + * @param {number} len + * @returns {number} + */ wasm_on_header_field: (p, at, len) => { assert(currentParser.ptr === p) const start = at - currentBufferPtr + currentBufferRef.byteOffset - return currentParser.onHeaderField(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 + return currentParser.onHeaderField(new FastBuffer(currentBufferRef.buffer, start, len)) }, + /** + * @param {number} p + * @param {number} at + * @param {number} len + * @returns {number} + */ wasm_on_header_value: (p, at, len) => { assert(currentParser.ptr === p) const start = at - currentBufferPtr + currentBufferRef.byteOffset - return currentParser.onHeaderValue(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 + return currentParser.onHeaderValue(new FastBuffer(currentBufferRef.buffer, start, len)) }, + /** + * @param {number} p + * @param {number} statusCode + * @param {0|1} upgrade + * @param {0|1} shouldKeepAlive + * @returns {number} + */ wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => { assert(currentParser.ptr === p) - return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0 + return currentParser.onHeadersComplete(statusCode, upgrade === 1, shouldKeepAlive === 1) }, + /** + * @param {number} p + * @param {number} at + * @param {number} len + * @returns {number} + */ wasm_on_body: (p, at, len) => { assert(currentParser.ptr === p) const start = at - currentBufferPtr + currentBufferRef.byteOffset - return currentParser.onBody(new FastBuffer(currentBufferRef.buffer, start, len)) || 0 + return currentParser.onBody(new FastBuffer(currentBufferRef.buffer, start, len)) }, + /** + * @param {number} p + * @returns {number} + */ wasm_on_message_complete: (p) => { assert(currentParser.ptr === p) - return currentParser.onMessageComplete() || 0 + return currentParser.onMessageComplete() } } @@ -121,11 +165,20 @@ async function lazyllhttp () { } let llhttpInstance = null +/** + * @type {Promise|null} + */ let llhttpPromise = lazyllhttp() llhttpPromise.catch() +/** + * @type {Parser|null} + */ let currentParser = null let currentBufferRef = null +/** + * @type {number} + */ let currentBufferSize = 0 let currentBufferPtr = null @@ -134,17 +187,25 @@ const TIMEOUT_BODY = 2 const TIMEOUT_IDLE = 3 class Parser { + /** + * @param {import('./client.js')} client + * @param {import('net').Socket} socket + * @param {*} llhttp + */ constructor (client, socket, { exports }) { assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0) this.llhttp = exports this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE) this.client = client + /** + * @type {import('net').Socket} + */ this.socket = socket this.timeout = null this.timeoutValue = null this.timeoutType = null - this.statusCode = null + this.statusCode = 0 this.statusText = '' this.upgrade = false this.headers = [] @@ -301,10 +362,18 @@ class Parser { this.paused = false } + /** + * @param {Buffer} buf + * @returns {0} + */ onStatus (buf) { this.statusText = buf.toString() + return 0 } + /** + * @returns {0|-1} + */ onMessageBegin () { const { socket, client } = this @@ -318,8 +387,14 @@ class Parser { return -1 } request.onResponseStarted() + + return 0 } + /** + * @param {Buffer} buf + * @returns {number} + */ onHeaderField (buf) { const len = this.headers.length @@ -330,8 +405,14 @@ class Parser { } this.trackHeader(buf.length) + + return 0 } + /** + * @param {Buffer} buf + * @returns {number} + */ onHeaderValue (buf) { let len = this.headers.length @@ -355,8 +436,13 @@ class Parser { } this.trackHeader(buf.length) + + return 0 } + /** + * @param {number} len + */ trackHeader (len) { this.headersSize += len if (this.headersSize >= this.headersMaxSize) { @@ -364,6 +450,9 @@ class Parser { } } + /** + * @param {Buffer} head + */ onUpgrade (head) { const { upgrade, client, socket, headers, statusCode } = this @@ -377,9 +466,9 @@ class Parser { assert(request) assert(request.upgrade || request.method === 'CONNECT') - this.statusCode = null + this.statusCode = 0 this.statusText = '' - this.shouldKeepAlive = null + this.shouldKeepAlive = false this.headers = [] this.headersSize = 0 @@ -408,6 +497,12 @@ class Parser { client[kResume]() } + /** + * @param {number} statusCode + * @param {boolean} upgrade + * @param {boolean} shouldKeepAlive + * @returns {number} + */ onHeadersComplete (statusCode, upgrade, shouldKeepAlive) { const { client, socket, headers, statusText } = this @@ -517,6 +612,10 @@ class Parser { return pause ? constants.ERROR.PAUSED : 0 } + /** + * @param {Buffer} buf + * @returns {number} + */ onBody (buf) { const { client, socket, statusCode, maxResponseSize } = this @@ -547,8 +646,13 @@ class Parser { if (request.onData(buf) === false) { return constants.ERROR.PAUSED } + + return 0 } + /** + * @returns {number} + */ onMessageComplete () { const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this @@ -557,7 +661,7 @@ class Parser { } if (upgrade) { - return + return 0 } assert(statusCode >= 100) @@ -566,7 +670,7 @@ class Parser { const request = client[kQueue][client[kRunningIdx]] assert(request) - this.statusCode = null + this.statusCode = 0 this.statusText = '' this.bytesRead = 0 this.contentLength = '' @@ -577,7 +681,7 @@ class Parser { this.headersSize = 0 if (statusCode < 200) { - return + return 0 } /* istanbul ignore next: should be handled by llhttp? */ @@ -613,6 +717,8 @@ class Parser { } else { client[kResume]() } + + return 0 } } @@ -635,6 +741,11 @@ function onParserTimeout (parser) { } } +/** + * @param {import ('./client.js')} client + * @param {import('net').Socket} socket + * @returns + */ async function connectH1 (client, socket) { client[kSocket] = socket @@ -667,11 +778,7 @@ async function connectH1 (client, socket) { this[kClient][kOnError](err) }) addListener(socket, 'readable', function () { - const parser = this[kParser] - - if (parser) { - parser.readMore() - } + this[kParser]?.readMore() }) addListener(socket, 'end', function () { const parser = this[kParser] @@ -685,7 +792,6 @@ async function connectH1 (client, socket) { util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) }) addListener(socket, 'close', function () { - const client = this[kClient] const parser = this[kParser] if (parser) { @@ -700,6 +806,8 @@ async function connectH1 (client, socket) { const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) + const client = this[kClient] + client[kSocket] = null client[kHTTPContext] = null // TODO (fix): This is hacky... @@ -737,22 +845,34 @@ async function connectH1 (client, socket) { return { version: 'h1', defaultPipelining: 1, - write (...args) { - return writeH1(client, ...args) + write (request) { + return writeH1(client, request) }, resume () { resumeH1(client) }, + /** + * @param {Error|undefined} err + * @param {() => void} callback + */ destroy (err, callback) { if (closed) { queueMicrotask(callback) } else { - socket.destroy(err).on('close', callback) + socket.on('close', callback) + socket.destroy(err) } }, + /** + * @returns {boolean} + */ get destroyed () { return socket.destroyed }, + /** + * @param {import('../core/request.js')} request + * @returns {boolean} + */ busy (request) { if (socket[kWriting] || socket[kReset] || socket[kBlocking]) { return true @@ -792,6 +912,9 @@ async function connectH1 (client, socket) { } } +/** + * @param {import('./client.js')} client + */ function resumeH1 (client) { const socket = client[kSocket] @@ -827,6 +950,11 @@ function shouldSendContentLength (method) { return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' } +/** + * @param {import('./client.js')} client + * @param {import('../core/request.js')} request + * @returns + */ function writeH1 (client, request) { const { method, path, host, upgrade, blocking, reset } = request @@ -897,6 +1025,10 @@ function writeH1 (client, request) { const socket = client[kSocket] + /** + * @param {Error} [err] + * @returns {void} + */ const abort = (err) => { if (request.aborted || request.completed) { return @@ -1002,6 +1134,16 @@ function writeH1 (client, request) { return true } +/** + * @param {AbortCallback} abort + * @param {import('stream').Stream} body + * @param {import('./client.js')} client + * @param {import('../core/request.js')} request + * @param {import('net').Socket} socket + * @param {number} contentLength + * @param {string} header + * @param {boolean} expectsPayload + */ function writeStream (abort, body, client, request, socket, contentLength, header, expectsPayload) { assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') @@ -1009,6 +1151,10 @@ function writeStream (abort, body, client, request, socket, contentLength, heade const writer = new AsyncWriter({ abort, socket, request, contentLength, client, expectsPayload, header }) + /** + * @param {Buffer} chunk + * @returns {void} + */ const onData = function (chunk) { if (finished) { return @@ -1022,6 +1168,10 @@ function writeStream (abort, body, client, request, socket, contentLength, heade util.destroy(this, err) } } + + /** + * @returns {void} + */ const onDrain = function () { if (finished) { return @@ -1031,6 +1181,10 @@ function writeStream (abort, body, client, request, socket, contentLength, heade body.resume() } } + + /** + * @returns {void} + */ const onClose = function () { // 'close' might be emitted *before* 'error' for // broken streams. Wait a tick to avoid this case. @@ -1045,6 +1199,11 @@ function writeStream (abort, body, client, request, socket, contentLength, heade queueMicrotask(() => onFinished(err)) } } + + /** + * @param {Error} [err] + * @returns + */ const onFinished = function (err) { if (finished) { return @@ -1105,6 +1264,24 @@ function writeStream (abort, body, client, request, socket, contentLength, heade } } +/** + * @typedef AbortCallback + * @type {Function} + * @param {Error} [err] + * @returns {void} + */ + +/** + * @param {AbortCallback} abort + * @param {Uint8Array|null} body + * @param {import('./client.js')} client + * @param {import('../core/request.js')} request + * @param {import('net').Socket} socket + * @param {number} contentLength + * @param {string} header + * @param {boolean} expectsPayload + * @returns {void} + */ function writeBuffer (abort, body, client, request, socket, contentLength, header, expectsPayload) { try { if (!body) { @@ -1135,6 +1312,17 @@ function writeBuffer (abort, body, client, request, socket, contentLength, heade } } +/** + * @param {AbortCallback} abort + * @param {Blob} body + * @param {import('./client.js')} client + * @param {import('../core/request.js')} request + * @param {import('net').Socket} socket + * @param {number} contentLength + * @param {string} header + * @param {boolean} expectsPayload + * @returns {Promise} + */ async function writeBlob (abort, body, client, request, socket, contentLength, header, expectsPayload) { assert(contentLength === body.size, 'blob body must have content length') @@ -1163,6 +1351,17 @@ async function writeBlob (abort, body, client, request, socket, contentLength, h } } +/** + * @param {AbortCallback} abort + * @param {Iterable} body + * @param {import('./client.js')} client + * @param {import('../core/request.js')} request + * @param {import('net').Socket} socket + * @param {number} contentLength + * @param {string} header + * @param {boolean} expectsPayload + * @returns {Promise} + */ async function writeIterable (abort, body, client, request, socket, contentLength, header, expectsPayload) { assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') @@ -1213,6 +1412,17 @@ async function writeIterable (abort, body, client, request, socket, contentLengt } class AsyncWriter { + /** + * + * @param {object} arg + * @param {AbortCallback} arg.abort + * @param {import('net').Socket} arg.socket + * @param {import('../core/request.js')} arg.request + * @param {number} arg.contentLength + * @param {import('./client.js')} arg.client + * @param {boolean} arg.expectsPayload + * @param {string} arg.header + */ constructor ({ abort, socket, request, contentLength, client, expectsPayload, header }) { this.socket = socket this.request = request @@ -1226,6 +1436,10 @@ class AsyncWriter { socket[kWriting] = true } + /** + * @param {Buffer} chunk + * @returns + */ write (chunk) { const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this @@ -1289,6 +1503,9 @@ class AsyncWriter { return ret } + /** + * @returns {void} + */ end () { const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this request.onRequestSent() @@ -1336,6 +1553,10 @@ class AsyncWriter { client[kResume]() } + /** + * @param {Error} [err] + * @returns {void} + */ destroy (err) { const { socket, client, abort } = this