Skip to content

Commit

Permalink
fix: message batches reduce wallet setup from 80 to 20 chain trips
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Dec 11, 2020
1 parent 12ce7c4 commit 7d17f2f
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 43 deletions.
35 changes: 35 additions & 0 deletions packages/cosmic-swingset/lib/ag-solo/batched-deliver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const DEFAULT_BATCH_TIMEOUT_MS = 1000;

export function makeBatchedDeliver(
deliver,
batchTimeoutMs = DEFAULT_BATCH_TIMEOUT_MS,
) {
let batchedMessages = [];
let latestAckNum = 0;
let deliverTimeout;

async function batchedDeliver(newMessages, ackNum) {
// If we have no existing messages, reset the deliver timeout.
//
// This defers sending an ack until the timeout expires or we have new
// messages.
if (!batchedMessages.length) {
clearTimeout(deliverTimeout);
deliverTimeout = setTimeout(() => {
// Transfer the batched messages to the deliver function.
const msgs = batchedMessages;
batchedMessages = [];
deliver(msgs, latestAckNum);
}, batchTimeoutMs);
}

// Add new messages to the batch.
batchedMessages.push(...newMessages);
if (ackNum > latestAckNum) {
// Increase the latest ack.
latestAckNum = ackNum;
}
}

return batchedDeliver;
}
13 changes: 11 additions & 2 deletions packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import anylogger from 'anylogger';
import { makeNotifierKit } from '@agoric/notifier';
import { makePromiseKit } from '@agoric/promise-kit';

import { makeBatchedDeliver } from './batched-deliver';

const log = anylogger('chain-cosmos-sdk');

const HELPER = 'ag-cosmos-helper';
Expand Down Expand Up @@ -412,10 +414,17 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)}
// Begin the block consumer.
recurseEachNewBlock();

let totalDeliveries = 0;
async function deliver(newMessages, acknum) {
let tmpInfo;
try {
log(`delivering to chain`, GCI, newMessages, acknum);
totalDeliveries += 1;
log(
`delivering to chain (trips=${totalDeliveries})`,
GCI,
newMessages,
acknum,
);

// Peer and submitter are combined in the message format (i.e. we removed
// the extra 'myAddr' after 'tx swingset deliver'). All messages from
Expand Down Expand Up @@ -504,5 +513,5 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)}

// Now that we've started consuming blocks, tell our caller how to deliver
// messages.
return deliver;
return makeBatchedDeliver(deliver);
}
9 changes: 7 additions & 2 deletions packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import anylogger from 'anylogger';
import { launch } from '../launch-chain';
import makeBlockManager from '../block-manager';
import { makeWithQueue } from './vats/queue';
import { makeBatchedDeliver } from './batched-deliver';

const log = anylogger('fake-chain');

const PRETEND_BLOCK_DELAY = 2;
const PRETEND_BLOCK_DELAY = 5;
const scaleBlockTime = ms => Math.floor(ms / 1000);

async function makeMapStorage(file) {
Expand Down Expand Up @@ -139,7 +140,11 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) {
inbound(GCI, outbox, ack);
});

let totalDeliveries = 0;
async function deliver(newMessages, acknum) {
totalDeliveries += 1;
console.log(`delivering to ${GCI} (trips=${totalDeliveries})`);

intoChain.push([newMessages, acknum]);
if (!delay) {
clearTimeout(nextBlockTimeout);
Expand All @@ -152,5 +157,5 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) {

// Start the first pretend block.
nextBlockTimeout = setTimeout(simulateBlock, maximumDelay);
return deliver;
return makeBatchedDeliver(deliver);
}
7 changes: 2 additions & 5 deletions packages/cosmic-swingset/lib/ag-solo/outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ export function deliver(mbs) {
// console.debug(` ${newMessages.length} new messages`);
const acknum = data[target].inboundAck;
if (newMessages.length || acknum !== t.highestAck) {
if (newMessages.length) {
t.trips += 1;
}
log(
`invoking deliverator; ${newMessages.length} new messages for ${target} (trips=${t.trips})`,
`invoking deliverator; ${newMessages.length} new messages for ${target}`,
);
t.deliverator(newMessages, acknum);
if (newMessages.length) {
Expand All @@ -57,6 +54,6 @@ export function addDeliveryTarget(target, deliverator) {
throw new Error(`target ${target} already added`);
}
/** @type {TargetRecord} */
const targetRecord = { deliverator, highestSent: 0, highestAck: 0, trips: 0 };
const targetRecord = { deliverator, highestSent: 0, highestAck: 0 };
knownTargets.set(target, targetRecord);
}
15 changes: 5 additions & 10 deletions packages/cosmic-swingset/lib/ag-solo/start.js
Original file line number Diff line number Diff line change
Expand Up @@ -398,20 +398,15 @@ export default async function start(basedir, argv) {
}

// Launch the agoric wallet deploys (if any).
exec(
const cp = exec(
`${agoricCli} deploy${verbosity} --provide=wallet --hostport=${hostport} ${agWalletDeploy}`,
(err, stdout, stderr) => {
err => {
if (err) {
console.error(err);
return;
}
if (stderr) {
// Report the error.
process.stderr.write(stderr);
}
if (stdout) {
process.stdout.write(stdout);
}
},
);

cp.stderr.pipe(process.stderr);
cp.stdout.pipe(process.stdout);
}
30 changes: 6 additions & 24 deletions packages/dapp-svelte-wallet/api/deploy.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,17 @@ export default async function deployWallet(
);
};

const importedPaymentInfo = await walletToPaymentInfo(oldWallet);

// Get the payments that were given to us by the chain.
const tapPaymentInfo = await E(faucet).tapFaucet();
const [importedPaymentInfo, tapPaymentInfo] = await Promise.all([
walletToPaymentInfo(oldWallet),
E(faucet).tapFaucet(),
]);
const paymentInfo = [...importedPaymentInfo, ...tapPaymentInfo];

// Claim the payments.
const issuerToPetname = new Map();
const issuerToPursePetnameP = new Map();
const wallet = await E(walletVat).getWallet();
const wallet = E(walletVat).getWallet();
const walletAdmin = E(wallet).getAdminFacet();
await Promise.all(
paymentInfo.map(async ({ issuerPetname, issuer }) => {
Expand All @@ -81,32 +82,13 @@ export default async function deployWallet(
paymentInfo.map(async ({ pursePetname, issuer, payment, purse }) => {
const issuerPetname = issuerToPetname.get(issuer);

let paymentP;

if (!payment && purse) {
// Withdraw the payment from the purse.
paymentP = E(purse)
payment = E(purse)
.getCurrentAmount()
.then(amount => E(purse).withdraw(amount));
} else {
paymentP = E(issuer)
.isLive(payment)
.then(isLive => isLive && payment);
}

payment = await paymentP;
if (!payment) {
return;
}
const amount = await E(issuer).getAmountOf(payment);

// TODO: Use AmountMath.
const isEmpty =
amount.value === 0 ||
(Array.isArray(amount.value) && !amount.value.length);
if (isEmpty) {
return;
}
if (!issuerToPursePetnameP.has(issuer)) {
issuerToPursePetnameP.set(
issuer,
Expand Down

0 comments on commit 7d17f2f

Please sign in to comment.