diff --git a/commands/export.bones b/commands/export.bones index 1ae65aec4..cbcbca09a 100644 --- a/commands/export.bones +++ b/commands/export.bones @@ -108,12 +108,6 @@ command.prototype.initialize = function(plugin, callback) { // Set process title. process.title = 'tm-' + path.basename(opts.filepath); - // Catch SIGINT. - process.on('SIGINT', function () { - console.log('Got SIGINT. Run "kill ' + process.pid + '" to terminate.'); - }); - process.on('SIGUSR1', process.exit); - // Upload format does not require loaded project. if (opts.format === 'upload') return this[opts.format](callback); diff --git a/lib/queue.js b/lib/queue.js new file mode 100644 index 000000000..f695df095 --- /dev/null +++ b/lib/queue.js @@ -0,0 +1,46 @@ +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +module.exports = Queue; +function Queue(callback, concurrency, timeout) { + this.callback = callback; + this.concurrency = concurrency || 10; + this.timeout = timeout || 0; + this.add = this.add.bind(this); + this.next = this.next.bind(this); + this.invoke = this.invoke.bind(this); + this.queue = []; + this.running = 0; +} +util.inherits(Queue, EventEmitter); + +Queue.prototype.add = function(item) { + this.queue.push(item); + if (this.running < this.concurrency) { + this.running++; + this.next(); + } +}; + +Queue.prototype.invoke = function() { + if (this.queue.length) { + this.callback(this.queue.shift(), this.next); + } else { + this.next(); + } +}; + +Queue.prototype.next = function(err) { + if (this.queue.length) { + if (this.timeout) { + setTimeout(this.invoke, this.timeout); + } else { + process.nextTick(this.invoke); + } + } else { + this.running--; + if (!this.running) { + this.emit('empty'); + } + } +}; diff --git a/models/Export.bones b/models/Export.bones index dcc4b2dfa..62e11ea2b 100644 --- a/models/Export.bones +++ b/models/Export.bones @@ -63,12 +63,9 @@ model.prototype.schema = { }; model.prototype.initialize = function() { - if (this.isNew()){ - this.set({ - created: +new Date, - id: (+new Date) + '' - }, {silent: true}); - } + if (this.isNew()) this.set({ + id: Date.now().toString() + }, {silent: true}); }; model.prototype.url = function() { diff --git a/models/Exports.server.bones b/models/Exports.server.bones index d0e261d7a..cbe1c626c 100644 --- a/models/Exports.server.bones +++ b/models/Exports.server.bones @@ -1,16 +1,25 @@ var Step = require('step'), + Queue = require('../lib/queue'), fs = require('fs'), path = require('path'), exec = require('child_process').exec, spawn = require('child_process').spawn, - settings = Bones.plugin.config; + settings = Bones.plugin.config, + pids = {}; + +// Queue exports with concurrency 4. +// @TODO make concurrency configurable? +var queue = new Queue(start, 4); + +function start(id, callback) { + var model = new models.Export({id:id}); + Step(function() { + Backbone.sync('read', model, + function(data) { this(null, data) }.bind(this), + function(err) { this(err) }.bind(this)) + }, function(err, data) { + if (err || data.status !== 'waiting') return callback(); -// @TODO: need a queue system. Difficult to manage atm because process -// completion is now determined outside this process. -// See http://en.wikipedia.org/wiki/SIGCHLD ... may be useful for determining -// when a child process has died and can be removed from the pool. -var start = function(model, data, callback) { - if (data.status === 'waiting') { var args = []; // nice the export process. args.push('-n19'); @@ -37,72 +46,73 @@ var start = function(model, data, callback) { if (data.minzoom) args.push('--minzoom=' + data.minzoom); if (data.maxzoom) args.push('--maxzoom=' + data.maxzoom); - var options = { - env: process.env, + var child = spawn('nice', args, { + env: _(process.env).extend({ + tilemillConfig:JSON.stringify(settings) + }), cwd: undefined, customFds: [-1, -1, -1], setsid: false - }; - options.env.tilemillConfig = JSON.stringify(settings); - var child = spawn('nice', args, options); - model.set({pid:child.pid, status:'processing'}); - Backbone.sync('update', model, callback, callback); - } else if (data.status === 'processing') { - var pid = data.pid || 0; - exec('ps -p ' + pid + ' | grep ' + pid, function(err, stdout) { - if (!err) return callback(); - model.set({status: 'error', error: 'Export process died.'}); - Backbone.sync('update', model, callback, callback); }); - } else { - callback(); - } + child.on('exit', callback); + pids[child.pid] = true; + (new models.Export(data)).save({ + pid:child.pid, + created:Date.now(), + status:'processing' + }); + }); +}; + +// Export child processes are managed from the parent: +// 1. when an export model with status 'waiting' is created or read +// it is queued for export. +// 2. when reading models the process health is checked. if the pid +// is not found, the model's status should be updated. +function check(data) { + if (data.status === 'processing' && data.pid && !pids[data.pid]) + return { status: 'error', error: 'Export process died' }; + if (data.status === 'waiting' && !_(queue.queue).include(data.id)) + queue.add(data.id); }; models.Export.prototype.sync = function(method, model, success, error) { switch (method) { - // Export child processes are managed from the parent: - // 1. when an export model is first created a process is started. - // 2. when reading models the process health is checked. case 'read': case 'create': + case 'update': + if (!model.id) throw new Error('Model ID is required.'); Backbone.sync(method, model, function(data) { - start(model, model.toJSON(), function() { - success(data); - }); + var attr = check(model.toJSON()); + if (!attr) return success(data); + + // If attributes are set we must further update this model + // to reflect its process status (it's dead, basically). + model.set(attr); + Backbone.sync('update', model, function() { + success(_(data).extend(attr)); + }, error); }, error); break; - // Updates occur via the child process. - case 'update': - Backbone.sync(method, model, success, error); - break; - // Deletion kills the child process and removes the export file if it - // exists. Note that SIGUSR1 is used instead of SIGINT for two reasons: - // 1. The child process does not exit directly on SIGINT to prevent it - // from going down if the parent goes down. - // 2. If the model `pid` is stale and somehow the `pid` is now occupied - // by another process SIGUSR1 likely won't kill the process. + // Deletion kills the child process and removes the export file. case 'delete': Step(function() { Backbone.sync('read', model, this, this); - }, - function(data) { - if (data && data.pid) { - // Try/catch as process may not exist. - try { process.kill(data.pid, 'SIGUSR1'); } + }, function(data) { + // Try/catch as process may not exist. + if (data && data.pid && pids[data.pid]) { + delete pids[data.pid]; + // try { process.kill(data.pid, 'SIGUSR1'); } + try { process.kill(data.pid, 'SIGINT'); } catch(err) {} } if (data && data.filename && data.format !== 'upload') { - var filepath = path.join(settings.files, 'export', data.filename); - path.exists(filepath, function(exists) { - if (exists) return fs.unlink(filepath, this); - this(); - }.bind(this)); + fs.unlink(path.join(settings.files, 'export', data.filename), this); } else { this(); } - }, - function(err) { + }, function(err) { + if (err && err.code !== 'ENOENT') return error(err); Backbone.sync(method, model, success, error); }); break; @@ -111,15 +121,16 @@ models.Export.prototype.sync = function(method, model, success, error) { models.Exports.prototype.sync = function(method, model, success, error) { if (method !== 'read') return success({}); - Backbone.sync(method, model, function(data) { - Step(function() { - var group = this.group(); - _(data).each(function(m) { - var model = new models.Export(m); - start(model, model.toJSON(), group()); - }); - }, function() { - success(data); - }); - }, error); + Step(function() { + Backbone.sync(method, model, this, error); + }, function(data) { + if (!data || !data.length) return this(null, []); + Bones.utils.fetch(data.reduce(function(memo, d) { + memo[d.id] = new models.Export(d); + return memo; + }, {}), this); + }, function(err, models) { + if (err) return error(err); + success(_(models).map(function(m) { return m.toJSON() })); + }); }; diff --git a/models/Preview.server.bones b/models/Preview.server.bones index 3224b6855..507c68407 100644 --- a/models/Preview.server.bones +++ b/models/Preview.server.bones @@ -9,7 +9,7 @@ models.Preview.prototype.sync = function(method, model, success, error) { if (err) return error(err); source.getInfo(function(err, info) { if (err) return error(err); - info.tiles = ['/tile/' + model.id + '/{z}/{x}/{y}.png']; + info.tiles = ['http://' + settings.tileUrl + '/tile/' + model.id + '/{z}/{x}/{y}.png']; success(_(info).extend({id: model.id })); }); }); diff --git a/templates/Exports._ b/templates/Exports._ index ee55fcd71..f66a9bffc 100644 --- a/templates/Exports._ +++ b/templates/Exports._ @@ -11,7 +11,11 @@

<%= m.get('filename') %>

- Started <%= (new Date(m.get('created'))).format('F j h:ia') %> + <% if (m.get('created')) { %> + Started <%= (new Date(m.get('created'))).format('M j h:ia') %> + <% } else if (m.get('status') === 'waiting') { %> + Waiting to be processed + <% } %> <% if (m.get('updated')) { %> — <%= obj.time(m.get('updated') - m.get('created')) %> <% } %> diff --git a/test/abilities.test.js b/test/abilities.test.js index 0ef37b8e1..8cd53914b 100644 --- a/test/abilities.test.js +++ b/test/abilities.test.js @@ -1,6 +1,8 @@ var assert = require('assert'); require('./support/start')(function(command) { + command.servers['Tile'].close(); + exports['test abilities endpoint'] = function() { assert.response(command.servers['Core'], { url: '/assets/tilemill/js/abilities.js' }, diff --git a/test/config.test.js b/test/config.test.js index 6ed83ca4c..0bc51c414 100644 --- a/test/config.test.js +++ b/test/config.test.js @@ -2,6 +2,8 @@ var assert = require('assert'); var Step = require('step'); require('./support/start')(function(command) { +command.servers['Tile'].close(); + exports['config'] = function() { var server = command.servers['Core']; diff --git a/test/datasource.test.js b/test/datasource.test.js index 738c53994..365588ae6 100644 --- a/test/datasource.test.js +++ b/test/datasource.test.js @@ -8,6 +8,7 @@ function readJSON(name) { } require('./support/start')(function(command) { + command.servers['Core'].close(); exports['test sqlite datasource'] = function() { assert.response(command.servers['Tile'], diff --git a/test/export.test.js b/test/export.test.js index 961f4b93b..444411c30 100644 --- a/test/export.test.js +++ b/test/export.test.js @@ -8,14 +8,13 @@ function readJSON(name) { } require('./support/start')(function(command) { + command.servers['Tile'].close(); exports['test export job creation'] = function(beforeExit) { var completed = false; - var created = Date.now() - var id = String(created); + var id = Date.now().toString(); var job = readJSON('export-job'); var token = job['bones.token']; - job.created = created; job.id = id; assert.response(command.servers['Core'], { @@ -35,10 +34,12 @@ require('./support/start')(function(command) { }, { status: 200 }, function(res) { var body = JSON.parse(res.body); job.status = "processing"; - delete job['bones.token']; assert.ok(body[0].pid); + assert.ok(body[0].created); + delete job['bones.token']; + delete body[0].created; delete body[0].pid; - assert.deepEqual([job], body); + assert.deepEqual(job, body[0]); job['bones.token'] = token; assert.response(command.servers['Core'], { diff --git a/test/fixtures/export-job.json b/test/fixtures/export-job.json index b90680f1d..366a52f7a 100644 --- a/test/fixtures/export-job.json +++ b/test/fixtures/export-job.json @@ -4,11 +4,10 @@ "format": "mbtiles", "project": "demo_01", "tile_format": "png", - "created": 0, "id": "", "filename": "demo_01.mbtiles", "bbox": [ -29.5642, 31.8402, 42.1545, 71.9245 ], "minzoom": 0, - "maxzoom": 9, + "maxzoom": 2, "bones.token": "zbx6dr0ghgvRNZOuu8PXtt2VCAIKO2qK" } \ No newline at end of file diff --git a/test/project.test.js b/test/project.test.js index 9c2f092ab..92179dae2 100644 --- a/test/project.test.js +++ b/test/project.test.js @@ -19,6 +19,8 @@ function cleanProject(proj) { } require('./support/start')(function(command) { + command.servers['Tile'].close(); + exports['test project collection endpoint'] = function() { assert.response(command.servers['Core'], { url: '/api/Project' }, diff --git a/test/support/start.js b/test/support/start.js index cfb94994e..8174927fd 100644 --- a/test/support/start.js +++ b/test/support/start.js @@ -1,6 +1,7 @@ process.env.NODE_ENV = 'test'; process.argv[2] = 'test'; +var Queue = require('../../lib/queue'); var Step = require('step'); var exec = require('child_process').exec; var path = require('path'); @@ -12,12 +13,31 @@ process.env.HOME = path.resolve(__dirname + '/../fixtures/files'); try { fs.unlinkSync(process.env.HOME + '/.tilemill.json'); } catch (err) { if (err.code !== 'ENOENT') throw err } -// Load application. This file's purpose is to start TileMill only once and -// share the server with all tests. +// Load application. require('../..'); var tilemill = require('bones').plugin; tilemill.config.files = path.resolve(__dirname + '/../fixtures/files'); tilemill.config.examples = false; + +var queue = new Queue(function(next, done) { + var command = tilemill.start(function() { + var remaining = 2; + command.servers['Core'].close = (function(parent) { return function() { + if (remaining-- === 1) done(); + return parent.apply(this, arguments); + }})(command.servers['Core'].close); + command.servers['Tile'].close = (function(parent) { return function() { + if (remaining-- === 1) done(); + return parent.apply(this, arguments); + }})(command.servers['Tile'].close); + next(command); + }); +}, 1); +// @TODO: +// Not sure why this is necessary. tile.test.js doesn't seem to exit out +// despite both servers closing. +queue.on('empty', process.exit); + tilemill.commands.test.augment({ bootstrap: function(parent, plugin, callback) { // Create a clean environment. @@ -39,13 +59,5 @@ tilemill.commands.test.augment({ } }); -var started = false, waiting = []; -var command = tilemill.start(function() { - started = true; - for (var fn; fn = waiting.shift();) fn(command); -}); +module.exports = queue.add; -module.exports = function(cb) { - if (started) cb(command); - else waiting.push(cb); -}; diff --git a/test/tile.test.js b/test/tile.test.js index 83957964a..8382cd492 100644 --- a/test/tile.test.js +++ b/test/tile.test.js @@ -1,6 +1,8 @@ var assert = require('assert'); require('./support/start')(function(command) { + command.servers['Core'].close(); + exports['test non-existant tile endpoint'] = function() { assert.response(command.servers['Tile'], { url: '/tile/does_not_exist/0/0/0.png', encoding: 'binary' },