From a295353404db79aa3f3644bc20b82818840ae03d Mon Sep 17 00:00:00 2001 From: Matthieu Sieben Date: Mon, 4 Mar 2024 20:44:06 +0100 Subject: [PATCH] watch: debounce restart in watch mode --- lib/internal/debounce_iterable.js | 169 ++++++++++++++++++++++++++++++ lib/internal/main/watch_mode.js | 5 +- 2 files changed, 172 insertions(+), 2 deletions(-) create mode 100644 lib/internal/debounce_iterable.js diff --git a/lib/internal/debounce_iterable.js b/lib/internal/debounce_iterable.js new file mode 100644 index 000000000000000..6e745f9eff1163d --- /dev/null +++ b/lib/internal/debounce_iterable.js @@ -0,0 +1,169 @@ +'use strict'; + +const { + ArrayPrototypePushApply, + ObjectGetPrototypeOf, + ObjectSetPrototypeOf, + Promise, + PromiseResolve, + SymbolAsyncIterator, + SymbolIterator, +} = primordials; + +const { + codes: { ERR_INVALID_ARG_TYPE }, +} = require('internal/errors'); +const FixedQueue = require('internal/fixed_queue'); + +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype +); + +/** + * Wraps an iterable in a debounced iterable. When trying to get the next item, + * the debounced iterable will group all items that are returned less than + * `delay` milliseconds apart into a single batch. + * + * The debounced iterable will only start consuming the original iterable when + * the first consumer requests the next item, and will stop consuming the + * original iterable when no more items are requested (through `next` calls). + * + * Each debounced iterable item will be an array of items from the original + * iterable, and will always contain at least one item. This allows the consumer + * to decide how to handle the batch of items (e.g. take tha latest only, ensure + * unicity, etc.). + * + * @template T + * @param {Iterable | AsyncIterable} iterable + * @param {number} delay + * @returns {AsyncIterableIterator<[T, ...T[]]>} + */ +exports.debounceIterable = function debounceIterable(iterable, delay) { + const innerIterator = + SymbolAsyncIterator in iterable + ? iterable[SymbolAsyncIterator]() + : iterable[SymbolIterator](); + + let doneProducing = false; + let doneConsuming = false; + let consuming = false; + let error = null; + let timer = null; + + const unconsumedPromises = new FixedQueue(); + let unconsumedValues = []; + + return ObjectSetPrototypeOf( + { + [SymbolAsyncIterator]() { + return this; + }, + + next() { + return new Promise((resolve, reject) => { + unconsumedPromises.push({ resolve, reject }); + startConsuming(); + }); + }, + + return() { + return closeHandler(); + }, + + throw(err) { + if (!err || !(err instanceof Error)) { + throw new ERR_INVALID_ARG_TYPE('AsyncIterator.throw', 'Error', err); + } + errorHandler(err); + }, + }, + AsyncIteratorPrototype + ); + + async function startConsuming() { + if (consuming) return; + + consuming = true; + + while (!doneProducing && !doneConsuming && !unconsumedPromises.isEmpty()) { + try { + // if `result` takes longer than `delay` to resolve, make sure any + // unconsumedValue are flushed. + scheduleFlush(); + + const result = await innerIterator.next(); + + // A new value just arrived. Make sure we wont flush just yet. + unscheduleFlush(); + + if (result.done) { + doneProducing = true; + } else if (!doneConsuming) { + ArrayPrototypePushApply(unconsumedValues, result.value); + } + } catch (err) { + doneProducing = true; + error ||= err; + } + } + + flushNow(); + + consuming = false; + } + + function scheduleFlush() { + if (timer == null) { + timer = setTimeout(flushNow, delay).unref(); + } + } + + function unscheduleFlush() { + if (timer != null) { + clearTimeout(timer); + timer = null; + } + } + + function flushNow() { + unscheduleFlush(); + + if (!doneConsuming) { + if (unconsumedValues.length > 0 && !unconsumedPromises.isEmpty()) { + unconsumedPromises + .shift() + .resolve({ done: false, value: unconsumedValues }); + unconsumedValues = []; + } + if (doneProducing && unconsumedValues.length === 0) { + doneConsuming = true; + } + } + + while (doneConsuming && !unconsumedPromises.isEmpty()) { + const { resolve, reject } = unconsumedPromises.shift(); + if (error) reject(error); + else resolve({ done: true, value: undefined }); + } + } + + function errorHandler(err) { + error ||= err; + + closeHandler(); + } + + function closeHandler() { + doneConsuming = true; + unconsumedValues = []; + + flushNow(); + + if (!doneProducing) { + doneProducing = true; + innerIterator.return?.(); + } + + return PromiseResolve({ done: true, value: undefined }); + } +}; diff --git a/lib/internal/main/watch_mode.js b/lib/internal/main/watch_mode.js index 3be329c57af6db4..fba135cfe0a09b6 100644 --- a/lib/internal/main/watch_mode.js +++ b/lib/internal/main/watch_mode.js @@ -17,6 +17,7 @@ const { triggerUncaughtException, exitCodes: { kNoFailure }, } = internalBinding('errors'); +const { debounceIterable } = require('internal/debounce_iterable'); const { getOptionValue } = require('internal/options'); const { emitExperimentalWarning } = require('internal/util'); const { FilesWatcher } = require('internal/watch_mode/files_watcher'); @@ -44,7 +45,7 @@ const args = ArrayPrototypeFilter(process.execArgv, (arg, i, arr) => arg !== '--watch' && !StringPrototypeStartsWith(arg, '--watch=') && arg !== '--watch-preserve-output'); ArrayPrototypePushApply(args, kCommand); -const watcher = new FilesWatcher({ debounce: 200, mode: kShouldFilterModules ? 'filter' : 'all' }); +const watcher = new FilesWatcher({ mode: kShouldFilterModules ? 'filter' : 'all' }); ArrayPrototypeForEach(kWatchedPaths, (p) => watcher.watchPath(p)); let graceTimer; @@ -117,7 +118,7 @@ async function restart() { start(); // eslint-disable-next-line no-unused-vars - for await (const _ of on(watcher, 'changed')) { + for await (const _ of debounceIterable(on(watcher, 'changed'), 200)) { await restart(); } } catch (error) {