Skip to content

Commit

Permalink
fix(captp): don't rely on TextDecoder stream flag
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Jul 19, 2021
1 parent 003c3d1 commit 5a370a8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
65 changes: 46 additions & 19 deletions packages/captp/src/atomics.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import { assert, details as X } from '@agoric/assert';

// This is a pathological minimum, but exercised by the unit test.
// transfer.
export const MIN_DATA_BUFFER_LENGTH = 1;

// Calculate how big the transfer buffer needs to be.
Expand All @@ -29,17 +28,18 @@ const splitTransferBuffer = transferBuffer => {
X`Transfer buffer of ${transferBuffer.byteLength} bytes is smaller than MIN_TRANSFER_BUFFER_LENGTH ${MIN_TRANSFER_BUFFER_LENGTH}`,
);
const lenbuf = new BigUint64Array(transferBuffer, 0, 1);

// The documentation says that this needs to be an Int32Array for use with
// Atomics.notify:
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify#syntax
const statusbuf = new Int32Array(transferBuffer, lenbuf.byteLength, 1);
const overheadLength = lenbuf.byteLength + statusbuf.byteLength;
assert.equal(
overheadLength,
TRANSFER_OVERHEAD_LENGTH,
X`Internal error; actual overhead ${overheadLength} of bytes is not TRANSFER_OVERHEAD_LENGTH ${TRANSFER_OVERHEAD_LENGTH}`,
);
const databuf = new Uint8Array(
transferBuffer,
lenbuf.byteLength + statusbuf.byteLength,
);
const databuf = new Uint8Array(transferBuffer, overheadLength);
assert(
databuf.byteLength >= MIN_DATA_BUFFER_LENGTH,
X`Transfer buffer of size ${transferBuffer.byteLength} only supports ${databuf.byteLength} data bytes; need at least ${MIN_DATA_BUFFER_LENGTH}`,
Expand Down Expand Up @@ -75,7 +75,8 @@ export const makeAtomicsTrapHost = transferBuffer => {
databuf.set(subenc);

// Save the length of the remaining data.
lenbuf[0] = BigInt(encoded.length - i);
const remaining = BigInt(encoded.length - i);
lenbuf[0] = remaining;

// Calculate the next slice, and whether this is the last one.
i += subenc.length;
Expand All @@ -91,8 +92,12 @@ export const makeAtomicsTrapHost = transferBuffer => {
statusbuf[0] = rejectFlag | doneFlag;
Atomics.notify(statusbuf, 0, +Infinity);

// Wait until the next call.
yield;
if (!done) {
// Wait until the next call to `it.next()`. If the guest calls
// `it.return()` or `it.throw()`, then this yield will return or throw,
// terminating the generator function early.
yield;
}
}
};
};
Expand All @@ -110,18 +115,18 @@ export const makeAtomicsTrapGuest = transferBuffer => {
const { statusbuf, lenbuf, databuf } = splitTransferBuffer(transferBuffer);

return ({ startTrap }) => {
const td = new TextDecoder('utf-8');

let json = '';

// Start by sending the trap call to the host.
const it = startTrap();

/** @type {Uint8Array} */
let encoded;
let i = 0;
let done = false;
while (!done) {
// Tell that we are ready for another buffer.
statusbuf[0] = STATUS_WAITING;
it.next();
const { done: itDone } = it.next();
assert(!itDone, X`Internal error; it.next() returned done=${itDone}`);

// Wait for the host to wake us.
Atomics.wait(statusbuf, 0, STATUS_WAITING);
Expand All @@ -130,18 +135,40 @@ export const makeAtomicsTrapGuest = transferBuffer => {
// eslint-disable-next-line no-bitwise
done = (statusbuf[0] & STATUS_FLAG_DONE) !== 0;

// Decode the next part of the data buffer.
json += td.decode(databuf.subarray(0, Number(lenbuf[0])), {
stream: !done,
});
// Accumulate the encoded buffer.
const remaining = Number(lenbuf[0]);
const datalen = Math.min(remaining, databuf.byteLength);
if (i === 0) {
if (done) {
// Special case: we are done on first try, so we don't need to copy
// anything.
encoded = databuf.subarray(0, datalen);
break;
}
// Allocate our buffer for the remaining data.
encoded = new Uint8Array(remaining);
}

// Copy the next buffer.
encoded.set(databuf.subarray(0, datalen), i);
i += datalen;
}

// Tell the host we're finished.
it.return();
// This throw is harmless if the host iterator has already finished, and
// if not finished, captp will correctly raise an error.
//
// TODO: It would be nice to use an error type, but captp is just too
// noisy with spurious "Temporary logging of sent error" messages.
// it.throw(assert.error(X`Trap host has not finished`));
it.throw(undefined);

// eslint-disable-next-line no-bitwise
const isReject = !!(statusbuf[0] & STATUS_FLAG_REJECT);

// Decode the accumulated encoded buffer.
const td = new TextDecoder('utf-8');
const json = td.decode(encoded);

// Parse the JSON data into marshalled form.
const serialized = JSON.parse(json);
return [isReject, serialized];
Expand Down
31 changes: 19 additions & 12 deletions packages/captp/src/captp.js
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ export const makeCapTP = (
assert.typeof(
trapHost,
'function',
X`CapTP cannot answer Trap(x) without a trapHost function`,
X`CapTP cannot answer Trap(${val}) without a trapHost function`,
);

// We need to create a promise for the "isDone" iteration right now to
Expand Down Expand Up @@ -443,31 +443,38 @@ export const makeCapTP = (

const [method, args] = unserialize(serialized);

const DONE_PUMPKIN = { toString: () => 'DONE_PUMPKIN' };
const getNextResultP = async () => {
const result = await resultP;

// Done with this trap iterator.
const cleanup = () => {
trapIterator.delete(questionID);
trapIteratorResultP.delete(questionID);
return harden({ done: true });
};

try {
if (!result || result.done) {
throw DONE_PUMPKIN;
return cleanup();
}

const ait = trapIterator.get(questionID);
if (!ait) {
// The iterator is done, so we're done.
throw DONE_PUMPKIN;
return cleanup();
}

// Drive the next iteration.
return await ait[method](...args);
} catch (e) {
// Done with this trap iterator.
trapIterator.delete(questionID);
trapIteratorResultP.delete(questionID);
if (e !== DONE_PUMPKIN) {
// We had an exception.
throw e;
cleanup();
if (e === undefined) {
assert.fail(
X`trapGuest expected trapHost AsyncIterator(${questionID}) to be done, but it wasn't`,
);
}
return harden({ done: true });
assert.note(e, X`trapHost AsyncIterator(${questionID}) threw`);
throw e;
}
};

Expand Down Expand Up @@ -672,7 +679,7 @@ export const makeCapTP = (
const value = unserialize(serialized);
assert(
!isThenable(value),
X`Trap() reply cannot be a Thenable; have ${value}`,
X`Trap(${target}) reply cannot be a Thenable; have ${value}`,
);

if (isException) {
Expand Down

0 comments on commit 5a370a8

Please sign in to comment.