Skip to content

Commit

Permalink
feat: implement the Cosmos block manager
Browse files Browse the repository at this point in the history
Eliminate some technical debt FTW!
  • Loading branch information
michaelfig committed Mar 8, 2020
1 parent 2e8d22d commit 3a5936a
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 96 deletions.
6 changes: 5 additions & 1 deletion packages/cosmic-swingset/lib/ag-solo/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ import path from 'path';
import http from 'http';
import express from 'express';
import WebSocket from 'ws';
import morgan from 'morgan';
import fs from 'fs';

// We need to CommonJS require morgan or else it warns, until:
// https://github.com/expressjs/morgan/issues/190
// is fixed.
const morgan = require('morgan');

const points = new Map();
const broadcasts = new Map();

Expand Down
165 changes: 165 additions & 0 deletions packages/cosmic-swingset/lib/block-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// TODO: Put this somewhere else.
function deepEquals(a, b, already = new WeakSet()) {
if (Object.is(a, b)) {
return true;
}

// Must both be objects.
if (Object(a) !== a || Object(b) !== b) {
return false;
}

// That we haven't seen before.
if (already.has(a) || already.has(b)) {
return false;
}
already.add(a);
already.add(b);

// With the same prototype.
if (Object.getPrototypeOf(a) !== Object.getPrototypeOf(b)) {
return false;
}

// And deepEquals entries.
const amap = new Map(Object.entries(a));
for (const [key, bval] of Object.entries(b)) {
if (!amap.has(key)) {
return false;
}
if (!deepEquals(amap.get(key), bval, already)) {
return false;
}
amap.delete(key);
}

// And no extra keys in b.
if (amap.size > 0) {
return false;
}
return true;
}

const BEGIN_BLOCK = 'BEGIN_BLOCK';
const DELIVER_INBOUND = 'DELIVER_INBOUND';
const END_BLOCK = 'END_BLOCK';

export default function makeBlockManager({
deliverInbound,
beginBlock,
saveChainState,
saveOutsideState,
savedActions,
savedHeight,
}) {
let runTime = 0;
async function kernelPerformAction(action) {
const start = Date.now();
const finish = _ => (runTime += Date.now() - start);

let p;
switch (action.type) {
case BEGIN_BLOCK:
p = beginBlock(action.blockHeight, action.blockTime);
break;

case DELIVER_INBOUND:
p = deliverInbound(
action.peer,
action.messages,
action.ack,
action.blockHeight,
action.blockTime,
);
break;

case END_BLOCK:
return true;

default:
throw new Error(`${action.type} not recognized`);
}
p.then(finish, finish);
return p;
}

let currentActions;
let currentIndex;
let replaying;
let decohered;

async function blockManager(action) {
if (decohered) {
throw decohered;
}

if (action.type === BEGIN_BLOCK) {
// Start a new block, or possibly replay the prior one.
replaying = action.blockHeight === savedHeight;
currentIndex = 0;
currentActions = [];
runTime = 0;
} else {
// We're working on a subsequent actions.
currentIndex += 1;
}

currentActions.push(action);

if (!replaying) {
// Compute new state by running the kernel.
await kernelPerformAction(action);
} else if (!deepEquals(action, savedActions[currentIndex])) {
// Divergence of the inbound messages, so rewind the state if we need to.
console.log(action, 'and', savedActions[currentIndex], 'are not equal');
replaying = false;

// We only handle the trivial case.
const restoreHeight = action.blockHeight - 1;
if (restoreHeight !== savedHeight) {
// Keep throwing forever.
decohered = Error(
`Cannot reset state from ${savedHeight} to ${restoreHeight}; unimplemented`,
);
throw decohered;
}

// Replay the saved actions.
for (const a of currentActions) {
// eslint-disable-next-line no-await-in-loop
await kernelPerformAction(a);
}
}

if (action.type !== END_BLOCK) {
return;
}

// Commit all the keeper state, even on replay.
// This is necessary since the block proposer will be asked to validate
// the actions it just proposed (in Tendermint v0.33.0).
let start = Date.now();
const { mailboxSize } = saveChainState();
let now = Date.now();

const mbTime = now - start;
start = now;

// Advance our saved state variables.
savedActions = currentActions;
savedHeight = action.blockHeight;

if (!replaying) {
// Save the kernel's new state.
saveOutsideState(savedHeight, savedActions);
}
now = Date.now();
const saveTime = now - start;

console.log(
`wrote SwingSet checkpoint (mailbox=${mailboxSize}), [run=${runTime}ms, mb=${mbTime}ms, save=${saveTime}ms]`,
);
}

return blockManager;
}
63 changes: 11 additions & 52 deletions packages/cosmic-swingset/lib/chain-main.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import stringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify';

