From 8637d47eaa8547c3422ea33d480abc5299fb7873 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 3 Dec 2015 14:37:34 -0800 Subject: [PATCH] fix(bindCallback): only call function once even while scheduled The previous implementation had issues with only calling the source function one time when scheduled. Since bindCallback shares its result with all subscribers, it is multicast, that means that it would need to maintain a list of subscribers internally. In leiu of that, I'm using AsyncSubject (which might need a better name) closes #881 --- spec/observables/bindCallback-spec.js | 276 +++++++++++++++++----- spec/observables/jasmine-is-weird-spec.js | 38 +++ src/Rx.KitchenSink.ts | 1 + src/Rx.ts | 3 +- src/add/observable/bindCallback.ts | 4 +- src/observable/bindCallback.ts | 131 +++++----- 6 files changed, 330 insertions(+), 123 deletions(-) create mode 100644 spec/observables/jasmine-is-weird-spec.js diff --git a/spec/observables/bindCallback-spec.js b/spec/observables/bindCallback-spec.js index c57195303b..e6f3adb85a 100644 --- a/spec/observables/bindCallback-spec.js +++ b/spec/observables/bindCallback-spec.js @@ -1,82 +1,246 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.bindCallback', function () { - it('should emit one value from a callback', function (done) { + describe('when not scheduled', function () { + it('should emit one value from a callback', function () { + function callback(datum, cb) { + cb(datum); + } + var boundCallback = Observable.bindCallback(callback); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + expect(results).toEqual([42, 'done']); + }); + + it('should emit one value chosen by a selector', function () { + function callback(datum, cb) { + cb(datum); + } + var boundCallback = Observable.bindCallback(callback, function (datum) { return datum; }); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + expect(results).toEqual([42, 'done']); + }); + + it('should emit an error when the selector throws', function () { + function callback(cb) { + cb(42); + } + var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); }); + var results = []; + + boundCallback() + .subscribe(function () { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + expect(results).toEqual([new Error('Yikes!')]); + }); + + it('should not emit, throw or complete if immediately unsubscribed', function (done) { + var nextSpy = jasmine.createSpy('next'); + var throwSpy = jasmine.createSpy('throw'); + var completeSpy = jasmine.createSpy('complete'); + var timeout; + function callback(datum, cb) { + // Need to cb async in order for the unsub to trigger + timeout = setTimeout(function () { + cb(datum); + }); + } + var subscription = Observable.bindCallback(callback)(42) + .subscribe(nextSpy, throwSpy, completeSpy); + subscription.unsubscribe(); + + setTimeout(function () { + expect(nextSpy).not.toHaveBeenCalled(); + expect(throwSpy).not.toHaveBeenCalled(); + expect(completeSpy).not.toHaveBeenCalled(); + + clearTimeout(timeout); + done(); + }); + }); + }); + + describe('when scheduled', function () { + it('should emit one value from a callback', function () { + function callback(datum, cb) { + cb(datum); + } + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([42, 'done']); + }); + + it('should error if callback throws', function () { + function callback(datum, cb) { + throw new Error('haha no callback for you'); + } + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([new Error('haha no callback for you')]); + }); + + it('should error if selector throws', function () { + function callback(datum, cb) { + cb(datum); + } + function selector() { + throw new Error('what? a selector? I don\'t think so'); + } + var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([new Error('what? a selector? I don\'t think so')]); + }); + + it('should use a selector', function () { + function callback(datum, cb) { + cb(datum); + } + function selector(x) { + return x + '!!!'; + } + var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual(['42!!!', 'done']); + }); + }); + + it('should pass multiple inner arguments as an array', function () { function callback(datum, cb) { - cb(datum); + cb(datum, 1, 2, 3); } - var boundCallback = Observable.bindCallback(callback); + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results = []; boundCallback(42) .subscribe(function (x) { - expect(x).toBe(42); - }, function () { - done.fail('should not be called'); - }, - done); + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([[42, 1, 2, 3], 'done']); }); - it('should emit one value chosen by a selector', function (done) { + it('should pass multiple inner arguments to the selector if there is one', function () { function callback(datum, cb) { - cb(null, datum); + cb(datum, 1, 2, 3); + } + function selector(a, b, c, d) { + expect([a, b, c, d]).toEqual([42, 1, 2, 3]); + return a + b + c + d; } - var boundCallback = Observable.bindCallback(callback, function (err, datum) { return datum; }); + var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); + var results = []; boundCallback(42) .subscribe(function (x) { - expect(x).toBe(42); - }, function () { - done.fail('should not be called'); - }, - done); - }); + results.push(x); + }, null, function () { + results.push('done'); + }); - it('should emit an error when the selector throws', function (done) { - function callback(cb) { - cb(42); - } - var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); }); - - boundCallback() - .subscribe(function () { - // Considered a failure if we don't go directly to err handler - done.fail('should not be called'); - }, - function (err) { - expect(err.message).toBe('Yikes!'); - done(); - }, - function () { - // Considered a failure if we don't go directly to err handler - done.fail('should not be called'); - } - ); + rxTestScheduler.flush(); + + expect(results).toEqual([48, 'done']); }); - it('should not emit, throw or complete if immediately unsubscribed', function (done) { - var nextSpy = jasmine.createSpy('next'); - var throwSpy = jasmine.createSpy('throw'); - var completeSpy = jasmine.createSpy('complete'); - var timeout; + it('should cache value for next subscription and not call callbackFunc again', function () { + var calls = 0; function callback(datum, cb) { - // Need to cb async in order for the unsub to trigger - timeout = setTimeout(function () { - cb(datum); - }); + calls++; + cb(datum); } - var subscription = Observable.bindCallback(callback)(42) - .subscribe(nextSpy, throwSpy, completeSpy); - subscription.unsubscribe(); + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results1 = []; + var results2 = []; - setTimeout(function () { - expect(nextSpy).not.toHaveBeenCalled(); - expect(throwSpy).not.toHaveBeenCalled(); - expect(completeSpy).not.toHaveBeenCalled(); + var source = boundCallback(42); - clearTimeout(timeout); - done(); + source.subscribe(function (x) { + results1.push(x); + }, null, function () { + results1.push('done'); }); + + source.subscribe(function (x) { + results2.push(x); + }, null, function () { + results2.push('done'); + }); + + rxTestScheduler.flush(); + + expect(calls).toBe(1); + expect(results1).toEqual([42, 'done']); + expect(results2).toEqual([42, 'done']); }); }); \ No newline at end of file diff --git a/spec/observables/jasmine-is-weird-spec.js b/spec/observables/jasmine-is-weird-spec.js new file mode 100644 index 0000000000..dec130e968 --- /dev/null +++ b/spec/observables/jasmine-is-weird-spec.js @@ -0,0 +1,38 @@ +/* globals describe, it, expect, rxTestScheduler */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +/** + * I'm starting this file to collect tests that when put in other files break jasmine for + * no apparent reason. It seems like maybe we should move off of jasmine, but moving >1700 tests + * sounds really gross, so I don't want to do that... + */ +describe('jasmine is weird', function () { + describe('bindCallback', function () { + // HACK: If you move this test under the bindCallback-spec.js file, it will arbitrarily + // break one bufferWhen-spec.js test. + it('should not even call the callbackFn if immediately unsubscribed', function () { + var calls = 0; + function callback(datum, cb) { + calls++; + cb(datum); + } + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results1 = []; + + var source = boundCallback(42); + + var subscription = source.subscribe(function (x) { + results1.push(x); + }, null, function () { + results1.push('done'); + }); + + subscription.unsubscribe(); + + rxTestScheduler.flush(); + + expect(calls).toBe(0); + }); + }); +}); \ No newline at end of file diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 3a945ff708..86b8fbfea5 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -26,6 +26,7 @@ import './add/operator/combineLatest-static'; import './add/operator/concat-static'; import './add/operator/merge-static'; import './add/observable/bindCallback'; +import './add/observable/defer'; import './add/observable/empty'; import './add/observable/forkJoin'; import './add/observable/from'; diff --git a/src/Rx.ts b/src/Rx.ts index 3a4c6d1d7d..7a4d54176d 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -11,7 +11,8 @@ import {Observable} from './Observable'; import './add/operator/combineLatest-static'; import './add/operator/concat-static'; import './add/operator/merge-static'; -import './observable/bindCallback'; +import './add/observable/bindCallback'; +import './add/observable/defer'; import './add/observable/empty'; import './add/observable/forkJoin'; import './add/observable/from'; diff --git a/src/add/observable/bindCallback.ts b/src/add/observable/bindCallback.ts index 391ec38b28..c47857d8b5 100644 --- a/src/add/observable/bindCallback.ts +++ b/src/add/observable/bindCallback.ts @@ -1,3 +1,3 @@ import {Observable} from '../../Observable'; -import {BoundCallbackObsevable} from '../../observable/bindCallback'; -Observable.bindCallback = BoundCallbackObsevable.create; +import {BoundCallbackObservable} from '../../observable/bindCallback'; +Observable.bindCallback = BoundCallbackObservable.create; diff --git a/src/observable/bindCallback.ts b/src/observable/bindCallback.ts index b25d41990c..534d9fc73c 100644 --- a/src/observable/bindCallback.ts +++ b/src/observable/bindCallback.ts @@ -1,19 +1,17 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; -import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {immediate} from '../scheduler/immediate'; +import {Scheduler} from '../Scheduler'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; +import {AsyncSubject} from '../subject/AsyncSubject'; export class BoundCallbackObservable extends Observable { - - _isScalar: boolean = false; - value: T | T[]; + subject: AsyncSubject; static create(callbackFunc: Function, selector: Function = undefined, - scheduler: Scheduler = immediate): Function { + scheduler?: Scheduler): Function { return (...args): Observable => { return new BoundCallbackObservable(callbackFunc, selector, args, scheduler); }; @@ -22,86 +20,91 @@ export class BoundCallbackObservable extends Observable { constructor(private callbackFunc: Function, private selector, private args: any[], - public scheduler: Scheduler = immediate) { + public scheduler: Scheduler) { super(); } - _subscribe(subscriber: Subscriber) { + _subscribe(subscriber: Subscriber): Subscription { const callbackFunc = this.callbackFunc; - const selector = this.selector; const args = this.args; const scheduler = this.scheduler; + let subject = this.subject; - let handler; - - if (scheduler === immediate) { - if (this._isScalar) { - subscriber.next(this.value); - subscriber.complete(); - } else { - handler = (...innerArgs) => { - let results; - - this._isScalar = true; - this.value = innerArgs; - + if (!scheduler) { + if (!subject) { + subject = this.subject = new AsyncSubject(); + const handler = function handlerFn(...innerArgs) { + const source = (handlerFn).source; + const { selector, subject } = source; if (selector) { - results = tryCatch(selector).apply(this, innerArgs); - if (results === errorObject) { return subscriber.error(results.e); } - subscriber.next(results); - } else { - if (innerArgs.length <= 1) { - subscriber.next(innerArgs[0]); + const result = tryCatch(selector).apply(this, innerArgs); + if (result === errorObject) { + subject.error(errorObject.e); } else { - subscriber.next(innerArgs); + subject.next(result); + subject.complete(); } + } else { + subject.next(innerArgs.length === 1 ? innerArgs[0] : innerArgs); + subject.complete(); } - subscriber.complete(); }; + // use named function instance to avoid closure. + (handler).source = this; + + const result = tryCatch(callbackFunc).apply(this, args.concat(handler)); + if (result === errorObject) { + subject.error(errorObject.e); + } } + return subject.subscribe(subscriber); } else { - const subscription = new Subscription(); - if (this._isScalar) { - const value = this.value; - subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })); - } else { - handler = (...innerArgs) => { - let results; + subscriber.add(scheduler.schedule(dispatch, 0, { source: this, subscriber })); + return subscriber; + } + } +} - this._isScalar = true; +function dispatch(state: { source: BoundCallbackObservable, subscriber: Subscriber }) { + const { source, subscriber } = state; + const { callbackFunc, args, scheduler } = source; + let subject = source.subject; - if (selector) { - results = tryCatch(selector).apply(this, innerArgs); - if (results === errorObject) { - return subscription.add(scheduler.schedule(dispatchError, 0, { err: results.e, subscriber })); - } - this.value = results; - } else { - if (innerArgs.length <= 1) { - this.value = innerArgs[0]; - } else { - this.value = innerArgs; - } - } - const value = this.value; - subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })); - }; - return subscription; + if (!subject) { + subject = source.subject = new AsyncSubject(); + + const handler = function handlerFn(...innerArgs) { + const source = (handlerFn).source; + const { selector, subject } = source; + if (selector) { + const result = tryCatch(selector).apply(this, innerArgs); + if (result === errorObject) { + subject.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject })); + } else { + subject.add(scheduler.schedule(dispatchNext, 0, { value: result, subject })); + } + } else { + const value = innerArgs.length === 1 ? innerArgs[0] : innerArgs; + subject.add(scheduler.schedule(dispatchNext, 0, { value, subject })); } - } + }; + // use named function to pass values in without closure + (handler).source = source; - if (handler) { - args.push(handler); - callbackFunc.apply(this, args); + const result = tryCatch(callbackFunc).apply(this, args.concat(handler)); + if (result === errorObject) { + subject.error(errorObject.e); } } + + (this).add(subject.subscribe(subscriber)); } -function dispatchNext({ value, subscriber }) { - subscriber.next(value); - subscriber.complete(); +function dispatchNext({ value, subject }) { + subject.next(value); + subject.complete(); } -function dispatchError({ err, subscriber }) { - subscriber.error(err); +function dispatchError({ err, subject }) { + subject.error(err); }