Skip to content

Commit

Permalink
Added option to spawn multiple sender processes
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Dec 10, 2016
1 parent 88fe24a commit 8ca1fbb
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 130 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.20.0 2016-12-11

* Added option to distribute sending queue between multiple processes to speed up delivery

## 1.19.0 2016-09-15

* Changed license from GPL-V3 to MIT
Expand Down
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ Check out [ZoneMTA](https://github.com/zone-eu/zone-mta) as an alternative self

## Cons

* Alpha-grade software. Might or might not work as expected
* Awful code base, needs refactoring
* No tests
* Beta-grade software. Might or might not work as expected. There are several users with list sizes between 100k and 1M and Mailtrain seems to work for them but YMMV
* Almost no documentation (there are some guides in the [Wiki](https://github.com/andris9/mailtrain/wiki))

## Requirements

* Nodejs v5+
* Nodejs v6+
* MySQL v5.5 or MariaDB
* Redis (optional, disabled by default, used only for session storage)
* Redis. Optional, disabled by default. Used for session storage and for caching state between multiple processes. If you do not have Redis enabled then you can only use a single sender process

## Installation

Expand Down
2 changes: 1 addition & 1 deletion config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,5 @@ host="127.0.0.1"

[queue]
# How many parallel sender processes to spawn
# Do not use more than 1 for now as it would create race conditions
# You can use more than 1 process only if you have Redis enabled
processes=1
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ function spawnSenders(callback) {
let spawned = 0;
let returned = false;

if (processes > 1 && !config.redis.enabled) {
log.error('Queue', '%s processes requested but Redis is not enabled, spawning 1 process', processes);
processes = 1;
}

let spawnSender = function () {
if (spawned >= processes) {
if (!returned) {
Expand Down
28 changes: 0 additions & 28 deletions lib/caches.js

This file was deleted.

94 changes: 94 additions & 0 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,99 @@

let config = require('config');
let mysql = require('mysql');
let redis = require('redis');
let Lock = require('redfour');

module.exports = mysql.createPool(config.mysql);
if (config.redis.enabled) {

module.exports.redis = redis.createClient(config.redis);

let queueLock = new Lock({
redis: config.redis,
namespace: 'mailtrain:lock'
});

module.exports.getLock = (id, callback) => {
queueLock.waitAcquireLock(id, 60 * 1000 /* Lock expires after 60sec */ , 10 * 1000 /* Wait for lock for up to 10sec */ , (err, lock) => {
if (err) {
return callback(err);
}
if (!lock) {
return callback(null, false);
}
return callback(null, {
lock,
release(done) {
queueLock.releaseLock(lock, done);
}
});
});
};

module.exports.clearCache = (key, callback) => {
module.exports.redis.del(key, err => callback(err));
};

module.exports.addToCache = (key, value, callback) => {
if (!value) {
return setImmediate(() => callback());
}
module.exports.redis.multi().
lpush('mailtrain:cache:' + key, JSON.stringify(value)).
expire('mailtrain:cache:' + key, 24 * 3600).
exec(err => callback(err));
};

module.exports.getFromCache = (key, callback) => {
module.exports.redis.rpop('mailtrain:cache:' + key, (err, value) => {
if (err) {
return callback(err);
}
try {
value = JSON.parse(value);
} catch (E) {
return callback(E);
}

return callback(null, value);
});
};

} else {
// fakelock. does not lock anything
module.exports.getLock = (id, callback) => {
setImmediate(() => callback(null, {
lock: false,
release(done) {
setImmediate(done);
}
}));
};

let caches = new Map();

module.exports.clearCache = (key, callback) => {
caches.delete(key);
setImmediate(() => callback());
};

module.exports.addToCache = (key, value, callback) => {
if (!caches.has(key)) {
caches.set(key, []);
}
caches.get(key).push(value);
setImmediate(() => callback());
};

module.exports.getFromCache = (key, callback) => {
let value;
if (caches.has(key)) {
value = caches.get(key).shift();
if (!caches.get(key).length) {
caches.delete(key);
}
}
setImmediate(() => callback(null, value));
};
}
8 changes: 5 additions & 3 deletions lib/mailer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ let nodemailer = require('nodemailer');
let openpgpEncrypt = require('nodemailer-openpgp').openpgpEncrypt;
let settings = require('./models/settings');
let tools = require('./tools');
let caches = require('./caches');
let db = require('./db');
let Handlebars = require('handlebars');
let fs = require('fs');
let path = require('path');
Expand Down Expand Up @@ -156,6 +156,7 @@ function createMailer(callback) {
rejectUnauthorized: !configItems.smtpSelfSigned
}
}, config.nodemailer);

module.exports.transport.use('stream', openpgpEncrypt({
signingKey: configItems.pgpPrivateKey,
passphrase: configItems.pgpPassphrase
Expand Down Expand Up @@ -187,8 +188,9 @@ function createMailer(callback) {
}
};

caches.cache.delete('sender queue');
return callback(null, module.exports.transport);
db.clearCache('sender', () => {
callback(null, module.exports.transport);
});
});
}

Expand Down
32 changes: 17 additions & 15 deletions lib/models/campaigns.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ let isUrl = require('is-url');
let feed = require('../feed');
let log = require('npmlog');
let mailer = require('../mailer');
let caches = require('../caches');
let humanize = require('humanize');

let allowedKeys = ['description', 'from', 'address', 'reply_to', 'subject', 'template', 'source_url', 'list', 'segment', 'html', 'text', 'tracking_disabled'];
Expand Down Expand Up @@ -894,8 +893,9 @@ module.exports.delete = (id, callback) => {
return callback(err);
}

caches.cache.delete('sender queue');
return callback(null, affected);
db.clearCache('sender', () => {
callback(null, affected);
});
});
});
});
Expand Down Expand Up @@ -959,8 +959,9 @@ module.exports.pause = (id, callback) => {
if (err) {
return callback(err);
}
caches.cache.delete('sender queue');
return callback(null, true);
db.clearCache('sender', () => {
callback(null, true);
});
});
});
});
Expand All @@ -987,23 +988,24 @@ module.exports.reset = (id, callback) => {
return callback(err);
}

