Skip to content

Commit

Permalink
Merge pull request #1043 from Agoric/mfig/dibc-inbound
Browse files Browse the repository at this point in the history
dIBC inbound
  • Loading branch information
michaelfig authored May 2, 2020
2 parents 3424ca0 + a16a444 commit 28e6166
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 140 deletions.
10 changes: 4 additions & 6 deletions packages/SwingSet/docs/networking.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ Solo machines may be able to talk to chains and vice versa using specialized pro

== The agoric-sdk User Local Port ==

Each user of the Agoric testnet gets their own personal IBC listening port. You can access this `Port` object in `home.ibcport`, and you can learn its local address by calling `home.ibcport~.getLocalAddress()`, which will give you something like `/ibc-port/port8312`.
Each user of the Agoric testnet gets a few personal IBC listening ports. You can access these `Port` objects in the `home.ibcport` array, and you can learn their local address by calling something like `home.ibcport[0]~.getLocalAddress()`, which will give you a local address like `/ibc-port/portbvmnfb`.

This is currently the only way for user code to get an IBC `Port`, though non-IBC ports can be allocated using the local `home.network` object. This is an advanced use case, to be documented later.

`home.ibcport~.connect()`

== Connecting to a Remote Port ==

To establish a connection, you must start with a local `Port` object, and you must know the name of the remote endpoint. The remote endpoint will have a name like `/ibc-hop/$HOPNAME/ibc-port/$PORTNAME/ordered/$VERSION` (where `ibc-hop`, `ibc-port` and `ordered` are literal strings, spelled just like that, but `$HOPNAME`, `$PORTNAME`, and `$VERSION` are placeholders for arbitrary values that will vary from one endpoint to another).
Expand All @@ -30,7 +28,7 @@ You must also prepare a `ConnectionHandler` object to manage the connection you'
Then you will call the `connect()` method on your local `Port`. This will return a `Promise` that will fire with a new `Connection` object, on which you can send data. Your `ConnectionHandler` will be notified about the new channel, and will receive inbound data from the other side.

```js
home.ibcport~.connect(endpoint, connectionHandler)
home.ibcport[0]~.connect(endpoint, connectionHandler)
.then(conn => doSomethingWithConnection(conn));
```

