Skip to content

Commit

Permalink
Merge branch 'master' into parser-execute
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag authored Feb 27, 2021
2 parents 2361b8e + 3d002c1 commit 6920db3
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 139 deletions.
5 changes: 4 additions & 1 deletion lib/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ class RequestHandler extends AsyncResource {

if (res) {
this.res = null
util.destroy(res, err)
// Ensure all queued handlers are invoked before destroying res.
util.queueMicrotask(() => {
util.destroy(res, err)
})
}

if (body) {
Expand Down
199 changes: 108 additions & 91 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,19 @@ class Parser extends HTTPParser {
return
}

const { queue, socket } = this

this.paused = false

this.resuming = true
while (this.queue.length) {
const [fn, ...args] = this.queue.shift()
while (queue.length) {
if (socket.destroyed) {
return
}

const [key, ...args] = queue.shift()

Reflect.apply(fn, this, args)
this[key](...args)

if (this.paused) {
this.resuming = false
Expand Down Expand Up @@ -430,7 +436,7 @@ class Parser extends HTTPParser {
[HTTPParser.kOnHeaders] (rawHeaders) {
/* istanbul ignore next: difficult to make a test case for */
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders])
this.queue.push([HTTPParser.kOnHeaders, rawHeaders])
return
}

Expand All @@ -441,108 +447,86 @@ class Parser extends HTTPParser {
}
}

[HTTPParser.kOnExecute] (ret, currentBuffer) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnExecute], ret])
return
}
onUpgrade (head) {
const { socket } = this
const { client, headers, statusCode, request } = this

const { upgrade, socket } = this
assert(!socket.destroyed)
assert(socket === client[kSocket])
assert(!socket.isPaused())
assert(socket._handle && socket._handle.reading)
assert(request.upgrade)

if (!Number.isFinite(ret)) {
assert(ret instanceof Error)
util.destroy(socket, ret)
return
}
this.headers = null
this.statusCode = null
this.request = null

// This logic cannot live in kOnHeadersComplete since we
// have no way of slicing the parsing buffer without knowing
// the offset which is only provided in kOnExecute.
if (upgrade && !socket.destroyed) {
const { client, headers, statusCode, request } = this
// _readableState.flowing might be `true` if the socket has been
// explicitly `resume()`:d even if we never registered a 'data'
// listener.

assert(!socket.destroyed)
assert(socket === client[kSocket])
assert(!socket.isPaused())
assert(socket._handle && socket._handle.reading)
assert(request.upgrade)
// We need to stop unshift from emitting 'data'. However, we cannot
// call pause() as that will stop socket from automatically resuming
// when 'data' listener is registered.

this.headers = null
this.statusCode = null
this.request = null
// Reset socket state to non flowing:
socket._readableState.flowing = null
socket.unshift(head)

// _readableState.flowing might be `true` if the socket has been
// explicitly `resume()`:d even if we never registered a 'data'
// listener.
try {
if (!request.aborted) {
detachSocket(socket)
client[kSocket] = null

// We need to stop unshift from emitting 'data'. However, we cannot
// call pause() as that will stop socket from automatically resuming
// when 'data' listener is registered.
client[kQueue][client[kRunningIdx]++] = null

// Reset socket state to non flowing:
socket._readableState.flowing = null
client.emit('disconnect', client, new InformationalError('upgrade'))

socket.unshift(currentBuffer.slice(ret))
request.onUpgrade(statusCode, headers, socket)
}
} catch (err) {
util.destroy(socket, err)
}

try {
if (!socket.destroyed && !request.aborted) {
detachSocket(socket)
client[kSocket] = null
resume(client)
}

client[kQueue][client[kRunningIdx]++] = null
[HTTPParser.kOnExecute] (ret, currentBuffer) {
const { upgrade, socket } = this

client.emit('disconnect', client, new InformationalError('upgrade'))
if (socket.destroyed) {
return
}

request.onUpgrade(statusCode, headers, socket)
}
if (!Number.isFinite(ret)) {
assert(ret instanceof Error)
util.destroy(socket, ret)
return
}

resume(client)
} catch (err) {
util.destroy(socket, err)
// This logic cannot live in kOnHeadersComplete since we
// have no way of slicing the parsing buffer without knowing
// the offset which is only provided in kOnExecute.
if (upgrade) {
const head = currentBuffer.slice(ret)
if (this.paused) {
this.queue.push(['onUpgrade', head])
} else {
this.onUpgrade(head)
}
}
}

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
/* istanbul ignore next: difficult to make a test case for */
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive])
return
}

onHeadersComplete (rawHeaders, statusCode, shouldKeepAlive) {
const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]

