Skip to content

Commit

Permalink
fix(forkJoin): accpets observables emitting null or undefined
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Feb 19, 2016
1 parent 7e32ade commit 07827ee
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
12 changes: 12 additions & 0 deletions spec/observables/forkJoin-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ describe('Observable.forkJoin', function () {
var expected = '--------------(x|)';

expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});

//Hack - just adding one new test cases in here or either jasmin-is-weird-spec.js,
//one of test breaks under publish-spec.js
var e2 = Observable.forkJoin(
hot('--a--b--c--d--|', { d: null }),
hot('(b|)'),
hot('--1--2--3--|'),
hot('-----r--t--u--|', { u: undefined })
);
var expected2 = '--------------(x|)';

expectObservable(e2).toBe(expected2, {x: [null, 'b', '3', undefined]});
});

it('should join the last values of the provided observables with selector', function () {
Expand Down
35 changes: 18 additions & 17 deletions src/observable/ForkJoinObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ export class ForkJoinObservable<T> extends Observable<T> {
const sources = this.sources;
const len = sources.length;

const context = { completed: 0, total: len, values: emptyArray(len), selector: this.resultSelector };
const context = { completed: 0,
total: len,
values: new Array(len),
haveValues: new Array(len),
selector: this.resultSelector };

for (let i = 0; i < len; i++) {
let source = sources[i];
if (isPromise(source)) {
Expand All @@ -52,39 +57,43 @@ export class ForkJoinObservable<T> extends Observable<T> {
}

class AllSubscriber<T> extends Subscriber<T> {
private _value: T = null;

constructor(destination: Subscriber<any>,
private index: number,
private context: { completed: number,
total: number,
values: any[],
haveValues: any[],
selector: (...values: Array<any>) => any }) {
super(destination);
}

protected _next(value: T): void {
this._value = value;
const context = this.context;
const index = this.index;

context.values[index] = value;
context.haveValues[index] = true;
}

protected _complete(): void {
const destination = this.destination;
const context = this.context;

if (this._value == null) {
if (!context.haveValues[this.index]) {
destination.complete();
}

const context = this.context;
context.completed++;
context.values[this.index] = this._value;

const values = context.values;

if (context.completed !== values.length) {
return;
}

if (values.every(hasValue)) {
let value = context.selector ? context.selector.apply(this, values) :
if (context.haveValues.every(hasValue)) {
const value = context.selector ? context.selector.apply(this, values) :
values;
destination.next(value);
}
Expand All @@ -94,13 +103,5 @@ class AllSubscriber<T> extends Subscriber<T> {
}

function hasValue(x: any): boolean {
return x !== null;
}

function emptyArray(len: number): any[] {
let arr: any[] = [];
for (let i = 0; i < len; i++) {
arr.push(null);
}
return arr;
return x === true;
}

0 comments on commit 07827ee

Please sign in to comment.