Expand Down Expand Up @@ -58,7 +56,7 @@ You can ask the `Port` object this returns for its local address, which is espec
port~.getLocalAddress().then(localAddress => useIt(localAddress))
```

`home.ibcport~.addListener()`
`home.ibcport[0]~.addListener()`

Once the port is bound, you must call `addListener` to mark it as ready for inbound connections. You must provide this with a `ListenHandler` object, which has methods to react to listening events. As with `ConnectionHandler`, these methods are all optional.

Expand Down Expand Up @@ -140,4 +138,4 @@ port.revoke();

to completely deallocate the port, remove all listeners, close all pending connections, and release its address.

**CAUTION:** Be aware that if you call `home.ibcport~.revoke()`, it will be useless for new `.connect` or `.addListener` attempts. You will need to provision a new Agoric client via https://testnet.agoric.com/ to obtain a new setup with a functioning `home.ibcport`.
**CAUTION:** Be aware that if you call `home.ibcport[0]~.revoke()`, it will be useless for new `.connect` or `.addListener` attempts. You will need to provision a new Agoric client via https://testnet.agoric.com/ to obtain a new setup with a functioning `home.ibcport`.
69 changes: 52 additions & 17 deletions packages/SwingSet/src/vats/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,27 @@
import makeStore from '@agoric/store';
import rawHarden from '@agoric/harden';
import { E as defaultE } from '@agoric/eventual-send';
import { producePromise } from '@agoric/produce-promise';
import { toBytes } from './bytes';

const harden = /** @type {<T>(x: T) => T} */ (rawHarden);

/**
* Compatibility note: this must match what our peers use,
* so don't change it casually.
*/
export const ENDPOINT_SEPARATOR = '/';

/**
* @template T,U
* @typedef {import('@agoric/store').Store<T,U>} Store
*/

/**
* @template T,U
* @typedef {import('@agoric/produce-promise').PromiseRecord<T, U>} PromiseRecord
*/

/**
* @typedef {import('./bytes').Bytes} Bytes
* @typedef {import('./bytes').Data} Data
Expand All @@ -23,7 +35,7 @@ const harden = /** @type {<T>(x: T) => T} */ (rawHarden);

/**
* @typedef {Object} Protocol The network Protocol
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if ending in '/', a fresh name
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if ending in ENDPOINT_SEPARATOR, a fresh name
*/

/**
Expand All @@ -38,7 +50,7 @@ const harden = /** @type {<T>(x: T) => T} */ (rawHarden);
/**
* @typedef {Object} ListenHandler A handler for incoming connections
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onListen] The listener has been registered
* @property {(port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>} [onAccept] A new connection is incoming
* @property {(port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>} onAccept A new connection is incoming
* @property {(port: Port, rej: any, l: ListenHandler) => Promise<void>} [onError] There was an error while listening
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onRemove] The listener has been removed
*/
Expand Down Expand Up @@ -106,6 +118,10 @@ export const makeConnection = (
E = defaultE,
) => {
let closed;
/**
* @type {Set<PromiseRecord<Bytes,any>>}
*/
const pendingAcks = new Set();
/**
* @type {Connection}
*/
Expand All @@ -122,6 +138,10 @@ export const makeConnection = (
}
current.delete(connection);
closed = Error('Connection closed');
for (const ackDeferred of [...pendingAcks.values()]) {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(closed);
}
await E(handler)
.onClose(connection, undefined, handler)
.catch(rethrowUnlessMissing);
Expand All @@ -132,10 +152,22 @@ export const makeConnection = (
throw closed;
}
const bytes = toBytes(data);
const ack = await E(handler)
const ackDeferred = producePromise();
pendingAcks.add(ackDeferred);
E(handler)
.onReceive(connection, bytes, handler)
.catch(err => rethrowUnlessMissing(err) || '');
return toBytes(ack);
.catch(err => rethrowUnlessMissing(err) || '')
.then(
ack => {
pendingAcks.delete(ackDeferred);
ackDeferred.resolve(toBytes(ack));
},
err => {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(err);
},
);
return ackDeferred.promise;
},
});

Expand Down Expand Up @@ -230,18 +262,17 @@ export function crossoverConnection(
/**
* Get the list of prefixes from longest to shortest.
* @param {string} addr
* @param {string} [sep='/']
*/
export function getPrefixes(addr, sep = '/') {
const parts = addr.split(sep);
export function getPrefixes(addr) {
const parts = addr.split(ENDPOINT_SEPARATOR);

/**
* @type {string[]}
*/
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(sep);
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
ret.push(prefix);
}
return ret;
Expand Down Expand Up @@ -283,9 +314,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {

const lchandler =
/** @type {ConnectionHandler} */
(await E(listener)
.onAccept(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing));
(await E(listener).onAccept(port, localAddr, remoteAddr, listener));

return crossoverConnection(
lchandler,
Expand All @@ -301,7 +330,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
/** @type {string} */
(await E(port).getLocalAddress());

const ret = getPrefixes(remoteAddr, '/');
const ret = getPrefixes(remoteAddr);
if (await protocolImpl.isListening(ret)) {
return protocolImpl.inbound(ret, remoteAddr, localAddr, lchandler);
}
Expand Down Expand Up @@ -345,7 +374,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
*/
const bind = async localAddr => {
// Check if we are underspecified (ends in slash)
if (localAddr.endsWith('/')) {
if (localAddr.endsWith(ENDPOINT_SEPARATOR)) {
for (;;) {
// eslint-disable-next-line no-await-in-loop
const portID = await E(protocolHandler).generatePortID(localAddr);
Expand Down Expand Up @@ -388,6 +417,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
throw TypeError(`listenHandler is not defined`);
}
if (listening.has(localAddr)) {
// Last one wins.
const [lport, lhandler] = listening.get(localAddr);
if (lhandler === listenHandler) {
return;
Expand All @@ -400,6 +430,8 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
listening.init(localAddr, [port, listenHandler]);
}

// TODO: Check that the listener defines onAccept.

await E(protocolHandler).onListen(
port,
localAddr,
Expand Down Expand Up @@ -536,9 +568,12 @@ export function makeLoopbackProtocolHandler(E = defaultE) {
}
const [lport, lhandler] = listeners.get(remoteAddr);
// console.log(`looking up onAccept in`, lhandler);
const rport = await E(lhandler)
.onAccept(lport, remoteAddr, localAddr, lhandler)
.catch(rethrowUnlessMissing);
const rport = await E(lhandler).onAccept(
lport,
remoteAddr,
localAddr,
lhandler,
);
// console.log(`rport is`, rport);
return rport;
},
Expand Down
19 changes: 10 additions & 9 deletions packages/SwingSet/src/vats/network/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { E as defaultE } from '@agoric/eventual-send';
import rawHarden from '@agoric/harden';
import makeStore from '@agoric/store';
import { makeNetworkProtocol } from './network';
import { makeNetworkProtocol, ENDPOINT_SEPARATOR } from './network';

const harden = /** @type {<T>(x: T) => T} */ (rawHarden);

Expand All @@ -26,29 +26,31 @@ const harden = /** @type {<T>(x: T) => T} */ (rawHarden);
/**
* Create a slash-delimited router.
*
* @param {string} [sep='/'] the delimiter of the routing strings
* @returns {Router} a new Router
*/
export default function makeRouter(sep = '/') {
export default function makeRouter() {
/**
* @type {Store<string, any>}
*/
const prefixToRoute = makeStore('prefix');
return harden({
getRoutes(addr) {
const parts = addr.split(sep);
const parts = addr.split(ENDPOINT_SEPARATOR);
/**
* @type {[string, any][]}
*/
const ret = [];
for (let i = parts.length; i > 0; i -= 1) {
// Try most specific match.
const prefix = parts.slice(0, i).join(sep);
const prefix = parts.slice(0, i).join(ENDPOINT_SEPARATOR);
if (prefixToRoute.has(prefix)) {
ret.push([prefix, prefixToRoute.get(prefix)]);
}
// Trim off the last value (after the slash).
const defaultPrefix = prefix.substr(0, prefix.lastIndexOf('/') + 1);
const defaultPrefix = prefix.substr(
0,
prefix.lastIndexOf(ENDPOINT_SEPARATOR) + 1,
);
if (prefixToRoute.has(defaultPrefix)) {
ret.push([defaultPrefix, prefixToRoute.get(defaultPrefix)]);
}
Expand All @@ -75,12 +77,11 @@ export default function makeRouter(sep = '/') {
/**
* Create a router that behaves like a Protocol.
*
* @param {string} [sep='/'] the route separator
* @param {typeof defaultE} [E=defaultE] Eventual sender
* @returns {RouterProtocol} The new delegated protocol
*/
export function makeRouterProtocol(sep = '/', E = defaultE) {
const router = makeRouter(sep);
export function makeRouterProtocol(E = defaultE) {
const router = makeRouter();
const protocols = makeStore('prefix');
const protocolHandlers = makeStore('prefix');

Expand Down
5 changes: 5 additions & 0 deletions packages/SwingSet/test/test-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ const makeProtocolHandler = t => {
*/
let l;
let lp;
let nonce = 0;
return harden({
async onCreate(_protocol, _impl) {
log('created', _protocol, _impl);
},
async generatePortID() {
nonce += 1;
return `${nonce}`;
},
async onBind(port, localAddr) {
t.assert(port, `port is supplied to onBind`);
t.assert(localAddr, `local address is supplied to onBind`);
Expand Down
Loading

0 comments on commit 28e6166

Please sign in to comment.