diff --git a/MIGRATION.md b/MIGRATION.md index 744d57fc33..e76b0cd1fd 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -59,6 +59,8 @@ enabling "composite" subscription behavior. |`flatMapFirst`|`exhaustMap`| |`flatMapLatest`|`switchMap`| |`flatMapWithMaxConcurrent`|`mergeMap` or `flatMap`(alias)| +|`fromCallback`|`bindCallback`| +|`fromNodeCallback`|`bindNodeCallback`| |`publishValue`|`publishBehavior`| |`replay`|`publishReplay`| |`select`|`map`| diff --git a/spec/observables/bindNodeCallback-spec.js b/spec/observables/bindNodeCallback-spec.js new file mode 100644 index 0000000000..e12ecbb988 --- /dev/null +++ b/spec/observables/bindNodeCallback-spec.js @@ -0,0 +1,292 @@ +/* globals describe, it, expect, rxTestScheduler */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.bindNodeCallback', function () { + describe('when not scheduled', function () { + it('should emit one value from a callback', function () { + function callback(datum, cb) { + cb(null, datum); + } + var boundCallback = Observable.bindNodeCallback(callback); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + expect(results).toEqual([42, 'done']); + }); + + it('should emit one value chosen by a selector', function () { + function callback(datum, cb) { + cb(null, datum); + } + var boundCallback = Observable.bindNodeCallback(callback, function (datum) { return datum; }); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + expect(results).toEqual([42, 'done']); + }); + + it('should raise error from callback', function () { + var error = new Error(); + + function callback(cb) { + cb(error); + } + + var boundCallback = Observable.bindNodeCallback(callback); + var results = []; + + boundCallback() + .subscribe(function () { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + expect(results).toEqual([error]); + }); + + it('should emit an error when the selector throws', function () { + function callback(cb) { + cb(null, 42); + } + var boundCallback = Observable.bindNodeCallback(callback, function (err) { throw new Error('Yikes!'); }); + var results = []; + + boundCallback() + .subscribe(function () { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + expect(results).toEqual([new Error('Yikes!')]); + }); + + it('should not emit, throw or complete if immediately unsubscribed', function (done) { + var nextSpy = jasmine.createSpy('next'); + var throwSpy = jasmine.createSpy('throw'); + var completeSpy = jasmine.createSpy('complete'); + var timeout; + function callback(datum, cb) { + // Need to cb async in order for the unsub to trigger + timeout = setTimeout(function () { + cb(null, datum); + }); + } + var subscription = Observable.bindNodeCallback(callback)(42) + .subscribe(nextSpy, throwSpy, completeSpy); + subscription.unsubscribe(); + + setTimeout(function () { + expect(nextSpy).not.toHaveBeenCalled(); + expect(throwSpy).not.toHaveBeenCalled(); + expect(completeSpy).not.toHaveBeenCalled(); + + clearTimeout(timeout); + done(); + }); + }); + }); + + describe('when scheduled', function () { + it('should emit one value from a callback', function () { + function callback(datum, cb) { + cb(null, datum); + } + var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([42, 'done']); + }); + + it('should error if callback throws', function () { + function callback(datum, cb) { + throw new Error('haha no callback for you'); + } + var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([new Error('haha no callback for you')]); + }); + + it('should raise error from callback', function () { + var error = new Error(); + + function callback(cb) { + cb(error); + } + + var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback() + .subscribe(function () { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([error]); + }); + + it('should error if selector throws', function () { + function callback(datum, cb) { + cb(null, datum); + } + function selector() { + throw new Error('what? a selector? I don\'t think so'); + } + var boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + throw 'should not next'; + }, function (err) { + results.push(err); + }, function () { + throw 'should not complete'; + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([new Error('what? a selector? I don\'t think so')]); + }); + + it('should use a selector', function () { + function callback(datum, cb) { + cb(null, datum); + } + function selector(x) { + return x + '!!!'; + } + var boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual(['42!!!', 'done']); + }); + }); + + it('should pass multiple inner arguments as an array', function () { + function callback(datum, cb) { + cb(null, datum, 1, 2, 3); + } + var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([[42, 1, 2, 3], 'done']); + }); + + it('should pass multiple inner arguments to the selector if there is one', function () { + function callback(datum, cb) { + cb(null, datum, 1, 2, 3); + } + function selector(a, b, c, d) { + expect([a, b, c, d]).toEqual([42, 1, 2, 3]); + return a + b + c + d; + } + var boundCallback = Observable.bindNodeCallback(callback, selector, rxTestScheduler); + var results = []; + + boundCallback(42) + .subscribe(function (x) { + results.push(x); + }, null, function () { + results.push('done'); + }); + + rxTestScheduler.flush(); + + expect(results).toEqual([48, 'done']); + }); + + it('should cache value for next subscription and not call callbackFunc again', function () { + var calls = 0; + function callback(datum, cb) { + calls++; + cb(null, datum); + } + var boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler); + var results1 = []; + var results2 = []; + + var source = boundCallback(42); + + source.subscribe(function (x) { + results1.push(x); + }, null, function () { + results1.push('done'); + }); + + source.subscribe(function (x) { + results2.push(x); + }, null, function () { + results2.push('done'); + }); + + rxTestScheduler.flush(); + + expect(calls).toBe(1); + expect(results1).toEqual([42, 'done']); + expect(results2).toEqual([42, 'done']); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index f592b5704d..6440bdd436 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -17,6 +17,7 @@ import {concat as concatStatic} from './operator/concat-static'; import {merge as mergeStatic} from './operator/merge-static'; import {zip as zipStatic} from './operator/zip-static'; import {BoundCallbackObservable} from './observable/bindCallback'; +import {BoundNodeCallbackObservable} from './observable/bindNodeCallback'; import {DeferObservable} from './observable/defer'; import {EmptyObservable} from './observable/empty'; import {ForkJoinObservable} from './observable/forkJoin'; @@ -165,6 +166,7 @@ export class Observable implements CoreOperators { // static method stubs static ajax: AjaxCreationMethod; static bindCallback: typeof BoundCallbackObservable.create; + static bindNodeCallback: typeof BoundNodeCallbackObservable.create; static combineLatest: typeof combineLatestStatic; static concat: typeof concatStatic; static defer: typeof DeferObservable.create; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index e7347d80a8..9c9d9f1fef 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -28,6 +28,7 @@ import './add/operator/concat-static'; import './add/operator/merge-static'; import './add/operator/race-static'; import './add/observable/bindCallback'; +import './add/observable/bindNodeCallback'; import './add/observable/defer'; import './add/observable/empty'; import './add/observable/forkJoin'; diff --git a/src/Rx.ts b/src/Rx.ts index 0c3650d689..323eebac4f 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -13,6 +13,7 @@ import './add/operator/concat-static'; import './add/operator/merge-static'; import './add/operator/race-static'; import './add/observable/bindCallback'; +import './add/observable/bindNodeCallback'; import './add/observable/defer'; import './add/observable/empty'; import './add/observable/forkJoin'; diff --git a/src/add/observable/bindNodeCallback.ts b/src/add/observable/bindNodeCallback.ts new file mode 100644 index 0000000000..4de0453d1e --- /dev/null +++ b/src/add/observable/bindNodeCallback.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../Observable'; +import {BoundNodeCallbackObservable} from '../../observable/bindNodeCallback'; +Observable.bindNodeCallback = BoundNodeCallbackObservable.create; + +export var _void: void; \ No newline at end of file diff --git a/src/observable/bindNodeCallback.ts b/src/observable/bindNodeCallback.ts new file mode 100644 index 0000000000..085c681b7f --- /dev/null +++ b/src/observable/bindNodeCallback.ts @@ -0,0 +1,118 @@ +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; +import {Scheduler} from '../Scheduler'; +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import {AsyncSubject} from '../subject/AsyncSubject'; + +export class BoundNodeCallbackObservable extends Observable { + subject: AsyncSubject; + + static create(callbackFunc: Function, + selector: Function = undefined, + scheduler?: Scheduler): Function { + return (...args): Observable => { + return new BoundNodeCallbackObservable(callbackFunc, selector, args, scheduler); + }; + } + + constructor(private callbackFunc: Function, + private selector, + private args: any[], + public scheduler: Scheduler) { + super(); + } + + _subscribe(subscriber: Subscriber): Subscription { + const callbackFunc = this.callbackFunc; + const args = this.args; + const scheduler = this.scheduler; + let subject = this.subject; + + if (!scheduler) { + if (!subject) { + subject = this.subject = new AsyncSubject(); + const handler = function handlerFn(...innerArgs) { + const source = (handlerFn).source; + const { selector, subject } = source; + const err = innerArgs.shift(); + + if (err) { + subject.error(err); + } else if (selector) { + const result = tryCatch(selector).apply(this, innerArgs); + if (result === errorObject) { + subject.error(errorObject.e); + } else { + subject.next(result); + subject.complete(); + } + } else { + subject.next(innerArgs.length === 1 ? innerArgs[0] : innerArgs); + subject.complete(); + } + }; + // use named function instance to avoid closure. + (handler).source = this; + + const result = tryCatch(callbackFunc).apply(this, args.concat(handler)); + if (result === errorObject) { + subject.error(errorObject.e); + } + } + return subject.subscribe(subscriber); + } else { + return scheduler.schedule(dispatch, 0, { source: this, subscriber }); + } + } +} + +function dispatch(state: { source: BoundNodeCallbackObservable, subscriber: Subscriber }) { + const self = ( this); + const { source, subscriber } = state; + const { callbackFunc, args, scheduler } = source; + let subject = source.subject; + + if (!subject) { + subject = source.subject = new AsyncSubject(); + + const handler = function handlerFn(...innerArgs) { + const source = (handlerFn).source; + const { selector, subject } = source; + const err = innerArgs.shift(); + + if (err) { + subject.error(err); + } else if (selector) { + const result = tryCatch(selector).apply(this, innerArgs); + if (result === errorObject) { + self.add(scheduler.schedule(dispatchError, 0, { err: errorObject.e, subject })); + } else { + self.add(scheduler.schedule(dispatchNext, 0, { value: result, subject })); + } + } else { + const value = innerArgs.length === 1 ? innerArgs[0] : innerArgs; + self.add(scheduler.schedule(dispatchNext, 0, { value, subject })); + } + }; + // use named function to pass values in without closure + (handler).source = source; + + const result = tryCatch(callbackFunc).apply(this, args.concat(handler)); + if (result === errorObject) { + subject.error(errorObject.e); + } + } + + self.add(subject.subscribe(subscriber)); +} + +function dispatchNext({ value, subject }) { + subject.next(value); + subject.complete(); +} + +function dispatchError({ err, subject }) { + subject.error(err); +}