diff --git a/spec/observables/bindCallback-spec.js b/spec/observables/bindCallback-spec.js index c57195303b..e6f3adb85a 100644 --- a/spec/observables/bindCallback-spec.js +++ b/spec/observables/bindCallback-spec.js @@ -1,82 +1,246 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.bindCallback', function () { - it('should emit one value from a callback', function (done) { + describe('when not scheduled', function () { + it('should emit one value from a callback', function () { + function callback(datum, cb) { + cb(datum); + } + var boundCallback = Observable.bindCallback(callback); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + expect(results).toEqual([42, 'done']); + }); + + it('should emit one value chosen by a selector', function () { + function callback(datum, cb) { + cb(datum); + } + var boundCallback = Observable.bindCallback(callback, function (datum) { return datum; }); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + expect(results).toEqual([42, 'done']); + }); + + it('should emit an error when the selector throws', function () { + function callback(cb) { + cb(42); + } + var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); }); + var results = []; + + boundCallback() + .subscribe(function () { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + expect(results).toEqual([new Error('Yikes!')]); + }); + + it('should not emit, throw or complete if immediately unsubscribed', function (done) { + var nextSpy = jasmine.createSpy('next'); + var throwSpy = jasmine.createSpy('throw'); + var completeSpy = jasmine.createSpy('complete'); + var timeout; + function callback(datum, cb) { + // Need to cb async in order for the unsub to trigger + timeout = setTimeout(function () { + cb(datum); + }); + } + var subscription = Observable.bindCallback(callback)(42) + .subscribe(nextSpy, throwSpy, completeSpy); + subscription.unsubscribe(); + + setTimeout(function () { + expect(nextSpy).not.toHaveBeenCalled(); + expect(throwSpy).not.toHaveBeenCalled(); + expect(completeSpy).not.toHaveBeenCalled(); + + clearTimeout(timeout); + done(); + }); + }); + }); + + describe('when scheduled', function () { + it('should emit one value from a callback', function () { + function callback(datum, cb) { + cb(datum); + } + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([42, 'done']); + }); + + it('should error if callback throws', function () { + function callback(datum, cb) { + throw new Error('haha no callback for you'); + } + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([new Error('haha no callback for you')]); + }); + + it('should error if selector throws', function () { + function callback(datum, cb) { + cb(datum); + } + function selector() { + throw new Error('what? a selector? I don\'t think so'); + } + var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([new Error('what? a selector? I don\'t think so')]); + }); + + it('should use a selector', function () { + function callback(datum, cb) { + cb(datum); + } + function selector(x) { + return x + '!!!'; + } + var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual(['42!!!', 'done']); + }); + }); + + it('should pass multiple inner arguments as an array', function () { function callback(datum, cb) { - cb(datum); + cb(datum, 1, 2, 3); } - var boundCallback = Observable.bindCallback(callback); + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results = []; boundCallback(42) .subscribe(function (x) { - expect(x).toBe(42); - }, function () { - done.fail('should not be called'); - }, - done); + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([[42, 1, 2, 3], 'done']); }); - it('should emit one value chosen by a selector', function (done) { + it('should pass multiple inner arguments to the selector if there is one', function () { function callback(datum, cb) { - cb(null, datum); + cb(datum, 1, 2, 3); + } + function selector(a, b, c, d) { + expect([a, b, c, d]).toEqual([42, 1, 2, 3]); + return a + b + c + d; } - var boundCallback = Observable.bindCallback(callback, function (err, datum) { return datum; }); + var boundCallback = Observable.bindCallback(callback, selector, rxTestScheduler); + var results = []; boundCallback(42) .subscribe(function (x) { - expect(x).toBe(42); - }, function () { - done.fail('should not be called'); - }, - done); - }); + results.push(x); + }, null, function () { + results.push('done'); + }); - it('should emit an error when the selector throws', function (done) { - function callback(cb) { - cb(42); - } - var boundCallback = Observable.bindCallback(callback, function (err) { throw new Error('Yikes!'); }); - - boundCallback() - .subscribe(function () { - // Considered a failure if we don't go directly to err handler - done.fail('should not be called'); - }, - function (err) { - expect(err.message).toBe('Yikes!'); - done(); - }, - function () { - // Considered a failure if we don't go directly to err handler - done.fail('should not be called'); - } - ); + rxTestScheduler.flush(); + + expect(results).toEqual([48, 'done']); }); - it('should not emit, throw or complete if immediately unsubscribed', function (done) { - var nextSpy = jasmine.createSpy('next'); - var throwSpy = jasmine.createSpy('throw'); - var completeSpy = jasmine.createSpy('complete'); - var timeout; + it('should cache value for next subscription and not call callbackFunc again', function () { + var calls = 0; function callback(datum, cb) { - // Need to cb async in order for the unsub to trigger - timeout = setTimeout(function () { - cb(datum); - }); + calls++; + cb(datum); } - var subscription = Observable.bindCallback(callback)(42) - .subscribe(nextSpy, throwSpy, completeSpy); - subscription.unsubscribe(); + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results1 = []; + var results2 = []; - setTimeout(function () { - expect(nextSpy).not.toHaveBeenCalled(); - expect(throwSpy).not.toHaveBeenCalled(); - expect(completeSpy).not.toHaveBeenCalled(); + var source = boundCallback(42); - clearTimeout(timeout); - done(); + source.subscribe(function (x) { + results1.push(x); + }, null, function () { + results1.push('done'); }); + + source.subscribe(function (x) { + results2.push(x); + }, null, function () { + results2.push('done'); + }); + + rxTestScheduler.flush(); + + expect(calls).toBe(1); + expect(results1).toEqual([42, 'done']); + expect(results2).toEqual([42, 'done']); }); }); \ No newline at end of file diff --git a/spec/observables/jasmine-is-weird-spec.js b/spec/observables/jasmine-is-weird-spec.js new file mode 100644 index 0000000000..dec130e968 --- /dev/null +++ b/spec/observables/jasmine-is-weird-spec.js @@ -0,0 +1,38 @@ +/* globals describe, it, expect, rxTestScheduler */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +/** + * I'm starting this file to collect tests that when put in other files break jasmine for + * no apparent reason. It seems like maybe we should move off of jasmine, but moving >1700 tests + * sounds really gross, so I don't want to do that... + */ +describe('jasmine is weird', function () { + describe('bindCallback', function () { + // HACK: If you move this test under the bindCallback-spec.js file, it will arbitrarily + // break one bufferWhen-spec.js test. + it('should not even call the callbackFn if immediately unsubscribed', function () { + var calls = 0; + function callback(datum, cb) { + calls++; + cb(datum); + } + var boundCallback = Observable.bindCallback(callback, null, rxTestScheduler); + var results1 = []; + + var source = boundCallback(42); + + var subscription = source.subscribe(function (x) { + results1.push(x); + }, null, function () { + results1.push('done'); + }); + + subscription.unsubscribe(); + + rxTestScheduler.flush(); + + expect(calls).toBe(0); + }); + }); +}); \ No newline at end of file diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 3a945ff708..86b8fbfea5 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -26,6 +26,7 @@ import './add/operator/combineLatest-static'; import './add/operator/concat-static'; import './add/operator/merge-static'; import './add/observable/bindCallback'; +import './add/observable/defer'; import './add/observable/empty'; import './add/observable/forkJoin'; import './add/observable/from'; diff --git a/src/Rx.ts b/src/Rx.ts index 3a4c6d1d7d..7a4d54176d 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -11,7 +11,8 @@ import {Observable} from './Observable'; import './add/operator/combineLatest-static'; import './add/operator/concat-static'; import './add/operator/merge-static'; -import './observable/bindCallback'; +import './add/observable/bindCallback'; +import './add/observable/defer'; import './add/observable/empty'; import './add/observable/forkJoin'; import './add/observable/from'; diff --git a/src/add/observable/bindCallback.ts b/src/add/observable/bindCallback.ts index 391ec38b28..c47857d8b5 100644 --- a/src/add/observable/bindCallback.ts +++ b/src/add/observable/bindCallback.ts @@ -1,3 +1,3 @@ import {Observable} from '../../Observable'; -import {BoundCallbackObsevable} from '../../observable/bindCallback'; -Observable.bindCallback = BoundCallbackObsevable.create; +import {BoundCallbackObservable} from '../../observable/bindCallback'; +Observable.bindCallback = BoundCallbackObservable.create; diff --git a/src/observable/bindCallback.ts b/src/observable/bindCallback.ts index b25d41990c..534d9fc73c 100644 --- a/src/observable/bindCallback.ts +++ b/src/observable/bindCallback.ts @@ -1,19 +1,17 @@ import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; -import {Scheduler} from '../Scheduler'; import {Subscription} from '../Subscription'; -import {immediate} from '../scheduler/immediate'; +import {Scheduler} from '../Scheduler'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; +import {AsyncSubject} from '../subject/AsyncSubject'; export class BoundCallbackObservable extends Observable { - - _isScalar: boolean = false; - value: T | T[]; + subject: AsyncSubject; static create(callbackFunc: Function, selector: Function = undefined, - scheduler: Scheduler = immediate): Function { + scheduler?: Scheduler): Function { return (...args): Observable => { return new BoundCallbackObservable(callbackFunc, selector, args, scheduler); }; @@ -22,86 +20,91 @@ export class BoundCallbackObservable extends Observable { constructor(private callbackFunc: Function, private selector, private args: any[], - public scheduler: Scheduler = immediate) { + public scheduler: Scheduler) { super(); } - _subscribe(subscriber: Subscriber) { + _subscribe(subscriber: Subscriber): Subscription { const callbackFunc = this.callbackFunc; - const selector = this.selector; const args = this.args; const scheduler = this.scheduler; + let subject = this.subject; - let handler; - - if (scheduler === immediate) { - if (this._isScalar) { - subscriber.next(this.value); - subscriber.complete(); - } else { - handler = (...innerArgs) => { - let results; - - this._isScalar = true; - this.value = innerArgs; - + if (!scheduler) { + if (!subject) { + subject = this.subject = new AsyncSubject(); + const handler = function handlerFn(...innerArgs) { + const source = (handlerFn).source; + const { selector, subject } = source; if (selector) { - results = tryCatch(selector).apply(this, innerArgs); - if (results === errorObject) { return subscriber.error(results.e); } - subscriber.next(results); - } else { - if (innerArgs.length <= 1) { - subscriber.next(innerArgs[0]); + const result = tryCatch(selector).apply(this, innerArgs); + if (result === errorObject) { + subject.error(errorObject.e); } else { - subscriber.next(innerArgs); + subject.next(result); + subject.complete(); } + } else { + subject.next(innerArgs.length === 1 ? innerArgs[0] : innerArgs); + subject.complete(); } - subscriber.complete(); }; + // use named function instance to avoid closure. + (handler).source = this; + + const result = tryCatch(callbackFunc).apply(this, args.concat(handler)); + if (result === errorObject) { + subject.error(errorObject.e); + } } + return subject.subscribe(subscriber); } else { - const subscription = new Subscription(); - if (this._isScalar) { - const value = this.value; - subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })); - } else { - handler = (...innerArgs) => { - let results; + subscriber.add(scheduler.schedule(dispatch, 0, { source: this, subscriber })); + return subscriber; + } + } +} - this._isScalar = true; +function dispatch(state: { source: BoundCallbackObservable, subscriber: Subscriber }) { + const { source, subscriber } = state; + const { callbackFunc, args, scheduler } = source; + let subject = source.subject; - if (selector) { - results = tryCatch(selector).apply(this, innerArgs); - if (results === errorObject) { - return subscription.add(scheduler.schedule(dispatchError, 0, { err: results.e, subscriber })); - } - this.value = results; - } else { - if (innerArgs.length <= 1) { - this.value = innerArgs[0]; - } else { - this.value = innerArgs; - } - } - const value = this.value; - subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })); - }; - return subscription; + if (!subject) { + subject = source.subject = new AsyncSubject(); + + const handler = function handlerFn(...innerArgs) { + const source = (handlerFn).source; + const { selector, subject } = source; + if (selector) { + const result = tryCatch(selector).apply(this, innerArgs); + if (result === errorObject) { + subject.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject })); + } else { + subject.add(scheduler.schedule(dispatchNext, 0, { value: result, subject })); + } + } else { + const value = innerArgs.length === 1 ? innerArgs[0] : innerArgs; + subject.add(scheduler.schedule(dispatchNext, 0, { value, subject })); } - } + }; + // use named function to pass values in without closure + (handler).source = source; - if (handler) { - args.push(handler); - callbackFunc.apply(this, args); + const result = tryCatch(callbackFunc).apply(this, args.concat(handler)); + if (result === errorObject) { + subject.error(errorObject.e); } } + + (this).add(subject.subscribe(subscriber)); } -function dispatchNext({ value, subscriber }) { - subscriber.next(value); - subscriber.complete(); +function dispatchNext({ value, subject }) { + subject.next(value); + subject.complete(); } -function dispatchError({ err, subscriber }) { - subscriber.error(err); +function dispatchError({ err, subject }) { + subject.error(err); }