From 119e0d45098e3e32956797328f0357f56bac6af2 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Wed, 15 Feb 2017 14:50:26 -0800 Subject: [PATCH 01/18] fix(timeout-spec): fix merge conflicts --- spec/operators/timeout-spec.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index 74a8a9a184..8b616ec719 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -2,12 +2,6 @@ import { expect } from 'chai'; import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports -declare const { asDiagram }; -declare const hot: typeof marbleTestingSignature.hot; -declare const cold: typeof marbleTestingSignature.cold; -declare const expectObservable: typeof marbleTestingSignature.expectObservable; -declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; - declare const { asDiagram }; declare const rxTestScheduler: Rx.TestScheduler; declare const hot: typeof marbleTestingSignature.hot; From fc86e5e076f95dd7b9c914d2c3a3542dce1c6dd4 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Sun, 12 Feb 2017 21:20:10 -0800 Subject: [PATCH 02/18] fix(Subscription): fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions. (#2360) The addition of ChildSubscription to fix #2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list. Fixes #2355 --- spec/operators/observeOn-spec.ts | 10 ++-- spec/operators/switch-spec.ts | 42 +++++++++++++- src/Subscriber.ts | 44 +++++++++------ src/Subscription.ts | 94 +++++++++++++++++++------------- src/operator/catch.ts | 4 +- src/operator/observeOn.ts | 19 +++---- src/operator/repeat.ts | 5 +- src/operator/repeatWhen.ts | 32 +++++------ src/operator/retry.ts | 5 +- src/operator/retryWhen.ts | 8 +-- 10 files changed, 157 insertions(+), 106 deletions(-) diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index 039f04c486..22e295c7b8 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -104,21 +104,21 @@ describe('Observable.prototype.observeOn', () => { .observeOn(Rx.Scheduler.asap) .subscribe( x => { - const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + const observeOnSubscriber = subscription._subscriptions[0]; expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification - expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + expect(observeOnSubscriber._subscriptions[1].state.notification.kind) .to.equal('N'); - expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value) + expect(observeOnSubscriber._subscriptions[1].state.notification.value) .to.equal(x); results.push(x); }, err => done(err), () => { // now that the last nexted value is done, there should only be a complete notification scheduled - const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + const observeOnSubscriber = subscription._subscriptions[0]; expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification // only this completion notification should remain. - expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + expect(observeOnSubscriber._subscriptions[1].state.notification.kind) .to.equal('C'); // After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this. expect(results).to.deep.equal([1, 2, 3]); diff --git a/spec/operators/switch-spec.ts b/spec/operators/switch-spec.ts index fd4135a2a0..1f0703ab6f 100644 --- a/spec/operators/switch-spec.ts +++ b/spec/operators/switch-spec.ts @@ -223,4 +223,44 @@ describe('Observable.prototype.switch', () => { expect(completed).to.be.true; }); -}); \ No newline at end of file + + it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => { + let iStream: Rx.Subject; + const oStreamControl = new Rx.Subject(); + const oStream = oStreamControl.map(() => { + return (iStream = new Rx.Subject()); + }); + const switcher = oStream.switch(); + const result = []; + let sub = switcher.subscribe((x: number) => result.push(x)); + + [0, 1, 2, 3, 4].forEach((n) => { + oStreamControl.next(n); // creates inner + iStream.complete(); + }); + // Expect one child of switch(): The oStream + expect( + (sub)._subscriptions[0]._subscriptions.length + ).to.equal(1); + sub.unsubscribe(); + }); + + it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => { + const oStreamControl = new Rx.Subject(); + const oStream = oStreamControl.map(() => { + return (new Rx.Subject()); + }); + const switcher = oStream.switch(); + const result = []; + let sub = switcher.subscribe((x: number) => result.push(x)); + + [0, 1, 2, 3, 4].forEach((n) => { + oStreamControl.next(n); // creates inner + }); + // Expect two children of switch(): The oStream and the first inner + expect( + (sub)._subscriptions[0]._subscriptions.length + ).to.equal(2); + sub.unsubscribe(); + }); +}); diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 88c900fdf9..50f5e541e6 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -144,6 +144,18 @@ export class Subscriber extends Subscription implements Observer { this.destination.complete(); this.unsubscribe(); } + + protected _unsubscribeAndRecycle(): Subscriber { + const { _parent, _parents } = this; + this._parent = null; + this._parents = null; + this.unsubscribe(); + this.closed = false; + this.isStopped = false; + this._parent = _parent; + this._parents = _parents; + return this; + } } /** @@ -155,7 +167,7 @@ class SafeSubscriber extends Subscriber { private _context: any; - constructor(private _parent: Subscriber, + constructor(private _parentSubscriber: Subscriber, observerOrNext?: PartialObserver | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { @@ -185,10 +197,10 @@ class SafeSubscriber extends Subscriber { next(value?: T): void { if (!this.isStopped && this._next) { - const { _parent } = this; - if (!_parent.syncErrorThrowable) { + const { _parentSubscriber } = this; + if (!_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._next, value); - } else if (this.__tryOrSetError(_parent, this._next, value)) { + } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) { this.unsubscribe(); } } @@ -196,21 +208,21 @@ class SafeSubscriber extends Subscriber { error(err?: any): void { if (!this.isStopped) { - const { _parent } = this; + const { _parentSubscriber } = this; if (this._error) { - if (!_parent.syncErrorThrowable) { + if (!_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._error, err); this.unsubscribe(); } else { - this.__tryOrSetError(_parent, this._error, err); + this.__tryOrSetError(_parentSubscriber, this._error, err); this.unsubscribe(); } - } else if (!_parent.syncErrorThrowable) { + } else if (!_parentSubscriber.syncErrorThrowable) { this.unsubscribe(); throw err; } else { - _parent.syncErrorValue = err; - _parent.syncErrorThrown = true; + _parentSubscriber.syncErrorValue = err; + _parentSubscriber.syncErrorThrown = true; this.unsubscribe(); } } @@ -218,13 +230,13 @@ class SafeSubscriber extends Subscriber { complete(): void { if (!this.isStopped) { - const { _parent } = this; + const { _parentSubscriber } = this; if (this._complete) { - if (!_parent.syncErrorThrowable) { + if (!_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._complete); this.unsubscribe(); } else { - this.__tryOrSetError(_parent, this._complete); + this.__tryOrSetError(_parentSubscriber, this._complete); this.unsubscribe(); } } else { @@ -254,9 +266,9 @@ class SafeSubscriber extends Subscriber { } protected _unsubscribe(): void { - const { _parent } = this; + const { _parentSubscriber } = this; this._context = null; - this._parent = null; - _parent.unsubscribe(); + this._parentSubscriber = null; + _parentSubscriber.unsubscribe(); } } diff --git a/src/Subscription.ts b/src/Subscription.ts index aec99264ec..6dd7d3258b 100644 --- a/src/Subscription.ts +++ b/src/Subscription.ts @@ -40,7 +40,9 @@ export class Subscription implements ISubscription { */ public closed: boolean = false; - private _subscriptions: ISubscription[]; + protected _parent: Subscription = null; + protected _parents: Subscription[] = null; + private _subscriptions: ISubscription[] = null; /** * @param {function(): void} [unsubscribe] A function describing how to @@ -66,11 +68,26 @@ export class Subscription implements ISubscription { return; } - this.closed = true; - - const { _unsubscribe, _subscriptions } = ( this); + let { _parent, _parents, _unsubscribe, _subscriptions } = ( this); - ( this)._subscriptions = null; + this.closed = true; + this._parent = null; + this._parents = null; + // null out _subscriptions first so any child subscriptions that attempt + // to remove themselves from this subscription will noop + this._subscriptions = null; + + let index = -1; + let len = _parents ? _parents.length : 0; + + // if this._parent is null, then so is this._parents, and we + // don't have to remove ourselves from any parent subscriptions. + while (_parent) { + _parent.remove(this); + // if this._parents is null or index >= len, + // then _parent is set to null, and the loop exits + _parent = ++index < len && _parents[index] || null; + } if (isFunction(_unsubscribe)) { let trial = tryCatch(_unsubscribe).call(this); @@ -85,8 +102,8 @@ export class Subscription implements ISubscription { if (isArray(_subscriptions)) { - let index = -1; - const len = _subscriptions.length; + index = -1; + len = _subscriptions.length; while (++index < len) { const sub = _subscriptions[index]; @@ -138,27 +155,33 @@ export class Subscription implements ISubscription { return this; } - let sub = ( teardown); + let subscription = ( teardown); switch (typeof teardown) { case 'function': - sub = new Subscription(<(() => void) > teardown); + subscription = new Subscription(<(() => void) > teardown); case 'object': - if (sub.closed || typeof sub.unsubscribe !== 'function') { - return sub; + if (subscription.closed || typeof subscription.unsubscribe !== 'function') { + return subscription; } else if (this.closed) { - sub.unsubscribe(); - return sub; + subscription.unsubscribe(); + return subscription; + } else if (typeof subscription._addParent !== 'function' /* quack quack */) { + const tmp = subscription; + subscription = new Subscription(); + subscription._subscriptions = [tmp]; } break; default: throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.'); } - const childSub = new ChildSubscription(sub, this); - this._subscriptions = this._subscriptions || []; - this._subscriptions.push(childSub); - return childSub; + const subscriptions = this._subscriptions || (this._subscriptions = []); + + subscriptions.push(subscription); + subscription._addParent(this); + + return subscription; } /** @@ -168,16 +191,7 @@ export class Subscription implements ISubscription { * @return {void} */ remove(subscription: Subscription): void { - - // HACK: This might be redundant because of the logic in `add()` - if (subscription == null || ( - subscription === this) || ( - subscription === Subscription.EMPTY)) { - return; - } - - const subscriptions = ( this)._subscriptions; - + const subscriptions = this._subscriptions; if (subscriptions) { const subscriptionIndex = subscriptions.indexOf(subscription); if (subscriptionIndex !== -1) { @@ -185,20 +199,24 @@ export class Subscription implements ISubscription { } } } -} - -export class ChildSubscription extends Subscription { - constructor(private _innerSub: ISubscription, private _parent: Subscription) { - super(); - } - _unsubscribe() { - const { _innerSub, _parent } = this; - _parent.remove(this); - _innerSub.unsubscribe(); + private _addParent(parent: Subscription) { + let { _parent, _parents } = this; + if (!_parent || _parent === parent) { + // If we don't have a parent, or the new parent is the same as the + // current parent, then set this._parent to the new parent. + this._parent = parent; + } else if (!_parents) { + // If there's already one parent, but not multiple, allocate an Array to + // store the rest of the parent Subscriptions. + this._parents = [parent]; + } else if (_parents.indexOf(parent) === -1) { + // Only add the new parent to the _parents list if it's not already there. + _parents.push(parent); + } } } function flattenUnsubscriptionErrors(errors: any[]) { return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []); -} \ No newline at end of file +} diff --git a/src/operator/catch.ts b/src/operator/catch.ts index 2fb91ffbd5..961ff4f953 100644 --- a/src/operator/catch.ts +++ b/src/operator/catch.ts @@ -107,9 +107,7 @@ class CatchSubscriber extends OuterSubscriber { super.error(err2); return; } - this.unsubscribe(); - this.closed = false; - this.isStopped = false; + this._unsubscribeAndRecycle(); this.add(subscribeToResult(this, result)); } } diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index e4650aaa8c..0b437bd639 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -4,7 +4,7 @@ import { Operator } from '../Operator'; import { PartialObserver } from '../Observer'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; -import { TeardownLogic, Subscription } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Action } from '../scheduler/Action'; /** @@ -36,11 +36,9 @@ export class ObserveOnOperator implements Operator { */ export class ObserveOnSubscriber extends Subscriber { static dispatch(this: Action, arg: ObserveOnMessage) { - const { notification, destination, subscription } = arg; + const { notification, destination } = arg; notification.observe(destination); - if (subscription) { - subscription.unsubscribe(); - } + this.unsubscribe(); } constructor(destination: Subscriber, @@ -50,10 +48,11 @@ export class ObserveOnSubscriber extends Subscriber { } private scheduleMessage(notification: Notification): void { - const message = new ObserveOnMessage(notification, this.destination); - message.subscription = this.add( - this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message) - ); + this.add(this.scheduler.schedule( + ObserveOnSubscriber.dispatch, + this.delay, + new ObserveOnMessage(notification, this.destination) + )); } protected _next(value: T): void { @@ -70,8 +69,6 @@ export class ObserveOnSubscriber extends Subscriber { } export class ObserveOnMessage { - public subscription: Subscription; - constructor(public notification: Notification, public destination: PartialObserver) { } diff --git a/src/operator/repeat.ts b/src/operator/repeat.ts index b3b7fb8b82..1f0abb36ff 100644 --- a/src/operator/repeat.ts +++ b/src/operator/repeat.ts @@ -56,10 +56,7 @@ class RepeatSubscriber extends Subscriber { } else if (count > -1) { this.count = count - 1; } - this.unsubscribe(); - this.isStopped = false; - this.closed = false; - source.subscribe(this); + source.subscribe(this._unsubscribeAndRecycle()); } } } diff --git a/src/operator/repeatWhen.ts b/src/operator/repeatWhen.ts index 31d4e7e349..e70c316763 100644 --- a/src/operator/repeatWhen.ts +++ b/src/operator/repeatWhen.ts @@ -60,8 +60,8 @@ class RepeatWhenSubscriber extends OuterSubscriber { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - this.source.subscribe(this); this.sourceIsBeingSubscribedTo = true; + this.source.subscribe(this); } notifyComplete(innerSub: InnerSubscriber): void { @@ -80,7 +80,7 @@ class RepeatWhenSubscriber extends OuterSubscriber { return super.complete(); } - this.temporarilyUnsubscribe(); + this._unsubscribeAndRecycle(); this.notifications.next(); } } @@ -98,29 +98,25 @@ class RepeatWhenSubscriber extends OuterSubscriber { this.retries = null; } - private subscribeToRetries() { - this.notifications = new Subject(); - const retries = tryCatch(this.notifier)(this.notifications); - if (retries === errorObject) { - return super.complete(); - } - this.retries = retries; - this.retriesSubscription = subscribeToResult(this, retries); - } - - private temporarilyUnsubscribe() { + protected _unsubscribeAndRecycle(): Subscriber { const { notifications, retries, retriesSubscription } = this; this.notifications = null; this.retries = null; this.retriesSubscription = null; - - this.unsubscribe(); - this.isStopped = false; - this.closed = false; - + super._unsubscribeAndRecycle(); this.notifications = notifications; this.retries = retries; this.retriesSubscription = retriesSubscription; + return this; } + private subscribeToRetries() { + this.notifications = new Subject(); + const retries = tryCatch(this.notifier)(this.notifications); + if (retries === errorObject) { + return super.complete(); + } + this.retries = retries; + this.retriesSubscription = subscribeToResult(this, retries); + } } diff --git a/src/operator/retry.ts b/src/operator/retry.ts index fe267df6ed..30acb31d0d 100644 --- a/src/operator/retry.ts +++ b/src/operator/retry.ts @@ -53,10 +53,7 @@ class RetrySubscriber extends Subscriber { } else if (count > -1) { this.count = count - 1; } - this.unsubscribe(); - this.isStopped = false; - this.closed = false; - source.subscribe(this); + source.subscribe(this._unsubscribeAndRecycle()); } } } diff --git a/src/operator/retryWhen.ts b/src/operator/retryWhen.ts index 21633648d7..2f7c1a6db3 100644 --- a/src/operator/retryWhen.ts +++ b/src/operator/retryWhen.ts @@ -76,8 +76,7 @@ class RetryWhenSubscriber extends OuterSubscriber { this.retriesSubscription = null; } - this.unsubscribe(); - this.closed = false; + this._unsubscribeAndRecycle(); this.errors = errors; this.retries = retries; @@ -103,15 +102,12 @@ class RetryWhenSubscriber extends OuterSubscriber { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - const { errors, retries, retriesSubscription } = this; this.errors = null; this.retries = null; this.retriesSubscription = null; - this.unsubscribe(); - this.isStopped = false; - this.closed = false; + this._unsubscribeAndRecycle(); this.errors = errors; this.retries = retries; From 5a2901d12bfba2b2cf36872fe8849b050a619a02 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sun, 12 Feb 2017 21:21:58 -0800 Subject: [PATCH 03/18] chore(publish): 5.1.1 --- CHANGELOG.md | 12 ++++++++++++ package.json | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89594a89da..d718512a89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ + +## [5.1.1](https://github.com/ReactiveX/RxJS/compare/5.1.0...v5.1.1) (2017-02-13) + + +### Bug Fixes + +* **bindCallback:** input function context can now be properly set via output function ([#2319](https://github.com/ReactiveX/RxJS/issues/2319)) ([cb91c76](https://github.com/ReactiveX/RxJS/commit/cb91c76)) +* **bindNodeCallback:** input function context can now be properly set via output function ([#2320](https://github.com/ReactiveX/RxJS/issues/2320)) ([3ec315d](https://github.com/ReactiveX/RxJS/commit/3ec315d)) +* **Subscription:** fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions. ([#2360](https://github.com/ReactiveX/RxJS/issues/2360)) ([22e4c17](https://github.com/ReactiveX/RxJS/commit/22e4c17)), closes [#2244](https://github.com/ReactiveX/RxJS/issues/2244) [#2355](https://github.com/ReactiveX/RxJS/issues/2355) + + + # [5.1.0](https://github.com/ReactiveX/RxJS/compare/5.0.3...v5.1.0) (2017-02-01) diff --git a/package.json b/package.json index 6aaf4592a2..3b683e3641 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@reactivex/rxjs", - "version": "5.1.0", + "version": "5.1.1", "description": "Reactive Extensions for modern JavaScript", "main": "index.js", "config": { From ad277116a8f3c4c9ec77a2a7f454d67c0733a8b3 Mon Sep 17 00:00:00 2001 From: Tim Ruffles Date: Mon, 13 Feb 2017 19:07:11 +0000 Subject: [PATCH 04/18] Ignore coverage It's 5.5mb that people installing this don't need :) --- .npmignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.npmignore b/.npmignore index 8819a150e6..70563f585b 100644 --- a/.npmignore +++ b/.npmignore @@ -1,4 +1,5 @@ perf spec .git -yarn.lock \ No newline at end of file +yarn.lock +coverage From 24de73465cc52571d0ebf764a8c46ff3303b630d Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 12 Feb 2017 20:40:52 +0100 Subject: [PATCH 05/18] fix(subscribeToResult): accept array-like as result Accept array-like as a result to subscribe, so that Observable.from and operators using subscribeToResult have identical behaviour. --- spec/util/subscribeToResult-spec.ts | 11 +++++++++++ src/observable/FromObservable.ts | 3 +-- src/util/isArrayLike.ts | 1 + src/util/subscribeToResult.ts | 4 ++-- 4 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 src/util/isArrayLike.ts diff --git a/spec/util/subscribeToResult-spec.ts b/spec/util/subscribeToResult-spec.ts index 6101060b6c..57b4b259aa 100644 --- a/spec/util/subscribeToResult-spec.ts +++ b/spec/util/subscribeToResult-spec.ts @@ -58,6 +58,17 @@ describe('subscribeToResult', () => { expect(expected).to.be.deep.equal(result); }); + it('should subscribe to an array-like and emit synchronously', () => { + const result = {0: 0, 1: 1, 2: 2, length: 3}; + const expected = []; + + const subscriber = new OuterSubscriber(x => expected.push(x)); + + subscribeToResult(subscriber, result); + + expect(expected).to.be.deep.equal([0, 1, 2]); + }); + it('should subscribe to a promise', (done: MochaDone) => { const result = Promise.resolve(42); diff --git a/src/observable/FromObservable.ts b/src/observable/FromObservable.ts index 03d9306639..5e9bf48953 100644 --- a/src/observable/FromObservable.ts +++ b/src/observable/FromObservable.ts @@ -1,4 +1,5 @@ import { isArray } from '../util/isArray'; +import { isArrayLike } from '../util/isArrayLike'; import { isPromise } from '../util/isPromise'; import { PromiseObservable } from './PromiseObservable'; import { IteratorObservable } from'./IteratorObservable'; @@ -12,8 +13,6 @@ import { Subscriber } from '../Subscriber'; import { ObserveOnSubscriber } from '../operator/observeOn'; import { $$observable } from '../symbol/observable'; -const isArrayLike = ((x: any): x is ArrayLike => x && typeof x.length === 'number'); - /** * We need this JSDoc comment for affecting ESDoc. * @extends {Ignored} diff --git a/src/util/isArrayLike.ts b/src/util/isArrayLike.ts new file mode 100644 index 0000000000..15da7a50f4 --- /dev/null +++ b/src/util/isArrayLike.ts @@ -0,0 +1 @@ +export const isArrayLike = ((x: any): x is ArrayLike => x && typeof x.length === 'number'); \ No newline at end of file diff --git a/src/util/subscribeToResult.ts b/src/util/subscribeToResult.ts index 3c603f1d80..29af49afa5 100644 --- a/src/util/subscribeToResult.ts +++ b/src/util/subscribeToResult.ts @@ -1,5 +1,5 @@ import { root } from './root'; -import { isArray } from './isArray'; +import { isArrayLike } from './isArrayLike'; import { isPromise } from './isPromise'; import { isObject } from './isObject'; import { Subscriber } from '../Subscriber'; @@ -32,7 +32,7 @@ export function subscribeToResult(outerSubscriber: OuterSubscriber, } else { return result.subscribe(destination); } - } else if (isArray(result)) { + } else if (isArrayLike(result)) { for (let i = 0, len = result.length; i < len && !destination.closed; i++) { destination.next(result[i]); } From aef558e18ff4c90e178b33c74957f7a0ba21de8e Mon Sep 17 00:00:00 2001 From: Aniko Litvanyi Date: Fri, 10 Feb 2017 16:56:51 +0100 Subject: [PATCH 06/18] feat(AjaxObservable) : support 'PATCH' request type Add support of the 'PATCH' request type based on the already existing 'PUT' request. --- src/observable/dom/AjaxObservable.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/observable/dom/AjaxObservable.ts b/src/observable/dom/AjaxObservable.ts index b5e2e0365b..a8178461a4 100644 --- a/src/observable/dom/AjaxObservable.ts +++ b/src/observable/dom/AjaxObservable.ts @@ -66,6 +66,7 @@ export interface AjaxCreationMethod { get(url: string, headers?: Object): Observable; post(url: string, body?: any, headers?: Object): Observable; put(url: string, body?: any, headers?: Object): Observable; + patch(url: string, body?: any, headers?: Object): Observable; delete(url: string, headers?: Object): Observable; getJSON(url: string, headers?: Object): Observable; } @@ -86,6 +87,10 @@ export function ajaxPut(url: string, body?: any, headers?: Object): Observable({ method: 'PUT', url, body, headers }); }; +export function ajaxPatch(url: string, body?: any, headers?: Object): Observable { + return new AjaxObservable({ method: 'PATCH', url, body, headers }); +}; + export function ajaxGetJSON(url: string, headers?: Object): Observable { return new AjaxObservable({ method: 'GET', url, responseType: 'json', headers }) .lift(new MapOperator((x: AjaxResponse, index: number): T => x.response, null)); @@ -132,6 +137,7 @@ export class AjaxObservable extends Observable { create.post = ajaxPost; create.delete = ajaxDelete; create.put = ajaxPut; + create.patch = ajaxPatch; create.getJSON = ajaxGetJSON; return create; From 39f4009230ea72db4fb9b0d2fcce05d2562d6a38 Mon Sep 17 00:00:00 2001 From: Jay Phelps Date: Mon, 13 Feb 2017 15:41:36 -0800 Subject: [PATCH 07/18] chore(ajax.patch): Adds test for ajax.patch --- spec/observables/dom/ajax-spec.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/spec/observables/dom/ajax-spec.ts b/spec/observables/dom/ajax-spec.ts index 8ef9b8e668..854380f2bc 100644 --- a/spec/observables/dom/ajax-spec.ts +++ b/spec/observables/dom/ajax-spec.ts @@ -853,6 +853,21 @@ describe('Observable.ajax', () => { delete root.XMLHttpRequest.prototype.onerror; delete root.XMLHttpRequest.prototype.upload; }); + + describe('ajax.patch', () => { + it('should create an AjaxObservable with correct options', () => { + const body = { foo: 'bar' }; + const headers = { first: 'first' }; + // returns Observable, not AjaxObservable, so needs a cast + const { request } = Rx.Observable + .ajax.patch('/flibbertyJibbet', body, headers); + + expect(request.method).to.equal('PATCH'); + expect(request.url).to.equal('/flibbertyJibbet'); + expect(request.body).to.equal(body); + expect(request.headers).to.equal(headers); + }); + }); }); class MockXMLHttpRequest { From f38a27ea8efac2f22227c4ab6e9fe07404911973 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 12 Feb 2017 17:45:25 +0100 Subject: [PATCH 08/18] fix(merge): return Observable when called with single lowerCaseO Return Observable when merge is called with single lower case observable, so that merge would always return Observable instance. --- spec/helpers/test-helper.ts | 3 +-- spec/observables/merge-spec.ts | 31 +++++++++++++++++++++++++++++++ src/operator/merge.ts | 2 +- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/spec/helpers/test-helper.ts b/spec/helpers/test-helper.ts index 5f9e2c19c3..2acd0f40d9 100644 --- a/spec/helpers/test-helper.ts +++ b/spec/helpers/test-helper.ts @@ -8,11 +8,10 @@ import {$$iterator} from '../../dist/cjs/symbol/iterator'; import $$symbolObservable from 'symbol-observable'; export function lowerCaseO(...args): Rx.Observable { - const values = [].slice.apply(arguments); const o = { subscribe: function (observer) { - values.forEach(function (v) { + args.forEach(function (v) { observer.next(v); }); observer.complete(); diff --git a/spec/observables/merge-spec.ts b/spec/observables/merge-spec.ts index 552144abe8..c49e61f811 100644 --- a/spec/observables/merge-spec.ts +++ b/spec/observables/merge-spec.ts @@ -1,5 +1,6 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; +import {lowerCaseO} from '../helpers/test-helper'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports declare const hot: typeof marbleTestingSignature.hot; @@ -210,6 +211,36 @@ describe('Observable.merge(...observables)', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should merge single lowerCaseO into RxJS Observable', () => { + const e1 = lowerCaseO('a', 'b', 'c'); + + const result = Observable.merge(e1); + + expect(result).to.be.instanceof(Observable); + expectObservable(result).toBe('(abc|)'); + }); + + it('should merge two lowerCaseO into RxJS Observable', () => { + const e1 = lowerCaseO('a', 'b', 'c'); + const e2 = lowerCaseO('d', 'e', 'f'); + + const result = Observable.merge(e1, e2); + + expect(result).to.be.instanceof(Observable); + expectObservable(result).toBe('(abcdef|)'); + }); +}); + +describe('Observable.merge(...observables, Scheduler)', () => { + it('should merge single lowerCaseO into RxJS Observable', () => { + const e1 = lowerCaseO('a', 'b', 'c'); + + const result = Observable.merge(e1, rxTestScheduler); + + expect(result).to.be.instanceof(Observable); + expectObservable(result).toBe('(abc|)'); + }); }); describe('Observable.merge(...observables, Scheduler, number)', () => { diff --git a/src/operator/merge.ts b/src/operator/merge.ts index 1a34d10db4..1500cb4f19 100644 --- a/src/operator/merge.ts +++ b/src/operator/merge.ts @@ -160,7 +160,7 @@ export function mergeStatic(...observables: Array | I concurrent = observables.pop(); } - if (scheduler === null && observables.length === 1) { + if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) { return >observables[0]; } From db7275625d70794990ea5fd2baa1d8412218f6ad Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Fri, 10 Feb 2017 17:57:15 +0100 Subject: [PATCH 09/18] feat(webSocket): Add binaryType to config object Add binaryType to config object, so that it is possible to set that parameter on underlying socket before any data emits happen. Closes #2353 --- spec/observables/dom/webSocket-spec.ts | 15 +++++++++++++++ src/observable/dom/WebSocketSubject.ts | 5 +++++ 2 files changed, 20 insertions(+) diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index 975f27e071..548f2cc961 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -261,6 +261,20 @@ describe('Observable.webSocket', () => { subject.unsubscribe(); }); + it('should take a binaryType and set it properly on the web socket', () => { + const subject = Observable.webSocket({ + url: 'ws://mysocket', + binaryType: 'blob' + }); + + subject.subscribe(); + + const socket = MockWebSocket.lastSocket; + expect(socket.binaryType).to.equal('blob'); + + subject.unsubscribe(); + }); + it('should take a resultSelector', () => { const results = []; @@ -632,6 +646,7 @@ class MockWebSocket { readyState: number = 0; closeCode: any; closeReason: any; + binaryType?: string; constructor(public url: string, public protocol: string) { MockWebSocket.sockets.push(this); diff --git a/src/observable/dom/WebSocketSubject.ts b/src/observable/dom/WebSocketSubject.ts index 4f92a27f14..f5707d37ba 100644 --- a/src/observable/dom/WebSocketSubject.ts +++ b/src/observable/dom/WebSocketSubject.ts @@ -18,6 +18,7 @@ export interface WebSocketSubjectConfig { closeObserver?: NextObserver; closingObserver?: NextObserver; WebSocketCtor?: { new(url: string, protocol?: string|Array): WebSocket }; + binaryType?: 'blob' | 'arraybuffer'; } /** @@ -34,6 +35,7 @@ export class WebSocketSubject extends AnonymousSubject { closeObserver: NextObserver; closingObserver: NextObserver; WebSocketCtor: { new(url: string, protocol?: string|Array): WebSocket }; + binaryType?: 'blob' | 'arraybuffer'; private _output: Subject; @@ -159,6 +161,9 @@ export class WebSocketSubject extends AnonymousSubject { new WebSocketCtor(this.url, this.protocol) : new WebSocketCtor(this.url); this.socket = socket; + if (this.binaryType) { + this.socket.binaryType = this.binaryType; + } } catch (e) { observer.error(e); return; From ad7dd69d6356cb1fb18c906ba138b5cfc9e939f9 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Fri, 10 Feb 2017 17:22:42 +0100 Subject: [PATCH 10/18] fix(forkJoin): add type signature for single observable with selector Add type signature for using forkJoin with single observable as first parameter and selector function as second parameter, so that typings would not prevent usage which is permitted and properly handled by operator. Closes #2347 --- spec/observables/forkJoin-spec.ts | 7 ++++++- src/observable/ForkJoinObservable.ts | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/spec/observables/forkJoin-spec.ts b/spec/observables/forkJoin-spec.ts index 06f9f2a92e..b6b5203785 100644 --- a/spec/observables/forkJoin-spec.ts +++ b/spec/observables/forkJoin-spec.ts @@ -1,7 +1,12 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; -declare const {hot, expectObservable, expectSubscriptions, type}; import {lowerCaseO} from '../helpers/test-helper'; +import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports + +declare const {type}; +declare const hot: typeof marbleTestingSignature.hot; +declare const expectObservable: typeof marbleTestingSignature.expectObservable; +declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; const Observable = Rx.Observable; diff --git a/src/observable/ForkJoinObservable.ts b/src/observable/ForkJoinObservable.ts index 062dc6e6d1..6fa17aef3b 100644 --- a/src/observable/ForkJoinObservable.ts +++ b/src/observable/ForkJoinObservable.ts @@ -25,6 +25,7 @@ export class ForkJoinObservable extends Observable { static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise): Observable<[T, T2, T3, T4]>; static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, v5: SubscribableOrPromise): Observable<[T, T2, T3, T4, T5]>; static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, v5: SubscribableOrPromise, v6: SubscribableOrPromise): Observable<[T, T2, T3, T4, T5, T6]>; + static create(v1: SubscribableOrPromise, project: (v1: T) => R): Observable; static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, project: (v1: T, v2: T2) => R): Observable; static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, project: (v1: T, v2: T2, v3: T3) => R): Observable; static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): Observable; From 2b66fc4a47293296b3335defc048ca93990e25cc Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Wed, 8 Feb 2017 22:03:38 -0800 Subject: [PATCH 11/18] chore(danger): update dangerfile to validate commit message --- dangerfile.js | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/dangerfile.js b/dangerfile.js index 499a447201..357ba64cbb 100644 --- a/dangerfile.js +++ b/dangerfile.js @@ -1,6 +1,7 @@ var fs = require('fs'); var path = require('path'); var _ = require('lodash'); +var validateMessage = require('validate-commit-msg'); //simple regex matcher to detect usage of helper function and its type signature var hotMatch = /\bhot\(/gi; @@ -53,4 +54,15 @@ var testFilesMissingTypes = modifiedSpecFiles.reduce(function (acc, value) { if (testFilesMissingTypes.length > 0) { fail('missing type definition import in tests (' + testFilesMissingTypes + ') (' + ++errorCount + ')'); markdown('> (' + errorCount + ') : It seems updated test cases uses test scheduler interface `hot`, `cold` but miss to import type signature for those.'); +} + +//validate commit message in PR if it conforms conventional change log, notify if it doesn't. +var messageConventionValid = danger.git.commits.reduce(function (acc, value) { + var valid = validateMessage(value.message); + return valid && acc; +}, true); + +if (!messageConventionValid) { + fail('commit message does not follows conventional change log (' + ++errorCount + ')'); + markdown('> (' + errorCount + ') : RxJS uses conventional change log to generate changelog automatically. It seems some of commit messages are not following those, please check [contributing guideline](https://github.com/ReactiveX/rxjs/blob/master/CONTRIBUTING.md#commit-message-format) and update commit messages.'); } \ No newline at end of file From 0bbfdf29c839e2ab66b6eddf31021373236d92c8 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 5 Feb 2017 13:49:04 +0100 Subject: [PATCH 12/18] fix(bindNodeCallback): emit undefined when callback has no success arguments Emit undefined insteady of empty array by resulting Observable, when callback function is called without success parameters. Closes #2254 --- spec/observables/bindNodeCallback-spec.ts | 38 +++++++++++++++++++ src/observable/BoundNodeCallbackObservable.ts | 4 +- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/spec/observables/bindNodeCallback-spec.ts b/spec/observables/bindNodeCallback-spec.ts index 15a9e53d32..e6e50de25a 100644 --- a/spec/observables/bindNodeCallback-spec.ts +++ b/spec/observables/bindNodeCallback-spec.ts @@ -8,6 +8,24 @@ const Observable = Rx.Observable; /** @test {bindNodeCallback} */ describe('Observable.bindNodeCallback', () => { describe('when not scheduled', () => { + it('should emit undefined when callback is called without success arguments', () => { + function callback(cb) { + cb(null); + } + + const boundCallback = Observable.bindNodeCallback(callback); + const results = []; + + boundCallback() + .subscribe((x: any) => { + results.push(typeof x); + }, null, () => { + results.push('done'); + }); + + expect(results).to.deep.equal(['undefined', 'done']); + }); + it('should emit one value from a callback', () => { function callback(datum, cb) { cb(null, datum); @@ -126,6 +144,26 @@ describe('Observable.bindNodeCallback', () => { }); describe('when scheduled', () => { + it('should emit undefined when callback is called without success arguments', () => { + function callback(cb) { + cb(null); + } + + const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + const results = []; + + boundCallback() + .subscribe((x: any) => { + results.push(typeof x); + }, null, () => { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).to.deep.equal(['undefined', 'done']); + }); + it('should emit one value from a callback', () => { function callback(datum, cb) { cb(null, datum); diff --git a/src/observable/BoundNodeCallbackObservable.ts b/src/observable/BoundNodeCallbackObservable.ts index b3b3877df9..8eac5e1533 100644 --- a/src/observable/BoundNodeCallbackObservable.ts +++ b/src/observable/BoundNodeCallbackObservable.ts @@ -112,7 +112,7 @@ export class BoundNodeCallbackObservable extends Observable { subject.complete(); } } else { - subject.next(innerArgs.length === 1 ? innerArgs[0] : innerArgs); + subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs); subject.complete(); } }; @@ -162,7 +162,7 @@ function dispatch(this: Action>, state: DispatchState) { self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject })); } } else { - const value = innerArgs.length === 1 ? innerArgs[0] : innerArgs; + const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; self.add(scheduler.schedule(dispatchNext, 0, { value, subject })); } }; From dc568da22b33bd045512a7ba20a7e5bf74e058a8 Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Sun, 5 Feb 2017 13:26:37 +0100 Subject: [PATCH 13/18] fix(bindCallback): emit undefined when callback is without arguments In resulting Observable emit undefined when callback is called without parameters, instead of emitting empty array. --- spec/observables/bindCallback-spec.ts | 36 +++++++++++++++++++++++ src/observable/BoundCallbackObservable.ts | 5 ++-- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/spec/observables/bindCallback-spec.ts b/spec/observables/bindCallback-spec.ts index cae44820a0..59c92ab1e0 100644 --- a/spec/observables/bindCallback-spec.ts +++ b/spec/observables/bindCallback-spec.ts @@ -8,6 +8,23 @@ const Observable = Rx.Observable; /** @test {bindCallback} */ describe('Observable.bindCallback', () => { describe('when not scheduled', () => { + it('should emit undefined from a callback without arguments', () => { + function callback(cb) { + cb(); + } + const boundCallback = Observable.bindCallback(callback); + const results = []; + + boundCallback() + .subscribe((x: any) => { + results.push(typeof x); + }, null, () => { + results.push('done'); + }); + + expect(results).to.deep.equal(['undefined', 'done']); + }); + it('should emit one value from a callback', () => { function callback(datum, cb) { cb(datum); @@ -104,6 +121,25 @@ describe('Observable.bindCallback', () => { }); describe('when scheduled', () => { + it('should emit undefined from a callback without arguments', () => { + function callback(cb) { + cb(); + } + const boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + const results = []; + + boundCallback() + .subscribe((x: any) => { + results.push(typeof x); + }, null, () => { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).to.deep.equal(['undefined', 'done']); + }); + it('should emit one value from a callback', () => { function callback(datum, cb) { cb(datum); diff --git a/src/observable/BoundCallbackObservable.ts b/src/observable/BoundCallbackObservable.ts index bdcc87a178..dcc95cac5e 100644 --- a/src/observable/BoundCallbackObservable.ts +++ b/src/observable/BoundCallbackObservable.ts @@ -15,6 +15,7 @@ export class BoundCallbackObservable extends Observable { subject: AsyncSubject; /* tslint:disable:max-line-length */ + static create(callbackFunc: (callback: () => any) => any, selector?: void, scheduler?: IScheduler): () => Observable; static create(callbackFunc: (callback: (result: R) => any) => any, selector?: void, scheduler?: IScheduler): () => Observable; static create(callbackFunc: (v1: T, callback: (result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T) => Observable; static create(callbackFunc: (v1: T, v2: T2, callback: (result: R) => any) => any, selector?: void, scheduler?: IScheduler): (v1: T, v2: T2) => Observable; @@ -119,7 +120,7 @@ export class BoundCallbackObservable extends Observable { subject.complete(); } } else { - subject.next(innerArgs.length === 1 ? innerArgs[0] : innerArgs); + subject.next(innerArgs.length <= 1 ? innerArgs[0] : innerArgs); subject.complete(); } }; @@ -157,7 +158,7 @@ export class BoundCallbackObservable extends Observable { self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject })); } } else { - const value = innerArgs.length === 1 ? innerArgs[0] : innerArgs; + const value = innerArgs.length <= 1 ? innerArgs[0] : innerArgs; self.add(scheduler.schedule(dispatchNext, 0, { value, subject })); } }; From d59d7332afba4ac65e58655d2bea4e8b6de3d8ab Mon Sep 17 00:00:00 2001 From: Georgios Kalpakas Date: Tue, 24 Jan 2017 00:12:29 +0200 Subject: [PATCH 14/18] chore(*): correctly scope disabled `max-line-length` tslint rule The max line length is set to 150 in 'tslint.json'. In specific regions, it is desirable to allow longer lines, so these regions should be wrapped in comments like the following: ```js // Max line length enforced here. /* tslint:disable:max-line-length */ // Max line length NOT enforced here. /* tslint:enable:max-line-length */ <-- CORRECT // Max line length enforced here. ``` In many cases, the re-enabling comment incorrectly included `disable` instead of `enable` (as shown below), which essentially keeps the `max-line-length` **disabled** for the rest of the file: ```js // Max line length enforced here. /* tslint:disable:max-line-length */ // Max line length NOT enforced here. /* tslint:disable:max-line-length */ <-- INCORRECT // Max line length NOT enforced here. ``` This commit fixes these comments, so the `max-line-length` rule is properly enforced in regions that don't need longer lines. --- src/operator/bufferTime.ts | 2 +- src/operator/combineLatest.ts | 2 +- src/operator/concat.ts | 2 +- src/operator/concatAll.ts | 2 +- src/operator/concatMap.ts | 2 +- src/operator/concatMapTo.ts | 2 +- src/operator/defaultIfEmpty.ts | 2 +- src/operator/distinctUntilChanged.ts | 2 +- src/operator/distinctUntilKeyChanged.ts | 2 +- src/operator/do.ts | 2 +- src/operator/exhaustMap.ts | 2 +- src/operator/expand.ts | 2 +- src/operator/filter.ts | 2 +- src/operator/find.ts | 2 +- src/operator/groupBy.ts | 2 +- src/operator/last.ts | 2 +- src/operator/merge.ts | 2 +- src/operator/mergeMap.ts | 2 +- src/operator/mergeMapTo.ts | 2 +- src/operator/multicast.ts | 2 +- src/operator/onErrorResumeNext.ts | 2 +- src/operator/publish.ts | 2 +- src/operator/race.ts | 2 +- src/operator/reduce.ts | 2 +- src/operator/scan.ts | 5 +++-- src/operator/startWith.ts | 2 +- src/operator/switchMap.ts | 2 +- src/operator/switchMapTo.ts | 2 +- src/operator/timeoutWith.ts | 2 +- src/operator/toPromise.ts | 2 +- src/operator/withLatestFrom.ts | 2 +- src/operator/zip.ts | 2 +- 32 files changed, 34 insertions(+), 33 deletions(-) diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts index 3488290376..d15b8fc674 100644 --- a/src/operator/bufferTime.ts +++ b/src/operator/bufferTime.ts @@ -11,7 +11,7 @@ import { isScheduler } from '../util/isScheduler'; export function bufferTime(this: Observable, bufferTimeSpan: number, scheduler?: IScheduler): Observable; export function bufferTime(this: Observable, bufferTimeSpan: number, bufferCreationInterval: number, scheduler?: IScheduler): Observable; export function bufferTime(this: Observable, bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: IScheduler): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Buffers the source Observable values for a specific time period. diff --git a/src/operator/combineLatest.ts b/src/operator/combineLatest.ts index d06d1dfa24..317d1cba3b 100644 --- a/src/operator/combineLatest.ts +++ b/src/operator/combineLatest.ts @@ -23,7 +23,7 @@ export function combineLatest(this: Observable, v2: Ob export function combineLatest(this: Observable, ...observables: Array | ((...values: Array) => R)>): Observable; export function combineLatest(this: Observable, array: ObservableInput[]): Observable>; export function combineLatest(this: Observable, array: ObservableInput[], project: (v1: T, ...values: Array) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Combines multiple Observables to create an Observable whose values are diff --git a/src/operator/concat.ts b/src/operator/concat.ts index 1e1a32313f..42d348402c 100644 --- a/src/operator/concat.ts +++ b/src/operator/concat.ts @@ -13,7 +13,7 @@ export function concat(this: Observable, v2: ObservableInp export function concat(this: Observable, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, scheduler?: IScheduler): Observable; export function concat(this: Observable, ...observables: Array | IScheduler>): Observable; export function concat(this: Observable, ...observables: Array | IScheduler>): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Creates an output Observable which sequentially emits all values from every diff --git a/src/operator/concatAll.ts b/src/operator/concatAll.ts index e8228a94b3..e81ed990f7 100644 --- a/src/operator/concatAll.ts +++ b/src/operator/concatAll.ts @@ -5,7 +5,7 @@ import { MergeAllOperator } from './mergeAll'; /* tslint:disable:max-line-length */ export function concatAll(this: Observable): T; export function concatAll(this: Observable): Subscribable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Converts a higher-order Observable into a first-order Observable by diff --git a/src/operator/concatMap.ts b/src/operator/concatMap.ts index 1fdb1807d0..fa1391765b 100644 --- a/src/operator/concatMap.ts +++ b/src/operator/concatMap.ts @@ -4,7 +4,7 @@ import { Observable, ObservableInput } from '../Observable'; /* tslint:disable:max-line-length */ export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput): Observable; export function concatMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to an Observable which is merged in the output diff --git a/src/operator/concatMapTo.ts b/src/operator/concatMapTo.ts index e1b468c1f2..0d6ddd3a95 100644 --- a/src/operator/concatMapTo.ts +++ b/src/operator/concatMapTo.ts @@ -4,7 +4,7 @@ import { MergeMapToOperator } from './mergeMapTo'; /* tslint:disable:max-line-length */ export function concatMapTo(this: Observable, observable: ObservableInput): Observable; export function concatMapTo(this: Observable, observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to the same Observable which is merged multiple diff --git a/src/operator/defaultIfEmpty.ts b/src/operator/defaultIfEmpty.ts index 7de18b7e13..cb33ff6b4e 100644 --- a/src/operator/defaultIfEmpty.ts +++ b/src/operator/defaultIfEmpty.ts @@ -5,7 +5,7 @@ import { Subscriber } from '../Subscriber'; /* tslint:disable:max-line-length */ export function defaultIfEmpty(this: Observable, defaultValue?: T): Observable; export function defaultIfEmpty(this: Observable, defaultValue?: R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Emits a given value if the source Observable completes without emitting any diff --git a/src/operator/distinctUntilChanged.ts b/src/operator/distinctUntilChanged.ts index 06e73d6cb5..354cda140f 100644 --- a/src/operator/distinctUntilChanged.ts +++ b/src/operator/distinctUntilChanged.ts @@ -8,7 +8,7 @@ import { TeardownLogic } from '../Subscription'; /* tslint:disable:max-line-length */ export function distinctUntilChanged(this: Observable, compare?: (x: T, y: T) => boolean): Observable; export function distinctUntilChanged(this: Observable, compare: (x: K, y: K) => boolean, keySelector: (x: T) => K): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item. diff --git a/src/operator/distinctUntilKeyChanged.ts b/src/operator/distinctUntilKeyChanged.ts index 02ee938219..7447d75391 100644 --- a/src/operator/distinctUntilKeyChanged.ts +++ b/src/operator/distinctUntilKeyChanged.ts @@ -4,7 +4,7 @@ import { Observable } from '../Observable'; /* tslint:disable:max-line-length */ export function distinctUntilKeyChanged(this: Observable, key: string): Observable; export function distinctUntilKeyChanged(this: Observable, key: string, compare: (x: K, y: K) => boolean): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from the previous item, diff --git a/src/operator/do.ts b/src/operator/do.ts index a48acaeba9..0e76094b2d 100644 --- a/src/operator/do.ts +++ b/src/operator/do.ts @@ -7,7 +7,7 @@ import { TeardownLogic } from '../Subscription'; /* tslint:disable:max-line-length */ export function _do(this: Observable, next: (x: T) => void, error?: (e: any) => void, complete?: () => void): Observable; export function _do(this: Observable, observer: PartialObserver): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Perform a side effect for every emission on the source Observable, but return diff --git a/src/operator/exhaustMap.ts b/src/operator/exhaustMap.ts index 6f65db9ff6..7fca11d226 100644 --- a/src/operator/exhaustMap.ts +++ b/src/operator/exhaustMap.ts @@ -9,7 +9,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function exhaustMap(this: Observable, project: (value: T, index: number) => ObservableInput): Observable; export function exhaustMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to an Observable which is merged in the output diff --git a/src/operator/expand.ts b/src/operator/expand.ts index 9e7f9f97ea..e4975e4cfe 100644 --- a/src/operator/expand.ts +++ b/src/operator/expand.ts @@ -12,7 +12,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function expand(this: Observable, project: (value: T, index: number) => Observable, concurrent?: number, scheduler?: IScheduler): Observable; export function expand(this: Observable, project: (value: T, index: number) => Observable, concurrent?: number, scheduler?: IScheduler): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Recursively projects each source value to an Observable which is merged in diff --git a/src/operator/filter.ts b/src/operator/filter.ts index 06314e5e7b..f0fba4c329 100644 --- a/src/operator/filter.ts +++ b/src/operator/filter.ts @@ -10,7 +10,7 @@ export function filter(this: Observable, export function filter(this: Observable, predicate: (value: T, index: number) => boolean, thisArg?: any): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Filter items emitted by the source Observable by only emitting those that diff --git a/src/operator/find.ts b/src/operator/find.ts index e8183a7db4..110c813958 100644 --- a/src/operator/find.ts +++ b/src/operator/find.ts @@ -9,7 +9,7 @@ export function find(this: Observable, export function find(this: Observable, predicate: (value: T, index: number) => boolean, thisArg?: any): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Emits only the first value emitted by the source Observable that meets some diff --git a/src/operator/groupBy.ts b/src/operator/groupBy.ts index e94517594b..a64a12f749 100644 --- a/src/operator/groupBy.ts +++ b/src/operator/groupBy.ts @@ -11,7 +11,7 @@ export function groupBy(this: Observable, keySelector: (value: T) => K) export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable) => Observable): Observable>; export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable): Observable>; export function groupBy(this: Observable, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): Observable>; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Groups the items emitted by an Observable according to a specified criterion, diff --git a/src/operator/last.ts b/src/operator/last.ts index 4658e4a8c1..ac4c21bc47 100644 --- a/src/operator/last.ts +++ b/src/operator/last.ts @@ -23,7 +23,7 @@ export function last(this: Observable, predicate: (value: T, index: number, source: Observable) => boolean, resultSelector: void, defaultValue?: T): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns an Observable that emits only the last item emitted by the source Observable. diff --git a/src/operator/merge.ts b/src/operator/merge.ts index 1500cb4f19..1345a3d54f 100644 --- a/src/operator/merge.ts +++ b/src/operator/merge.ts @@ -19,7 +19,7 @@ export function merge(this: Observable, v2: Observable export function merge(this: Observable, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; export function merge(this: Observable, ...observables: Array | IScheduler | number>): Observable; export function merge(this: Observable, ...observables: Array | IScheduler | number>): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Creates an output Observable which concurrently emits all values from every diff --git a/src/operator/mergeMap.ts b/src/operator/mergeMap.ts index 89f64936da..ba5078719b 100644 --- a/src/operator/mergeMap.ts +++ b/src/operator/mergeMap.ts @@ -9,7 +9,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; /* tslint:disable:max-line-length */ export function mergeMap(this: Observable, project: (value: T, index: number) => ObservableInput, concurrent?: number): Observable; export function mergeMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to an Observable which is merged in the output diff --git a/src/operator/mergeMapTo.ts b/src/operator/mergeMapTo.ts index 5fc173e701..125fb26a1a 100644 --- a/src/operator/mergeMapTo.ts +++ b/src/operator/mergeMapTo.ts @@ -10,7 +10,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function mergeMapTo(this: Observable, observable: ObservableInput, concurrent?: number): Observable; export function mergeMapTo(this: Observable, observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R, concurrent?: number): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to the same Observable which is merged multiple diff --git a/src/operator/multicast.ts b/src/operator/multicast.ts index dec29ffe7c..4cbece5292 100644 --- a/src/operator/multicast.ts +++ b/src/operator/multicast.ts @@ -7,7 +7,7 @@ import { ConnectableObservable, connectableObservableDescriptor } from '../obser /* tslint:disable:max-line-length */ export function multicast(this: Observable, subjectOrSubjectFactory: factoryOrValue>): ConnectableObservable; export function multicast(SubjectFactory: (this: Observable) => Subject, selector?: selector): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns an Observable that emits the results of invoking a specified selector on items diff --git a/src/operator/onErrorResumeNext.ts b/src/operator/onErrorResumeNext.ts index 81147524a5..e2786fc26d 100644 --- a/src/operator/onErrorResumeNext.ts +++ b/src/operator/onErrorResumeNext.ts @@ -15,7 +15,7 @@ export function onErrorResumeNext(this: Observable, v2: export function onErrorResumeNext(this: Observable, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): Observable ; export function onErrorResumeNext(this: Observable, ...observables: Array | ((...values: Array) => R)>): Observable; export function onErrorResumeNext(this: Observable, array: ObservableInput[]): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ export function onErrorResumeNext(this: Observable, ...nextSources: Array | Array> | ((...values: Array) => R)>): Observable { diff --git a/src/operator/publish.ts b/src/operator/publish.ts index 17d2b3f52e..c18b6bf7b2 100644 --- a/src/operator/publish.ts +++ b/src/operator/publish.ts @@ -6,7 +6,7 @@ import { ConnectableObservable } from '../observable/ConnectableObservable'; /* tslint:disable:max-line-length */ export function publish(this: Observable): ConnectableObservable; export function publish(this: Observable, selector: selector): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns a ConnectableObservable, which is a variety of Observable that waits until its connect method is called diff --git a/src/operator/race.ts b/src/operator/race.ts index 60cef58d1e..a36f3fd83b 100644 --- a/src/operator/race.ts +++ b/src/operator/race.ts @@ -11,7 +11,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function race(this: Observable, ...observables: Array | Array>>): Observable; export function race(this: Observable, ...observables: Array | Array>>): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns an Observable that mirrors the first source Observable to emit an item diff --git a/src/operator/reduce.ts b/src/operator/reduce.ts index 72e29490d9..0ff56b891c 100644 --- a/src/operator/reduce.ts +++ b/src/operator/reduce.ts @@ -6,7 +6,7 @@ import { Subscriber } from '../Subscriber'; export function reduce(this: Observable, accumulator: (acc: T, value: T, index: number) => T, seed?: T): Observable; export function reduce(this: Observable, accumulator: (acc: T[], value: T, index: number) => T[], seed?: T[]): Observable; export function reduce(this: Observable, accumulator: (acc: R, value: T, index: number) => R, seed?: R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Applies an accumulator function over the source Observable, and returns the diff --git a/src/operator/scan.ts b/src/operator/scan.ts index 6c78b0a617..558f87667d 100644 --- a/src/operator/scan.ts +++ b/src/operator/scan.ts @@ -6,7 +6,7 @@ import { Subscriber } from '../Subscriber'; export function scan(this: Observable, accumulator: (acc: T, value: T, index: number) => T, seed?: T): Observable; export function scan(this: Observable, accumulator: (acc: T[], value: T, index: number) => T[], seed?: T[]): Observable; export function scan(this: Observable, accumulator: (acc: R, value: T, index: number) => R, seed?: R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Applies an accumulator function over the source Observable, and returns each @@ -84,7 +84,8 @@ class ScanSubscriber extends Subscriber { this._seed = value; } - constructor(destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, private hasSeed: boolean) { + constructor(destination: Subscriber, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R, + private hasSeed: boolean) { super(destination); } diff --git a/src/operator/startWith.ts b/src/operator/startWith.ts index f2c8555e33..55b540c7ee 100644 --- a/src/operator/startWith.ts +++ b/src/operator/startWith.ts @@ -14,7 +14,7 @@ export function startWith(this: Observable, v1: T, v2: T, v3: T, v4: T, sc export function startWith(this: Observable, v1: T, v2: T, v3: T, v4: T, v5: T, scheduler?: IScheduler): Observable; export function startWith(this: Observable, v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, scheduler?: IScheduler): Observable; export function startWith(this: Observable, ...array: Array): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Returns an Observable that emits the items in a specified Iterable before it begins to emit items emitted by the diff --git a/src/operator/switchMap.ts b/src/operator/switchMap.ts index b789f1dc78..96d62641e3 100644 --- a/src/operator/switchMap.ts +++ b/src/operator/switchMap.ts @@ -9,7 +9,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function switchMap(this: Observable, project: (value: T, index: number) => ObservableInput): Observable; export function switchMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to an Observable which is merged in the output diff --git a/src/operator/switchMapTo.ts b/src/operator/switchMapTo.ts index 80bf3f6158..ee65ba41be 100644 --- a/src/operator/switchMapTo.ts +++ b/src/operator/switchMapTo.ts @@ -9,7 +9,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function switchMapTo(this: Observable, observable: ObservableInput): Observable; export function switchMapTo(this: Observable, observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Projects each source value to the same Observable which is flattened multiple diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index 8cfce014a9..ce9a26bb28 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -12,7 +12,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; /* tslint:disable:max-line-length */ export function timeoutWith(this: Observable, due: number | Date, withObservable: ObservableInput, scheduler?: IScheduler): Observable; export function timeoutWith(this: Observable, due: number | Date, withObservable: ObservableInput, scheduler?: IScheduler): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * @param due diff --git a/src/operator/toPromise.ts b/src/operator/toPromise.ts index de4381e308..3cfe071004 100644 --- a/src/operator/toPromise.ts +++ b/src/operator/toPromise.ts @@ -4,7 +4,7 @@ import { root } from '../util/root'; /* tslint:disable:max-line-length */ export function toPromise(this: Observable): Promise; export function toPromise(this: Observable, PromiseCtor: typeof Promise): Promise; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Converts an Observable sequence to a ES2015 compliant promise. diff --git a/src/operator/withLatestFrom.ts b/src/operator/withLatestFrom.ts index dc4777b66e..7ccd1506e4 100644 --- a/src/operator/withLatestFrom.ts +++ b/src/operator/withLatestFrom.ts @@ -20,7 +20,7 @@ export function withLatestFrom(this: Observable, v2: O export function withLatestFrom(this: Observable, ...observables: Array | ((...values: Array) => R)>): Observable; export function withLatestFrom(this: Observable, array: ObservableInput[]): Observable; export function withLatestFrom(this: Observable, array: ObservableInput[], project: (...values: Array) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * Combines the source Observable with other Observables to create an Observable diff --git a/src/operator/zip.ts b/src/operator/zip.ts index b979956bb5..c02f8c7d30 100644 --- a/src/operator/zip.ts +++ b/src/operator/zip.ts @@ -24,7 +24,7 @@ export function zipProto(this: Observable, v2: Observa export function zipProto(this: Observable, ...observables: Array | ((...values: Array) => R)>): Observable; export function zipProto(this: Observable, array: Array>): Observable; export function zipProto(this: Observable, array: Array>, project: (v1: T, ...values: Array) => R): Observable; -/* tslint:disable:max-line-length */ +/* tslint:enable:max-line-length */ /** * @param observables From 74dd2350776ea352804f69eaed0b490ecd6651a2 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Tue, 14 Feb 2017 11:16:37 -0800 Subject: [PATCH 15/18] fix(mergeAll): introduce variant support for mergeMap - closes #2372 --- src/operator/mergeAll.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/operator/mergeAll.ts b/src/operator/mergeAll.ts index 6d42ef10ae..73fed64cfd 100644 --- a/src/operator/mergeAll.ts +++ b/src/operator/mergeAll.ts @@ -3,8 +3,12 @@ import { Operator } from '../Operator'; import { Observer } from '../Observer'; import { Subscription } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; +import { Subscribable } from '../Observable'; import { subscribeToResult } from '../util/subscribeToResult'; +export function mergeAll(this: Observable, concurrent?: number): T; +export function mergeAll(this: Observable, concurrent?: number): Subscribable; + /** * Converts a higher-order Observable into a first-order Observable which * concurrently delivers all values that are emitted on the inner Observables. From b1bb0ce446562cccb81ea1137bb2eec6e3d5915d Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Mon, 12 Dec 2016 19:09:29 +0100 Subject: [PATCH 16/18] feat(windowTime): maxWindowSize parameter in windowTime operator Adds new parameter in windowTime operator to control how much values given window can emit. Closes #1301 --- spec/operators/windowTime-spec.ts | 59 +++++++++++++++++---- src/operator/windowTime.ts | 87 ++++++++++++++++++++++++++----- 2 files changed, 122 insertions(+), 24 deletions(-) diff --git a/spec/operators/windowTime-spec.ts b/spec/operators/windowTime-spec.ts index 7709c38660..b8dd9a41df 100644 --- a/spec/operators/windowTime-spec.ts +++ b/spec/operators/windowTime-spec.ts @@ -23,7 +23,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '--a--(b|) '); const y = cold( '-d--e| '); const z = cold( '-g--h| '); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; const result = source.windowTime(50, 100, rxTestScheduler); @@ -31,6 +31,43 @@ describe('Observable.prototype.windowTime', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); + it('should close windows after max count is reached', () => { + const source = hot('--1--2--^--a--b--c--d--e--f--g-----|'); + const subs = '^ !'; + const timeSpan = time( '----------|'); + // 100 frames 0---------1---------2------| + const expected = 'x---------y---------z------|'; + const x = cold( '---a--(b|) '); + const y = cold( '--d--(e|) '); + const z = cold( '-g-----|'); + const values = { x, y, z }; + + const result = source.windowTime(timeSpan, null, 2, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should close window after max count is reached with' + + 'windowCreationInterval', () => { + const source = hot('--1--2--^-a--b--c--de-f---g--h--i-|'); + const subs = '^ !'; + // 100 frames 0---------1---------2-----| + // 50 ----| + // 50 ----| + // 50 ----| + const expected = 'x---------y---------z-----|'; + const x = cold( '--a--(b|) '); + const y = cold( '-de-(f|) '); + const z = cold( '-h--i| '); + const values = { x, y, z }; + + const result = source.windowTime(50, 100, 3, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + it('should emit windows given windowTimeSpan', () => { const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|'); const subs = '^ !'; @@ -40,9 +77,9 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a--b--c| '); const y = cold( '--d--e--f-| '); const z = cold( '-g--h--|'); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; - const result = source.windowTime(timeSpan, null, rxTestScheduler); + const result = source.windowTime(timeSpan, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(source.subscriptions).toBe(subs); @@ -61,7 +98,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '--d--(e|) '); const z = cold( '-g--h| '); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -74,7 +111,7 @@ describe('Observable.prototype.windowTime', () => { const subs = '(^!)'; const expected = '(w|)'; const w = cold('|'); - const expectedValues = { w: w }; + const expectedValues = { w }; const timeSpan = time('-----|'); const interval = time('----------|'); @@ -89,7 +126,7 @@ describe('Observable.prototype.windowTime', () => { const subs = '(^!)'; const expected = '(w|)'; const w = cold('(a|)'); - const expectedValues = { w: w }; + const expectedValues = { w }; const timeSpan = time('-----|'); const interval = time('----------|'); @@ -110,7 +147,7 @@ describe('Observable.prototype.windowTime', () => { const c = cold( '---| '); const d = cold( '--'); const unsub = ' !'; - const expectedValues = { a: a, b: b, c: c, d: d }; + const expectedValues = { a, b, c, d }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -123,7 +160,7 @@ describe('Observable.prototype.windowTime', () => { const subs = '(^!)'; const expected = '(w#)'; const w = cold('#'); - const expectedValues = { w: w }; + const expectedValues = { w }; const timeSpan = time('-----|'); const interval = time('----------|'); @@ -146,7 +183,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '--d--(e|) '); const z = cold( '-g--h| '); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -168,7 +205,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '-- '); const unsub = ' ! '; - const values = { x: x, y: y }; + const values = { x, y }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -189,7 +226,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '--d-- '); const unsub = ' ! '; - const values = { x: x, y: y }; + const values = { x, y }; const result = source .mergeMap((x: string) => Observable.of(x)) diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index 3c563ee719..a1f9964a5b 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -6,6 +6,8 @@ import { async } from '../scheduler/async'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; +import { isNumeric } from '../util/isNumeric'; +import { isScheduler } from '../util/isScheduler'; /** * Branch out the source Observable values as a nested Observable periodically @@ -24,7 +26,10 @@ import { Subscription } from '../Subscription'; * emits the current window and propagates the notification from the source * Observable. If `windowCreationInterval` is not provided, the output * Observable starts a new window when the previous window of duration - * `windowTimeSpan` completes. + * `windowTimeSpan` completes. If `maxWindowCount` is provided, each window + * will emit at most fixed number of values. Window will complete immediately + * after emitting last value and next one still will open as specified by + * `windowTimeSpan` and `windowCreationInterval` arguments. * * @example In every window of 1 second each, emit at most 2 click events * var clicks = Rx.Observable.fromEvent(document, 'click'); @@ -40,6 +45,12 @@ import { Subscription } from '../Subscription'; * .mergeAll(); // flatten the Observable-of-Observables * result.subscribe(x => console.log(x)); * + * @example Same as example above but with maxWindowCount instead of take + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.windowTime(1000, 5000, 2) // each window has still at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + * @see {@link window} * @see {@link windowCount} * @see {@link windowToggle} @@ -49,6 +60,8 @@ import { Subscription } from '../Subscription'; * @param {number} windowTimeSpan The amount of time to fill each window. * @param {number} [windowCreationInterval] The interval at which to start new * windows. + * @param {number} [maxWindowSize=Number.POSITIVE_INFINITY] Max number of + * values each window can emit before completion. * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the * intervals that determine window boundaries. * @return {Observable>} An observable of windows, which in turn @@ -57,21 +70,52 @@ import { Subscription } from '../Subscription'; * @owner Observable */ export function windowTime(this: Observable, windowTimeSpan: number, - windowCreationInterval: number = null, - scheduler: IScheduler = async): Observable> { - return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, scheduler)); + scheduler?: IScheduler): Observable>; +export function windowTime(this: Observable, windowTimeSpan: number, + windowCreationInterval: number, + scheduler?: IScheduler): Observable>; +export function windowTime(this: Observable, windowTimeSpan: number, + windowCreationInterval: number, + maxWindowSize: number, + scheduler?: IScheduler): Observable>; + +export function windowTime(this: Observable, + windowTimeSpan: number): Observable> { + + let scheduler: IScheduler = async; + let windowCreationInterval: number = null; + let maxWindowSize: number = Number.POSITIVE_INFINITY; + + if (isScheduler(arguments[3])) { + scheduler = arguments[3]; + } + + if (isScheduler(arguments[2])) { + scheduler = arguments[2]; + } else if (isNumeric(arguments[2])) { + maxWindowSize = arguments[2]; + } + + if (isScheduler(arguments[1])) { + scheduler = arguments[1]; + } else if (isNumeric(arguments[1])) { + windowCreationInterval = arguments[1]; + } + + return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); } class WindowTimeOperator implements Operator> { constructor(private windowTimeSpan: number, - private windowCreationInterval: number, + private windowCreationInterval: number | null, + private maxWindowSize: number, private scheduler: IScheduler) { } call(subscriber: Subscriber>, source: any): any { return source.subscribe(new WindowTimeSubscriber( - subscriber, this.windowTimeSpan, this.windowCreationInterval, this.scheduler + subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler )); } } @@ -84,7 +128,7 @@ interface CreationState { } interface TimeSpanOnlyState { - window: Subject; + window: CountedSubject; windowTimeSpan: number; subscriber: WindowTimeSubscriber; } @@ -96,21 +140,35 @@ interface CloseWindowContext { interface CloseState { subscriber: WindowTimeSubscriber; - window: Subject; + window: CountedSubject; context: CloseWindowContext; } +class CountedSubject extends Subject { + private _numberOfNextedValues: number = 0; + + next(value?: T): void { + this._numberOfNextedValues++; + super.next(value); + } + + get numberOfNextedValues(): number { + return this._numberOfNextedValues; + } +} + /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ class WindowTimeSubscriber extends Subscriber { - private windows: Array> = []; + private windows: CountedSubject[] = []; constructor(protected destination: Subscriber>, private windowTimeSpan: number, - private windowCreationInterval: number, + private windowCreationInterval: number | null, + private maxWindowSize: number, private scheduler: IScheduler) { super(destination); @@ -133,6 +191,9 @@ class WindowTimeSubscriber extends Subscriber { const window = windows[i]; if (!window.closed) { window.next(value); + if (window.numberOfNextedValues >= this.maxWindowSize) { + this.closeWindow(window); + } } } } @@ -156,15 +217,15 @@ class WindowTimeSubscriber extends Subscriber { this.destination.complete(); } - public openWindow(): Subject { - const window = new Subject(); + public openWindow(): CountedSubject { + const window = new CountedSubject(); this.windows.push(window); const destination = this.destination; destination.next(window); return window; } - public closeWindow(window: Subject): void { + public closeWindow(window: CountedSubject): void { window.complete(); const windows = this.windows; windows.splice(windows.indexOf(window), 1); From 42a886a5034331af0e87cd39bde1246c83d52bce Mon Sep 17 00:00:00 2001 From: Mateusz Podlasin Date: Mon, 6 Feb 2017 06:48:18 +0100 Subject: [PATCH 17/18] docs(ObservableInput): add ObservableInput and SubscribableOrPromise descriptions Add ObservableInput and SubscribableOrPromise interface descriptions, as well as link these interfaces in type descriptions of operators, so that users always know what kind of parameters they can pass to used methods. --- src/MiscJSDoc.ts | 218 ++++++++++++++++++++++++++++++ src/observable/DeferObservable.ts | 2 +- src/observable/combineLatest.ts | 4 +- src/operator/audit.ts | 2 +- src/operator/combineLatest.ts | 2 +- src/operator/concat.ts | 6 +- src/operator/concatMap.ts | 2 +- src/operator/concatMapTo.ts | 2 +- src/operator/debounce.ts | 2 +- src/operator/exhaustMap.ts | 2 +- src/operator/merge.ts | 4 +- src/operator/mergeMap.ts | 2 +- src/operator/mergeMapTo.ts | 2 +- src/operator/switchMap.ts | 2 +- src/operator/switchMapTo.ts | 2 +- src/operator/throttle.ts | 2 +- src/operator/withLatestFrom.ts | 2 +- 17 files changed, 238 insertions(+), 20 deletions(-) diff --git a/src/MiscJSDoc.ts b/src/MiscJSDoc.ts index d9ca775a61..0104579963 100644 --- a/src/MiscJSDoc.ts +++ b/src/MiscJSDoc.ts @@ -128,3 +128,221 @@ export class ObserverDoc { return void 0; } } + +/** + * `SubscribableOrPromise` interface describes values that behave like either + * Observables or Promises. Every operator that accepts arguments annotated + * with this interface, can be also used with parameters that are not necessarily + * RxJS Observables. + * + * Following types of values might be passed to operators expecting this interface: + * + * ## Observable + * + * RxJS {@link Observable} instance. + * + * ## Observable-like (Subscribable) + * + * This might be any object that has `Symbol.observable` method. This method, + * when called, should return object with `subscribe` method on it, which should + * behave the same as RxJS `Observable.subscribe`. + * + * `Symbol.observable` is part of https://github.com/tc39/proposal-observable proposal. + * Since currently it is not supported natively, and every symbol is equal only to itself, + * you should use https://github.com/blesh/symbol-observable polyfill, when implementing + * custom Observable-likes. + * + * **TypeScript Subscribable interface issue** + * + * Although TypeScript interface claims that Subscribable is an object that has `subscribe` + * method declared directly on it, passing custom objects that have `subscribe` + * method but not `Symbol.observable` method will fail at runtime. Conversely, passing + * objects with `Symbol.observable` but without `subscribe` will fail at compile time + * (if you use TypeScript). + * + * TypeScript has problem supporting interfaces with methods defined as symbol + * properties. To get around that, you should implement `subscribe` directly on + * passed object, and make `Symbol.observable` method simply return `this`. That way + * everything will work as expected, and compiler will not complain. If you really + * do not want to put `subscribe` directly on your object, you will have to type cast + * it to `any`, before passing it to an operator. + * + * When this issue is resolved, Subscribable interface will only permit Observable-like + * objects with `Symbol.observable` defined, no matter if they themselves implement + * `subscribe` method or not. + * + * ## ES6 Promise + * + * Promise can be interpreted as Observable that emits value and completes + * when it is resolved or errors when it is rejected. + * + * ## Promise-like (Thenable) + * + * Promises passed to operators do not have to be native ES6 Promises. + * They can be implementations from popular Promise libraries, polyfills + * or even custom ones. They just need to have `then` method that works + * as the same as ES6 Promise `then`. + * + * @example Use merge and then map with non-RxJS observable + * const nonRxJSObservable = { + * subscribe(observer) { + * observer.next(1000); + * observer.complete(); + * }, + * [Symbol.observable]() { + * return this; + * } + * }; + * + * Rx.Observable.merge(nonRxJSObservable) + * .map(value => "This value is " + value) + * .subscribe(result => console.log(result)); // Logs "This value is 1000" + * + * + * @example Use combineLatest with ES6 Promise + * Rx.Observable.combineLatest(Promise.resolve(5), Promise.resolve(10), Promise.resolve(15)) + * .subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('the end!') + * ); + * // Logs + * // [5, 10, 15] + * // "the end!" + * + * + * @interface + * @name SubscribableOrPromise + * @noimport true + */ +export class SubscribableOrPromiseDoc { + +} + +/** + * `ObservableInput` interface describes all values that are either an + * {@link SubscribableOrPromise} or some kind of collection of values that + * can be transformed to Observable emitting that values. Every operator that + * accepts arguments annotated with this interface, can be also used with + * parameters that are not necessarily RxJS Observables. + * + * `ObservableInput` extends {@link SubscribableOrPromise} with following types: + * + * ## Array + * + * Arrays can be interpreted as observables that emit all values in array one by one, + * from left to right, and then complete immediately. + * + * ## Array-like + * + * Arrays passed to operators do not have to be built-in JavaScript Arrays. They + * can be also, for example, `arguments` property available inside every function, + * [DOM NodeList](https://developer.mozilla.org/pl/docs/Web/API/NodeList), + * or, actually, any object that has `length` property (which is a number) + * and stores values under non-negative (zero and up) integers. + * + * ## ES6 Iterable + * + * Operators will accept both built-in and custom ES6 Iterables, by treating them as + * observables that emit all its values in order of iteration and then complete + * when iteration ends. Note that contrary to arrays, Iterables do not have to + * necessarily be finite, so creating Observables that never complete is possible as well. + * + * Note that you can make iterator an instance of Iterable by having it return itself + * in `Symbol.iterator` method. It means that every operator accepting Iterables accepts, + * though indirectly, iterators themselves as well. All native ES6 iterators are instances + * of Iterable by default, so you do not have to implement their `Symbol.iterator` method + * yourself. + * + * **TypeScript Iterable interface issue** + * + * TypeScript `ObservableInput` interface actually lacks type signature for Iterables, + * because of issues it caused in some projects (see [this issue](https://github.com/ReactiveX/rxjs/issues/2306)). + * If you want to use Iterable as argument for operator, cast it to `any` first. + * Remember of course that, because of casting, you have to yourself ensure that passed + * argument really implements said interface. + * + * + * @example Use merge with arrays + * Rx.Observable.merge([1, 2], [4], [5, 6]) + * .subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('ta dam!') + * ); + * + * // Logs + * // 1 + * // 2 + * // 3 + * // 4 + * // 5 + * // 6 + * // "ta dam!" + * + * + * @example Use merge with array-like + * Rx.Observable.merge({0: 1, 1: 2, length: 2}, {0: 3, length: 1}) + * .subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('nice, huh?') + * ); + * + * // Logs + * // 1 + * // 2 + * // 3 + * // "nice, huh?" + * + * @example Use merge with an Iterable (Map) + * const firstMap = new Map([[1, 'a'], [2, 'b']]); + * const secondMap = new Map([[3, 'c'], [4, 'd']]); + * + * Rx.Observable.merge( + * firstMap, // pass Iterable + * secondMap.values() // pass iterator, which is itself an Iterable + * ).subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('yup!') + * ); + * + * // Logs + * // [1, "a"] + * // [2, "b"] + * // "c" + * // "d" + * // "yup!" + * + * @example Use from with generator (returning infinite iterator) + * // infinite stream of incrementing numbers + * const infinite = function* () { + * let i = 0; + * + * while (true) { + * yield i++; + * } + * }; + * + * Rx.Observable.from(infinite()) + * .take(3) // only take 3, cause this is infinite + * .subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('ta dam!') + * ); + * + * // Logs + * // 0 + * // 1 + * // 2 + * // "ta dam!" + * + * @interface + * @name ObservableInput + * @noimport true + */ +export class ObservableInputDoc { + +} diff --git a/src/observable/DeferObservable.ts b/src/observable/DeferObservable.ts index 17bff114c0..7bf6234cc4 100644 --- a/src/observable/DeferObservable.ts +++ b/src/observable/DeferObservable.ts @@ -47,7 +47,7 @@ export class DeferObservable extends Observable { * * @see {@link create} * - * @param {function(): Observable|Promise} observableFactory The Observable + * @param {function(): SubscribableOrPromise} observableFactory The Observable * factory function to invoke for each Observer that subscribes to the output * Observable. May also return a Promise, which will be converted on the fly * to an Observable. diff --git a/src/observable/combineLatest.ts b/src/observable/combineLatest.ts index 2f152c5d5f..daa903d0c7 100644 --- a/src/observable/combineLatest.ts +++ b/src/observable/combineLatest.ts @@ -61,9 +61,9 @@ export function combineLatest(...observables: Array | (( * @see {@link merge} * @see {@link withLatestFrom} * - * @param {Observable} observable1 An input Observable to combine with the + * @param {ObservableInput} observable1 An input Observable to combine with the * source Observable. - * @param {Observable} observable2 An input Observable to combine with the + * @param {ObservableInput} observable2 An input Observable to combine with the * source Observable. More than one input Observables may be given as argument. * @param {function} [project] An optional function to project the values from * the combined latest values into a new value on the output Observable. diff --git a/src/operator/audit.ts b/src/operator/audit.ts index e9caef0991..41a14bd652 100644 --- a/src/operator/audit.ts +++ b/src/operator/audit.ts @@ -40,7 +40,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @see {@link sample} * @see {@link throttle} * - * @param {function(value: T): Observable|Promise} durationSelector A function + * @param {function(value: T): SubscribableOrPromise} durationSelector A function * that receives a value from the source Observable, for computing the silencing * duration, returned as an Observable or a Promise. * @return {Observable} An Observable that performs rate-limiting of diff --git a/src/operator/combineLatest.ts b/src/operator/combineLatest.ts index 317d1cba3b..7025c54791 100644 --- a/src/operator/combineLatest.ts +++ b/src/operator/combineLatest.ts @@ -58,7 +58,7 @@ export function combineLatest(this: Observable, array: Observab * @see {@link merge} * @see {@link withLatestFrom} * - * @param {Observable} other An input Observable to combine with the source + * @param {ObservableInput} other An input Observable to combine with the source * Observable. More than one input Observables may be given as argument. * @param {function} [project] An optional function to project the values from * the combined latest values into a new value on the output Observable. diff --git a/src/operator/concat.ts b/src/operator/concat.ts index 42d348402c..e835ba60fc 100644 --- a/src/operator/concat.ts +++ b/src/operator/concat.ts @@ -55,7 +55,7 @@ export function concat(this: Observable, ...observables: Array(...observables: (ObservableInput | ISche * @see {@link concatMap} * @see {@link concatMapTo} * - * @param {Observable} input1 An input Observable to concatenate with others. - * @param {Observable} input2 An input Observable to concatenate with others. + * @param {ObservableInput} input1 An input Observable to concatenate with others. + * @param {ObservableInput} input2 An input Observable to concatenate with others. * More than one input Observables may be given as argument. * @param {Scheduler} [scheduler=null] An optional IScheduler to schedule each * Observable subscription on. diff --git a/src/operator/concatMap.ts b/src/operator/concatMap.ts index fa1391765b..fc0280bb23 100644 --- a/src/operator/concatMap.ts +++ b/src/operator/concatMap.ts @@ -47,7 +47,7 @@ export function concatMap(this: Observable, project: (value: T, inde * @see {@link mergeMap} * @see {@link switchMap} * - * @param {function(value: T, ?index: number): Observable} project A function + * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] diff --git a/src/operator/concatMapTo.ts b/src/operator/concatMapTo.ts index 0d6ddd3a95..649aaf188e 100644 --- a/src/operator/concatMapTo.ts +++ b/src/operator/concatMapTo.ts @@ -46,7 +46,7 @@ export function concatMapTo(this: Observable, observable: Observable * @see {@link mergeMapTo} * @see {@link switchMapTo} * - * @param {Observable} innerObservable An Observable to replace each value from + * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] * A function to produce the value on the output Observable based on the values diff --git a/src/operator/debounce.ts b/src/operator/debounce.ts index 1520fe067f..d19d508d26 100644 --- a/src/operator/debounce.ts +++ b/src/operator/debounce.ts @@ -40,7 +40,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @see {@link delayWhen} * @see {@link throttle} * - * @param {function(value: T): Observable|Promise} durationSelector A function + * @param {function(value: T): SubscribableOrPromise} durationSelector A function * that receives a value from the source Observable, for computing the timeout * duration for each source value, returned as an Observable or a Promise. * @return {Observable} An Observable that delays the emissions of the source diff --git a/src/operator/exhaustMap.ts b/src/operator/exhaustMap.ts index 7fca11d226..15886751ce 100644 --- a/src/operator/exhaustMap.ts +++ b/src/operator/exhaustMap.ts @@ -39,7 +39,7 @@ export function exhaustMap(this: Observable, project: (value: T, ind * @see {@link mergeMap} * @see {@link switchMap} * - * @param {function(value: T, ?index: number): Observable} project A function + * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] diff --git a/src/operator/merge.ts b/src/operator/merge.ts index 1345a3d54f..51f9b4ab60 100644 --- a/src/operator/merge.ts +++ b/src/operator/merge.ts @@ -56,7 +56,7 @@ export function merge(this: Observable, ...observables: Array(...observables: (ObservableInput | ISched * @see {@link mergeMapTo} * @see {@link mergeScan} * - * @param {...Observable} observables Input Observables to merge together. + * @param {...ObservableInput} observables Input Observables to merge together. * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input * Observables being subscribed to concurrently. * @param {Scheduler} [scheduler=null] The IScheduler to use for managing diff --git a/src/operator/mergeMap.ts b/src/operator/mergeMap.ts index ba5078719b..44d53906c2 100644 --- a/src/operator/mergeMap.ts +++ b/src/operator/mergeMap.ts @@ -49,7 +49,7 @@ export function mergeMap(this: Observable, project: (value: T, index * @see {@link mergeScan} * @see {@link switchMap} * - * @param {function(value: T, ?index: number): Observable} project A function + * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] diff --git a/src/operator/mergeMapTo.ts b/src/operator/mergeMapTo.ts index 125fb26a1a..ced68e9a0b 100644 --- a/src/operator/mergeMapTo.ts +++ b/src/operator/mergeMapTo.ts @@ -37,7 +37,7 @@ export function mergeMapTo(this: Observable, observable: ObservableI * @see {@link mergeScan} * @see {@link switchMapTo} * - * @param {Observable} innerObservable An Observable to replace each value from + * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] * A function to produce the value on the output Observable based on the values diff --git a/src/operator/switchMap.ts b/src/operator/switchMap.ts index 96d62641e3..fd59954437 100644 --- a/src/operator/switchMap.ts +++ b/src/operator/switchMap.ts @@ -40,7 +40,7 @@ export function switchMap(this: Observable, project: (value: T, inde * @see {@link switch} * @see {@link switchMapTo} * - * @param {function(value: T, ?index: number): Observable} project A function + * @param {function(value: T, ?index: number): ObservableInput} project A function * that, when applied to an item emitted by the source Observable, returns an * Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] diff --git a/src/operator/switchMapTo.ts b/src/operator/switchMapTo.ts index ee65ba41be..7d44126949 100644 --- a/src/operator/switchMapTo.ts +++ b/src/operator/switchMapTo.ts @@ -36,7 +36,7 @@ export function switchMapTo(this: Observable, observable: Observable * @see {@link switchMap} * @see {@link mergeMapTo} * - * @param {Observable} innerObservable An Observable to replace each value from + * @param {ObservableInput} innerObservable An Observable to replace each value from * the source Observable. * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] * A function to produce the value on the output Observable based on the values diff --git a/src/operator/throttle.ts b/src/operator/throttle.ts index fc352ad71b..a3e06488a5 100644 --- a/src/operator/throttle.ts +++ b/src/operator/throttle.ts @@ -37,7 +37,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @see {@link sample} * @see {@link throttleTime} * - * @param {function(value: T): Observable|Promise} durationSelector A function + * @param {function(value: T): SubscribableOrPromise} durationSelector A function * that receives a value from the source Observable, for computing the silencing * duration for each source value, returned as an Observable or a Promise. * @return {Observable} An Observable that performs the throttle operation to diff --git a/src/operator/withLatestFrom.ts b/src/operator/withLatestFrom.ts index 7ccd1506e4..718b831e9b 100644 --- a/src/operator/withLatestFrom.ts +++ b/src/operator/withLatestFrom.ts @@ -47,7 +47,7 @@ export function withLatestFrom(this: Observable, array: ObservableInput * * @see {@link combineLatest} * - * @param {Observable} other An input Observable to combine with the source + * @param {ObservableInput} other An input Observable to combine with the source * Observable. More than one input Observables may be given as argument. * @param {Function} [project] Projection function for combining values * together. Receives all values in order of the Observables passed, where the From 790a469f9feae8e515e6053a464bd3860c286612 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Wed, 15 Feb 2017 15:18:50 -0800 Subject: [PATCH 18/18] fix(timeoutWith): update timeoutWith to work with new Subscriber leak fix changes --- src/operator/timeoutWith.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index ce9a26bb28..86b55d2982 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -64,9 +64,7 @@ class TimeoutWithSubscriber extends OuterSubscriber { private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { const { withObservable } = subscriber; - subscriber.unsubscribe(); - subscriber.closed = false; - subscriber.isStopped = false; + ( subscriber)._unsubscribeAndRecycle(); subscriber.add(subscribeToResult(subscriber, withObservable)); }