From 7851885dc558c13779e8c0ad60b48e417623f636 Mon Sep 17 00:00:00 2001 From: ptaylor Date: Tue, 18 Aug 2015 18:16:18 -0700 Subject: [PATCH] feat(operator): Add minimal delay operator. --- spec/operators/delay-spec.js | 18 ++++ src/Observable.ts | 1 + src/Rx.ts | 3 + src/observables/SubscribeOnObservable.ts | 12 ++- src/operators/delay.ts | 103 +++++++++++++++++++++++ 5 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 spec/operators/delay-spec.js create mode 100644 src/operators/delay.ts diff --git a/spec/operators/delay-spec.js b/spec/operators/delay-spec.js new file mode 100644 index 0000000000..8a2163d7af --- /dev/null +++ b/spec/operators/delay-spec.js @@ -0,0 +1,18 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.delay()', function () { + it('should delay by 100ms', function (done) { + var time = Date.now(); + Observable + .value(42) + .delay(100) + .subscribe(function (x) { + expect(Date.now() - time >= 100).toBe(true); + }, null, function() { + expect(Date.now() - time >= 100).toBe(true); + done(); + }); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 9e5d2ffa9d..b409921372 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -109,6 +109,7 @@ export default class Observable { concurrent?: number) => Observable; expand: (project: (x: T, ix: number) => Observable) => Observable; + delay: (delay: number, scheduler?: Scheduler) => Observable; switchAll: () => Observable; switchLatest: (project: ((x: T, ix: number) => Observable), diff --git a/src/Rx.ts b/src/Rx.ts index c536104369..a39c8ef871 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -157,6 +157,9 @@ import groupBy from './operators/groupBy'; observableProto.groupBy = groupBy; +import delay from './operators/delay'; +observableProto.delay = delay; + export default { Subject, Scheduler, diff --git a/src/observables/SubscribeOnObservable.ts b/src/observables/SubscribeOnObservable.ts index bd36410a74..349a0b8876 100644 --- a/src/observables/SubscribeOnObservable.ts +++ b/src/observables/SubscribeOnObservable.ts @@ -11,15 +11,19 @@ export default class SubscribeOnObservable extends Observable { return source.subscribe(subscriber); } - constructor(public source: Observable, - protected delay: number = 0, - protected scheduler: Scheduler = Scheduler.nextTick) { + private delayTime: number; + private scheduler: Scheduler; + + constructor(source: Observable, delay: number = 0, scheduler: Scheduler = Scheduler.nextTick) { super(); + this.source = source; + this.delayTime = delay; + this.scheduler = scheduler; } _subscribe(subscriber) { - const delay = this.delay; + const delay = this.delayTime; const source = this.source; const scheduler = this.scheduler; diff --git a/src/operators/delay.ts b/src/operators/delay.ts new file mode 100644 index 0000000000..ab30c411d1 --- /dev/null +++ b/src/operators/delay.ts @@ -0,0 +1,103 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Scheduler from '../Scheduler'; +import Subscriber from '../Subscriber'; +import Notification from '../Notification'; + +export default function delay(delay: number, scheduler: Scheduler = Scheduler.immediate) { + return this.lift(new DelayOperator(delay, scheduler)); +} + +export class DelayOperator extends Operator { + + delay: number; + scheduler: Scheduler; + + constructor(delay: number, scheduler: Scheduler) { + super(); + this.delay = delay; + this.scheduler = scheduler; + } + + call(observer: Observer): Observer { + return new DelaySubscriber(observer, this.delay, this.scheduler); + } +} + +export class DelaySubscriber extends Subscriber { + + protected delay: number; + protected queue: Array=[]; + protected scheduler: Scheduler; + protected active: boolean = false; + protected errored: boolean = false; + + static dispatch(state) { + const source = state.source; + const queue = source.queue; + const scheduler = state.scheduler; + const destination = state.destination; + while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) { + queue.shift().notification.observe(destination); + } + if (queue.length > 0) { + ( this).delay = Math.max(0, queue[0].time - scheduler.now()); + ( this).schedule(state); + } else { + source.active = false; + } + } + + constructor(destination: Observer, delay: number, scheduler: Scheduler) { + super(destination); + this.delay = delay; + this.scheduler = scheduler; + } + + _next(x) { + if (this.errored) { + return; + } + const scheduler = this.scheduler; + this.queue.push(new DelayMessage(scheduler.now() + this.delay, Notification.createNext(x))); + if (this.active === false) { + this._schedule(scheduler); + } + } + + _error(e) { + const scheduler = this.scheduler; + this.errored = true; + this.queue = [new DelayMessage(scheduler.now() + this.delay, Notification.createError(e))]; + if (this.active === false) { + this._schedule(scheduler); + } + } + + _complete() { + if (this.errored) { + return; + } + const scheduler = this.scheduler; + this.queue.push(new DelayMessage(scheduler.now() + this.delay, Notification.createComplete())); + if (this.active === false) { + this._schedule(scheduler); + } + } + + _schedule(scheduler) { + this.active = true; + this.add(scheduler.schedule(this.delay, { + source: this, destination: this.destination, scheduler: scheduler + }, DelaySubscriber.dispatch)); + } +} + +class DelayMessage { + time: number; + notification: any; + constructor(time: number, notification: any) { + this.time = time; + this.notification = notification; + } +} \ No newline at end of file