Skip to content

Commit

Permalink
fix(node): no more silent failure for many copies of rxjs (#3477)
Browse files Browse the repository at this point in the history
closes #3475
  • Loading branch information
benlesh authored Mar 27, 2018
1 parent 4cbd91c commit 92dcd44
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 8 deletions.
15 changes: 15 additions & 0 deletions node-tests/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
rm -rf node-tests/node_modules/

mkdir -p node-tests/node_modules/rxjs
mkdir -p node-tests/node_modules/rxjs1

cp -R dist/package/. node-tests/node_modules/rxjs
cp -R dist/package/. node-tests/node_modules/rxjs1

{
node node-tests/test.js
# node --inspect-brk node-tests/test.js
} || {
echo 'TEST FAILED'
}

32 changes: 32 additions & 0 deletions node-tests/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@

var of = require('rxjs/observable/of').of;
var of1 = require('rxjs1/observable/of').of;
var from1 = require('rxjs1/observable/from').from;
var mergeMap = require('rxjs/operators/mergeMap').mergeMap;
var mergeMap1 = require('rxjs1/operators/mergeMap').mergeMap;

var actual = [];
var expected = [1];

var id = setTimeout(function () {
throw new Error('TIMEOUT: Observable did not complete');
}, 200);

of1(0).pipe(
mergeMap1(function (x) { return of(x); }),
mergeMap(function () { return from1(Promise.resolve(1)); })
).subscribe({
next: function (value) { actual.push(value); },
error: function (err) {
console.error(err);
throw new Error('should not error');
},
complete: function () {
if (actual.length !== expected.length || actual[0] !== expected[0] || actual[1] !== expected[1]) {
throw new Error(actual + ' does not equal ' + expected);
} else {
clearTimeout(id);
console.log('TEST PASSED');
}
},
});
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
break;
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
this.destination = (<Subscriber<any>> destinationOrNext);
(<any> this.destination).add(this);
// HACK(benlesh): To resolve an issue where Node users may have multiple
// copies of rxjs in their node_modules directory.
if (isTrustedSubscriber(destinationOrNext)) {
const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber<any>;
this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable;
this.destination = trustedSubscriber;
trustedSubscriber.add(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
Expand Down Expand Up @@ -277,3 +280,7 @@ class SafeSubscriber<T> extends Subscriber<T> {
_parentSubscriber.unsubscribe();
}
}

function isTrustedSubscriber(obj: any) {
return obj instanceof Subscriber || ('syncErrorThrowable' in obj && obj[rxSubscriberSymbol]);
}

0 comments on commit 92dcd44

Please sign in to comment.