Skip to content

Commit

Permalink
fix: shorten HandledPromises to propagate handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Apr 11, 2020
1 parent b3dbdcf commit 2ed50d2
Showing 1 changed file with 106 additions and 32 deletions.
138 changes: 106 additions & 32 deletions packages/eventual-send/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,46 @@ export function makeHandledPromise(Promise) {
let presenceToPromise;
let promiseToHandler;
let promiseToPresence; // only for HandledPromise.unwrap
let promiseToForwardedPromise; // forwarding, union-find-ish
function ensureMaps() {
if (!presenceToHandler) {
presenceToHandler = new WeakMap();
presenceToPromise = new WeakMap();
promiseToHandler = new WeakMap();
promiseToPresence = new WeakMap();
promiseToForwardedPromise = new WeakMap();
}
}

function shorten(target, error = false) {
let p = target;
// target -> resolved1 -> resolved2
while (promiseToForwardedPromise.has(p)) {
p = promiseToForwardedPromise.get(p);
}
const presence = promiseToPresence.get(p);
if (presence) {
// Presences are final, so it is ok to propagate
// this upstream.
while (target !== p) {
const parent = promiseToForwardedPromise.get(target);
promiseToForwardedPromise.delete(target);
promiseToPresence.set(target, presence);
target = parent;
}
} else {
// Even if p has an unfulfilledHandler, we don't care.
// We still propagate only p upstream since
// unfulfulled handlers are transient.
while (target !== p) {
const parent = promiseToForwardedPromise.get(target);
promiseToForwardedPromise.set(target, p);
target = parent;
}
}
return target;
}

// This special handler accepts Promises, and forwards
// handled Promises to their corresponding fulfilledHandler.
let forwardingHandler;
Expand All @@ -75,33 +106,47 @@ export function makeHandledPromise(Promise) {
}
let handledResolve;
let handledReject;
let fulfilled = false;
let resolved = false;
let handledP;
const superExecutor = (resolve, reject) => {
handledResolve = value => {
fulfilled = true;
if (promiseToForwardedPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
value = shorten(value);
resolved = true;
let targetP;
if (promiseToHandler.has(value) || promiseToPresence.has(value)) {
targetP = value;
} else {
targetP = presenceToPromise.get(value);
}
if (targetP && targetP !== handledP) {
promiseToForwardedPromise.set(handledP, targetP);
} else {
promiseToForwardedPromise.delete(handledP);
}
resolve(value);
};
handledReject = err => {
fulfilled = true;
if (promiseToForwardedPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
promiseToHandler.delete(handledP);
resolved = true;
reject(err);
};
};
const handledP = harden(
Reflect.construct(Promise, [superExecutor], new.target),
);
handledP = harden(Reflect.construct(Promise, [superExecutor], new.target));

ensureMaps();
let continueForwarding = () => {};

if (!unfulfilledHandler) {
// Create a simple unfulfilledHandler that just postpones until the
const makePostponedHandler = () => {
// Create a simple postponedHandler that just postpones until the
// fulfilledHandler is set.
//
// This is insufficient for actual remote handled Promises
// (too many round-trips), but is an easy way to create a
// local handled Promise.
let donePostponing;
const interlockP = new Promise((resolve, reject) => {
continueForwarding = (err = null, targetP = undefined) => {
donePostponing = (err = null, targetP = undefined) => {
if (err !== null) {
reject(err);
return;
Expand All @@ -115,7 +160,7 @@ export function makeHandledPromise(Promise) {
// It will bubble up to the HandledPromise itself.
interlockP.catch(_ => {});

const makePostponed = postponedOperation => {
const makePostponedOperation = postponedOperation => {
// Just wait until the handler is resolved/rejected.
return function postpone(x, ...args) {
// console.log(`forwarding ${postponedOperation} ${args[0]}`);
Expand All @@ -133,10 +178,19 @@ export function makeHandledPromise(Promise) {
};
};

unfulfilledHandler = {
get: makePostponed('get'),
applyMethod: makePostponed('applyMethod'),
const postponedHandler = {
get: makePostponedOperation('get'),
applyMethod: makePostponedOperation('applyMethod'),
};
return [postponedHandler, donePostponing];
};

let continueForwarding = () => {};
if (!unfulfilledHandler) {
// This is insufficient for actual remote handled Promises
// (too many round-trips), but is an easy way to create a
// local handled Promise.
[unfulfilledHandler, continueForwarding] = makePostponedHandler();
}

const validateHandler = h => {
Expand All @@ -150,18 +204,24 @@ export function makeHandledPromise(Promise) {
promiseToHandler.set(handledP, unfulfilledHandler);

const rejectHandled = reason => {
if (fulfilled) {
if (resolved) {
return;
}
if (promiseToForwardedPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
handledReject(reason);
continueForwarding(reason);
};

let resolvedPresence = null;
const resolveWithPresence = presenceHandler => {
if (fulfilled) {
if (resolved) {
return resolvedPresence;
}
if (promiseToForwardedPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
try {
// Sanity checks.
validateHandler(presenceHandler);
Expand Down Expand Up @@ -191,16 +251,21 @@ export function makeHandledPromise(Promise) {
};

const resolveHandled = async (target, deprecatedPresenceHandler) => {
if (fulfilled) {
if (resolved) {
return undefined;
}
if (promiseToForwardedPromise.has(handledP)) {
throw new TypeError('internal: already forwarded');
}
try {
if (deprecatedPresenceHandler) {
throw TypeError(
`resolveHandled no longer accepts a handler; use resolveWithPresence`,
);
}

target = shorten(target);

// Resolve with the target when it's ready.
handledResolve(target);

Expand All @@ -211,21 +276,29 @@ export function makeHandledPromise(Promise) {
return continueForwarding(null, target);
}

// See if the target is a presence we already know of.
let presence;
try {
presence = HandledPromise.unwrap(target);
} catch (e) {
presence = await target;
}
const existingPresenceHandler = presenceToHandler.get(presence);
if (existingPresenceHandler) {
const receivePresence = presence => {
const existingPresenceHandler = presenceToHandler.get(presence);
if (!existingPresenceHandler) {
return false;
}
promiseToHandler.set(handledP, existingPresenceHandler);
promiseToPresence.set(handledP, presence);
return continueForwarding(null, handledP);
continueForwarding(null, handledP);
return true;
};

// See if the target is a presence we already know of.
if (receivePresence(promiseToPresence.get(target))) {
return undefined;
}

// Wait for the target before continuing.
const presence = await target;
if (receivePresence(presence)) {
return undefined;
}

// Remove the mapping, as we don't need a handler.
// Remove the mapping, as we don't need a handler anymore.
promiseToHandler.delete(handledP);
return continueForwarding();
} catch (e) {
Expand Down Expand Up @@ -371,6 +444,7 @@ export function makeHandledPromise(Promise) {

handle = (p, operation, ...args) => {
ensureMaps();
p = shorten(p);
const unfulfilledHandler = promiseToHandler.get(p);
let executor;
if (
Expand Down

0 comments on commit 2ed50d2

Please sign in to comment.