From 0cef5cce1ad4b071819d343b677f1aa3451824c6 Mon Sep 17 00:00:00 2001 From: Oli Evans Date: Wed, 30 May 2018 23:17:18 +0100 Subject: [PATCH 1/2] fix: drop connection when stream ends unexpectedly Pull streams pass true in the error position when the sream ends. In https://github.com/multiformats/js-multistream-select/blob/5b19358b91850b528b3f93babd60d63ddcf56a99/src/select.js#L18-L21 ...we're getting lots of instances of pull-length-prefixed stream erroring early with `true` and it's passed back up to the dialer in https://github.com/libp2p/js-libp2p-switch/blob/fef2d11850379a4720bb9c736236a81a067dc901/src/dial.js#L238-L241 The `_createMuxedConnection` contains an assumption that any error that occurs when trying `_attemptMuxerUpgrade` is ok, and keeps the relveant baseConnecton in the cache. If the pull-stream has ended unexpectedly then keeping the connection arround starts causing the "already piped" errors when we try and use the it later. This PR adds a guard to avoid putting the connection back into the cache if the stream has ended. There is related work in an old PR to add a check for exactly this issue in pull-length-prefixed https://github.com/dignifiedquire/pull-length-prefixed/pull/8 ...but it's still open, so this PR adds a check for true in the error position at the site where the "already piped" errors were appearing. Once the PR on pull-length-prefixed is merged this check can be removed. It's not ideal to have it in this code as it is far removed from the source, but it fixes the issue for now. Arguably anywhere that `msDialer.handle` is called should do the same check, but we're not seeing this error occur anywhere else so to keep this PR small, I've left it as the minimal changeset to fix the issue. Of note, we had to add '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' to the swarm config to trigger the "already piped" errors. There is a minimal test app here https://github.com/tableflip/js-ipfs-already-piped-error Manual testing shows ~50 streams fail in the first 2 mins of running a node, and then things stabalise with ~90 active muxed connections after that. Fixes #235 Fixes https://github.com/ipfs/js-ipfs/issues/1366 See https://github.com/dignifiedquire/pull-length-prefixed/pull/8 License: MIT Signed-off-by: Oli Evans --- src/dial.js | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/dial.js b/src/dial.js index 116c628..0082f7b 100644 --- a/src/dial.js +++ b/src/dial.js @@ -159,12 +159,14 @@ class Dialer { } connection.setPeerInfo(this.peerInfo) - - waterfall([ - (cb) => { - this._attemptMuxerUpgrade(connection, b58Id, cb) + this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => { + // The underlying stream closed unexpectedly, so drop the connection. + // Fixes https://github.com/libp2p/js-libp2p-switch/issues/235 + if (err && err.message === 'Unexpected end of input from reader.') { + log('Connection dropped for %s', b58Id) + return callback(null, null) } - ], (err, muxer) => { + if (err && !this.protocol) { this.switch.conns[b58Id] = connection return callback(null, null) @@ -237,6 +239,11 @@ class Dialer { const msDialer = new multistream.Dialer() msDialer.handle(connection, (err) => { if (err) { + // Repackage errors from pull-streams ending unexpectedly. + // Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. + if (err === true) { + return callback(new Error('Unexpected end of input from reader.')) + } return callback(new Error('multistream not supported')) } From d7428e5413eba58fa6e2e8287481dd6ace951547 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 31 May 2018 15:47:29 +0200 Subject: [PATCH 2/2] fix: add utility methods to prevent already piped error --- src/dial.js | 165 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 102 insertions(+), 63 deletions(-) diff --git a/src/dial.js b/src/dial.js index 0082f7b..46aa401 100644 --- a/src/dial.js +++ b/src/dial.js @@ -11,6 +11,53 @@ const log = debug('libp2p:switch:dial') const getPeerInfo = require('./get-peer-info') const observeConnection = require('./observe-connection') +const UNEXPECTED_END = 'Unexpected end of input from reader.' + +/** + * Uses the given MultistreamDialer to select the protocol matching the given key + * + * A helper method to catch errors from pull streams ending unexpectedly + * Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. + * + * @param {MultistreamDialer} msDialer a multistream.Dialer + * @param {string} key The key type to select + * @param {function(Error)} callback Used for standard async flow + * @param {function(Error)} abort A callback to be used for ending the connection outright + * @returns {void} + */ +function selectSafe (msDialer, key, callback, abort) { + msDialer.select(key, (err, conn) => { + if (err === true) { + return abort(new Error(UNEXPECTED_END)) + } + + callback(err, conn) + }) +} + +/** + * Uses the given MultistreamDialer to handle the given connection + * + * A helper method to catch errors from pull streams ending unexpectedly + * Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged + * + * @param {MultistreamDialer} msDialer + * @param {Connection} connection The connection to handle + * @param {function(Error)} callback Used for standard async flow + * @param {function(Error)} abort A callback to be used for ending the connection outright + * @returns {void} + */ +function handleSafe (msDialer, connection, callback, abort) { + msDialer.handle(connection, (err) => { + // Repackage errors from pull-streams ending unexpectedly. + // Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. + if (err === true) { + return abort(new Error(UNEXPECTED_END)) + } + + callback(err) + }) +} /** * Manages dialing to another peer, including muxer upgrades @@ -55,6 +102,11 @@ class Dialer { cb(null) } ], (err, connection) => { + if ((err && err.message === UNEXPECTED_END) || err === true) { + log('Connection dropped for %s', this.peerInfo.id.toB58String()) + return this.callback(null, null) + } + this.callback(err, connection) }) @@ -160,13 +212,6 @@ class Dialer { connection.setPeerInfo(this.peerInfo) this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => { - // The underlying stream closed unexpectedly, so drop the connection. - // Fixes https://github.com/libp2p/js-libp2p-switch/issues/235 - if (err && err.message === 'Unexpected end of input from reader.') { - log('Connection dropped for %s', b58Id) - return callback(null, null) - } - if (err && !this.protocol) { this.switch.conns[b58Id] = connection return callback(null, null) @@ -178,7 +223,7 @@ class Dialer { } callback(null, muxer) - }) + }, callback) } /** @@ -190,65 +235,62 @@ class Dialer { * @param {Connection} connection * @param {string} b58Id * @param {function(Error, Connection)} callback + * @param {function(Error, Connection)} abort A callback to be used for ending the connection outright * @returns {void} */ - _attemptMuxerUpgrade (connection, b58Id, callback) { + _attemptMuxerUpgrade (connection, b58Id, callback, abort) { const muxers = Object.keys(this.switch.muxers) + if (muxers.length === 0) { return callback(new Error('no muxers available')) } - // 1. try to handshake in one of the muxers available - // 2. if succeeds - // - add the muxedConn to the list of muxedConns - // - add incomming new streams to connHandler - const nextMuxer = (key) => { - log('selecting %s', key) - msDialer.select(key, (err, conn) => { - if (err) { - if (muxers.length === 0) { - return callback(new Error('could not upgrade to stream muxing')) - } + const msDialer = new multistream.Dialer() + handleSafe(msDialer, connection, (err) => { + if (err) { + return callback(new Error('multistream not supported')) + } - return nextMuxer(muxers.shift()) - } + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + const nextMuxer = (key) => { + log('selecting %s', key) + selectSafe(msDialer, key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + return callback(new Error('could not upgrade to stream muxing')) + } - const muxedConn = this.switch.muxers[key].dialer(conn) - this.switch.muxedConns[b58Id] = {} - this.switch.muxedConns[b58Id].muxer = muxedConn + return nextMuxer(muxers.shift()) + } - muxedConn.once('close', () => { - delete this.switch.muxedConns[b58Id] - this.peerInfo.disconnect() - this.switch._peerInfo.disconnect() - setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo)) - }) + const muxedConn = this.switch.muxers[key].dialer(conn) + this.switch.muxedConns[b58Id] = {} + this.switch.muxedConns[b58Id].muxer = muxedConn - // For incoming streams, in case identify is on - muxedConn.on('stream', (conn) => { - conn.setPeerInfo(this.peerInfo) - this.switch.protocolMuxer(null)(conn) - }) + muxedConn.once('close', () => { + delete this.switch.muxedConns[b58Id] + this.peerInfo.disconnect() + this.switch._peerInfo.disconnect() + setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo)) + }) - setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo)) + // For incoming streams, in case identify is on + muxedConn.on('stream', (conn) => { + conn.setPeerInfo(this.peerInfo) + this.switch.protocolMuxer(null)(conn) + }) - callback(null, muxedConn) - }) - } + setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo)) - const msDialer = new multistream.Dialer() - msDialer.handle(connection, (err) => { - if (err) { - // Repackage errors from pull-streams ending unexpectedly. - // Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. - if (err === true) { - return callback(new Error('Unexpected end of input from reader.')) - } - return callback(new Error('multistream not supported')) + callback(null, muxedConn) + }, abort) } nextMuxer(muxers.shift()) - }) + }, abort) } /** @@ -314,8 +356,7 @@ class Dialer { */ _encryptConnection (connection, callback) { const msDialer = new multistream.Dialer() - - msDialer.handle(connection, (err) => { + handleSafe(msDialer, connection, (err) => { if (err) { return callback(err) } @@ -323,7 +364,7 @@ class Dialer { const myId = this.switch._peerInfo.id log('selecting crypto: %s', this.switch.crypto.tag) - msDialer.select(this.switch.crypto.tag, (err, _conn) => { + selectSafe(msDialer, this.switch.crypto.tag, (err, _conn) => { if (err) { return callback(err) } @@ -338,8 +379,8 @@ class Dialer { encryptedConnection.setPeerInfo(this.peerInfo) callback(null, encryptedConnection) }) - }) - }) + }, callback) + }, callback) } /** @@ -357,17 +398,15 @@ class Dialer { } const msDialer = new multistream.Dialer() - msDialer.handle(connection, (err) => { + handleSafe(msDialer, connection, (err) => { if (err) { return callback(err) } - msDialer.select(this.protocol, (err, conn) => { - if (err) { - return callback(err) - } - callback(null, conn) - }) - }) + + selectSafe(msDialer, this.protocol, (err, conn) => { + callback(err, conn) + }, callback) + }, callback) } }