caches.cache.delete('sender queue');
connection.query('UPDATE links SET `clicks`=0 WHERE campaign=?', [id], err => {
if (err) {
connection.release();
return callback(err);
}
connection.query('TRUNCATE TABLE `campaign__' + id + '`', [id], err => {
db.clearCache('sender', () => {
connection.query('UPDATE links SET `clicks`=0 WHERE campaign=?', [id], err => {
if (err) {
connection.release();
return callback(err);
}
connection.query('TRUNCATE TABLE `campaign_tracker__' + id + '`', [id], err => {
connection.release();
connection.query('TRUNCATE TABLE `campaign__' + id + '`', [id], err => {
if (err) {
connection.release();
return callback(err);
}
return callback(null, true);
connection.query('TRUNCATE TABLE `campaign_tracker__' + id + '`', [id], err => {
connection.release();
if (err) {
return callback(err);
}
return callback(null, true);
});
});
});
});
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "mailtrain",
"private": true,
"version": "1.19.1",
"version": "1.20.0",
"description": "Self hosted email newsletter app",
"main": "index.js",
"scripts": {
Expand Down Expand Up @@ -67,6 +67,8 @@
"openpgp": "^2.3.5",
"passport": "^0.3.2",
"passport-local": "^1.0.0",
"redfour": "^1.0.0",
"redis": "^2.6.3",
"request": "^2.79.0",
"serve-favicon": "^2.3.2",
"shortid": "^2.2.6",
Expand Down
Loading

0 comments on commit 8ca1fbb

Please sign in to comment.