diff --git a/index.js b/index.js index 5f3c44e..0e90234 100644 --- a/index.js +++ b/index.js @@ -1 +1,8 @@ -module.exports = require('./lib/acequia'); \ No newline at end of file +var acequia = require('./lib/acequia'); + +var start = function () { + var acequiaServer = acequia.createServer(); + acequiaServer.start(); +}; + +start(); \ No newline at end of file diff --git a/lib/acequia.js b/lib/acequia.js index db538d4..c7cbccc 100644 --- a/lib/acequia.js +++ b/lib/acequia.js @@ -1,4 +1,4 @@ -/*global console process require setInterval*/ +/*global console process require setInterval module*/ // Imports and globals. var http = require("http"), @@ -12,29 +12,61 @@ var http = require("http"), Buffer = require('buffer').Buffer, genclient = require("./genclient"); -var INTERNAL_IP = "0.0.0.0", - OSC_PORT = 9090, - WS_PORT = 9091, - TCP_PORT = 9092, - TIMEOUT = 600000; // Seconds before kicking idle clients. - +Object.extend = function(destination, source) { + for (var property in source) { + destination[property] = source[property]; + } + return destination; +}; + var tcp_ad, osc_ad, ws_ad; -// The list of clients -var clients = new ac.AcequiaClients(TIMEOUT * 1000); +// Create a logger +var logger = require('log4js').getLogger("acequia"); + +var acequiaClients = null; -// Initialize the datastore -require("./datastore").init(clients); +function AcequiaServer(options) { -var oscServer, wsServer, tcpServer, httpServer, clientCode = null; + if (typeof(options) === "undefined") { + options = {}; + } -// Create a logger -var logger = require('log4js').getLogger("acequia"); + this.options = Object.extend({ + ipAddress: "0.0.0.0", + oscPort: 9090, + wsPort: 9091, + tcpPort: 9092, + timeout: 600000 + }, options || {}); + + this.acequiaClients = new ac.AcequiaClients(this.options.timeout * 1000); + require("./datastore").init(this.acequiaClients); + + // Set the global value: + acequiaClients = this.acequiaClients; + + this.oscServer = null; + this.wsServer = null; + this.tcpServer = null; +} -// Once we actually get our internal IP, we start the servers. -function startServers() { +AcequiaServer.prototype.start = function () { + if (this.options.oscPort) { + this.oscServer = this.createOSCServer(); + } + + if (this.options.wsPort) { + this.wsServer = this.createWSServer(); + } + + if (this.tcpPort) { + this.tcpServer = this.createTCPServer(); + } +}; - oscServer = dgram.createSocket("udp4"); +AcequiaServer.prototype.createOSCServer = function () { + var oscServer = dgram.createSocket("udp4"); // TODO: We need to implement connect/disconnct/subscribe messages for osc oscServer.on("message", function (data, rinfo) { @@ -49,7 +81,7 @@ function startServers() { message = new msg.AcequiaMessage("", oscMsg.address, oscMsg.data); } - clients.onMessage(ac.TYP_OSC, message, oscServer, rinfo); + acequiaClients.onMessage(ac.TYP_OSC, message, oscServer, rinfo); } }); @@ -69,9 +101,71 @@ function startServers() { logger.debug("oscServer closed"); }); - oscServer.bind(OSC_PORT, INTERNAL_IP); + oscServer.bind(this.options.oscPort, this.options.ipAddress); + + return oscServer; +}; - // Websocket server: +AcequiaServer.prototype.createTCPServer = function () { + // Setup a tcp server + var tcpServer = net.createServer(function (socket) { + + socket.on("connect", function () { + logger.debug("TCP: [%s:%s] connect", socket.remoteAddress, socket.remotePort); + }); + + socket.on("data", function (data) { + var index = 0, msgs = [], size, message, + buffer = new Buffer(data); + + while (index < buffer.length) { + size = buffer.readInt32BE(index); + index += 4; + message = buffer.slice(index, index + size); + msgs.push(new msg.AcequiaMessage(JSON.parse(message))); + index += size; + } + + for (index = 0; index < msgs.length; index += 1) { + acequiaClients.onMessage(ac.TYP_TCP, msgs[index], socket); + } + }); + + socket.on("end", function () { + logger.debug("TCP: [%s:%s] end", socket.remoteAddress, socket.remotePort); + acequiaClients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.end"); + }); + + socket.on("close", function (had_error) { + logger.debug("TCP: [%s:%s] close", socket.remoteAddress, socket.remotePort); + acequiaClients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.close"); + }); + + socket.on("error", function (exception) { + logger.debug("TCP: [%s:%s] error %s", socket.remoteAddress, socket.remotePort, exception); + socket.destroy(); + }); + }); + + tcpServer.on("listening", function () { + logger.debug("TCP Server is listening on [%s:%s]", tcpServer.address().address, tcpServer.address().port); + + try { + var mdns = require("mdns"); + tcp_ad = mdns.createAdvertisement(mdns.tcp('acequia'), tcpServer.address().port, {name: "Acequia TCP Server"}); + tcp_ad.start(); + } catch (e) { + logger.error("Error creating mDNS advertisement: " + e.message); + } + }); + + tcpServer.listen(this.options.tcpServer, this.options.ipAddress); + + return tcpServer; +}; + +AcequiaServer.prototype.createWSServer = function () { + var clientCode, httpServer, wsServer; if (!clientCode) { // Generate the client code @@ -99,7 +193,7 @@ function startServers() { logger.debug(" WS Server is listening on [%s:%s]", this.address().address, this.address().port); }); - httpServer.listen(WS_PORT); + httpServer.listen(this.wsPort); // Create the socket io server and attach it to the httpServer wsServer = require('socket.io').listen(httpServer); @@ -124,100 +218,18 @@ function startServers() { socket.on('message', function (data) { var message = new msg.AcequiaMessage(JSON.parse(data)); - clients.onMessage(ac.TYP_WS, message, socket); + acequiaClients.onMessage(ac.TYP_WS, message, socket); }); socket.on("disconnect", function () { - clients.findAndRemove(ac.TYP_WS, this.id, "connection closed"); - }); - }); - - // Setup a tcp server - tcpServer = net.createServer(function (socket) { - - socket.on("connect", function () { - logger.debug("TCP: [%s:%s] connect", socket.remoteAddress, socket.remotePort); - }); - - socket.on("data", function (data) { - var index = 0, msgs = [], size, message, - buffer = new Buffer(data); - - while (index < buffer.length) { - size = buffer.readInt32BE(index); - index += 4; - message = buffer.slice(index, index + size); - msgs.push(new msg.AcequiaMessage(JSON.parse(message))); - index += size; - } - - for (index = 0; index < msgs.length; index += 1) { - clients.onMessage(ac.TYP_TCP, msgs[index], socket); - } - }); - - socket.on("end", function () { - logger.debug("TCP: [%s:%s] end", socket.remoteAddress, socket.remotePort); - clients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.end"); - }); - - socket.on("close", function (had_error) { - logger.debug("TCP: [%s:%s] close", socket.remoteAddress, socket.remotePort); - clients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.close"); - }); - - socket.on("error", function (exception) { - logger.debug("TCP: [%s:%s] error %s", socket.remoteAddress, socket.remotePort, exception); - socket.destroy(); + acequiaClients.findAndRemove(ac.TYP_WS, this.id, "connection closed"); }); }); - - tcpServer.on("listening", function () { - logger.debug("TCP Server is listening on [%s:%s]", tcpServer.address().address, tcpServer.address().port); - - try { - var mdns = require("mdns"); - tcp_ad = mdns.createAdvertisement(mdns.tcp('acequia'), tcpServer.address().port, {name: "Acequia TCP Server"}); - tcp_ad.start(); - } catch (e) { - logger.error("Error creating mDNS advertisement: " + e.message); - } - }); - - tcpServer.listen(TCP_PORT, INTERNAL_IP); -} - -// The exported function is called to start the server. -// It starts a server for each individual protocol. -function start() { - var args = process.argv.splice(2), - index = 0; - - while (index < args.length) { - switch (args[index]) { - case "--ip": - index += 1; - INTERNAL_IP = args[index]; - break; - case "--osc": - index += 1; - OSC_PORT = parseInt(args[index], 10); - break; - case "--tcp": - index += 1; - TCP_PORT = parseInt(args[index], 10); - break; - case "--ws": - index += 1; - WS_PORT = parseInt(args[index], 10); - break; - } - index += 1; - } - console.log("Platform: %s", require("os").platform()); - - startServers(); -} + return wsServer; +}; + -start(); +module.exports.createServer = function (options) { + return new AcequiaServer(options); +}; diff --git a/package.json b/package.json index f4b72d6..bd5672f 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name" : "acequia" , "version" : "0.1.0" , "description":"Message router for node supporting multiple protocols" -, "main": "lib/acequia.js" +, "main": "index.js" , "preferGlobal": "true" , "directories": {"lib": "./lib" } , "scripts": { "start": "node ." }