import { launch } from './launch-chain';
import makeBlockManager from './block-manager';

const AG_COSMOS_INIT = 'AG_COSMOS_INIT';
const BEGIN_BLOCK = 'BEGIN_BLOCK';
const DELIVER_INBOUND = 'DELIVER_INBOUND';
const END_BLOCK = 'END_BLOCK';

export default async function main(progname, args, { path, env, agcc }) {
const bootAddress = env.BOOT_ADDRESS;
Expand Down Expand Up @@ -60,7 +58,7 @@ export default async function main(progname, args, { path, env, agcc }) {
const p = Promise.resolve(handler(action));
p.then(
res => replier.resolve(`${res}`),
rej => replier.reject(`rejection ${rej} ignored`),
rej => replier.reject(`rejection ignored: ${rej.stack || rej}`),
);
}

Expand All @@ -72,31 +70,11 @@ export default async function main(progname, args, { path, env, agcc }) {
setInterval(() => undefined, 30000);
agcc.runAgCosmosDaemon(nodePort, fromGo, [progname, ...args]);

let deliverInbound;
let deliverStartBlock;
let deliveryFunctionsInitialized = false;

// this storagePort changes for every single message. We define it out here
// so the 'externalStorage' object can close over the single mutable
// instance, and we update the 'sPort' value each time toSwingSet is called
let sPort;

function toSwingSet(action, replier) {
// console.log(`toSwingSet`, action, replier);
// eslint-disable-next-line no-use-before-define
return blockManager(action, replier).then(
ret => {
// console.log(`blockManager returning:`, ret);
return ret;
},
err => {
console.log('blockManager threw error:', err);
throw err;
},
);
}

async function launchAndInitializeDeliverInbound() {
async function launchAndInitializeSwingSet() {
// this object is used to store the mailbox state. we only ever use
// key='mailbox'
const mailboxStorage = {
Expand Down Expand Up @@ -143,7 +121,9 @@ export default async function main(progname, args, { path, env, agcc }) {
return s;
}

async function blockManager(action, _replier) {
let blockManager;
async function toSwingSet(action, _replier) {
// console.log(`toSwingSet`, action, replier);
if (action.type === AG_COSMOS_INIT) {
return true;
}
Expand All @@ -152,34 +132,13 @@ export default async function main(progname, args, { path, env, agcc }) {
// Initialize the storage for this particular transaction.
// console.log(` setting sPort to`, action.storagePort);
sPort = action.storagePort;
}

// launch the swingset once
if (!deliveryFunctionsInitialized) {
const deliveryFunctions = await launchAndInitializeDeliverInbound();
deliverInbound = deliveryFunctions.deliverInbound;
deliverStartBlock = deliveryFunctions.deliverStartBlock;
deliveryFunctionsInitialized = true;
if (!blockManager) {
const fns = await launchAndInitializeSwingSet();
blockManager = makeBlockManager(fns);
}
}

switch (action.type) {
case BEGIN_BLOCK:
return deliverStartBlock(action.blockHeight, action.blockTime);
case DELIVER_INBOUND:
return deliverInbound(
action.peer,
action.messages,
action.ack,
action.blockHeight,
action.blockTime,
);
case END_BLOCK:
return true;

default:
throw new Error(
`${action.type} not recognized. must be BEGIN_BLOCK, DELIVER_INBOUND, or END_BLOCK`,
);
}
return blockManager(action);
}
}
66 changes: 28 additions & 38 deletions packages/cosmic-swingset/lib/launch-chain.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import fs from 'fs';

import djson from 'deterministic-json';
import readlines from 'n-readlines';
import {
buildMailbox,
buildMailboxStateMap,
Expand All @@ -13,6 +12,8 @@ import {
} from '@agoric/swingset-vat';
import { openSwingStore } from '@agoric/swing-store-simple';

const SWING_STORE_META_KEY = 'cosmos/meta';

async function buildSwingset(withSES, mailboxState, storage, vatsDir, argv) {
const config = {};
const mbs = buildMailboxStateMap();
Expand Down Expand Up @@ -69,17 +70,12 @@ export async function launch(kernelStateDBDir, mailboxStorage, vatsDir, argv) {
argv,
);

let mailboxLastData = djson.stringify(mbs.exportToData());
function saveState(runTime = undefined) {
let start = Date.now();

function saveChainState() {
// now check mbs
const newState = mbs.exportToData();
const newData = djson.stringify(newState);
if (newData !== mailboxLastData) {
console.log(`outbox changed`);
}

// Save the mailbox state.
for (const peer of Object.getOwnPropertyNames(newState)) {
const data = {
outbox: newState[peer].outbox,
Expand All @@ -88,51 +84,45 @@ export async function launch(kernelStateDBDir, mailboxStorage, vatsDir, argv) {
mailboxStorage.set(`mailbox.${peer}`, djson.stringify(data));
}
mailboxStorage.set('mailbox', newData);
mailboxLastData = newData;

const mbTime = Date.now() - start;

// Save the rest of the kernel state.
start = Date.now();
commit();
const saveTime = Date.now() - start;

const mailboxSize = mailboxLastData.length;
const runTimeStr = runTime === undefined ? '' : `run=${runTime}ms, `;
console.log(
`wrote SwingSet checkpoint (mailbox=${mailboxSize}), [${runTimeStr}mb=${mbTime}ms, save=${saveTime}ms]`,
);
return { mailboxSize: newData.length };
}

// save the initial state immediately
saveState();

// then arrange for inbound messages to be processed, after which we save
async function turnCrank() {
const start = Date.now();
await controller.run();
const runTime = Date.now() - start;
// Have to save state every time.
saveState(runTime);
function saveOutsideState(savedHeight, savedActions) {
storage.set(
SWING_STORE_META_KEY,
JSON.stringify([savedHeight, savedActions]),
);
commit();
}

async function deliverInbound(sender, messages, ack) {
if (!(messages instanceof Array)) {
throw new Error(`inbound given non-Array: ${messages}`);
}
if (mb.deliverInbound(sender, messages, ack)) {
console.log(`mboxDeliver: ADDED messages`);
if (!mb.deliverInbound(sender, messages, ack)) {
return;
}
await turnCrank();
console.log(`mboxDeliver: ADDED messages`);
await controller.run();
}

async function deliverStartBlock(blockHeight, blockTime) {
async function beginBlock(blockHeight, blockTime) {
const addedToQueue = timer.poll(blockTime);
console.log(
`polled; blockTime:${blockTime}, h:${blockHeight} ADDED: ${addedToQueue}`,
);
await turnCrank();
await controller.run();
}

return { deliverInbound, deliverStartBlock };
const [savedHeight, savedActions] = JSON.parse(
storage.get(SWING_STORE_META_KEY) || '[0, []]',
);
return {
deliverInbound,
beginBlock,
saveChainState,
saveOutsideState,
savedHeight,
savedActions,
};
}
Loading

0 comments on commit 3a5936a

Please sign in to comment.