From fa03767bb5c45550a207ea1972caa9762c336a2f Mon Sep 17 00:00:00 2001 From: "Peter R. G. Small" Date: Mon, 3 Oct 2011 11:41:07 -0600 Subject: [PATCH] fixed client implementation. Moved the send function to each client --- acequia.js | 156 +++++++++----------- acequiaClient.html | 72 ++++++--- acequiaClient.js | 221 ++++++++++++++++++++++------ client.js | 47 ++++-- libs/netIP.js | 15 +- libs/osc.js | 353 ++++++++++++++++++++++++++------------------- msg.js | 24 +++ package.json | 8 +- 8 files changed, 582 insertions(+), 314 deletions(-) create mode 100644 msg.js diff --git a/acequia.js b/acequia.js index 8d4c5e4..3d57d00 100644 --- a/acequia.js +++ b/acequia.js @@ -19,81 +19,48 @@ var DEBUG = 1, TCP_PORT = 9093, TIMEOUT = 600; //Seconds before kicking idle clients. +/** + * The standard messages that the acequia system handles. + */ +var MSG_CONNECT = "/connect"; +var MSG_DISCONNECT = "/disconnect"; +var MSG_GETCLIENTS = "/getClients"; // The list of clients var clients = []; var oscServer, wsServer; -//Logs str to the console if the 'global' debug is set to 1. +// Logs str to the console if the 'global' debug is set to 1. function debug(str) { if (DEBUG) { console.log('[' + (new Date()) + '] ' + str); } } -//Utility function for sending an osc message to a given client. -function msgSndOsc(to, from, title, body, tt) { - var data = [from].concat(body), - oscMsg = osc.newOsc(title, 's' + tt, data), - buffer = osc.oscToBuffer(oscMsg); - - oscServer.send(buffer, 0, buffer.length, clients[to].portOut, clients[to].ip); -} - - -//Utility function for sending a websocket message to a given client. -function msgSndWs(to, from, title, body) { - wsServer.send(clients[to].id, JSON.stringify({'from' : from, 'title' : title, 'body' : body})); -} - - // The master sending function which takes a message meant for a client, decides // which protocol to use, and calls the appropriate function. function msgSnd(to, from, title, body, tt) { - if (to === -1) { - return; - } - - switch (clients[to].protocol) { - case ac.TYP_OSC: - msgSndOsc(to, from, title, body, tt); - break; - - case ac.TYP_WS: - msgSndWs(to, from, title, body); - break; - } + clients[to].send(from, title, body, tt); debug("Sent message " + title + " to client #" + to + " (" + clients[to].name + ")"); } //The master function which sends messages to all clients except for exc. function msgSndAll(exc, mesid, data, tt) { - var i; + var i, name = clients[exc].name; for (i = 0; i < clients.length; i += 1) { if (i !== exc) { - msgSnd(i, mesid, data, tt); + clients[i].send(name, mesid, data, tt); } } } // Message routing goes here. -function msgRec(from, to, title, body, tt) { - var time = (new Date()).getTime(); - debug('Received message ' + title + ' from client #' + from + ' (' + clients[from].name + ')'); - if (from === -1) { - return; - } - clients[from].lastMessage = time; - - switch (title) { - case "place-holder": - break; - case "place-holder2": - break; - default: +function onMessage(from, to, title, body, tt) { + if (to === -1) { + msgSndAll(from, title, body, tt); + } else { msgSnd(to, clients[from].name, title, body, tt); - break; } } @@ -136,7 +103,7 @@ function kickIdle() { for (i = 0; i < clients.length; i += 1) { if ((time - clients[i].lastMessage) > TIMEOUT * 1000) { - dropClient(i, 'timeout'); + dropClient(i, "client timeout"); i -= 1; } } @@ -144,25 +111,25 @@ function kickIdle() { // Once we actually get our internal IP, we start the servers. function startServers() { - var i, oscMsg, from, to, title, tt, client, portOut, + var i, from, to, title, tt, client, portOut, httpClient, request, clientList; //OSC Server. oscServer = dgram.createSocket('udp4'); oscServer.on('message', function (msg, rinfo) { - oscMsg = osc.bufferToOsc(msg); + var oscMsg = osc.bufferToOsc(msg); switch (oscMsg.address) { - case "/connect": + case MSG_CONNECT: //the second parameter when logging in is an optional 'port to send to'. portOut = (oscMsg.data[1] > 0) ? oscMsg.data[1] : rinfo.port; - client = new ac.OSCClient(oscMsg.data[0], rinfo.address, rinfo.port, portOut); + client = new ac.OSCClient(oscMsg.data[0], rinfo.address, rinfo.port, portOut, oscServer); clients.push(client); debug('Added client ' + oscMsg.data[0] + ' (OSC@' + rinfo.address + ':' + rinfo.port + ', client #' + (clients.length - 1) + ')'); break; - case "/disconnect": + case MSG_DISCONNECT: dropClient(lookupClient(ac.TYP_OSC, rinfo.address, rinfo.port), "disconnect by user"); break; @@ -171,7 +138,7 @@ function startServers() { to = lookupClientUsername(oscMsg.data.shift()); title = oscMsg.address; tt = oscMsg.typeTags.slice(1); - msgRec(from, to, title, oscMsg.data, tt); + onMessage(from, to, title, oscMsg.data, tt); break; } }); @@ -182,7 +149,8 @@ function startServers() { debug('oscServer is listening on ' + oscServer.address().address + ':' + oscServer.address().port); httpClient = http.createClient('80', 'plasticsarcastic.com'); - request = httpClient.request('GET', '/nodejs/scrCreateServer.php?ip=' + INTERNAL_IP + '&port=' + OSC_PORT, {'host' : 'plasticsarcastic.com'}); + request = httpClient.request('GET', '/nodejs/scrCreateServer.php?ip=' + INTERNAL_IP + + '&port=' + OSC_PORT, {'host' : 'plasticsarcastic.com'}); request.end(); request.on('response', function (response) { response.setEncoding('utf8'); @@ -194,7 +162,6 @@ function startServers() { }); }); - //"Finalize" the OSC server. oscServer.bind(OSC_PORT, INTERNAL_IP); //Websocket server: @@ -204,15 +171,12 @@ function startServers() { con.addListener('message', function (msg) { debug("connection: message"); - var message = JSON.parse(msg), - from = lookupClient(ac.TYP_WS, con.id), - to = lookupClientUsername(message.to), + var from, to, + message = JSON.parse(msg), title = message.title, - body = message.body; - - - switch (title) { - case "/connect": + body = message.body; + + if (title === MSG_CONNECT) { //Add them to the clients list when they connect if the username is free. for (i = 0; i < clients.length; i += 1) { if (clients[i].name === body[0]) { @@ -222,29 +186,34 @@ function startServers() { } } - client = new ac.WebSocketClient(body[0], con.id); + client = new ac.WebSocketClient(body[0], con.id, wsServer); clients.push(client); - msgSndWs(clients.length - 1, "SYS", "/connect", 1); + client.send("SYS", MSG_CONNECT, 1); debug('Added client ' + clients[clients.length - 1].name + - ' (ws id ' + con.id + ', client #' + (clients.length - 1) + ')'); - break; - - case "/disconnect": - dropClient(from, "disconnect by user."); - break; + ' (ws id ' + con.id + ', client #' + (clients.length - 1) + ')'); + } else { + from = lookupClient(ac.TYP_WS, con.id); + to = lookupClientUsername(message.to); + + switch (title) { + case MSG_DISCONNECT: + clients[from].send("SYS", MSG_DISCONNECT, 1); + dropClient(from, "disconnect by user."); + break; - case "/getClients": - clientList = []; - for (i = 0; i < clients.length; i += 1) { - clientList.push(clients[i].name); - } - msgSndWs(from, "SYS", "/getClients", clientList); - break; + case MSG_GETCLIENTS: + clientList = []; + for (i = 0; i < clients.length; i += 1) { + clientList.push(clients[i].name); + } + clients[from].send("SYS", MSG_GETCLIENTS, clientList); + break; - default: - msgRec(from, to, title, body); - break; + default: + onMessage(from, to, title, body); + break; + } } }); @@ -290,7 +259,26 @@ function startServers() { }); wsServer.listen(WS_PORT); - +/* + // HTTP Server + http.createServer(function (req, res) { + var pathName = URL.parse(req.url, true).pathname; + console.log("HTTP server received " + pathName); + res.writeHead(200, {'Content-Type': 'text/plain'}); + switch (pathName) { + case MSG_CONNECT: + break; + case MSG_DISCONNECT: + break; + case MSG_GETCLIENTS: + break; + default: + break; + } + res.end(req.body + ' Hello World\n'); + }).listen(HTTP_PORT); + debug("httpServer is listening on " + INTERNAL_IP + ":" + HTTP_PORT); +*/ setInterval(kickIdle, 1000); } @@ -298,7 +286,7 @@ function startServers() { // It starts a server for each individual protocol. function start() { - //First, we need to get our internal IP, by parsing ifconfig: + // First, we need to get our internal IP, by parsing ifconfig: if (process.argv.length > 2) { INTERNAL_IP = process.argv[2]; startServers(); diff --git a/acequiaClient.html b/acequiaClient.html index ab5a8b0..6d4b45a 100644 --- a/acequiaClient.html +++ b/acequiaClient.html @@ -1,30 +1,66 @@ + Acequia Client Test Page + + + + + - + + + + + + + + + + + + + +
diff --git a/acequiaClient.js b/acequiaClient.js index 0a89759..a3639ef 100644 --- a/acequiaClient.js +++ b/acequiaClient.js @@ -1,4 +1,4 @@ -/*global WebSocket*/ +/*global WebSocket console AcequiaMessage*/ /** * Creates an event callback by wrapping the object with a closure. @@ -10,64 +10,197 @@ var objCallback = function (obj, func) { return function () { obj[func].apply(obj, arguments); }; -}; +}; /** - * Class to define the outgoing messages from the acequia client. - * @param {String} to + * Defines the AcequiaClient object, which is used to connect with the Acequia server + * through a WebSocket connection. + * @param {String} uri The uri of the connection to the WebSocket. + * @param {String} userName The user name, used to uniquely identify this user. */ -function AcequiaMessage(to, title, body) { - this.to = (typeof(to) === "undefined") ? "" : to; - this.title = title; - this.body = (typeof(body) === "undefined") ? [] : - ((body instanceof Array) ? body : [body]); -} - -AcequiaMessage.prototype.toString = function () { - return JSON.stringify(this); -}; - -var acequiaClient = { +var AcequiaClient = function (uri, userName) { + var m_connected = false; - dataCallback: null, + this.isConnected = function () { + return m_connected; + }; - userName: "", + this.setConnected = function (connected) { + var idx; + m_connected = connected; + for (idx in this.connectionChangeHandlers) { + this.connectionChangeHandlers[idx](connected); + } + }; + + this.uri = uri; + this.userName = userName; + this.webSocket = null; - webSocket: null, + this.listeners = {}; - connect: function (uri, userName, callback) { - this.userName = userName; - this.dataCallback = callback; - - this.webSocket = new WebSocket(uri); + this.connectionChangeHandlers = []; + + // Add listeners for the connect and disconnect commands + this.addMessageListener(AcequiaClient.CONNECT, objCallback(this, "acequia_onConnect")); + this.addMessageListener(AcequiaClient.DISCONNECT, objCallback(this, "acequia_onDisconnect")); +}; + +/** + * {String} The connect command that will be sent to and receieved from acequia. + */ +AcequiaClient.CONNECT = "/connect"; + +/** + * {String} The disconnect command that will be sent to and receieved from acequia. + */ +AcequiaClient.DISCONNECT = "/disconnect"; + +/** + * {String} The getClients command that will be sent to and receieved from acequia. + */ +AcequiaClient.GETCLIENTS = "/getClients"; + +/** + * Adds a listener for the message with the message name. + * @param {String} msgName The name of the message to listen to. + * @param {Function} callback The callback function which will be called when the + * message arrives. + */ +AcequiaClient.prototype.addMessageListener = function (msgName, callback) { + if (!(msgName in this.listeners)) { + this.listeners[msgName] = []; + } + + this.listeners[msgName].push(callback); +}; + +/** + * Adds a listener for the message with the message name. + * @param {String} msgName The name of the message to listen to. + * @param {Function} callback The callback function which will be called when the + * message arrives. + */ +AcequiaClient.prototype.removeMessageListener = function (msgName, callback) { + var idx = this.listeners[msgName].indexOf(callback); + this.listeners[msgName].splice(idx, 1); +}; + +AcequiaClient.prototype.addConnectionChangeHandler = function (handler) { + this.connectionChangeHandlers.push(handler); +}; + +/** + * Connects to the Acequia server by creating a new web socket connection; + */ +AcequiaClient.prototype.connect = function () { + if (this.isConnected()) { + console.error("AcequiaClient.connect: client is already connected"); + } else { + this.webSocket = new WebSocket(this.uri); this.webSocket.onopen = objCallback(this, "ws_onopen"); this.webSocket.onclose = objCallback(this, "ws_onclose"); this.webSocket.onmessage = objCallback(this, "ws_onmessage"); this.webSocket.onerror = objCallback(this, "ws_onerror"); - }, - - disconnect: function () { - this.send('/disconnect'); - }, + } +}; - send: function (title, body, to) { - var msg = new AcequiaMessage(to, title, body); +/** + * Sends the disconnect message to the Acequia server. + */ +AcequiaClient.prototype.disconnect = function () { + this.send(AcequiaClient.DISCONNECT); +}; + +/** + * Sends the getClients message to the Acequia server. + */ +AcequiaClient.prototype.getClients = function () { + this.send(AcequiaClient.GETCLIENTS, this.userName); +}; + +/** + * Sends a message to the Acequia server. + * @param {String} msgName The name of the message. + * @param {Object} body The message body. + * @param {String} to The name of the client that will receive the message. + */ +AcequiaClient.prototype.send = function (msgName, body, to) { + if (msgName !== AcequiaClient.CONNECT && !this.isConnected()) { + console.error("AcequiaClient.send " + msgName + ": client is not connected"); + } else { + var msg = new AcequiaMessage(this.userName, msgName, body, to); this.webSocket.send(msg.toString()); - }, + } +}; - ws_onopen : function (evt) { - this.send('/connect', this.userName); - }, - - ws_onclose: function (evt) { - }, +/** + * Listens for the connect message, then sets connected flag to true. + */ +AcequiaClient.prototype.acequia_onConnect = function () { + this.setConnected(true); +}; + +/** + * Listens for the disconnect message, then sets connected flag to false and closes + * the WebSocket connection. + */ +AcequiaClient.prototype.acequia_onDisconnect = function () { + this.setConnected(false); + this.webSocket.close(); +}; + +/** + * Handles the onopen event from the WebSocket. This method sends the connect + * message to the Acequia client. + * @param {Event} evt The event object. + */ +AcequiaClient.prototype.ws_onopen = function (evt) { + this.send(AcequiaClient.CONNECT, this.userName); +}; + +/** + * Handles the onclose event from the WebSocket. This method sets the WebSocket + * member to null. + * @param {Event} evt The event object. + */ +AcequiaClient.prototype.ws_onclose = function (evt) { + this.webSocket = null; + this.setConnected(false); +}; + +/** + * Handles the onmessage event from the WebSocket. This method calls any message listeners + * that have registered for the message. If there is a wildcard handler registered, then it + * will call that as well. + * @param {Event} evt The event object. + */ +AcequiaClient.prototype.ws_onmessage = function (evt) { + var i, msg = JSON.parse(evt.data); - ws_onmessage: function (evt) { - var msg = JSON.parse(evt.data); - this.dataCallback(msg.from, msg.title, msg.body); - }, + // if there is a message listener for this message, call it. + if (msg.title in this.listeners) { + for (i in this.listeners[msg.title]) { + this.listeners[msg.title][i](msg, this); + } + } - ws_onerror: function (evt) { - alert(evt); + // If there is a wildcard listener, call it. + if ("*" in this.listeners) { + for (i in this.listeners["*"]) { + this.listeners["*"][i](msg, this); + } } }; + +/** + * Handles the onerror event from the WebSocket. This method disconnects from the acequia server + * and closes the websocket connection, in case it is unable to send the message. + * @param {Event} evt The event object. + */ +AcequiaClient.prototype.ws_onerror = function (evt) { + console.error("WebSocket Error: " + evt.data); + this.disconnect(); + this.setConnected(false); + this.webSocket.close(); +}; diff --git a/client.js b/client.js index 8d86f1c..9980c89 100644 --- a/client.js +++ b/client.js @@ -7,38 +7,53 @@ /*global exports */ -var TYP_OSC = 0, - TYP_WS = 1; +var TYP_OSC = "OSC", + TYP_WS = "WEBSOCKET", + TYPE_AJAX = "AJAX"; /** * The base class for all acequia clients. */ -AcequiaClient = function (name, prot) { +var AcequiaClient = function (name, prot, server) { this.name = name; this.protocol = prot; + this.server = server; this.lastMessage = (new Date()).getTime(); }; + AcequiaClient.prototype.equals = function (prot) { return (this.protocol === prot); }; +AcequiaClient.prototype.update = function (prot) { + this.lastMessage = (new Date()).getTime(); +}; + /** * Defines the client connected to acequia via Websockets * @param {String} name The unique user name associated with the client. * @param {String} id The id assigned to the connection by the websocket + * @param {Object} server The WebSocketServer that will be used to send the message. * server */ - -WebSocketClient = function (name, id) { +var WebSocketClient = function (name, id, server) { this.id = id; - AcequiaClient.call(this, name, TYP_WS); + AcequiaClient.call(this, name, TYP_WS, server); }; -WebSocketClient.prototype = new AcequiaClient; +WebSocketClient.prototype = new AcequiaClient(); + WebSocketClient.prototype.equals = function (prot, id) { return (this.protocol === prot && this.id === id); }; +WebSocketClient.prototype.send = function(from, title, body) { + var msgBody = JSON.stringify({'from' : from, 'title' : title, 'body' : body}); + this.server.send(this.id, msgBody); + this.update(); +}; + + /** * Defines the client connected to acequia via an OSC connection * @param {String} name The unique user name associated with the client. @@ -47,20 +62,32 @@ WebSocketClient.prototype.equals = function (prot, id) { * from the client; * @param {Integer} portOut The port that messages will be going out from acequia * to the client; + * @param {Object} server The OSC Server that will be used to send the message. */ -OSCClient = function (name, ip, portIn, portOut) { +var OSCClient = function (name, ip, portIn, portOut, server) { this.ip = ip; this.portIn = portIn; this.portOut = portOut; - AcequiaClient.call(this, name, TYP_OSC); + AcequiaClient.call(this, name, TYP_OSC, server); }; -OSCClient.prototype = new AcequiaClient; +OSCClient.prototype = new AcequiaClient(); + OSCClient.prototype.equals = function (prot, ip, portIn) { return (this.protocol === prot && this.ip === ip && this.portIn === portIn); }; +OSCClient.prototype.send = function(from, title, body, tt) { + var data = [from].concat(body), + oscMsg = osc.newOsc(title, 's' + tt, data), + buffer = osc.oscToBuffer(oscMsg); + + this.server.send(buffer, 0, buffer.length, this.portOut, this.ip); + this.update(); +}; + + // Export the entities that Acequia needs exports.TYP_OSC = TYP_OSC; exports.TYP_WS = TYP_WS; diff --git a/libs/netIP.js b/libs/netIP.js index fc48847..44325ac 100644 --- a/libs/netIP.js +++ b/libs/netIP.js @@ -9,18 +9,13 @@ var getNetworkIP = (function () { var command; var filterRE; - switch (process.platform) { // TODO: implement for OSs without ifconfig command - case 'darwin': + if (process.platform === "darwin") { command = 'ifconfig'; filterRE = /\binet\s+([^\s]+)/g; - // filterRE = /\binet6\s+([^\s]+)/g; // IPv6 - break; - default: + } else { command = 'ifconfig'; filterRE = /\binet\b[^:]+:\s*([^\s]+)/g; - // filterRE = /\binet6[^:]+:\s*([^\s]+)/g; // IPv6 - break; } return function (callback, bypassCache) { @@ -31,18 +26,18 @@ var getNetworkIP = (function () { } // system call exec(command, function (error, stdout, sterr) { - var ips = []; + var i, ips = []; // extract IPs var matches = stdout.match(filterRE); if (matches) { // JS has no lookbehind REs, so we need a trick - for (var i = 0; i < matches.length; i++) { + for (i = 0; i < matches.length; i++) { ips.push(matches[i].replace(filterRE, '$1')); } // filter BS - for (var i = 0, l = ips.length; i < l; i++) { + for (i = 0, l = ips.length; i < l; i++) { if (!ignoreRE.test(ips[i])) { //if (!error) { cached = ips[i]; diff --git a/libs/osc.js b/libs/osc.js index d3f977a..a2d43ff 100644 --- a/libs/osc.js +++ b/libs/osc.js @@ -1,206 +1,267 @@ -var jspack = require('../vendor/node-jspack/jspack').jspack; +/*globals require exports */ +var jspack = require('../vendor/node-jspack/jspack').jspack; +var Buffer = require('buffer').Buffer; -var Osc=function(addr,tt,d){ - this.address=addr; - this.typeTags=tt; - this.data=d; +var Osc = function (addr, tt, d) { + this.address = addr; + this.typeTags = tt; + this.data = d; return this; -} -exports.newOsc=Osc; +}; +exports.newOsc = Osc; -var OscBundle=function(addrs,tts,ds){ - this.addresses=addrs; - this.typeTags=tts; - this.datas=ds; +var OscBundle = function (addrs, tts, ds) { + this.addresses = addrs; + this.typeTags = tts; + this.datas = ds; return this; -} -exports.newOscBundle=OscBundle; +}; +exports.newOscBundle = OscBundle; -exports.bufferToOsc=function(buffer){ - var address=''; - var typeTags=''; - var data=[]; - var i=0; +exports.bufferToOsc = function (buffer) { + var address = '', typeTags = '', data = [], i = 0, j, str; + + while (buffer[i]) { + address += String.fromCharCode(buffer[i]); + i += 1; + } - while(buffer[i]){ - address+=String.fromCharCode(buffer[i]); - i++; + while ((i % 4) < 3) { + i += 1; } - while((i%4)<3){i++;} - i++; + i += 1; - while(buffer[i]){ - if(buffer[i]=='f'.charCodeAt(0) || buffer[i]=='i'.charCodeAt(0) || buffer[i]=='s'.charCodeAt(0)){typeTags+=String.fromCharCode(buffer[i]);} - i++; + while (buffer[i]) { + if (buffer[i] === 'f'.charCodeAt(0) || buffer[i] === 'i'.charCodeAt(0) || + buffer[i] === 's'.charCodeAt(0)) { + typeTags += String.fromCharCode(buffer[i]); + } + i += 1; + } + while ((i % 4) < 3) { + i += 1; } - while((i%4)<3){i++;} - i++; + i += 1; - for(var j=0; j=buffer.length){break;} - i++; + if (j + 16 >= buffer.length) { + break; + } + i += 1; } - return new OscBundle(addresses,typeTags,datas); -} + return new OscBundle(addresses, typeTags, datas); +}; -exports.oscToBuffer=function(osc){ - if(typeof osc.data=="undefined"){osc.data=[];} - var buffer=new Buffer(osc.address.length+osc.typeTags.length+(osc.data.length*4)+80); //could be more efficient. +exports.oscToBuffer = function (osc) { + var byteArr, buffer, str, offset, i, j; + + if (typeof osc.data === "undefined") { + osc.data = []; + } - var str=osc.address+'\0'; - while((str.length%4)!=0){str+='\0';} + // This could be more efficient: + buffer = new Buffer(osc.address.length + osc.typeTags.length + (osc.data.length * 4) + 80); + str = osc.address + '\0'; + while ((str.length % 4) !== 0) { + str += '\0'; + } buffer.write(str); - var offset=str.length; + offset = str.length; - str=','+osc.typeTags+'\0'; - while((str.length%4)!=0){str+='\0';} - buffer.write(str,offset); - offset+=str.length; + str = ',' + osc.typeTags + '\0'; + while ((str.length % 4) !== 0) { + str += '\0'; + } + buffer.write(str, offset); + offset += str.length; /*osc.typeTags=osc.typeTags.replace(/find/s,'c'); var byteArr=jspack.Pack(osc.typeTags,osc.data); console.log(byteArr); - for(var i=0; i=0.2.0-0"} +, "name" : "Acequia" +, "repository":{"type":"git","url":"http://github.com/miksago/node-websocket-server.git"} , "scripts": { "start": "node ./acequia.js" } +, "version" : "0.5.0" }