diff --git a/packages/eventual-send/src/index.js b/packages/eventual-send/src/index.js index a91b7b248c2..bb39ff38589 100644 --- a/packages/eventual-send/src/index.js +++ b/packages/eventual-send/src/index.js @@ -54,15 +54,46 @@ export function makeHandledPromise(Promise) { let presenceToPromise; let promiseToHandler; let promiseToPresence; // only for HandledPromise.unwrap + let promiseToForwardedPromise; // forwarding, union-find-ish function ensureMaps() { if (!presenceToHandler) { presenceToHandler = new WeakMap(); presenceToPromise = new WeakMap(); promiseToHandler = new WeakMap(); promiseToPresence = new WeakMap(); + promiseToForwardedPromise = new WeakMap(); } } + function shorten(target, error = false) { + let p = target; + // target -> resolved1 -> resolved2 + while (promiseToForwardedPromise.has(p)) { + p = promiseToForwardedPromise.get(p); + } + const presence = promiseToPresence.get(p); + if (presence) { + // Presences are final, so it is ok to propagate + // this upstream. + while (target !== p) { + const parent = promiseToForwardedPromise.get(target); + promiseToForwardedPromise.delete(target); + promiseToPresence.set(target, presence); + target = parent; + } + } else { + // Even if p has an unfulfilledHandler, we don't care. + // We still propagate only p upstream since + // unfulfulled handlers are transient. + while (target !== p) { + const parent = promiseToForwardedPromise.get(target); + promiseToForwardedPromise.set(target, p); + target = parent; + } + } + return target; + } + // This special handler accepts Promises, and forwards // handled Promises to their corresponding fulfilledHandler. let forwardingHandler; @@ -75,33 +106,47 @@ export function makeHandledPromise(Promise) { } let handledResolve; let handledReject; - let fulfilled = false; + let resolved = false; + let handledP; const superExecutor = (resolve, reject) => { handledResolve = value => { - fulfilled = true; + if (promiseToForwardedPromise.has(handledP)) { + throw new TypeError('internal: already forwarded'); + } + value = shorten(value); + resolved = true; + let targetP; + if (promiseToHandler.has(value) || promiseToPresence.has(value)) { + targetP = value; + } else { + targetP = presenceToPromise.get(value); + } + if (targetP && targetP !== handledP) { + promiseToForwardedPromise.set(handledP, targetP); + } else { + promiseToForwardedPromise.delete(handledP); + } resolve(value); }; handledReject = err => { - fulfilled = true; + if (promiseToForwardedPromise.has(handledP)) { + throw new TypeError('internal: already forwarded'); + } + promiseToHandler.delete(handledP); + resolved = true; reject(err); }; }; - const handledP = harden( - Reflect.construct(Promise, [superExecutor], new.target), - ); + handledP = harden(Reflect.construct(Promise, [superExecutor], new.target)); ensureMaps(); - let continueForwarding = () => {}; - if (!unfulfilledHandler) { - // Create a simple unfulfilledHandler that just postpones until the + const makePostponedHandler = () => { + // Create a simple postponedHandler that just postpones until the // fulfilledHandler is set. - // - // This is insufficient for actual remote handled Promises - // (too many round-trips), but is an easy way to create a - // local handled Promise. + let donePostponing; const interlockP = new Promise((resolve, reject) => { - continueForwarding = (err = null, targetP = undefined) => { + donePostponing = (err = null, targetP = undefined) => { if (err !== null) { reject(err); return; @@ -115,7 +160,7 @@ export function makeHandledPromise(Promise) { // It will bubble up to the HandledPromise itself. interlockP.catch(_ => {}); - const makePostponed = postponedOperation => { + const makePostponedOperation = postponedOperation => { // Just wait until the handler is resolved/rejected. return function postpone(x, ...args) { // console.log(`forwarding ${postponedOperation} ${args[0]}`); @@ -133,10 +178,19 @@ export function makeHandledPromise(Promise) { }; }; - unfulfilledHandler = { - get: makePostponed('get'), - applyMethod: makePostponed('applyMethod'), + const postponedHandler = { + get: makePostponedOperation('get'), + applyMethod: makePostponedOperation('applyMethod'), }; + return [postponedHandler, donePostponing]; + }; + + let continueForwarding = () => {}; + if (!unfulfilledHandler) { + // This is insufficient for actual remote handled Promises + // (too many round-trips), but is an easy way to create a + // local handled Promise. + [unfulfilledHandler, continueForwarding] = makePostponedHandler(); } const validateHandler = h => { @@ -150,18 +204,24 @@ export function makeHandledPromise(Promise) { promiseToHandler.set(handledP, unfulfilledHandler); const rejectHandled = reason => { - if (fulfilled) { + if (resolved) { return; } + if (promiseToForwardedPromise.has(handledP)) { + throw new TypeError('internal: already forwarded'); + } handledReject(reason); continueForwarding(reason); }; let resolvedPresence = null; const resolveWithPresence = presenceHandler => { - if (fulfilled) { + if (resolved) { return resolvedPresence; } + if (promiseToForwardedPromise.has(handledP)) { + throw new TypeError('internal: already forwarded'); + } try { // Sanity checks. validateHandler(presenceHandler); @@ -191,9 +251,12 @@ export function makeHandledPromise(Promise) { }; const resolveHandled = async (target, deprecatedPresenceHandler) => { - if (fulfilled) { + if (resolved) { return undefined; } + if (promiseToForwardedPromise.has(handledP)) { + throw new TypeError('internal: already forwarded'); + } try { if (deprecatedPresenceHandler) { throw TypeError( @@ -201,6 +264,8 @@ export function makeHandledPromise(Promise) { ); } + target = shorten(target); + // Resolve with the target when it's ready. handledResolve(target); @@ -211,21 +276,29 @@ export function makeHandledPromise(Promise) { return continueForwarding(null, target); } - // See if the target is a presence we already know of. - let presence; - try { - presence = HandledPromise.unwrap(target); - } catch (e) { - presence = await target; - } - const existingPresenceHandler = presenceToHandler.get(presence); - if (existingPresenceHandler) { + const receivePresence = presence => { + const existingPresenceHandler = presenceToHandler.get(presence); + if (!existingPresenceHandler) { + return false; + } promiseToHandler.set(handledP, existingPresenceHandler); promiseToPresence.set(handledP, presence); - return continueForwarding(null, handledP); + continueForwarding(null, handledP); + return true; + }; + + // See if the target is a presence we already know of. + if (receivePresence(promiseToPresence.get(target))) { + return undefined; + } + + // Wait for the target before continuing. + const presence = await target; + if (receivePresence(presence)) { + return undefined; } - // Remove the mapping, as we don't need a handler. + // Remove the mapping, as we don't need a handler anymore. promiseToHandler.delete(handledP); return continueForwarding(); } catch (e) { @@ -371,6 +444,7 @@ export function makeHandledPromise(Promise) { handle = (p, operation, ...args) => { ensureMaps(); + p = shorten(p); const unfulfilledHandler = promiseToHandler.get(p); let executor; if (