From 9e963aacd03a2139a95e2b5b5028a7b71b69f72f Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sun, 25 Jun 2017 16:10:31 -0700 Subject: [PATCH] feat(auditTime): add higher-order lettable version of auditTime --- src/operator/auditTime.ts | 62 ++------------------------------------ src/operators/auditTime.ts | 51 +++++++++++++++++++++++++++++++ src/operators/index.ts | 1 + 3 files changed, 55 insertions(+), 59 deletions(-) create mode 100644 src/operators/auditTime.ts diff --git a/src/operator/auditTime.ts b/src/operator/auditTime.ts index dfcf78f93d..370b0c023b 100644 --- a/src/operator/auditTime.ts +++ b/src/operator/auditTime.ts @@ -1,9 +1,7 @@ import { async } from '../scheduler/async'; -import { Operator } from '../Operator'; import { IScheduler } from '../Scheduler'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { Subscription, TeardownLogic } from '../Subscription'; +import { auditTime as higherOrder } from '../operators'; /** * Ignores source values for `duration` milliseconds, then emits the most recent @@ -48,59 +46,5 @@ import { Subscription, TeardownLogic } from '../Subscription'; * @owner Observable */ export function auditTime(this: Observable, duration: number, scheduler: IScheduler = async): Observable { - return this.lift(new AuditTimeOperator(duration, scheduler)); -} - -class AuditTimeOperator implements Operator { - constructor(private duration: number, - private scheduler: IScheduler) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new AuditTimeSubscriber(subscriber, this.duration, this.scheduler)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class AuditTimeSubscriber extends Subscriber { - - private value: T; - private hasValue: boolean = false; - private throttled: Subscription; - - constructor(destination: Subscriber, - private duration: number, - private scheduler: IScheduler) { - super(destination); - } - - protected _next(value: T): void { - this.value = value; - this.hasValue = true; - if (!this.throttled) { - this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this)); - } - } - - clearThrottle(): void { - const { value, hasValue, throttled } = this; - if (throttled) { - this.remove(throttled); - this.throttled = null; - throttled.unsubscribe(); - } - if (hasValue) { - this.value = null; - this.hasValue = false; - this.destination.next(value); - } - } -} - -function dispatchNext(subscriber: AuditTimeSubscriber): void { - subscriber.clearThrottle(); -} + return higherOrder(duration, scheduler)(this); +} \ No newline at end of file diff --git a/src/operators/auditTime.ts b/src/operators/auditTime.ts new file mode 100644 index 0000000000..c297f16280 --- /dev/null +++ b/src/operators/auditTime.ts @@ -0,0 +1,51 @@ +import { async } from '../scheduler/async'; +import { IScheduler } from '../Scheduler'; +import { audit } from './audit'; +import { timer } from '../observable/timer'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Ignores source values for `duration` milliseconds, then emits the most recent + * value from the source Observable, then repeats this process. + * + * When it sees a source values, it ignores that plus + * the next ones for `duration` milliseconds, and then it emits the most recent + * value from the source. + * + * + * + * `auditTime` is similar to `throttleTime`, but emits the last value from the + * silenced time window, instead of the first value. `auditTime` emits the most + * recent value from the source Observable on the output Observable as soon as + * its internal timer becomes disabled, and ignores source values while the + * timer is enabled. Initially, the timer is disabled. As soon as the first + * source value arrives, the timer is enabled. After `duration` milliseconds (or + * the time unit determined internally by the optional `scheduler`) has passed, + * the timer is disabled, then the most recent source value is emitted on the + * output Observable, and this process repeats for the next source value. + * Optionally takes a {@link IScheduler} for managing timers. + * + * @example Emit clicks at a rate of at most one click per second + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.auditTime(1000); + * result.subscribe(x => console.log(x)); + * + * @see {@link audit} + * @see {@link debounceTime} + * @see {@link delay} + * @see {@link sampleTime} + * @see {@link throttleTime} + * + * @param {number} duration Time to wait before emitting the most recent source + * value, measured in milliseconds or the time unit determined internally + * by the optional `scheduler`. + * @param {Scheduler} [scheduler=async] The {@link IScheduler} to use for + * managing the timers that handle the rate-limiting behavior. + * @return {Observable} An Observable that performs rate-limiting of + * emissions from the source Observable. + * @method auditTime + * @owner Observable + */ +export function auditTime(duration: number, scheduler: IScheduler = async): MonoTypeOperatorFunction { + return audit(() => timer(duration, scheduler)); +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 52e4959f0c..39fc39e299 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -1,4 +1,5 @@ export { audit } from './audit'; +export { auditTime } from './auditTime'; export { catchError } from './catchError'; export { concat } from './concat'; export { concatAll } from './concatAll';