diff --git a/spec/operators/bufferTime-spec.ts b/spec/operators/bufferTime-spec.ts index feaa69c9f5..eae8628e18 100644 --- a/spec/operators/bufferTime-spec.ts +++ b/spec/operators/bufferTime-spec.ts @@ -18,7 +18,7 @@ describe('Observable.prototype.bufferTime', () => { z: [] }; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(subs); @@ -34,11 +34,47 @@ describe('Observable.prototype.bufferTime', () => { z: [] }; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); }); + it('should emit buffers at intervals or when the buffer is full', () => { + const e1 = hot('---a---b---c---d---e---f---g-----|'); + const subs = '^ !'; + const t = time( '----------|'); + const expected = '-------w-------x-------y---------(z|)'; + const values = { + w: ['a', 'b'], + x: ['c', 'd'], + y: ['e', 'f'], + z: ['g'] + }; + + const result = e1.bufferTime(t, null, 2, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should emit buffers at intervals or when the buffer is full test 2', () => { + const e1 = hot('---a---b---c---d---e---f---g-----|'); + const subs = '^ !'; + const t = time( '----------|'); + const expected = '----------w--------x---------y---(z|)'; + const values = { + w: ['a', 'b'], + x: ['c', 'd', 'e'], + y: ['f', 'g'], + z: [] + }; + + const result = e1.bufferTime(t, null, 3, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + it('should emit buffers that have been created at intervals and close after the specified delay', () => { const e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)'); // --------------------*--------------------*---- start interval @@ -54,7 +90,28 @@ describe('Observable.prototype.bufferTime', () => { z: ['i', 'k'] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + }); + + it('should emit buffers that have been created at intervals and close after the specified delay ' + + 'or when the buffer is full', () => { + const e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)'); + // --------------------*--------------------*---- start interval + // ---------------------| timespans + // ---------------------| + // -----| + const t = time( '---------------------|'); + const interval = time( '--------------------|'); + const expected = '----------------x-------------------y---------(z|)'; + const values = { + x: ['a', 'b', 'c', 'd'], + y: ['e', 'f', 'g', 'h'], + z: ['i', 'k'] + }; + + const result = e1.bufferTime(t, interval, 4, rxTestScheduler); expectObservable(result).toBe(expected, values); }); @@ -81,7 +138,7 @@ describe('Observable.prototype.bufferTime', () => { f: [] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -107,7 +164,7 @@ describe('Observable.prototype.bufferTime', () => { e: [] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); }); @@ -127,7 +184,7 @@ describe('Observable.prototype.bufferTime', () => { a: ['2', '3', '4'] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result, unsub).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(subs); @@ -150,7 +207,7 @@ describe('Observable.prototype.bufferTime', () => { const result = e1 .mergeMap((x: any) => Observable.of(x)) - .bufferTime(t, interval, rxTestScheduler) + .bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler) .mergeMap((x: any) => Observable.of(x)); expectObservable(result, unsub).toBe(expected, values); @@ -164,7 +221,7 @@ describe('Observable.prototype.bufferTime', () => { const values = { b: [] }; const t = time('----------|'); - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -176,7 +233,7 @@ describe('Observable.prototype.bufferTime', () => { const t = time( '----------|'); const expected = '----------a---------a---------a---------a----'; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result, unsub).toBe(expected, { a: [] }); }); @@ -186,7 +243,7 @@ describe('Observable.prototype.bufferTime', () => { const expected = '#'; const t = time('----------|'); - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, undefined, new Error('haha')); }); @@ -200,7 +257,7 @@ describe('Observable.prototype.bufferTime', () => { w: ['a', 'b'] }; - const result = e1.bufferTime(t, null, rxTestScheduler); + const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -222,7 +279,7 @@ describe('Observable.prototype.bufferTime', () => { y: ['e', 'f', 'g', 'h', 'i'] }; - const result = e1.bufferTime(t, interval, rxTestScheduler); + const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(e1.subscriptions).toBe(e1subs); diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts index fed4bce35b..c56972b1f6 100644 --- a/src/operator/bufferTime.ts +++ b/src/operator/bufferTime.ts @@ -1,5 +1,6 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; import {Observable} from '../Observable'; import {Scheduler} from '../Scheduler'; import {Action} from '../scheduler/Action'; @@ -18,7 +19,9 @@ import {async} from '../scheduler/async'; * resets the buffer every `bufferTimeSpan` milliseconds. If * `bufferCreationInterval` is given, this operator opens the buffer every * `bufferCreationInterval` milliseconds and closes (emits and resets) the - * buffer every `bufferTimeSpan` milliseconds. + * buffer every `bufferTimeSpan` milliseconds. When the optional argument + * `maxBufferSize` is specified, the buffer will be closed either after + * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements. * * @example