diff --git a/bin/validatorWorker.js b/bin/validatorWorker.js index d59ac6b1..b57931fe 100755 --- a/bin/validatorWorker.js +++ b/bin/validatorWorker.js @@ -67,7 +67,6 @@ function validatorTick(channel) { const tick = isLeader ? leader.tick : follower.tick return tick(adapter, channel) } - function wait(ms) { return new Promise((resolve, _) => setTimeout(resolve, ms)) } diff --git a/routes/channel.js b/routes/channel.js index b595f5ce..a1132b78 100644 --- a/routes/channel.js +++ b/routes/channel.js @@ -156,11 +156,14 @@ function isValidatorMsgValid(msg) { // @TODO either make this more sophisticated, or rewrite this in a type-safe lang // for example, we should validate if every value in balances is a positive integer return msg - && typeof(msg.stateRoot) === 'string' && msg.stateRoot.length == 64 + && ( + (typeof(msg.stateRoot) === 'string' && msg.stateRoot.length == 64) + || typeof(msg.timestamp) === 'string' + ) && typeof(msg.signature) === 'string' && ( (msg.type === 'NewState' && typeof(msg.balances) === 'object') - || msg.type === 'ApproveState' + || msg.type === 'ApproveState' || msg.type === 'HeartBeat' ) } diff --git a/services/validatorWorker/follower.js b/services/validatorWorker/follower.js index c0349f0d..682e1247 100644 --- a/services/validatorWorker/follower.js +++ b/services/validatorWorker/follower.js @@ -1,9 +1,10 @@ -const BN = require('bn.js') const assert = require('assert') const db = require('../../db') const { persistAndPropagate } = require('./lib/propagation') -const { isValidTransition, isHealthy, isValidRootHash } = require('./lib/followerRules') +const { isValidTransition, isHealthy } = require('./lib/followerRules') +const { isValidRootHash, toBNMap } = require('./lib') const producer = require('./producer') +const heartbeat = require('./heartbeat') function tick(adapter, channel) { // @TODO: there's a flaw if we use this in a more-than-two validator setup @@ -28,6 +29,15 @@ function tick(adapter, channel) { return onNewState(adapter, { ...res, newMsg, approveMsg }) }) }) + .then(function(res){ + if(res && res.nothingNew){ + // send heartbeat + return heartbeat(adapter, channel) + .then(() => res) + } else { + return res + } + }) } function onNewState(adapter, {channel, balances, newMsg, approveMsg}) { @@ -57,7 +67,7 @@ function onNewState(adapter, {channel, balances, newMsg, approveMsg}) { console.error(`validatatorWorker: ${channel.id}: invalid signature NewState`, prevBalances, newBalances) return { nothingNew: true } } - + const stateRootRaw = Buffer.from(stateRoot, 'hex') return adapter.sign(stateRootRaw) .then(function(signature) { @@ -71,13 +81,6 @@ function onNewState(adapter, {channel, balances, newMsg, approveMsg}) { }) } -function toBNMap(raw) { - assert.ok(raw && typeof(raw) === 'object', 'raw map is a valid object') - const balances = {} - Object.entries(raw).forEach(([acc, bal]) => balances[acc] = new BN(bal, 10)) - return balances -} - // @TODO getLatestMsg should be a part of a DB abstraction so we can use it in other places too // e.g. validating on POST /validator-messages (to get the previous), and a public API to get the latest msgs of a type function getLatestMsg(channelId, from, type) { diff --git a/services/validatorWorker/heartbeat.js b/services/validatorWorker/heartbeat.js new file mode 100644 index 00000000..7b61f767 --- /dev/null +++ b/services/validatorWorker/heartbeat.js @@ -0,0 +1,35 @@ +const assert = require('assert') +const { persistAndPropagate } = require('./lib/propagation') + +function heartbeat(adapter, channel){ + const whoami = adapter.whoami() + const validatorIdx = channel.validators.indexOf(whoami) + assert.ok(validatorIdx !== -1, 'validatorTick: sending heartbeat for a channel where we are not validating') + const otherValidators = channel.spec.validators.filter(v => v.id != whoami) + + let timestamp = Buffer.alloc(32); + timestamp.writeUIntBE(Date.now(), 26, 6); + + // in the future, we can add more information to this tree, + // such as the validator node capacity/status, + // or a proof of 'no earlier than' (hash of the latest blockchain block) + const tree = new adapter.MerkleTree([ timestamp ]) + const infoRootRaw = tree.getRoot() + + const stateRootRaw = adapter.getSignableStateRoot(Buffer.from(channel.id), infoRootRaw) + + return adapter.sign(stateRootRaw) + .then(function(signature) { + const stateRoot = stateRootRaw.toString('hex') + timestamp = timestamp.toString('hex') + + return persistAndPropagate(adapter, otherValidators, channel, { + type: 'HeartBeat', + timestamp, + signature, + stateRoot, + }); + }) +} + +module.exports = heartbeat diff --git a/services/validatorWorker/leader.js b/services/validatorWorker/leader.js index 2573b671..d9d3aeb5 100644 --- a/services/validatorWorker/leader.js +++ b/services/validatorWorker/leader.js @@ -1,7 +1,7 @@ const { persistAndPropagate } = require('./lib/propagation') -const { getStateRootHash } = require('./lib/followerRules') - +const { getStateRootHash } = require('./lib') const producer = require('./producer') +const heartbeat = require('./heartbeat') function tick(adapter, channel) { return producer.tick(channel) @@ -9,6 +9,16 @@ function tick(adapter, channel) { res => res.newStateTree ? afterProducer(adapter, res) : { nothingNew: true } + ).then( + res => { + if(res && res.nothingNew){ + // send heartbeat + return heartbeat(adapter, channel) + .then(() => res) + } else { + return res + } + } ) } diff --git a/services/validatorWorker/lib/followerRules.js b/services/validatorWorker/lib/followerRules.js index c1fd730b..10070ad1 100644 --- a/services/validatorWorker/lib/followerRules.js +++ b/services/validatorWorker/lib/followerRules.js @@ -47,21 +47,6 @@ function sumMins(our, approved) { ) } -function getStateRootHash(channel, balances, adapter){ - // Note: MerkleTree takes care of deduplicating and sorting - const elems = Object.keys(balances).map( - acc => adapter.getBalanceLeaf(acc, balances[acc]) - ) - const tree = new adapter.MerkleTree(elems) - const balanceRoot = tree.getRoot() - // keccak256(channelId, balanceRoot) - const stateRoot = adapter.getSignableStateRoot(Buffer.from(channel.id), balanceRoot).toString('hex') - return stateRoot -} - -function isValidRootHash(leaderRootHash, { channel, balances, adapter }) { - return getStateRootHash(channel, balances, adapter) === leaderRootHash -} -module.exports = { isValidTransition, isHealthy, isValidRootHash, getStateRootHash } +module.exports = { isValidTransition, isHealthy } diff --git a/services/validatorWorker/lib/index.js b/services/validatorWorker/lib/index.js new file mode 100644 index 00000000..fd162c65 --- /dev/null +++ b/services/validatorWorker/lib/index.js @@ -0,0 +1,28 @@ +const assert = require('assert') +const BN = require('bn.js') + +function getStateRootHash(channel, balances, adapter){ + // Note: MerkleTree takes care of deduplicating and sorting + const elems = Object.keys(balances).map( + acc => adapter.getBalanceLeaf(acc, balances[acc]) + ) + const tree = new adapter.MerkleTree(elems) + const balanceRoot = tree.getRoot() + // keccak256(channelId, balanceRoot) + const stateRoot = adapter.getSignableStateRoot(Buffer.from(channel.id), balanceRoot).toString('hex') + return stateRoot +} + +function isValidRootHash(leaderRootHash, { channel, balances, adapter }) { + return getStateRootHash(channel, balances, adapter) === leaderRootHash +} + +function toBNMap(raw) { + assert.ok(raw && typeof(raw) === 'object', 'raw map is a valid object') + const balances = {} + Object.entries(raw).forEach(([acc, bal]) => balances[acc] = new BN(bal, 10)) + return balances +} + + +module.exports = { getStateRootHash, isValidRootHash, toBNMap } \ No newline at end of file diff --git a/test/index.js b/test/index.js index a5665231..a764f409 100755 --- a/test/index.js +++ b/test/index.js @@ -3,7 +3,8 @@ const tape = require('tape') const BN = require('bn.js') const { isValidTransition, isHealthy } = require('../services/validatorWorker/lib/followerRules') - +const { getStateRootHash } = require('../services/validatorWorker/lib') +const dummyAdapter = require('../adapters/dummy') const channel = { depositAmount: new BN(100) } tape('isValidTransition: empty to empty', function(t) { @@ -80,5 +81,37 @@ tape('isHealthy: they have the same sum, but different entities are earning', fu t.end() }) +// +// State Root Hash +// +tape('getStateRootHash: returns correct result', function(t) { + [ + { + channel: { + id: "testing" + }, + balances: { + "publisher": 1, + "tester": 2 + }, + expectedHash: "da9b42bb60da9622404cade0aec4cda0a10104c6ec5f07ad67de081abb58c803" + }, + { + channel: { + id: "fake" + }, + balances: { + "publisher": 0, + }, + expectedHash: "0b64767e909e9f36ab9574e6b93921390c40a0d899c3587db3b2df077b8e87d7" + } + ].forEach(({ expectedHash, channel, balances }) => { + const actualHash = getStateRootHash(channel, balances, dummyAdapter) + t.equal(actualHash, expectedHash, "correct root hash") + }); + + t.end() +}) + // @TODO: event aggregator // @TODO: producer, possibly leader/follower; mergePayableIntoBalances diff --git a/test/integration.js b/test/integration.js index ddd9801b..6086a4d0 100755 --- a/test/integration.js +++ b/test/integration.js @@ -2,6 +2,8 @@ const tape = require('tape') const fetch = require('node-fetch') const { Channel, MerkleTree } = require('adex-protocol-eth/js') +const { getStateRootHash } = require('../services/validatorWorker/lib') +const dummyAdapter = require('../adapters/dummy') const cfg = require('../cfg') const dummyVals = require('./prep-db/mongo') @@ -98,17 +100,24 @@ tape('submit events and ensure they are accounted for', function(t) { // the NewState was generated, sent to the follower, // who generated ApproveState and sent back to the leader // first wait though, as we need the follower to discover they have an event to approve - return wait(waitAggrTime).then(function() { - return fetch(`${leaderUrl}/channel/${dummyVals.channel.id}/validator-messages`) + return wait(waitTime).then(function() { + return fetch(`${leaderUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.leader}/NewState?limit=1`) .then(res => res.json()) + }).then(function(resp){ + return fetch(`${leaderUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.follower}/ApproveState?limit=1`) + .then(res => res.json()) + .then(res => { + resp.validatorMessages = resp.validatorMessages.concat(res.validatorMessages) + return resp + }) }) }) .then(function(resp) { const msgs = resp.validatorMessages t.ok(Array.isArray(msgs), 'has validatorMessages') - // ensure NewState is in order const lastNew = msgs.find(x => x.msg.type === 'NewState') + t.ok(lastNew, 'has NewState') t.equal(lastNew.from, channel.validators[0], 'NewState: is by the leader') t.equal(lastNew.msg.balances[defaultPubName], expectedBal, 'NewState: balances is right') @@ -189,6 +198,12 @@ tape('health works correctly', function(t) { // @TODO: Should we assert balances numbers? t.equal(lastApprove.msg.isHealthy, false, 'channel is registered as unhealthy') + // should propagate heartbeat notification + const health = resp.validatorMessages.find(x => x.msg.type === 'HeartBeat') + t.ok(health, 'should propagate heartbeat notification') + t.ok(health.msg.signature, 'heartbeat notification has signature') + t.ok(health.msg.timestamp, 'heartbeat notification has timestamp') + // send events to the leader so it catches up return postEvents(leaderUrl, dummyVals.channel.id, genImpressions(diff)) }) @@ -205,6 +220,34 @@ tape('health works correctly', function(t) { .catch(err => t.fail(err)) }) +tape('heartbeat works correctly', function(t){ + Promise.resolve() + .then(() => wait(waitTime)) // wait till a new state is schedule to be produced + .then(function() { + [ + `${followerUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.follower}/HeartBeat?limit=1`, + `${followerUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.leader}/HeartBeat?limit=1`, + `${leaderUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.leader}/HeartBeat?limit=1`, + `${leaderUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.follower}/HeartBeat?limit=1` + ].forEach((url)=> { + fetch(url) + .then(res => res.json()) + .then(function(resp){ + const health = resp.validatorMessages.find(x => x.msg.type === 'HeartBeat') + t.ok(health, 'should propagate heartbeat notification') + t.ok(health.msg.signature, 'heartbeat notification has signature') + t.ok(health.msg.timestamp, 'heartbeat notification has timestamp') + t.ok(health.msg.stateRoot, 'heartbeat notification has stateRoot') + }) + }) + + return fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.follower}/HeartBeat?limit=1`) + .then(res => res.json()) + }) + .then(() => t.end()) + .catch(err => t.fail(err)) +}) + tape('POST /channel/{id}/{events,validator-messages}: wrong authentication', function(t) { Promise.all( ['events', 'validator-messages'].map(path => @@ -226,23 +269,35 @@ tape('POST /channel/{id}/{events,validator-messages}: wrong authentication', fun }) tape('POST /channel/{id}/{validator-messages}: wrong signature', function(t) { - const stateRoot = "6def5a300acb6fcaa0dab3a41e9d6457b5147a641e641380f8cc4bf5308b16fe" + let stateRoot = "" - fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages`, { - method: 'POST', - headers: { - 'authorization': `Bearer ${dummyVals.auth.leader}`, - 'content-type': 'application/json', - }, - body: JSON.stringify({ - "messages": [{ - "type": 'NewState', - stateRoot, - "balances": { "myAwesomePublisher" : "9" }, - "lastEvAggr": "2019-01-23T09:08:29.959Z", - "signature": "Dummy adapter for 6def5a300acb6fcaa0dab3a41e9d6457b5147a641e641380f8cc4bf5308b16fe by awesomeLeader1" - }] - }), + fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.leader}/NewState?limit=1`) + .then(res => res.json()) + .then(function(res){ + const { balances } = res.validatorMessages[0].msg + + let incBalances = {} + // increase the state tree balance by 1 + Object.keys(balances).forEach((item) => (incBalances[item] = `${parseInt(balances[item])+1}`)) + + stateRoot = getStateRootHash({"id": dummyVals.channel.id}, incBalances, dummyAdapter) + + return fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages`, { + method: 'POST', + headers: { + 'authorization': `Bearer ${dummyVals.auth.leader}`, + 'content-type': 'application/json', + }, + body: JSON.stringify({ + "messages": [{ + "type": 'NewState', + stateRoot, + balances, + "lastEvAggr": "2019-01-23T09:09:29.959Z", + "signature": getDummySig(stateRoot, "awesomeLeader1") + }] + }), + }) }) .then(() => wait(waitTime)) .then(function() { @@ -257,25 +312,34 @@ tape('POST /channel/{id}/{validator-messages}: wrong signature', function(t) { .catch(err => t.fail(err)) }) -tape('POST /channel/{id}/{validator-messages}: wrong (deceptive) root hash', function(t) { - const stateRoot = '6def5a300acb6fcaa0dab3a41e9d6457b5147a641e641380f8cc4bf5308b16f1' +tape('POST /channel/{id}/{validator-messages}: wrong (deceptive) root hash', function(t) { + let deceptiveRootHash = "" - fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages`, { - method: 'POST', - headers: { - 'authorization': `Bearer ${dummyVals.auth.leader}`, - 'content-type': 'application/json', - }, - body: JSON.stringify({ - "messages": [{ - "type": 'NewState', - stateRoot, - // the real tree is 11, 2 - "balances": { "myAwesomePublisher" : "12", "anotherPublisher": "3" }, - "lastEvAggr": "2019-01-23T09:10:29.959Z", - "signature": `Dummy adapter for ${stateRoot} by awesomeLeader` - }] - }), + fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages/${dummyVals.ids.leader}/NewState?limit=1`) + .then(res => res.json()) + .then(function(res) { + + const { balances } = res.validatorMessages[0].msg + const fakeBalances = { "publisher": "3" } + + deceptiveRootHash = getStateRootHash(dummyVals.channel, fakeBalances, dummyAdapter) + + return fetch(`${followerUrl}/channel/${dummyVals.channel.id}/validator-messages`, { + method: 'POST', + headers: { + 'authorization': `Bearer ${dummyVals.auth.leader}`, + 'content-type': 'application/json', + }, + body: JSON.stringify({ + "messages": [{ + "type": 'NewState', + "stateRoot": deceptiveRootHash, + balances, + "lastEvAggr": "2019-01-23T09:10:29.959Z", + "signature": `Dummy adapter for ${deceptiveRootHash} by awesomeLeader` + }] + }), + }) }) .then(() => wait(waitTime)) .then(function() { @@ -283,7 +347,7 @@ tape('POST /channel/{id}/{validator-messages}: wrong (deceptive) root hash', fun .then(res => res.json()) }) .then(function(resp) { - const lastApprove = resp.validatorMessages.find(x => x.msg.stateRoot === stateRoot) + const lastApprove = resp.validatorMessages.find(x => x.msg.stateRoot === deceptiveRootHash) t.equal(lastApprove, undefined, 'follower should not sign state with wrong root hash') t.end() })