Skip to content

Commit

Permalink
feat(pegasus): properly abort on connection close
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Feb 4, 2022
1 parent e7ea320 commit 1b17f7a
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 70 deletions.
73 changes: 63 additions & 10 deletions packages/pegasus/src/pegasus.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ const TRANSFER_PROPOSAL_SHAPE = {
const makePegasus = (zcf, board, namesByAddress) => {
/**
* @typedef {Object} LocalDenomState
* @property {Address} localAddr
* @property {Address} remoteAddr
* @property {Store<Denom, PromiseRecord<Courier>>} remoteDenomToCourierPK
* @property {IterationObserver<Denom>} remoteDenomPublication
* @property {Subscription<Denom>} remoteDenomSubscription
* @property {number} lastDenomNonce
* @property {(reason?: any) => void} abort
*/

let lastLocalIssuerNonce = 0;
Expand Down Expand Up @@ -97,9 +100,15 @@ const makePegasus = (zcf, board, namesByAddress) => {
localDenomState,
transferProtocol,
}) => {
let checkAbort = () => {};

/** @type {Set<Peg>} */
const pegs = new Set();

/** @type {PegasusConnectionActions} */
const pegasusConnectionActions = {
async rejectStuckTransfers(remoteDenom) {
checkAbort();
const { remoteDenomToCourierPK } = localDenomState;

const { reject, promise } = remoteDenomToCourierPK.get(remoteDenom);
Expand All @@ -115,6 +124,7 @@ const makePegasus = (zcf, board, namesByAddress) => {
assetKind = undefined,
displayInfo = undefined,
) {
checkAbort();
const { remoteDenomToCourierPK } = localDenomState;

// Create the issuer for the local erights corresponding to the remote values.
Expand All @@ -124,6 +134,7 @@ const makePegasus = (zcf, board, namesByAddress) => {
assetKind,
displayInfo,
);
checkAbort();
const { brand: localBrand } = zcfMint.getIssuerRecord();

// Describe how to retain/redeem pegged shadow erights.
Expand All @@ -144,14 +155,19 @@ const makePegasus = (zcf, board, namesByAddress) => {
const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK);
courierPK.resolve(courier);

return makePeg(localDenomState, {
checkAbort();
const peg = makePeg(localDenomState, {
localBrand,
remoteDenom,
allegedName,
});
pegs.add(peg);
return peg;
},

async pegLocal(allegedName, localIssuer) {
checkAbort();

// We need the last nonce for our denom name.
localDenomState.lastDenomNonce += 1;
const remoteDenom = `pegasus${localDenomState.lastDenomNonce}`;
Expand All @@ -165,6 +181,7 @@ const makePegasus = (zcf, board, namesByAddress) => {
localIssuer,
localKeyword,
);
checkAbort();

/**
* Transfer amount (of localBrand) from loser to winner seats.
Expand Down Expand Up @@ -219,11 +236,22 @@ const makePegasus = (zcf, board, namesByAddress) => {
const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK);
courierPK.resolve(courier);

return makePeg(localDenomState, {
const peg = makePeg(localDenomState, {
localBrand,
remoteDenom,
allegedName,
});
pegs.add(peg);
return peg;
},
abort: reason => {
checkAbort();
checkAbort = () => {
throw reason;
};
pegs.forEach(peg => {
pegToDenomState.delete(peg);
});
},
};
return Far('pegasusConnectionActions', pegasusConnectionActions);
Expand All @@ -244,7 +272,7 @@ const makePegasus = (zcf, board, namesByAddress) => {
const connectionToLocalDenomState = makeLegacyWeakMap('Connection');

/**
* @type {SubscriptionRecord<PegasusConnectionSubscription>}
* @type {SubscriptionRecord<PegasusConnectionState>}
*/
const {
subscription: connectionSubscription,
Expand All @@ -263,10 +291,16 @@ const makePegasus = (zcf, board, namesByAddress) => {

/** @type {LocalDenomState} */
const localDenomState = {
localAddr,
remoteAddr,
remoteDenomToCourierPK,
lastDenomNonce: 0,
remoteDenomPublication,
remoteDenomSubscription,
abort: reason => {
// eslint-disable-next-line no-use-before-define
actions.abort(reason);
},
};

// The courier is the only thing that we use to send messages to C.
Expand All @@ -279,14 +313,14 @@ const makePegasus = (zcf, board, namesByAddress) => {

connectionToLocalDenomState.init(c, localDenomState);

/** @type {PegasusConnectionSubscription} */
const subData = harden({
/** @type {PegasusConnectionState} */
const state = harden({
localAddr,
remoteAddr,
actions,
remoteDenomSubscription,
});
connectionPublication.updateState(subData);
connectionPublication.updateState(state);
},
async onReceive(c, packetBytes) {
const doReceive = async () => {
Expand Down Expand Up @@ -320,11 +354,30 @@ const makePegasus = (zcf, board, namesByAddress) => {
},
async onClose(c) {
// Unregister C. Pending transfers will be rejected by the Network API.
const { remoteDenomPublication } = connectionToLocalDenomState.get(c);
const {
remoteDenomPublication,
remoteDenomToCourierPK,
localAddr,
remoteAddr,
abort,
} = connectionToLocalDenomState.get(c);
connectionToLocalDenomState.delete(c);
remoteDenomPublication.fail(
assert.error(X`pegasusConnectionHandler closed`),
);
const err = assert.error(X`pegasusConnectionHandler closed`);
remoteDenomPublication.fail(err);
/** @type {PegasusConnectionState} */
const state = harden({
localAddr,
remoteAddr,
});
connectionPublication.updateState(state);
remoteDenomToCourierPK.values().forEach(courierPK => {
try {
courierPK.reject(err);
} catch (e) {
// Already resolved/rejected, so ignore.
}
});
abort(err);
},
};

Expand Down
9 changes: 5 additions & 4 deletions packages/pegasus/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,19 @@
* @property {PegLocal} pegLocal
* @property {PegRemote} pegRemote
* @property {RejectStuckTransfers} rejectStuckTransfers
* @property {(reason?: any) => void} abort
*/

/**
* @typedef {Object} PegasusConnectionSubscription
* @property {PegasusConnectionActions} actions
* @typedef {Object} PegasusConnectionState
* @property {PegasusConnectionActions} [actions]
* @property {Address} localAddr
* @property {Address} remoteAddr
* @property {Subscription<Denom>} remoteDenomSubscription
* @property {Subscription<Denom>} [remoteDenomSubscription]
*/

/**
* @typedef {Object} PegasusConnectionKit
* @property {ConnectionHandler} handler
* @property {Subscription<PegasusConnectionSubscription>} subscription
* @property {Subscription<PegasusConnectionState>} subscription
*/
126 changes: 70 additions & 56 deletions packages/vats/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
} from '@agoric/zoe/src/contractSupport/index.js';
import { AmountMath } from '@agoric/ertp';
import { Nat } from '@agoric/nat';
import { observeIteration } from '@agoric/notifier';
import { makeBridgeManager } from './bridge.js';
import { makeNameHubKit } from './nameHub.js';
import {
Expand Down Expand Up @@ -94,19 +95,26 @@ export function buildRootObject(vatPowers, vatParameters) {
E(vats.board).getBoard(),
chainTimerServiceP,
/** @type {Promise<{ zoeService: ZoeService, feeMintAccess:
* FeeMintAccess }>} */ (
E(vats.zoe).buildZoe(vatAdminSvc, feeIssuerConfig)
),
* FeeMintAccess }>} */ (E(vats.zoe).buildZoe(
vatAdminSvc,
feeIssuerConfig,
)),
E(vats.priceAuthority).makePriceAuthority(),
E(vats.walletManager).buildWalletManager(vatAdminSvc),
]);

const { nameHub: agoricNames, nameAdmin: agoricNamesAdmin } =
makeNameHubKit();
const { nameHub: namesByAddress, nameAdmin: namesByAddressAdmin } =
makeNameHubKit();
const { nameHub: pegasusConnections, nameAdmin: pegasusConnectionsAdmin } =
makeNameHubKit();
const {
nameHub: agoricNames,
nameAdmin: agoricNamesAdmin,
} = makeNameHubKit();
const {
nameHub: namesByAddress,
nameAdmin: namesByAddressAdmin,
} = makeNameHubKit();
const {
nameHub: pegasusConnections,
nameAdmin: pegasusConnectionsAdmin,
} = makeNameHubKit();

async function installEconomy(bootstrapPaymentValue) {
// Create a mapping from all the nameHubs we create to their corresponding
Expand Down Expand Up @@ -222,16 +230,23 @@ export function buildRootObject(vatPowers, vatParameters) {
const bootstrapPaymentValue = bankBootstrapSupply + ammDepositValue;
// NOTE: no use of the voteCreator. We'll need it to initiate votes on
// changing VaultFactory parameters.
const { vaultFactoryCreator, _voteCreator, ammFacets } =
await installEconomy(bootstrapPaymentValue);

const [centralIssuer, centralBrand, ammInstance, pegasusInstance] =
await Promise.all([
E(agoricNames).lookup('issuer', CENTRAL_ISSUER_NAME),
E(agoricNames).lookup('brand', CENTRAL_ISSUER_NAME),
E(agoricNames).lookup('instance', 'amm'),
E(agoricNames).lookup('instance', 'Pegasus'),
]);
const {
vaultFactoryCreator,
_voteCreator,
ammFacets,
} = await installEconomy(bootstrapPaymentValue);

const [
centralIssuer,
centralBrand,
ammInstance,
pegasusInstance,
] = await Promise.all([
E(agoricNames).lookup('issuer', CENTRAL_ISSUER_NAME),
E(agoricNames).lookup('brand', CENTRAL_ISSUER_NAME),
E(agoricNames).lookup('instance', 'amm'),
E(agoricNames).lookup('instance', 'Pegasus'),
]);

// Start the reward distributor.
const epochTimerService = chainTimerService;
Expand Down Expand Up @@ -579,11 +594,10 @@ export function buildRootObject(vatPowers, vatParameters) {
async createUserBundle(_nickname, address, powerFlags = []) {
// Bind to some fresh ports (unspecified name) on the IBC implementation
// and provide them for the user to have.
const ibcport = [];
const ibcportP = [];
for (let i = 0; i < NUM_IBC_PORTS; i += 1) {
// eslint-disable-next-line no-await-in-loop
const port = await E(network).bind('/ibc-port/');
ibcport.push(port);
const port = E(network).bind('/ibc-port/');
ibcportP.push(port);
}

/** @type {{ issuerName: string, purseName: string, issuer: Issuer,
Expand Down Expand Up @@ -656,14 +670,23 @@ export function buildRootObject(vatPowers, vatParameters) {
}),
);

const bank = await E(bankManager).getBankForAddress(address);
const [bank, ibcport] = await Promise.all([
E(bankManager).getBankForAddress(address),
Promise.all(ibcportP),
]);

// Separate out the purse-creating payments from the bank payments.
const faucetPaymentInfo = [];
await Promise.all(
userPaymentRecords.map(async precord => {
const { payToBank, issuer, issuerName, payment, brand, purseName } =
precord;
const {
payToBank,
issuer,
issuerName,
payment,
brand,
purseName,
} = precord;
if (!payToBank) {
// Just a faucet payment to be claimed by a wallet.
faucetPaymentInfo.push({
Expand Down Expand Up @@ -696,8 +719,10 @@ export function buildRootObject(vatPowers, vatParameters) {
});

// Create a name hub for this address.
const { nameHub: myAddressNameHub, nameAdmin: rawMyAddressNameAdmin } =
makeNameHubKit();
const {
nameHub: myAddressNameHub,
nameAdmin: rawMyAddressNameAdmin,
} = makeNameHubKit();
// Register it with the namesByAddress hub.
namesByAddressAdmin.update(address, myAddressNameHub);

Expand Down Expand Up @@ -826,37 +851,26 @@ export function buildRootObject(vatPowers, vatParameters) {
if (pegasus) {
// Add the Pegasus transfer port.
const port = await E(network).bind('/ibc-port/pegasus');

const { handler, subscription } = await E(
pegasus,
).makePegasusConnectionKit();
observeIteration(subscription, {
updateState(connectionState) {
const { localAddr, actions } = connectionState;
if (actions) {
// We're open and ready for business.
pegasusConnectionsAdmin.update(localAddr, connectionState);
} else {
// We're closed.
pegasusConnectionsAdmin.delete(localAddr);
}
},
});
E(port).addListener(
Far('listener', {
async onAccept(_port, _localAddr, _remoteAddr, _listenHandler) {
const chandlerP = E(pegasus).makePegConnectionHandler();
const proxyMethod =
name =>
(...args) =>
E(chandlerP)[name](...args);
const onOpen = proxyMethod('onOpen');
const onClose = proxyMethod('onClose');

let localAddr;
return Far('pegasusConnectionHandler', {
onOpen(c, actualLocalAddr, ...args) {
localAddr = actualLocalAddr;
if (pegasusConnectionsAdmin) {
pegasusConnectionsAdmin.update(localAddr, c);
}
return onOpen(c, ...args);
},
onReceive: proxyMethod('onReceive'),
onClose(c, ...args) {
try {
return onClose(c, ...args);
} finally {
if (pegasusConnectionsAdmin) {
pegasusConnectionsAdmin.delete(localAddr, c);
}
}
},
});
return handler;
},
async onListen(p, _listenHandler) {
console.debug(`Listening on Pegasus transfer port: ${p}`);
Expand Down

0 comments on commit 1b17f7a

Please sign in to comment.