Skip to content

Commit

Permalink
debugging the datstore and the REST inteface to it.
Browse files Browse the repository at this point in the history
  • Loading branch information
prgsmall committed Aug 1, 2012
1 parent 2b4ad24 commit d50858a
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 55 deletions.
26 changes: 17 additions & 9 deletions lib/acequia.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ AcequiaServer.prototype.createOSCServer = function () {
message = new AcequiaMessage("", oscMsg.address, oscMsg.data);
}

acequiaClients.onMessage(ac.TYP_OSC, message, oscServer, rinfo);
acequiaClients.onOSCMessage(message, oscServer, rinfo);
}
});

Expand Down Expand Up @@ -218,7 +218,7 @@ AcequiaServer.prototype.createTCPServer = function () {
}

for (index = 0; index < msgs.length; index += 1) {
acequiaClients.onMessage(ac.TYP_TCP, msgs[index], socket);
acequiaClients.onTCPMessage(msgs[index], socket);
}
});

Expand Down Expand Up @@ -270,12 +270,20 @@ AcequiaServer.prototype.createWSServer = function () {
// options
app.acequiaServer = this;

app.configure(function(){
app.configure(function () {
app.use(express.methodOverride());
app.use(express.bodyParser());
app.use(app.router);
});


app.all('*', function (req, res, next) {
res.header("Access-Control-Allow-Origin", "*");
res.header("Access-Control-Allow-Headers", "X-Requested-With");
res.header('Access-Control-Allow-Credentials', true);
res.header("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, OPTIONS");
next();
});

app.get('/acequia/*', function (req, res) {
var pathName = url.parse(req.url, true).pathname,

Expand All @@ -297,14 +305,14 @@ AcequiaServer.prototype.createWSServer = function () {
});

app.get('/datastore', function (req, res) {
res.send('datastore API is running');
res.send('datastore API is running');
});

// Set up the routing for the REST interface
app.get("/datastore/*", objCallback(datastore.REST, "get"));
app.put("/datastore/*", objCallback(datastore.REST, "put"));
app.get("/datastore/*", objCallback(datastore.REST, "get"));
app.put("/datastore/*", objCallback(datastore.REST, "put"));
app.post("/datastore/*", objCallback(datastore.REST, "post"));
app.del("/datastore/*", objCallback(datastore.REST, "del"));
app.del("/datastore/*", objCallback(datastore.REST, "del"));

app.on("listening", function (data) {
logger.debug(" WS Server is listening on [%s:%s]", this.address().address, this.address().port);
Expand Down Expand Up @@ -332,7 +340,7 @@ AcequiaServer.prototype.createWSServer = function () {

socket.on('message', function (data) {
var message = new AcequiaMessage(JSON.parse(data));
acequiaClients.onMessage(ac.TYP_WS, message, socket);
acequiaClients.onWSMessage(message, socket);
});

socket.on("disconnect", function () {
Expand Down
46 changes: 42 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,37 @@ AcequiaClient.prototype.update = function () {
this.lastMessage = (new Date()).getTime();
};

/**
* Subscribes this client to a particular event, if it is not already subscribed
* @param {String} evt The name of the message to subscribe to.
*/
AcequiaClient.prototype.subscribe = function (evt) {
// see if the user is already subscribed
var i, listeners = msgEmitter.listeners(evt);
for (i = 0; i < listeners.length; i += 1) {
if (listeners[i] === this.messageCallback) {
logger.info("Client is already subscribed to " + evt);
return;
}
}

msgEmitter.addListener(evt, this.messageCallback);
this.events[evt] = evt;
};

/**
* Unsubscribes this client to a particular event
* @param {String} evt The name of the message
*/
AcequiaClient.prototype.unsubscribe = function (evt) {
msgEmitter.removeListener(evt, this.messageCallback);
delete this.events[evt];
};

/**
* Unsubscribes from all events
*/
AcequiaClient.prototype.unsubscribeAll = function () {
// TODO: Can we just call a method on msgEmitter?

var i, evt, events = [];
for (evt in this.events) {
events.push(evt);
Expand All @@ -89,6 +107,10 @@ AcequiaClient.prototype.unsubscribeAll = function () {
}
};

/**
* Handles a message sent to the client
* @param {Object} message The message to send.
*/
AcequiaClient.prototype.onMessage = function (message) {

if (message.from === this.name) {
Expand All @@ -98,8 +120,12 @@ AcequiaClient.prototype.onMessage = function (message) {
}
};

AcequiaClient.prototype.send = function () {
logger.error("Method send not implemented");
/**
* Stub method for sending data.
* @param {Object} message The message to send.
*/
AcequiaClient.prototype.send = function (message) {
throw new Error("Method send not implemented");
};

//=============================================================================
Expand Down Expand Up @@ -232,6 +258,18 @@ AcequiaClients.prototype.createClient = function (type, message, socket, rinfo)
}
};

AcequiaClients.prototype.onOSCMessage = function (message, socket, rinfo) {
return this.onMessage(TYP_OSC, message, socket, rinfo);
};

AcequiaClients.prototype.onTCPMessage = function (message, socket) {
return this.onMessage(TYP_TCP, message, socket);
};

AcequiaClients.prototype.onWSMessage = function (message, socket) {
return this.onMessage(TYP_WS, message, socket);
};

/**
* Processor for the acequia messages
* @param type {string} The type of client
Expand Down
12 changes: 11 additions & 1 deletion lib/client/ClientStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,21 @@ ClientStore.prototype.normalizePath = function (path) {
*/
ClientStore.prototype.on = function (eventType, callback) {
this.acequiaClient.on(this.changedMessage(this.path), callback);
this.acequiaClient.send(msg.MSG_SUBSCRIBE_DS, {path: this.path});
this.subscribe();
};

ClientStore.prototype.set = function (value) {
this.acequiaClient.send(msg.MSG_UPDATE_DS, value);
};

ClientStore.prototype.subscribe = function() {
if (!this.acequiaClient.isConnected()) {
this.acequiaClient.addConnectionChangeHandler(objCallback(this, "sub"));
} else {
this.sub();
}
};

ClientStore.prototype.sub = function () {
this.acequiaClient.send(msg.MSG_SUBSCRIBE_DS, {path: this.path});
};
83 changes: 65 additions & 18 deletions lib/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

var fs = require("fs"),
path = require("path"),
URL = require("url"),
msg = require("./msg"),
mkdirp = require("./utils").mkdirp,
deepCopy = require("./utils").deepCopy,
Expand Down Expand Up @@ -122,7 +123,7 @@ DatastoreBase.prototype.sendChangeEvents = function (path) {
msg_path += "/" + parts[i];
obj = obj[parts[i]];
this.acequiaServer.send("datastore",
this.changedMessage(path),
this.changedMessage(msg_path),
{path: msg_path,
value: obj});
}
Expand Down Expand Up @@ -150,6 +151,8 @@ DatastoreBase.prototype.onUpdateDataStore = function (message) {
* @returns the normalized path
*/
DatastoreBase.prototype.normalizePath = function (path) {
path = path.split("?")[0];

if (path[0] === "/") {
path = path.substring(1);
}
Expand Down Expand Up @@ -239,20 +242,51 @@ DatastoreBase.prototype.createResponse = function (path, description) {
};
};

DatastoreBase.prototype.parseQS = function (qs) {
var i, ret = {}, parts, pair;
if (typeof(qs) !== "undefined") {
parts = qs.split("&");
for (i = 0; i < parts.length; i += 1) {
pair = parts[i].split("=");
ret[pair[0]] = pair[1];
}
}
return ret;
};

DatastoreBase.prototype.parseUrl = function (url) {
var urlObject = URL.parse(url), qs = this.parseQS(urlObject.query);
return {
path: urlObject.pathname,
callback: qs.callback
};
};

DatastoreBase.prototype.genResponse = function (obj, callback) {
if (typeof(callback) === "undefined") {
return obj;
} else {
return callback + "(" + JSON.stringify(obj) + ")";
}
};

/**
* Handler for the REST GET request
* @param {Object} req The request object
* @param {Object} res The response object
*/
DatastoreBase.prototype.get = function (req, res) {
var obj = this.objectFromPath(req.url);
logger.debug("GET: " + req.url);

var obj, urlobj;
urlobj = this.parseUrl(req.url);
obj = this.objectFromPath(urlobj.path);

if (typeof(obj) === "undefined") {
obj = this.createResponse(req.url, req.url + " is undefined");
return res.json(obj, 404);
} else {
return res.send(obj);
obj = this.createResponse(req.url, urlobj.path + " is undefined");
res.statusCode = 404;
}
return res.send(this.genResponse(obj, urlobj.callback));
};

/**
Expand All @@ -261,16 +295,20 @@ DatastoreBase.prototype.get = function (req, res) {
* @param {Object} res The response object
*/
DatastoreBase.prototype.post = function (req, res) {
var obj = this.objectFromPath(req.url);
logger.debug("POST: " + req.url);

var obj, urlobj;
urlobj = this.parseUrl(req.url);
obj = this.objectFromPath(urlobj.path);

if (typeof(obj) !== "undefined") {
obj = this.createResponse(req.url, "Cannot create: " + req.url + ". It is already defined.");
return res.json(obj, 404);
obj = this.createResponse(req.url, "Cannot create: " + urlobj.path + ". It is already defined.");
res.statusCode = 404;
} else {
obj = this.setDataStoreValue(req.url, req.body);
this.saveDatastore(req.url);
return res.send(obj);
this.saveDatastore(urlobj.path);
}
return res.send(this.genResponse(obj, urlobj.callback));
};

/**
Expand All @@ -279,7 +317,11 @@ DatastoreBase.prototype.post = function (req, res) {
* @param {Object} res The response object
*/
DatastoreBase.prototype.put = function (req, res) {
var obj = this.objectFromPath(req.url);
logger.debug("PUT: " + req.url);

var obj, urlobj;
urlobj = this.parseUrl(req.url);
obj = this.objectFromPath(urlobj.path);

if (typeof(obj) !== "undefined") {
this.appendAttributes(obj, req.body);
Expand All @@ -289,7 +331,7 @@ DatastoreBase.prototype.put = function (req, res) {

this.saveDatastore(req.url);

return res.send(obj);
return res.send(this.genResponse(obj, urlobj.callback));
};

/**
Expand All @@ -298,11 +340,15 @@ DatastoreBase.prototype.put = function (req, res) {
* @param {Object} res The response object
*/
DatastoreBase.prototype.del = function (req, res) {
var parts, i, path, obj = this.objectFromPath(req.url);

logger.debug("DELETE: " + req.url);

var parts, i, path,
urlobj = this.parseUrl(req.url),
obj = this.objectFromPath(urlobj.path);

if (typeof(obj) === "undefined") {
obj = this.createResponse(req.url, req.url + " is undefined.");
return res.json(obj, 404);
res.statusCode = 404;
} else {
path = this.normalizePath(req.url);
parts = path.split("/");
Expand All @@ -314,9 +360,10 @@ DatastoreBase.prototype.del = function (req, res) {
delete obj[parts[i]];

this.saveDatastore(req.url);

return res.send(this.createResponse(req.url));
obj = this.createResponse(req.url);
}

return res.send(this.genResponse(obj, urlobj.callback));
};


Expand Down
4 changes: 3 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
var acequia = require('./lib/acequia');
var options = {};

var START = function () {
var acequiaServer = acequia.createServer();
var acequiaServer = acequia.createServer(options);
acequiaServer.start();
};

Expand All @@ -11,6 +12,7 @@ var start = function () {
process.argv.forEach(function (val, index, array) {
if (val === "--debug") {
timeout = 20000;
options["minify_client"] = false;
}
});

Expand Down
Loading

0 comments on commit d50858a

Please sign in to comment.