Skip to content

Commit

Permalink
Made the acequia server an object that can be created.
Browse files Browse the repository at this point in the history
  • Loading branch information
prgsmall committed Apr 23, 2012
1 parent f78a4f0 commit be16511
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 113 deletions.
9 changes: 8 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
module.exports = require('./lib/acequia');
var acequia = require('./lib/acequia');

var start = function () {
var acequiaServer = acequia.createServer();
acequiaServer.start();
};

start();
234 changes: 123 additions & 111 deletions lib/acequia.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*global console process require setInterval*/
/*global console process require setInterval module*/

// Imports and globals.
var http = require("http"),
Expand All @@ -12,29 +12,61 @@ var http = require("http"),
Buffer = require('buffer').Buffer,
genclient = require("./genclient");

var INTERNAL_IP = "0.0.0.0",
OSC_PORT = 9090,
WS_PORT = 9091,
TCP_PORT = 9092,
TIMEOUT = 600000; // Seconds before kicking idle clients.

Object.extend = function(destination, source) {
for (var property in source) {
destination[property] = source[property];
}
return destination;
};

var tcp_ad, osc_ad, ws_ad;

// The list of clients
var clients = new ac.AcequiaClients(TIMEOUT * 1000);
// Create a logger
var logger = require('log4js').getLogger("acequia");

var acequiaClients = null;

// Initialize the datastore
require("./datastore").init(clients);
function AcequiaServer(options) {

var oscServer, wsServer, tcpServer, httpServer, clientCode = null;
if (typeof(options) === "undefined") {
options = {};
}

// Create a logger
var logger = require('log4js').getLogger("acequia");
this.options = Object.extend({
ipAddress: "0.0.0.0",
oscPort: 9090,
wsPort: 9091,
tcpPort: 9092,
timeout: 600000
}, options || {});

this.acequiaClients = new ac.AcequiaClients(this.options.timeout * 1000);
require("./datastore").init(this.acequiaClients);

// Set the global value:
acequiaClients = this.acequiaClients;

this.oscServer = null;
this.wsServer = null;
this.tcpServer = null;
}

// Once we actually get our internal IP, we start the servers.
function startServers() {
AcequiaServer.prototype.start = function () {
if (this.options.oscPort) {
this.oscServer = this.createOSCServer();
}

if (this.options.wsPort) {
this.wsServer = this.createWSServer();
}

if (this.tcpPort) {
this.tcpServer = this.createTCPServer();
}
};

oscServer = dgram.createSocket("udp4");
AcequiaServer.prototype.createOSCServer = function () {
var oscServer = dgram.createSocket("udp4");

// TODO: We need to implement connect/disconnct/subscribe messages for osc
oscServer.on("message", function (data, rinfo) {
Expand All @@ -49,7 +81,7 @@ function startServers() {
message = new msg.AcequiaMessage("", oscMsg.address, oscMsg.data);
}

clients.onMessage(ac.TYP_OSC, message, oscServer, rinfo);
acequiaClients.onMessage(ac.TYP_OSC, message, oscServer, rinfo);
}
});

Expand All @@ -69,9 +101,71 @@ function startServers() {
logger.debug("oscServer closed");
});

oscServer.bind(OSC_PORT, INTERNAL_IP);
oscServer.bind(this.options.oscPort, this.options.ipAddress);

return oscServer;
};

// Websocket server:
AcequiaServer.prototype.createTCPServer = function () {
// Setup a tcp server
var tcpServer = net.createServer(function (socket) {

socket.on("connect", function () {
logger.debug("TCP: [%s:%s] connect", socket.remoteAddress, socket.remotePort);
});

socket.on("data", function (data) {
var index = 0, msgs = [], size, message,
buffer = new Buffer(data);

while (index < buffer.length) {
size = buffer.readInt32BE(index);
index += 4;
message = buffer.slice(index, index + size);
msgs.push(new msg.AcequiaMessage(JSON.parse(message)));
index += size;
}

for (index = 0; index < msgs.length; index += 1) {
acequiaClients.onMessage(ac.TYP_TCP, msgs[index], socket);
}
});

socket.on("end", function () {
logger.debug("TCP: [%s:%s] end", socket.remoteAddress, socket.remotePort);
acequiaClients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.end");
});

socket.on("close", function (had_error) {
logger.debug("TCP: [%s:%s] close", socket.remoteAddress, socket.remotePort);
acequiaClients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.close");
});

socket.on("error", function (exception) {
logger.debug("TCP: [%s:%s] error %s", socket.remoteAddress, socket.remotePort, exception);
socket.destroy();
});
});

tcpServer.on("listening", function () {
logger.debug("TCP Server is listening on [%s:%s]", tcpServer.address().address, tcpServer.address().port);

try {
var mdns = require("mdns");
tcp_ad = mdns.createAdvertisement(mdns.tcp('acequia'), tcpServer.address().port, {name: "Acequia TCP Server"});
tcp_ad.start();
} catch (e) {
logger.error("Error creating mDNS advertisement: " + e.message);
}
});

tcpServer.listen(this.options.tcpServer, this.options.ipAddress);

return tcpServer;
};

