diff --git a/packages/SwingSet/src/vats/network/bytes.js b/packages/SwingSet/src/vats/network/bytes.js index 4635b5bcc47..bbcdce84960 100644 --- a/packages/SwingSet/src/vats/network/bytes.js +++ b/packages/SwingSet/src/vats/network/bytes.js @@ -13,7 +13,6 @@ export function toBytes(data) { // TODO: We really need marshallable TypedArrays. if (typeof data === 'string') { - // eslint-disable-next-line no-bitwise data = data.split('').map(c => c.charCodeAt(0)); } diff --git a/packages/SwingSet/src/vats/network/network.js b/packages/SwingSet/src/vats/network/network.js index 10d7c837a87..7caedb6c9ce 100644 --- a/packages/SwingSet/src/vats/network/network.js +++ b/packages/SwingSet/src/vats/network/network.js @@ -33,6 +33,11 @@ export const ENDPOINT_SEPARATOR = '/'; * See multiaddr.js for an opinionated router implementation */ +/** + * @typedef {Object} Closable A closable object + * @property {() => Promise} close Terminate the object + */ + /** * @typedef {Object} Protocol The network Protocol * @property {(prefix: Endpoint) => Promise} bind Claim a port, or if ending in ENDPOINT_SEPARATOR, a fresh name @@ -50,6 +55,7 @@ export const ENDPOINT_SEPARATOR = '/'; /** * @typedef {Object} ListenHandler A handler for incoming connections * @property {(port: Port, l: ListenHandler) => Promise} [onListen] The listener has been registered + * @property {(port: Port, listenAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise} [onInbound] Return metadata for inbound connection attempt * @property {(port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise} onAccept A new connection is incoming * @property {(port: Port, rej: any, l: ListenHandler) => Promise} [onError] There was an error while listening * @property {(port: Port, l: ListenHandler) => Promise} [onRemove] The listener has been removed @@ -58,7 +64,7 @@ export const ENDPOINT_SEPARATOR = '/'; /** * @typedef {Object} Connection * @property {(packetBytes: Data) => Promise} send Send a packet on the connection - * @property {() => void} close Close both ends of the connection + * @property {() => Promise} close Close both ends of the connection * @property {() => Endpoint} getLocalAddress Get the locally bound name of this connection * @property {() => Endpoint} getRemoteAddress Get the name of the counterparty */ @@ -82,9 +88,14 @@ export const ENDPOINT_SEPARATOR = '/'; * @property {(port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise} onConnect A port initiates an outbound connection * @property {(port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise} onRevoke The port is being completely destroyed * + * @typedef {Object} InboundAttempt An inbound connection attempt + * @property {(connectionHandler: ConnectionHandler) => Promise} accept Establish the connection + * @property {() => Endpoint} getLocalAddress Return the local address for this attempt + * @property {() => Endpoint} getRemoteAddress Return the remote address for this attempt + * @property {() => Promise} close Abort the attempt + * * @typedef {Object} ProtocolImpl Things the protocol can do for us - * @property {(listenSearch: Endpoint[]) => Promise} isListening Tell whether anything in listenSearch is listening - * @property {(listenSearch: Endpoint[], localAddr: Endpoint, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise} inbound Establish a connection into this protocol + * @property {(listenAddr: Endpoint, remoteAddr: Endpoint) => Promise} inbound Make an attempt to connect into this protocol * @property {(port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise} outbound Create an outbound connection */ @@ -97,7 +108,7 @@ export const rethrowUnlessMissing = err => { ) { throw err; } - return true; + return false; }; /** @@ -106,7 +117,7 @@ export const rethrowUnlessMissing = err => { * @param {ConnectionHandler} handler * @param {Endpoint} localAddr * @param {Endpoint} remoteAddr - * @param {WeakSet} [current=new WeakSet()] + * @param {Set} [current=new Set()] * @param {typeof defaultE} [E=defaultE] Eventual send function * @returns {Connection} */ @@ -114,7 +125,7 @@ export const makeConnection = ( handler, localAddr, remoteAddr, - current = new WeakSet(), + current = new Set(), E = defaultE, ) => { let closed; @@ -286,7 +297,7 @@ export function getPrefixes(addr) { * @returns {Protocol} the local capability for connecting and listening */ export function makeNetworkProtocol(protocolHandler, E = defaultE) { - /** @type {Store>} */ + /** @type {Store>} */ const currentConnections = makeStore('port'); /** @@ -300,39 +311,89 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) { * @type {ProtocolImpl} */ const protocolImpl = harden({ - async isListening(listenSearch) { - const listener = listenSearch.find(addr => listening.has(addr)); - return !!listener; - }, - async inbound(listenSearch, localAddr, remoteAddr, rchandler) { - const listenAddr = listenSearch.find(addr => listening.has(addr)); - if (!listenAddr) { - throw Error(`Connection refused to ${localAddr}`); + async inbound(listenAddr, remoteAddr) { + let lastFailure = Error(`No listeners for ${listenAddr}`); + for (const listenPrefix of getPrefixes(listenAddr)) { + if (!listening.has(listenPrefix)) { + // eslint-disable-next-line no-continue + continue; + } + const [port, listener] = listening.get(listenPrefix); + let localAddr; + try { + // See if we have a listener that's willing to receive this connection. + // eslint-disable-next-line no-await-in-loop + const localSuffix = await E(listener) + .onInbound(port, listenPrefix, remoteAddr, listener) + .catch(rethrowUnlessMissing); + localAddr = localSuffix + ? `${listenPrefix}/${localSuffix}` + : listenAddr; + } catch (e) { + lastFailure = e; + // eslint-disable-next-line no-continue + continue; + } + // We have a legitimate inbound attempt. + let consummated; + const current = currentConnections.get(port); + const inboundAttempt = harden({ + getLocalAddress() { + // Return address metadata. + return localAddr; + }, + getRemoteAddress() { + return remoteAddr; + }, + async close() { + if (consummated) { + throw consummated; + } + consummated = Error(`Already closed`); + current.delete(inboundAttempt); + await E(listener) + .onReject(port, localAddr, remoteAddr, listener) + .catch(rethrowUnlessMissing); + }, + async accept(rchandler) { + if (consummated) { + throw consummated; + } + consummated = Error(`Already accepted`); + current.delete(inboundAttempt); + + const lchandler = + /** @type {ConnectionHandler} */ + // eslint-disable-next-line prettier/prettier + (await E(listener).onAccept(port, localAddr, remoteAddr, listener)); + + return crossoverConnection( + lchandler, + localAddr, + rchandler, + remoteAddr, + current, + E, + )[1]; + }, + }); + current.add(inboundAttempt); + return inboundAttempt; } - const [port, listener] = listening.get(listenAddr); - const current = currentConnections.get(port); - - const lchandler = - /** @type {ConnectionHandler} */ - (await E(listener).onAccept(port, localAddr, remoteAddr, listener)); - - return crossoverConnection( - lchandler, - localAddr, - rchandler, - remoteAddr, - current, - E, - )[1]; + throw lastFailure; }, async outbound(port, remoteAddr, lchandler) { const localAddr = /** @type {string} */ (await E(port).getLocalAddress()); - const ret = getPrefixes(remoteAddr); - if (await protocolImpl.isListening(ret)) { - return protocolImpl.inbound(ret, remoteAddr, localAddr, lchandler); + let lastFailure; + try { + // Attempt the loopback connection. + const attempt = await protocolImpl.inbound(remoteAddr, localAddr); + return attempt.accept(lchandler); + } catch (e) { + lastFailure = e; } const rchandler = @@ -346,7 +407,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) { )); if (!rchandler) { - throw Error(`Cannot connect to ${remoteAddr}`); + throw lastFailure; } const current = currentConnections.get(port); diff --git a/packages/cosmic-swingset/lib/ag-solo/vats/ibc.js b/packages/cosmic-swingset/lib/ag-solo/vats/ibc.js index 932651c38ee..f08a6e520de 100644 --- a/packages/cosmic-swingset/lib/ag-solo/vats/ibc.js +++ b/packages/cosmic-swingset/lib/ag-solo/vats/ibc.js @@ -4,7 +4,6 @@ import { rethrowUnlessMissing, dataToBase64, base64ToBytes, - getPrefixes, } from '@agoric/swingset-vat/src/vats/network'; import makeStore from '@agoric/store'; import { producePromise } from '@agoric/produce-promise'; @@ -26,6 +25,7 @@ const FIXME_ALLOW_NAIVE_RELAYS = true; * @typedef {import('@agoric/swingset-vat/src/vats/network').ProtocolImpl} ProtocolImpl * @typedef {import('@agoric/swingset-vat/src/vats/network').ConnectionHandler} ConnectionHandler * @typedef {import('@agoric/swingset-vat/src/vats/network').Connection} Connection + * @typedef {import('@agoric/swingset-vat/src/vats/network').InboundAttempt} InboundAttempt * @typedef {import('@agoric/swingset-vat/src/vats/network').Port} Port * @typedef {import('@agoric/swingset-vat/src/vats/network').Endpoint} Endpoint * @typedef {import('@agoric/swingset-vat/src/vats/network').Bytes} Bytes @@ -113,6 +113,11 @@ export function makeIBCProtocolHandler( */ const channelKeyToInfo = makeStore('CHANNEL:PORT'); + /** + * @type {Store>} + */ + const channelKeyToAttemptP = makeStore('CHANNEL:PORT'); + /** * @type {Set} */ @@ -396,7 +401,6 @@ export function makeIBCProtocolHandler( // TODO: Will need to change to dispatch (without sending) // a ChanOpenInit to get a passive relayer flowing. - // eslint-disable-next-line no-constant-condition if (false) { const packet = { source_channel: channelID, @@ -494,6 +498,7 @@ EOF rPortID, removeMatching = false, ) { + // /ibc-port/portID/ibc-channel/channelID(/ibc-hop/hop1(/ibc-hop/hop2))/ibc-port/rPortID/ibc-channel/rChannelID // FIXME: Leaves garbage behind in the less specific outboundWaiters. for (let i = 0; i <= hops.length; i += 1) { // Try most specific to least specific outbound connections. @@ -546,8 +551,9 @@ EOF async fromBridge(srcID, obj) { console.warn('IBC fromBridge', srcID, obj); switch (obj.event) { - case 'channelOpenTry': case 'channelOpenInit': { + // This event is sent by a naive relayer that wants to initiate + // a connection. const { channelID, portID, @@ -556,20 +562,96 @@ EOF } = obj; const channelKey = `${channelID}:${portID}`; - const waiter = getWaiter( - hops, + + if (FIXME_ALLOW_NAIVE_RELAYS) { + // Continue the handshake if we are waiting for it. + const waiter = getWaiter( + hops, + channelID, + portID, + rChannelID, + rPortID, + false, + ); + if (waiter) { + // We have more specific information for the outbound connection. + channelKeyToInfo.set(channelKey, obj); + break; + } + } + + // We're not waiting for an init, so throw. + throw Error(`No waiting outbound connection for ${channelKey}`); + } + + case 'attemptChannelOpenTry': + case 'channelOpenTry': { + // They're (more or less politely) asking if we are listening, so make an attempt. + const { channelID, portID, - rChannelID, - rPortID, - false, + counterparty: { port_id: rPortID, channel_id: rChannelID }, + connectionHops: hops, + order, + version, + counterpartyVersion: rVersion, + } = obj; + + const channelKey = `${channelID}:${portID}`; + if (channelKeyToAttemptP.has(channelKey)) { + // We have a pending attempt, so continue the handshake. + break; + } + + const versionSuffix = version ? `/${version}` : ''; + const localAddr = `/ibc-port/${portID}/${order.toLowerCase()}${versionSuffix}`; + const ibcHops = hops.map(hop => `/ibc-hop/${hop}`).join('/'); + const remoteAddr = `${ibcHops}/ibc-port/${rPortID}/${order.toLowerCase()}/${rVersion}`; + + // See if we allow an inbound attempt for this address pair (without rejecting). + const attemptP = + /** @type {Promise} */ + (E(protocolImpl).inbound(localAddr, remoteAddr)); + + // Tell what version string we negotiated. + const attemptedLocal = + /** @type {string} */ + (await E(attemptP).getLocalAddress()); + const match = attemptedLocal.match( + // Match: /ibc-port/PORT /ORDER/VERSION... + new RegExp('^/ibc-port/[^/]+/[^/]+/([^/]+)(/|$)'), ); - if (!waiter) { - await E(protocolImpl).isListening([`/ibc-port/${portID}`]); - channelKeyToInfo.init(channelKey, obj); - } else { - // We have more specific information. - channelKeyToInfo.set(channelKey, obj); + if (!match) { + throw Error( + `Cannot determine version from attempted local address ${attemptedLocal}`, + ); + } + + channelKeyToAttemptP.init(channelKey, attemptP); + channelKeyToInfo.init(channelKey, obj); + + const negotiatedVersion = match[1]; + if (obj.type === 'attemptChannelOpenTry') { + // We can try to open with the version we wanted. + const packet = { + source_channel: channelID, + source_port: portID, + destination_channel: rChannelID, + destination_port: rPortID, + }; + + await callIBCDevice('channelOpenTry', { + packet, + order, + hops, + version: negotiatedVersion, + counterpartyVersion: rVersion, + }); + } else if (negotiatedVersion !== version) { + // Too late to change the version. + throw Error( + `Rejecting version ${version}; we negotiated ${negotiatedVersion}`, + ); } break; } @@ -615,8 +697,6 @@ EOF break; } - // Check for a listener for this subprotocol. - const listenSearch = getPrefixes(localAddr); const rchandler = makeIBCConnectionHandler( channelID, portID, @@ -625,9 +705,8 @@ EOF order === 'ORDERED', ); - // Actually connect. - // eslint-disable-next-line prettier/prettier - E(protocolImpl).inbound(listenSearch, localAddr, remoteAddr, rchandler); + // Attempt a connection and accept our handler. + E(E(protocolImpl).inbound(localAddr, remoteAddr)).accept(rchandler); break; } diff --git a/packages/cosmic-swingset/x/swingset/ibc.go b/packages/cosmic-swingset/x/swingset/ibc.go index 433d1b770b5..e071e7f0485 100644 --- a/packages/cosmic-swingset/x/swingset/ibc.go +++ b/packages/cosmic-swingset/x/swingset/ibc.go @@ -24,12 +24,34 @@ type channelMessage struct { // comes from swingset's IBC handler Method string `json:"method"` Packet channeltypes.Packet `json:"packet"` RelativeTimeout uint64 `json:"relativeTimeout"` - Order ibctypes.Order `json:"order"` + Order string `json:"order"` Hops []string `json:"hops"` Version string `json:"version"` Ack []byte `json:"ack"` } +func stringToOrder(order string) ibctypes.Order { + switch order { + case "ORDERED": + return ibctypes.ORDERED + case "UNORDERED": + return ibctypes.UNORDERED + default: + return ibctypes.NONE + } +} + +func orderToString(order ibctypes.Order) string { + switch order { + case ibctypes.ORDERED: + return "ORDERED" + case ibctypes.UNORDERED: + return "UNORDERED" + default: + return "NONE" + } +} + // DefaultRouter is a temporary hack until cosmos-sdk implements its features FIXME. type DefaultRouter struct { *port.Router @@ -97,7 +119,7 @@ func (ch channelHandler) Receive(ctx *ControllerContext, str string) (ret string case "channelOpenInit": err = ctx.Keeper.ChanOpenInit( - ctx.Context, msg.Order, msg.Hops, + ctx.Context, stringToOrder(msg.Order), msg.Hops, msg.Packet.SourcePort, msg.Packet.SourceChannel, msg.Packet.DestinationPort, msg.Packet.DestinationChannel, msg.Version, @@ -142,7 +164,7 @@ func (am AppModule) CallToController(ctx sdk.Context, send string) (string, erro type channelOpenInitEvent struct { Type string `json:"type"` // IBC Event string `json:"event"` // channelOpenInit - Order ibctypes.Order `json:"order"` + Order string `json:"order"` ConnectionHops []string `json:"connectionHops"` PortID string `json:"portID"` ChannelID string `json:"channelID"` @@ -164,7 +186,7 @@ func (am AppModule) OnChanOpenInit( event := channelOpenInitEvent{ Type: "IBC_EVENT", Event: "channelOpenInit", - Order: order, + Order: orderToString(order), ConnectionHops: connectionHops, PortID: portID, ChannelID: channelID, @@ -193,7 +215,7 @@ func (am AppModule) OnChanOpenInit( type channelOpenTryEvent struct { Type string `json:"type"` // IBC Event string `json:"event"` // channelOpenTry - Order ibctypes.Order `json:"order"` + Order string `json:"order"` ConnectionHops []string `json:"connectionHops"` PortID string `json:"portID"` ChannelID string `json:"channelID"` @@ -217,7 +239,7 @@ func (am AppModule) OnChanOpenTry( event := channelOpenTryEvent{ Type: "IBC_EVENT", Event: "channelOpenTry", - Order: order, + Order: orderToString(order), ConnectionHops: connectionHops, PortID: portID, ChannelID: channelID,