diff --git a/acequia.js b/acequia.js
index 8d4c5e4..3d57d00 100644
--- a/acequia.js
+++ b/acequia.js
@@ -19,81 +19,48 @@ var DEBUG = 1,
TCP_PORT = 9093,
TIMEOUT = 600; //Seconds before kicking idle clients.
+ * The standard messages that the acequia system handles.
+ */
+var MSG_CONNECT = "/connect";
+var MSG_DISCONNECT = "/disconnect";
+var MSG_GETCLIENTS = "/getClients";
// The list of clients
var clients = [];
var oscServer, wsServer;
-//Logs str to the console if the 'global' debug is set to 1.
+// Logs str to the console if the 'global' debug is set to 1.
function debug(str) {
if (DEBUG) {
console.log('[' + (new Date()) + '] ' + str);
-//Utility function for sending an osc message to a given client.
-function msgSndOsc(to, from, title, body, tt) {
- var data = [from].concat(body),
- oscMsg = osc.newOsc(title, 's' + tt, data),
- buffer = osc.oscToBuffer(oscMsg);
- oscServer.send(buffer, 0, buffer.length, clients[to].portOut, clients[to].ip);
-//Utility function for sending a websocket message to a given client.
-function msgSndWs(to, from, title, body) {
- wsServer.send(clients[to].id, JSON.stringify({'from' : from, 'title' : title, 'body' : body}));
// The master sending function which takes a message meant for a client, decides
// which protocol to use, and calls the appropriate function.
function msgSnd(to, from, title, body, tt) {
- if (to === -1) {
- return;
- }
- switch (clients[to].protocol) {
- case ac.TYP_OSC:
- msgSndOsc(to, from, title, body, tt);
- break;
- case ac.TYP_WS:
- msgSndWs(to, from, title, body);
- break;
- }
+ clients[to].send(from, title, body, tt);
debug("Sent message " + title + " to client #" + to + " (" + clients[to].name + ")");
//The master function which sends messages to all clients except for exc.
function msgSndAll(exc, mesid, data, tt) {
- var i;
+ var i, name = clients[exc].name;
for (i = 0; i < clients.length; i += 1) {
if (i !== exc) {
- msgSnd(i, mesid, data, tt);
+ clients[i].send(name, mesid, data, tt);
// Message routing goes here.
-function msgRec(from, to, title, body, tt) {
- var time = (new Date()).getTime();
- debug('Received message ' + title + ' from client #' + from + ' (' + clients[from].name + ')');
- if (from === -1) {
- return;
- }
- clients[from].lastMessage = time;
- switch (title) {
- case "place-holder":
- break;
- case "place-holder2":
- break;
- default:
+function onMessage(from, to, title, body, tt) {
+ if (to === -1) {
+ msgSndAll(from, title, body, tt);
+ } else {
msgSnd(to, clients[from].name, title, body, tt);
- break;
@@ -136,7 +103,7 @@ function kickIdle() {
for (i = 0; i < clients.length; i += 1) {
if ((time - clients[i].lastMessage) > TIMEOUT * 1000) {
- dropClient(i, 'timeout');
+ dropClient(i, "client timeout");
i -= 1;
@@ -144,25 +111,25 @@ function kickIdle() {
// Once we actually get our internal IP, we start the servers.
function startServers() {
- var i, oscMsg, from, to, title, tt, client, portOut,
+ var i, from, to, title, tt, client, portOut,
httpClient, request, clientList;
//OSC Server.
oscServer = dgram.createSocket('udp4');
oscServer.on('message', function (msg, rinfo) {
- oscMsg = osc.bufferToOsc(msg);
+ var oscMsg = osc.bufferToOsc(msg);
switch (oscMsg.address) {
- case "/connect":
//the second parameter when logging in is an optional 'port to send to'.
portOut = (oscMsg.data[1] > 0) ? oscMsg.data[1] : rinfo.port;
- client = new ac.OSCClient(oscMsg.data[0], rinfo.address, rinfo.port, portOut);
+ client = new ac.OSCClient(oscMsg.data[0], rinfo.address, rinfo.port, portOut, oscServer);
debug('Added client ' + oscMsg.data[0] +
' (OSC@' + rinfo.address + ':' + rinfo.port + ', client #' + (clients.length - 1) + ')');
- case "/disconnect":
dropClient(lookupClient(ac.TYP_OSC, rinfo.address, rinfo.port), "disconnect by user");
@@ -171,7 +138,7 @@ function startServers() {
to = lookupClientUsername(oscMsg.data.shift());
title = oscMsg.address;
tt = oscMsg.typeTags.slice(1);
- msgRec(from, to, title, oscMsg.data, tt);
+ onMessage(from, to, title, oscMsg.data, tt);
@@ -182,7 +149,8 @@ function startServers() {
debug('oscServer is listening on ' + oscServer.address().address + ':' + oscServer.address().port);
httpClient = http.createClient('80', 'plasticsarcastic.com');
- request = httpClient.request('GET', '/nodejs/scrCreateServer.php?ip=' + INTERNAL_IP + '&port=' + OSC_PORT, {'host' : 'plasticsarcastic.com'});
+ request = httpClient.request('GET', '/nodejs/scrCreateServer.php?ip=' + INTERNAL_IP +
+ '&port=' + OSC_PORT, {'host' : 'plasticsarcastic.com'});
request.on('response', function (response) {
@@ -194,7 +162,6 @@ function startServers() {
- //"Finalize" the OSC server.
oscServer.bind(OSC_PORT, INTERNAL_IP);
//Websocket server:
@@ -204,15 +171,12 @@ function startServers() {
con.addListener('message', function (msg) {
debug("connection: message");
- var message = JSON.parse(msg),
- from = lookupClient(ac.TYP_WS, con.id),
- to = lookupClientUsername(message.to),
+ var from, to,
+ message = JSON.parse(msg),
title = message.title,
- body = message.body;
- switch (title) {
- case "/connect":
+ body = message.body;
+ if (title === MSG_CONNECT) {
//Add them to the clients list when they connect if the username is free.
for (i = 0; i < clients.length; i += 1) {
if (clients[i].name === body[0]) {
@@ -222,29 +186,34 @@ function startServers() {
- client = new ac.WebSocketClient(body[0], con.id);
+ client = new ac.WebSocketClient(body[0], con.id, wsServer);
- msgSndWs(clients.length - 1, "SYS", "/connect", 1);
+ client.send("SYS", MSG_CONNECT, 1);
debug('Added client ' + clients[clients.length - 1].name +
- ' (ws id ' + con.id + ', client #' + (clients.length - 1) + ')');
- break;
- case "/disconnect":
- dropClient(from, "disconnect by user.");
- break;
+ ' (ws id ' + con.id + ', client #' + (clients.length - 1) + ')');
+ } else {
+ from = lookupClient(ac.TYP_WS, con.id);
+ to = lookupClientUsername(message.to);
+ switch (title) {
+ clients[from].send("SYS", MSG_DISCONNECT, 1);
+ dropClient(from, "disconnect by user.");
+ break;
- case "/getClients":
- clientList = [];
- for (i = 0; i < clients.length; i += 1) {
- clientList.push(clients[i].name);
- }
- msgSndWs(from, "SYS", "/getClients", clientList);
- break;
+ clientList = [];
+ for (i = 0; i < clients.length; i += 1) {
+ clientList.push(clients[i].name);
+ }
+ clients[from].send("SYS", MSG_GETCLIENTS, clientList);
+ break;
- default:
- msgRec(from, to, title, body);
- break;
+ default:
+ onMessage(from, to, title, body);
+ break;
+ }
@@ -290,7 +259,26 @@ function startServers() {
+ // HTTP Server
+ http.createServer(function (req, res) {
+ var pathName = URL.parse(req.url, true).pathname;
+ console.log("HTTP server received " + pathName);
+ res.writeHead(200, {'Content-Type': 'text/plain'});
+ switch (pathName) {
+ break;
+ break;
+ break;
+ default:
+ break;
+ }
+ res.end(req.body + ' Hello World\n');
+ }).listen(HTTP_PORT);
+ debug("httpServer is listening on " + INTERNAL_IP + ":" + HTTP_PORT);
setInterval(kickIdle, 1000);
@@ -298,7 +286,7 @@ function startServers() {
// It starts a server for each individual protocol.
function start() {
- //First, we need to get our internal IP, by parsing ifconfig:
+ // First, we need to get our internal IP, by parsing ifconfig:
if (process.argv.length > 2) {
INTERNAL_IP = process.argv[2];
diff --git a/acequiaClient.html b/acequiaClient.html
index ab5a8b0..6d4b45a 100644
--- a/acequiaClient.html
+++ b/acequiaClient.html
@@ -1,30 +1,66 @@
Acequia Client Test Page
+ |
+ |
+ |
+ |
diff --git a/acequiaClient.js b/acequiaClient.js
index 0a89759..a3639ef 100644
--- a/acequiaClient.js
+++ b/acequiaClient.js
@@ -1,4 +1,4 @@
-/*global WebSocket*/
+/*global WebSocket console AcequiaMessage*/
* Creates an event callback by wrapping the object with a closure.
@@ -10,64 +10,197 @@ var objCallback = function (obj, func) {
return function () {
obj[func].apply(obj, arguments);
- * Class to define the outgoing messages from the acequia client.
- * @param {String} to
+ * Defines the AcequiaClient object, which is used to connect with the Acequia server
+ * through a WebSocket connection.
+ * @param {String} uri The uri of the connection to the WebSocket.
+ * @param {String} userName The user name, used to uniquely identify this user.
-function AcequiaMessage(to, title, body) {
- this.to = (typeof(to) === "undefined") ? "" : to;
- this.title = title;
- this.body = (typeof(body) === "undefined") ? [] :
- ((body instanceof Array) ? body : [body]);
-AcequiaMessage.prototype.toString = function () {
- return JSON.stringify(this);
-var acequiaClient = {
+var AcequiaClient = function (uri, userName) {
+ var m_connected = false;
- dataCallback: null,
+ this.isConnected = function () {
+ return m_connected;
+ };
- userName: "",
+ this.setConnected = function (connected) {
+ var idx;
+ m_connected = connected;
+ for (idx in this.connectionChangeHandlers) {
+ this.connectionChangeHandlers[idx](connected);
+ }
+ };
+ this.uri = uri;
+ this.userName = userName;
+ this.webSocket = null;
- webSocket: null,
+ this.listeners = {};
- connect: function (uri, userName, callback) {
- this.userName = userName;
- this.dataCallback = callback;
- this.webSocket = new WebSocket(uri);
+ this.connectionChangeHandlers = [];
+ // Add listeners for the connect and disconnect commands
+ this.addMessageListener(AcequiaClient.CONNECT, objCallback(this, "acequia_onConnect"));
+ this.addMessageListener(AcequiaClient.DISCONNECT, objCallback(this, "acequia_onDisconnect"));
+ * {String} The connect command that will be sent to and receieved from acequia.
+ */
+AcequiaClient.CONNECT = "/connect";
+ * {String} The disconnect command that will be sent to and receieved from acequia.
+ */
+AcequiaClient.DISCONNECT = "/disconnect";
+ * {String} The getClients command that will be sent to and receieved from acequia.
+ */
+AcequiaClient.GETCLIENTS = "/getClients";
+ * Adds a listener for the message with the message name.
+ * @param {String} msgName The name of the message to listen to.
+ * @param {Function} callback The callback function which will be called when the
+ * message arrives.
+ */
+AcequiaClient.prototype.addMessageListener = function (msgName, callback) {
+ if (!(msgName in this.listeners)) {
+ this.listeners[msgName] = [];
+ }
+ this.listeners[msgName].push(callback);
+ * Adds a listener for the message with the message name.
+ * @param {String} msgName The name of the message to listen to.
+ * @param {Function} callback The callback function which will be called when the
+ * message arrives.
+ */
+AcequiaClient.prototype.removeMessageListener = function (msgName, callback) {
+ var idx = this.listeners[msgName].indexOf(callback);
+ this.listeners[msgName].splice(idx, 1);
+AcequiaClient.prototype.addConnectionChangeHandler = function (handler) {
+ this.connectionChangeHandlers.push(handler);
+ * Connects to the Acequia server by creating a new web socket connection;
+ */
+AcequiaClient.prototype.connect = function () {
+ if (this.isConnected()) {
+ console.error("AcequiaClient.connect: client is already connected");
+ } else {
+ this.webSocket = new WebSocket(this.uri);
this.webSocket.onopen = objCallback(this, "ws_onopen");
this.webSocket.onclose = objCallback(this, "ws_onclose");
this.webSocket.onmessage = objCallback(this, "ws_onmessage");
this.webSocket.onerror = objCallback(this, "ws_onerror");
- },
- disconnect: function () {
- this.send('/disconnect');
- },
+ }
- send: function (title, body, to) {
- var msg = new AcequiaMessage(to, title, body);
+ * Sends the disconnect message to the Acequia server.
+ */
+AcequiaClient.prototype.disconnect = function () {
+ this.send(AcequiaClient.DISCONNECT);
+ * Sends the getClients message to the Acequia server.
+ */
+AcequiaClient.prototype.getClients = function () {
+ this.send(AcequiaClient.GETCLIENTS, this.userName);
+ * Sends a message to the Acequia server.
+ * @param {String} msgName The name of the message.
+ * @param {Object} body The message body.
+ * @param {String} to The name of the client that will receive the message.
+ */
+AcequiaClient.prototype.send = function (msgName, body, to) {
+ if (msgName !== AcequiaClient.CONNECT && !this.isConnected()) {
+ console.error("AcequiaClient.send " + msgName + ": client is not connected");
+ } else {
+ var msg = new AcequiaMessage(this.userName, msgName, body, to);
- },
+ }
- ws_onopen : function (evt) {
- this.send('/connect', this.userName);
- },
- ws_onclose: function (evt) {
- },
+ * Listens for the connect message, then sets connected flag to true.
+ */
+AcequiaClient.prototype.acequia_onConnect = function () {
+ this.setConnected(true);
+ * Listens for the disconnect message, then sets connected flag to false and closes
+ * the WebSocket connection.
+ */
+AcequiaClient.prototype.acequia_onDisconnect = function () {
+ this.setConnected(false);
+ this.webSocket.close();
+ * Handles the onopen event from the WebSocket. This method sends the connect
+ * message to the Acequia client.
+ * @param {Event} evt The event object.
+ */
+AcequiaClient.prototype.ws_onopen = function (evt) {
+ this.send(AcequiaClient.CONNECT, this.userName);
+ * Handles the onclose event from the WebSocket. This method sets the WebSocket
+ * member to null.
+ * @param {Event} evt The event object.
+ */
+AcequiaClient.prototype.ws_onclose = function (evt) {
+ this.webSocket = null;
+ this.setConnected(false);
+ * Handles the onmessage event from the WebSocket. This method calls any message listeners
+ * that have registered for the message. If there is a wildcard handler registered, then it
+ * will call that as well.
+ * @param {Event} evt The event object.
+ */
+AcequiaClient.prototype.ws_onmessage = function (evt) {
+ var i, msg = JSON.parse(evt.data);
- ws_onmessage: function (evt) {
- var msg = JSON.parse(evt.data);
- this.dataCallback(msg.from, msg.title, msg.body);
- },
+ // if there is a message listener for this message, call it.
+ if (msg.title in this.listeners) {
+ for (i in this.listeners[msg.title]) {
+ this.listeners[msg.title][i](msg, this);
+ }
+ }
- ws_onerror: function (evt) {
- alert(evt);
+ // If there is a wildcard listener, call it.
+ if ("*" in this.listeners) {
+ for (i in this.listeners["*"]) {
+ this.listeners["*"][i](msg, this);
+ }
+ * Handles the onerror event from the WebSocket. This method disconnects from the acequia server
+ * and closes the websocket connection, in case it is unable to send the message.
+ * @param {Event} evt The event object.
+ */
+AcequiaClient.prototype.ws_onerror = function (evt) {
+ console.error("WebSocket Error: " + evt.data);
+ this.disconnect();
+ this.setConnected(false);
+ this.webSocket.close();
diff --git a/client.js b/client.js
index 8d86f1c..9980c89 100644
--- a/client.js
+++ b/client.js
@@ -7,38 +7,53 @@
/*global exports */
-var TYP_OSC = 0,
- TYP_WS = 1;
+var TYP_OSC = "OSC",
* The base class for all acequia clients.
-AcequiaClient = function (name, prot) {
+var AcequiaClient = function (name, prot, server) {
this.name = name;
this.protocol = prot;
+ this.server = server;
this.lastMessage = (new Date()).getTime();
AcequiaClient.prototype.equals = function (prot) {
return (this.protocol === prot);
+AcequiaClient.prototype.update = function (prot) {
+ this.lastMessage = (new Date()).getTime();
* Defines the client connected to acequia via Websockets
* @param {String} name The unique user name associated with the client.
* @param {String} id The id assigned to the connection by the websocket
+ * @param {Object} server The WebSocketServer that will be used to send the message.
* server
-WebSocketClient = function (name, id) {
+var WebSocketClient = function (name, id, server) {
this.id = id;
- AcequiaClient.call(this, name, TYP_WS);
+ AcequiaClient.call(this, name, TYP_WS, server);
-WebSocketClient.prototype = new AcequiaClient;
+WebSocketClient.prototype = new AcequiaClient();
WebSocketClient.prototype.equals = function (prot, id) {
return (this.protocol === prot &&
this.id === id);
+WebSocketClient.prototype.send = function(from, title, body) {
+ var msgBody = JSON.stringify({'from' : from, 'title' : title, 'body' : body});
+ this.server.send(this.id, msgBody);
+ this.update();
* Defines the client connected to acequia via an OSC connection
* @param {String} name The unique user name associated with the client.
@@ -47,20 +62,32 @@ WebSocketClient.prototype.equals = function (prot, id) {
* from the client;
* @param {Integer} portOut The port that messages will be going out from acequia
* to the client;
+ * @param {Object} server The OSC Server that will be used to send the message.
-OSCClient = function (name, ip, portIn, portOut) {
+var OSCClient = function (name, ip, portIn, portOut, server) {
this.ip = ip;
this.portIn = portIn;
this.portOut = portOut;
- AcequiaClient.call(this, name, TYP_OSC);
+ AcequiaClient.call(this, name, TYP_OSC, server);
-OSCClient.prototype = new AcequiaClient;
+OSCClient.prototype = new AcequiaClient();
OSCClient.prototype.equals = function (prot, ip, portIn) {
return (this.protocol === prot &&
this.ip === ip &&
this.portIn === portIn);
+OSCClient.prototype.send = function(from, title, body, tt) {
+ var data = [from].concat(body),
+ oscMsg = osc.newOsc(title, 's' + tt, data),
+ buffer = osc.oscToBuffer(oscMsg);
+ this.server.send(buffer, 0, buffer.length, this.portOut, this.ip);
+ this.update();
// Export the entities that Acequia needs
exports.TYP_OSC = TYP_OSC;
exports.TYP_WS = TYP_WS;
diff --git a/libs/netIP.js b/libs/netIP.js
index fc48847..44325ac 100644
--- a/libs/netIP.js
+++ b/libs/netIP.js
@@ -9,18 +9,13 @@ var getNetworkIP = (function () {
var command;
var filterRE;
- switch (process.platform) {
// TODO: implement for OSs without ifconfig command
- case 'darwin':
+ if (process.platform === "darwin") {
command = 'ifconfig';
filterRE = /\binet\s+([^\s]+)/g;
- // filterRE = /\binet6\s+([^\s]+)/g; // IPv6
- break;
- default:
+ } else {
command = 'ifconfig';
filterRE = /\binet\b[^:]+:\s*([^\s]+)/g;
- // filterRE = /\binet6[^:]+:\s*([^\s]+)/g; // IPv6
- break;
return function (callback, bypassCache) {
@@ -31,18 +26,18 @@ var getNetworkIP = (function () {
// system call
exec(command, function (error, stdout, sterr) {
- var ips = [];
+ var i, ips = [];
// extract IPs
var matches = stdout.match(filterRE);
if (matches) {
// JS has no lookbehind REs, so we need a trick
- for (var i = 0; i < matches.length; i++) {
+ for (i = 0; i < matches.length; i++) {
ips.push(matches[i].replace(filterRE, '$1'));
// filter BS
- for (var i = 0, l = ips.length; i < l; i++) {
+ for (i = 0, l = ips.length; i < l; i++) {
if (!ignoreRE.test(ips[i])) {
//if (!error) {
cached = ips[i];
diff --git a/libs/osc.js b/libs/osc.js
index d3f977a..a2d43ff 100644
--- a/libs/osc.js
+++ b/libs/osc.js
@@ -1,206 +1,267 @@
-var jspack = require('../vendor/node-jspack/jspack').jspack;
+/*globals require exports */
+var jspack = require('../vendor/node-jspack/jspack').jspack;
+var Buffer = require('buffer').Buffer;
-var Osc=function(addr,tt,d){
- this.address=addr;
- this.typeTags=tt;
- this.data=d;
+var Osc = function (addr, tt, d) {
+ this.address = addr;
+ this.typeTags = tt;
+ this.data = d;
return this;
+exports.newOsc = Osc;
-var OscBundle=function(addrs,tts,ds){
- this.addresses=addrs;
- this.typeTags=tts;
- this.datas=ds;
+var OscBundle = function (addrs, tts, ds) {
+ this.addresses = addrs;
+ this.typeTags = tts;
+ this.datas = ds;
return this;
+exports.newOscBundle = OscBundle;
- var address='';
- var typeTags='';
- var data=[];
- var i=0;
+exports.bufferToOsc = function (buffer) {
+ var address = '', typeTags = '', data = [], i = 0, j, str;
+ while (buffer[i]) {
+ address += String.fromCharCode(buffer[i]);
+ i += 1;
+ }
- while(buffer[i]){
- address+=String.fromCharCode(buffer[i]);
- i++;
+ while ((i % 4) < 3) {
+ i += 1;
- while((i%4)<3){i++;}
- i++;
+ i += 1;
- while(buffer[i]){
- if(buffer[i]=='f'.charCodeAt(0) || buffer[i]=='i'.charCodeAt(0) || buffer[i]=='s'.charCodeAt(0)){typeTags+=String.fromCharCode(buffer[i]);}
- i++;
+ while (buffer[i]) {
+ if (buffer[i] === 'f'.charCodeAt(0) || buffer[i] === 'i'.charCodeAt(0) ||
+ buffer[i] === 's'.charCodeAt(0)) {
+ typeTags += String.fromCharCode(buffer[i]);
+ }
+ i += 1;
+ }
+ while ((i % 4) < 3) {
+ i += 1;
- while((i%4)<3){i++;}
- i++;
+ i += 1;
- for(var j=0; j=buffer.length){break;}
- i++;
+ if (j + 16 >= buffer.length) {
+ break;
+ }
+ i += 1;
- return new OscBundle(addresses,typeTags,datas);
+ return new OscBundle(addresses, typeTags, datas);
- if(typeof osc.data=="undefined"){osc.data=[];}
- var buffer=new Buffer(osc.address.length+osc.typeTags.length+(osc.data.length*4)+80); //could be more efficient.
+exports.oscToBuffer = function (osc) {
+ var byteArr, buffer, str, offset, i, j;
+ if (typeof osc.data === "undefined") {
+ osc.data = [];
+ }
- var str=osc.address+'\0';
- while((str.length%4)!=0){str+='\0';}
+ // This could be more efficient:
+ buffer = new Buffer(osc.address.length + osc.typeTags.length + (osc.data.length * 4) + 80);
+ str = osc.address + '\0';
+ while ((str.length % 4) !== 0) {
+ str += '\0';
+ }
- var offset=str.length;
+ offset = str.length;
- str=','+osc.typeTags+'\0';
- while((str.length%4)!=0){str+='\0';}
- buffer.write(str,offset);
- offset+=str.length;
+ str = ',' + osc.typeTags + '\0';
+ while ((str.length % 4) !== 0) {
+ str += '\0';
+ }
+ buffer.write(str, offset);
+ offset += str.length;
var byteArr=jspack.Pack(osc.typeTags,osc.data);
- for(var i=0; i=0.2.0-0"}
+, "name" : "Acequia"
+, "repository":{"type":"git","url":"http://github.com/miksago/node-websocket-server.git"}
, "scripts": { "start": "node ./acequia.js" }
+, "version" : "0.5.0"