/* istanbul ignore next: difficult to make a test case for */
if (socket.destroyed) {
return
}

clearTimeout(this.timeout)
this.timeout = client[kBodyTimeout]
? setTimeout(onBodyTimeout, client[kBodyTimeout], this)
: null

// TODO: Check for content-length mismatch from server?

assert(!this.upgrade)
assert(this.statusCode < 200)

// TODO: More statusCode validation?

if (statusCode === 100) {
util.destroy(socket, new SocketError('bad response'))
return 1
}

if (request.upgrade !== true && upgrade !== Boolean(request.upgrade)) {
util.destroy(socket, new SocketError('bad upgrade'))
return 1
}

if (this.headers) {
Array.prototype.push.apply(this.headers, rawHeaders)
} else {
Expand All @@ -555,7 +539,7 @@ class Parser extends HTTPParser {

if (request.upgrade) {
this.upgrade = true
return 2
return
}

let keepAlive
Expand Down Expand Up @@ -604,24 +588,58 @@ class Parser extends HTTPParser {
}
} catch (err) {
util.destroy(socket, err)
}
}

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]

/* istanbul ignore next: difficult to make a test case for */
if (socket.destroyed) {
return
}

// TODO: Check for content-length mismatch from server?

assert(!this.upgrade)
assert(this.statusCode < 200)

// TODO: More statusCode validation?

if (statusCode === 100) {
util.destroy(socket, new SocketError('bad response'))
return 1
}

return request.method === 'HEAD' || statusCode < 200 ? 1 : 0
}
if (request.upgrade !== true && upgrade !== Boolean(request.upgrade)) {
util.destroy(socket, new SocketError('bad upgrade'))
return 1
}

[HTTPParser.kOnBody] (chunk, offset, length) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length])
return
this.queue.push(['onHeadersComplete', rawHeaders, statusCode, shouldKeepAlive])
} else {
this.onHeadersComplete(rawHeaders, statusCode, shouldKeepAlive)
}

return request.upgrade ? 2 : request.method === 'HEAD' || statusCode < 200 ? 1 : 0
}

[HTTPParser.kOnBody] (chunk, offset, length) {
const { socket, statusCode, request, timeout } = this

if (socket.destroyed) {
return
}

if (this.paused) {
this.queue.push([HTTPParser.kOnBody, chunk, offset, length])
return
}

if (timeout && timeout.refresh) {
timeout.refresh()
}
Expand All @@ -638,18 +656,17 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnMessageComplete] () {
/* istanbul ignore next: difficult to make a test case for */
if (this.paused) {
this.queue.push([this[HTTPParser.kOnMessageComplete]])
return
}

const { client, socket, statusCode, headers, upgrade, request, trailers } = this

if (socket.destroyed) {
return
}

if (this.paused) {
this.queue.push([HTTPParser.kOnMessageComplete])
return
}

assert(statusCode >= 100)
assert(this.resuming || (socket._handle && socket._handle.reading))

Expand Down
4 changes: 1 addition & 3 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ class Request {
return
}
this.aborted = true

// Ensure all queued handlers are invoked before calling onError.
util.queueMicrotask(() => this[kHandler].onError(err))
return this[kHandler].onError(err)
}
}

Expand Down
43 changes: 0 additions & 43 deletions test/client-dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const { test } = require('tap')
const { Client, Pool, errors } = require('..')
const http = require('http')
const { PassThrough } = require('stream')

test('dispatch invalid opts', (t) => {
t.plan(3)
Expand Down Expand Up @@ -598,45 +597,3 @@ test('dispatch pool onError missing', (t) => {
}
})
})

test('ensure promise callback runs before onError', t => {
t.plan(2)

const server = http.createServer((req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))

const stream = new PassThrough()
new Promise(resolve => client.dispatch({
path: '/',
method: 'POST',
body: Buffer.alloc(1e6)
}, {
onConnect () {

},
onHeaders () {
resolve()
},
onData () {

},
onComplete () {
throw new Error()
},
onError (err) {
t.ok(err)
stream.destroy(err)
}
})).then(() => {
stream.on('error', (err) => {
t.ok(err)
})
})
})
})
2 changes: 1 addition & 1 deletion test/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ test('request abort before headers', (t) => {
signal
}, (err) => {
t.ok(err instanceof errors.RequestAbortedError)
t.strictEqual(signal.listenerCount('abort'), 1)
t.strictEqual(signal.listenerCount('abort'), 0)
})
t.strictEqual(signal.listenerCount('abort'), 2)
})
Expand Down

0 comments on commit 6920db3

Please sign in to comment.