diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6cb685e --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +/src/app/system/console/colors.package.js~ +/package.json~ +/docs/RULES.md~ +# Logs +logs +*.log + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (http://nodejs.org/api/addons.html) +build/Release + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules + +# Compiled api docs +api_docs +/log.txt +/bin +/stacks.out +/out diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..7ea8582 --- /dev/null +++ b/.npmignore @@ -0,0 +1,5 @@ +tests +README.md +.travis.yml +webpack.config +.jshintrc \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..1471ada --- /dev/null +++ b/.travis.yml @@ -0,0 +1,4 @@ +language: node_js +node_js: + - "6.9" +script: "npm run-script test && node tests/benchmarks/index.js" diff --git a/README.md b/README.md new file mode 100644 index 0000000..a7d63c3 --- /dev/null +++ b/README.md @@ -0,0 +1,148 @@ +

+ + Kalm +

+
+ Kalm +

+

+ The Socket Optimizer +


+

+
+ +[![Kalm](https://img.shields.io/npm/v/kalm.svg)](https://www.npmjs.com/package/kalm) +[![Node](https://img.shields.io/badge/node->%3D4.0-blue.svg)](https://nodejs.org) +[![Build Status](https://travis-ci.org/fed135/Kalm.svg?branch=master)](https://travis-ci.org/fed135/Kalm) +[![Dependencies Status](https://david-dm.org/fed135/Kalm.svg)](https://www.npmjs.com/package/kalm) +[![Gitter](https://img.shields.io/gitter/room/fed135/kalm.svg)](https://gitter.im/fed135/Kalm) + + +## Still under development - check out the current release [here](https://github.com/fed135/Kalm) + +--- + +- **Easy-to-use syntax** and feature parity for all protocols +- Flexible and extensible, load your own transports and serializers +- **Multiplexing, session stores and transactions** +- Can be used between servers or in the **browser** +- Lower resource footprint and over **50x better throughtput** than plain sockets + + +## How it works + +**Bytes transfered** + +Call buffering can reduce payload sizes at the cost of some initial latency. +This makes a huge difference when you need to send a large number of small packets, such as multiplayer games do. See [Nagle's algorithm](https://en.wikipedia.org/wiki/Nagle's_algorithm). + + +## Usage + +**Server** + +```node + const Kalm = require('kalm'); + + // Listening for incoming UDP transactions on port 6000 + const server = Kalm.listen({ + port: 6000 + }); + + server.on('connection', (client) => { + // Subscribe to 'user.action' channel + client.subscribe('user.action', (req) => { + /* + req.body The body of the request + req.client The connection handle reference + req.frame The details of the network frame + req.session The session store for that connection + */ + }); + + // Broadcast to all connections subscribed to the channel 'user.join' + server.broadcast('user.join', { foo: 'bar' }); + }); + +``` + +**Client** + +```node + import Kalm from 'kalm'; + + // Opens a connection to the server + // Port, transport and serial settings should match + const client = Kalm.connect({ + hostname: '0.0.0.0', // Server's IP + port: 6000 // Server's port + }); + + client.write('user.action', {body: 'This is an object!'}); + client.subscribe('user.join', () => { //... }); + +``` + +## Options + +**Transports** + +Name | Module +--- | --- +IPC | `Kalm.transports.IPC` +TCP | `Kalm.transports.TCP` +UDP | `Kalm.transports.UDP` +WebSockets | [kalm-websocket](https://github.com/fed135/kalm-websocket) + +**Serializers** + +Name | Module +--- | --- +JSON | `Kalm.serials.JSON` +MSG-PACK | [kalm-msgpack](https://github.com/fed135/kalm-msgpack) +Snappy | [kalm-snappy](https://github.com/fed135/kalm-snappy) +`null` | As-is + + +**Profiles** + +Name | Module | Condition +--- | --- | --- | +dynamic | `Kalm.profiles.dynamic()` | Triggers based on buffer size and maximum time range (default) `{ step: 16, maxBytes: 1400 }` +heartbeat | `Kalm.profiles.heartbeat()` | Triggers at a fixed time interval `{ step: 16, maxBytes: null }` +threshold | `Kalm.profiles.threshold()` | Triggers when buffer reaches a certain size `{ step: null, maxBytes: 1400 }` +manual | `Kalm.profiles.manual()` | Need to process queues by hand `{ step: null, maxBytes: null }` + + +**Loading transports, profiles and serializers** + +```node + // Custom adapter loading example + const Kalm = require('kalm'); + const ws = require('kalm-websocket'); + const msgpack = require('kalm-msgpack'); + + const server = Kalm.listen({ + port: 3000, + transport: ws, + serial: msgpack, + profile: Kalm.profiles.heartbeat({ step: 5 }) // Triggers every 5ms + }); +``` + +## Testing + +**Unit + Smoke tests** + +`npm test` + +**Benchmarks** + +`node tests/benchmarks` + + +## Logging + +Kalm uses [debug](https://github.com/visionmedia/debug) + +`export DEBUG=kalm` diff --git a/index.js b/index.js new file mode 100644 index 0000000..7c68bee --- /dev/null +++ b/index.js @@ -0,0 +1,13 @@ +/** + * Kalm entry point + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const Kalm = require('./src'); + +/* Exports -------------------------------------------------------------------*/ + +module.exports = Kalm; \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..1fe4c5e --- /dev/null +++ b/package.json @@ -0,0 +1,47 @@ +{ + "name": "kalm", + "version": "2.0.0", + "description": "The socket optimizer", + "main": "./index.js", + "scripts": { + "test": "mocha tests/unit --recursive && mocha tests/integration" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/fed135/Kalm.git" + }, + "keywords": [ + "socket", + "tcp", + "udp", + "client", + "server", + "service", + "peer", + "micro-service", + "low-latency", + "light", + "ipc", + "messaging", + "queue" + ], + "author": "frederic charette ", + "license": "Apache-2.0", + "bugs": { + "url": "https://github.com/fed135/Kalm/issues" + }, + "homepage": "https://github.com/fed135/Kalm#readme", + "devDependencies": { + "chai": "^3.5.0", + "mocha": "^3.1.0", + "sinon": "^1.17.0" + }, + "dependencies": { + "debug": "2.6.x" + }, + "browser": { + "net": false, + "fs": false, + "dgram": false + } +} diff --git a/src/clientFactory.js b/src/clientFactory.js new file mode 100644 index 0000000..ea9e156 --- /dev/null +++ b/src/clientFactory.js @@ -0,0 +1,50 @@ +/** + * Client Factory + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const EventEmitter = require('events').EventEmitter; +const crypto = require('crypto'); + +const profiles = require('./profiles'); +const serials = require('./serials'); +const transports = require('./transports'); + +const Queue = require('./components/queue'); +const Multiplex = require('./components/multiplex'); +const Client = require('./components/client'); + +/* Methods -------------------------------------------------------------------*/ + +function create(options) { + const client = { + port: 3000, + hostname: '0.0.0.0', + transport: transports.TCP, + serial: serials.JSON, + secretKey: null, + profile: profiles.dynamic(), + channels: {}, + backlog: [] + }; + + Object.assign(client, + options, + Multiplex(client), + Queue(client), + Client(client), + EventEmitter.prototype + ); + + client.socket = client.socket || client.transport.createSocket(client); + client.transport.attachSocket(client.socket, client); + return client; +} + + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { create }; \ No newline at end of file diff --git a/src/components/client.js b/src/components/client.js new file mode 100644 index 0000000..0ee3cdd --- /dev/null +++ b/src/components/client.js @@ -0,0 +1,109 @@ +/** + * Client class + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const crypto = require('crypto'); + +const serializer = require('../utils/serializer'); +const debug = require('debug')('kalm'); +const profiles = require('../profiles'); +const sessions = require('../utils/sessions'); +const encrypter = require('../utils/encrypter'); + +/* Local variables -----------------------------------------------------------*/ + +const _pendingChannel = '_pending'; + +/* Methods -------------------------------------------------------------------*/ + +function Client(scope) { + return { + /** + * Queues a packet for transfer on the given channel + * @param {string} name The channel to send to data through + * @param {string|object} payload The payload to send + * @param {boolean} once Wether to override packets with scope one + * @returns {Client} The client, for chaining + */ + write: (name, message) => { + scope.queue(name) + .add(scope.serial ? scope.serial.encode(message) : message); + return scope; + }, + + /** + * Sends a packet - triggered by middlewares + * @param {string} channel The channel targeted for transfer + */ + end: (queue, packets) => { + let payload = serializer.serialize(queue.frame, queue.name, packets); + if (scope.secretKey !== null) payload = encrypter.encrypt(payload, scope.secretKey); + if (scope.socket.__connected) scope.transport.send(scope.socket, payload); + else scope.backlog.push(payload); + }, + + /** + * Destroys the client and connection + */ + destroy: () => { + if (scope.socket.__connected) scope.transport.disconnect(scope, scope.socket); + + + for (let channel in scope.queues) { + if (scope.queues.hasOwnProperty(channel)) { + scope.queues[channel].resetBundler(); + } + } + }, + + handleConnect: () => { + scope.socket.__connected = true; + scope.backlog.forEach(scope.transport.send.bind(null, scope.socket)); + scope.backlog.length = 0; + scope.emit('connect', scope); + }, + + handleError: (err) => { + debug(`error: ${err.message}`); + }, + + handleRequest: (payload) => { + const frames = serializer.deserialize((scope.secretKey !== null) ? encrypter.decrypt(payload, scope.secretKey) : payload); + frames.forEach((frame) => { + frame.packets.forEach((packet, messageIndex) => { + Promise.resolve() + .then(() => scope.serial ? scope.serial.decode(packet) : packet) + .catch(err => packet) + .then((decodedPacket) => { + scope.trigger(frame.channel, { + body: decodedPacket, + client: scope, + reply: scope.write.bind(null, frame.channel), + frame: { + id: frame.frame, + channel: frame.channel, + payloadBytes: frame.payloadBytes, + payloadMessages: frame.packets.length, + messageIndex + }, + session: sessions.resolve(scope.id) + }); + }); + }); + }); + }, + + handleDisconnect: () => { + scope.socket.__connected = false; + scope.emit('disconnect', scope); + } + }; +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = Client; \ No newline at end of file diff --git a/src/components/multiplex.js b/src/components/multiplex.js new file mode 100644 index 0000000..30b7672 --- /dev/null +++ b/src/components/multiplex.js @@ -0,0 +1,46 @@ +/** + * Multiplexed + */ + +'use strict'; + +/* Methods -------------------------------------------------------------------*/ + +function Multiplexed(scope) { + return { + /** + * Creates a channel for the client + * @param {string} name The name of the channel. + * @param {function} handler The handler to add to the channel + * @returns {Client} The client, for chaining + */ + subscribe: (name, handler) => { + name = '' + name; + scope.channels[name] = scope.channels[name] || []; + scope.channels[name].push(handler); + if (scope.connections) scope.emit('subscribe', name, handler); + }, + + /** + * Removes a handler from a channel + * @param {string} name The name of the channel. + * @param {function} handler The handler to remove from the channel + * @returns {Client} The client, for chaining + */ + unsubscribe: (name, handler) => { + name = '' + name; + scope.channels[name] = (scope.channels[name] || []) + .filter((event) => event !== handler && handler !== undefined); + if (scope.connections) scope.emit('unsubscribe', name, handler); + }, + + trigger: (name, params) => { + (scope.channels[name] || []) + .forEach((handler) => handler(params)); + } + }; +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = Multiplexed; \ No newline at end of file diff --git a/src/components/queue.js b/src/components/queue.js new file mode 100644 index 0000000..875027c --- /dev/null +++ b/src/components/queue.js @@ -0,0 +1,62 @@ +/** + * Queue system + */ + +'use strict'; + +/* Local variables -----------------------------------------------------------*/ + +const reservedBytes = 4; + +/* Methods -------------------------------------------------------------------*/ + +function QueueManager(scope) { + return { + queues: {}, + queue: (name, profile) => { + if (scope.queues.hasOwnProperty(name)) return scope.queues[name]; + + scope.queues[name] = Queue({ + name, + frame: 0, + packets: [], + timer: null, + bytes: 0 + }, profile || scope.profile, scope.end); + + return scope.queues[name]; + } + }; +} + +function Queue(scope, profile, end) { + scope.timer = setInterval(step, profile.tick); + + function add(packet) { + if (profile.maxBytes !== null) { + if (bytes() + packet.length > profile.maxBytes) step(); + scope.packets.push(packet); + scope.bytes += packet.length; + } + else scope.packets.push(packet); + } + + function bytes() { + return scope.bytes + scope.packets.length * 2 + scope.name.split('').length + reservedBytes; + } + + function step() { + if (scope.packets.length > 0) { + end(scope, scope.packets.concat()); + scope.packets.length = 0; + scope.bytes = 0; + scope.frame++; + } + } + + return { add, step }; +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = QueueManager; \ No newline at end of file diff --git a/src/components/server.js b/src/components/server.js new file mode 100644 index 0000000..8e26d7b --- /dev/null +++ b/src/components/server.js @@ -0,0 +1,100 @@ +/** + * Server class + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const debug = require('debug')('kalm'); +const clientFactory = require('../clientFactory'); +const sessions = require('../utils/sessions'); +const crypto = require('crypto'); + +/* Methods -------------------------------------------------------------------*/ + +function Server(scope) { + return { + /** + * Sends data to all connected clients + * @param {string} channel The name of the channel to send to + * @param {string|object} payload The payload to send + * @returns {Server} Returns itself for chaining + */ + broadcast: (channel, payload) => { + for (let i = scope.connections.length - 1; i >= 0; i--) { + scope.connections[i].send(channel, payload); + } + + return scope; + }, + + /** + * Closes the server + * @param {function} callback The callback method for the operation + */ + stop: (callback) => { + callback = callback || function() {}; + debug('warn: stopping server'); + + if (scope.listener) { + Promise.resolve() + .then(() => { + scope.connections.forEach(scope.transport.disconnect.bind(null)); + scope.connections.length = 0; + scope.transport.stop(scope, callback); + scope.listener = null; + }).then(null, scope.handleError.bind(scope)) + } + else { + scope.listener = null; + setTimeout(callback, 0); + } + }, + + /** + * Server error handler + * @param {Error} err The triggered error + */ + handleError: (err) => { + debug('error: ', err); + scope.emit('error', err); + }, + + /** + * Handler for receiving a new connection + * @private + * @param {Socket} socket The received connection socket + */ + handleConnection: (socket) => { + const origin = scope.transport.getOrigin(socket); + const hash = crypto.createHash('sha1'); + hash.update(scope.id); + hash.update(origin.host); + hash.update('' + origin.port); + + socket.__connected = true; + + const client = clientFactory.create({ + id: hash.digest('hex'), + transport: scope.transport, + serial: scope.serial, + catch: scope.catch, + socket, + secretKey: scope.secretKey, + isServer: true, + hostname: origin.host, + port: origin.port + }); + + scope.connections.push(client); + scope.emit('connection', client, sessions.resolve(client.id)); + client.on('disconnect', scope.emit.bind('disconnection')); + return client; + } + }; +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = Server; \ No newline at end of file diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..34c7e65 --- /dev/null +++ b/src/index.js @@ -0,0 +1,28 @@ +/** + * Kalm bootstraper + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const clientFactory = require('./clientFactory'); +const serverFactory = require('./serverFactory'); +const profiles = require('./profiles'); +const serials = require('./serials'); +const transports = require('./transports'); + +/* Methods -------------------------------------------------------------------*/ + +function listen(options) { + return serverFactory.create(options); +} + +function connect(options) { + return clientFactory.create(options); +} + + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { listen, connect, serials, transports, profiles }; \ No newline at end of file diff --git a/src/profiles.js b/src/profiles.js new file mode 100644 index 0000000..94ed4fe --- /dev/null +++ b/src/profiles.js @@ -0,0 +1,36 @@ +/** + * class @profiles + */ + +'use strict'; + +/* Methods -------------------------------------------------------------------*/ + +function dynamic(options) { + return Object.assign({ + tick: 16, + maxBytes: 1400 + }, options || {}); +} + +function heartbeat(options) { + return Object.assign({ + tick: 16, + maxBytes: null + }, options || {}); +} + +function threshold(options) { + return Object.assign({ + tick: null, + maxBytes: 1400 + }, options || {}); +} + +function manual() { + return { tick: null, maxBytes: null }; +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { dynamic, heartbeat, threshold, manual }; \ No newline at end of file diff --git a/src/serials/index.js b/src/serials/index.js new file mode 100644 index 0000000..bbfbce7 --- /dev/null +++ b/src/serials/index.js @@ -0,0 +1,13 @@ +/** + * Encoders + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const JSON = require('./json'); + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { JSON }; \ No newline at end of file diff --git a/src/serials/json.js b/src/serials/json.js new file mode 100644 index 0000000..853c4d9 --- /dev/null +++ b/src/serials/json.js @@ -0,0 +1,30 @@ +/** + * JSON Encoder + * @module serials.JSON + */ + +'use strict'; + +/* Methods -------------------------------------------------------------------*/ + +/** + * Encodes a payload + * @param {object} payload The payload to encode + * @returns {Buffer} The encoded payload + */ +function encode(payload) { + return (Buffer.isBuffer(payload)) ? payload : new Buffer(JSON.stringify(payload)); +} + +/** + * Decodes a payload + * @param {Buffer} payload The payload to decode + * @returns {object} The decoded payload + */ +function decode(payload) { + return JSON.parse(String.fromCharCode.apply(null, payload)); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { encode, decode }; \ No newline at end of file diff --git a/src/serverFactory.js b/src/serverFactory.js new file mode 100644 index 0000000..b70469d --- /dev/null +++ b/src/serverFactory.js @@ -0,0 +1,50 @@ +/** + * Kalm bootstraper + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const EventEmitter = require('events').EventEmitter; +const crypto = require('crypto'); + +const profiles = require('./profiles'); +const serials = require('./serials'); +const transports = require('./transports'); + +const Multiplex = require('./components/multiplex'); +const Server = require('./components/server'); + +const clientFactory = require('./clientFactory'); + + +/* Methods -------------------------------------------------------------------*/ + +function create(options) { + const server = { + id: crypto.randomBytes(8).toString('hex'), + port: 3000, + profile: profiles.dynamic(), + serial: serials.JSON, + secretKey: null, + transport: transports.TCP, + connections: [] + }; + + Object.assign(server, + options, + Server(server), + EventEmitter.prototype + ); + + server.transport.listen(server, options, clientFactory) + .then(listener => server.listener = listener); + + return server; +} + + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { create }; \ No newline at end of file diff --git a/src/transports/index.js b/src/transports/index.js new file mode 100644 index 0000000..7e8fa63 --- /dev/null +++ b/src/transports/index.js @@ -0,0 +1,15 @@ +/** + * Adapters + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const IPC = require('./ipc'); +const TCP = require('./tcp'); +const UDP = require('./udp'); + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { IPC, TCP, UDP }; \ No newline at end of file diff --git a/src/transports/ipc.js b/src/transports/ipc.js new file mode 100644 index 0000000..ad2043b --- /dev/null +++ b/src/transports/ipc.js @@ -0,0 +1,93 @@ +/** + * IPC transport methods + * @module transports.IPC + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const net = require('net'); +const fs = require('fs'); + +/* Local variables -----------------------------------------------------------*/ + +const _path = '/tmp/app.socket-'; + +/* Methods -------------------------------------------------------------------*/ + +/** + * Returns a new listener + * @param {Server} server The server object + * @param {object} options The options for the listener + * @param {function} callback The callback for the operation + * @returns {Promise(object)} The new listener + */ +function listen(server, options, callback) { + const res = Promise.defer(); + fs.unlink(_path + options.port, (err) => { + const listener = net.createServer(server.handleConnection.bind(server)); + listener.on('error', server.handleError.bind(server)); + listener.listen(_path + options.port, res.resolve.bind(res, listener)); + }); + return res.promise; +} + +function getOrigin(socket) { + return { + host: socket._server._pipeName, + port: '' + socket._handle.fd + }; +} + +/** + * Creates a client and adds the data listener(s) to it + * @param {Client} client The client to create the socket for + * @param {Socket} socket Optionnal existing socket object. + * @returns {Socket} The created ipc socket + */ +function createSocket(client) { + return net.connect(_path + client.port); +} + +function attachSocket(socket, client) { + socket.on('data', client.handleRequest.bind(client)); + socket.on('error', client.handleError.bind(client)); + socket.on('connect', client.handleConnect.bind(client)); + socket.on('close', client.handleDisconnect.bind(client)); +} + +/** + * Stops the server + * @placeholder + * @param {Server} server The server object + * @param {function} callback The success callback for the operation + */ +function stop(server, callback) { + server.listener.close(() => setTimeout(callback, 0)); +} + +/** + * Sends a message with a socket client + * @placeholder + * @param {Socket} socket The socket to use + * @param {Buffer} payload The body of the request + */ +function send(socket, payload) { + socket.write(payload); +} + +/** + * @placeholder + * Attempts to disconnect the client's connection + * @param {Client} client The client to disconnect + */ +function disconnect(client) { + client.socket.end(); + client.socket.destroy(); + setTimeout(client.handleDisconnect.bind(client), 0); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { listen, getOrigin, stop, send, disconnect, createSocket, attachSocket }; \ No newline at end of file diff --git a/src/transports/tcp.js b/src/transports/tcp.js new file mode 100644 index 0000000..749dc43 --- /dev/null +++ b/src/transports/tcp.js @@ -0,0 +1,89 @@ +/** + * TCP transport methods + * @module transports.TCP + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const net = require('net'); + +/* Local variables -----------------------------------------------------------*/ + +const _socketTimeout = 300000; // 5 Minutes + +/* Methods -------------------------------------------------------------------*/ + +/** + * Listens for tcp connections, updates the 'listener' property of the server + * @param {Server} server The server object + */ +function listen (server, options) { + const res = Promise.defer(); + const listener = net.createServer(server.handleConnection.bind(server)); + listener.on('error', server.handleError.bind(server)); + listener.listen(options.port, res.resolve.bind(res, listener)); + return res.promise; +} + +function getOrigin(socket) { + return { + host: socket.remoteAddress, + port: socket.remotePort + }; +} + +/** + * Creates a client + * @param {Client} client The client to create the socket for + * @param {Socket} socket Optionnal existing socket object. + * @returns {Socket} The created tcp client + */ +function createSocket(client) { + return net.connect(client.port, client.hostname); +} + +function attachSocket(socket, client) { + socket.on('data', client.handleRequest.bind(client)); + socket.on('error', client.handleError.bind(client)); + socket.on('connect', client.handleConnect.bind(client)); + socket.on('close', client.handleDisconnect.bind(client)); + socket.on('timeout', () => this.disconnect(client)); + socket.setTimeout(client.socketTimeout || _socketTimeout); +} + +/** + * Stops the server + * @placeholder + * @param {Server} server The server object + * @param {function} callback The success callback for the operation + */ +function stop(server, callback) { + server.listener.close(() => setTimeout(callback, 0)); +} + +/** + * Sends a message with a socket client + * @placeholder + * @param {Socket} socket The socket to use + * @param {Buffer} payload The body of the request + */ +function send(socket, payload) { + socket.write(payload); +} + +/** + * @placeholder + * Attempts to disconnect the client's connection + * @param {Client} client The client to disconnect + */ +function disconnect(client) { + client.socket.end(); + client.socket.destroy(); + setTimeout(client.handleDisconnect.bind(client), 0); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { listen, getOrigin, stop, send, disconnect, createSocket, attachSocket }; \ No newline at end of file diff --git a/src/transports/udp.js b/src/transports/udp.js new file mode 100644 index 0000000..4d1653c --- /dev/null +++ b/src/transports/udp.js @@ -0,0 +1,124 @@ +/** + * UDP transport methods + * @module transports.UDP + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const dgram = require('dgram'); + +/* Local variables -----------------------------------------------------------*/ + +const _socketType = 'udp4'; +const _keySeparator = ':'; +const _localAddress = '0.0.0.0'; +const _reuseAddr = true; + +/* Methods -------------------------------------------------------------------*/ + +/** + * Creates a socket + Client on UDP data + * @private + * @param {Server} server The server object + * @param {array} data Payload from an incomming request + * @param {object} origin The call origin info + */ +function _handleNewSocket(server, clientFactory, data, origin) { + let key = [origin.address, _keySeparator, origin.port].join(); + + if (!server.__clients) server.__clients = {}; + if (!(key in server.__clients)) { + // Circular dependency... + server.__clients[key] = clientFactory.create({ + hostname: origin.address, + port: origin.port, + transport: server.transport, + serial: server.serial, + secretKey: server.secretKey + }); + } + + server.__clients[key].handleRequest(data); + } + +/** + * Listens for udp connections, updates the 'listener' property of the server + * @param {Server} server The server object + * @param {function} callback The success callback for the operation + */ +function listen(server, options, clientFactory) { + const listener = dgram.createSocket({ type: _socketType, reuseAddr: _reuseAddr }); + listener.on('message', _handleNewSocket.bind(null, server, clientFactory)); + listener.on('error', server.handleError.bind(server)); + listener.bind(options.port, _localAddress); + + return Promise.resolve(listener); +} + +function getOrigin(socket) { + return { + host: socket.hostname, + port: socket.port + }; +} + +/** + * Sends a message with a socket client + * @param {Socket} socket The socket to use + * @param {Buffer} payload The body of the request + */ +function send(socket, payload) { + socket.send(payload, 0, payload.length, socket._port, socket._hostname); +} + +/** + * Stops the server. + * @param {Server} server The server object + * @param {function} callback The success callback for the operation + */ +function stop(server, callback) { + Object.keys(server.__clients).forEach((client) => { + disconnect(server.__clients[client]) + }); + server.listener.close(); + setTimeout(callback, 0); +} + +/** + * Creates a client + * @param {Client} client The client to create the socket for + * @param {Socket} soc Optionnal existing socket object. - Not used for UPC + * @returns {Socket} The created tcp client + */ +function createSocket(client) { + let socket = dgram.createSocket(_socketType); + socket._port = client.port; + socket._hostname = client.hostname; + + setTimeout(client.handleConnect.bind(client), 0); + return socket; +} + +function attachSocket(socket, client) { + socket.on('error', client.handleError.bind(client)); + socket.on('message', client.handleRequest.bind(client)); + + // Bind socket to also listen on it's address + socket.bind(null, _localAddress); +} + +/** + * Attempts to disconnect the client's connection + * @param {Client} client The client to disconnect + */ +function disconnect(client) { + // Nothing to do + setTimeout(client.handleDisconnect.bind(client), 0); +} + + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { listen, getOrigin, stop, send, disconnect, createSocket, attachSocket }; \ No newline at end of file diff --git a/src/utils/encrypter.js b/src/utils/encrypter.js new file mode 100644 index 0000000..576e86a --- /dev/null +++ b/src/utils/encrypter.js @@ -0,0 +1,66 @@ +/** + * Password protected buffer encryption + */ + +'use strict'; + +/* Methods -------------------------------------------------------------------*/ + +function mapKeyIn(key) { + const seed = Number(toUint8(key).join('')); + const list = new Array(256); + const dict = new Array(256); + + for (let i = 0; i < 256; i++) { + const temp = list[i] || i; + const rand = (seed % (i+1) + i) % 256; + list[i] = list[rand] || rand; + list[rand] = temp; + } + + list.forEach((val, index) => dict[val] = index); + + return dict; +} + +function mapKeyOut(key) { + const seed = Number(toUint8(key).join('')); + const dict = new Array(256); + + for (let i = 0; i < 256; i++) { + const temp = dict[i] || i; + const rand = (seed % (i+1) + i) % 256; + dict[i] = dict[rand] || rand; + dict[rand] = temp; + } + + return dict; +} + +function toUint8(str) { + return str.toString() + .split('') + .map(char => char.charCodeAt(0)); +} + +function byteIn(keyMap, val, index) { + return keyMap[val]; +} + +function byteOut(keyMap, val, index) { + return keyMap[val]; +} + +function encrypt(bytes, key) { + if (typeof bytes === 'string') bytes = toUint8(bytes); + return bytes.map(byteIn.bind(null, mapKeyIn(String(key)))); +} + + +function decrypt(bytes, key) { + return bytes.map(byteOut.bind(null, mapKeyOut(String(key)))); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { encrypt, decrypt }; \ No newline at end of file diff --git a/src/utils/serializer.js b/src/utils/serializer.js new file mode 100644 index 0000000..cab1dce --- /dev/null +++ b/src/utils/serializer.js @@ -0,0 +1,86 @@ +/** + * Serializer + */ + +'use strict'; + +/* Methods -------------------------------------------------------------------*/ + +function serialize(frame, channel, packets) { + let result = [frame, channel.length]; + + for (let letter = 0; letter < channel.length; letter++) { + result.push(channel.charCodeAt(letter)); + } + + result = result.concat(uint16Size(packets.length)); + + packets.forEach((packet) => { + result = result.concat( + uint16Size(packet.length), + Array.prototype.slice.call(packet) + ); + }); + + return Buffer.from(result); +} + +function uint16Size(value) { + return [value >>> 8, value & 0xff]; +} + +function numericSize(a, b) { + return (a << 8) | b; +} + +function parseFrame(frames, payload, startIndex) { + const result = { + frame: payload[startIndex], + channel: '', + payloadBytes: payload.length, + packets: [] + }; + + const letters = []; + const channelLength = payload[startIndex + 1]; + let caret = startIndex + channelLength + 2; + + for (let letter = startIndex + 2; letter < startIndex + channelLength + 2; letter++) { + letters.push(payload[letter]); + } + result.channel = String.fromCharCode.apply(null, letters); + + const totalPackets = numericSize(payload[caret], payload[caret + 1]); + + caret = caret + 2; + + for (let p = 0; p < totalPackets; p++) { + let packetLength = numericSize(payload[caret], payload[caret + 1]); + let packet = []; + for (let byte = caret + 2; byte < packetLength + caret + 2; byte++) { + packet.push(payload[byte]); + } + result.packets.push(packet); + + caret = caret + packetLength + 2; + } + + frames.push(result); + return caret; +} + +function deserialize(payload) { + const frames = []; + const payloadBytes = payload.length; + let caret = 0; + + while(caret= _maxCount) return; + _curr++; + } + setImmediate(function _stepHandler() { + transport.step(_repeat); + }); + } + + _repeat(); + }); +} + +function _updateSettings(obj, resolve) { + settings.transport = obj.transport || settings.transport; + resolve(); +} + +function _errorHandler(err) { + console.log(err); +} + +function _postResults() { + console.log(JSON.stringify(results)); + // Do something with the info + process.exit(); +} + +/* Init ----------------------------------------------------------------------*/ + + +// Roll port number +settings.port = 3000 + Math.round(Math.random()*1000); + +var adpts = Object.keys(Suite).map(function(k) { + return { + transport: k, + settings: {transport: k}, + raw: Suite[k], + kalm: Kalm + }; +}); + +adpts.forEach(function(i) { + tests.push(function(resolve) { + console.log('Configuring ' + i.transport); + _updateSettings(i.settings, resolve); + }); + + tests.push(function(resolve) { + console.log('Measuring raw ' + i.transport); + _measure(i.raw, function(total) { + results['raw_' + i.transport] = total; + resolve(); + }); + }); + + tests.push(function(resolve) { + console.log('Measuring Kalm ' + i.transport); + _measure(i.kalm, function(total) { + results['kalm_' + i.transport] = total; + resolve(); + }); + }); +}); + +tests.push(_postResults); + +tests.reduce(function(current, next) { + return current.then(function(resolve) { + return new Promise(next).then(resolve, _errorHandler); + }, _errorHandler); +}, Promise.resolve()); \ No newline at end of file diff --git a/tests/benchmarks/settings.js b/tests/benchmarks/settings.js new file mode 100644 index 0000000..25de268 --- /dev/null +++ b/tests/benchmarks/settings.js @@ -0,0 +1,8 @@ +module.exports = { + transport: 'TCP', + port: 3000, + profile: { tick: 5, maxBytes: 1400 }, + testDuration: 1000 * 60, + testPayload: { foo: 'bar'}, + testChannel: 'test' +}; \ No newline at end of file diff --git a/tests/benchmarks/transports/ipc.js b/tests/benchmarks/transports/ipc.js new file mode 100644 index 0000000..394c596 --- /dev/null +++ b/tests/benchmarks/transports/ipc.js @@ -0,0 +1,72 @@ +/** + * KALM Benchmark + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +var net = require('net'); + +var settings = require('../settings'); + +/* Local variables -----------------------------------------------------------*/ + +var server; +var client; + +var count = 0; +var handbreak = true; + +/* Methods -------------------------------------------------------------------*/ + +function _absorb(err) { + console.log(err); + return true; +} + +function setup(resolve) { + server = net.createServer(function(socket) { + socket.on('error', _absorb); + socket.on('data', function() { + count++; + }); + }); + handbreak = false; + server.on('error', _absorb); + server.listen('/tmp/app.socket-' + settings.port, resolve); +} + +function teardown(resolve) { + if (client) client.destroy(); + if (server) server.close(function() { + server = null; + client = null; + resolve(count); + }); +} + +function stop(resolve) { + handbreak = true; + setTimeout(resolve, 0); +} + +function step(resolve) { + if (handbreak) return; + if (!client) { + client = net.connect('/tmp/app.socket-' + settings.port); + client.on('error', _absorb); + } + + client.write(JSON.stringify(settings.testPayload)); + resolve(); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { + setup: setup, + teardown: teardown, + step: step, + stop: stop +}; \ No newline at end of file diff --git a/tests/benchmarks/transports/kalm.js b/tests/benchmarks/transports/kalm.js new file mode 100644 index 0000000..4796c00 --- /dev/null +++ b/tests/benchmarks/transports/kalm.js @@ -0,0 +1,74 @@ +/** + * KALM Benchmark + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +var settings = require('../settings'); +var Kalm = require('../../../index'); + +/* Local variables -----------------------------------------------------------*/ + +var server; +var client; + +var count = 0; +var handbreak = true; + +/* Methods -------------------------------------------------------------------*/ + +function setup(resolve) { + server = Kalm.listen({ + port: settings.port, + transport: Kalm.transports[settings.transport], + profile: settings.profile, + secretKey: 'secretkeyshouldbeatleast16chars' + }); + + server.on('connection', (c) => { + c.subscribe(settings.testChannel, () => count++); + }); + + + handbreak = false; + setTimeout(resolve, 0); +} + +function teardown(resolve) { + if (server) server.stop(function() { + server = null; + client = null; + resolve(count); + }); +} + +function stop(resolve) { + handbreak = true; + setTimeout(resolve, 0); +} + +function step(resolve) { + if (handbreak) return; + if (!client) { + client = Kalm.connect({ + port: settings.port, + transport: Kalm.transports[settings.transport], + profile: settings.profile, + secretKey: 'secretkeyshouldbeatleast16chars' + }); + } + + client.write(settings.testChannel, settings.testPayload); + resolve(); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { + setup: setup, + teardown: teardown, + step: step, + stop: stop +}; \ No newline at end of file diff --git a/tests/benchmarks/transports/tcp.js b/tests/benchmarks/transports/tcp.js new file mode 100644 index 0000000..bd4f522 --- /dev/null +++ b/tests/benchmarks/transports/tcp.js @@ -0,0 +1,72 @@ +/** + * KALM Benchmark + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +var net = require('net'); +var settings = require('../settings'); + +/* Local variables -----------------------------------------------------------*/ + +var server; +var client; + +var count = 0; +var handbreak = true; + +/* Methods -------------------------------------------------------------------*/ + +function _absorb(err) { + console.log(err); + return; +} + +function setup(resolve) { + server = net.createServer(function(socket) { + socket.on('data', function() { + count++; + }); + socket.on('error', _absorb); + }); + handbreak = false; + server.on('error', _absorb); + server.listen(settings.port, resolve); +} + +function teardown(resolve) { + if (client) client.destroy(); + if (server) server.close(function() { + server = null; + client = null; + resolve(count); + }); +} + +function stop(resolve) { + handbreak = true; + setTimeout(resolve, 0); +} + +function step(resolve) { + if (handbreak) return; + if (!client) { + client = net.connect(settings.port, '0.0.0.0'); + client.on('error', _absorb); + } + + if (client) + client.write(JSON.stringify(settings.testPayload)); + resolve(); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { + setup: setup, + teardown: teardown, + step: step, + stop: stop +}; \ No newline at end of file diff --git a/tests/benchmarks/transports/udp.js b/tests/benchmarks/transports/udp.js new file mode 100644 index 0000000..0ead264 --- /dev/null +++ b/tests/benchmarks/transports/udp.js @@ -0,0 +1,78 @@ +/** + * KALM Benchmark + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +var dgram = require('dgram'); + +var settings = require('../settings'); + +/* Local variables -----------------------------------------------------------*/ + +var server; +var client; + +var count = 0; +var handbreak = true; + +/* Methods -------------------------------------------------------------------*/ + +function _absorb(err) { + console.log(err); + return; +} + +function setup(resolve) { + server = dgram.createSocket('udp4'); + server.on('message', function() { + count++; + }); + handbreak = false; + server.on('error', _absorb); + server.bind(settings.port, '0.0.0.0'); + resolve(); +} + +function teardown(resolve) { + server.close(function() { + server = null; + client = null; + resolve(count); + }); +} + +function stop(resolve) { + handbreak = true; + setTimeout(resolve, 0); +} + +function step(resolve) { + if (handbreak) return; + if (!client) { + client = dgram.createSocket('udp4'); + client.on('error', _absorb); + } + + var payload = new Buffer(JSON.stringify(settings.testPayload)); + + client.send( + payload, + 0, + payload.length, + settings.port, + '0.0.0.0' + ); + resolve(); +} + +/* Exports -------------------------------------------------------------------*/ + +module.exports = { + setup: setup, + teardown: teardown, + step: step, + stop: stop +}; \ No newline at end of file diff --git a/tests/integration/index.js b/tests/integration/index.js new file mode 100644 index 0000000..67c72a3 --- /dev/null +++ b/tests/integration/index.js @@ -0,0 +1,64 @@ +/** + * Kalm integration test suite + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const Kalm = require('../../index'); + +/* Suite --------------------------------------------------------------------*/ + +describe('Integration tests', () => { + + ['IPC', 'TCP', 'UDP'].forEach((transport) => { + describe('Testing ' + transport + ' transport', () => { + let server; + + /* --- Setup ---*/ + + // Create a server before each scenario + beforeEach(() => { + server = Kalm.listen({ transport: Kalm.transports[transport] }); + }); + + // Cleanup afterwards + afterEach((done) => { + server.stop(() => { + server = null; + done(); + }) + }); + + /* --- Tests --- */ + + it('should work with ' + transport, (done) => { + let payload = {foo:'bar'}; + server.subscribe('test', (data) => { + expect(data).to.eql(payload); + done(); + }); + + let client = Kalm.connect({ transport: Kalm.transports[transport] }); + client.write('test', payload); + }); + + it('should handle large payloads with ' + transport, (done) => { + let largePayload = []; + while(largePayload.length < 2048) { + largePayload.push({foo: 'bar'}); + } + + server.subscribe('test', (data) => { + expect(data).to.eql(largePayload); + done(); + }); + + let client = Kalm.connect({ transport: Kalm.transports[transport] }); + client.write('test', largePayload); + }); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit/clientFactory.spec.js b/tests/unit/clientFactory.spec.js new file mode 100644 index 0000000..41da619 --- /dev/null +++ b/tests/unit/clientFactory.spec.js @@ -0,0 +1,161 @@ +/** + * Client class + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../src/clientFactory'); + +const EventEmitter = require('events').EventEmitter; + +/* Tests ---------------------------------------------------------------------*/ + +describe('Client', () => { + + var testSocket = { + on: function() {}, + setTimeout: function() {}, + end: function() {}, + write: function() {}, + once: function() {}, + destroy: function() {}, + pipe: function() { + return new EventEmitter(); + } + }; + + describe('#constructor(options, socket)', () => { + it('should create a valid Client', () => { + const result = testModule.create(); + expect(result.id).to.be.string; + expect(result.channels).to.not.be.object; + expect(result.fromServer).to.be.false; + expect(result.tick).to.be.null; + expect(result.socket).to.not.be.null; + }); + }); + + describe('#subscribe(name, handler, options)', () => { + it('should use/create a channel and add the handler to it', () => { + const testClient = testModule.create(); + const testHandler = function foo() {}; + testClient.subscribe('test', testHandler); + + expect(testClient.channels.test).to.be.instanceof(Channel); + expect(testClient.channels.test.handlers).to.include(testHandler); + }); + }); + + describe('#unsubscribe(name, handler)', () => { + it('should remove a handler from a channel', () => { + const testClient = testModule.create(); + const testHandler = function foo() {}; + testClient.subscribe('test', testHandler); + testClient.unsubscribe('test', testHandler); + + expect(testClient.channels.test).to.be.instanceof(Channel); + expect(testClient.channels.test.handlers).to.not.include(testHandler); + }); + }); + + describe('#handleError(err)', () => { + it('should print and dispatch the error', (done) => { + const testClient = testModule.create({ socket: testSocket }); + testClient.on('error', () => done()); + testClient.handleError(new Error); + }); + }); + + describe('#handleConnect(socket)', () => { + it('should print and dispatch the event', (done) => { + const testClient = testModule.create({ socket: testSocket }); + testClient.on('connect', () => done()); + testClient.handleConnect({}); + }); + + it('should print and dispatch the alternate event', (done) => { + const testClient = testModule.create({ socket: testSocket }); + testClient.on('connection', () => done()); + testClient.handleConnect({}); + }); + }); + + describe('#handleDisconnect(socket)', () => { + it('should print and dispatch the event', (done) => { + const testClient = testModule.create({ socket: testSocket }); + testClient.on('disconnect', () => done()); + testClient.handleDisconnect({}); + }); + + it('should print and dispatch the alternate event', (done) => { + const testClient = testModule.create({ socket: testSocket }); + testClient.on('disconnection', () => done()); + testClient.handleDisconnect({}); + }); + }); + + describe('#send(name, payload)', () => { + it('should call send on the proper channel', () => { + const testClient = testModule.create({ socket: testSocket }); + const testPayloads = [ + {foo: 'bar'}, + {foo: 'baz'} + ]; + + testPayloads.forEach((payload) => { + testClient.send('test-channel', payload); + }); + expect(testClient.channels['test-channel']).to.not.be.null; + expect(testClient.channels['test-channel'].packets).to.deep.equal(testPayloads); + }); + }); + + describe('#createSocket(socket)', () => { + it('should call the appropriate adapter\'s createSocket', () => { + const testClient = testModule.create({}); + const result = testClient.createSocket(); + expect(result.on).is.function; + expect(result.emit).is.function; + expect(result.write).is.function; + expect(result.end).is.function; + expect(result.connect).is.function; + expect(result.setTimeout).is.function; + expect(result.destroy).is.function; + }); + }); + + describe('#handleRequest(evt)', () => { + it('should call handleData on the appropriate channels', (done) => { + const testClient = testModule.create({ socket: testSocket }); + const testHandler1 = sinon.spy(); + const testHandler2 = sinon.spy(); + testClient.subscribe('test', testHandler1); + testClient.subscribe('test2', testHandler2); + + testClient.handleRequest(JSON.stringify(['test', ['data']])); + + setTimeout(() => { + expect(testHandler1.withArgs('data').calledOnce).to.be.true; + expect(testHandler2.calledOnce).to.be.false; + done(); + },1); + }); + }); + + describe('#destroy()', () => { + it('should call the appropriate adapter\'s disconnect', (done) => { + const testClient = testModule.create({ socket: testSocket }); + + testClient.on('disconnect', () => { + expect(testClient.socket).to.be.null; + done(); + }); + + testClient.destroy(); + }); + }); +}); diff --git a/tests/unit/components/client.spec.js b/tests/unit/components/client.spec.js new file mode 100644 index 0000000..5eb7251 --- /dev/null +++ b/tests/unit/components/client.spec.js @@ -0,0 +1,13 @@ +/** + * Client + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/components/client'); + +/* Tests ---------------------------------------------------------------------*/ \ No newline at end of file diff --git a/tests/unit/components/multiplex.spec.js b/tests/unit/components/multiplex.spec.js new file mode 100644 index 0000000..b135f7e --- /dev/null +++ b/tests/unit/components/multiplex.spec.js @@ -0,0 +1,13 @@ +/** + * Multiplex + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/components/multiplex'); + +/* Tests ---------------------------------------------------------------------*/ \ No newline at end of file diff --git a/tests/unit/components/queue.spec.js b/tests/unit/components/queue.spec.js new file mode 100644 index 0000000..d638579 --- /dev/null +++ b/tests/unit/components/queue.spec.js @@ -0,0 +1,13 @@ +/** + * Queue + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/components/queue'); + +/* Tests ---------------------------------------------------------------------*/ \ No newline at end of file diff --git a/tests/unit/components/server.spec.js b/tests/unit/components/server.spec.js new file mode 100644 index 0000000..f470ef5 --- /dev/null +++ b/tests/unit/components/server.spec.js @@ -0,0 +1,13 @@ +/** + * Server + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/components/server'); + +/* Tests ---------------------------------------------------------------------*/ \ No newline at end of file diff --git a/tests/unit/serials/json.spec.js b/tests/unit/serials/json.spec.js new file mode 100644 index 0000000..31267d2 --- /dev/null +++ b/tests/unit/serials/json.spec.js @@ -0,0 +1,51 @@ +/** + * JSON Encoder + * @module serials/json + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/serials/json'); + +/* Tests ---------------------------------------------------------------------*/ + +var tests = { + strings: ['string', 'with\"\'quotes'], + //buffers: [new Buffer('test')], + numbers: [42, 0, Math.PI], + booleans: [true, false], + arrays: [[1,2,3]], + objects: [{foo: 'bar'}] +}; + +function wrap(data) { + return ['test-channel', [data]]; +} + +describe('JSON Encoder', () => { + describe('#encode(payload)', () => { + Object.keys(tests).forEach((test) => { + it('should encode ' + test, () => { + tests[test].forEach((payload) => { + expect(testModule.encode(wrap(payload))).to.be.instanceof(Buffer); + }); + }); + }); + }); + + describe('#decode(payload)', () => { + Object.keys(tests).forEach((test) => { + it('should decode ' + test, () => { + tests[test].forEach((payload) => { + var buffer = testModule.encode(wrap(payload)); + var result = testModule.decode(buffer); + expect(result[1][0]).to.deep.equal(payload); + }); + }); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit/serverFactory.spec.js b/tests/unit/serverFactory.spec.js new file mode 100644 index 0000000..b1d3ffb --- /dev/null +++ b/tests/unit/serverFactory.spec.js @@ -0,0 +1,149 @@ +/** + * Server class + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../src/serverFactory'); +const clientFactory = require('../../src/clientFactory'); + +const EventEmitter = require('events').EventEmitter; + +/* Tests ---------------------------------------------------------------------*/ + +describe('Server', () => { + + var testServer; + + afterEach((done) => { + if (testServer.stop) { + testServer.stop(); + } + sinon.mock.restore(); + done(); + }); + + describe('#constructor(options)', () => { + it('should create a valid Server', () => { + testServer = testModule.create(); + expect(testServer.id).to.be.string; + expect(testServer.connections).to.be.array; + expect(testServer.channels).to.be.object; + }); + }); + + describe('#subscribe(name, handler, options)', () => { + it('new and existing connections subscribe to the channel and add the handler', () => { + const testHandler = function () {}; + testServer = testModule.create(); + const testSubscribe = sinon.spy(); + testServer.connections.push({ + subscribe: testSubscribe + }); + + testServer.subscribe('test', testHandler); + expect(testServer.channels.test).to.deep.equal([['test', testHandler, undefined]]); + expect(testSubscribe.calledOnce).to.be.true; + }); + }); + + describe('#unsubscribe(name, handler)', () => { + it('new and existing connections remove the handler from their channel', () => { + const testHandler = function () {}; + testServer = testModule.create(); + const testUnsubscribe = sinon.spy(); + testServer.connections.push({ + unsubscribe: testUnsubscribe + }); + testServer.unsubscribe('test', testHandler); + expect(testServer.channels.test).to.be.array; + expect(testUnsubscribe.calledOnce).to.be.true; + }); + }); + + describe('#broadcast(channel, payload)', () => { + it('should call send on all connections', () => { + testServer = testModule.create(); + const testSocket = { + on: function() {}, + setTimeout: function() {}, + end: function() {}, + destroy: function() {}, + pipe: function() { + return new EventEmitter(); + } + }; + testServer.connections.push(clientFactory.create({ socket: testSocket })); + testServer.broadcast('test', 'test'); + expect(testServer.connections[0].channels.test.packets).to.include('test'); + }); + }); + + describe('#stop(callback)', () => { + it('should call the appropriate adapter\'s stop', (done) => { + testServer = testModule.create(); + const adapterTest = sinon.mock(adapters.resolve(testServer.options.adapter)); + testServer.stop(() => { + testServer.listener = { + close: function(cb) { + cb(); + } + }; + + testServer.stop(() => { + expect(testServer.listener).to.be.null; + sinon.mock.restore(); + done(); + }); + }); + }); + }); + + describe('#handleError(err)', () => { + it('should print and dispatch the error', (done) => { + testServer = testModule.create(); + testServer.on('error', (e) => done()); + testServer.handleError('testError'); + }); + }); + + describe('#handleRequest(socket)', () => { + it('should push the new connection and dispatch connection events', (done) => { + const testSocket = { + on: function() {}, + setTimeout: function() {}, + end: function() {}, + destroy: function() {}, + pipe: function() { + return new EventEmitter(); + } + }; + + testServer = testModule.create(); + testServer.on('connect', () => { + expect(testServer.connections.length).to.be.equal(1); + expect(testServer.connections[0].options).to.deep.equal({ + adapter: 'tcp', + encoder: 'json', + hostname: '0.0.0.0', + port: 3000, + bundler: { + delay: 16, + maxPackets: 2048, + serverTick: false, + splitBatches: true + }, + socketTimeout: 30000, + stats: false, + rejectForeign: true + }); + done(); + }); + testServer.handleRequest(testSocket); + }); + }); +}); diff --git a/tests/unit/transports/ipc.spec.js b/tests/unit/transports/ipc.spec.js new file mode 100644 index 0000000..dd9e1cf --- /dev/null +++ b/tests/unit/transports/ipc.spec.js @@ -0,0 +1,161 @@ +/** + * InterProcessCall connector methods + * @module transports/ipc + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/transports/ipc'); + +const net = require('net'); +const fs = require('fs'); + +const EventEmitter = require('events').EventEmitter; + +/* Local variables -----------------------------------------------------------*/ + +const _path = '/tmp/app.socket-'; + +/* Tests ---------------------------------------------------------------------*/ + +describe('IPC', () => { + describe('#listen(server, callback)', () => { + + afterEach(() => sinon.mock.restore()); + + it('should bind to a defined port', (done) => { + + var netMock = sinon.mock(net); + var fsStub = sinon.stub(fs, 'unlink'); + var port = 9000; + var serverTest = { + listener: null, + handleError: function(){}, + handleRequest: function(){}, + options: { + port: port + } + }; + var listenerTest = sinon.mock({ + on: function(){}, + listen: function(port, callback){ + callback(); + } + }); + + fsStub.withArgs('/tmp/app.socket-9000') + .callsArg(1); + + netMock.expects('createServer') + .once() + .returns(listenerTest.object); + + listenerTest.expects('on') + .once() + .withArgs('error'); + + listenerTest.expects('listen') + .once() + .withArgs('/tmp/app.socket-9000'); + + testModule.listen(serverTest); + + expect(serverTest.listener).to.be.not.null; + listenerTest.verify(); + netMock.verify(); + fsStub.restore(); + done(); + }); + }); + + describe('#stop(server, callback)', () => { + it('should disconnect all sockets and close the server', (done) => { + var clientStub = sinon.stub(testModule, 'disconnect'); + var serverClose = sinon.spy(); + + var testServer = { + listener: { + close: serverClose + } + }; + + testModule.stop(testServer); + + expect(serverClose.calledOnce).to.be.true; + clientStub.restore(); + done(); + }); + }); + + describe('#send(socket, payload)', () => { + it('should send the payload through the socket', () => { + var testPayload = new Buffer(JSON.stringify({foo:'bar'})); + var socketMock = sinon.mock({ + write: function() {} + }); + + socketMock.expects('write') + .twice(); + + testModule.send(socketMock.object, testPayload); + socketMock.verify(); + }); + }); + + describe('#createSocket(client, socket)', () => { + it('should create a socket client and return it', () => { + var netMock = sinon.mock(net); + + netMock.expects('connect') + .once() + .withArgs('/tmp/app.socket-9000') + .returns({ + on:function() {}, + setTimeout: function() {}, + connect: function() {}, + pipe: function() { + return new EventEmitter(); + } + }); + + var result = testModule.createSocket({ + options: { + hostname: '0.0.0.0', + port: 9000 + }, + handleError: function() {}, + handleConnect: function() {}, + handleRequest: function() {}, + handleDisconnect: function() {} + }); + + netMock.verify(); + }); + }); + + describe('#disconnect(client)', () => { + it('should call the client\'s disconnect method', (done) => { + var testSocket = { + destroy: function() {} + }; + var clientMock = sinon.mock({ + handleDisconnect: function() {}, + socket: testSocket + }); + + clientMock.expects('handleDisconnect') + .once(); + + testModule.disconnect(clientMock.object); + + setTimeout(() => { + clientMock.verify(); + done(); + }, 10); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit/transports/tcp.spec.js b/tests/unit/transports/tcp.spec.js new file mode 100644 index 0000000..914f343 --- /dev/null +++ b/tests/unit/transports/tcp.spec.js @@ -0,0 +1,151 @@ +/** + * TCP connector methods + * @module transports/tcp + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/transports/tcp'); + +const net = require('net'); + +const EventEmitter = require('events').EventEmitter; + +/* Tests ---------------------------------------------------------------------*/ + +describe('TCP', () => { + describe('#listen(server, callback)', () => { + + afterEach(() => sinon.mock.restore()); + + it('should bind to a defined port', (done) => { + + var netMock = sinon.mock(net); + var port = 9000; + var serverTest = { + listener: null, + handleError: function(){}, + handleRequest: function(){}, + options: { + port: port + } + }; + var listenerTest = sinon.mock({ + on: function(){}, + listen: function(port, callback){ + callback(); + } + }); + + netMock.expects('createServer') + .once() + .returns(listenerTest.object); + + listenerTest.expects('on') + .once() + .withArgs('error'); + + listenerTest.expects('listen') + .once() + .withArgs(port); + + testModule.listen(serverTest); + + expect(serverTest.listener).to.be.not.null; + listenerTest.verify(); + netMock.verify(); + done(); + }); + }); + + describe('#stop(server, callback)', () => { + it('should disconnect all sockets and close the server', (done) => { + var clientStub = sinon.stub(testModule, 'disconnect'); + var serverClose = sinon.spy(); + + var testServer = { + listener: { + close: serverClose + } + }; + + testModule.stop(testServer); + + expect(serverClose.calledOnce).to.be.true; + clientStub.restore(); + done(); + }); + }); + + describe('#send(socket, payload)', () => { + it('should send the payload through the socket', () => { + var testPayload = new Buffer(JSON.stringify({foo:'bar'})); + var socketMock = sinon.mock({ + write: function() {} + }); + + socketMock.expects('write') + .twice(); + + testModule.send(socketMock.object, testPayload); + socketMock.verify(); + }); + }); + + describe('#createSocket(client, socket)', () => { + it('should create a socket client and return it', () => { + var netMock = sinon.mock(net); + + netMock.expects('connect') + .once() + .withArgs(9000, '0.0.0.0') + .returns({ + on:function() {}, + pipe: function() { + return new EventEmitter(); + }, + setTimeout: function() {}, + connect: function() {} + }); + + var result = testModule.createSocket({ + options: { + hostname: '0.0.0.0', + port: 9000 + }, + handleError: function() {}, + handleConnect: function() {}, + handleRequest: function() {}, + handleDisconnect: function() {} + }); + + netMock.verify(); + }); + }); + + describe('#disconnect(client)', () => { + it('should call the client\'s disconnect method', () => { + var testSocket = { + destroy: function() {} + }; + var clientMock = sinon.mock({ + handleDisconnect: function() {}, + socket: testSocket + }); + + clientMock.expects('handleDisconnect') + .once(); + + testModule.disconnect(clientMock.object); + + setTimeout(() => { + clientMock.verify(); + done(); + }, 10); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit/transports/udp.spec.js b/tests/unit/transports/udp.spec.js new file mode 100644 index 0000000..e458f93 --- /dev/null +++ b/tests/unit/transports/udp.spec.js @@ -0,0 +1,162 @@ +/** + * UDP connector methods + * @module transports/udp + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/transports/udp'); + +const dgram = require('dgram'); + +/* Tests ---------------------------------------------------------------------*/ + +describe('UDP', () => { + describe('#listen(server, callback)', () => { + + afterEach(() => sinon.mock.restore()); + + it('should bind to a defined port', (done) => { + + var dgramMock = sinon.mock(dgram); + var port = 9000; + var serverTest = { + listener: null, + handleError: function(){}, + options: { + port: port + } + }; + var listenerTest = sinon.mock({ + on: function(){}, + bind: function(){} + }); + + dgramMock.expects('createSocket') + .once() + .withArgs({ + type: 'udp4', + reuseAddr: true + }) + .returns(listenerTest.object); + + listenerTest.expects('on') + .twice(); + + listenerTest.expects('bind') + .once() + .withArgs(port, '0.0.0.0'); + + testModule.listen(serverTest, () => { + expect(serverTest.listener).to.be.not.null; + listenerTest.verify(); + dgramMock.verify(); + done(); + }); + }); + }); + + describe('#stop(server, callback)', () => { + it('should disconnect all sockets and close the server', (done) => { + var clientStub = sinon.stub(testModule, 'disconnect'); + var serverClose = sinon.spy(); + var clients = { + a: 'a', + b: 'b', + c: 'c' + }; + + var testServer = { + __clients: clients, + listener: { + close: serverClose + } + }; + + testModule.stop(testServer, () => { + expect(clientStub.withArgs('a').called).to.be.true; + expect(clientStub.withArgs('b').called).to.be.true; + expect(clientStub.withArgs('c').called).to.be.true; + expect(serverClose.calledOnce).to.be.true; + clientStub.restore(); + done(); + }); + }); + }); + + describe('#send(socket, payload)', () => { + it('should send the payload through the socket', () => { + var testPayload = new Buffer(JSON.stringify({foo:'bar'})); + var socketMock = sinon.mock({ + send: function() {}, + __port: 9000, + __hostname: '0.0.0.0' + }); + + socketMock.expects('send') + .once() + .withArgs( + testPayload, + 0, + testPayload.length, + 9000, + '0.0.0.0' + ); + + testModule.send(socketMock.object, testPayload); + socketMock.verify(); + }); + }); + + describe('#createSocket(client, socket)', () => { + it('should create a socket client and return it', () => { + var dgramMock = sinon.mock(dgram); + + dgramMock.expects('createSocket') + .once() + .withArgs('udp4') + .returns({ + on: function() {}, + bind: function() {} + }); + + var result = testModule.createSocket({ + options: { + hostname: '0.0.0.0', + port: 9000 + }, + handleError: function() {}, + handleConnect: function() {}, + handleRequest: function() {} + }); + + expect(result.__port).to.equal(9000); + expect(result.__hostname).to.equal('0.0.0.0'); + dgramMock.verify(); + }); + }); + + describe('#disconnect(client)', () => { + it('should call the client\'s disconnect method', () => { + var testSocket = {foo: 'bar'}; + var clientMock = sinon.mock({ + handleDisconnect: function() {}, + socket: testSocket + }); + + clientMock.expects('handleDisconnect') + .once(); + + testModule.disconnect(clientMock.object); + + setTimeout(() => { + clientMock.verify(); + done(); + }, 10); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit/utils/serializer.spec.js b/tests/unit/utils/serializer.spec.js new file mode 100644 index 0000000..1f5598d --- /dev/null +++ b/tests/unit/utils/serializer.spec.js @@ -0,0 +1,13 @@ +/** + * Serializer + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/utils/serializer'); + +/* Tests ---------------------------------------------------------------------*/ \ No newline at end of file diff --git a/tests/unit/utils/sessions.spec.js b/tests/unit/utils/sessions.spec.js new file mode 100644 index 0000000..1e44bb2 --- /dev/null +++ b/tests/unit/utils/sessions.spec.js @@ -0,0 +1,13 @@ +/** + * Sessions + */ + +'use strict'; + +/* Requires ------------------------------------------------------------------*/ + +const expect = require('chai').expect; +const sinon = require('sinon'); +const testModule = require('../../../src/utils/sessions'); + +/* Tests ---------------------------------------------------------------------*/ \ No newline at end of file