diff --git a/.aegir.js b/.aegir.js index ddf424a..4d2f8bd 100644 --- a/.aegir.js +++ b/.aegir.js @@ -4,10 +4,14 @@ const multiaddr = require('multiaddr') const pipe = require('it-pipe') const WS = require('./src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} let listener function boot (done) { - const ws = new WS() + const ws = new WS({ upgrader: mockUpgrader }) const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') listener = ws.createListener(conn => pipe(conn, conn)) listener.listen(ma).then(() => done()).catch(done) diff --git a/.gitignore b/.gitignore index f338286..9faa8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,43 +1,4 @@ -docs +node_modules package-lock.json -yarn.lock - -# Logs -logs -*.log -npm-debug.log* - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul coverage .nyc_output - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# node-waf configuration -.lock-wscript - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Dependency directory -node_modules - -# Optional npm cache directory -.npm - -# Optional REPL history -.node_repl_history - -# Vim editor swap files -*.swp - -dist diff --git a/.travis.yml b/.travis.yml index dba6e9d..ff17c4c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,7 @@ jobs: - stage: check script: - - npx aegir commitlint --travis + - npx aegir build --bundlesize - npx aegir dep-check -- -i wrtc -i electron-webrtc - npm run lint diff --git a/README.md b/README.md index 57f4969..b99fd8a 100644 --- a/README.md +++ b/README.md @@ -40,37 +40,33 @@ ```js const WS = require('libp2p-websockets') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/0.0.0.0/tcp/9090/ws') +const addr = multiaddr('/ip4/0.0.0.0/tcp/9090/ws') -const ws = new WS() +const ws = new WS({ upgrader }) const listener = ws.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') - - pull( - ws.dial(mh), - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } - - // Close connection after reading - listener.close() - }), - ) -}) +await listener.listen(addr) +console.log('listening') + +const socket = await ws.dial(addr) +const values = await pipe( + socket, + collect +) +console.log(`Value: ${values.toString()}`) + +// Close connection after reading +await listener.close() ``` ## API diff --git a/package.json b/package.json index 28b5051..00c6be2 100644 --- a/package.json +++ b/package.json @@ -23,8 +23,7 @@ "dist" ], "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -39,24 +38,25 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { - "abortable-iterator": "^2.0.0", + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "debug": "^4.1.1", - "interface-connection": "~0.3.3", - "it-ws": "^2.1.0", - "mafmt": "^6.0.7", + "err-code": "^2.0.0", + "ip-address": "^6.1.0", + "it-ws": "vasco-santos/it-ws#feat/add-properties-and-functions-to-client-and-server", + "mafmt": "^7.0.0", + "multiaddr": "^7.1.0", "multiaddr-to-uri": "^5.0.0" }, "devDependencies": { "abort-controller": "^3.0.0", - "aegir": "^20.0.0", + "aegir": "^20.3.1", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "~0.6.1", - "it-goodbye": "^2.0.0", - "it-pipe": "^1.0.0", - "multiaddr": "^6.0.6", - "streaming-iterables": "^4.0.2" + "interface-transport": "^0.7.0", + "it-goodbye": "^2.0.1", + "it-pipe": "^1.0.1", + "streaming-iterables": "^4.1.0" }, "contributors": [ "Chris Campbell ", diff --git a/src/adapter.js b/src/adapter.js deleted file mode 100644 index c27d009..0000000 --- a/src/adapter.js +++ /dev/null @@ -1,17 +0,0 @@ -'use strict' - -const { Adapter } = require('interface-transport') -const withIs = require('class-is') -const WebSockets = require('./') - -// Legacy adapter to old transport & connection interface -class WebSocketsAdapter extends Adapter { - constructor () { - super(new WebSockets()) - } -} - -module.exports = withIs(WebSocketsAdapter, { - className: 'WebSockets', - symbolName: '@libp2p/js-libp2p-websockets/websockets' -}) diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..b7ab8fe --- /dev/null +++ b/src/constants.js @@ -0,0 +1,8 @@ +'use strict' + +// p2p multi-address code +exports.CODE_P2P = 421 +exports.CODE_CIRCUIT = 290 + +// Time to wait for a connection to close gracefully before destroying it manually +exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index 26de44a..b085fe8 100644 --- a/src/index.js +++ b/src/index.js @@ -4,24 +4,70 @@ const connect = require('it-ws/client') const mafmt = require('mafmt') const withIs = require('class-is') const toUri = require('multiaddr-to-uri') -const log = require('debug')('libp2p:websockets:transport') -const abortable = require('abortable-iterator') -const { AbortError } = require('interface-transport') +const { AbortError } = require('abortable-iterator') + +const log = require('debug')('libp2p:websockets') +const assert = require('assert') + const createListener = require('./listener') +const toConnection = require('./socket-to-conn') +const { CODE_CIRCUIT, CODE_P2P } = require('./constants') +/** + * @class WebSockets + */ class WebSockets { + /** + * @constructor + * @param {object} options + * @param {Upgrader} options.upgrader + */ + constructor ({ upgrader }) { + assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') + this._upgrader = upgrader + } + + /** + * @async + * @param {Multiaddr} ma + * @param {object} options + * @param {AbortSignal} options.signal Used to abort dial requests + * @returns {Connection} An upgraded Connection + */ async dial (ma, options) { options = options || {} log('dialing %s', ma) - const socket = connect(toUri(ma), Object.assign({ binary: true }, options)) - const getObservedAddrs = () => [ma] + const stream = await this._connect(ma, options) + const maConn = toConnection(stream, { socket: stream.socket, remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + + const conn = await this._upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) + return conn + } + + /** + * @private + * @param {Multiaddr} ma + * @param {object} options + * @param {AbortSignal} options.signal Used to abort dial requests + * @returns {Promise} Resolves a TCP Socket + */ + async _connect (ma, options = {}) { + if (options.signal && options.signal.aborted) { + throw new AbortError() + } + const cOpts = ma.toOptions() + log('dialing %s:%s', cOpts.host, cOpts.port) + + const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options)) if (!options.signal) { - socket.getObservedAddrs = getObservedAddrs - await socket.connected() + await rawSocket.connected() + log('connected %s', ma) - return socket + return rawSocket } // Allow abort via signal during connect @@ -29,7 +75,7 @@ class WebSockets { const abort = new Promise((resolve, reject) => { onAbort = () => { reject(new AbortError()) - socket.close() + rawSocket.close() } // Already aborted? @@ -38,45 +84,49 @@ class WebSockets { }) try { - await Promise.race([abort, socket.connected()]) + await Promise.race([abort, rawSocket.connected()]) } finally { options.signal.removeEventListener('abort', onAbort) } log('connected %s', ma) - return { - sink: async source => { - try { - await socket.sink(abortable(source, options.signal)) - } catch (err) { - // Re-throw non-aborted errors - if (err.type !== 'aborted') throw err - // Otherwise, this is fine... - await socket.close() - } - }, - source: abortable(socket.source, options.signal), - getObservedAddrs - } + return rawSocket } + /** + * Creates a Websockets listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + * @param {object} [options] + * @param {http.Server} [options.server] A pre-created Node.js HTTP/S server. + * @param {function (Connection)} handler + * @returns {Listener} A Websockets listener + */ createListener (options, handler) { - return createListener(options, handler) + if (typeof options === 'function') { + handler = options + options = {} + } + options = options || {} + + return createListener({ handler, upgrader: this._upgrader }, options) } + /** + * Takes a list of `Multiaddr`s and returns only valid Websockets addresses + * @param {Multiaddr[]} multiaddrs + * @returns {Multiaddr[]} Valid Websockets multiaddrs + */ filter (multiaddrs) { multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { - if (ma.protoNames().includes('p2p-circuit')) { + if (ma.protoNames().includes(CODE_CIRCUIT)) { return false } - if (ma.protoNames().includes('ipfs')) { - ma = ma.decapsulate('ipfs') - } - - return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma) + return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) || + mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P)) }) } } diff --git a/src/ip-port-to-multiaddr.js b/src/ip-port-to-multiaddr.js new file mode 100644 index 0000000..6d980c4 --- /dev/null +++ b/src/ip-port-to-multiaddr.js @@ -0,0 +1,30 @@ +'use strict' + +const multiaddr = require('multiaddr') +const { Address4, Address6 } = require('ip-address') + +module.exports = (ip, port) => { + if (typeof ip !== 'string') { + throw new Error('invalid ip') + } + + port = parseInt(port) + + if (isNaN(port)) { + throw new Error('invalid port') + } + + if (new Address4(ip).isValid()) { + return multiaddr(`/ip4/${ip}/tcp/${port}`) + } + + const ip6 = new Address6(ip) + + if (ip6.isValid()) { + return ip6.is4() + ? multiaddr(`/ip4/${ip6.to4().correctForm()}/tcp/${port}`) + : multiaddr(`/ip6/${ip}/tcp/${port}`) + } + + throw new Error('invalid ip') +} diff --git a/src/listener.js b/src/listener.js index 48d3300..309fa8b 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,36 +1,61 @@ 'use strict' -const multiaddr = require('multiaddr') +const EventEmitter = require('events') const os = require('os') +const multiaddr = require('multiaddr') const { createServer } = require('it-ws') -module.exports = (options, handler) => { - if (typeof options === 'function') { - handler = options - options = {} - } +const log = require('debug')('libp2p:websockets:listener') + +const { CODE_P2P } = require('./constants') +const toConnection = require('./socket-to-conn') + +module.exports = ({ handler, upgrader }, options = {}) => { + const listener = new EventEmitter() + + const server = createServer(options, async (stream, req) => { + const maConn = toConnection(stream, { + socket: req.socket + }) + + log('new inbound connection %s', maConn.remoteAddr) + + const conn = await upgrader.upgradeInbound(maConn) + log('inbound connection %s upgraded', maConn.remoteAddr) + + trackConn(server, maConn) - options = options || {} + if (handler) handler(conn) + listener.emit('connection', conn) + }) - const server = createServer(options, handler ? socket => { - socket.getObservedAddrs = () => [] - handler(socket) - } : null) + server + .on('listening', () => listener.emit('listening')) + .on('error', err => listener.emit('error', err)) + .on('close', () => listener.emit('close')) - let listeningMultiaddr + // Keep track of open connections to destroy in case of timeout + server.__connections = [] - const listen = server.listen - server.listen = ma => { + let peerId, listeningMultiaddr + + listener.close = () => { + server.__connections.forEach(maConn => maConn.close()) + return server.close() + } + + listener.listen = (ma) => { listeningMultiaddr = ma + peerId = listeningMultiaddr.getPeerId() - if (ma.protoNames().includes('ipfs')) { - ma = ma.decapsulate('ipfs') + if (peerId) { + ma = ma.decapsulateCode(CODE_P2P) } - return listen(ma.toOptions()) + return server.listen(ma.toOptions()) } - server.getAddrs = () => { + listener.getAddrs = () => { const multiaddrs = [] const address = server.address() @@ -46,7 +71,7 @@ module.exports = (options, handler) => { let m = listeningMultiaddr.decapsulate('tcp') m = m.encapsulate('/tcp/' + address.port + '/ws') if (listeningMultiaddr.getPeerId()) { - m = m.encapsulate('/ipfs/' + ipfsId) + m = m.encapsulate('/p2p/' + ipfsId) } if (m.toString().indexOf('0.0.0.0') !== -1) { @@ -66,5 +91,15 @@ module.exports = (options, handler) => { return multiaddrs } - return server + return listener +} + +function trackConn (server, maConn) { + server.__connections.push(maConn) + + const untrackConn = () => { + server.__connections = server.__connections.filter(c => c !== maConn) + } + + maConn.conn.once('close', untrackConn) } diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js new file mode 100644 index 0000000..f8a3db5 --- /dev/null +++ b/src/socket-to-conn.js @@ -0,0 +1,68 @@ +'use strict' + +const abortable = require('abortable-iterator') +const { CLOSE_TIMEOUT } = require('./constants') +const toMultiaddr = require('./ip-port-to-multiaddr') + +const log = require('debug')('libp2p:websockets:socket') + +// Convert a stream into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection +module.exports = (stream, options = {}) => { + const socket = options.socket + + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await stream.sink(source) + } catch (err) { + // Re-throw non-aborted errors + if (err.type !== 'aborted') throw err + // Otherwise, this is fine... + await stream.close() + } + }, + + source: options.signal ? abortable(stream.source, options.signal) : stream.source, + + conn: socket, + + localAddr: undefined, + + // If the remote address was passed, use it - it may have the peer ID encapsulated + remoteAddr: options.remoteAddr || toMultiaddr(stream.remoteAddress, stream.remotePort), + + timeline: { open: Date.now() }, + + close () { + return new Promise(async (resolve) => { // eslint-disable-line no-async-promise-executor + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + + socket.terminate() + maConn.timeline.close = Date.now() + return resolve() + }, CLOSE_TIMEOUT) + + await stream.close() + + clearTimeout(timeout) + maConn.timeline.close = Date.now() + + resolve() + }) + } + } + + return maConn +} diff --git a/test/browser.js b/test/browser.js index c3f1ff5..f7079f1 100644 --- a/test/browser.js +++ b/test/browser.js @@ -13,13 +13,18 @@ const { collect, take } = require('streaming-iterables') const WS = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws let conn beforeEach(async () => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) conn = await ws.dial(ma) }) @@ -60,6 +65,6 @@ describe('libp2p-websockets', () => { }) it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + expect(new WS({ upgrader: mockUpgrader }).createListener).to.throw() }) }) diff --git a/test/compliance.node.js b/test/compliance.node.js index 13e4605..591026c 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -6,13 +6,13 @@ const multiaddr = require('multiaddr') const http = require('http') const WS = require('../src') -describe('compliance', () => { +describe('interface-transport compliance', () => { tests({ - async setup () { - const ws = new WS() + async setup ({ upgrader }) { // eslint-disable-line require-await + const ws = new WS({ upgrader }) const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), + multiaddr('/ip4/127.0.0.1/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] diff --git a/test/fixtures/certificate.pem b/test/fixtures/certificate.pem new file mode 100644 index 0000000..840776c --- /dev/null +++ b/test/fixtures/certificate.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICATCCAWoCCQDPufXH86n2QzANBgkqhkiG9w0BAQUFADBFMQswCQYDVQQGEwJu +bzETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0 +cyBQdHkgTHRkMB4XDTEyMDEwMTE0NDQwMFoXDTIwMDMxOTE0NDQwMFowRTELMAkG +A1UEBhMCbm8xEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0 +IFdpZGdpdHMgUHR5IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAtrQ7 ++r//2iV/B6F+4boH0XqFn7alcV9lpjvAmwRXNKnxAoa0f97AjYPGNLKrjpkNXXhB +JROIdbRbZnCNeC5fzX1a+JCo7KStzBXuGSZr27TtFmcV4H+9gIRIcNHtZmJLnxbJ +sIhkGR8yVYdmJZe4eT5ldk1zoB1adgPF1hZhCBMCAwEAATANBgkqhkiG9w0BAQUF +AAOBgQCeWBEHYJ4mCB5McwSSUox0T+/mJ4W48L/ZUE4LtRhHasU9hiW92xZkTa7E +QLcoJKQiWfiLX2ysAro0NX4+V8iqLziMqvswnPzz5nezaOLE/9U/QvH3l8qqNkXu +rNbsW1h/IO6FV8avWFYVFoutUwOaZ809k7iMh2F2JMgXQ5EymQ== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/test/fixtures/key.pem b/test/fixtures/key.pem new file mode 100644 index 0000000..3649a93 --- /dev/null +++ b/test/fixtures/key.pem @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQC2tDv6v//aJX8HoX7hugfReoWftqVxX2WmO8CbBFc0qfEChrR/ +3sCNg8Y0squOmQ1deEElE4h1tFtmcI14Ll/NfVr4kKjspK3MFe4ZJmvbtO0WZxXg +f72AhEhw0e1mYkufFsmwiGQZHzJVh2Yll7h5PmV2TXOgHVp2A8XWFmEIEwIDAQAB +AoGAAlVY8sHi/aE+9xT77twWX3mGHV0SzdjfDnly40fx6S1Gc7bOtVdd9DC7pk6l +3ENeJVR02IlgU8iC5lMHq4JEHPE272jtPrLlrpWLTGmHEqoVFv9AITPqUDLhB9Kk +Hjl7h8NYBKbr2JHKICr3DIPKOT+RnXVb1PD4EORbJ3ooYmkCQQDfknUnVxPgxUGs +ouABw1WJIOVgcCY/IFt4Ihf6VWTsxBgzTJKxn3HtgvE0oqTH7V480XoH0QxHhjLq +DrgobWU9AkEA0TRJ8/ouXGnFEPAXjWr9GdPQRZ1Use2MrFjneH2+Sxc0CmYtwwqL +Kr5kS6mqJrxprJeluSjBd+3/ElxURrEXjwJAUvmlN1OPEhXDmRHd92mKnlkyKEeX +OkiFCiIFKih1S5Y/sRJTQ0781nyJjtJqO7UyC3pnQu1oFEePL+UEniRztQJAMfav +AtnpYKDSM+1jcp7uu9BemYGtzKDTTAYfoiNF42EzSJiGrWJDQn4eLgPjY0T0aAf/ +yGz3Z9ErbhMm/Ysl+QJBAL4kBxRT8gM4ByJw4sdOvSeCCANFq8fhbgm8pGWlCPb5 +JGmX3/GHFM8x2tbWMGpyZP1DLtiNEFz7eCGktWK5rqE= +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/test/node.js b/test/node.js index 5ed3c9d..83d8f4d 100644 --- a/test/node.js +++ b/test/node.js @@ -2,6 +2,9 @@ /* eslint max-nested-callbacks: ["error", 6] */ 'use strict' +const https = require('https') +const fs = require('fs') + const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect @@ -16,9 +19,14 @@ const WS = require('../src') require('./compliance.node') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('instantiate the transport', () => { it('create', () => { - const ws = new WS() + const ws = new WS({ upgrader: mockUpgrader }) expect(ws).to.exist() }) }) @@ -29,7 +37,7 @@ describe('listen', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) it('listen, check for promise', async () => { @@ -91,7 +99,7 @@ describe('listen', () => { }) it('getAddrs on port 0 listen', async () => { - const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) + const addr = multiaddr('/ip4/127.0.0.1/tcp/0/ws') const listener = ws.createListener((conn) => { }) await listener.listen(addr) const addrs = await listener.getAddrs() @@ -101,7 +109,7 @@ describe('listen', () => { }) it('getAddrs from listening on 0.0.0.0', async () => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) + const addr = multiaddr('/ip4/0.0.0.0/tcp/9003/ws') const listener = ws.createListener((conn) => { }) await listener.listen(addr) const addrs = await listener.getAddrs() @@ -110,7 +118,7 @@ describe('listen', () => { }) it('getAddrs from listening on 0.0.0.0 and port 0', async () => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) + const addr = multiaddr('/ip4/0.0.0.0/tcp/0/ws') const listener = ws.createListener((conn) => { }) await listener.listen(addr) const addrs = await listener.getAddrs() @@ -119,8 +127,8 @@ describe('listen', () => { await listener.close() }) - it('getAddrs preserves IPFS Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + it('getAddrs preserves p2p Id', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = ws.createListener((conn) => { }) await listener.listen(ma) @@ -136,7 +144,7 @@ describe('listen', () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) it('listen, check for promise', async () => { @@ -183,7 +191,7 @@ describe('dial', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) listener = ws.createListener(conn => pipe(conn, conn)) return listener.listen(ma) }) @@ -199,8 +207,8 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial with IPFS Id', async () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + it('dial with p2p Id', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const conn = await ws.dial(ma) const s = goodbye({ source: ['hey'], sink: collect }) @@ -239,13 +247,41 @@ describe('dial', () => { }) }) + describe('ip4 with wss', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/wss') + + const server = https.createServer({ + cert: fs.readFileSync('./test/fixtures/certificate.pem'), + key: fs.readFileSync('./test/fixtures/key.pem') + }) + + beforeEach(() => { + ws = new WS({ upgrader: mockUpgrader }) + listener = ws.createListener({ server }, conn => pipe(conn, conn)) + return listener.listen(ma) + }) + + afterEach(() => listener.close()) + + it('dial', async () => { + const conn = await ws.dial(ma, { websocket: { rejectUnauthorized: false } }) + const s = goodbye({ source: ['hey'], sink: collect }) + + const result = await pipe(s, conn, s) + + expect(result).to.be.eql([Buffer.from('hey')]) + }) + }) + describe('ip6', () => { let ws let listener const ma = multiaddr('/ip6/::1/tcp/9091') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) listener = ws.createListener(conn => pipe(conn, conn)) return listener.listen(ma) }) @@ -261,8 +297,8 @@ describe('dial', () => { expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial with IPFS Id', async () => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + it('dial with p2p Id', async () => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const conn = await ws.dial(ma) const s = goodbye({ @@ -280,7 +316,7 @@ describe('filter addrs', () => { let ws before(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) describe('filter valid addrs for this transport', function () { @@ -411,31 +447,35 @@ describe('filter addrs', () => { }) }) -describe('valid Connection', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') +describe.skip('valid localAddr and remoteAddr', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') - it('get observed addrs', async () => { - let dialerObsAddrs - let listenerObsAddrs + it('should resolve port 0', async () => { + const ws = new WS({ upgrader: mockUpgrader }) - const ws = new WS() + // Create a Promise that resolves when a connection is handled + let handled + const handlerPromise = new Promise(resolve => { handled = resolve }) + const handler = conn => handled(conn) - const listener = ws.createListener(async conn => { - expect(conn).to.exist() - dialerObsAddrs = await conn.getObservedAddrs() - pipe(conn, conn) - }) + const listener = ws.createListener(handler) + // Listen on the multiaddr await listener.listen(ma) - const conn = await ws.dial(ma) - await pipe([], conn, consume) + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // Dial to that address + const dialerConn = await ws.dial(localAddrs[0]) - listenerObsAddrs = await conn.getObservedAddrs() + // Wait for the incoming dial to be handled + const listenerConn = await handlerPromise + // close the listener await listener.close() - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) + expect(dialerConn.localAddr.toString()).to.equal(listenerConn.remoteAddr.toString()) + expect(dialerConn.remoteAddr.toString()).to.equal(listenerConn.localAddr.toString()) }) })