Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adaptors between notifiers and async iterables #1340

Merged
merged 1 commit into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/notifier/exports.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import './src/types';
4 changes: 3 additions & 1 deletion packages/notifier/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"homepage": "https://github.com/Agoric/agoric-sdk#readme",
"dependencies": {
"@agoric/assert": "^0.0.8",
"@agoric/eventual-send": "^0.9.3",
"@agoric/produce-promise": "^0.1.3"
},
"devDependencies": {
Expand Down Expand Up @@ -77,7 +78,8 @@
"no-unused-expressions": "off",
"no-loop-func": "off",
"no-inner-declarations": "off",
"import/prefer-default-export": "off"
"import/prefer-default-export": "off",
"import/no-extraneous-dependencies": "off"
}
},
"eslintIgnore": [
Expand Down
157 changes: 157 additions & 0 deletions packages/notifier/src/asyncIterableAdaptor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// @ts-check
// eslint-disable-next-line spaced-comment
/// <reference types="ses"/>

import { E } from '@agoric/eventual-send';
// eslint-disable-next-line import/no-cycle
import { makeNotifierKit } from './notifier';

import './types';

/**
* Adaptor from a notifierP to an async iterable.
* The notifierP can be any object that has an eventually invokable
* `getUpdateSince` method that behaves according to the notifier
* spec. This can be a notifier, a promise for a local or remote
* notfier, or a presence of a remote notifier.
*
* It is also used internally by notifier.js so that a notifier itself is an
* async iterable.
*
* An async iterable is an object with a `[Symbol.asyncIterator]()` method
* that returns an async iterator. The async iterator we return here has only
* a `next()` method, without the optional `return` and `throw` methods. The
* omitted methods, if present, would be used by the for/await/of loop to
* inform the iterator of early termination. But this adaptor would not do
erights marked this conversation as resolved.
Show resolved Hide resolved
* anything useful in reaction to this notification.
*
* An async iterator's `next()` method returns a promise for an iteration
* result. An iteration result is a record with `value` and `done` properties.
*
* The purpose of building on the notifier protocol is to have a lossy
* adaptor, where intermediate results can be missed in favor of more recent
* results which are therefore less stale. See
* https://github.com/Agoric/documentation/blob/master/main/distributed-programming.md#notifiers
*
* @template T
* @param {PromiseOrNot<BaseNotifier<T>>} notifierP
* @returns {AsyncIterable<T>}
*/
export const makeAsyncIterableFromNotifier = notifierP => {
return harden({
[Symbol.asyncIterator]: () => {
/** @type {UpdateCount} */
let localUpdateCount;
/** @type {Promise<{value: T, done: boolean}> | undefined} */
let myIterationResultP;
return harden({
next: () => {
if (!myIterationResultP) {
// In this adaptor, once `next()` is called and returns an
// unresolved promise, `myIterationResultP`, and until
// `myIterationResultP` is fulfilled with an
// iteration result, further `next()` calls will return the same
// `myIterationResultP` promise again without asking the notifier
// for more updates. If there's already an unanswered ask in the
// air, all further asks should just reuse the result of that one.
//
// This reuse behavior is only needed for code that uses the async
// iterator protocol explicitly. When this async iterator is
// consumed by a for/await/of loop, `next()` will only be called
// after the promise for the previous iteration result has
// fulfilled. If it fulfills with `done: true`, the for/await/of
// loop will never call `next()` again.
//
// See
// https://2ality.com/2016/10/asynchronous-iteration.html#queuing-next()-invocations
// for an explicit use that sends `next()` without waiting.
myIterationResultP = E(notifierP)
.getUpdateSince(localUpdateCount)
.then(({ value, updateCount }) => {
localUpdateCount = updateCount;
const done = localUpdateCount === undefined;
if (!done) {
// Once the outstanding question has been answered, stop
// using that answer, so any further `next()` questions
// cause a new `getUpdateSince` request.
//
// But only if more answers are expected. Once the notifier
// is `done`, that was the last answer so reuse it forever.
myIterationResultP = undefined;
}
return harden({ value, done });
});
}
return myIterationResultP;
},
});
},
});
};

/**
* This reads from `asyncIteratable` updating `updater` with each successive
* value. The `updater` the same API as the `updater` of a notifier kit,
* but can simply be an observer to react to these updates. As an observer,
* the `updater` may only be interested in certain occurrences (`updateState`,
* `finish`, `fail`), so for convenience, `updateFromIterable` feature
* tests for those methods before calling them.
*
* @template T
* @param {Partial<Updater<T>>} updater
* @param {AsyncIterable<T>} asyncIterable
* @returns {Promise<undefined>}
*/
// See https://github.com/Agoric/agoric-sdk/issues/1345 for why
// `updateFromIterable` currently needs a local `asyncIterable` rather than
// a possibly remote `asyncIterableP`.
export const updateFromIterable = (updater, asyncIterable) => {
const iterator = asyncIterable[Symbol.asyncIterator]();
return new Promise(ack => {
const recur = () => {
E.when(iterator.next()).then(
({ value, done }) => {
if (done) {
updater.finish && updater.finish(value);
ack();
} else {
updater.updateState && updater.updateState(value);
recur();
}
},
reason => {
updater.fail && updater.fail(reason);
ack();
},
);
};
recur();
});
};

/**
* Adaptor from async iterable to notifier.
*
* @template T
* @param {AsyncIterable<T>} asyncIterable
* @returns {Notifier<T>}
*/
export const makeNotifierFromAsyncIterable = asyncIterable => {
const { notifier, updater } = makeNotifierKit();
updateFromIterable(updater, asyncIterable);
return notifier;
};

/**
* As updates come in from the possibly remote `notifierP`, update
* the local `updater`. Since the updates come from a notifier, they
* are lossy, i.e., once a more recent state can be reported, less recent
* states are assumed irrelevant and dropped.
*
* @template T
* @param {Partial<Updater<T>>} updater
* @param {PromiseOrNot<Notifier<T>>} notifierP
* @returns {Promise<undefined>}
*/
export const updateFromNotifier = (updater, notifierP) =>
updateFromIterable(updater, makeAsyncIterableFromNotifier(notifierP));
Loading