From af27db9c065578f6b528e31f27a49ac5d2e90d0a Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 31 May 2018 15:47:29 +0200 Subject: [PATCH] fix: add utility methods to prevent already piped error --- src/dial.js | 165 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 102 insertions(+), 63 deletions(-) diff --git a/src/dial.js b/src/dial.js index 0082f7b..46aa401 100644 --- a/src/dial.js +++ b/src/dial.js @@ -11,6 +11,53 @@ const log = debug('libp2p:switch:dial') const getPeerInfo = require('./get-peer-info') const observeConnection = require('./observe-connection') +const UNEXPECTED_END = 'Unexpected end of input from reader.' + +/** + * Uses the given MultistreamDialer to select the protocol matching the given key + * + * A helper method to catch errors from pull streams ending unexpectedly + * Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. + * + * @param {MultistreamDialer} msDialer a multistream.Dialer + * @param {string} key The key type to select + * @param {function(Error)} callback Used for standard async flow + * @param {function(Error)} abort A callback to be used for ending the connection outright + * @returns {void} + */ +function selectSafe (msDialer, key, callback, abort) { + msDialer.select(key, (err, conn) => { + if (err === true) { + return abort(new Error(UNEXPECTED_END)) + } + + callback(err, conn) + }) +} + +/** + * Uses the given MultistreamDialer to handle the given connection + * + * A helper method to catch errors from pull streams ending unexpectedly + * Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged + * + * @param {MultistreamDialer} msDialer + * @param {Connection} connection The connection to handle + * @param {function(Error)} callback Used for standard async flow + * @param {function(Error)} abort A callback to be used for ending the connection outright + * @returns {void} + */ +function handleSafe (msDialer, connection, callback, abort) { + msDialer.handle(connection, (err) => { + // Repackage errors from pull-streams ending unexpectedly. + // Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. + if (err === true) { + return abort(new Error(UNEXPECTED_END)) + } + + callback(err) + }) +} /** * Manages dialing to another peer, including muxer upgrades @@ -55,6 +102,11 @@ class Dialer { cb(null) } ], (err, connection) => { + if ((err && err.message === UNEXPECTED_END) || err === true) { + log('Connection dropped for %s', this.peerInfo.id.toB58String()) + return this.callback(null, null) + } + this.callback(err, connection) }) @@ -160,13 +212,6 @@ class Dialer { connection.setPeerInfo(this.peerInfo) this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => { - // The underlying stream closed unexpectedly, so drop the connection. - // Fixes https://github.com/libp2p/js-libp2p-switch/issues/235 - if (err && err.message === 'Unexpected end of input from reader.') { - log('Connection dropped for %s', b58Id) - return callback(null, null) - } - if (err && !this.protocol) { this.switch.conns[b58Id] = connection return callback(null, null) @@ -178,7 +223,7 @@ class Dialer { } callback(null, muxer) - }) + }, callback) } /** @@ -190,65 +235,62 @@ class Dialer { * @param {Connection} connection * @param {string} b58Id * @param {function(Error, Connection)} callback + * @param {function(Error, Connection)} abort A callback to be used for ending the connection outright * @returns {void} */ - _attemptMuxerUpgrade (connection, b58Id, callback) { + _attemptMuxerUpgrade (connection, b58Id, callback, abort) { const muxers = Object.keys(this.switch.muxers) + if (muxers.length === 0) { return callback(new Error('no muxers available')) } - // 1. try to handshake in one of the muxers available - // 2. if succeeds - // - add the muxedConn to the list of muxedConns - // - add incomming new streams to connHandler - const nextMuxer = (key) => { - log('selecting %s', key) - msDialer.select(key, (err, conn) => { - if (err) { - if (muxers.length === 0) { - return callback(new Error('could not upgrade to stream muxing')) - } + const msDialer = new multistream.Dialer() + handleSafe(msDialer, connection, (err) => { + if (err) { + return callback(new Error('multistream not supported')) + } - return nextMuxer(muxers.shift()) - } + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + const nextMuxer = (key) => { + log('selecting %s', key) + selectSafe(msDialer, key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + return callback(new Error('could not upgrade to stream muxing')) + } - const muxedConn = this.switch.muxers[key].dialer(conn) - this.switch.muxedConns[b58Id] = {} - this.switch.muxedConns[b58Id].muxer = muxedConn + return nextMuxer(muxers.shift()) + } - muxedConn.once('close', () => { - delete this.switch.muxedConns[b58Id] - this.peerInfo.disconnect() - this.switch._peerInfo.disconnect() - setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo)) - }) + const muxedConn = this.switch.muxers[key].dialer(conn) + this.switch.muxedConns[b58Id] = {} + this.switch.muxedConns[b58Id].muxer = muxedConn - // For incoming streams, in case identify is on - muxedConn.on('stream', (conn) => { - conn.setPeerInfo(this.peerInfo) - this.switch.protocolMuxer(null)(conn) - }) + muxedConn.once('close', () => { + delete this.switch.muxedConns[b58Id] + this.peerInfo.disconnect() + this.switch._peerInfo.disconnect() + setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo)) + }) - setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo)) + // For incoming streams, in case identify is on + muxedConn.on('stream', (conn) => { + conn.setPeerInfo(this.peerInfo) + this.switch.protocolMuxer(null)(conn) + }) - callback(null, muxedConn) - }) - } + setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo)) - const msDialer = new multistream.Dialer() - msDialer.handle(connection, (err) => { - if (err) { - // Repackage errors from pull-streams ending unexpectedly. - // Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged. - if (err === true) { - return callback(new Error('Unexpected end of input from reader.')) - } - return callback(new Error('multistream not supported')) + callback(null, muxedConn) + }, abort) } nextMuxer(muxers.shift()) - }) + }, abort) } /** @@ -314,8 +356,7 @@ class Dialer { */ _encryptConnection (connection, callback) { const msDialer = new multistream.Dialer() - - msDialer.handle(connection, (err) => { + handleSafe(msDialer, connection, (err) => { if (err) { return callback(err) } @@ -323,7 +364,7 @@ class Dialer { const myId = this.switch._peerInfo.id log('selecting crypto: %s', this.switch.crypto.tag) - msDialer.select(this.switch.crypto.tag, (err, _conn) => { + selectSafe(msDialer, this.switch.crypto.tag, (err, _conn) => { if (err) { return callback(err) } @@ -338,8 +379,8 @@ class Dialer { encryptedConnection.setPeerInfo(this.peerInfo) callback(null, encryptedConnection) }) - }) - }) + }, callback) + }, callback) } /** @@ -357,17 +398,15 @@ class Dialer { } const msDialer = new multistream.Dialer() - msDialer.handle(connection, (err) => { + handleSafe(msDialer, connection, (err) => { if (err) { return callback(err) } - msDialer.select(this.protocol, (err, conn) => { - if (err) { - return callback(err) - } - callback(null, conn) - }) - }) + + selectSafe(msDialer, this.protocol, (err, conn) => { + callback(err, conn) + }, callback) + }, callback) } }