Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

refactor: async with multiaddr conn #92

Merged
merged 18 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
'use strict'

const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const pipe = require('it-pipe')
const WS = require('./src')

const mockUpgrader = {
upgradeInbound: maConn => maConn,
upgradeOutbound: maConn => maConn
}
let listener

function boot (done) {
const ws = new WS()
const ws = new WS({ upgrader: mockUpgrader })
const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
listener = ws.createListener((conn) => pull(conn, conn))
listener.listen(ma, done)
listener = ws.createListener(conn => pipe(conn, conn))
listener.listen(ma).then(() => done()).catch(done)
listener.on('error', console.error)
}

function shutdown (done) {
listener.close(done)
listener.close().then(done).catch(done)
}

module.exports = {
Expand Down
43 changes: 2 additions & 41 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,43 +1,4 @@
docs
node_modules
package-lock.json
yarn.lock

# Logs
logs
*.log
npm-debug.log*

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directory
node_modules

# Optional npm cache directory
.npm

# Optional REPL history
.node_repl_history

# Vim editor swap files
*.swp

dist

.nyc_output
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:

- stage: check
script:
- npx aegir commitlint --travis
- npx aegir build --bundlesize
- npx aegir dep-check -- -i wrtc -i electron-webrtc
- npm run lint

Expand Down
40 changes: 18 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,33 @@
```js
const WS = require('libp2p-websockets')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const pipe = require('it-pipe')
const { collect } = require('streaming-iterables')

const mh = multiaddr('/ip4/0.0.0.0/tcp/9090/ws')
const addr = multiaddr('/ip4/0.0.0.0/tcp/9090/ws')

const ws = new WS()
const ws = new WS({ upgrader })

const listener = ws.createListener((socket) => {
console.log('new connection opened')
pull(
pull.values(['hello']),
pipe(
['hello'],
socket
)
})

listener.listen(mh, () => {
console.log('listening')

pull(
ws.dial(mh),
pull.collect((err, values) => {
if (!err) {
console.log(`Value: ${values.toString()}`)
} else {
console.log(`Error: ${err}`)
}

// Close connection after reading
listener.close()
}),
)
})
await listener.listen(addr)
console.log('listening')

const socket = await ws.dial(addr)
const values = await pipe(
socket,
collect
)
console.log(`Value: ${values.toString()}`)

// Close connection after reading
await listener.close()
```

## API
Expand Down
2 changes: 0 additions & 2 deletions ci/Jenkinsfile

This file was deleted.

27 changes: 15 additions & 12 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
"release": "aegir release -t node -t browser ",
"release-minor": "aegir release --type minor -t node -t browser",
"release-major": "aegir release --type major -t node -t browser",
"coverage": "aegir coverage",
"coverage-publish": "aegir coverage --provider coveralls"
"coverage": "nyc --reporter=lcov --reporter=text npm run test:node"
},
"browser": {
"src/listener": "./src/listener.browser.js"
Expand All @@ -24,8 +23,7 @@
"dist"
],
"pre-push": [
"lint",
"test"
"lint"
],
"repository": {
"type": "git",
Expand All @@ -40,21 +38,26 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"abortable-iterator": "^2.1.0",
"class-is": "^1.1.0",
"debug": "^4.1.1",
"interface-connection": "~0.3.3",
"mafmt": "^6.0.7",
"err-code": "^2.0.0",
"it-ws": "vasco-santos/it-ws#v2.1.1-rc.0",
"libp2p-utils": "~0.1.0",
"mafmt": "^7.0.0",
"multiaddr": "^7.1.0",
"multiaddr-to-uri": "^5.0.0",
"pull-ws": "hugomrdias/pull-ws#fix/bundle-size"
"p-timeout": "^3.2.0"
},
"devDependencies": {
"aegir": "^20.0.0",
"abort-controller": "^3.0.0",
"aegir": "^20.3.1",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"interface-transport": "~0.3.7",
"multiaddr": "^6.0.6",
"pull-goodbye": "0.0.2",
"pull-stream": "^3.6.9"
"interface-transport": "^0.7.0",
"it-goodbye": "^2.0.1",
"it-pipe": "^1.0.1",
"streaming-iterables": "^4.1.0"
},
"contributors": [
"Chris Campbell <christopher.d.campbell@gmail.com>",
Expand Down
8 changes: 8 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

// p2p multi-address code
exports.CODE_P2P = 421
exports.CODE_CIRCUIT = 290

// Time to wait for a connection to close gracefully before destroying it manually
exports.CLOSE_TIMEOUT = 2000
133 changes: 100 additions & 33 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,68 +1,135 @@
'use strict'

const connect = require('pull-ws/client')
const connect = require('it-ws/client')
const mafmt = require('mafmt')
const withIs = require('class-is')
const Connection = require('interface-connection').Connection

const toUri = require('multiaddr-to-uri')
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')
const { AbortError } = require('abortable-iterator')

const log = require('debug')('libp2p:websockets')
const assert = require('assert')

const createListener = require('./listener')
const toConnection = require('./socket-to-conn')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')

/**
* @class WebSockets
*/
class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
/**
* @constructor
* @param {object} options
* @param {Upgrader} options.upgrader
*/
constructor ({ upgrader }) {
assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
this._upgrader = upgrader
}

/**
* @async
* @param {Multiaddr} ma
* @param {object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Connection} An upgraded Connection
*/
async dial (ma, options = {}) {
log('dialing %s', ma)

const socket = await this._connect(ma, options)
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)

const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}

/**
* @private
* @param {Multiaddr} ma
* @param {object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Promise<WebSocket>} Resolves a extended duplex iterable on top of a WebSocket
*/
async _connect (ma, options = {}) {
if (options.signal && options.signal.aborted) {
throw new AbortError()
}
const cOpts = ma.toOptions()
log('dialing %s:%s', cOpts.host, cOpts.port)

const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options))

callback = callback || function () { }
if (!options.signal) {
await rawSocket.connected()

const url = toUri(ma)
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: (err) => {
callback(err)
log('connected %s', ma)
return rawSocket
}

// Allow abort via signal during connect
let onAbort
const abort = new Promise((resolve, reject) => {
onAbort = () => {
reject(new AbortError())
rawSocket.close()
}

// Already aborted?
if (options.signal.aborted) return onAbort()
options.signal.addEventListener('abort', onAbort)
})

const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)
try {
await Promise.race([abort, rawSocket.connected()])
} finally {
options.signal.removeEventListener('abort', onAbort)
}

return conn
log('connected %s', ma)
return rawSocket
}

createListener (options, handler) {
/**
* Creates a Websockets listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
* @param {object} [options]
* @param {http.Server} [options.server] A pre-created Node.js HTTP/S server.
* @param {function (Connection)} handler
* @returns {Listener} A Websockets listener
*/
createListener (options = {}, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}

return createListener(options, handler)
return createListener({ handler, upgrader: this._upgrader }, options)
}

/**
* Takes a list of `Multiaddr`s and returns only valid Websockets addresses
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]} Valid Websockets multiaddrs
*/
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]

return multiaddrs.filter((ma) => {
if (ma.protoNames().includes('p2p-circuit')) {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}

if (ma.protoNames().includes('ipfs')) {
ma = ma.decapsulate('ipfs')
}

return mafmt.WebSockets.matches(ma) ||
mafmt.WebSocketsSecure.matches(ma)
return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) ||
mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P))
})
}
}

module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' })
module.exports = withIs(WebSockets, {
className: 'WebSockets',
symbolName: '@libp2p/js-libp2p-websockets/websockets'
})
Loading