Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
fix: add utility methods to prevent already piped error
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed May 31, 2018
1 parent 0cef5cc commit af27db9
Showing 1 changed file with 102 additions and 63 deletions.
165 changes: 102 additions & 63 deletions src/dial.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,53 @@ const log = debug('libp2p:switch:dial')

const getPeerInfo = require('./get-peer-info')
const observeConnection = require('./observe-connection')
const UNEXPECTED_END = 'Unexpected end of input from reader.'

/**
* Uses the given MultistreamDialer to select the protocol matching the given key
*
* A helper method to catch errors from pull streams ending unexpectedly
* Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
*
* @param {MultistreamDialer} msDialer a multistream.Dialer
* @param {string} key The key type to select
* @param {function(Error)} callback Used for standard async flow
* @param {function(Error)} abort A callback to be used for ending the connection outright
* @returns {void}
*/
function selectSafe (msDialer, key, callback, abort) {
msDialer.select(key, (err, conn) => {
if (err === true) {
return abort(new Error(UNEXPECTED_END))
}

callback(err, conn)
})
}

/**
* Uses the given MultistreamDialer to handle the given connection
*
* A helper method to catch errors from pull streams ending unexpectedly
* Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged
*
* @param {MultistreamDialer} msDialer
* @param {Connection} connection The connection to handle
* @param {function(Error)} callback Used for standard async flow
* @param {function(Error)} abort A callback to be used for ending the connection outright
* @returns {void}
*/
function handleSafe (msDialer, connection, callback, abort) {
msDialer.handle(connection, (err) => {
// Repackage errors from pull-streams ending unexpectedly.
// Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
if (err === true) {
return abort(new Error(UNEXPECTED_END))
}

callback(err)
})
}

/**
* Manages dialing to another peer, including muxer upgrades
Expand Down Expand Up @@ -55,6 +102,11 @@ class Dialer {
cb(null)
}
], (err, connection) => {
if ((err && err.message === UNEXPECTED_END) || err === true) {
log('Connection dropped for %s', this.peerInfo.id.toB58String())
return this.callback(null, null)
}

this.callback(err, connection)
})

Expand Down Expand Up @@ -160,13 +212,6 @@ class Dialer {

connection.setPeerInfo(this.peerInfo)
this._attemptMuxerUpgrade(connection, b58Id, (err, muxer) => {
// The underlying stream closed unexpectedly, so drop the connection.
// Fixes https://github.com/libp2p/js-libp2p-switch/issues/235
if (err && err.message === 'Unexpected end of input from reader.') {
log('Connection dropped for %s', b58Id)
return callback(null, null)
}

if (err && !this.protocol) {
this.switch.conns[b58Id] = connection
return callback(null, null)
Expand All @@ -178,7 +223,7 @@ class Dialer {
}

callback(null, muxer)
})
}, callback)
}

/**
Expand All @@ -190,65 +235,62 @@ class Dialer {
* @param {Connection} connection
* @param {string} b58Id
* @param {function(Error, Connection)} callback
* @param {function(Error, Connection)} abort A callback to be used for ending the connection outright
* @returns {void}
*/
_attemptMuxerUpgrade (connection, b58Id, callback) {
_attemptMuxerUpgrade (connection, b58Id, callback, abort) {
const muxers = Object.keys(this.switch.muxers)

if (muxers.length === 0) {
return callback(new Error('no muxers available'))
}

// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
const nextMuxer = (key) => {
log('selecting %s', key)
msDialer.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
return callback(new Error('could not upgrade to stream muxing'))
}
const msDialer = new multistream.Dialer()
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(new Error('multistream not supported'))
}

return nextMuxer(muxers.shift())
}
// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
const nextMuxer = (key) => {
log('selecting %s', key)
selectSafe(msDialer, key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
return callback(new Error('could not upgrade to stream muxing'))
}

const muxedConn = this.switch.muxers[key].dialer(conn)
this.switch.muxedConns[b58Id] = {}
this.switch.muxedConns[b58Id].muxer = muxedConn
return nextMuxer(muxers.shift())
}

muxedConn.once('close', () => {
delete this.switch.muxedConns[b58Id]
this.peerInfo.disconnect()
this.switch._peerInfo.disconnect()
setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo))
})
const muxedConn = this.switch.muxers[key].dialer(conn)
this.switch.muxedConns[b58Id] = {}
this.switch.muxedConns[b58Id].muxer = muxedConn

// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
conn.setPeerInfo(this.peerInfo)
this.switch.protocolMuxer(null)(conn)
})
muxedConn.once('close', () => {
delete this.switch.muxedConns[b58Id]
this.peerInfo.disconnect()
this.switch._peerInfo.disconnect()
setImmediate(() => this.switch.emit('peer-mux-closed', this.peerInfo))
})

setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo))
// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
conn.setPeerInfo(this.peerInfo)
this.switch.protocolMuxer(null)(conn)
})

callback(null, muxedConn)
})
}
setImmediate(() => this.switch.emit('peer-mux-established', this.peerInfo))

const msDialer = new multistream.Dialer()
msDialer.handle(connection, (err) => {
if (err) {
// Repackage errors from pull-streams ending unexpectedly.
// Needed until https://github.com/dignifiedquire/pull-length-prefixed/pull/8 is merged.
if (err === true) {
return callback(new Error('Unexpected end of input from reader.'))
}
return callback(new Error('multistream not supported'))
callback(null, muxedConn)
}, abort)
}

nextMuxer(muxers.shift())
})
}, abort)
}

/**
Expand Down Expand Up @@ -314,16 +356,15 @@ class Dialer {
*/
_encryptConnection (connection, callback) {
const msDialer = new multistream.Dialer()

msDialer.handle(connection, (err) => {
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(err)
}

const myId = this.switch._peerInfo.id
log('selecting crypto: %s', this.switch.crypto.tag)

msDialer.select(this.switch.crypto.tag, (err, _conn) => {
selectSafe(msDialer, this.switch.crypto.tag, (err, _conn) => {
if (err) {
return callback(err)
}
Expand All @@ -338,8 +379,8 @@ class Dialer {
encryptedConnection.setPeerInfo(this.peerInfo)
callback(null, encryptedConnection)
})
})
})
}, callback)
}, callback)
}

/**
Expand All @@ -357,17 +398,15 @@ class Dialer {
}

const msDialer = new multistream.Dialer()
msDialer.handle(connection, (err) => {
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(err)
}
msDialer.select(this.protocol, (err, conn) => {
if (err) {
return callback(err)
}
callback(null, conn)
})
})

selectSafe(msDialer, this.protocol, (err, conn) => {
callback(err, conn)
}, callback)
}, callback)
}
}

Expand Down

0 comments on commit af27db9

Please sign in to comment.