From b92ff9b51040fe4189f1bceff7904a2ec13cef78 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 17:09:32 -0800 Subject: [PATCH 1/9] http2: fixup js debug messages --- lib/internal/http2/core.js | 130 +++++++++++++++---------------------- 1 file changed, 52 insertions(+), 78 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index de20eb23f2c660..72558f8defcd66 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -150,8 +150,8 @@ function onSessionHeaders(handle, id, cat, flags, headers) { const owner = this[kOwner]; const type = owner[kType]; _unrefActive(owner); - debug(`[${sessionName(type)}] headers were received on ` + - `stream ${id}: ${cat}`); + debug(`Http2Stream ${id} [Http2Session ` + + `${sessionName(type)}]: headers received`); const streams = owner[kState].streams; const endOfStream = !!(flags & NGHTTP2_FLAG_END_STREAM); @@ -196,7 +196,8 @@ function onSessionHeaders(handle, id, cat, flags, headers) { } else { event = endOfStream ? 'trailers' : 'headers'; } - debug(`[${sessionName(type)}] emitting stream '${event}' event`); + debug(`Http2Stream ${id} [Http2Session ` + + `${sessionName(type)}]: emitting stream '${event}' event`); process.nextTick(emit, stream, event, obj, flags, headers); } if (endOfStream) { @@ -272,7 +273,7 @@ function onStreamRead(nread, buf, handle) { // Resets the cached settings. function onSettings(ack) { const owner = this[kOwner]; - debug(`[${sessionName(owner[kType])}] new settings received`); + debug(`Http2Session ${sessionName(owner[kType])}: new settings received`); _unrefActive(owner); let event = 'remoteSettings'; if (ack) { @@ -293,9 +294,9 @@ function onSettings(ack) { // session (which may, in turn, forward it on to the server) function onPriority(id, parent, weight, exclusive) { const owner = this[kOwner]; - debug(`[${sessionName(owner[kType])}] priority advisement for stream ` + - `${id}: \n parent: ${parent},\n weight: ${weight},\n` + - ` exclusive: ${exclusive}`); + debug(`Http2Stream ${id} [Http2Session ` + + `${sessionName(owner[kType])}]: priority [parent: ${parent}, ` + + `weight: ${weight}, exclusive: ${exclusive}]`); _unrefActive(owner); const streams = owner[kState].streams; const stream = streams.get(id); @@ -315,7 +316,7 @@ function emitFrameError(self, id, type, code) { // frame. This should be exceedingly rare. function onFrameError(id, type, code) { const owner = this[kOwner]; - debug(`[${sessionName(owner[kType])}] error sending frame type ` + + debug(`Http2Session ${sessionName(owner[kType])}: error sending frame type ` + `${type} on stream ${id}, code: ${code}`); _unrefActive(owner); const streams = owner[kState].streams; @@ -340,7 +341,8 @@ function emitGoaway(self, code, lastStreamID, buf) { // Called by the native layer when a goaway frame has been received function onGoawayData(code, lastStreamID, buf) { const owner = this[kOwner]; - debug(`[${sessionName(owner[kType])}] goaway data received`); + debug(`Http2Session ${sessionName(owner[kType])}: goaway ${code} received ` + + `[last stream id: ${lastStreamID}]`); process.nextTick(emitGoaway, owner, code, lastStreamID, buf); } @@ -350,7 +352,6 @@ function onGoawayData(code, lastStreamID, buf) { // frameLen <= n <= maxPayloadLen. function onSelectPadding(fn) { return function getPadding() { - debug('fetching padding for frame'); const frameLen = paddingBuffer[PADDING_BUF_FRAME_LENGTH]; const maxFramePayloadLen = paddingBuffer[PADDING_BUF_MAX_PAYLOAD_LENGTH]; paddingBuffer[PADDING_BUF_RETURN_VALUE] = @@ -366,7 +367,8 @@ function onSelectPadding(fn) { // will be deferred until the socket is ready to go. function requestOnConnect(headers, options) { const session = this[kSession]; - debug(`[${sessionName(session[kType])}] connected.. initializing request`); + debug(`HttpSession ${sessionName(session[kType])}: connected, ` + + 'initializing request'); const streams = session[kState].streams; validatePriorityOptions(options); @@ -480,7 +482,7 @@ function onSessionInternalError(code) { // of the socket. No other code should read from or write to the socket. function setupHandle(session, socket, type, options) { return function() { - debug(`[${sessionName(type)}] setting up session handle`); + debug(`Http2Session ${sessionName(type)}: setting up session handle`); session[kState].connecting = false; updateOptionsBuffer(options); @@ -515,12 +517,11 @@ function setupHandle(session, socket, type, options) { // Submits a SETTINGS frame to be sent to the remote peer. function submitSettings(settings) { const type = this[kType]; - debug(`[${sessionName(type)}] submitting actual settings`); + debug(`Http2Session ${sessionName(type)}: submitting settings`); _unrefActive(this); this[kLocalSettings] = undefined; updateSettingsBuffer(settings); this[kHandle].settings(); - debug(`[${sessionName(type)}] settings complete`); } // Submits a PRIORITY frame to be sent to the remote peer @@ -561,19 +562,18 @@ function doShutdown(options) { state.shuttingDown = false; state.shutdown = true; if (ret < 0) { - debug(`[${sessionName(this[kType])}] shutdown failed! code: ${ret}`); + debug(`Http2Session ${sessionName(this[kType])}: shutdown failed`); const err = new NghttpError(ret); process.nextTick(emit, this, 'error', err); return; } process.nextTick(emit, this, 'shutdown', options); - debug(`[${sessionName(this[kType])}] shutdown is complete`); } // Submit a graceful or immediate shutdown request for the Http2Session. function submitShutdown(options) { const type = this[kType]; - debug(`[${sessionName(type)}] submitting actual shutdown request`); + debug(`Http2Session ${sessionName(type)}: submitting shutdown request`); if (type === NGHTTP2_SESSION_SERVER && options.graceful === true) { // first send a shutdown notice this[kHandle].shutdownNotice(); @@ -712,7 +712,7 @@ class Http2Session extends EventEmitter { // of concurrent streams (2^31-1 is the upper limit on the number // of streams) this.setMaxListeners(kMaxStreams); - debug(`[${sessionName(type)}] http2session created`); + debug(`Http2Session ${sessionName(type)}: created`); } setNextStreamID(id) { @@ -854,12 +854,10 @@ class Http2Session extends EventEmitter { throw new errors.Error('ERR_HTTP2_MAX_PENDING_SETTINGS_ACK', this[kState].pendingAck); } - debug(`[${sessionName(this[kType])}] sending settings`); + debug(`Http2Session ${sessionName(this[kType])}: sending settings`); state.pendingAck++; if (state.connecting) { - debug(`[${sessionName(this[kType])}] session still connecting, ` + - 'queue settings'); this.once('connect', submitSettings.bind(this, settings)); return; } @@ -871,7 +869,7 @@ class Http2Session extends EventEmitter { const state = this[kState]; if (state.destroyed || state.destroying) return; - debug(`[${sessionName(this[kType])}] destroying nghttp2session`); + debug(`Http2Session ${sessionName(this[kType])}: destroying`); state.destroying = true; state.destroyed = false; @@ -942,7 +940,7 @@ class Http2Session extends EventEmitter { options.lastStreamID); } - debug(`[${sessionName(type)}] initiating shutdown`); + debug(`Http2Session ${sessionName(type)}: initiating shutdown`); state.shuttingDown = true; if (callback) { @@ -950,13 +948,11 @@ class Http2Session extends EventEmitter { } if (state.connecting) { - debug(`[${sessionName(type)}] session still connecting, queue ` + - 'shutdown'); this.once('connect', submitShutdown.bind(this, options)); return; } - debug(`[${sessionName(type)}] sending shutdown`); + debug(`Http2Session ${sessionName(type)}: sending shutdown`); submitShutdown.call(this, options); } @@ -995,7 +991,6 @@ class ServerHttp2Session extends Http2Session { class ClientHttp2Session extends Http2Session { constructor(options, socket) { super(NGHTTP2_SESSION_CLIENT, options, socket); - debug(`[${sessionName(this[kType])}] clienthttp2session created`); } // Submits a new HTTP2 request to the connected peer. Returns the @@ -1004,7 +999,7 @@ class ClientHttp2Session extends Http2Session { const state = this[kState]; if (state.destroyed || state.destroying) throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); - debug(`[${sessionName(this[kType])}] initiating request`); + debug(`HttpSession ${sessionName(this[kType])}: initiating request`); _unrefActive(this); assertIsObject(headers, 'headers'); assertIsObject(options, 'options'); @@ -1055,19 +1050,14 @@ class ClientHttp2Session extends Http2Session { const stream = new ClientHttp2Stream(this, undefined, undefined, {}); - const onConnect = requestOnConnect.bind(stream, headers, options); - // Close the writable side of the stream if options.endStream is set. if (options.endStream) stream.end(); + const onConnect = requestOnConnect.bind(stream, headers, options); if (state.connecting) { - debug(`[${sessionName(this[kType])}] session still connecting, queue ` + - 'stream init'); stream.on('connect', onConnect); } else { - debug(`[${sessionName(this[kType])}] session connected, immediate ` + - 'stream init'); onConnect(); } return stream; @@ -1166,15 +1156,15 @@ function handleFlushData(handle) { function streamOnSessionConnect() { const session = this[kSession]; - debug(`[${sessionName(session[kType])}] session connected. emiting stream ` + - 'connect'); + debug(`Http2Session ${sessionName(session[kType])}: session connected`); this[kState].connecting = false; process.nextTick(emit, this, 'connect'); } function streamOnceReady() { const session = this[kSession]; - debug(`[${sessionName(session[kType])}] stream ${this[kID]} is ready`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: is ready`); this.uncork(); } @@ -1215,12 +1205,9 @@ class Http2Stream extends Duplex { session.once('close', state.closeHandler); if (session[kState].connecting) { - debug(`[${sessionName(session[kType])}] session is still connecting, ` + - 'queuing stream init'); state.connecting = true; session.once('connect', streamOnSessionConnect.bind(this)); } - debug(`[${sessionName(session[kType])}] http2stream created`); } [kInit](id, handle) { @@ -1433,12 +1420,11 @@ class Http2Stream extends Duplex { throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); const session = this[kSession]; if (this[kID] === undefined) { - debug(`[${sessionName(session[kType])}] queuing priority for new stream`); this.once('ready', this.priority.bind(this, options)); return; } - debug(`[${sessionName(session[kType])}] sending priority for stream ` + - `${this[kID]}`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: sending priority`); _unrefActive(this); assertIsObject(options, 'options'); @@ -1446,8 +1432,6 @@ class Http2Stream extends Duplex { validatePriorityOptions(options); const id = this[kID]; - debug(`[${sessionName(session[kType])}] sending priority for stream ` + - `${id}`); // A stream cannot be made to depend on itself if (options.parent === id) { @@ -1472,11 +1456,13 @@ class Http2Stream extends Duplex { _destroy(err, callback) { const session = this[kSession]; if (this[kID] === undefined) { - debug(`[${sessionName(session[kType])}] queuing destroy for new stream`); this.once('ready', this._destroy.bind(this, err, callback)); return; } + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: destroying stream`); + const state = this[kState]; session[kState].writeQueueSize -= state.writeQueueSize; state.writeQueueSize = 0; @@ -1494,8 +1480,6 @@ function continueStreamDestroy(err, callback) { const session = this[kSession]; const state = this[kState]; - debug(`[${sessionName(session[kType])}] destroying stream ${this[kID]}`); - // Submit RST-STREAM frame if one hasn't been sent already and the // stream hasn't closed normally... const rst = state.rst; @@ -1521,7 +1505,6 @@ function continueStreamDestroy(err, callback) { } callback(err); process.nextTick(emit, this, 'streamClosed', code); - debug(`[${sessionName(session[kType])}] stream ${this[kID]} destroyed`); } function finishStreamDestroy() { @@ -1696,7 +1679,8 @@ function afterOpen(session, options, headers, streamOptions, err, fd) { function streamOnError(err) { // we swallow the error for parity with HTTP1 // all the errors that ends here are not critical for the project - debug('ServerHttp2Stream errored, avoiding uncaughtException', err); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${this[kSession][kType]}: error`, err); } @@ -1707,7 +1691,6 @@ class ServerHttp2Stream extends Http2Stream { this[kProtocol] = headers[HTTP2_HEADER_SCHEME]; this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY]; this.on('error', streamOnError); - debug(`[${sessionName(session[kType])}] created serverhttp2stream`); } // true if the HEADERS frame has been sent @@ -1730,8 +1713,8 @@ class ServerHttp2Stream extends Http2Stream { if (!session.remoteSettings.enablePush) throw new errors.Error('ERR_HTTP2_PUSH_DISABLED'); - debug(`[${sessionName(session[kType])}] initiating push stream for stream` + - ` ${this[kID]}`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: initiating push stream`); _unrefActive(this); const state = session[kState]; @@ -1794,7 +1777,6 @@ class ServerHttp2Stream extends Http2Stream { } const id = ret.id(); - debug(`[${sessionName(session[kType])}] push stream ${id} created`); options.readable = !options.endStream; const stream = new ServerHttp2Stream(session, ret, id, options, headers); @@ -1816,8 +1798,8 @@ class ServerHttp2Stream extends Http2Stream { const session = this[kSession]; if (this.destroyed) throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); - debug(`[${sessionName(session[kType])}] initiating response for stream ` + - `${this[kID]}`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: initiating response`); _unrefActive(this); const state = this[kState]; @@ -1883,8 +1865,8 @@ class ServerHttp2Stream extends Http2Stream { const session = this[kSession]; if (this.destroyed) throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); - debug(`[${sessionName(session[kType])}] initiating response for stream ` + - `${this[kID]}`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: initiating response`); _unrefActive(this); const state = this[kState]; @@ -1966,8 +1948,8 @@ class ServerHttp2Stream extends Http2Stream { const session = this[kSession]; if (this.destroyed) throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); - debug(`[${sessionName(session[kType])}] initiating response for stream ` + - `${this[kID]}`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: initiating response`); _unrefActive(this); const state = this[kState]; @@ -2033,7 +2015,8 @@ class ServerHttp2Stream extends Http2Stream { throw new errors.Error('ERR_HTTP2_HEADERS_AFTER_RESPOND'); const session = this[kSession]; - debug(`[${sessionName(session[kType])}] sending additional headers`); + debug(`Http2Stream ${this[kID]} [Http2Session ` + + `${sessionName(session[kType])}]: sending additional headers`); assertIsObject(headers, 'headers'); headers = Object.assign(Object.create(null), headers); @@ -2072,7 +2055,6 @@ class ClientHttp2Stream extends Http2Stream { if (id !== undefined) this[kInit](id, handle); this.on('headers', handleHeaderContinue); - debug(`[${sessionName(session[kType])}] clienthttp2stream created`); } } @@ -2121,13 +2103,12 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout); function socketDestroy(error) { const session = this[kSession]; const type = session[kType]; - debug(`[${sessionName(type)}] socket destroy called`); + debug(`Http2Session ${sessionName(type)}: socket destroy called`); delete this[kServer]; // destroy the session first so that it will stop trying to // send data while we close the socket. session.destroy(); this.destroy = this[kDestroySocket]; - debug(`[${sessionName(type)}] destroying the socket`); this.destroy(error); } @@ -2136,7 +2117,8 @@ function socketDestroy(error) { // a sessionError; failing that, destroy, remove the error listener, and // re-emit the error event function sessionOnError(error) { - debug(`[${sessionName(this[kType])}] server session error: ${error.message}`); + debug(`Http2Session ${sessionName(this[kType])}: session error: ` + + `${error.message}`); if (this[kServer] !== undefined && this[kServer].emit('sessionError', error)) return; if (this[kSocket] !== undefined && this[kSocket].emit('sessionError', error)) @@ -2151,7 +2133,7 @@ function sessionOnError(error) { function socketOnError(error) { const session = this[kSession]; const type = session && session[kType]; - debug(`[${sessionName(type)}] server socket error: ${error.message}`); + debug(`Http2Session ${sessionName(type)}: socket error: ${error.message}`); if (kRenegTest.test(error.message)) return this.destroy(); if (session !== undefined && @@ -2164,12 +2146,11 @@ function socketOnError(error) { // Handles the on('stream') event for a session and forwards // it on to the server object. function sessionOnStream(stream, headers, flags, rawHeaders) { - debug(`[${sessionName(this[kType])}] emit server stream event`); this[kServer].emit('stream', stream, headers, flags, rawHeaders); } function sessionOnPriority(stream, parent, weight, exclusive) { - debug(`[${sessionName(this[kType])}] priority change received`); + debug(`Http2Session ${sessionName(this[kType])}: priority change received`); this[kServer].emit('priority', stream, parent, weight, exclusive); } @@ -2180,7 +2161,6 @@ function sessionOnSocketError(error, socket) { // When the session times out on the server, attempt a graceful shutdown function sessionOnTimeout() { - debug('session timeout'); process.nextTick(() => { const state = this[kState]; // if destroyed or destryoing, do nothing @@ -2204,7 +2184,7 @@ function sessionOnTimeout() { } function connectionListener(socket) { - debug('[server] received a connection'); + debug('Http2Session server: received a connection'); const options = this[kOptions] || {}; if (socket.alpnProtocol === false || socket.alpnProtocol === 'http/1.1') { @@ -2273,7 +2253,6 @@ class Http2SecureServer extends TLSServer { if (typeof requestListener === 'function') this.on('request', requestListener); this.on('tlsClientError', onErrorSecureServerSession); - debug('http2secureserver created'); } setTimeout(msecs, callback) { @@ -2295,7 +2274,6 @@ class Http2Server extends NETServer { this.on('newListener', setupCompat); if (typeof requestListener === 'function') this.on('request', requestListener); - debug('http2server created'); } setTimeout(msecs, callback) { @@ -2311,7 +2289,6 @@ class Http2Server extends NETServer { function setupCompat(ev) { if (ev === 'request') { - debug('setting up compatibility handler'); this.removeListener('newListener', setupCompat); this.on('stream', onServerStream); } @@ -2329,7 +2306,8 @@ function socketOnClose() { // If the session emits an error, forward it to the socket as a sessionError; // failing that, destroy the session, remove the listener and re-emit the error function clientSessionOnError(error) { - debug(`[${sessionName(this[kType])}] session error: ${error.message}`); + debug(`Http2Session ${sessionName(this[kType])}]: session error: ` + + `${error.message}`); if (this[kSocket] !== undefined && this[kSocket].emit('sessionError', error)) return; this.destroy(); @@ -2351,8 +2329,6 @@ function connect(authority, options, listener) { assertIsObject(authority, 'authority', ['string', 'Object', 'URL']); - debug(`connecting to ${authority}`); - const protocol = authority.protocol || options.protocol || 'https:'; const port = '' + (authority.port !== '' ? authority.port : (authority.protocol === 'http:' ? 80 : 443)); @@ -2406,7 +2382,6 @@ function createSecureServer(options, handler) { 'options', 'Object'); } - debug('creating http2secureserver'); return new Http2SecureServer(options, handler); } @@ -2415,7 +2390,6 @@ function createServer(options, handler) { handler = options; options = Object.create(null); } - debug('creating htt2pserver'); return new Http2Server(options, handler); } From 2c13db4872e28c7e0d65e37975f8110fef85bc97 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 17:14:35 -0800 Subject: [PATCH 2/9] http2: simplify and improve rstStream --- lib/internal/http2/core.js | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 72558f8defcd66..f836e65311e028 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -542,6 +542,12 @@ function submitPriority(options) { function submitRstStream(code) { _unrefActive(this); _unrefActive(this[kSession]); + + const state = this[kState]; + if (state.rst) return; + state.rst = true; + state.rstCode = code; + const ret = this[kHandle].rstStream(code); if (ret < 0) { const err = new NghttpError(ret); @@ -1369,30 +1375,15 @@ class Http2Stream extends Duplex { rstStream(code = NGHTTP2_NO_ERROR) { if (typeof code !== 'number') throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'code', 'number'); + if (code < 0 || code > 2 ** 32 - 1) + throw new errors.RangeError('ERR_OUT_OF_RANGE', 'code'); + const fn = submitRstStream.bind(this, code); if (this[kID] === undefined) { - this.once('ready', this.rstStream.bind(this, code)); - return; - } - - const state = this[kState]; - if (state.rst) { - // rst has already been set by self or peer, do not set again - return; - } - state.rst = true; - state.rstCode = code; - - _unrefActive(this); - _unrefActive(this[kSession]); - - const id = this[kID]; - - if (id === undefined) { - this.once('ready', submitRstStream.bind(this, code)); + this.once('ready', fn); return; } - submitRstStream.call(this, code); + fn(); } rstWithNoError() { From 97d488d51ca79831eb56acf652cc85db19604ba6 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 20:04:53 -0800 Subject: [PATCH 3/9] http2: improve and simplify _read --- lib/internal/http2/core.js | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index f836e65311e028..b5eac528867b43 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1355,16 +1355,12 @@ class Http2Stream extends Duplex { } _read(nread) { - if (this[kID] === undefined) { - this.once('ready', this._read.bind(this, nread)); - return; - } if (this.destroyed) { this.push(null); return; } - _unrefActive(this); - process.nextTick(handleFlushData, this[kHandle]); + if (this[kHandle] !== undefined) + process.nextTick(handleFlushData, this[kHandle]); } // Submits an RST-STREAM frame to shutdown this stream. From 215a347c6c9996fff71f84bbcc06d7abe9f18be0 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 20:20:19 -0800 Subject: [PATCH 4/9] http2: simplify and improve priority --- lib/internal/http2/core.js | 31 ++++------ .../test-http2-priority-parent-self.js | 57 ------------------- 2 files changed, 10 insertions(+), 78 deletions(-) delete mode 100644 test/parallel/test-http2-priority-parent-self.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index b5eac528867b43..d7f0f56e2ced7a 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -531,6 +531,11 @@ function submitPriority(options) { _unrefActive(this); _unrefActive(this[kSession]); + // If the parent is the id, do nothing because a + // stream cannot be made to depend on itself. + if (options.parent === this[kID]) + return; + this[kHandle].priority(options.parent | 0, options.weight | 0, !!options.exclusive, @@ -1405,33 +1410,17 @@ class Http2Stream extends Duplex { priority(options) { if (this.destroyed) throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); - const session = this[kSession]; - if (this[kID] === undefined) { - this.once('ready', this.priority.bind(this, options)); - return; - } - debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: sending priority`); - _unrefActive(this); assertIsObject(options, 'options'); - options = Object.assign(Object.create(null), options); + options = Object.assign({}, options); validatePriorityOptions(options); - const id = this[kID]; - - // A stream cannot be made to depend on itself - if (options.parent === id) { - throw new errors.TypeError('ERR_INVALID_OPT_VALUE', - 'parent', - options.parent); - } - - if (id === undefined) { - this.once('ready', submitPriority.bind(this, options)); + const fn = submitPriority.bind(this, options); + if (this[kID] === undefined) { + this.once('ready', fn); return; } - submitPriority.call(this, options); + fn(); } // Called by this.destroy(). diff --git a/test/parallel/test-http2-priority-parent-self.js b/test/parallel/test-http2-priority-parent-self.js deleted file mode 100644 index 55a161bf17fed2..00000000000000 --- a/test/parallel/test-http2-priority-parent-self.js +++ /dev/null @@ -1,57 +0,0 @@ -'use strict'; - -const common = require('../common'); -if (!common.hasCrypto) - common.skip('missing crypto'); -const h2 = require('http2'); - -const server = h2.createServer(); -const invalidOptValueError = (value) => ({ - type: TypeError, - code: 'ERR_INVALID_OPT_VALUE', - message: `The value "${value}" is invalid for option "parent"` -}); - -// we use the lower-level API here -server.on('stream', common.mustCall((stream) => { - common.expectsError( - () => stream.priority({ - parent: stream.id, - weight: 1, - exclusive: false - }), - invalidOptValueError(stream.id) - ); - stream.respond({ - 'content-type': 'text/html', - ':status': 200 - }); - stream.end('hello world'); -})); - -server.listen(0, common.mustCall(() => { - - const client = h2.connect(`http://localhost:${server.address().port}`); - const req = client.request({ ':path': '/' }); - - req.on( - 'ready', - () => common.expectsError( - () => req.priority({ - parent: req.id, - weight: 1, - exclusive: false - }), - invalidOptValueError(req.id) - ) - ); - - req.on('response', common.mustCall()); - req.resume(); - req.on('end', common.mustCall(() => { - server.close(); - client.destroy(); - })); - req.end(); - -})); From d22b61c7def12121a9d0a9a4a5e41ae1014f19b6 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 20:42:03 -0800 Subject: [PATCH 5/9] http2: simplify on ready a bit --- lib/internal/http2/core.js | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index d7f0f56e2ced7a..d2d4c8f4bc59aa 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1172,13 +1172,6 @@ function streamOnSessionConnect() { process.nextTick(emit, this, 'connect'); } -function streamOnceReady() { - const session = this[kSession]; - debug(`Http2Stream ${this[kID]} [Http2Session ` + - `${sessionName(session[kType])}]: is ready`); - this.uncork(); -} - function abort(stream) { if (!stream[kState].aborted && !(stream._writableState.ended || stream._writableState.ending)) { @@ -1208,7 +1201,6 @@ class Http2Stream extends Duplex { writeQueueSize: 0 }; - this.once('ready', streamOnceReady); this.once('streamClosed', onStreamClosed); this.once('finish', onHandleFinish); this.on('resume', streamOnResume); @@ -1229,6 +1221,7 @@ class Http2Stream extends Duplex { handle.ontrailers = onStreamTrailers; handle.onstreamclose = onStreamClose; handle.onread = onStreamRead; + this.uncork(); this.emit('ready'); } From f35a21a678e2e48028576fa5c206370c525e15b0 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 21:03:18 -0800 Subject: [PATCH 6/9] http2: simplify and improve respond/push --- lib/internal/http2/core.js | 46 ++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index d2d4c8f4bc59aa..ee5ebfc83d665b 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1011,12 +1011,16 @@ class ClientHttp2Session extends Http2Session { if (state.destroyed || state.destroying) throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); debug(`HttpSession ${sessionName(this[kType])}: initiating request`); + _unrefActive(this); + if (this[kSession]) + _unrefActive(this[kSession]); + assertIsObject(headers, 'headers'); assertIsObject(options, 'options'); headers = Object.assign(Object.create(null), headers); - options = Object.assign(Object.create(null), options); + options = Object.assign({}, options); if (headers[HTTP2_HEADER_METHOD] === undefined) headers[HTTP2_HEADER_METHOD] = HTTP2_METHOD_GET; @@ -1686,8 +1690,7 @@ class ServerHttp2Stream extends Http2Stream { `${sessionName(session[kType])}]: initiating push stream`); _unrefActive(this); - const state = session[kState]; - const streams = state.streams; + _unrefActive(this[kSession]); if (typeof options === 'function') { callback = options; @@ -1698,7 +1701,7 @@ class ServerHttp2Stream extends Http2Stream { throw new errors.TypeError('ERR_INVALID_CALLBACK'); assertIsObject(options, 'options'); - options = Object.assign(Object.create(null), options); + options = Object.assign({}, options); options.endStream = !!options.endStream; assertIsObject(headers, 'headers'); @@ -1714,16 +1717,13 @@ class ServerHttp2Stream extends Http2Stream { headers[HTTP2_HEADER_PATH] = '/'; let headRequest = false; - if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) { - headRequest = true; - options.endStream = true; - } + if (headers[HTTP2_HEADER_METHOD] === HTTP2_METHOD_HEAD) + headRequest = options.endStream = true; + options.readable = !options.endStream; const headersList = mapToHeaders(headers); - if (!Array.isArray(headersList)) { - // An error occurred! + if (!Array.isArray(headersList)) throw headersList; - } const streamOptions = options.endStream ? STREAM_OPTION_EMPTY_PAYLOAD : 0; @@ -1746,18 +1746,14 @@ class ServerHttp2Stream extends Http2Stream { } const id = ret.id(); - options.readable = !options.endStream; - const stream = new ServerHttp2Stream(session, ret, id, options, headers); - streams.set(id, stream); + session[kState].streams.set(id, stream); - // If the push stream is a head request, close the writable side of - // the stream immediately as there won't be any data sent. - if (headRequest) { + if (options.endStream) stream.end(); - const state = stream[kState]; - state.headRequest = true; - } + + if (headRequest) + stream[kState].headRequest = true; process.nextTick(callback, stream, headers, 0); } @@ -1770,13 +1766,14 @@ class ServerHttp2Stream extends Http2Stream { debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating response`); _unrefActive(this); + _unrefActive(this[kSession]); const state = this[kState]; if (state.headersSent) throw new errors.Error('ERR_HTTP2_HEADERS_SENT'); assertIsObject(options, 'options'); - options = Object.assign(Object.create(null), options); + options = Object.assign({}, options); options.endStream = !!options.endStream; let streamOptions = 0; @@ -1807,10 +1804,9 @@ class ServerHttp2Stream extends Http2Stream { } const headersList = mapToHeaders(headers, assertValidPseudoHeaderResponse); - if (!Array.isArray(headersList)) { - // An error occurred! + if (!Array.isArray(headersList)) throw headersList; - } + state.headersSent = true; // Close the writable side if the endStream option is set @@ -1837,6 +1833,7 @@ class ServerHttp2Stream extends Http2Stream { debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating response`); _unrefActive(this); + _unrefActive(this[kSession]); const state = this[kState]; if (state.headersSent) @@ -1920,6 +1917,7 @@ class ServerHttp2Stream extends Http2Stream { debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating response`); _unrefActive(this); + _unrefActive(this[kSession]); const state = this[kState]; if (state.headersSent) From 365209980da3c759b1424a6889a9cafed600d552 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 21:31:31 -0800 Subject: [PATCH 7/9] http2: reduce duplication with _unrefActive --- lib/internal/http2/core.js | 72 ++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index ee5ebfc83d665b..69b6b318e083c7 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -79,6 +79,7 @@ const kServer = Symbol('server'); const kSession = Symbol('session'); const kState = Symbol('state'); const kType = Symbol('type'); +const kUpdateTimer = Symbol('update-timer'); const kDefaultSocketTimeout = 2 * 60 * 1000; const kRenegTest = /TLS session renegotiation disabled for this socket/; @@ -149,7 +150,7 @@ function emit(self, ...args) { function onSessionHeaders(handle, id, cat, flags, headers) { const owner = this[kOwner]; const type = owner[kType]; - _unrefActive(owner); + owner[kUpdateTimer](); debug(`Http2Stream ${id} [Http2Session ` + `${sessionName(type)}]: headers received`); const streams = owner[kState].streams; @@ -229,8 +230,7 @@ function onStreamTrailers() { // Readable and Writable sides of the Duplex. function onStreamClose(code) { const stream = this[kOwner]; - _unrefActive(stream); - _unrefActive(stream[kSession]); + stream[kUpdateTimer](); abort(stream); const state = stream[kState]; state.rst = true; @@ -249,7 +249,7 @@ function afterFDClose(err) { // Called when an error event needs to be triggered function onSessionError(error) { const owner = this[kOwner]; - _unrefActive(owner); + owner[kUpdateTimer](); process.nextTick(emit, owner, 'error', error); } @@ -257,8 +257,7 @@ function onSessionError(error) { // to the Http2Stream Duplex for processing. function onStreamRead(nread, buf, handle) { const stream = handle[kOwner]; - _unrefActive(stream); - _unrefActive(stream[kSession]); + stream[kUpdateTimer](); if (nread >= 0 && !stream.destroyed) { if (!stream.push(buf)) { handle.readStop(); @@ -274,7 +273,7 @@ function onStreamRead(nread, buf, handle) { function onSettings(ack) { const owner = this[kOwner]; debug(`Http2Session ${sessionName(owner[kType])}: new settings received`); - _unrefActive(owner); + owner[kUpdateTimer](); let event = 'remoteSettings'; if (ack) { if (owner[kState].pendingAck > 0) @@ -297,7 +296,7 @@ function onPriority(id, parent, weight, exclusive) { debug(`Http2Stream ${id} [Http2Session ` + `${sessionName(owner[kType])}]: priority [parent: ${parent}, ` + `weight: ${weight}, exclusive: ${exclusive}]`); - _unrefActive(owner); + owner[kUpdateTimer](); const streams = owner[kState].streams; const stream = streams.get(id); const emitter = stream === undefined ? owner : stream; @@ -318,7 +317,7 @@ function onFrameError(id, type, code) { const owner = this[kOwner]; debug(`Http2Session ${sessionName(owner[kType])}: error sending frame type ` + `${type} on stream ${id}, code: ${code}`); - _unrefActive(owner); + owner[kUpdateTimer](); const streams = owner[kState].streams; const stream = streams.get(id); const emitter = stream !== undefined ? stream : owner; @@ -518,7 +517,7 @@ function setupHandle(session, socket, type, options) { function submitSettings(settings) { const type = this[kType]; debug(`Http2Session ${sessionName(type)}: submitting settings`); - _unrefActive(this); + this[kUpdateTimer](); this[kLocalSettings] = undefined; updateSettingsBuffer(settings); this[kHandle].settings(); @@ -528,8 +527,7 @@ function submitSettings(settings) { // Note: If the silent option is true, the change will be made // locally with no PRIORITY frame sent. function submitPriority(options) { - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); // If the parent is the id, do nothing because a // stream cannot be made to depend on itself. @@ -545,8 +543,7 @@ function submitPriority(options) { // Submit an RST-STREAM frame to be sent to the remote peer. // This will cause the Http2Stream to be closed. function submitRstStream(code) { - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); const state = this[kState]; if (state.rst) return; @@ -726,6 +723,10 @@ class Http2Session extends EventEmitter { debug(`Http2Session ${sessionName(type)}: created`); } + [kUpdateTimer]() { + _unrefActive(this); + } + setNextStreamID(id) { if (typeof id !== 'number') throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'id', 'number'); @@ -979,7 +980,7 @@ class Http2Session extends EventEmitter { handle.chunksSentSinceLastWrite : null; if (chunksSentSinceLastWrite !== null && chunksSentSinceLastWrite !== handle.updateChunksSent()) { - _unrefActive(this); + this[kUpdateTimer](); return; } } @@ -1012,9 +1013,7 @@ class ClientHttp2Session extends Http2Session { throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); debug(`HttpSession ${sessionName(this[kType])}: initiating request`); - _unrefActive(this); - if (this[kSession]) - _unrefActive(this[kSession]); + this[kUpdateTimer](); assertIsObject(headers, 'headers'); assertIsObject(options, 'options'); @@ -1112,15 +1111,13 @@ function afterDoStreamWrite(status, handle, req) { const stream = handle[kOwner]; const session = stream[kSession]; - _unrefActive(stream); + stream[kUpdateTimer](); const { bytes } = req; stream[kState].writeQueueSize -= bytes; - if (session !== undefined) { - _unrefActive(session); + if (session !== undefined) session[kState].writeQueueSize -= bytes; - } if (typeof req.callback === 'function') req.callback(); @@ -1217,6 +1214,12 @@ class Http2Stream extends Duplex { } } + [kUpdateTimer]() { + _unrefActive(this); + if (this[kSession]) + _unrefActive(this[kSession]); + } + [kInit](id, handle) { this[kID] = id; this[async_id_symbol] = handle.getAsyncId(); @@ -1262,8 +1265,7 @@ class Http2Stream extends Duplex { handle.chunksSentSinceLastWrite : null; if (chunksSentSinceLastWrite !== null && chunksSentSinceLastWrite !== handle.updateChunksSent()) { - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); return; } } @@ -1306,8 +1308,7 @@ class Http2Stream extends Duplex { return; } - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); if (!this[kState].headersSent) this[kProceed](); @@ -1331,8 +1332,7 @@ class Http2Stream extends Duplex { return; } - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); if (!this[kState].headersSent) this[kProceed](); @@ -1689,8 +1689,7 @@ class ServerHttp2Stream extends Http2Stream { debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating push stream`); - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); if (typeof options === 'function') { callback = options; @@ -1765,8 +1764,7 @@ class ServerHttp2Stream extends Http2Stream { throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating response`); - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); const state = this[kState]; if (state.headersSent) @@ -1832,8 +1830,7 @@ class ServerHttp2Stream extends Http2Stream { throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating response`); - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); const state = this[kState]; if (state.headersSent) @@ -1916,8 +1913,7 @@ class ServerHttp2Stream extends Http2Stream { throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); debug(`Http2Stream ${this[kID]} [Http2Session ` + `${sessionName(session[kType])}]: initiating response`); - _unrefActive(this); - _unrefActive(this[kSession]); + this[kUpdateTimer](); const state = this[kState]; if (state.headersSent) @@ -1997,7 +1993,7 @@ class ServerHttp2Stream extends Http2Stream { } } - _unrefActive(this); + this[kUpdateTimer](); const headersList = mapToHeaders(headers, assertValidPseudoHeaderResponse); @@ -2050,7 +2046,7 @@ const setTimeout = { } } else { enroll(this, msecs); - _unrefActive(this); + this[kUpdateTimer](); if (callback !== undefined) { if (typeof callback !== 'function') throw new errors.TypeError('ERR_INVALID_CALLBACK'); From a9f60afa122a5c561c8f264a03d8e8ee2bde91f3 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 21 Nov 2017 21:39:40 -0800 Subject: [PATCH 8/9] http2: simplify stream close handling --- lib/internal/http2/core.js | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 69b6b318e083c7..31947309a050d2 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1144,12 +1144,6 @@ function onSessionClose(hadError, code) { this.end(); // Close the writable side } -function onStreamClosed(code) { - abort(this); - this.push(null); // Close the readable side - this.end(); // Close the writable side -} - function streamOnResume() { if (this[kID] === undefined) { this.once('ready', streamOnResume); @@ -1202,7 +1196,6 @@ class Http2Stream extends Duplex { writeQueueSize: 0 }; - this.once('streamClosed', onStreamClosed); this.once('finish', onHandleFinish); this.on('resume', streamOnResume); this.on('pause', streamOnPause); @@ -1477,6 +1470,9 @@ function continueStreamDestroy(err, callback) { err = new errors.Error('ERR_HTTP2_STREAM_ERROR', code); } callback(err); + abort(this); + this.push(null); // Close the readable side + this.end(); // Close the writable side process.nextTick(emit, this, 'streamClosed', code); } From 46827dbbd53a01d3e42ba8bc9e0fa2dfe25b6501 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 22 Nov 2017 09:58:31 -0800 Subject: [PATCH 9/9] [Squash] Fix nits --- lib/internal/http2/core.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 31947309a050d2..39f67c482382b2 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -366,7 +366,7 @@ function onSelectPadding(fn) { // will be deferred until the socket is ready to go. function requestOnConnect(headers, options) { const session = this[kSession]; - debug(`HttpSession ${sessionName(session[kType])}: connected, ` + + debug(`Http2Session ${sessionName(session[kType])}: connected, ` + 'initializing request'); const streams = session[kState].streams; @@ -1011,7 +1011,7 @@ class ClientHttp2Session extends Http2Session { const state = this[kState]; if (state.destroyed || state.destroying) throw new errors.Error('ERR_HTTP2_INVALID_SESSION'); - debug(`HttpSession ${sessionName(this[kType])}: initiating request`); + debug(`Http2Session ${sessionName(this[kType])}: initiating request`); this[kUpdateTimer]();