From 4ef41b0469de6ee58e284e47d6f5daf46f200688 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 17 Sep 2015 17:34:07 -0700 Subject: [PATCH] fix(bufferTime): inner intervals will now clean up properly - adds marble tests around bufferTime - adds `maxFrames` property to `VirtualTimeScheduler` that will limit the execution of tests --- spec/operators/bufferTime-spec.js | 103 ++++++++++++++++++------- src/operators/bufferTime.ts | 10 ++- src/schedulers/VirtualTimeScheduler.ts | 9 ++- 3 files changed, 90 insertions(+), 32 deletions(-) diff --git a/spec/operators/bufferTime-spec.js b/spec/operators/bufferTime-spec.js index b7881a9d02..5245b413ee 100644 --- a/spec/operators/bufferTime-spec.js +++ b/spec/operators/bufferTime-spec.js @@ -1,34 +1,81 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, hot, cold, rxTestScheduler, expectObservable */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; -describe('Observable.prototype.bufferTime', function () { - it('should emit buffers at intervals', function (done) { - var expected = [ - [0, 1, 2], - [3, 4, 5], - [6, 7, 8] - ]; - Observable.interval(100) - .bufferTime(320) - .take(3) - .subscribe(function (w) { - expect(w).toEqual(expected.shift()) - }, null, done); - }, 2000); +describe('Observable.prototype.bufferTime', function () { + it('should emit buffers at intervals', function (){ + var values = { + w: ['a','b'], + x: ['c','d','e'], + y: ['f', 'g'], + z: [] + }; + var e1 = hot('---a---b---c---d---e---f---g---|'); + var expected = '----------w---------x---------y(z|)'; + + expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values); + }); + it('should emit buffers at intervals test 2', function() { + var e1 = hot('---------a---------b---------c---------d---------e---------g--------|') + var expected = '--------------------------------x-------------------------------y---(z|)'; + + expectObservable(e1.bufferTime(320, null, rxTestScheduler)).toBe(expected, { x: ['a','b','c'], y: ['d', 'e', 'g'], z: []}); + }); - it('should emit buffers that have been created at intervals and close after the specified delay', function (done) { - var expected = [ - [0, 1, 2, 3, 4], - [2, 3, 4, 5, 6], - [4, 5, 6, 7, 8] - ]; - Observable.interval(100) - .bufferTime(520, 220) - .take(3) - .subscribe(function (w) { - expect(w).toEqual(expected.shift()) - }, null, done); - }, 2000); + it('should emit buffers that have been created at intervals and close after the specified delay', function (){ + var e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)'); + // --------------------*--------------------*---- start interval + // ---------------------| timespans + // ---------------------| + // -----| + var expected = '---------------------x-------------------y----(z|)'; + var values = { + x: ['a', 'b', 'c', 'd', 'e'], + y: ['e', 'f', 'g', 'h', 'i'], + z: ['i', 'k'] + }; + expectObservable(e1.bufferTime(210, 200, rxTestScheduler)).toBe(expected, values); + }); + + it('should handle empty', function (){ + var e1 = Observable.empty(); + expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe('(a|)', { a: [] }); + }); + + it('should handle never', function () { + var e1 = Observable.never(); + var expected = '----------a---------a---------a---------a---------a---------a---------a-----'; // 750 frame limit + expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, { a: [] }); + }); + + it('should handle throw', function (){ + var e1 = Observable.throw(new Error('haha')); + var expected = '#'; + expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, undefined, new Error('haha')); + }); + + it('should handle errors', function () { + var values = { + w: ['a','b'] + }; + var e1 = hot('---a---b---c---#---e---f---g---|'); + var expected = '----------w----#'; + + expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values); + }); + + it('should emit buffers that have been created at intervals and close after the specified delay with errors', function (){ + var e1 = hot('---a---b---c----d----e----f----g----h----i--#'); + // --------------------*--------------------*---- start interval + // ---------------------| timespans + // ---------------------| + // -----| + var expected = '---------------------x-------------------y--#'; + var values = { + x: ['a', 'b', 'c', 'd', 'e'], + y: ['e', 'f', 'g', 'h', 'i'] + }; + expectObservable(e1.bufferTime(210, 200, rxTestScheduler)).toBe(expected, values); + }); }); \ No newline at end of file diff --git a/src/operators/bufferTime.ts b/src/operators/bufferTime.ts index df5b6c06dc..30659b368b 100644 --- a/src/operators/bufferTime.ts +++ b/src/operators/bufferTime.ts @@ -82,15 +82,19 @@ function dispatchBufferTimeSpanOnly(state) { } state.buffer = subscriber.openBuffer(); - (this).schedule(state, state.bufferTimeSpan); + if(!subscriber.isUnsubscribed) { + (this).schedule(state, state.bufferTimeSpan); + } } function dispatchBufferCreation(state) { let { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state; let buffer = subscriber.openBuffer(); var action = this; - action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer })); - action.schedule(state, bufferCreationInterval); + if(!subscriber.isUnsubscribed) { + action.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer })); + action.schedule(state, bufferCreationInterval); + } } function dispatchBufferClose({ subscriber, buffer }) { diff --git a/src/schedulers/VirtualTimeScheduler.ts b/src/schedulers/VirtualTimeScheduler.ts index bdc2ad37cd..d62f9c28f4 100644 --- a/src/schedulers/VirtualTimeScheduler.ts +++ b/src/schedulers/VirtualTimeScheduler.ts @@ -9,6 +9,7 @@ export default class VirtualTimeScheduler implements Scheduler { index: number = 0; sorted: boolean = false; frame: number = 0; + maxFrames: number = 750; now() { return 0; @@ -16,11 +17,17 @@ export default class VirtualTimeScheduler implements Scheduler { flush() { const actions = this.actions; + const maxFrames = this.maxFrames; while (actions.length > 0) { let action = actions.shift(); this.frame = action.delay; - action.execute(); + if(this.frame <= maxFrames) { + action.execute(); + } else { + break; + } } + actions.length = 0; this.frame = 0; }