diff --git a/lib/client.js b/lib/client.js index 688df9e6156..21806fd670c 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,6 +6,7 @@ const assert = require('assert') const net = require('net') +const http2 = require('http2') const util = require('./core/util') const timers = require('./timers') const Request = require('./core/request') @@ -67,7 +68,10 @@ const { kDispatch, kInterceptors, kLocalAddress, - kMaxResponseSize + kMaxResponseSize, + // HTTP2 + kHost, + kHTTP2Session, } = require('./core/symbols') const FastBuffer = Buffer[Symbol.species] @@ -241,6 +245,10 @@ class Client extends DispatcherBase { this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 + // HTTP/2 + this[kHTTP2Session] = null + this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` + // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. // | complete | running | pending | @@ -356,6 +364,15 @@ class Client extends DispatcherBase { } } + +function onHttp2SessionError (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) +} + const constants = require('./llhttp/constants') const createRedirectInterceptor = require('./interceptor/redirectInterceptor') const EMPTY_BUF = Buffer.alloc(0) @@ -995,14 +1012,16 @@ function onSocketEnd () { function onSocketClose () { const { [kClient]: client } = this - if (!this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - this[kParser].onMessageComplete() + if (!this[kHTTP2Session]) { + if (!this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + this[kParser].onMessageComplete() + } + + this[kParser].destroy() + this[kParser] = null } - this[kParser].destroy() - this[kParser] = null - const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) client[kSocket] = null @@ -1089,29 +1108,43 @@ async function connect (client) { return } - if (!llhttpInstance) { - llhttpInstance = await llhttpPromise - llhttpPromise = null - } - client[kConnecting] = false assert(socket) - socket[kNoRef] = false - socket[kWriting] = false - socket[kReset] = false - socket[kBlocking] = false - socket[kError] = null - socket[kParser] = new Parser(client, socket, llhttpInstance) - socket[kClient] = client - socket[kCounter] = 0 - socket[kMaxRequests] = client[kMaxRequests] - socket - .on('error', onSocketError) - .on('readable', onSocketReadable) - .on('end', onSocketEnd) - .on('close', onSocketClose) + if (socket.alpnProtocol === 'h2') { + const session = http2.connect(client[kUrl], { + createConnection: () => socket + }) + + session[kError] = null + session[kClient] = client + session.on('error', onHttp2SessionError) + session.on('close', onSocketClose) + session.unref() + + client[kHTTP2Session] = session + } else { + if (!llhttpInstance) { + llhttpInstance = await llhttpPromise + llhttpPromise = null + } + + socket[kNoRef] = false + socket[kWriting] = false + socket[kReset] = false + socket[kBlocking] = false + socket[kError] = null + socket[kParser] = new Parser(client, socket, llhttpInstance) + socket[kClient] = client + socket[kCounter] = 0 + socket[kMaxRequests] = client[kMaxRequests] + socket + .on('error', onSocketError) + .on('readable', onSocketReadable) + .on('end', onSocketEnd) + .on('close', onSocketClose) + } client[kSocket] = socket @@ -1205,7 +1238,7 @@ function _resume (client, sync) { const socket = client[kSocket] - if (socket && !socket.destroyed) { + if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') { if (client[kSize] === 0) { if (!socket[kNoRef] && socket.unref) { socket.unref() @@ -1270,12 +1303,14 @@ function _resume (client, sync) { return } - if (!socket) { + if (!socket && !client[kHTTP2Session]) { connect(client) return } - if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) { + if ((socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) || + (client[kHTTP2Session] && client[kHTTP2Session].destroyed)) { + // TODO(HTTP/2): what if exceeds max concurrent streams or can't accept new return } @@ -1331,6 +1366,12 @@ function _resume (client, sync) { } function write (client, request) { + if (client[kHTTP2Session]) { + console.log('http/2') + writeH2(client, client[kHTTP2Session], request) + return + } + const { body, method, path, host, upgrade, headers, blocking, reset } = request // https://tools.ietf.org/html/rfc7231#section-4.3.1 @@ -1486,6 +1527,155 @@ function write (client, request) { return true } +function writeH2 (client, session, request) { + // TODO(HTTP/2): upgrade is not supported in HTTP/2 + const { body, method, path, host, upgrade } = request + + // https://tools.ietf.org/html/rfc7231#section-4.3.1 + // https://tools.ietf.org/html/rfc7231#section-4.3.2 + // https://tools.ietf.org/html/rfc7231#section-4.3.5 + + // Sending a payload body on a request that does not + // expect it can cause undefined behavior on some + // servers and corrupt connection state. Do not + // re-use the connection for further requests. + + + const expectsPayload = ( + method === 'PUT' || + method === 'POST' || + method === 'PATCH' + ) + + if (body && typeof body.read === 'function') { + // Try to read EOF in order to get length. + body.read(0) + } + + let contentLength = util.bodyLength(body) + + if (contentLength == null) { + contentLength = request.contentLength + } + + if (contentLength === 0 || !expectsPayload) { + // https://tools.ietf.org/html/rfc7230#section-3.3.2 + // A user agent SHOULD NOT send a Content-Length header field when + // the request message does not contain a payload body and the method + // semantics do not anticipate such a body. + + contentLength = null + } + + if (request.contentLength != null && request.contentLength !== contentLength) { + if (client[kStrictContentLength]) { + errorRequest(client, request, new RequestContentLengthMismatchError()) + return false + } + + process.emitWarning(new RequestContentLengthMismatchError()) + } + + try { + // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event? + request.onConnect((err) => { + if (request.aborted || request.completed) { + return + } + + errorRequest(client, request, err || new RequestAbortedError()) + }) + } catch (err) { + errorRequest(client, request, err) + } + + if (request.aborted) { + return false + } + + const headers = { ...request.headers } + headers[':authority'] = host || client[kHost] + headers[':path'] = path + + // TODO(HTTP/2): Expect: 100-continue + + /* istanbul ignore else: assertion */ + if (!body) { + if (contentLength === 0) { + headers['content-length'] = '0' + } else { + assert(contentLength == null, 'no body must not have content length') + } + } else if (util.isBuffer(body)) { + assert(contentLength === body.byteLength, 'buffer body must have content length') + + headers['content-length'] = String(contentLength) + + process.nextTick(() => { + stream.end(body) + request.onBodySent(body) + }) + } else if (util.isBlob(body)) { + process.nextTick(() => { + writeBlob({ client, request, stream, contentLength, expectsPayload }) + }) + } else if (util.isStream(body)) { + process.nextTick(() => { + writeStream({ client, request, stream, contentLength, expectsPayload }) + }) + } else if (util.isIterable(body)) { + process.nextTick(() => { + writeIterable({ client, request, stream, contentLength, expectsPayload }) + }) + } else { + assert(false) + } + + console.log('http/2 request') + // TODO(HTTP/2): ref only if current streams count is 0 + session.ref() + // TODO(HTTP/2): The handler expects an array but the native http2 module provides us with an object. What should we do? + const stream = session.request(headers) + + stream.on('response', headers => { + if (request.onHeaders(Number(headers[':status']), headers, stream.resume.bind(stream), '') === false) { + stream.pause() + } + }) + + stream.on('data', chunk => { + if (request.onData(chunk) === false) { + stream.pause() + } + }) + + stream.on('trailers', headers => { + // TODO(HTTP/2): Suppor trailers + }) + stream.on('end', () => { + request.onComplete([]) + }) + + stream.on('aborted', () => { + // TODO(HTTP/2): Support aborted + }) + + stream.on('ready', () => { + // TODO(HTTP/2): Support ready + }) + + stream.on('timeout', () => { + // TODO(HTTP/2): Support timeout + }) + + stream.on('close', () => { + // TODO(HTTP/2): unref only if current streams count is 0 + session.unref() + }) + + return true +} + function writeStream ({ body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') diff --git a/lib/core/connect.js b/lib/core/connect.js index f3b5cc33edd..b63786bae7f 100644 --- a/lib/core/connect.js +++ b/lib/core/connect.js @@ -99,6 +99,8 @@ function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) { servername, session, localAddress, + // TODO(HTTP/2): Should we support h2c? + ALPNProtocols: ['h2', 'http/1.1'], socket: httpSocket, // upgrade socket connection port: port || 443, host: hostname diff --git a/lib/core/symbols.js b/lib/core/symbols.js index c852107a72a..ae96d4efa97 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -51,5 +51,7 @@ module.exports = { kProxy: Symbol('proxy agent options'), kCounter: Symbol('socket request counter'), kInterceptors: Symbol('dispatch interceptors'), - kMaxResponseSize: Symbol('max response size') + kMaxResponseSize: Symbol('max response size'), + kHost: Symbol('host'), + kHTTP2Session: Symbol('http2Session') }