AcequiaServer.prototype.createWSServer = function () {
var clientCode, httpServer, wsServer;

if (!clientCode) {
// Generate the client code
Expand Down Expand Up @@ -99,7 +193,7 @@ function startServers() {
logger.debug(" WS Server is listening on [%s:%s]", this.address().address, this.address().port);
});

httpServer.listen(WS_PORT);
httpServer.listen(this.wsPort);

// Create the socket io server and attach it to the httpServer
wsServer = require('socket.io').listen(httpServer);
Expand All @@ -124,100 +218,18 @@ function startServers() {

socket.on('message', function (data) {
var message = new msg.AcequiaMessage(JSON.parse(data));
clients.onMessage(ac.TYP_WS, message, socket);
acequiaClients.onMessage(ac.TYP_WS, message, socket);
});

socket.on("disconnect", function () {
clients.findAndRemove(ac.TYP_WS, this.id, "connection closed");
});
});

// Setup a tcp server
tcpServer = net.createServer(function (socket) {

socket.on("connect", function () {
logger.debug("TCP: [%s:%s] connect", socket.remoteAddress, socket.remotePort);
});

socket.on("data", function (data) {
var index = 0, msgs = [], size, message,
buffer = new Buffer(data);

while (index < buffer.length) {
size = buffer.readInt32BE(index);
index += 4;
message = buffer.slice(index, index + size);
msgs.push(new msg.AcequiaMessage(JSON.parse(message)));
index += size;
}

for (index = 0; index < msgs.length; index += 1) {
clients.onMessage(ac.TYP_TCP, msgs[index], socket);
}
});

socket.on("end", function () {
logger.debug("TCP: [%s:%s] end", socket.remoteAddress, socket.remotePort);
clients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.end");
});

socket.on("close", function (had_error) {
logger.debug("TCP: [%s:%s] close", socket.remoteAddress, socket.remotePort);
clients.findAndRemove(ac.TYP_TCP, socket.remoteAddress, socket.remotePort, "socket.on.close");
});

socket.on("error", function (exception) {
logger.debug("TCP: [%s:%s] error %s", socket.remoteAddress, socket.remotePort, exception);
socket.destroy();
acequiaClients.findAndRemove(ac.TYP_WS, this.id, "connection closed");
});
});

tcpServer.on("listening", function () {
logger.debug("TCP Server is listening on [%s:%s]", tcpServer.address().address, tcpServer.address().port);

try {
var mdns = require("mdns");
tcp_ad = mdns.createAdvertisement(mdns.tcp('acequia'), tcpServer.address().port, {name: "Acequia TCP Server"});
tcp_ad.start();
} catch (e) {
logger.error("Error creating mDNS advertisement: " + e.message);
}
});

tcpServer.listen(TCP_PORT, INTERNAL_IP);
}

// The exported function is called to start the server.
// It starts a server for each individual protocol.
function start() {
var args = process.argv.splice(2),
index = 0;

while (index < args.length) {
switch (args[index]) {
case "--ip":
index += 1;
INTERNAL_IP = args[index];
break;
case "--osc":
index += 1;
OSC_PORT = parseInt(args[index], 10);
break;
case "--tcp":
index += 1;
TCP_PORT = parseInt(args[index], 10);
break;
case "--ws":
index += 1;
WS_PORT = parseInt(args[index], 10);
break;
}
index += 1;
}

console.log("Platform: %s", require("os").platform());

startServers();
}
return wsServer;
};


start();
module.exports.createServer = function (options) {
return new AcequiaServer(options);
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{ "name" : "acequia"
, "version" : "0.1.0"
, "description":"Message router for node supporting multiple protocols"
, "main": "lib/acequia.js"
, "main": "index.js"
, "preferGlobal": "true"
, "directories": {"lib": "./lib" }
, "scripts": { "start": "node ." }
Expand Down

0 comments on commit be16511

Please sign in to comment.